Node
The Node[I, O] type wraps a Temporal Activity with typed input and output. Nodes are the building blocks of flows.
Types
Node[I, O]
type Node[I, O any] struct {
	// unexported fields
}
A generic node wrapping an activity function with input type I and output type O.
ExecutableNode
type ExecutableNode interface {
	Name() string
	OutputKey() string
	Execute(ctx workflow.Context, state *FlowState) error
	Compensate(ctx workflow.Context, state *FlowState) error
	HasCompensation() bool
	Compensation() ExecutableNode
	Input() any
	RateLimiterID() string
}
Interface implemented by all nodes. Allows type-erased storage in flow steps.
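To illustrate what type-erased storage means here, the following is a minimal, self-contained sketch. Runnable and TypedNode are hypothetical stand-ins for ExecutableNode and Node[I, O], cut down to two methods and using context.Context instead of a Temporal workflow context; they are not the library's types.

```go
package main

import (
	"context"
	"fmt"
)

// Runnable is a hypothetical, cut-down stand-in for ExecutableNode.
type Runnable interface {
	Name() string
	Run(ctx context.Context) (any, error)
}

// TypedNode is a hypothetical stand-in for Node[I, O].
type TypedNode[I, O any] struct {
	name     string
	activity func(context.Context, I) (O, error)
	input    I
}

func (n *TypedNode[I, O]) Name() string { return n.name }

// Run calls the typed activity and hands the result back as any,
// which is what lets nodes of different types share one slice.
func (n *TypedNode[I, O]) Run(ctx context.Context) (any, error) {
	out, err := n.activity(ctx, n.input)
	return out, err
}

func double(_ context.Context, x int) (int, error)      { return x * 2, nil }
func greet(_ context.Context, s string) (string, error) { return "hello " + s, nil }

// runAll executes differently typed nodes through the shared interface
// and collects their outputs by node name.
func runAll(ctx context.Context, steps []Runnable) (map[string]any, error) {
	out := make(map[string]any, len(steps))
	for _, s := range steps {
		v, err := s.Run(ctx)
		if err != nil {
			return nil, fmt.Errorf("%s: %w", s.Name(), err)
		}
		out[s.Name()] = v
	}
	return out, nil
}

// runDemo stores an int node and a string node side by side.
func runDemo() (map[string]any, error) {
	steps := []Runnable{
		&TypedNode[int, int]{name: "double", activity: double, input: 21},
		&TypedNode[string, string]{name: "greet", activity: greet, input: "flow"},
	}
	return runAll(context.Background(), steps)
}

func main() {
	out, err := runDemo()
	fmt.Println(out["double"], out["greet"], err)
}
```

The generic struct keeps compile-time type checking between an activity and its input, while the interface is what a flow step would hold.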
ActivityOptions
type ActivityOptions struct {
	RetryPolicy         *RetryPolicy
	StartToCloseTimeout time.Duration
	HeartbeatTimeout    time.Duration
	TaskQueue           string
}
Configures retry and timeout behavior for a node.
RetryPolicy
type RetryPolicy struct {
	InitialInterval    time.Duration
	BackoffCoefficient float64
	MaximumInterval    time.Duration
	MaximumAttempts    int32
}
Defines retry behavior for failed activities.
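As a rough guide to how these four fields interact, the sketch below computes the wait before each retry. It assumes the fields carry the same semantics as Temporal's own retry policy (exponential backoff capped at MaximumInterval, with MaximumAttempts counting the first attempt); the RetryPolicy type is a local copy and retryDelays is a hypothetical helper, not part of the library.

```go
package main

import (
	"fmt"
	"math"
	"time"
)

// RetryPolicy is a local copy of the documented struct, for the sketch only.
type RetryPolicy struct {
	InitialInterval    time.Duration
	BackoffCoefficient float64
	MaximumInterval    time.Duration
	MaximumAttempts    int32
}

// retryDelays lists the wait before each retry. Assumption: the delay for
// retry n is InitialInterval * BackoffCoefficient^(n-1), capped at
// MaximumInterval, and MaximumAttempts includes the first attempt.
func retryDelays(p RetryPolicy) []time.Duration {
	var delays []time.Duration
	for retry := int32(1); retry < p.MaximumAttempts; retry++ {
		d := float64(p.InitialInterval) * math.Pow(p.BackoffCoefficient, float64(retry-1))
		if limit := float64(p.MaximumInterval); limit > 0 && d > limit {
			d = limit
		}
		delays = append(delays, time.Duration(d))
	}
	return delays
}

// examplePolicy reuses the values from the WithRetry example in this page.
func examplePolicy() RetryPolicy {
	return RetryPolicy{
		InitialInterval:    2 * time.Second,
		BackoffCoefficient: 1.5,
		MaximumInterval:    30 * time.Second,
		MaximumAttempts:    5,
	}
}

func main() {
	fmt.Println(retryDelays(examplePolicy())) // [2s 3s 4.5s 6.75s]
}
```

Under the same assumption, the documented defaults (1 second, coefficient 2.0, at most 3 attempts) would give two retries, after 1 second and 2 seconds.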
CursorUpdateConfig
type CursorUpdateConfig struct {
	Source string
	Field  string
}
Defines how a node updates a cursor after execution. Source identifies the data source and Field names the output struct field to extract as the cursor position.
Window
type Window struct {
	Size int // max items per batch (0 = disabled, fetch all)
}
Configures batched/windowed processing for a node. When attached to a node in a parallel step, the framework loops: fetch batch, run downstream pipeline, persist cursors, repeat.
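The loop described above can be sketched in plain Go. This is an illustration only: fetchBatch and runWindowed are hypothetical helpers, the cursor is a simple slice offset, and stopping on the first empty batch is an assumption about how the framework ends the loop.

```go
package main

import "fmt"

// fetchBatch is a hypothetical stand-in for a windowed fetch activity.
// It returns up to size items starting at cursor, plus the next cursor;
// size 0 disables windowing and returns everything that is left.
func fetchBatch(data []int, cursor, size int) ([]int, int) {
	if cursor >= len(data) {
		return nil, cursor
	}
	end := len(data)
	if size > 0 && cursor+size < end {
		end = cursor + size
	}
	return data[cursor:end], end
}

// runWindowed repeats fetch -> downstream pipeline -> persist cursor
// until a fetch returns no items (the stop condition is an assumption).
func runWindowed(data []int, size int, pipeline func([]int)) (cursor, batches int) {
	for {
		batch, next := fetchBatch(data, cursor, size)
		if len(batch) == 0 {
			return cursor, batches
		}
		pipeline(batch)
		cursor = next // stands in for persisting the cursor
		batches++
	}
}

func main() {
	data := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}
	sum := 0
	cursor, batches := runWindowed(data, 4, func(b []int) {
		for _, v := range b {
			sum += v
		}
	})
	fmt.Println(cursor, batches, sum) // 10 3 55
}
```

With Size set to 0 the same loop degenerates to a single batch containing all items.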
Constructor
NewNode
func NewNode[I, O any](name string, activity func(context.Context, I) (O, error), input I) *Node[I, O]
Creates a node wrapping an activity function.
Type Parameters:
- I - Input type
- O - Output type
Parameters:
- name - Activity name (used for registration and debugging)
- activity - The activity function to execute
- input - Initial input value (may contain magic markers)
Returns: *Node[I, O] configured with default options
Example:
node := core.NewNode("fetch-issues", jira.FetchIssuesActivity, jira.FetchInput{
	JQL: "project = PLATFORM",
})
DefaultActivityOptions
func DefaultActivityOptions() ActivityOptions
Returns sensible defaults for activity execution:
- StartToCloseTimeout: 5 minutes
- RetryPolicy.InitialInterval: 1 second
- RetryPolicy.BackoffCoefficient: 2.0
- RetryPolicy.MaximumInterval: 1 minute
- RetryPolicy.MaximumAttempts: 3
Node Methods
WithRetry
func (n *Node[I, O]) WithRetry(policy RetryPolicy) *Node[I, O]
Configures the retry policy for this node.
Parameters:
- policy - Custom retry configuration
Returns: *Node[I, O] for method chaining
Example:
node := jira.FetchIssues(input).WithRetry(core.RetryPolicy{
	InitialInterval:    2 * time.Second,
	BackoffCoefficient: 1.5,
	MaximumInterval:    30 * time.Second,
	MaximumAttempts:    5,
})
WithTimeout
func (n *Node[I, O]) WithTimeout(d time.Duration) *Node[I, O]
Sets the start-to-close timeout for this node.
Parameters:
- d - Maximum duration for activity execution
Returns: *Node[I, O] for method chaining
Example:
node := longRunningActivity(input).WithTimeout(30 * time.Minute)
OnError
func (n *Node[I, O]) OnError(compensation ExecutableNode) *Node[I, O]
Attaches a compensation node to run if subsequent steps fail (Saga pattern).
Parameters:
- compensation - Node to execute for rollback
Returns: *Node[I, O] for method chaining
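The rollback behaviour that OnError enables can be sketched without Temporal. The step type and runSaga function below are hypothetical; running compensations in reverse order of completion is the usual Saga convention and is assumed here, not confirmed by this page.

```go
package main

import (
	"errors"
	"fmt"
)

// step is a hypothetical stand-in for a node with an optional compensation.
type step struct {
	name       string
	run        func() error
	compensate func() error // nil when the step has no compensation
}

// runSaga runs steps in order. When one fails, it compensates the steps
// that already completed, newest first, and returns the original error.
func runSaga(steps []step) ([]string, error) {
	var trace []string
	var done []step
	for _, s := range steps {
		if err := s.run(); err != nil {
			trace = append(trace, "failed "+s.name)
			for i := len(done) - 1; i >= 0; i-- {
				if done[i].compensate == nil {
					continue
				}
				if cerr := done[i].compensate(); cerr != nil {
					err = fmt.Errorf("%w (compensating %s: %v)", err, done[i].name, cerr)
				}
				trace = append(trace, "undo "+done[i].name)
			}
			return trace, err
		}
		trace = append(trace, "ran "+s.name)
		done = append(done, s)
	}
	return trace, nil
}

func ok() error { return nil }

// demoOrder mirrors the create / charge / ship flow, with ship failing.
func demoOrder() ([]string, error) {
	return runSaga([]step{
		{name: "create", run: ok, compensate: ok},
		{name: "charge", run: ok, compensate: ok},
		{name: "ship", run: func() error { return errors.New("carrier unavailable") }},
	})
}

func main() {
	trace, err := demoOrder()
	fmt.Println(trace, err)
}
```

In this sketch the failed step itself is not compensated; only steps that finished successfully are rolled back.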
Example:
createOrder := orders.Create(input).OnError(orders.Cancel(cancelInput))
chargePayment := payments.Charge(paymentInput).OnError(payments.Refund(refundInput))
flow := core.NewFlow("order").
	TriggeredBy(core.Manual("api")).
	Then(createOrder).   // If charge fails, order is cancelled
	Then(chargePayment). // If ship fails, payment is refunded
	Then(shipOrder).
	Build()
WithRateLimit
func (n *Node[I, O]) WithRateLimit(requests int, per time.Duration) *Node[I, O]
Configures rate limiting for this node. Creates a rate limiter unique to this node instance.
Parameters:
- requests - Maximum number of requests allowed
- per - Time window for the rate limit
Returns: *Node[I, O] for method chaining
Example:
// Limit to 100 requests per minute
node := jira.FetchIssues(input).WithRateLimit(100, time.Minute)
WithSharedRateLimit
func (n *Node[I, O]) WithSharedRateLimit(limiter *SharedRateLimiter) *Node[I, O]
Configures this node to use a shared rate limiter. Multiple nodes can share the same rate limiter to coordinate request rates.
Parameters:
- limiter - Pre-created shared rate limiter
Returns: *Node[I, O] for method chaining
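To show what coordinating request rates through one limiter means, here is a small sketch in which two callers draw from a single budget. The windowLimiter type is hypothetical and uses a simple fixed-window count; the algorithm behind SharedRateLimiter is not described on this page and may differ. The sketch is also not safe for concurrent use.

```go
package main

import (
	"fmt"
	"time"
)

// windowLimiter is a hypothetical fixed-window limiter: at most limit
// calls are allowed within each window of length per.
type windowLimiter struct {
	limit int
	per   time.Duration
	start time.Time
	used  int
}

// allow reports whether one more request fits in the current window.
// The caller passes the current time so the behaviour stays deterministic.
func (l *windowLimiter) allow(now time.Time) bool {
	if now.Sub(l.start) >= l.per {
		l.start, l.used = now, 0 // a new window begins
	}
	if l.used >= l.limit {
		return false
	}
	l.used++
	return true
}

// demoShared lets a "fetch" caller and a "search" caller alternate on one
// limiter that allows 3 requests per minute, all at the same instant.
func demoShared() (fetchOK, searchOK int) {
	shared := &windowLimiter{limit: 3, per: time.Minute}
	now := time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC)
	for i := 0; i < 3; i++ {
		if shared.allow(now) {
			fetchOK++
		}
		if shared.allow(now) {
			searchOK++
		}
	}
	return fetchOK, searchOK
}

func main() {
	fetchOK, searchOK := demoShared()
	fmt.Println(fetchOK, searchOK) // 2 1
}
```

Because both callers consume the same budget, only three of the six requests succeed in total; with one limiter per caller, all six would pass.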
Example:
// Multiple nodes share one rate limit
limiter := core.NewSharedRateLimiter("jira-api", 100, time.Minute)
fetchNode := jira.FetchIssues(fetchInput).WithSharedRateLimit(limiter)
searchNode := jira.SearchJQL(searchInput).WithSharedRateLimit(limiter)
WithValidation
func (n *Node[I, O]) WithValidation() *Node[I, O]
Enables input validation using struct tags before execution.
Supported validation tags: required, min=N, max=N, minlen=N, maxlen=N, oneof=a|b|c.
Returns: *Node[I, O] for method chaining
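For readers unfamiliar with tag-driven validation, the sketch below shows the general technique with Go's reflect package. It handles only required, min=N and max=N on string and int fields, and the validate function and its error messages are hypothetical; the library's own validator may behave differently.

```go
package main

import (
	"fmt"
	"reflect"
	"strconv"
	"strings"
)

type Input struct {
	Name string `validate:"required"`
	Age  int    `validate:"min=0,max=150"`
}

// validate walks the struct fields and applies the rules found in each
// `validate` tag. Only required, min=N and max=N are handled here.
func validate(v any) error {
	rv := reflect.ValueOf(v)
	if rv.Kind() != reflect.Struct {
		return fmt.Errorf("validate: want a struct, got %s", rv.Kind())
	}
	rt := rv.Type()
	for i := 0; i < rt.NumField(); i++ {
		field, val := rt.Field(i), rv.Field(i)
		tag := field.Tag.Get("validate")
		if tag == "" {
			continue
		}
		for _, rule := range strings.Split(tag, ",") {
			key, arg, _ := strings.Cut(rule, "=")
			switch key {
			case "required":
				if val.IsZero() {
					return fmt.Errorf("%s is required", field.Name)
				}
			case "min", "max":
				if val.Kind() != reflect.Int {
					continue // this sketch only checks int fields
				}
				limit, err := strconv.ParseInt(arg, 10, 64)
				if err != nil {
					return fmt.Errorf("%s: bad rule %q", field.Name, rule)
				}
				if key == "min" && val.Int() < limit {
					return fmt.Errorf("%s must be >= %d", field.Name, limit)
				}
				if key == "max" && val.Int() > limit {
					return fmt.Errorf("%s must be <= %d", field.Name, limit)
				}
			}
		}
	}
	return nil
}

func main() {
	fmt.Println(validate(Input{Name: "Ada", Age: 36}))
	fmt.Println(validate(Input{Age: 36}))
	fmt.Println(validate(Input{Name: "Ada", Age: 200}))
}
```

Running the check before the activity starts means a malformed input fails fast instead of consuming retries.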
Example:
type Input struct {
	Name string `validate:"required"`
	Age  int    `validate:"min=0,max=150"`
}
node := core.NewNode("validate-input", myActivity, input).WithValidation()
WithErrorClassifier
func (n *Node[I, O]) WithErrorClassifier(fn ErrorClassifier) *Node[I, O]
Sets a function to classify errors for retry decisions. Terminal errors are marked as non-retryable for Temporal.
Parameters:
- fn - Error classification function
Returns: *Node[I, O] for method chaining
Example:
node := jira.FetchIssues(input).WithErrorClassifier(core.HTTPErrorClassifier)
WithCursorUpdate
func (n *Node[I, O]) WithCursorUpdate(source, field string) *Node[I, O]
Configures the node to update a cursor after successful execution. The named field is extracted from the activity output and persisted as the cursor position. Supports time.Time, *time.Time, and string field types.
Parameters:
- source - Cursor source identifier (e.g., “jira”, “confluence”)
- field - Output struct field name to extract
Returns: *Node[I, O] for method chaining
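Extracting a named field from an arbitrary output struct is typically done with reflection. The sketch below shows one way to do it for the three supported field types; cursorValue and the FetchOutput fields are hypothetical, and formatting timestamps as RFC 3339 strings is an assumption about how the cursor is stored.

```go
package main

import (
	"fmt"
	"reflect"
	"time"
)

// FetchOutput is a hypothetical activity output with one field per
// supported cursor type.
type FetchOutput struct {
	LastUpdated time.Time
	LastSeen    *time.Time
	NextToken   string
}

// cursorValue reads the named field from a struct (or pointer to struct)
// and renders it as a string cursor.
func cursorValue(output any, field string) (string, error) {
	rv := reflect.ValueOf(output)
	if rv.Kind() == reflect.Pointer {
		rv = rv.Elem()
	}
	if rv.Kind() != reflect.Struct {
		return "", fmt.Errorf("output must be a struct, got %s", rv.Kind())
	}
	fv := rv.FieldByName(field)
	if !fv.IsValid() || !fv.CanInterface() {
		return "", fmt.Errorf("no exported field %q", field)
	}
	switch v := fv.Interface().(type) {
	case time.Time:
		return v.Format(time.RFC3339), nil
	case *time.Time:
		if v == nil {
			return "", fmt.Errorf("field %q is nil", field)
		}
		return v.Format(time.RFC3339), nil
	case string:
		return v, nil
	default:
		return "", fmt.Errorf("field %q has unsupported type %T", field, v)
	}
}

// demoCursor extracts a time.Time cursor from a sample output.
func demoCursor() (string, error) {
	out := FetchOutput{LastUpdated: time.Date(2024, 1, 2, 3, 4, 5, 0, time.UTC)}
	return cursorValue(out, "LastUpdated")
}

func main() {
	fmt.Println(demoCursor()) // 2024-01-02T03:04:05Z <nil>
}
```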
Example:
node := jira.FetchIssues(input).
	WithCursorUpdate("jira", "LastUpdated")
WithWindow
func (n *Node[I, O]) WithWindow(w Window) *Node[I, O]
Configures batched/windowed processing for this node. When used in a parallel step, the framework runs the downstream pipeline per batch instead of waiting for all data.
The framework injects WindowCursor and WindowSize fields into the node’s input struct via reflection.
Parameters:
- w - Window configuration
Returns: *Node[I, O] for method chaining
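The reflection-based injection mentioned above might look roughly like the following. The field names WindowCursor and WindowSize come from this page; their types (string and int), the injectWindow helper, and the choice to silently skip missing fields are assumptions made for the sketch.

```go
package main

import (
	"errors"
	"fmt"
	"reflect"
)

// FetchInput is a hypothetical input struct that opts in to windowing by
// declaring the two fields the framework looks for.
type FetchInput struct {
	JQL          string
	WindowCursor string
	WindowSize   int
}

// injectWindow sets WindowCursor and WindowSize on a pointer to a struct.
// Fields that are absent or have a different kind are left untouched.
func injectWindow(input any, cursor string, size int) error {
	rv := reflect.ValueOf(input)
	if rv.Kind() != reflect.Pointer || rv.IsNil() || rv.Elem().Kind() != reflect.Struct {
		return errors.New("input must be a non-nil pointer to a struct")
	}
	s := rv.Elem()
	if f := s.FieldByName("WindowCursor"); f.IsValid() && f.CanSet() && f.Kind() == reflect.String {
		f.SetString(cursor)
	}
	if f := s.FieldByName("WindowSize"); f.IsValid() && f.CanSet() && f.Kind() == reflect.Int {
		f.SetInt(int64(size))
	}
	return nil
}

func main() {
	in := FetchInput{JQL: "project = PLATFORM"}
	err := injectWindow(&in, "2024-01-02T03:04:05Z", 100)
	fmt.Println(in.WindowCursor, in.WindowSize, err)
}
```

A pointer is required because reflection can only set fields on addressable values; passing the struct by value is rejected.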
Example:
node := jira.FetchIssues(input).
	WithWindow(core.Window{Size: 100}).
	WithCursorUpdate("jira", "LastUpdated")
WindowConfig
func (n *Node[I, O]) WindowConfig() Window
Returns the window configuration. Returns zero Window if not set.
Returns: Window
As
func (n *Node[I, O]) As(outputKey string) *Node[I, O]
Names the output of this node for reference by downstream nodes.
Parameters:
- outputKey - Key to store output in FlowState
Returns: *Node[I, O] for method chaining
Example:
flow := core.NewFlow("pipeline").
	TriggeredBy(core.Manual("api")).
	Then(jira.FetchIssues(input).As("issues")). // Output stored as "issues"
	Then(processNode).                          // Can reference "issues" via magic markers
	Build()
Name
func (n *Node[I, O]) Name() string
Returns the node’s identifier.
OutputKey
func (n *Node[I, O]) OutputKey() string
Returns the key used to store this node’s output. Returns the custom key set via As(), or the node’s name if not set.
HasCompensation
func (n *Node[I, O]) HasCompensation() bool
Returns true if this node has a compensation handler.
Compensation
func (n *Node[I, O]) Compensation() ExecutableNode
Returns the compensation node, if any.
Input
func (n *Node[I, O]) Input() any
Returns the node’s input value (used for testing).
RateLimiterID
func (n *Node[I, O]) RateLimiterID() string
Returns the rate limiter ID for this node, or empty string if not configured.
Execute
func (n *Node[I, O]) Execute(ctx workflow.Context, state *FlowState) error
Runs the activity within a Temporal workflow context.
Execution steps:
- Apply rate limiting if configured
- Resolve magic markers in input
- Configure activity options (timeout, retry)
- Execute activity via Temporal
- Store result in FlowState
Parameters:
- ctx - Temporal workflow context
- state - Current flow state
Returns: Error if execution fails
Compensate
func (n *Node[I, O]) Compensate(ctx workflow.Context, state *FlowState) error
Runs the compensation activity if one is configured.
Parameters:
- ctx - Temporal workflow context
- state - Flow state snapshot from when the node executed
Returns: Error if compensation fails
Provider Pattern
Providers typically expose factory functions that return configured nodes:
package jira
import (
	"context"
	"github.com/resolute/resolute/core"
)
// Provider function returns a ready-to-use node
func FetchIssues(input FetchInput) *core.Node[FetchInput, FetchOutput] {
	return core.NewNode("jira.FetchIssues", FetchIssuesActivity, input)
}
// Activity function (registered with worker)
func FetchIssuesActivity(ctx context.Context, input FetchInput) (FetchOutput, error) {
	// Implementation goes here; the zero-value return only keeps this stub compilable
	return FetchOutput{}, nil
}
Usage in flows:
flow := core.NewFlow("sync").
	TriggeredBy(core.Manual("api")).
	Then(jira.FetchIssues(jira.FetchInput{
		JQL: "project = PLATFORM",
	})).
	Build()
Complete Example
package main
import (
	"time"
	"github.com/resolute/resolute/core"
	// The jira and orders provider packages must also be imported;
	// their import paths are not shown here.
)
func main() {
	// Shared rate limiter for Jira API
	jiraLimiter := core.NewSharedRateLimiter("jira", 100, time.Minute)
	// Configure nodes with various options
	fetchNode := jira.FetchIssues(jira.FetchInput{
		JQL:    "project = PLATFORM",
		Cursor: core.CursorFor("jira"),
	}).
		As("issues").
		WithSharedRateLimit(jiraLimiter).
		WithTimeout(10 * time.Minute).
		WithRetry(core.RetryPolicy{
			MaximumAttempts: 5,
		}).
		WithWindow(core.Window{Size: 100}).
		WithCursorUpdate("jira", "LastUpdated")
	// Node with compensation (orderInput and cancelInput are defined elsewhere)
	createNode := orders.Create(orderInput).
		OnError(orders.Cancel(cancelInput))
	// Build flow with configured nodes
	flow := core.NewFlow("order-pipeline").
		TriggeredBy(core.Manual("api")).
		Then(fetchNode).
		Then(createNode).
		Build()
	_ = flow // placeholder so the example compiles; use the built flow here
}
See Also
- Flow - Flow builder
- State - FlowState and result access
- Rate Limiting - Rate limit patterns
- Compensation - Saga pattern