workflow API

workflow

package

API reference for the workflow package.

S
struct

RetryPolicy

pkg/workflow/retry.go:10-14
type RetryPolicy struct

Fields

Name Type Description
MaxAttempts int
Delay time.Duration
Multiplier float64
F
function

WithRetry

Parameters

policy
do
func(context.Context) error

Returns

func(context.Context)
error
pkg/workflow/retry.go:16-26
func WithRetry(policy RetryPolicy, do func(context.Context) error) func(context.Context) error

{
	return func(ctx context.Context) error {
		return resiliency.Retry(ctx, func() error {
			return do(ctx)
		},
			resiliency.WithAttempts(policy.MaxAttempts),
			resiliency.WithDelay(policy.Delay, 24*time.Hour),
			resiliency.WithFactor(policy.Multiplier),
		)
	}
}
S
struct

Step

pkg/workflow/workflow.go:10-14
type Step struct

Fields

Name Type Description
Name string
Do func(ctx context.Context) error
Compensate func(ctx context.Context) error
T
type

Group

pkg/workflow/workflow.go:16-16
type Group []Step
S
struct

Workflow

pkg/workflow/workflow.go:18-22
type Workflow struct

Methods

Add
Method

Parameters

name string
do func(ctx context.Context) error
compensate func(ctx context.Context) error
func (*Workflow) Add(name string, do, compensate func(ctx context.Context) error)
{
	w.steps = append(w.steps, Step{
		Name:       name,
		Do:         do,
		Compensate: compensate,
	})
}
AddGroup
Method

Parameters

g Group
func (*Workflow) AddGroup(g Group)
{
	w.steps = append(w.steps, g)
}
Run
Method

Parameters

Returns

error
func (*Workflow) Run(ctx context.Context) error
{
	for _, item := range w.steps {
		// Check Context before starting step
		if ctx.Err() != nil {
			return w.rollback(ctx, ctx.Err())
		}

		var err error
		switch v := item.(type) {
		case Step:
			err = w.runStep(ctx, v)
		case Group:
			err = w.runGroup(ctx, v)
		}

		if err != nil {
			return w.rollback(ctx, err)
		}
	}
	return nil
}
runStep
Method

Parameters

Returns

err error
func (*Workflow) runStep(ctx context.Context, step Step) (err error)
{
	defer func() {
		if r := recover(); r != nil {
			err = fmt.Errorf("panic in step '%s': %v", step.Name, r)
		}
	}()

	if err := step.Do(ctx); err != nil {
		return fmt.Errorf("step '%s' failed: %w", step.Name, err)
	}

	w.mu.Lock()
	if step.Compensate != nil {
		w.stack = append(w.stack, step)
	}
	w.mu.Unlock()

	return nil
}
runGroup
Method

Parameters

Returns

error
func (*Workflow) runGroup(ctx context.Context, group Group) error
{
	var wg sync.WaitGroup
	errChan := make(chan error, len(group))

	// Temporarily store successful steps in this group to add to stack later
	// If the group fails, we only compensate what succeeded inside the group?
	// Actually, if we use w.runStep() inside goroutine, it appends to w.stack safely.
	// But we must handle partial failure rollback within the group logic or rely on main rollback?
	// For simplicity, we let them append to stack. If one fails, Run() returns error and triggers rollback of everything in stack.

	for _, step := range group {
		wg.Add(1)
		go func(s Step) {
			defer wg.Done()
			if err := w.runStep(ctx, s); err != nil {
				errChan <- err
			}
		}(step)
	}

	wg.Wait()
	close(errChan)

	if len(errChan) > 0 {
		var errs []error
		for e := range errChan {
			errs = append(errs, e)
		}
		return errors.Join(errs...)
	}
	return nil
}
rollback
Method

Parameters

triggerErr error

Returns

error
func (*Workflow) rollback(ctx context.Context, triggerErr error) error
{
	rollbackCtx := context.WithoutCancel(ctx)
	var errs []error
	errs = append(errs, triggerErr)

	// LIFO
	w.mu.Lock()
	defer w.mu.Unlock()

	for i := len(w.stack) - 1; i >= 0; i-- {
		step := w.stack[i]
		if err := w.safeCompensate(rollbackCtx, step); err != nil {
			errs = append(errs, fmt.Errorf("rollback failed for '%s': %w", step.Name, err))
		}
	}

	return errors.Join(errs...)
}

