Sequential Steps
Sequential Steps
Sequential steps are the foundation of Resolute flows. Each step executes after the previous one completes, with outputs automatically available to subsequent steps.
Basic Pattern
Use Then() to add sequential steps:
flow := core.NewFlow("data-pipeline").
TriggeredBy(core.Manual("api")).
Then(fetchNode).
Then(processNode).
Then(storeNode).
Build()Execution order:
fetchNoderuns- When complete,
processNoderuns - When complete,
storeNoderuns
Passing Data Between Steps
Each node’s output is automatically stored in FlowState under its name (or a custom key via .As()).
Using WithInputFunc
The most common pattern uses WithInputFunc to read previous outputs:
// First node: fetch data
type FetchOutput struct {
Items []Item
Total int
}
fetchNode := core.NewNode("fetch", fetchData, FetchInput{
Source: "api",
})
// Second node: process data from first node
type ProcessInput struct {
Items []Item
Count int
}
processNode := core.NewNode("process", processData, ProcessInput{}).
WithInputFunc(func(state *core.FlowState) ProcessInput {
// Type-safe retrieval of previous output
result := core.Get[FetchOutput](state, "fetch")
return ProcessInput{
Items: result.Items,
Count: result.Total,
}
})
// Third node: store results
storeNode := core.NewNode("store", storeData, StoreInput{}).
WithInputFunc(func(state *core.FlowState) StoreInput {
processed := core.Get[ProcessOutput](state, "process")
return StoreInput{
Records: processed.Records,
}
})Using Output Markers
For simpler cases, use Output() markers to reference previous outputs directly:
// Reference entire output from a previous node
processNode := core.NewNode("process", processData, ProcessInput{
Data: core.Output[FetchOutput]("fetch"),
})
// Reference specific field
storeNode := core.NewNode("store", storeData, StoreInput{
Records: core.OutputRef[[]Record]("process", "Records"),
})Custom Output Keys
Use .As() to name outputs for clarity:
fetchNode := core.NewNode("fetch-jira-issues", fetchIssues, input).
As("issues") // Store output as "issues" instead of "fetch-jira-issues"
// Reference by the custom key
processNode := core.NewNode("process", processData, ProcessInput{}).
WithInputFunc(func(state *core.FlowState) ProcessInput {
issues := core.Get[FetchOutput](state, "issues")
return ProcessInput{Items: issues.Items}
})Complete Example
A data synchronization flow that fetches, transforms, and stores data:
package main
import (
"context"
"time"
"github.com/resolute/resolute/core"
)
// Input/Output types
type FetchInput struct {
Since time.Time
}
type FetchOutput struct {
Records []Record
Latest time.Time
}
type TransformInput struct {
Records []Record
}
type TransformOutput struct {
Transformed []TransformedRecord
Skipped int
}
type StoreInput struct {
Records []TransformedRecord
}
type StoreOutput struct {
Stored int
}
// Activity implementations
func fetchRecords(ctx context.Context, input FetchInput) (FetchOutput, error) {
// Fetch from external API
records, err := api.FetchSince(ctx, input.Since)
if err != nil {
return FetchOutput{}, err
}
var latest time.Time
for _, r := range records {
if r.UpdatedAt.After(latest) {
latest = r.UpdatedAt
}
}
return FetchOutput{
Records: records,
Latest: latest,
}, nil
}
func transformRecords(ctx context.Context, input TransformInput) (TransformOutput, error) {
var transformed []TransformedRecord
var skipped int
for _, r := range input.Records {
if t, ok := transform(r); ok {
transformed = append(transformed, t)
} else {
skipped++
}
}
return TransformOutput{
Transformed: transformed,
Skipped: skipped,
}, nil
}
func storeRecords(ctx context.Context, input StoreInput) (StoreOutput, error) {
count, err := db.BulkInsert(ctx, input.Records)
if err != nil {
return StoreOutput{}, err
}
return StoreOutput{Stored: count}, nil
}
func main() {
// Build nodes
fetchNode := core.NewNode("fetch", fetchRecords, FetchInput{}).
WithInputFunc(func(state *core.FlowState) FetchInput {
// Use cursor for incremental sync
cursor := state.GetCursor("records")
return FetchInput{
Since: cursor.TimeOr(time.Now().AddDate(0, -1, 0)),
}
}).
WithTimeout(5 * time.Minute).
As("fetched")
transformNode := core.NewNode("transform", transformRecords, TransformInput{}).
WithInputFunc(func(state *core.FlowState) TransformInput {
fetched := core.Get[FetchOutput](state, "fetched")
return TransformInput{Records: fetched.Records}
}).
WithTimeout(2 * time.Minute).
As("transformed")
storeNode := core.NewNode("store", storeRecords, StoreInput{}).
WithInputFunc(func(state *core.FlowState) StoreInput {
transformed := core.Get[TransformOutput](state, "transformed")
return StoreInput{Records: transformed.Transformed}
}).
WithTimeout(3 * time.Minute)
// Update cursor after successful store
updateCursorNode := core.NewNode("update-cursor", updateCursor, UpdateCursorInput{}).
WithInputFunc(func(state *core.FlowState) UpdateCursorInput {
fetched := core.Get[FetchOutput](state, "fetched")
return UpdateCursorInput{
Source: "records",
Position: fetched.Latest.Format(time.RFC3339),
}
})
// Build flow
flow := core.NewFlow("sync-records").
TriggeredBy(core.Schedule("*/15 * * * *")). // Every 15 minutes
Then(fetchNode).
Then(transformNode).
Then(storeNode).
Then(updateCursorNode).
Build()
// Run worker
core.NewWorker().
WithConfig(core.WorkerConfig{
TaskQueue: "sync-queue",
}).
WithFlow(flow).
Run()
}Step Configuration
Each node can be configured independently:
fetchNode := core.NewNode("fetch", fetchData, input).
WithTimeout(5 * time.Minute). // Activity timeout
WithRetry(core.RetryPolicy{ // Retry configuration
InitialInterval: time.Second,
BackoffCoefficient: 2.0,
MaximumInterval: time.Minute,
MaximumAttempts: 5,
}).
WithRateLimit(100, time.Minute). // Rate limiting
As("fetched") // Output keyError Behavior
When a sequential step fails:
- The activity retries according to its
RetryPolicy - If all retries exhaust, the error propagates
- Compensation runs for any completed steps with
.OnError()handlers - The workflow fails with the original error
// Add compensation for rollback on failure
createNode := core.NewNode("create", createRecord, input).
OnError(deleteNode) // Runs if subsequent steps fail
flow := core.NewFlow("transactional").
TriggeredBy(core.Manual("api")).
Then(createNode). // If this succeeds...
Then(notifyNode). // ...and this fails...
Build() // ...deleteNode runs to compensateBest Practices
1. Keep Steps Focused
Each node should do one thing:
// Good: Separate concerns
flow := core.NewFlow("pipeline").
Then(fetchNode). // Fetch data
Then(validateNode). // Validate data
Then(transformNode). // Transform data
Then(storeNode). // Store data
Build()
// Avoid: Monolithic steps
flow := core.NewFlow("pipeline").
Then(doEverythingNode). // Too much in one step
Build()2. Use Meaningful Names
Node names appear in Temporal UI and logs:
// Good: Descriptive names
fetchNode := core.NewNode("fetch-jira-issues", fetch, input)
processNode := core.NewNode("generate-embeddings", process, input)
// Avoid: Generic names
fetchNode := core.NewNode("step1", fetch, input)
processNode := core.NewNode("step2", process, input)3. Handle Empty Data
Check for empty results before processing:
processNode := core.NewNode("process", processData, ProcessInput{}).
WithInputFunc(func(state *core.FlowState) ProcessInput {
fetched := core.Get[FetchOutput](state, "fetch")
if len(fetched.Records) == 0 {
return ProcessInput{Skip: true} // Signal to skip processing
}
return ProcessInput{Records: fetched.Records}
})4. Configure Appropriate Timeouts
Match timeouts to expected operation duration:
// External API: may be slow
fetchNode := core.NewNode("fetch", fetch, input).
WithTimeout(5 * time.Minute)
// Local processing: should be fast
processNode := core.NewNode("process", process, input).
WithTimeout(30 * time.Second)
// Database write: moderate
storeNode := core.NewNode("store", store, input).
WithTimeout(2 * time.Minute)See Also
- Parallel Execution - Execute nodes concurrently
- Conditional Logic - Branch based on conditions
- Error Handling - Handle failures gracefully
- FlowState - State management details