Data References
Keep large datasets out of Temporal event history by storing them in external storage and passing lightweight references between workflow nodes.
Data References implement the “claim check” pattern for handling large datasets that shouldn’t pass through Temporal’s event history. Instead of storing large payloads in workflow state, store a reference to external storage.
Temporal has payload size limits: individual payloads are capped at roughly 2 MB, and a workflow's total event history is limited to about 50 MB and 50K events.
For large datasets (thousands of items, large documents), passing the data itself between activities bloats the history, slows replay, and eventually hits those hard limits.
The solution: store data externally, pass references through the workflow.
Without Data References          With Data References
────────────────────────         ─────────────────────────────
┌──────────┐                     ┌──────────┐
│  Fetch   │                     │  Fetch   │
│ 10K items│                     │ 10K items│
└────┬─────┘                     └────┬─────┘
     │                                │
     │ 10K items                      │ Store to S3
     │ in history                     │ Return ref only
     ▼                                ▼
┌──────────┐                     ┌──────────┐
│ Process  │                     │ Process  │
│ 10K items│                     │  (ref)   │
└────┬─────┘                     └────┬─────┘
     │                                │
     │ 10K results                    │ Load from S3
     │ in history                     │ Store results to S3
     ▼                                ▼
┌──────────┐                     ┌──────────┐
│  Store   │                     │  Store   │
└──────────┘                     └──────────┘
History: ~60MB                   History: ~1KB

The reference itself is a small value that records where the data lives:

type DataRef struct {
StorageKey string // Key in external storage (e.g., S3 path)
Backend string // Storage backend identifier
}

type FetchOutput struct {
Ref core.DataRef // Reference to stored data
Count int // Metadata (small, ok in history)
}
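// The snippets below assume a package-level storage helper along these lines
// (a sketch, not a fixed interface; the S3-backed implementation shown later
// provides exactly these methods):
//
//	var storage interface {
//		Store(ctx context.Context, prefix string, data interface{}) (core.DataRef, error)
//		Load(ctx context.Context, ref core.DataRef, dest interface{}) error
//		Delete(ctx context.Context, ref core.DataRef) error
//	}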
func fetchIssues(ctx context.Context, input FetchInput) (FetchOutput, error) {
// Fetch large dataset
issues, err := jiraClient.FetchAll(ctx, input.JQL)
if err != nil {
return FetchOutput{}, err
}
// Store to S3 instead of returning directly
ref, err := storage.Store(ctx, "issues", issues)
if err != nil {
return FetchOutput{}, fmt.Errorf("store issues: %w", err)
}
return FetchOutput{
Ref: ref,
Count: len(issues),
}, nil
}

The next activity takes the reference as input and loads the data only when it needs it:

type ProcessInput struct {
IssuesRef core.DataRef
BatchSize int
}
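// ProcessOutput is not defined elsewhere in this snippet; from its use below
// it carries the result reference plus a count:
type ProcessOutput struct {
	Ref       core.DataRef // Reference to stored results
	Processed int          // Number of items processed
}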
func processIssues(ctx context.Context, input ProcessInput) (ProcessOutput, error) {
// Load data from reference
var issues []Issue
if err := storage.Load(ctx, input.IssuesRef, &issues); err != nil {
return ProcessOutput{}, fmt.Errorf("load issues: %w", err)
}
// Process the data
results := make([]ProcessedIssue, 0, len(issues))
for _, issue := range issues {
result := process(issue)
results = append(results, result)
}
// Store results and return reference
ref, err := storage.Store(ctx, "processed", results)
if err != nil {
return ProcessOutput{}, err
}
return ProcessOutput{
Ref: ref,
Processed: len(results),
}, nil
}

When wiring nodes together, pass the reference from one node's output into the next node's input:

processNode := core.NewNode("process", processIssues, ProcessInput{}).
WithInputFunc(func(s *core.FlowState) ProcessInput {
fetchResult := core.Get[FetchOutput](s, "fetch")
return ProcessInput{
IssuesRef: fetchResult.Ref, // Pass reference, not data
BatchSize: 100,
}
})

A storage backend needs to implement Store, Load, and Delete. An S3-backed implementation:

package storage
import (
"bytes"
"context"
"encoding/json"
"fmt"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/google/uuid"
"github.com/resolute/resolute/core"
)
type S3Storage struct {
client *s3.Client
bucket string
}
func NewS3Storage(client *s3.Client, bucket string) *S3Storage {
return &S3Storage{
client: client,
bucket: bucket,
}
}
func (s *S3Storage) Store(ctx context.Context, prefix string, data interface{}) (core.DataRef, error) {
// Generate unique key
key := fmt.Sprintf("%s/%s.json", prefix, uuid.New().String())
// Serialize data
body, err := json.Marshal(data)
if err != nil {
return core.DataRef{}, fmt.Errorf("marshal data: %w", err)
}
// Upload to S3
_, err = s.client.PutObject(ctx, &s3.PutObjectInput{
Bucket: aws.String(s.bucket),
Key: aws.String(key),
Body: bytes.NewReader(body),
})
if err != nil {
return core.DataRef{}, fmt.Errorf("upload to s3: %w", err)
}
return core.DataRef{
StorageKey: key,
Backend: "s3",
}, nil
}
func (s *S3Storage) Load(ctx context.Context, ref core.DataRef, dest interface{}) error {
// Download from S3
result, err := s.client.GetObject(ctx, &s3.GetObjectInput{
Bucket: aws.String(s.bucket),
Key: aws.String(ref.StorageKey),
})
if err != nil {
return fmt.Errorf("download from s3: %w", err)
}
defer result.Body.Close()
// Deserialize
if err := json.NewDecoder(result.Body).Decode(dest); err != nil {
return fmt.Errorf("decode data: %w", err)
}
return nil
}
func (s *S3Storage) Delete(ctx context.Context, ref core.DataRef) error {
_, err := s.client.DeleteObject(ctx, &s3.DeleteObjectInput{
Bucket: aws.String(s.bucket),
Key: aws.String(ref.StorageKey),
})
return err
}

Data enrichment pipeline with external storage:
package main
import (
"context"
"fmt"
"time"
"github.com/resolute/resolute/core"
)
var storage *S3Storage
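// In a real worker, storage would be initialized before the flow runs, e.g.
// with the AWS SDK v2 (sketch; the bucket name is illustrative):
//
//	cfg, _ := config.LoadDefaultConfig(context.Background())
//	storage = NewS3Storage(s3.NewFromConfig(cfg), "workflow-data")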
type Issue struct {
ID string
Key string
Summary string
Description string
}
type EnrichedIssue struct {
Issue
Embedding []float32
Tags []string
}
type FetchOutput struct {
Ref core.DataRef
Count int
}
type EnrichOutput struct {
Ref core.DataRef
Enriched int
Skipped int
}
type StoreOutput struct {
Stored int
}
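// Input types for the activities below; the fields mirror how the nodes in
// main construct them (FetchInput.JQL, EnrichInput.IssuesRef, StoreInput.EnrichedRef).
type FetchInput struct {
	JQL string
}
type EnrichInput struct {
	IssuesRef core.DataRef
}
type StoreInput struct {
	EnrichedRef core.DataRef
}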
func fetchIssues(ctx context.Context, input FetchInput) (FetchOutput, error) {
issues, err := jiraClient.FetchAll(ctx, input.JQL)
if err != nil {
return FetchOutput{}, err
}
ref, err := storage.Store(ctx, "fetch/issues", issues)
if err != nil {
return FetchOutput{}, err
}
return FetchOutput{
Ref: ref,
Count: len(issues),
}, nil
}
func enrichIssues(ctx context.Context, input EnrichInput) (EnrichOutput, error) {
// Load issues from reference
var issues []Issue
if err := storage.Load(ctx, input.IssuesRef, &issues); err != nil {
return EnrichOutput{}, err
}
// Enrich each issue
enriched := make([]EnrichedIssue, 0, len(issues))
var skipped int
for _, issue := range issues {
embedding, err := ollama.Embed(ctx, issue.Description)
if err != nil {
skipped++
continue
}
enriched = append(enriched, EnrichedIssue{
Issue: issue,
Embedding: embedding,
Tags: extractTags(issue),
})
}
// Store enriched data
ref, err := storage.Store(ctx, "enrich/issues", enriched)
if err != nil {
return EnrichOutput{}, err
}
return EnrichOutput{
Ref: ref,
Enriched: len(enriched),
Skipped: skipped,
}, nil
}
func storeToVectorDB(ctx context.Context, input StoreInput) (StoreOutput, error) {
// Load enriched issues
var issues []EnrichedIssue
if err := storage.Load(ctx, input.EnrichedRef, &issues); err != nil {
return StoreOutput{}, err
}
// Upsert to vector database
for _, issue := range issues {
if err := qdrant.Upsert(ctx, issue.ID, issue.Embedding, issue); err != nil {
return StoreOutput{}, fmt.Errorf("upsert %s: %w", issue.ID, err)
}
}
return StoreOutput{Stored: len(issues)}, nil
}
func main() {
fetchNode := core.NewNode("fetch", fetchIssues, FetchInput{
JQL: "project = PLATFORM",
}).WithTimeout(30 * time.Minute)
enrichNode := core.NewNode("enrich", enrichIssues, EnrichInput{}).
WithInputFunc(func(s *core.FlowState) EnrichInput {
result := core.Get[FetchOutput](s, "fetch")
return EnrichInput{IssuesRef: result.Ref}
}).
WithTimeout(1 * time.Hour)
storeNode := core.NewNode("store", storeToVectorDB, StoreInput{}).
WithInputFunc(func(s *core.FlowState) StoreInput {
result := core.Get[EnrichOutput](s, "enrich")
return StoreInput{EnrichedRef: result.Ref}
}).
WithTimeout(30 * time.Minute)
flow := core.NewFlow("issue-enrichment").
TriggeredBy(core.Schedule("0 2 * * *")). // Daily at 2 AM
Then(fetchNode).
Then(enrichNode).
Then(storeNode).
Build()
core.NewWorker().
WithConfig(core.WorkerConfig{TaskQueue: "enrichment"}).
WithFlow(flow).
Run()
}

For simpler cases, use OutputRef to automatically resolve references:
// Fetch stores data and returns output with Ref field
type FetchOutput struct {
Ref core.DataRef
Count int
}
// Enrich uses OutputRef marker
enrichNode := core.NewNode("enrich", enrichIssues, EnrichInput{
IssuesRef: core.OutputRef("fetch"), // Auto-resolves to FetchOutput.Ref
})

The framework extracts the Ref field from the referenced node's output.
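How that resolution could work is easy to sketch with reflection (assuming the standard reflect package). The helper below is illustrative only; resolveRef is a hypothetical name, not part of the framework API:

func resolveRef(output interface{}) (core.DataRef, bool) {
	v := reflect.ValueOf(output)
	if v.Kind() == reflect.Ptr {
		v = v.Elem()
	}
	if v.Kind() != reflect.Struct {
		return core.DataRef{}, false
	}
	// Look up the conventional Ref field and assert its type
	f := v.FieldByName("Ref")
	if !f.IsValid() {
		return core.DataRef{}, false
	}
	ref, ok := f.Interface().(core.DataRef)
	return ref, ok
}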
Delete temporary data after workflow completes:
func cleanupRefs(ctx context.Context, input CleanupInput) (CleanupOutput, error) {
for _, ref := range input.Refs {
if err := storage.Delete(ctx, ref); err != nil {
log.Printf("Failed to delete %s: %v", ref.StorageKey, err)
}
}
return CleanupOutput{Deleted: len(input.Refs)}, nil
}
cleanupNode := core.NewNode("cleanup", cleanupRefs, CleanupInput{}).
WithInputFunc(func(s *core.FlowState) CleanupInput {
return CleanupInput{
Refs: []core.DataRef{
core.Get[FetchOutput](s, "fetch").Ref,
core.Get[EnrichOutput](s, "enrich").Ref,
},
}
})
flow := core.NewFlow("pipeline").
Then(fetchNode).
Then(enrichNode).
Then(storeNode).
Then(cleanupNode). // Clean up temporary data
Build()

Include small metadata (counts, sizes) alongside the reference; it costs almost nothing in history and is useful for logging and monitoring:

type FetchOutput struct {
Ref core.DataRef // Reference for large data
Count int // Item count (useful for logging/monitoring)
Size int64 // Data size in bytes
}

Give temporary objects a TTL so abandoned data does not accumulate:

func (s *S3Storage) StoreWithTTL(ctx context.Context, prefix string, data interface{}, ttl time.Duration) (core.DataRef, error) {
// key and body are generated and serialized as in Store above.
// Note: PutObjectInput.Expires only sets the HTTP expiration header;
// actual deletion should be enforced by a bucket lifecycle rule on this prefix.
expires := time.Now().Add(ttl)
_, err := s.client.PutObject(ctx, &s3.PutObjectInput{
Bucket: aws.String(s.bucket),
Key: aws.String(key),
Body: bytes.NewReader(body),
Expires: aws.Time(expires),
})
// ...
}

Handle references that may have expired or been deleted:

func loadWithFallback(ctx context.Context, ref core.DataRef, dest interface{}) error {
err := storage.Load(ctx, ref, dest)
if err != nil {
if isNotFoundError(err) {
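// isNotFoundError is not defined in this snippet; with the AWS SDK v2 it can
// be a small helper that checks for S3's NoSuchKey error:
//
//	var nsk *types.NoSuchKey
//	return errors.As(err, &nsk)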
// Handle gracefully - data may have expired
log.Printf("Reference %s not found, using empty data", ref.StorageKey)
return nil
}
return err
}
return nil
}

Compress large payloads before uploading to reduce storage and transfer costs:

func (s *S3Storage) StoreCompressed(ctx context.Context, prefix string, data interface{}) (core.DataRef, error) {
body, err := json.Marshal(data)
if err != nil {
return core.DataRef{}, fmt.Errorf("marshal data: %w", err)
}
var compressed bytes.Buffer
gz := gzip.NewWriter(&compressed)
if _, err := gz.Write(body); err != nil {
return core.DataRef{}, fmt.Errorf("compress data: %w", err)
}
if err := gz.Close(); err != nil {
return core.DataRef{}, fmt.Errorf("close gzip writer: %w", err)
}
// Upload compressed data
// ...
}
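The matching load path has to decompress before decoding. A minimal counterpart sketch (LoadCompressed is not part of the storage package above; it assumes the same S3Storage type and the standard compress/gzip package):

func (s *S3Storage) LoadCompressed(ctx context.Context, ref core.DataRef, dest interface{}) error {
	result, err := s.client.GetObject(ctx, &s3.GetObjectInput{
		Bucket: aws.String(s.bucket),
		Key:    aws.String(ref.StorageKey),
	})
	if err != nil {
		return fmt.Errorf("download from s3: %w", err)
	}
	defer result.Body.Close()
	// Decompress the gzip stream, then decode the JSON inside it
	gz, err := gzip.NewReader(result.Body)
	if err != nil {
		return fmt.Errorf("open gzip reader: %w", err)
	}
	defer gz.Close()
	if err := json.NewDecoder(gz).Decode(dest); err != nil {
		return fmt.Errorf("decode data: %w", err)
	}
	return nil
}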