Parameters

Returns

err error
func (*Workflow) safeCompensate(ctx context.Context, step Step) (err error)
{
	defer func() {
		if r := recover(); r != nil {
			err = fmt.Errorf("panic during compensation: %v", r)
		}
	}()
	return step.Compensate(ctx)
}

Fields

Name Type Description
steps []any
stack []Step
mu sync.Mutex
F
function

New

Returns

pkg/workflow/workflow.go:24-28
func New() *Workflow

{
	return &Workflow{
		stack: make([]Step, 0),
	}
}
F
function

TestWorkflow_Run_AllStepsSucceed

Parameters

pkg/workflow/workflow_test.go:11-33
func TestWorkflow_Run_AllStepsSucceed(t *testing.T)

{
	var executed []string
	wf := New()
	wf.Add("step1",
		func(ctx context.Context) error { executed = append(executed, "step1"); return nil },
		func(ctx context.Context) error { executed = append(executed, "undo1"); return nil },
	)
	wf.Add("step2",
		func(ctx context.Context) error { executed = append(executed, "step2"); return nil },
		func(ctx context.Context) error { executed = append(executed, "undo2"); return nil },
	)

	err := wf.Run(context.Background())
	if err != nil {
		t.Fatalf("Run failed: %v", err)
	}
	if len(executed) != 2 {
		t.Fatalf("expected 2 steps executed, got %v", executed)
	}
	if executed[0] != "step1" || executed[1] != "step2" {
		t.Errorf("unexpected order: %v", executed)
	}
}
F
function

TestWorkflow_Run_StepFails_Compensates

Parameters

pkg/workflow/workflow_test.go:35-64
func TestWorkflow_Run_StepFails_Compensates(t *testing.T)

{
	var executed []string
	wf := New()
	wf.Add("step1",
		func(ctx context.Context) error { executed = append(executed, "step1"); return nil },
		func(ctx context.Context) error { executed = append(executed, "undo1"); return nil },
	)
	wf.Add("step2",
		func(ctx context.Context) error { executed = append(executed, "step2"); return errors.New("fail") },
		func(ctx context.Context) error { executed = append(executed, "undo2"); return nil },
	)
	wf.Add("step3",
		func(ctx context.Context) error { executed = append(executed, "step3"); return nil },
		func(ctx context.Context) error { executed = append(executed, "undo3"); return nil },
	)

	err := wf.Run(context.Background())
	if err == nil {
		t.Fatal("expected error")
	}
	expect := []string{"step1", "step2", "undo1"}
	if len(executed) != len(expect) {
		t.Fatalf("expected %v, got %v", expect, executed)
	}
	for i, v := range expect {
		if executed[i] != v {
			t.Errorf("executed[%d] = %q, want %q", i, executed[i], v)
		}
	}
}
F
function

TestWorkflow_Run_ContextCancelled

Parameters

pkg/workflow/workflow_test.go:66-90
func TestWorkflow_Run_ContextCancelled(t *testing.T)

{
	var executed []string
	ctx, cancel := context.WithCancel(context.Background())

	wf := New()
	wf.Add("step1",
		func(ctx context.Context) error { executed = append(executed, "step1"); return nil },
		func(ctx context.Context) error { executed = append(executed, "undo1"); return nil },
	)
	wf.Add("step2",
		func(ctx context.Context) error {
			cancel()
			return ctx.Err()
		},
		func(ctx context.Context) error { return nil },
	)

	err := wf.Run(ctx)
	if err == nil {
		t.Fatal("expected error from cancelled context")
	}
	if len(executed) != 2 {
		t.Fatalf("expected step1 and step2 executed, got %v", executed)
	}
}
F
function

TestWorkflow_Run_PanicInStep

Parameters

pkg/workflow/workflow_test.go:92-119
func TestWorkflow_Run_PanicInStep(t *testing.T)

