Node
Node
The Node[I, O] type wraps a Temporal Activity with typed input and output. Nodes are the building blocks of flows.
Types
Node[I, O]
type Node[I, O any] struct {
// unexported fields
}A generic node wrapping an activity function with input type I and output type O.
ExecutableNode
type ExecutableNode interface {
Name() string
OutputKey() string
Execute(ctx workflow.Context, state *FlowState) error
Compensate(ctx workflow.Context, state *FlowState) error
HasCompensation() bool
Compensation() ExecutableNode
Input() any
RateLimiterID() string
}Interface implemented by all nodes. Allows type-erased storage in flow steps.
ActivityOptions
type ActivityOptions struct {
RetryPolicy *RetryPolicy
StartToCloseTimeout time.Duration
HeartbeatTimeout time.Duration
TaskQueue string
}Configures retry and timeout behavior for a node.
RetryPolicy
type RetryPolicy struct {
InitialInterval time.Duration
BackoffCoefficient float64
MaximumInterval time.Duration
MaximumAttempts int32
}Defines retry behavior for failed activities.
Constructor
NewNode
func NewNode[I, O any](name string, activity func(context.Context, I) (O, error), input I) *Node[I, O]Creates a node wrapping an activity function.
Type Parameters:
I- Input typeO- Output type
Parameters:
name- Activity name (used for registration and debugging)activity- The activity function to executeinput- Initial input value (may contain magic markers)
Returns: *Node[I, O] configured with default options
Example:
node := core.NewNode("fetch-issues", jira.FetchIssuesActivity, jira.FetchInput{
JQL: "project = PLATFORM",
})DefaultActivityOptions
func DefaultActivityOptions() ActivityOptionsReturns sensible defaults for activity execution:
StartToCloseTimeout: 5 minutesRetryPolicy.InitialInterval: 1 secondRetryPolicy.BackoffCoefficient: 2.0RetryPolicy.MaximumInterval: 1 minuteRetryPolicy.MaximumAttempts: 3
Node Methods
WithRetry
func (n *Node[I, O]) WithRetry(policy RetryPolicy) *Node[I, O]Configures the retry policy for this node.
Parameters:
policy- Custom retry configuration
Returns: *Node[I, O] for method chaining
Example:
node := jira.FetchIssues(input).WithRetry(core.RetryPolicy{
InitialInterval: 2 * time.Second,
BackoffCoefficient: 1.5,
MaximumInterval: 30 * time.Second,
MaximumAttempts: 5,
})WithTimeout
func (n *Node[I, O]) WithTimeout(d time.Duration) *Node[I, O]Sets the start-to-close timeout for this node.
Parameters:
d- Maximum duration for activity execution
Returns: *Node[I, O] for method chaining
Example:
node := longRunningActivity(input).WithTimeout(30 * time.Minute)OnError
func (n *Node[I, O]) OnError(compensation ExecutableNode) *Node[I, O]Attaches a compensation node to run if subsequent steps fail (Saga pattern).
Parameters:
compensation- Node to execute for rollback
Returns: *Node[I, O] for method chaining
Example:
createOrder := orders.Create(input).OnError(orders.Cancel(cancelInput))
chargePayment := payments.Charge(paymentInput).OnError(payments.Refund(refundInput))
flow := core.NewFlow("order").
TriggeredBy(core.Manual("api")).
Then(createOrder). // If charge fails, order is cancelled
Then(chargePayment). // If ship fails, payment is refunded
Then(shipOrder).
Build()WithRateLimit
func (n *Node[I, O]) WithRateLimit(requests int, per time.Duration) *Node[I, O]Configures rate limiting for this node. Creates a rate limiter unique to this node instance.
Parameters:
requests- Maximum number of requests allowedper- Time window for the rate limit
Returns: *Node[I, O] for method chaining
Example:
// Limit to 100 requests per minute
node := jira.FetchIssues(input).WithRateLimit(100, time.Minute)WithSharedRateLimit
func (n *Node[I, O]) WithSharedRateLimit(limiter *SharedRateLimiter) *Node[I, O]Configures this node to use a shared rate limiter. Multiple nodes can share the same rate limiter to coordinate request rates.
Parameters:
limiter- Pre-created shared rate limiter
Returns: *Node[I, O] for method chaining
Example:
// Multiple nodes share one rate limit
limiter := core.NewSharedRateLimiter("jira-api", 100, time.Minute)
fetchNode := jira.FetchIssues(fetchInput).WithSharedRateLimit(limiter)
searchNode := jira.SearchJQL(searchInput).WithSharedRateLimit(limiter)As
func (n *Node[I, O]) As(outputKey string) *Node[I, O]Names the output of this node for reference by downstream nodes.
Parameters:
outputKey- Key to store output in FlowState
Returns: *Node[I, O] for method chaining
Example:
flow := core.NewFlow("pipeline").
TriggeredBy(core.Manual("api")).
Then(jira.FetchIssues(input).As("issues")). // Output stored as "issues"
Then(processNode). // Can reference "issues" via magic markers
Build()Name
func (n *Node[I, O]) Name() stringReturns the node’s identifier.
OutputKey
func (n *Node[I, O]) OutputKey() stringReturns the key used to store this node’s output. Returns the custom key set via As(), or the node’s name if not set.
HasCompensation
func (n *Node[I, O]) HasCompensation() boolReturns true if this node has a compensation handler.
Compensation
func (n *Node[I, O]) Compensation() ExecutableNodeReturns the compensation node, if any.
Input
func (n *Node[I, O]) Input() anyReturns the node’s input value (used for testing).
RateLimiterID
func (n *Node[I, O]) RateLimiterID() stringReturns the rate limiter ID for this node, or empty string if not configured.
Execute
func (n *Node[I, O]) Execute(ctx workflow.Context, state *FlowState) errorRuns the activity within a Temporal workflow context.
Execution steps:
- Apply rate limiting if configured
- Resolve magic markers in input
- Configure activity options (timeout, retry)
- Execute activity via Temporal
- Store result in FlowState
Parameters:
ctx- Temporal workflow contextstate- Current flow state
Returns: Error if execution fails
Compensate
func (n *Node[I, O]) Compensate(ctx workflow.Context, state *FlowState) errorRuns the compensation activity if one is configured.
Parameters:
ctx- Temporal workflow contextstate- Flow state snapshot from when the node executed
Returns: Error if compensation fails
Provider Pattern
Providers typically expose factory functions that return configured nodes:
package jira
// Provider function returns a ready-to-use node
func FetchIssues(input FetchInput) *core.Node[FetchInput, FetchOutput] {
return core.NewNode("jira.FetchIssues", FetchIssuesActivity, input)
}
// Activity function (registered with worker)
func FetchIssuesActivity(ctx context.Context, input FetchInput) (FetchOutput, error) {
// Implementation
}Usage in flows:
flow := core.NewFlow("sync").
TriggeredBy(core.Manual("api")).
Then(jira.FetchIssues(jira.FetchInput{
JQL: "project = PLATFORM",
})).
Build()Complete Example
package main
import (
"time"
"github.com/resolute/resolute/core"
)
func main() {
// Shared rate limiter for Jira API
jiraLimiter := core.NewSharedRateLimiter("jira", 100, time.Minute)
// Configure nodes with various options
fetchNode := jira.FetchIssues(jira.FetchInput{
JQL: "project = PLATFORM",
Cursor: core.CursorFor("jira"),
}).
As("issues").
WithSharedRateLimit(jiraLimiter).
WithTimeout(10 * time.Minute).
WithRetry(core.RetryPolicy{
MaximumAttempts: 5,
})
// Node with compensation
createNode := orders.Create(orderInput).
OnError(orders.Cancel(cancelInput))
// Build flow with configured nodes
flow := core.NewFlow("order-pipeline").
TriggeredBy(core.Manual("api")).
Then(fetchNode).
Then(createNode).
Build()
}See Also
- Flow - Flow builder
- State - FlowState and result access
- Rate Limiting - Rate limit patterns
- Compensation - Saga pattern