Node

The Node[I, O] type wraps a Temporal Activity with typed input and output. Nodes are the building blocks of flows.

Types

Node[I, O]

type Node[I, O any] struct {
    // unexported fields
}

A generic node wrapping an activity function with input type I and output type O.

ExecutableNode

type ExecutableNode interface {
    Name() string
    OutputKey() string
    Execute(ctx workflow.Context, state *FlowState) error
    Compensate(ctx workflow.Context, state *FlowState) error
    HasCompensation() bool
    Compensation() ExecutableNode
    Input() any
    RateLimiterID() string
}

Interface implemented by all nodes. Allows type-erased storage in flow steps.
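
As a rough illustration of type-erased storage, the sketch below uses only the standard library. `Runnable`, `TypedNode`, and `State` are hypothetical, reduced stand-ins for `ExecutableNode`, `Node[I, O]`, and `FlowState`, not the library's own definitions. It shows nodes with different type parameters held in one slice behind a non-generic interface.

```go
package main

import (
	"context"
	"fmt"
)

// State is a simplified stand-in for FlowState: outputs keyed by name.
type State map[string]any

// Runnable is a reduced, hypothetical version of ExecutableNode.
type Runnable interface {
	Name() string
	Run(ctx context.Context, state State) error
}

// TypedNode keeps I and O at compile time but satisfies the non-generic Runnable.
type TypedNode[I, O any] struct {
	name     string
	activity func(context.Context, I) (O, error)
	input    I
}

func (n *TypedNode[I, O]) Name() string { return n.name }

// Run calls the typed activity and stores its output under the node name.
func (n *TypedNode[I, O]) Run(ctx context.Context, state State) error {
	out, err := n.activity(ctx, n.input)
	if err != nil {
		return err
	}
	state[n.name] = out
	return nil
}

func double(_ context.Context, x int) (int, error)      { return x * 2, nil }
func shout(_ context.Context, s string) (string, error) { return s + "!", nil }

// runAll executes nodes with different type parameters from a single slice.
func runAll() (State, error) {
	steps := []Runnable{
		&TypedNode[int, int]{name: "double", activity: double, input: 21},
		&TypedNode[string, string]{name: "shout", activity: shout, input: "hi"},
	}
	state := State{}
	for _, s := range steps {
		if err := s.Run(context.Background(), state); err != nil {
			return nil, fmt.Errorf("%s: %w", s.Name(), err)
		}
	}
	return state, nil
}

func main() {
	state, err := runAll()
	fmt.Println(state["double"], state["shout"], err)
}
```

The flow only ever sees the interface, so it needs no knowledge of each node's input and output types.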

ActivityOptions

type ActivityOptions struct {
    RetryPolicy         *RetryPolicy
    StartToCloseTimeout time.Duration
    HeartbeatTimeout    time.Duration
    TaskQueue           string
}

Configures retry and timeout behavior for a node.

RetryPolicy

type RetryPolicy struct {
    InitialInterval    time.Duration
    BackoffCoefficient float64
    MaximumInterval    time.Duration
    MaximumAttempts    int32
}

Defines retry behavior for failed activities.

CursorUpdateConfig

type CursorUpdateConfig struct {
    Source string
    Field  string
}

Defines how a node updates a cursor after execution. Source identifies the data source and Field names the output struct field to extract as the cursor position.

Window

type Window struct {
    Size int // max items per batch (0 = disabled, fetch all)
}

Configures batched/windowed processing for a node. When attached to a node in a parallel step, the framework loops: fetch batch, run downstream pipeline, persist cursors, repeat.
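
The loop described above can be sketched roughly as follows. This is a standalone model, not the framework's code: `fetchBatch` and `runWindowed` are hypothetical names, the "downstream pipeline" is a simple sum, and the cursor is a slice index rather than a persisted value.

```go
package main

import "fmt"

// fetchBatch returns up to size items starting at cursor, plus the next cursor.
// It stands in for an activity that reads a page of data.
func fetchBatch(source []int, cursor, size int) ([]int, int) {
	end := cursor + size
	if end > len(source) {
		end = len(source)
	}
	return source[cursor:end], end
}

// runWindowed repeats: fetch batch, run downstream work, persist cursor.
// It stops when a fetch returns no items.
func runWindowed(source []int, size int) (batches, sum, cursor int) {
	if size <= 0 {
		size = len(source) // Size 0 means windowing is disabled: fetch everything
	}
	for {
		batch, next := fetchBatch(source, cursor, size)
		if len(batch) == 0 {
			return
		}
		for _, item := range batch {
			sum += item // downstream pipeline, reduced to a sum
		}
		cursor = next // "persist" the cursor before the next iteration
		batches++
	}
}

func main() {
	batches, sum, cursor := runWindowed([]int{1, 2, 3, 4, 5, 6, 7}, 3)
	fmt.Println(batches, sum, cursor)
}
```

Advancing the cursor only after the downstream work for a batch completes means a failure mid-batch leaves the cursor at the last fully processed position.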

Constructor

NewNode

func NewNode[I, O any](name string, activity func(context.Context, I) (O, error), input I) *Node[I, O]

Creates a node wrapping an activity function.

Type Parameters:

  • I - Input type
  • O - Output type

Parameters:

  • name - Activity name (used for registration and debugging)
  • activity - The activity function to execute
  • input - Initial input value (may contain magic markers)

Returns: *Node[I, O] configured with default options

Example:

node := core.NewNode("fetch-issues", jira.FetchIssuesActivity, jira.FetchInput{
    JQL: "project = PLATFORM",
})

DefaultActivityOptions

func DefaultActivityOptions() ActivityOptions

Returns sensible defaults for activity execution:

  • StartToCloseTimeout: 5 minutes
  • RetryPolicy.InitialInterval: 1 second
  • RetryPolicy.BackoffCoefficient: 2.0
  • RetryPolicy.MaximumInterval: 1 minute
  • RetryPolicy.MaximumAttempts: 3
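
To make these numbers concrete, the sketch below computes the delay before each retry. It assumes the fields carry the same meaning as in Temporal's retry policy: each wait is the previous one multiplied by `BackoffCoefficient`, capped at `MaximumInterval`, and `MaximumAttempts` counts the first attempt. `retryWaits`, `defaultPolicy`, `customPolicy`, and `schedule` are hypothetical helpers, and the local `RetryPolicy` type only mirrors the documented struct. The sketch does not model a `MaximumAttempts` of 0.

```go
package main

import (
	"fmt"
	"time"
)

// RetryPolicy mirrors the documented struct for the purpose of this sketch.
type RetryPolicy struct {
	InitialInterval    time.Duration
	BackoffCoefficient float64
	MaximumInterval    time.Duration
	MaximumAttempts    int32
}

// retryWaits lists the delay before each retry.
// With MaximumAttempts counting the first try, there are MaximumAttempts-1 retries.
func retryWaits(p RetryPolicy) []time.Duration {
	var waits []time.Duration
	next := float64(p.InitialInterval)
	for attempt := int32(1); attempt < p.MaximumAttempts; attempt++ {
		wait := time.Duration(next)
		if p.MaximumInterval > 0 && wait > p.MaximumInterval {
			wait = p.MaximumInterval // cap the exponential growth
		}
		waits = append(waits, wait)
		next *= p.BackoffCoefficient
	}
	return waits
}

// defaultPolicy uses the default values listed above.
func defaultPolicy() RetryPolicy {
	return RetryPolicy{time.Second, 2.0, time.Minute, 3}
}

// customPolicy is an arbitrary non-default policy for comparison.
func customPolicy() RetryPolicy {
	return RetryPolicy{2 * time.Second, 1.5, 30 * time.Second, 5}
}

// schedule formats the waits for printing.
func schedule(p RetryPolicy) string { return fmt.Sprint(retryWaits(p)) }

func main() {
	fmt.Println(schedule(defaultPolicy())) // [1s 2s]
	fmt.Println(schedule(customPolicy()))  // [2s 3s 4.5s 6.75s]
}
```

Under these assumptions the defaults allow one initial attempt plus two retries, waiting 1 second and then 2 seconds.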

Node Methods

WithRetry

func (n *Node[I, O]) WithRetry(policy RetryPolicy) *Node[I, O]

Configures the retry policy for this node.

Parameters:

  • policy - Custom retry configuration

Returns: *Node[I, O] for method chaining

Example:

node := jira.FetchIssues(input).WithRetry(core.RetryPolicy{
    InitialInterval:    2 * time.Second,
    BackoffCoefficient: 1.5,
    MaximumInterval:    30 * time.Second,
    MaximumAttempts:    5,
})

WithTimeout

func (n *Node[I, O]) WithTimeout(d time.Duration) *Node[I, O]

Sets the start-to-close timeout for this node.

Parameters:

  • d - Maximum duration for activity execution

Returns: *Node[I, O] for method chaining

Example:

node := longRunningActivity(input).WithTimeout(30 * time.Minute)

OnError

func (n *Node[I, O]) OnError(compensation ExecutableNode) *Node[I, O]

Attaches a compensation node to run if subsequent steps fail (Saga pattern).

Parameters:

  • compensation - Node to execute for rollback

Returns: *Node[I, O] for method chaining

Example:

createOrder := orders.Create(input).OnError(orders.Cancel(cancelInput))
chargePayment := payments.Charge(paymentInput).OnError(payments.Refund(refundInput))

flow := core.NewFlow("order").
    TriggeredBy(core.Manual("api")).
    Then(createOrder).    // If charge fails, order is cancelled
    Then(chargePayment).  // If ship fails, payment is refunded
    Then(shipOrder).
    Build()
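
The compensation behavior can be modeled in a few lines without Temporal. In this sketch `step`, `runSaga`, and `orderFlowTrace` are hypothetical names. Running compensations in reverse order of completion is the usual Saga convention and is an assumption here, not something this page states about the framework.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// step pairs an action with an optional compensation (nil when none is attached).
type step struct {
	name       string
	run        func() error
	compensate func() error
}

// runSaga runs steps in order. When one fails, it compensates the steps that
// already completed, newest first, and returns the original error.
func runSaga(steps []step) (trace []string, err error) {
	var done []step
	for _, s := range steps {
		if err = s.run(); err != nil {
			trace = append(trace, s.name+" failed")
			for i := len(done) - 1; i >= 0; i-- {
				if done[i].compensate == nil {
					continue
				}
				if cerr := done[i].compensate(); cerr != nil {
					trace = append(trace, "compensate "+done[i].name+" failed")
					continue
				}
				trace = append(trace, "compensated "+done[i].name)
			}
			return trace, err
		}
		trace = append(trace, s.name+" ok")
		done = append(done, s)
	}
	return trace, nil
}

// orderFlowTrace mirrors the order example: shipping fails after two successful steps.
func orderFlowTrace() string {
	ok := func() error { return nil }
	fail := func() error { return errors.New("carrier unavailable") }
	trace, _ := runSaga([]step{
		{"create-order", ok, ok},
		{"charge-payment", ok, ok},
		{"ship-order", fail, nil},
	})
	return strings.Join(trace, "; ")
}

func main() {
	fmt.Println(orderFlowTrace())
}
```

The failing step itself is not compensated, since it never completed; only earlier steps with an attached compensation are rolled back.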

WithRateLimit

func (n *Node[I, O]) WithRateLimit(requests int, per time.Duration) *Node[I, O]

Configures rate limiting for this node. Creates a rate limiter unique to this node instance.

Parameters:

  • requests - Maximum number of requests allowed
  • per - Time window for the rate limit

Returns: *Node[I, O] for method chaining

Example:

// Limit to 100 requests per minute
node := jira.FetchIssues(input).WithRateLimit(100, time.Minute)

WithSharedRateLimit

func (n *Node[I, O]) WithSharedRateLimit(limiter *SharedRateLimiter) *Node[I, O]

Configures this node to use a shared rate limiter. Multiple nodes can share the same rate limiter to coordinate request rates.

Parameters:

  • limiter - Pre-created shared rate limiter

Returns: *Node[I, O] for method chaining

Example:

// Multiple nodes share one rate limit
limiter := core.NewSharedRateLimiter("jira-api", 100, time.Minute)

fetchNode := jira.FetchIssues(fetchInput).WithSharedRateLimit(limiter)
searchNode := jira.SearchJQL(searchInput).WithSharedRateLimit(limiter)
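
The following sketch shows what sharing a limiter means in practice: two callers draw from one budget. It assumes a fixed-window algorithm and rejects excess requests, where a real limiter may well block instead. `windowLimiter` and `sharedBudget` are hypothetical and say nothing about how `SharedRateLimiter` is implemented.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// windowLimiter allows at most limit calls per fixed window of length per.
type windowLimiter struct {
	mu          sync.Mutex
	limit       int
	per         time.Duration
	windowStart time.Time
	used        int
}

func newWindowLimiter(limit int, per time.Duration) *windowLimiter {
	return &windowLimiter{limit: limit, per: per}
}

// Allow reports whether a request at time now fits in the current window.
// The clock is passed in so the behavior is deterministic.
func (l *windowLimiter) Allow(now time.Time) bool {
	l.mu.Lock()
	defer l.mu.Unlock()
	if now.Sub(l.windowStart) >= l.per {
		l.windowStart = now // start a fresh window
		l.used = 0
	}
	if l.used >= l.limit {
		return false
	}
	l.used++
	return true
}

// sharedBudget has a "fetch" and a "search" caller alternate on one limiter of
// 3 requests per minute, then tries once more after the window has passed.
func sharedBudget() (fetchOK, searchOK, afterReset int) {
	limiter := newWindowLimiter(3, time.Minute)
	start := time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC)
	for i := 0; i < 4; i++ {
		if limiter.Allow(start) {
			fetchOK++
		}
		if limiter.Allow(start) {
			searchOK++
		}
	}
	if limiter.Allow(start.Add(time.Minute)) {
		afterReset++
	}
	return
}

func main() {
	fmt.Println(sharedBudget()) // 2 1 1
}
```

With separate limiters each caller would get its own three requests; with a shared one the total across both stays at three per window.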

WithValidation

func (n *Node[I, O]) WithValidation() *Node[I, O]

Enables input validation using struct tags before execution.

Supported validation tags: required, min=N, max=N, minlen=N, maxlen=N, oneof=a|b|c.

Returns: *Node[I, O] for method chaining

Example:

type Input struct {
    Name string `validate:"required"`
    Age  int    `validate:"min=0,max=150"`
}

node := core.NewNode("validate-input", myActivity, input).WithValidation()
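
For readers curious how tag-driven validation can work, here is a minimal reflection-based sketch covering only `required`, `min`, and `max` on integer fields. The `validate` and `check` functions and the error messages are hypothetical; the library's own validator may behave differently.

```go
package main

import (
	"fmt"
	"reflect"
	"strconv"
	"strings"
)

type Input struct {
	Name string `validate:"required"`
	Age  int    `validate:"min=0,max=150"`
}

// validate checks a struct (or pointer to struct) against its validate tags.
func validate(v any) error {
	rv := reflect.ValueOf(v)
	if rv.Kind() == reflect.Pointer {
		rv = rv.Elem()
	}
	if rv.Kind() != reflect.Struct {
		return fmt.Errorf("validate: expected struct, got %s", rv.Kind())
	}
	rt := rv.Type()
	for i := 0; i < rt.NumField(); i++ {
		tag := rt.Field(i).Tag.Get("validate")
		if tag == "" {
			continue
		}
		name, field := rt.Field(i).Name, rv.Field(i)
		for _, rule := range strings.Split(tag, ",") {
			key, arg, _ := strings.Cut(rule, "=")
			switch key {
			case "required":
				if field.IsZero() {
					return fmt.Errorf("%s is required", name)
				}
			case "min", "max":
				if !field.CanInt() {
					continue // this sketch only bounds signed integers
				}
				bound, err := strconv.ParseInt(arg, 10, 64)
				if err != nil {
					return fmt.Errorf("%s: bad %s bound %q", name, key, arg)
				}
				if key == "min" && field.Int() < bound {
					return fmt.Errorf("%s must be >= %d", name, bound)
				}
				if key == "max" && field.Int() > bound {
					return fmt.Errorf("%s must be <= %d", name, bound)
				}
			}
		}
	}
	return nil
}

// check returns "ok" or the first validation error message.
func check(name string, age int) string {
	if err := validate(Input{Name: name, Age: age}); err != nil {
		return err.Error()
	}
	return "ok"
}

func main() {
	fmt.Println(check("Ada", 36))
	fmt.Println(check("", 36))
	fmt.Println(check("Ada", 200))
}
```

Validating before execution lets a malformed input fail fast instead of consuming an activity attempt.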

WithErrorClassifier

func (n *Node[I, O]) WithErrorClassifier(fn ErrorClassifier) *Node[I, O]

Sets a function that classifies errors for retry decisions. Errors it classifies as terminal are reported to Temporal as non-retryable, so they are not retried.

Parameters:

  • fn - Error classification function

Returns: *Node[I, O] for method chaining

Example:

node := jira.FetchIssues(input).WithErrorClassifier(core.HTTPErrorClassifier)
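
The signature of `ErrorClassifier` and the rules inside `core.HTTPErrorClassifier` are not shown on this page, so the sketch below is only one plausible shape. It assumes a classifier is a function from an error to a terminal/retryable decision, and it uses a hypothetical `statusError` type with a common HTTP convention: 4xx responses are terminal except 408 and 429, everything else is retryable.

```go
package main

import (
	"errors"
	"fmt"
)

// statusError is a hypothetical error type carrying an HTTP status code.
type statusError struct{ Code int }

func (e *statusError) Error() string { return fmt.Sprintf("http status %d", e.Code) }

// isTerminal reports whether retrying cannot help.
// Errors without a status code are left to the retry policy.
func isTerminal(err error) bool {
	var se *statusError
	if !errors.As(err, &se) {
		return false
	}
	if se.Code == 408 || se.Code == 429 {
		return false // timeouts and throttling are worth retrying
	}
	return se.Code >= 400 && se.Code < 500
}

// classifyStatus wraps a status error the way an activity might and classifies it.
func classifyStatus(code int) string {
	err := fmt.Errorf("fetch issues: %w", &statusError{Code: code})
	if isTerminal(err) {
		return "terminal"
	}
	return "retryable"
}

func main() {
	for _, code := range []int{404, 429, 503} {
		fmt.Println(code, classifyStatus(code))
	}
}
```

Using `errors.As` keeps the classification working even when the activity wraps the original error with extra context.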

WithCursorUpdate

func (n *Node[I, O]) WithCursorUpdate(source, field string) *Node[I, O]

Configures the node to update a cursor after successful execution. The named field is extracted from the activity output and persisted as the cursor position. Supports time.Time, *time.Time, and string field types.

Parameters:

  • source - Cursor source identifier (e.g., “jira”, “confluence”)
  • field - Output struct field name to extract

Returns: *Node[I, O] for method chaining

Example:

node := jira.FetchIssues(input).
    WithCursorUpdate("jira", "LastUpdated")
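
Extracting a named field from an arbitrary output struct is a reflection task. The sketch below shows one way to handle the three supported field types. `cursorFromField`, `demoCursor`, and the local `FetchOutput` are hypothetical, and serializing timestamps as RFC 3339 strings is an assumption about the stored format.

```go
package main

import (
	"fmt"
	"reflect"
	"time"
)

// FetchOutput is a hypothetical activity output with candidate cursor fields.
type FetchOutput struct {
	LastUpdated time.Time
	NextToken   string
	Count       int
}

// cursorFromField reads the named exported field from a struct (or pointer to
// struct) and renders it as a cursor string.
func cursorFromField(output any, field string) (string, error) {
	rv := reflect.ValueOf(output)
	if rv.Kind() == reflect.Pointer {
		if rv.IsNil() {
			return "", fmt.Errorf("output is nil")
		}
		rv = rv.Elem()
	}
	if rv.Kind() != reflect.Struct {
		return "", fmt.Errorf("output must be a struct, got %s", rv.Kind())
	}
	fv := rv.FieldByName(field)
	if !fv.IsValid() || !fv.CanInterface() {
		return "", fmt.Errorf("field %q not found or not exported", field)
	}
	switch v := fv.Interface().(type) {
	case time.Time:
		return v.UTC().Format(time.RFC3339), nil
	case *time.Time:
		if v == nil {
			return "", fmt.Errorf("field %q is nil", field)
		}
		return v.UTC().Format(time.RFC3339), nil
	case string:
		return v, nil
	default:
		return "", fmt.Errorf("unsupported cursor type %s", fv.Type())
	}
}

// demoCursor extracts one field from a fixed output value.
func demoCursor(field string) string {
	out := FetchOutput{
		LastUpdated: time.Date(2024, 5, 1, 12, 0, 0, 0, time.UTC),
		NextToken:   "page-2",
		Count:       42,
	}
	cursor, err := cursorFromField(out, field)
	if err != nil {
		return "error: " + err.Error()
	}
	return cursor
}

func main() {
	fmt.Println(demoCursor("LastUpdated"))
	fmt.Println(demoCursor("NextToken"))
	fmt.Println(demoCursor("Count"))
}
```

Because the field is looked up by name at run time, a misspelled field name cannot be caught by the compiler, which is a good reason to cover cursor configuration in tests.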

WithWindow

func (n *Node[I, O]) WithWindow(w Window) *Node[I, O]

Configures batched/windowed processing for this node. When used in a parallel step, the framework runs the downstream pipeline per batch instead of waiting for all data.

The framework injects WindowCursor and WindowSize fields into the node’s input struct via reflection.

Parameters:

  • w - Window configuration

Returns: *Node[I, O] for method chaining

Example:

node := jira.FetchIssues(input).
    WithWindow(core.Window{Size: 100}).
    WithCursorUpdate("jira", "LastUpdated")
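
Injection by reflection might look roughly like the sketch below. The field types are assumptions: this page names `WindowCursor` and `WindowSize` but does not give their types, so the sketch treats them as `string` and `int`. `injectWindow` and the local `FetchInput` are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
	"reflect"
)

// FetchInput is a hypothetical input struct that opts in to windowing by
// declaring the two fields the framework looks for.
type FetchInput struct {
	JQL          string
	WindowCursor string
	WindowSize   int
}

// injectWindow sets WindowCursor and WindowSize on the struct that input
// points to. Fields that are missing or of another kind are left untouched.
func injectWindow(input any, cursor string, size int) error {
	rv := reflect.ValueOf(input)
	if rv.Kind() != reflect.Pointer || rv.IsNil() || rv.Elem().Kind() != reflect.Struct {
		return errors.New("injectWindow: need a non-nil pointer to a struct")
	}
	elem := rv.Elem()
	if f := elem.FieldByName("WindowCursor"); f.IsValid() && f.CanSet() && f.Kind() == reflect.String {
		f.SetString(cursor)
	}
	if f := elem.FieldByName("WindowSize"); f.IsValid() && f.CanSet() && f.CanInt() {
		f.SetInt(int64(size))
	}
	return nil
}

// demoInject applies a cursor and batch size to a fresh input value.
func demoInject() (FetchInput, error) {
	in := FetchInput{JQL: "project = PLATFORM"}
	err := injectWindow(&in, "2024-05-01T12:00:00Z", 100)
	return in, err
}

func main() {
	in, err := demoInject()
	fmt.Println(in.WindowCursor, in.WindowSize, err)
}
```

Reflection can only set exported fields through a pointer, so an input struct that wants these values needs exported fields with exactly these names.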

WindowConfig

func (n *Node[I, O]) WindowConfig() Window

Returns the node’s window configuration, or the zero Window if none was set.

Returns: Window

As

func (n *Node[I, O]) As(outputKey string) *Node[I, O]

Names the output of this node for reference by downstream nodes.

Parameters:

  • outputKey - Key to store output in FlowState

Returns: *Node[I, O] for method chaining

Example:

flow := core.NewFlow("pipeline").
    TriggeredBy(core.Manual("api")).
    Then(jira.FetchIssues(input).As("issues")).  // Output stored as "issues"
    Then(processNode).  // Can reference "issues" via magic markers
    Build()

Name

func (n *Node[I, O]) Name() string

Returns the node’s identifier.

OutputKey

func (n *Node[I, O]) OutputKey() string

Returns the key used to store this node’s output. Returns the custom key set via As(), or the node’s name if not set.
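
The fallback rule is small enough to show in full. This standalone sketch uses a hypothetical `node` type to mirror the documented behavior of `As` and `OutputKey`.

```go
package main

import "fmt"

// node is a hypothetical, minimal stand-in for Node[I, O].
type node struct {
	name      string
	outputKey string
}

// As sets a custom output key and returns the node for chaining.
func (n *node) As(key string) *node {
	n.outputKey = key
	return n
}

// OutputKey returns the custom key if one was set, otherwise the node name.
func (n *node) OutputKey() string {
	if n.outputKey != "" {
		return n.outputKey
	}
	return n.name
}

func main() {
	fmt.Println((&node{name: "jira.FetchIssues"}).OutputKey())
	fmt.Println((&node{name: "jira.FetchIssues"}).As("issues").OutputKey())
}
```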

HasCompensation

func (n *Node[I, O]) HasCompensation() bool

Returns true if this node has a compensation handler.

Compensation

func (n *Node[I, O]) Compensation() ExecutableNode

Returns the compensation node, if any.

Input

func (n *Node[I, O]) Input() any

Returns the node’s input value (used for testing).

RateLimiterID

func (n *Node[I, O]) RateLimiterID() string

Returns the rate limiter ID for this node, or empty string if not configured.

Execute

func (n *Node[I, O]) Execute(ctx workflow.Context, state *FlowState) error

Runs the activity within a Temporal workflow context.

Execution steps:

  1. Apply rate limiting if configured
  2. Resolve magic markers in input
  3. Configure activity options (timeout, retry)
  4. Execute activity via Temporal
  5. Store result in FlowState

Parameters:

  • ctx - Temporal workflow context
  • state - Current flow state

Returns: Error if execution fails
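
The five execution steps can be traced in a simplified model that calls the activity directly instead of going through Temporal. Everything here is hypothetical: `outputRef` stands in for a magic marker, `state` for `FlowState`, and a context timeout for the full activity options.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

type state map[string]any

// outputRef is a hypothetical marker meaning "use the stored output named Key".
type outputRef struct{ Key string }

type node struct {
	name     string
	input    any
	timeout  time.Duration
	wait     func() // rate-limit hook; nil when no limiter is configured
	activity func(ctx context.Context, in any) (any, error)
}

func (n *node) execute(ctx context.Context, st state) error {
	// 1. Apply rate limiting if configured.
	if n.wait != nil {
		n.wait()
	}
	// 2. Resolve markers in the input against earlier outputs.
	in := n.input
	if ref, ok := in.(outputRef); ok {
		v, found := st[ref.Key]
		if !found {
			return fmt.Errorf("%s: no output named %q", n.name, ref.Key)
		}
		in = v
	}
	// 3. Configure options (only a timeout in this sketch).
	ctx, cancel := context.WithTimeout(ctx, n.timeout)
	defer cancel()
	// 4. Run the activity.
	out, err := n.activity(ctx, in)
	if err != nil {
		return fmt.Errorf("%s: %w", n.name, err)
	}
	// 5. Store the result for downstream nodes.
	st[n.name] = out
	return nil
}

// pipelineCount runs a fetch node, then a count node that consumes its output.
func pipelineCount() (int, error) {
	st := state{}
	fetch := &node{name: "issues", input: "project = PLATFORM", timeout: time.Second,
		activity: func(_ context.Context, _ any) (any, error) {
			return []string{"PLAT-1", "PLAT-2"}, nil
		}}
	count := &node{name: "count", input: outputRef{Key: "issues"}, timeout: time.Second,
		activity: func(_ context.Context, in any) (any, error) {
			items, ok := in.([]string)
			if !ok {
				return nil, fmt.Errorf("unexpected input %T", in)
			}
			return len(items), nil
		}}
	for _, n := range []*node{fetch, count} {
		if err := n.execute(context.Background(), st); err != nil {
			return 0, err
		}
	}
	return st["count"].(int), nil
}

func main() {
	fmt.Println(pipelineCount())
}
```

Resolving markers at execution time, rather than when the node is constructed, is what lets a node refer to outputs that do not exist yet when the flow is built.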

Compensate

func (n *Node[I, O]) Compensate(ctx workflow.Context, state *FlowState) error

Runs the compensation activity if one is configured.

Parameters:

  • ctx - Temporal workflow context
  • state - Flow state snapshot from when the node executed

Returns: Error if compensation fails

Provider Pattern

Providers typically expose factory functions that return configured nodes:

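package jira

import (
    "context"

    "github.com/resolute/resolute/core"
)

// FetchIssues is the provider function: it returns a ready-to-use node.
func FetchIssues(input FetchInput) *core.Node[FetchInput, FetchOutput] {
    return core.NewNode("jira.FetchIssues", FetchIssuesActivity, input)
}

// FetchIssuesActivity is the activity function (registered with the worker).
func FetchIssuesActivity(ctx context.Context, input FetchInput) (FetchOutput, error) {
    // Implementation goes here; the zero-value return keeps this stub compiling.
    return FetchOutput{}, nil
}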

Usage in flows:

flow := core.NewFlow("sync").
    TriggeredBy(core.Manual("api")).
    Then(jira.FetchIssues(jira.FetchInput{
        JQL: "project = PLATFORM",
    })).
    Build()

Complete Example

package main

import (
    "time"

    "github.com/resolute/resolute/core"
    // Also import the provider packages that define jira and orders;
    // their import paths depend on your project layout.
)

func main() {
    // Shared rate limiter for Jira API
    jiraLimiter := core.NewSharedRateLimiter("jira", 100, time.Minute)

    // Configure nodes with various options
    fetchNode := jira.FetchIssues(jira.FetchInput{
        JQL:    "project = PLATFORM",
        Cursor: core.CursorFor("jira"),
    }).
        As("issues").
        WithSharedRateLimit(jiraLimiter).
        WithTimeout(10 * time.Minute).
        WithRetry(core.RetryPolicy{
            MaximumAttempts: 5,
        }).
        WithWindow(core.Window{Size: 100}).
        WithCursorUpdate("jira", "LastUpdated")

    // Node with compensation
    // (orderInput and cancelInput are assumed to be defined elsewhere)
    createNode := orders.Create(orderInput).
        OnError(orders.Cancel(cancelInput))

    // Build flow with configured nodes
    flow := core.NewFlow("order-pipeline").
        TriggeredBy(core.Manual("api")).
        Then(fetchNode).
        Then(createNode).
        Build()

    // Register or run the flow here. The blank assignment only keeps
    // this snippet compiling, since Go rejects unused variables.
    _ = flow
}

See Also