# Incremental Data Sync
This example demonstrates incremental data synchronization between systems using Resolute’s cursor management and pagination patterns.
The workflow syncs Jira issues to an internal data warehouse, running hourly to capture updates.

```go
package main

import (
    "context"
    "log"
    "os"

    "github.com/resolute/resolute/core"
    "github.com/resolute/resolute/providers/jira"
    "github.com/resolute/resolute/providers/transform"
)
func main() {
    // Configure providers
    jiraProvider := jira.NewProvider(jira.JiraConfig{
        BaseURL:  os.Getenv("JIRA_BASE_URL"),
        Email:    os.Getenv("JIRA_EMAIL"),
        APIToken: os.Getenv("JIRA_API_TOKEN"),
    })
    transformProvider := transform.NewProvider()

    // Build incremental sync flow
    flow := core.NewFlow("jira-warehouse-sync").
        TriggeredBy(core.Schedule("0 * * * *")). // Every hour
        // Fetch issues updated since last sync
        Then(jira.FetchIssues(jira.FetchInput{
            JQL:    "project = PLATFORM AND updated >= ${cursor}",
            Cursor: core.CursorFor("jira-sync"),
            Limit:  100,
        }).As("issues")).
        // Process if there are updates
        When(hasUpdates).
            // Transform to warehouse schema
            Then(transform.Map(transform.MapInput{
                Items:    core.Output("issues.Items"),
                Template: warehouseTemplate,
            }).As("transformed")).
            // Deduplicate by issue key
            Then(transform.Unique(transform.UniqueInput{
                Items: core.Output("transformed.Results"),
                Key:   "issue_key",
            }).As("unique")).
            // Upsert to warehouse in batches
            Then(upsertToWarehouseNode.As("upserted")).
            // Log sync stats
            Then(logSyncStatsNode).
        EndWhen().
        Build()

    // Run worker
    err := core.NewWorker().
        WithConfig(core.WorkerConfig{
            TaskQueue: "data-sync",
        }).
        WithFlow(flow).
        WithProviders(jiraProvider, transformProvider).
        Run()
    if err != nil {
        panic(err)
    }
}
func hasUpdates(s *core.FlowState) bool {
    issues := core.Get[jira.FetchOutput](s, "issues")
    return len(issues.Items) > 0
}

const warehouseTemplate = `{
    "issue_key": "{{.Key}}",
    "project": "{{.Project}}",
    "summary": "{{.Summary}}",
    "status": "{{.Status}}",
    "assignee": "{{.Assignee}}",
    "priority": "{{.Priority}}",
    "created_at": "{{.Created}}",
    "updated_at": "{{.Updated}}",
    "story_points": {{.StoryPoints}},
    "labels": {{json .Labels}},
    "sync_timestamp": "{{now}}"
}`
// Custom node for warehouse upsert
type UpsertInput struct {
    Records []map[string]any `json:"records"`
}

type UpsertOutput struct {
    Inserted int `json:"inserted"`
    Updated  int `json:"updated"`
    Failed   int `json:"failed"`
}

var upsertToWarehouseNode = core.NewNode("upsert-warehouse", upsertToWarehouse)

func upsertToWarehouse(ctx context.Context, input UpsertInput) (UpsertOutput, error) {
    // Implementation: batch upsert to your warehouse
    // (Postgres, BigQuery, Snowflake, etc.)
    return UpsertOutput{
        Inserted: len(input.Records),
    }, nil
}

var logSyncStatsNode = core.NewNode("log-stats", logSyncStats)

func logSyncStats(ctx context.Context, input UpsertOutput) (struct{}, error) {
    log.Printf("Sync complete: %d inserted, %d updated, %d failed",
        input.Inserted, input.Updated, input.Failed)
    return struct{}{}, nil
}
}
```
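
The upsert node above is deliberately a stub. One possible body, assuming a Postgres warehouse reached through a package-level `*sql.DB` (the `warehouseDB` handle, the `jira_issues` table, and its columns are all illustrative, and you would also import `database/sql` plus a driver such as `github.com/lib/pq`):

```go
// Illustrative only: row-by-row upsert into an assumed jira_issues table.
var warehouseDB *sql.DB // opened during startup (assumption)

func upsertToWarehouse(ctx context.Context, input UpsertInput) (UpsertOutput, error) {
    var out UpsertOutput
    for _, r := range input.Records {
        _, err := warehouseDB.ExecContext(ctx,
            `INSERT INTO jira_issues (issue_key, summary, status, updated_at)
             VALUES ($1, $2, $3, $4)
             ON CONFLICT (issue_key) DO UPDATE SET
               summary    = EXCLUDED.summary,
               status     = EXCLUDED.status,
               updated_at = EXCLUDED.updated_at`,
            r["issue_key"], r["summary"], r["status"], r["updated_at"])
        if err != nil {
            out.Failed++
            continue
        }
        // Postgres does not report insert-vs-update here; count as inserted.
        out.Inserted++
    }
    return out, nil
}
```

A production version would batch rows into multi-row `INSERT`s or use `COPY` to cut round trips, per the best practices below.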

## How Cursors Work

The fetch step's cursor is what makes the sync incremental:

```go
jira.FetchIssues(jira.FetchInput{
    JQL:    "project = PLATFORM AND updated >= ${cursor}",
    Cursor: core.CursorFor("jira-sync"),
})
```

The cursor value is tracked automatically and substituted for `${cursor}` in queries. Cursors are persisted to Temporal's workflow state:
```go
// First run: cursor is empty, fetches all
// Subsequent runs: cursor = last updated timestamp
```

You can also use custom cursor storage:
```go
core.CursorFor("jira-sync").
    WithStorage(redisStorage).
    WithFormat("2006-01-02T15:04:05Z")
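```

`redisStorage` is not defined in this example. A minimal sketch of what it might look like, assuming `WithStorage` accepts a value with `Load`/`Save` methods (the method names and signatures are guesses at Resolute's storage contract; check the `core` package for the real interface):

```go
import (
    "context"

    "github.com/redis/go-redis/v9"
)

// Hypothetical cursor storage backed by Redis.
type redisCursorStorage struct {
    client *redis.Client
}

func newRedisCursorStorage(addr string) *redisCursorStorage {
    return &redisCursorStorage{
        client: redis.NewClient(&redis.Options{Addr: addr}),
    }
}

// Load returns the stored cursor value, or "" before the first sync.
func (s *redisCursorStorage) Load(ctx context.Context, name string) (string, error) {
    val, err := s.client.Get(ctx, "cursor:"+name).Result()
    if err == redis.Nil {
        return "", nil
    }
    return val, err
}

// Save persists the cursor value after a successful run.
func (s *redisCursorStorage) Save(ctx context.Context, name, value string) error {
    return s.client.Set(ctx, "cursor:"+name, value, 0).Err()
}
```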
flow := core.NewFlow("paginated-sync").
TriggeredBy(core.Schedule("0 * * * *")).
Then(fetchFirstPageNode.As("page")).
While(hasMorePages).
Then(processPageNode).
Then(fetchNextPageNode.As("page")).
EndWhile().
Build()
func hasMorePages(s *core.FlowState) bool {
page := core.Get[PageOutput](s, "page")
return page.HasMore
}Sync from multiple sources in parallel:
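
`PageOutput` and the page-fetching nodes are left abstract above. A sketch for an offset-paginated API (the `listClient` interface and `apiClient` variable are assumptions standing in for your real API client):

```go
// PageOutput carries one page of results plus the position of the next one.
type PageOutput struct {
    Items   []map[string]any `json:"items"`
    Offset  int              `json:"offset"`
    HasMore bool             `json:"has_more"`
}

// listClient is a stand-in for your real API client.
type listClient interface {
    List(ctx context.Context, offset, limit int) (items []map[string]any, hasMore bool, err error)
}

var apiClient listClient // assumed to be initialized at startup

var fetchNextPageNode = core.NewNode("fetch-next-page", fetchNextPage)

func fetchNextPage(ctx context.Context, prev PageOutput) (PageOutput, error) {
    // Advance the window by the size of the page just processed.
    offset := prev.Offset + len(prev.Items)
    items, hasMore, err := apiClient.List(ctx, offset, 100)
    if err != nil {
        return PageOutput{}, err
    }
    return PageOutput{Items: items, Offset: offset, HasMore: hasMore}, nil
}
```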
flow := core.NewFlow("multi-source-sync").
TriggeredBy(core.Schedule("0 * * * *")).
// Parallel fetch from multiple sources
Parallel().
Then(fetchJiraNode.As("jira")).
Then(fetchConfluenceNode.As("confluence")).
Then(fetchGitHubNode.As("github")).
EndParallel().
// Merge all sources
Then(transform.Merge(transform.MergeInput{
Collections: [][]any{
core.Output("jira.Items"),
core.Output("confluence.Items"),
core.Output("github.Items"),
},
}).As("merged")).
// Deduplicate across sources
Then(transform.Unique(transform.UniqueInput{
Items: core.Output("merged.Results"),
Key: "canonical_id",
}).As("unique")).
Then(upsertToWarehouseNode).
Build()Then(upsertToWarehouseNode.

## Error Handling

Attach retries and an error handler to the upsert step:

```go
Then(upsertToWarehouseNode.
    OnError(func(err error, state *core.FlowState) *core.Node {
        // Log failed records for retry
        return logFailedRecordsNode
    }).
    WithRetry(3, time.Minute))
```
Or route failures to a dead-letter queue:

```go
flow := core.NewFlow("sync-with-dlq").
    TriggeredBy(core.Schedule("0 * * * *")).
    Then(fetchDataNode.As("data")).
    Then(processDataNode.
        OnError(sendToDLQNode)).
    Build()
```
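
`sendToDLQNode` is not defined above. One way it might look, writing failed payloads to a table for later inspection and replay (the `ProcessInput` type, `dlqDB` handle, and `sync_dlq` table are all illustrative, with `encoding/json` and `database/sql` imported):

```go
// ProcessInput stands in for whatever payload processDataNode receives.
type ProcessInput struct {
    Records []map[string]any `json:"records"`
}

var dlqDB *sql.DB // assumed to be opened during startup

var sendToDLQNode = core.NewNode("send-to-dlq", sendToDLQ)

func sendToDLQ(ctx context.Context, input ProcessInput) (struct{}, error) {
    // Persist the failing payload so it can be inspected and replayed.
    payload, err := json.Marshal(input)
    if err != nil {
        return struct{}{}, err
    }
    _, err = dlqDB.ExecContext(ctx,
        `INSERT INTO sync_dlq (payload, failed_at) VALUES ($1, now())`,
        payload)
    return struct{}{}, err
}
```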
If processing fails, roll back the cursor so the next run re-fetches the same window instead of skipping it:

```go
flow := core.NewFlow("safe-sync").
    TriggeredBy(core.Schedule("0 * * * *")).
    Then(fetchWithCursor.As("data")).
    Then(processData.
        OnError(core.RollbackCursor("jira-sync"))).
    Build()
```

## Monitoring

Track per-run sync metrics:

```go
type SyncMetrics struct {
    SourceRecords  int     `json:"source_records"`
    Processed      int     `json:"processed"`
    Inserted       int     `json:"inserted"`
    Updated        int     `json:"updated"`
    Failed         int     `json:"failed"`
    Duration       float64 `json:"duration_seconds"`
    CursorPosition string  `json:"cursor_position"`
}

var emitMetricsNode = core.NewNode("emit-metrics", emitMetrics)

func emitMetrics(ctx context.Context, input SyncMetrics) (struct{}, error) {
    // Push to Prometheus, Datadog, etc.
    return struct{}{}, nil
}
```
Wire the metrics node into the flow:

```go
flow := core.NewFlow("monitored-sync").
    TriggeredBy(core.Schedule("0 * * * *")).
    Then(recordStartTimeNode.As("start")).
    Then(fetchDataNode.As("data")).
    Then(processDataNode.As("processed")).
    Then(upsertDataNode.As("upserted")).
    Then(emitMetricsNode).
    Build()
```
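
`emitMetrics` is stubbed above. A sketch that pushes two of the counters to a Prometheus Pushgateway using `github.com/prometheus/client_golang` (the gateway address and job name are assumptions):

```go
import (
    "context"

    "github.com/prometheus/client_golang/prometheus"
    "github.com/prometheus/client_golang/prometheus/push"
)

func emitMetrics(ctx context.Context, input SyncMetrics) (struct{}, error) {
    inserted := prometheus.NewGauge(prometheus.GaugeOpts{
        Name: "sync_inserted",
        Help: "Records inserted by the last sync run.",
    })
    inserted.Set(float64(input.Inserted))

    failed := prometheus.NewGauge(prometheus.GaugeOpts{
        Name: "sync_failed",
        Help: "Records that failed in the last sync run.",
    })
    failed.Set(float64(input.Failed))

    // Push both collectors under one job; errors surface to the flow.
    err := push.New("http://localhost:9091", "jira_warehouse_sync").
        Collector(inserted).
        Collector(failed).
        Push()
    return struct{}{}, err
}
```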

## Full Reindex

For an occasional full reindex (e.g., after schema changes):

```go
reindexFlow := core.NewFlow("full-reindex").
    TriggeredBy(core.Manual("reindex")).
    // Reset cursor to beginning
    Then(core.ResetCursor("jira-sync")).
    // Clear destination table
    Then(truncateWarehouseTableNode).
    // Fetch all with pagination
    Then(fetchAllIssuesNode.As("page")).
    While(hasMorePages).
        Then(transformPageNode).
        Then(upsertPageNode).
        Then(fetchNextPageNode.As("page")).
    EndWhile().
    // Rebuild indexes
    Then(rebuildIndexesNode).
    Build()
```

## Environment Variables

```bash
# Jira
export JIRA_BASE_URL="https://your-org.atlassian.net"
export JIRA_EMAIL="your-email@company.com"
export JIRA_API_TOKEN="your-api-token"

# Warehouse (example: Postgres)
export WAREHOUSE_HOST="localhost"
export WAREHOUSE_PORT="5432"
export WAREHOUSE_DB="analytics"
export WAREHOUSE_USER="sync_user"
export WAREHOUSE_PASSWORD="..."
```

## Best Practices

| Practice | Rationale |
|---|---|
| Use cursors, not timestamps in code | Cursors survive restarts and handle edge cases |
| Batch upserts | Reduce database round trips |
| Deduplicate before insert | APIs may return duplicates across pages |
| Log sync stats | Enable debugging and monitoring |
| Test with small batches | Verify transforms before full sync |
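
Following the last practice above, one way to verify transforms before a full sync is to run the fetch step with a small limit and a separate cursor, so the production cursor is never advanced (the `jira-sync-test` cursor name is an assumption):

```go
// Smoke-test variant of the fetch step: small batch, separate cursor.
Then(jira.FetchIssues(jira.FetchInput{
    JQL:    "project = PLATFORM AND updated >= ${cursor}",
    Cursor: core.CursorFor("jira-sync-test"), // keeps the real cursor untouched
    Limit:  10,
}).As("issues")).
```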