{
	var executed []string
	wf := New()
	wf.Add("step1",
		func(ctx context.Context) error { executed = append(executed, "step1"); return nil },
		func(ctx context.Context) error { executed = append(executed, "undo1"); return nil },
	)
	wf.Add("panic-step",
		func(ctx context.Context) error {
			panic("something went wrong")
		},
		func(ctx context.Context) error { return nil },
	)

	err := wf.Run(context.Background())
	if err == nil {
		t.Fatal("expected error from panic")
	}
	expect := []string{"step1", "undo1"}
	if len(executed) != len(expect) {
		t.Fatalf("expected %v, got %v", expect, executed)
	}
	for i, v := range expect {
		if executed[i] != v {
			t.Errorf("executed[%d] = %q, want %q", i, executed[i], v)
		}
	}
}
F
function

TestWorkflow_Run_GroupParallel

Parameters

pkg/workflow/workflow_test.go:121-141
func TestWorkflow_Run_GroupParallel(t *testing.T)

{
	var mu sync.Mutex
	var executed []string
	wf := New()

	group := Group{
		{Name: "g1", Do: func(ctx context.Context) error { mu.Lock(); executed = append(executed, "g1"); mu.Unlock(); return nil },
			Compensate: func(ctx context.Context) error { return nil }},
		{Name: "g2", Do: func(ctx context.Context) error { mu.Lock(); executed = append(executed, "g2"); mu.Unlock(); return nil },
			Compensate: func(ctx context.Context) error { return nil }},
	}
	wf.AddGroup(group)

	err := wf.Run(context.Background())
	if err != nil {
		t.Fatalf("Run failed: %v", err)
	}
	if len(executed) != 2 {
		t.Errorf("expected 2 group steps, got %v", executed)
	}
}
F
function

TestWorkflow_Run_CompensatePanicSafe

Parameters

pkg/workflow/workflow_test.go:143-158
func TestWorkflow_Run_CompensatePanicSafe(t *testing.T)

{
	wf := New()
	wf.Add("step1",
		func(ctx context.Context) error { return nil },
		func(ctx context.Context) error { panic("compensate panic") },
	)
	wf.Add("step2",
		func(ctx context.Context) error { return errors.New("fail") },
		nil,
	)

	err := wf.Run(context.Background())
	if err == nil {
		t.Fatal("expected error")
	}
}
F
function

TestWorkflow_Run_NoCompensateOnSuccess

Parameters

pkg/workflow/workflow_test.go:160-175
func TestWorkflow_Run_NoCompensateOnSuccess(t *testing.T)

{
	var compensated bool
	wf := New()
	wf.Add("step1",
		func(ctx context.Context) error { return nil },
		func(ctx context.Context) error { compensated = true; return nil },
	)

	err := wf.Run(context.Background())
	if err != nil {
		t.Fatalf("Run failed: %v", err)
	}
	if compensated {
		t.Error("expected no compensation on success")
	}
}
F
function

TestWorkflow_Compensate_WithRetry

Parameters

pkg/workflow/workflow_test.go:177-193
func TestWorkflow_Compensate_WithRetry(t *testing.T)

{
	attempts := 0
	do := WithRetry(RetryPolicy{MaxAttempts: 3, Delay: 10 * time.Millisecond, Multiplier: 1.0},
		func(ctx context.Context) error {
			attempts++
			return nil
		},
	)

	err := do(context.Background())
	if err != nil {
		t.Fatalf("WithRetry failed: %v", err)
	}
	if attempts != 1 {
		t.Errorf("expected 1 attempt on success, got %d", attempts)
	}
}
F
function

TestWorkflow_Compensate_WithRetryExhausted

Parameters

pkg/workflow/workflow_test.go:195-211
func TestWorkflow_Compensate_WithRetryExhausted(t *testing.T)

{
	attempts := 0
	do := WithRetry(RetryPolicy{MaxAttempts: 3, Delay: 10 * time.Millisecond, Multiplier: 1.0},
		func(ctx context.Context) error {
			attempts++
			return errors.New("always fail")
		},
	)

	err := do(context.Background())
	if err == nil {
		t.Fatal("expected error after retry exhausted")
	}
	if attempts != 3 {
		t.Errorf("expected 3 attempts, got %d", attempts)
	}
}