v0.2.0-alpha

v0.2.0-alpha

Released: February 2026

Windowed processing, automatic scheduling, and production deployment features.

New Features

Windowed/Batched Processing

Nodes in parallel steps can now process data in batches instead of fetching everything at once. The framework loops: fetch batch, run downstream pipeline, persist cursors, repeat.

jira.FetchIssues(input).
    WithWindow(core.Window{Size: 100}).
    WithCursorUpdate("jira", "LastUpdated")

New APIs:

APIDescription
WithWindow(Window)Attach to a fetch node to enable batched processing
Window{Size: N}Configure batch size
WindowOutput()DataRef marker that resolves to the current batch’s output
WindowConfig()Returns the node’s window configuration
WindowedNode interfaceExtends ExecutableNode for windowed execution
FlowState.NewBatchState()Creates isolated state per batch, inheriting parent cursors
FlowState.SetWindowMeta() / GetWindowMeta()Ephemeral cursor and size tracking per batch

Cursor-Based Incremental Sync

Nodes can now automatically update cursors from their output after execution.

node := jira.FetchIssues(input).
    WithCursorUpdate("jira", "LastUpdated")

The named field is extracted from the activity output and persisted as the cursor position. Supports time.Time, *time.Time, and string field types.

Automatic Temporal Schedule Management

Workers with core.Schedule(...) triggered flows now automatically create or update Temporal Schedules on startup.

  • Idempotent: creates on first run, updates cron expression on subsequent runs
  • Overlap policy set to SKIP (no concurrent executions)
  • Schedule ID derived from flow name: <flow-name>-schedule
flow := core.NewFlow("data-sync").
    TriggeredBy(core.Schedule("*/30 * * * *")).
    // ...
    Build()

// Worker automatically creates/updates the "data-sync-schedule" Temporal Schedule
core.NewWorker().
    WithFlow(flow).
    Run()

Input Validation

Nodes can validate input structs using struct tags before execution.

type Input struct {
    Name string `validate:"required"`
    Age  int    `validate:"min=0,max=150"`
}

node := core.NewNode("validate", myActivity, input).WithValidation()

Error Classification

Nodes can classify errors for Temporal retry decisions. Terminal errors are marked as non-retryable.

node := jira.FetchIssues(input).WithErrorClassifier(core.HTTPErrorClassifier)

Improvements

Parallel Execution Rewrite

Replaced Temporal Selector-based parallel execution with workflow.Go + BufferedChannel pattern.

  • Proper per-node error attribution (error messages now include the failing node name and input)
  • Compensation snapshots taken before parallel execution for correct rollback state

State Management

  • Added inMemoryBackend as default when no state backend is configured
  • mergeCursors() correctly updates parent cursors from batch state, keeping the chronologically later position

Workflow Registration

Workflows are now registered with explicit names via RegisterWorkflowWithOptions, enabling schedule and trigger references by flow name.

Makefile

  • Added test-flows target for testing flow modules
  • All targets (lint, fmt, build, verify, update-deps, vet) now include ../flows/* modules

Dependencies

  • go.temporal.io/api promoted from indirect to direct dependency (required for schedule overlap policy)

Installation

go get github.com/resolute-sh/resolute@v0.2.0-alpha

Migration from v0.1.0-alpha

No breaking changes to existing APIs. All new features are additive:

  • WithWindow, WithCursorUpdate, WithValidation, WithErrorClassifier are optional node modifiers
  • Automatic schedule creation only activates when a flow uses core.Schedule(...) trigger
  • Parallel execution behavior is unchanged from the caller’s perspective (internal implementation improved)

Full Changelog

v0.1.0-alpha…v0.2.0-alpha