Pagination
Many APIs return large result sets across multiple pages. Resolute provides built-in support for fetching every page and accumulating the results into a single output.
Why Pagination Support?
Many APIs limit response size and require pagination:
- Jira returns max 100 issues per request
- GitHub limits to 100 items per page
- Most REST APIs paginate large collections
Without pagination support, you'd need to write the fetch loop and manage cursors by hand, as in the sketch below. Resolute's Paginate function handles this automatically.
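For contrast, a hand-rolled version of that loop might look like the following. This is a plain-Go sketch with no Resolute involved; JiraClient, Issue, and the Search signature are borrowed from the complete example later on this page.

// Hand-rolled offset pagination: track startAt yourself and loop until
// every page has been fetched.
func fetchAllIssuesManually(ctx context.Context, client *JiraClient, jql string) ([]Issue, error) {
    var all []Issue
    startAt := 0
    for {
        result, err := client.Search(ctx, jql, startAt, 100)
        if err != nil {
            return nil, err
        }
        all = append(all, result.Issues...)
        startAt += len(result.Issues)
        // Stop when we've seen everything, or when the API returns an
        // empty page (defensive: avoids an infinite loop).
        if startAt >= result.Total || len(result.Issues) == 0 {
            return all, nil
        }
    }
}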
Basic Pattern
Use core.Paginate to create a node that fetches all pages:
// Define how to fetch a single page
fetcher := func(ctx context.Context, cursor string) (core.PageResult[Issue], error) {
    startAt := 0
    if cursor != "" {
        startAt, _ = strconv.Atoi(cursor)
    }
    result, err := jiraClient.Search(ctx, jql, startAt, 100)
    if err != nil {
        return core.PageResult[Issue]{}, err
    }
    nextCursor := ""
    hasMore := startAt+len(result.Issues) < result.Total
    if hasMore {
        nextCursor = strconv.Itoa(startAt + len(result.Issues))
    }
    return core.PageResult[Issue]{
        Items:      result.Issues,
        NextCursor: nextCursor,
        HasMore:    hasMore,
    }, nil
}

// Create paginated node
fetchAllIssues := core.Paginate("fetch-all-issues", fetcher)

PageResult Structure
The PageFetcher function returns a PageResult:
type PageResult[T any] struct {
    Items      []T    // Results from this page
    NextCursor string // Cursor for next page (empty = no more pages)
    HasMore    bool   // Whether more pages exist
}

Pagination stops when:
- HasMore is false, OR
- NextCursor is empty, OR
- the MaxPages limit is reached
Configuration Options
Limit Pages
// Fetch at most 10 pages
fetchNode := core.Paginate("fetch-issues", fetcher,
    core.WithMaxPages(10),
)

Set Page Size Hint
// Hint for fetcher (not enforced by Resolute)
fetchNode := core.Paginate("fetch-issues", fetcher,
    core.WithPageSize(100),
)

Cursor Persistence
Persist the final cursor so the next run can resume where this one stopped (see Incremental Pagination below):
fetchNode := core.Paginate("fetch-issues", fetcher,
    core.WithCursorSource("jira-issues"), // Saves cursor to FlowState
)

Complete Example: Jira Issue Sync
package main

import (
    "context"
    "os"
    "strconv"
    "time"

    "github.com/resolute/resolute/core"
)
type Issue struct {
    ID          string
    Key         string
    Summary     string
    Description string
    UpdatedAt   time.Time
}

type JiraSearchResult struct {
    Issues     []Issue
    Total      int
    StartAt    int
    MaxResults int
}

func fetchIssuePage(client *JiraClient, jql string) core.PageFetcher[Issue] {
    return func(ctx context.Context, cursor string) (core.PageResult[Issue], error) {
        startAt := 0
        if cursor != "" {
            startAt, _ = strconv.Atoi(cursor)
        }
        result, err := client.Search(ctx, jql, startAt, 100)
        if err != nil {
            return core.PageResult[Issue]{}, err
        }
        nextCursor := ""
        hasMore := startAt+len(result.Issues) < result.Total
        if hasMore {
            nextCursor = strconv.Itoa(startAt + len(result.Issues))
        }
        return core.PageResult[Issue]{
            Items:      result.Issues,
            NextCursor: nextCursor,
            HasMore:    hasMore,
        }, nil
    }
}
func main() {
    jiraClient := NewJiraClient(os.Getenv("JIRA_URL"), os.Getenv("JIRA_TOKEN"))

    // Paginated fetch node
    fetchIssues := core.Paginate(
        "fetch-jira-issues",
        fetchIssuePage(jiraClient, "project = PLATFORM ORDER BY updated DESC"),
        core.WithMaxPages(50), // Safety limit
    ).WithTimeout(30 * time.Minute)

    // Process all fetched issues
    processIssues := core.NewNode("process-issues", processIssuesFn, ProcessInput{}).
        WithInputFunc(func(s *core.FlowState) ProcessInput {
            result := core.Get[core.PaginateOutput[Issue]](s, "fetch-jira-issues")
            return ProcessInput{
                Issues:     result.Items,
                TotalCount: result.TotalItems,
                PageCount:  result.PageCount,
            }
        })

    flow := core.NewFlow("jira-sync").
        TriggeredBy(core.Schedule("0 */2 * * *")). // Every 2 hours
        Then(fetchIssues).
        Then(processIssues).
        Build()

    core.NewWorker().
        WithConfig(core.WorkerConfig{TaskQueue: "jira-sync"}).
        WithFlow(flow).
        Run()
}

PaginateOutput
The paginated node produces a PaginateOutput:
type PaginateOutput[T any] struct {
    Items       []T    // All collected items across all pages
    FinalCursor string // Cursor after last page
    PageCount   int    // Number of pages fetched
    TotalItems  int    // Total count of items
}

Access in subsequent nodes:
processNode := core.NewNode("process", processFn, ProcessInput{}).
    WithInputFunc(func(s *core.FlowState) ProcessInput {
        result := core.Get[core.PaginateOutput[Issue]](s, "fetch-issues")
        return ProcessInput{
            Items: result.Items,      // All items
            Count: result.TotalItems, // Total count
            Pages: result.PageCount,  // Pages fetched
        }
    })

Pagination with Configuration
When the fetcher needs configuration (API credentials, filters):
type FetchConfig struct {
    APIURL    string
    APIToken  string
    ProjectID string
    JQL       string
}

func fetchConfiguredPage(ctx context.Context, cfg FetchConfig, cursor string) (core.PageResult[Issue], error) {
    client := NewClient(cfg.APIURL, cfg.APIToken)
    startAt := 0
    if cursor != "" {
        startAt, _ = strconv.Atoi(cursor)
    }
    result, err := client.Search(ctx, cfg.JQL, startAt, 100)
    if err != nil {
        return core.PageResult[Issue]{}, err
    }
    // Build the PageResult exactly as in the basic pattern above
    nextCursor := ""
    hasMore := startAt+len(result.Issues) < result.Total
    if hasMore {
        nextCursor = strconv.Itoa(startAt + len(result.Issues))
    }
    return core.PageResult[Issue]{
        Items:      result.Issues,
        NextCursor: nextCursor,
        HasMore:    hasMore,
    }, nil
}

// Create node with configuration
fetchNode := core.PaginateWithConfig[Issue, FetchConfig](
    "fetch-issues",
    fetchConfiguredPage,
    core.WithMaxPages(100),
)
// Use in flow
flow := core.NewFlow("sync").
    TriggeredBy(core.Manual("api")).
    Then(fetchNode.WithInputFunc(func(s *core.FlowState) core.PaginateWithInputParams[FetchConfig] {
        return core.PaginateWithInputParams[FetchConfig]{
            Config: FetchConfig{
                APIURL:    os.Getenv("JIRA_URL"),
                APIToken:  os.Getenv("JIRA_TOKEN"),
                ProjectID: "PLATFORM",
                JQL:       "project = PLATFORM",
            },
            StartCursor: "", // Start from beginning
        }
    })).
    Then(processNode).
    Build()

Incremental Pagination
Resume from where the last run stopped using cursors:
fetchNode := core.Paginate("fetch-issues", fetcher).
    WithInputFunc(func(s *core.FlowState) core.PaginateInput {
        // Get cursor from previous run
        cursor := s.GetCursor("jira-issues")
        return core.PaginateInput{
            StartCursor: cursor.Position,
        }
    })

// After pagination, update cursor
updateCursor := core.NewNode("update-cursor", updateCursorFn, UpdateCursorInput{}).
    WithInputFunc(func(s *core.FlowState) UpdateCursorInput {
        result := core.Get[core.PaginateOutput[Issue]](s, "fetch-issues")
        return UpdateCursorInput{
            Source:   "jira-issues",
            Position: result.FinalCursor,
        }
    })
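The updateCursorFn activity itself is application code (Resolute's built-in alternative is the WithCursorSource option shown under Cursor Persistence). A minimal sketch, assuming a hypothetical saveCursor persistence helper that writes to whatever store your WithInputFunc reads from:

type UpdateCursorInput struct {
    Source   string // Cursor source name, e.g. "jira-issues"
    Position string // FinalCursor from the paginated node
}

type UpdateCursorOutput struct{}

// Persist the final cursor so the next run can resume from it. saveCursor
// is a hypothetical persistence helper (database row, S3 object, ...),
// not a Resolute API.
func updateCursorFn(ctx context.Context, in UpdateCursorInput) (UpdateCursorOutput, error) {
    if err := saveCursor(ctx, in.Source, in.Position); err != nil {
        return UpdateCursorOutput{}, err
    }
    return UpdateCursorOutput{}, nil
}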
flow := core.NewFlow("incremental-sync").
    Then(fetchNode).
    Then(processNode).
    Then(updateCursor). // Save cursor for next run
    Build()

Cursor Types
Different APIs use different cursor types:
Offset-Based (Jira, many REST APIs)
fetcher := func(ctx context.Context, cursor string) (core.PageResult[Item], error) {
    offset := 0
    if cursor != "" {
        offset, _ = strconv.Atoi(cursor)
    }
    result, _ := api.List(ctx, offset, 100)
    nextCursor := ""
    if offset+len(result.Items) < result.Total {
        nextCursor = strconv.Itoa(offset + len(result.Items))
    }
    return core.PageResult[Item]{
        Items:      result.Items,
        NextCursor: nextCursor,
        HasMore:    nextCursor != "",
    }, nil
}

Token-Based (GitHub, many modern APIs)
fetcher := func(ctx context.Context, cursor string) (core.PageResult[Item], error) {
    result, _ := api.List(ctx, cursor, 100)
    return core.PageResult[Item]{
        Items:      result.Items,
        NextCursor: result.NextPageToken, // Opaque token from API
        HasMore:    result.HasNextPage,
    }, nil
}

Timestamp-Based
fetcher := func(ctx context.Context, cursor string) (core.PageResult[Item], error) {
    since := time.Time{}
    if cursor != "" {
        since, _ = time.Parse(time.RFC3339, cursor)
    }
    result, _ := api.ListSince(ctx, since, 100)
    nextCursor := ""
    if len(result.Items) > 0 {
        // Use last item's timestamp as cursor
        lastItem := result.Items[len(result.Items)-1]
        nextCursor = lastItem.UpdatedAt.Format(time.RFC3339)
    }
    return core.PageResult[Item]{
        Items:      result.Items,
        NextCursor: nextCursor,
        HasMore:    len(result.Items) == 100, // Full page = probably more
    }, nil
}

Note that a timestamp cursor can refetch items sharing the boundary timestamp; deduplicate downstream if that matters.

Error Handling
When the fetcher returns an error, pagination stops and the node's retry policy takes over:
fetcher := func(ctx context.Context, cursor string) (core.PageResult[Item], error) {
    result, err := api.List(ctx, cursor)
    if err != nil {
        // Error stops pagination and triggers retry policy
        return core.PageResult[Item]{}, fmt.Errorf("fetch page (cursor=%s): %w", cursor, err)
    }
    return result, nil
}

// Configure retries for the paginated node
fetchNode := core.Paginate("fetch", fetcher).
    WithRetry(core.RetryPolicy{
        InitialInterval:    time.Second,
        BackoffCoefficient: 2.0,
        MaximumAttempts:    5,
    })

If a page fetch fails:
- The fetch is retried according to the retry policy
- If all retries fail, the node fails (partial results are lost)
- On the next flow run, pagination restarts from the beginning (use cursor persistence to resume; see the sketch below)
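A sketch combining the two mechanisms, using only the options introduced on this page, so that a failing run retries each page and the next run resumes from the last persisted cursor:

// Retries absorb transient page failures; the persisted cursor means a
// rerun does not have to start from page one. Whether these options
// compose exactly like this depends on your Resolute version.
fetchNode := core.Paginate("fetch-issues", fetcher,
    core.WithMaxPages(100),
    core.WithCursorSource("issues"), // persist FinalCursor for the next run
).WithRetry(core.RetryPolicy{
    InitialInterval:    time.Second,
    BackoffCoefficient: 2.0,
    MaximumAttempts:    5,
})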
Rate Limiting with Pagination
Combine pagination with rate limiting:
fetchNode := core.Paginate("fetch-issues", fetcher).
    WithRateLimit(50, time.Minute) // Max 50 pages per minute

For API rate limits (vs page rate limits):
// Rate limit in the fetcher
limiter := rate.NewLimiter(rate.Every(time.Minute/100), 10) // 100/min with burst of 10

fetcher := func(ctx context.Context, cursor string) (core.PageResult[Item], error) {
    if err := limiter.Wait(ctx); err != nil {
        return core.PageResult[Item]{}, err
    }
    return api.List(ctx, cursor)
}

Memory Considerations
All items are collected in memory. For very large datasets:
Option 1: Limit Pages
fetchNode := core.Paginate("fetch", fetcher,
    core.WithMaxPages(100), // Cap at 100 pages
)

Option 2: Process in Batches
Instead of fetching everything, process page by page:
// Custom activity that processes each page
func processAllPages(ctx context.Context, input ProcessInput) (ProcessOutput, error) {
    cursor := input.StartCursor
    var processedCount int
    for {
        page, err := api.List(ctx, cursor, 100)
        if err != nil {
            return ProcessOutput{}, err
        }
        // Process this page immediately (don't accumulate)
        for _, item := range page.Items {
            if err := processItem(ctx, item); err != nil {
                return ProcessOutput{}, err
            }
            processedCount++
        }
        if !page.HasMore {
            break
        }
        cursor = page.NextCursor
    }
    return ProcessOutput{Processed: processedCount}, nil
}

Option 3: Use Data References
Store large datasets externally:
func fetchAndStore(ctx context.Context, input FetchInput) (FetchOutput, error) {
    var allItems []Item
    cursor := ""
    for {
        page, err := api.List(ctx, cursor, 100)
        if err != nil {
            return FetchOutput{}, err
        }
        allItems = append(allItems, page.Items...)
        if !page.HasMore {
            break
        }
        cursor = page.NextCursor
    }
    // Store to S3 instead of returning
    ref, err := storage.Store(ctx, allItems)
    if err != nil {
        return FetchOutput{}, err
    }
    return FetchOutput{
        Ref:   ref, // Reference to S3 object
        Count: len(allItems),
    }, nil
}

Best Practices
1. Set Reasonable Page Limits
// Prevent runaway pagination
fetchNode := core.Paginate("fetch", fetcher,
    core.WithMaxPages(100),
)

2. Configure Appropriate Timeouts
Pagination can take a long time:
fetchNode := core.Paginate("fetch", fetcher).
    WithTimeout(30 * time.Minute) // Allow time for many pages

3. Log Progress for Debugging
fetcher := func(ctx context.Context, cursor string) (core.PageResult[Item], error) {
    result, _ := api.List(ctx, cursor, 100)
    log.Printf("Fetched page: cursor=%s, items=%d, hasMore=%v",
        cursor, len(result.Items), result.HasMore)
    return result, nil
}

4. Handle Empty Pages
fetcher := func(ctx context.Context, cursor string) (core.PageResult[Item], error) {
    result, _ := api.List(ctx, cursor, 100)
    // Empty page might not mean "done" for some APIs
    if len(result.Items) == 0 && result.HasMore {
        log.Printf("Warning: empty page with hasMore=true at cursor=%s", cursor)
    }
    return result, nil
}

See Also
- Sequential Steps - Using pagination in flows
- Rate Limiting - Combining with rate limits
- Data References - Handling large datasets
- FlowState - Cursor persistence