workflow
API
workflow
package
API reference for the workflow package.
Imports
(7)
S
struct
RetryPolicy
pkg/workflow/retry.go:10-14
type RetryPolicy struct
Fields
| Name | Type | Description |
|---|---|---|
| MaxAttempts | int | |
| Delay | time.Duration | |
| Multiplier | float64 | |
F
function
WithRetry
Parameters
policy
do
func(context.Context) error
Returns
func(context.Context)
error
pkg/workflow/retry.go:16-26
func WithRetry(policy RetryPolicy, do func(context.Context) error) func(context.Context) error
{
return func(ctx context.Context) error {
return resiliency.Retry(ctx, func() error {
return do(ctx)
},
resiliency.WithAttempts(policy.MaxAttempts),
resiliency.WithDelay(policy.Delay, 24*time.Hour),
resiliency.WithFactor(policy.Multiplier),
)
}
}
S
struct
Step
pkg/workflow/workflow.go:10-14
type Step struct
Fields
| Name | Type | Description |
|---|---|---|
| Name | string | |
| Do | func(ctx context.Context) error | |
| Compensate | func(ctx context.Context) error | |
T
type
Group
pkg/workflow/workflow.go:16-16
// Group is a list of Steps registered as a single workflow item via AddGroup;
// Run hands a Group to runGroup, which starts one goroutine per Step.
type Group []Step
S
struct
Workflow
pkg/workflow/workflow.go:18-22
type Workflow struct
Methods
Add
Method
Parameters
name
string
do
func(ctx context.Context) error
compensate
func(ctx context.Context) error
func (*Workflow) Add(name string, do, compensate func(ctx context.Context) error)
{
w.steps = append(w.steps, Step{
Name: name,
Do: do,
Compensate: compensate,
})
}
AddGroup
Method
Parameters
g
Group
func (*Workflow) AddGroup(g Group)
{
w.steps = append(w.steps, g)
}
Run
Method
Parameters
ctx
context.Context
Returns
error
func (*Workflow) Run(ctx context.Context) error
{
for _, item := range w.steps {
// Check Context before starting step
if ctx.Err() != nil {
return w.rollback(ctx, ctx.Err())
}
var err error
switch v := item.(type) {
case Step:
err = w.runStep(ctx, v)
case Group:
err = w.runGroup(ctx, v)
}
if err != nil {
return w.rollback(ctx, err)
}
}
return nil
}
runStep
Method
Parameters
ctx
context.Context
step
Step
Returns
err
error
func (*Workflow) runStep(ctx context.Context, step Step) (err error)
{
defer func() {
if r := recover(); r != nil {
err = fmt.Errorf("panic in step '%s': %v", step.Name, r)
}
}()
if err := step.Do(ctx); err != nil {
return fmt.Errorf("step '%s' failed: %w", step.Name, err)
}
w.mu.Lock()
if step.Compensate != nil {
w.stack = append(w.stack, step)
}
w.mu.Unlock()
return nil
}
runGroup
Method
Parameters
ctx
context.Context
group
Group
Returns
error
func (*Workflow) runGroup(ctx context.Context, group Group) error
{
var wg sync.WaitGroup
errChan := make(chan error, len(group))
// Temporarily store successful steps in this group to add to stack later
// If the group fails, we only compensate what succeeded inside the group?
// Actually, if we use w.runStep() inside goroutine, it appends to w.stack safely.
// But we must handle partial failure rollback within the group logic or rely on main rollback?
// For simplicity, we let them append to stack. If one fails, Run() returns error and triggers rollback of everything in stack.
for _, step := range group {
wg.Add(1)
go func(s Step) {
defer wg.Done()
if err := w.runStep(ctx, s); err != nil {
errChan <- err
}
}(step)
}
wg.Wait()
close(errChan)
if len(errChan) > 0 {
var errs []error
for e := range errChan {
errs = append(errs, e)
}
return errors.Join(errs...)
}
return nil
}
rollback
Method
Parameters
ctx
context.Context
triggerErr
error
Returns
error
func (*Workflow) rollback(ctx context.Context, triggerErr error) error
{
rollbackCtx := context.WithoutCancel(ctx)
var errs []error
errs = append(errs, triggerErr)
// LIFO
w.mu.Lock()
defer w.mu.Unlock()
for i := len(w.stack) - 1; i >= 0; i-- {
step := w.stack[i]
if err := w.safeCompensate(rollbackCtx, step); err != nil {
errs = append(errs, fmt.Errorf("rollback failed for '%s': %w", step.Name, err))
}
}
return errors.Join(errs...)
}
safeCompensate
Method
Parameters
ctx
context.Context
step
Step
Returns
err
error
func (*Workflow) safeCompensate(ctx context.Context, step Step) (err error)
{
defer func() {
if r := recover(); r != nil {
err = fmt.Errorf("panic during compensation: %v", r)
}
}()
return step.Compensate(ctx)
}
Fields
| Name | Type | Description |
|---|---|---|
| steps | []any | |
| stack | []Step | |
| mu | sync.Mutex | |
F
function
New
Returns
pkg/workflow/workflow.go:24-28
func New() *Workflow
{
return &Workflow{
stack: make([]Step, 0),
}
}
F
function
TestWorkflow_Run_AllStepsSucceed
Parameters
t
pkg/workflow/workflow_test.go:11-33
func TestWorkflow_Run_AllStepsSucceed(t *testing.T)
{
var executed []string
wf := New()
wf.Add("step1",
func(ctx context.Context) error { executed = append(executed, "step1"); return nil },
func(ctx context.Context) error { executed = append(executed, "undo1"); return nil },
)
wf.Add("step2",
func(ctx context.Context) error { executed = append(executed, "step2"); return nil },
func(ctx context.Context) error { executed = append(executed, "undo2"); return nil },
)
err := wf.Run(context.Background())
if err != nil {
t.Fatalf("Run failed: %v", err)
}
if len(executed) != 2 {
t.Fatalf("expected 2 steps executed, got %v", executed)
}
if executed[0] != "step1" || executed[1] != "step2" {
t.Errorf("unexpected order: %v", executed)
}
}
F
function
TestWorkflow_Run_StepFails_Compensates
Parameters
t
pkg/workflow/workflow_test.go:35-64
func TestWorkflow_Run_StepFails_Compensates(t *testing.T)
{
var executed []string
wf := New()
wf.Add("step1",
func(ctx context.Context) error { executed = append(executed, "step1"); return nil },
func(ctx context.Context) error { executed = append(executed, "undo1"); return nil },
)
wf.Add("step2",
func(ctx context.Context) error { executed = append(executed, "step2"); return errors.New("fail") },
func(ctx context.Context) error { executed = append(executed, "undo2"); return nil },
)
wf.Add("step3",
func(ctx context.Context) error { executed = append(executed, "step3"); return nil },
func(ctx context.Context) error { executed = append(executed, "undo3"); return nil },
)
err := wf.Run(context.Background())
if err == nil {
t.Fatal("expected error")
}
expect := []string{"step1", "step2", "undo1"}
if len(executed) != len(expect) {
t.Fatalf("expected %v, got %v", expect, executed)
}
for i, v := range expect {
if executed[i] != v {
t.Errorf("executed[%d] = %q, want %q", i, executed[i], v)
}
}
}
F
function
TestWorkflow_Run_ContextCancelled
Parameters
t
pkg/workflow/workflow_test.go:66-90
func TestWorkflow_Run_ContextCancelled(t *testing.T)
{
var executed []string
ctx, cancel := context.WithCancel(context.Background())
wf := New()
wf.Add("step1",
func(ctx context.Context) error { executed = append(executed, "step1"); return nil },
func(ctx context.Context) error { executed = append(executed, "undo1"); return nil },
)
wf.Add("step2",
func(ctx context.Context) error {
cancel()
return ctx.Err()
},
func(ctx context.Context) error { return nil },
)
err := wf.Run(ctx)
if err == nil {
t.Fatal("expected error from cancelled context")
}
if len(executed) != 2 {
t.Fatalf("expected step1 and step2 executed, got %v", executed)
}
}
F
function
TestWorkflow_Run_PanicInStep
Parameters
t
pkg/workflow/workflow_test.go:92-119
func TestWorkflow_Run_PanicInStep(t *testing.T)
{
var executed []string
wf := New()
wf.Add("step1",
func(ctx context.Context) error { executed = append(executed, "step1"); return nil },
func(ctx context.Context) error { executed = append(executed, "undo1"); return nil },
)
wf.Add("panic-step",
func(ctx context.Context) error {
panic("something went wrong")
},
func(ctx context.Context) error { return nil },
)
err := wf.Run(context.Background())
if err == nil {
t.Fatal("expected error from panic")
}
expect := []string{"step1", "undo1"}
if len(executed) != len(expect) {
t.Fatalf("expected %v, got %v", expect, executed)
}
for i, v := range expect {
if executed[i] != v {
t.Errorf("executed[%d] = %q, want %q", i, executed[i], v)
}
}
}
F
function
TestWorkflow_Run_GroupParallel
Parameters
t
pkg/workflow/workflow_test.go:121-141
func TestWorkflow_Run_GroupParallel(t *testing.T)
{
var mu sync.Mutex
var executed []string
wf := New()
group := Group{
{Name: "g1", Do: func(ctx context.Context) error { mu.Lock(); executed = append(executed, "g1"); mu.Unlock(); return nil },
Compensate: func(ctx context.Context) error { return nil }},
{Name: "g2", Do: func(ctx context.Context) error { mu.Lock(); executed = append(executed, "g2"); mu.Unlock(); return nil },
Compensate: func(ctx context.Context) error { return nil }},
}
wf.AddGroup(group)
err := wf.Run(context.Background())
if err != nil {
t.Fatalf("Run failed: %v", err)
}
if len(executed) != 2 {
t.Errorf("expected 2 group steps, got %v", executed)
}
}
F
function
TestWorkflow_Run_CompensatePanicSafe
Parameters
t
pkg/workflow/workflow_test.go:143-158
func TestWorkflow_Run_CompensatePanicSafe(t *testing.T)
{
wf := New()
wf.Add("step1",
func(ctx context.Context) error { return nil },
func(ctx context.Context) error { panic("compensate panic") },
)
wf.Add("step2",
func(ctx context.Context) error { return errors.New("fail") },
nil,
)
err := wf.Run(context.Background())
if err == nil {
t.Fatal("expected error")
}
}
F
function
TestWorkflow_Run_NoCompensateOnSuccess
Parameters
t
pkg/workflow/workflow_test.go:160-175
func TestWorkflow_Run_NoCompensateOnSuccess(t *testing.T)
{
var compensated bool
wf := New()
wf.Add("step1",
func(ctx context.Context) error { return nil },
func(ctx context.Context) error { compensated = true; return nil },
)
err := wf.Run(context.Background())
if err != nil {
t.Fatalf("Run failed: %v", err)
}
if compensated {
t.Error("expected no compensation on success")
}
}
F
function
TestWorkflow_Compensate_WithRetry
Parameters
t
pkg/workflow/workflow_test.go:177-193
func TestWorkflow_Compensate_WithRetry(t *testing.T)
{
attempts := 0
do := WithRetry(RetryPolicy{MaxAttempts: 3, Delay: 10 * time.Millisecond, Multiplier: 1.0},
func(ctx context.Context) error {
attempts++
return nil
},
)
err := do(context.Background())
if err != nil {
t.Fatalf("WithRetry failed: %v", err)
}
if attempts != 1 {
t.Errorf("expected 1 attempt on success, got %d", attempts)
}
}
F
function
TestWorkflow_Compensate_WithRetryExhausted
Parameters
t
pkg/workflow/workflow_test.go:195-211
func TestWorkflow_Compensate_WithRetryExhausted(t *testing.T)
{
attempts := 0
do := WithRetry(RetryPolicy{MaxAttempts: 3, Delay: 10 * time.Millisecond, Multiplier: 1.0},
func(ctx context.Context) error {
attempts++
return errors.New("always fail")
},
)
err := do(context.Background())
if err == nil {
t.Fatal("expected error after retry exhausted")
}
if attempts != 3 {
t.Errorf("expected 3 attempts, got %d", attempts)
}
}