Testing
Testing
FlowTester provides a test harness for running flows without Temporal, enabling fast, deterministic unit tests with mocked activities.
Types
FlowTester
type FlowTester struct {
// unexported fields
}Test harness for running flows synchronously with mocked activities.
Constructor
NewFlowTester
func NewFlowTester() *FlowTesterCreates a new flow tester.
Returns: *FlowTester configured for testing
Example:
tester := core.NewFlowTester()Mock Methods
Mock
func (t *FlowTester) Mock(name string, fn interface{}) *FlowTesterRegisters a mock function for an activity. Use when the mock needs logic or input inspection.
Parameters:
name- Activity name to mockfn- Mock function with signature matching the activity
Returns: *FlowTester for method chaining
Example:
tester.Mock("jira.FetchIssues", func(input jira.FetchInput) (jira.FetchOutput, error) {
if input.JQL == "project = EMPTY" {
return jira.FetchOutput{Issues: []jira.Issue{}, Count: 0}, nil
}
return jira.FetchOutput{
Issues: []jira.Issue{{Key: "TEST-1"}},
Count: 1,
}, nil
})MockValue
func (t *FlowTester) MockValue(name string, value interface{}) *FlowTesterRegisters a fixed return value for an activity. Use for simple fixed returns.
Parameters:
name- Activity name to mockvalue- Value to return from the mock
Returns: *FlowTester for method chaining
Example:
tester.MockValue("fetch-issues", jira.FetchOutput{
Issues: []jira.Issue{{Key: "TEST-1"}, {Key: "TEST-2"}},
Count: 2,
})MockError
func (t *FlowTester) MockError(name string, err error) *FlowTesterRegisters an error return for an activity. Use to simulate failures.
Parameters:
name- Activity name to mockerr- Error to return
Returns: *FlowTester for method chaining
Example:
tester.MockError("fetch-issues", errors.New("connection timeout"))Execution Methods
Run
func (t *FlowTester) Run(flow *Flow, input FlowInput) (*FlowState, error)Executes the flow synchronously with mocked activities.
Parameters:
flow- Flow to executeinput- Initial flow input
Returns:
*FlowState- Final state after executionerror- Error if execution fails
Example:
state, err := tester.Run(myFlow, core.FlowInput{})
require.NoError(t, err)RunWithContext
func (t *FlowTester) RunWithContext(ctx context.Context, flow *Flow, input FlowInput) (*FlowState, error)Executes the flow with a context for cancellation or timeout.
Parameters:
ctx- Context for cancellation/timeoutflow- Flow to executeinput- Initial flow input
Returns:
*FlowState- Final state after executionerror- Error if execution fails or context cancelled
Example:
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
state, err := tester.RunWithContext(ctx, myFlow, core.FlowInput{})Call Tracking Methods
WasCalled
func (t *FlowTester) WasCalled(name string) boolReturns true if the activity was called at least once.
Parameters:
name- Activity name
Returns: bool
Example:
assert.True(t, tester.WasCalled("fetch-issues"))CallCount
func (t *FlowTester) CallCount(name string) intReturns the number of times the activity was called.
Parameters:
name- Activity name
Returns: Call count
Example:
assert.Equal(t, 3, tester.CallCount("process-batch"))CallArgs
func (t *FlowTester) CallArgs(name string) []interface{}Returns all arguments from all calls to the activity.
Parameters:
name- Activity name
Returns: Slice of arguments from each call
Example:
args := tester.CallArgs("fetch-issues")
firstInput := args[0].(jira.FetchInput)
assert.Equal(t, "PLATFORM", firstInput.Project)LastCallArg
func (t *FlowTester) LastCallArg(name string) interface{}Returns the argument from the last call to the activity.
Parameters:
name- Activity name
Returns: Last call’s argument
Example:
lastArg := tester.LastCallArg("fetch-issues")
input := lastArg.(jira.FetchInput)
assert.Equal(t, "PLATFORM", input.Project)Assertion Methods
AssertCalled
func (t *FlowTester) AssertCalled(t *testing.T, name string)Asserts that the activity was called at least once.
Parameters:
t- Test instancename- Activity name
Example:
tester.AssertCalled(t, "fetch-issues")
tester.AssertCalled(t, "process-data")AssertNotCalled
func (t *FlowTester) AssertNotCalled(t *testing.T, name string)Asserts that the activity was not called.
Parameters:
t- Test instancename- Activity name
Example:
tester.AssertNotCalled(t, "skip-node")AssertCallCount
func (t *FlowTester) AssertCallCount(t *testing.T, name string, expected int)Asserts the activity was called exactly N times.
Parameters:
t- Test instancename- Activity nameexpected- Expected call count
Example:
tester.AssertCallCount(t, "fetch-page", 3)Reset Methods
Reset
func (t *FlowTester) Reset()Resets call tracking only (keeps mocks).
Example:
// Run first test
tester.Run(flow, input1)
assert.Equal(t, 1, tester.CallCount("fetch"))
// Reset and run second test
tester.Reset()
tester.Run(flow, input2)
assert.Equal(t, 1, tester.CallCount("fetch")) // Count resetResetAll
func (t *FlowTester) ResetAll()Resets everything (mocks and call tracking).
Example:
tester.ResetAll()
// Must re-register mocks
tester.MockValue("fetch", output)Configuration Methods
WithRateLimiting
func (t *FlowTester) WithRateLimiting() *FlowTesterEnables rate limiting (disabled by default for speed).
Returns: *FlowTester for method chaining
Example:
tester := core.NewFlowTester().WithRateLimiting()Usage Patterns
Basic Test
func TestDataSyncFlow(t *testing.T) {
tester := core.NewFlowTester()
// Register mocks
tester.MockValue("fetch-issues", jira.FetchOutput{
Issues: []jira.Issue{{Key: "TEST-1"}, {Key: "TEST-2"}},
Count: 2,
})
tester.MockValue("process-issues", ProcessOutput{Processed: 2})
tester.MockValue("store-results", StoreOutput{Stored: 2})
// Run flow
state, err := tester.Run(dataSyncFlow, core.FlowInput{})
// Assert success
require.NoError(t, err)
// Assert all nodes called
tester.AssertCalled(t, "fetch-issues")
tester.AssertCalled(t, "process-issues")
tester.AssertCalled(t, "store-results")
// Assert final state
result := core.Get[StoreOutput](state, "store-results")
assert.Equal(t, 2, result.Stored)
}Testing Conditionals
func TestConditionalFlow(t *testing.T) {
t.Run("takes then branch when condition true", func(t *testing.T) {
tester := core.NewFlowTester()
tester.MockValue("check", CheckOutput{NeedsProcessing: true})
tester.MockValue("process", ProcessOutput{})
tester.MockValue("skip", SkipOutput{})
_, err := tester.Run(flow, core.FlowInput{})
require.NoError(t, err)
tester.AssertCalled(t, "check")
tester.AssertCalled(t, "process")
tester.AssertNotCalled(t, "skip")
})
t.Run("takes else branch when condition false", func(t *testing.T) {
tester := core.NewFlowTester()
tester.MockValue("check", CheckOutput{NeedsProcessing: false})
tester.MockValue("process", ProcessOutput{})
tester.MockValue("skip", SkipOutput{})
_, err := tester.Run(flow, core.FlowInput{})
require.NoError(t, err)
tester.AssertCalled(t, "check")
tester.AssertNotCalled(t, "process")
tester.AssertCalled(t, "skip")
})
}Testing Error Handling
func TestFlowErrorHandling(t *testing.T) {
tester := core.NewFlowTester()
tester.MockValue("fetch", FetchOutput{Count: 5})
tester.MockError("process", errors.New("processing failed"))
tester.MockValue("store", StoreOutput{})
_, err := tester.Run(flow, core.FlowInput{})
require.Error(t, err)
assert.Contains(t, err.Error(), "processing failed")
// Verify execution stopped at error
tester.AssertCalled(t, "fetch")
tester.AssertCalled(t, "process")
tester.AssertNotCalled(t, "store")
}Testing Input Transformation
func TestInputTransformation(t *testing.T) {
tester := core.NewFlowTester()
var processInput ProcessInput
tester.MockValue("fetch", FetchOutput{RawData: "data", Count: 100})
tester.Mock("process", func(input ProcessInput) (ProcessOutput, error) {
processInput = input
return ProcessOutput{}, nil
})
tester.Run(flow, core.FlowInput{})
// Verify transformation happened
assert.Equal(t, "DATA", processInput.NormalizedData)
assert.Equal(t, 100, processInput.ItemCount)
}Table-Driven Tests
func TestConditionalRouting(t *testing.T) {
tests := []struct {
name string
condition bool
expectProcess bool
expectSkip bool
}{
{
name: "processes when condition true",
condition: true,
expectProcess: true,
expectSkip: false,
},
{
name: "skips when condition false",
condition: false,
expectProcess: false,
expectSkip: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
tester := core.NewFlowTester()
tester.MockValue("check", CheckOutput{NeedsWork: tt.condition})
tester.MockValue("process", ProcessOutput{})
tester.MockValue("skip", SkipOutput{})
tester.Run(flow, core.FlowInput{})
if tt.expectProcess {
tester.AssertCalled(t, "process")
} else {
tester.AssertNotCalled(t, "process")
}
if tt.expectSkip {
tester.AssertCalled(t, "skip")
} else {
tester.AssertNotCalled(t, "skip")
}
})
}
}Complete Example
package flows_test
import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/resolute/resolute/core"
"myapp/flows"
"myapp/providers/jira"
)
func TestIssueEnrichmentFlow(t *testing.T) {
// Build the flow
flow := flows.BuildEnrichmentFlow()
// Create tester
tester := core.NewFlowTester()
// Mock all activities
tester.Mock("fetch-issues", func(input jira.FetchInput) (jira.FetchOutput, error) {
return jira.FetchOutput{
Issues: []jira.Issue{
{Key: "TEST-1", Summary: "First issue"},
{Key: "TEST-2", Summary: "Second issue"},
},
Count: 2,
}, nil
})
tester.Mock("generate-embeddings", func(input EmbedInput) (EmbedOutput, error) {
embeddings := make([][]float32, len(input.Texts))
for i := range embeddings {
embeddings[i] = make([]float32, 384)
}
return EmbedOutput{Embeddings: embeddings}, nil
})
tester.Mock("store-vectors", func(input StoreInput) (StoreOutput, error) {
return StoreOutput{Stored: len(input.Vectors)}, nil
})
// Run flow
state, err := tester.Run(flow, core.FlowInput{
Data: map[string][]byte{
"project": []byte("TEST"),
},
})
// Assertions
require.NoError(t, err)
// Verify execution order
tester.AssertCalled(t, "fetch-issues")
tester.AssertCalled(t, "generate-embeddings")
tester.AssertCalled(t, "store-vectors")
// Verify input passed correctly
fetchInput := tester.LastCallArg("fetch-issues").(jira.FetchInput)
assert.Equal(t, "TEST", fetchInput.Project)
// Verify output
storeResult := core.Get[StoreOutput](state, "store-vectors")
assert.Equal(t, 2, storeResult.Stored)
}See Also
- FlowTester Guide - Complete testing guide
- Mocking Activities - Mock patterns
- Integration Tests - Testing with Temporal