Flow
The Flow type represents a complete workflow definition, composed of a trigger, an ordered list of steps, and an optional state configuration.
Types
Flow
type Flow struct {
	// unexported fields
}
A completed workflow definition. Created via FlowBuilder.Build().
Methods
| Method | Signature | Description |
|---|---|---|
| Name | () string | Returns the flow’s identifier |
| Trigger | () Trigger | Returns the flow’s trigger configuration |
| Steps | () []Step | Returns the flow’s execution steps |
| StateConfig | () *StateConfig | Returns state configuration, or nil for default |
| Execute | (ctx workflow.Context, input FlowInput) error | Runs the flow as a Temporal workflow |
FlowBuilder
type FlowBuilder struct {
	// unexported fields
}
Provides a fluent API for constructing flows.
Step
type Step struct {
	// unexported fields
}
Represents one execution unit within a flow. A step can contain:
- One node (sequential execution)
- Multiple nodes (parallel execution)
- A conditional branch
FlowInput
type FlowInput struct {
	Data map[string][]byte
}
Contains the initial input to a flow execution.
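Because Data maps keys to raw bytes, callers presumably serialize values before starting a flow. The following is a minimal sketch of that round trip, assuming JSON as the encoding (this reference does not say which encoding resolute expects). FlowInput is redeclared locally so the sketch compiles on its own, and Order, encodeInput, and decodeInput are hypothetical names.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// FlowInput mirrors the struct documented above; it is redeclared here
// only so the sketch compiles without the resolute module.
type FlowInput struct {
	Data map[string][]byte
}

// Order is a hypothetical payload type used for illustration.
type Order struct {
	ID    string  `json:"id"`
	Total float64 `json:"total"`
}

// encodeInput is a hypothetical helper that JSON-encodes v under key.
// JSON is an assumption; the actual encoding is not specified here.
func encodeInput(key string, v any) (FlowInput, error) {
	raw, err := json.Marshal(v)
	if err != nil {
		return FlowInput{}, fmt.Errorf("encode %q: %w", key, err)
	}
	return FlowInput{Data: map[string][]byte{key: raw}}, nil
}

// decodeInput reverses encodeInput for a single key.
func decodeInput[T any](in FlowInput, key string) (T, error) {
	var out T
	raw, ok := in.Data[key]
	if !ok {
		return out, fmt.Errorf("input key %q not found", key)
	}
	err := json.Unmarshal(raw, &out)
	return out, err
}

func main() {
	in, err := encodeInput("order", Order{ID: "A-1", Total: 1250})
	if err != nil {
		panic(err)
	}
	order, err := decodeInput[Order](in, "order")
	if err != nil {
		panic(err)
	}
	fmt.Println(order.ID, order.Total)
}
```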
Constructor
NewFlow
func NewFlow(name string) *FlowBuilder
Creates a new flow builder with the given name.
Parameters:
- name - Unique identifier for the flow
Returns: *FlowBuilder for method chaining
Example:
builder := core.NewFlow("data-sync")
FlowBuilder Methods
TriggeredBy
func (b *FlowBuilder) TriggeredBy(t Trigger) *FlowBuilder
Sets the trigger that initiates this flow.
Parameters:
- t - Trigger configuration (Manual, Schedule, Signal, or Webhook)
Returns: *FlowBuilder for method chaining
Example:
flow := core.NewFlow("my-flow").
	TriggeredBy(core.Manual("api")).
	// ...
Then
func (b *FlowBuilder) Then(node ExecutableNode) *FlowBuilder
Adds a sequential step with a single node.
Parameters:
- node - The node to execute (cannot be nil)
Returns: *FlowBuilder for method chaining
Example:
flow := core.NewFlow("pipeline").
	TriggeredBy(core.Manual("api")).
	Then(fetchNode).
	Then(processNode).
	Then(storeNode).
	Build()
ThenParallel
func (b *FlowBuilder) ThenParallel(name string, nodes ...ExecutableNode) *FlowBuilder
Adds a parallel step with multiple nodes executed concurrently.
Parameters:
- name - Identifier for the parallel step
- nodes - One or more nodes to execute in parallel
Returns: *FlowBuilder for method chaining
Example:
flow := core.NewFlow("enrichment").
	TriggeredBy(core.Manual("api")).
	Then(fetchNode).
	ThenParallel("enrich",
		enrichANode,
		enrichBNode,
		enrichCNode,
	).
	Then(aggregateNode).
	Build()
When
func (b *FlowBuilder) When(pred Predicate) *ConditionalBuilder
Starts a conditional branch based on a predicate.
Parameters:
- pred - Function that evaluates FlowState and returns bool
Returns: *ConditionalBuilder for building conditional logic
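To make the predicate contract concrete, here is a self-contained sketch of a typed state accessor and a predicate built on it. FlowState, Get, and Predicate are local stand-ins that only imitate the names used in this reference; in particular, returning the zero value for a missing or mistyped key is an assumption, since the behavior of core.Get in that case is not documented here.

```go
package main

import "fmt"

// FlowState is a hypothetical stand-in: a map of named step outputs.
type FlowState struct {
	values map[string]any
}

// Get returns the value stored under key as T. It falls back to T's zero
// value when the key is missing or holds another type (an assumption).
func Get[T any](s *FlowState, key string) T {
	v, _ := s.values[key].(T)
	return v
}

// Predicate matches the shape described above: FlowState in, bool out.
type Predicate func(s *FlowState) bool

// Order is a hypothetical payload type.
type Order struct {
	Total float64
}

func main() {
	needsApproval := Predicate(func(s *FlowState) bool {
		return Get[Order](s, "order").Total > 1000
	})

	s := &FlowState{values: map[string]any{"order": Order{Total: 1500}}}
	fmt.Println(needsApproval(s))
}
```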
Example:
flow := core.NewFlow("order-flow").
	TriggeredBy(core.Manual("api")).
	Then(fetchOrder).
	When(func(s *core.FlowState) bool {
		return core.Get[Order](s, "order").Total > 1000
	}).
		Then(requireApproval).
		Otherwise(autoApprove).
	Then(fulfillOrder).
	Build()
WithState
func (b *FlowBuilder) WithState(cfg StateConfig) *FlowBuilder
Overrides the default state backend (.resolute/ directory).
Parameters:
- cfg - State configuration with custom backend
Returns: *FlowBuilder for method chaining
Example:
flow := core.NewFlow("production-flow").
	TriggeredBy(core.Schedule("0 * * * *")).
	WithState(core.StateConfig{
		Backend: s3Backend,
	}).
	Then(syncNode).
	Build()
Build
func (b *FlowBuilder) Build() *Flow
Validates and returns the constructed flow.
Returns: *Flow - The completed flow definition
Panics if:
- Flow has no steps
- Flow has no trigger
- Any builder errors accumulated
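The "accumulate errors, then panic in Build" behavior can be sketched with a toy builder. Everything below is hypothetical and simplified (string steps, lowercase method names); it only illustrates the pattern, plus one way a caller might turn the panic into an error with recover.

```go
package main

import (
	"errors"
	"fmt"
)

// flow and builder are hypothetical, simplified stand-ins.
type flow struct {
	name    string
	trigger string
	steps   []string
}

type builder struct {
	f    flow
	errs []error // errors collected while chaining
}

func newFlow(name string) *builder { return &builder{f: flow{name: name}} }

func (b *builder) triggeredBy(t string) *builder { b.f.trigger = t; return b }

// then records an error instead of failing immediately, so the chain
// can continue and all problems surface together in build.
func (b *builder) then(step string) *builder {
	if step == "" {
		b.errs = append(b.errs, errors.New("step cannot be empty"))
		return b
	}
	b.f.steps = append(b.f.steps, step)
	return b
}

// build validates the flow and panics if anything is wrong.
func (b *builder) build() *flow {
	if b.f.trigger == "" {
		b.errs = append(b.errs, errors.New("flow has no trigger"))
	}
	if len(b.f.steps) == 0 {
		b.errs = append(b.errs, errors.New("flow has no steps"))
	}
	if err := errors.Join(b.errs...); err != nil {
		panic(fmt.Errorf("flow %q: %w", b.f.name, err))
	}
	return &b.f
}

// tryBuild converts a build panic into an ordinary error.
func tryBuild(b *builder) (f *flow, err error) {
	defer func() {
		if r := recover(); r != nil {
			err = fmt.Errorf("build failed: %v", r)
		}
	}()
	return b.build(), nil
}

func main() {
	f, err := tryBuild(newFlow("sync").triggeredBy("manual").then("fetch"))
	fmt.Println(f.name, len(f.steps), err)

	_, err = tryBuild(newFlow("broken"))
	fmt.Println(err)
}
```

Panicking at build time suits flows that are defined once at program startup, where a misconfigured flow should stop the process before any worker runs.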
Example:
flow := core.NewFlow("my-flow").
	TriggeredBy(core.Manual("api")).
	Then(myNode).
	Build()
ConditionalBuilder Methods
Then (ConditionalBuilder)
func (cb *ConditionalBuilder) Then(node ExecutableNode) *ConditionalBuilder
Adds a sequential step to the current branch.
ThenParallel (ConditionalBuilder)
func (cb *ConditionalBuilder) ThenParallel(name string, nodes ...ExecutableNode) *ConditionalBuilder
Adds a parallel step to the current branch.
Else
func (cb *ConditionalBuilder) Else() *ConditionalBuilder
Switches to building the “else” branch. Subsequent Then/ThenParallel calls add to the else branch.
Otherwise
func (cb *ConditionalBuilder) Otherwise(node ExecutableNode) *FlowBuilder
Adds a single node to the “else” branch and returns to the main flow builder.
OtherwiseParallel
func (cb *ConditionalBuilder) OtherwiseParallel(name string, nodes ...ExecutableNode) *FlowBuilder
Adds parallel nodes to the “else” branch and returns to the main flow builder.
EndWhen
func (cb *ConditionalBuilder) EndWhen() *FlowBuilder
Completes the conditional block without an else branch and returns to the main flow builder.
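The way a conditional builder collects two branches and then hands control back to its parent can be sketched with a toy model. All types and lowercase method names below are hypothetical. The sketch also assumes that a multi-step else branch opened with the Else equivalent is closed with the EndWhen equivalent, which this reference does not state.

```go
package main

import "fmt"

// step is a hypothetical record of either a single node or a conditional.
type step struct {
	kind      string // "node" or "conditional"
	name      string
	thenNodes []string
	elseNodes []string
}

type flowBuilder struct {
	steps []step
}

type conditionalBuilder struct {
	parent *flowBuilder
	cond   step
	inElse bool // false: adding to the "then" branch; true: "else" branch
}

func (b *flowBuilder) then(n string) *flowBuilder {
	b.steps = append(b.steps, step{kind: "node", name: n})
	return b
}

func (b *flowBuilder) when() *conditionalBuilder {
	return &conditionalBuilder{parent: b, cond: step{kind: "conditional"}}
}

// then appends to whichever branch is currently being built.
func (cb *conditionalBuilder) then(n string) *conditionalBuilder {
	if cb.inElse {
		cb.cond.elseNodes = append(cb.cond.elseNodes, n)
	} else {
		cb.cond.thenNodes = append(cb.cond.thenNodes, n)
	}
	return cb
}

func (cb *conditionalBuilder) elseBranch() *conditionalBuilder {
	cb.inElse = true
	return cb
}

// endWhen stores the finished conditional on the parent and returns it.
func (cb *conditionalBuilder) endWhen() *flowBuilder {
	cb.parent.steps = append(cb.parent.steps, cb.cond)
	return cb.parent
}

// otherwise adds one else node and closes the block in a single call.
func (cb *conditionalBuilder) otherwise(n string) *flowBuilder {
	cb.cond.elseNodes = append(cb.cond.elseNodes, n)
	return cb.endWhen()
}

func main() {
	b := &flowBuilder{}
	b.then("fetch").
		when().then("approve").then("notify").
		elseBranch().then("autoApprove").
		endWhen().
		then("fulfill")
	for _, s := range b.steps {
		fmt.Printf("%+v\n", s)
	}
}
```

Returning a different builder type from each method is what lets the compiler reject chains such as calling the else method on the main flow builder.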
Flow Methods
Name
func (f *Flow) Name() string
Returns the flow’s identifier.
Trigger
func (f *Flow) Trigger() Trigger
Returns the flow’s trigger configuration.
Steps
func (f *Flow) Steps() []Step
Returns the flow’s execution steps.
StateConfig
func (f *Flow) StateConfig() *StateConfig
Returns the flow’s state configuration, or nil for default.
Execute
func (f *Flow) Execute(ctx workflow.Context, input FlowInput) error
Runs the flow as a Temporal workflow. This method:
- Initializes flow state from input
- Loads persisted state (cursors) from backend
- Executes steps in order
- Runs compensations on failure (Saga pattern)
- Persists state on successful completion
Parameters:
- ctx - Temporal workflow context
- input - Initial flow input
Returns: Error if execution fails
Complete Example
package main

import (
	"log"

	"github.com/resolute/resolute/core"
	"myapp/providers/jira"
)

// s3Backend, processNode, enrichNode, and storeNode are assumed to be
// defined elsewhere in the application.
func main() {
	// Define the flow
	flow := core.NewFlow("issue-sync").
		TriggeredBy(core.Schedule("0 */6 * * *")). // Every 6 hours
		WithState(core.StateConfig{
			Backend: s3Backend,
		}).
		Then(jira.FetchIssues(jira.FetchInput{
			JQL:    "project = PLATFORM AND updated > {{cursor:jira}}",
			Cursor: core.CursorFor("jira"),
		}).As("issues")).
		// Only process and store when the fetch returned at least one issue
		When(func(s *core.FlowState) bool {
			issues := core.Get[jira.FetchOutput](s, "issues")
			return issues.Count > 0
		}).
			ThenParallel("process",
				processNode,
				enrichNode,
			).
			Then(storeNode).
		EndWhen().
		Build()

	// Run with worker
	err := core.NewWorker().
		WithConfig(core.WorkerConfig{TaskQueue: "sync"}).
		WithFlow(flow).
		WithProviders(jira.Provider()).
		Run()
	if err != nil {
		log.Fatalf("worker exited: %v", err)
	}
}
See Also
- Node - Activity wrapper
- Trigger - Trigger types
- State - FlowState management
- Building Flows - Flow construction guide