Transform Provider

Transform Provider

The Transform provider offers data transformation utilities for converting, mapping, filtering, and aggregating data within flows.

Installation

go get github.com/resolute/resolute/providers/transform

Configuration

The Transform provider requires no configuration as all operations are performed locally.

Provider Constructor

NewProvider

func NewProvider() *TransformProvider

Creates a new Transform provider.

Returns: *TransformProvider implementing core.Provider

Example:

provider := transform.NewProvider()

Activities

Map

Applies a transformation function to each element in a collection.

Input:

type MapInput struct {
    Items      []any  `json:"items"`
    Expression string `json:"expression"` // JSONPath or expression
    Template   string `json:"template"`   // Go template for transformation
}

Output:

type MapOutput struct {
    Results []any `json:"results"`
    Count   int   `json:"count"`
}

Node Factory:

func Map(input MapInput) *core.Node[MapInput, MapOutput]

Example:

mapNode := transform.Map(transform.MapInput{
    Items:    core.Output("issues.Items"),
    Template: `{"key": "{{.Key}}", "title": "{{.Summary}}"}`,
})

Filter

Filters a collection based on a condition.

Input:

type FilterInput struct {
    Items      []any  `json:"items"`
    Expression string `json:"expression"` // Filter expression
}

Output:

type FilterOutput struct {
    Results []any `json:"results"`
    Count   int   `json:"count"`
    Removed int   `json:"removed"`
}

Node Factory:

func Filter(input FilterInput) *core.Node[FilterInput, FilterOutput]

Example:

filterNode := transform.Filter(transform.FilterInput{
    Items:      core.Output("issues.Items"),
    Expression: ".Status == 'Open' && .Priority == 'High'",
})

Reduce

Reduces a collection to a single value.

Input:

type ReduceInput struct {
    Items       []any  `json:"items"`
    Expression  string `json:"expression"`
    InitialValue any   `json:"initial_value"`
}

Output:

type ReduceOutput struct {
    Result any `json:"result"`
}

Node Factory:

func Reduce(input ReduceInput) *core.Node[ReduceInput, ReduceOutput]

Example:

reduceNode := transform.Reduce(transform.ReduceInput{
    Items:        core.Output("orders.Items"),
    Expression:   "acc + item.Total",
    InitialValue: 0,
})

GroupBy

Groups items by a key.

Input:

type GroupByInput struct {
    Items []any  `json:"items"`
    Key   string `json:"key"` // Field to group by
}

Output:

type GroupByOutput struct {
    Groups map[string][]any `json:"groups"`
    Keys   []string         `json:"keys"`
}

Node Factory:

func GroupBy(input GroupByInput) *core.Node[GroupByInput, GroupByOutput]

Example:

groupNode := transform.GroupBy(transform.GroupByInput{
    Items: core.Output("issues.Items"),
    Key:   "Status",
})

Sort

Sorts a collection by a field.

Input:

type SortInput struct {
    Items      []any  `json:"items"`
    Key        string `json:"key"`
    Descending bool   `json:"descending"`
}

Output:

type SortOutput struct {
    Results []any `json:"results"`
}

Node Factory:

func Sort(input SortInput) *core.Node[SortInput, SortOutput]

Example:

sortNode := transform.Sort(transform.SortInput{
    Items:      core.Output("issues.Items"),
    Key:        "Priority",
    Descending: true,
})

Flatten

Flattens nested arrays.

Input:

type FlattenInput struct {
    Items []any `json:"items"`
    Depth int   `json:"depth"` // Flatten depth (default: 1, -1 for unlimited)
}

Output:

type FlattenOutput struct {
    Results []any `json:"results"`
    Count   int   `json:"count"`
}

Node Factory:

func Flatten(input FlattenInput) *core.Node[FlattenInput, FlattenOutput]

Unique

Removes duplicate items.

Input:

type UniqueInput struct {
    Items []any  `json:"items"`
    Key   string `json:"key"` // Optional: field to use for uniqueness
}

Output:

type UniqueOutput struct {
    Results    []any `json:"results"`
    Duplicates int   `json:"duplicates"`
}

Node Factory:

func Unique(input UniqueInput) *core.Node[UniqueInput, UniqueOutput]

Chunk

Splits a collection into chunks.

Input:

type ChunkInput struct {
    Items []any `json:"items"`
    Size  int   `json:"size"` // Chunk size
}

Output:

type ChunkOutput struct {
    Chunks [][]any `json:"chunks"`
    Count  int     `json:"count"`
}

Node Factory:

func Chunk(input ChunkInput) *core.Node[ChunkInput, ChunkOutput]

Example:

chunkNode := transform.Chunk(transform.ChunkInput{
    Items: core.Output("documents.Items"),
    Size:  100,
})

Merge

Merges multiple collections.

Input:

type MergeInput struct {
    Collections [][]any `json:"collections"`
    Unique      bool    `json:"unique"`
}

Output:

type MergeOutput struct {
    Results []any `json:"results"`
    Count   int   `json:"count"`
}

Node Factory:

func Merge(input MergeInput) *core.Node[MergeInput, MergeOutput]

Pick

Extracts specific fields from objects.

Input:

type PickInput struct {
    Items  []any    `json:"items"`
    Fields []string `json:"fields"`
}

Output:

type PickOutput struct {
    Results []map[string]any `json:"results"`
}

Node Factory:

func Pick(input PickInput) *core.Node[PickInput, PickOutput]

Example:

pickNode := transform.Pick(transform.PickInput{
    Items:  core.Output("issues.Items"),
    Fields: []string{"Key", "Summary", "Status"},
})

Omit

Removes specific fields from objects.

Input:

type OmitInput struct {
    Items  []any    `json:"items"`
    Fields []string `json:"fields"`
}

Output:

type OmitOutput struct {
    Results []map[string]any `json:"results"`
}

Node Factory:

func Omit(input OmitInput) *core.Node[OmitInput, OmitOutput]

JSONPath

Extracts data using JSONPath expressions.

Input:

type JSONPathInput struct {
    Data       any    `json:"data"`
    Expression string `json:"expression"`
}

Output:

type JSONPathOutput struct {
    Results []any `json:"results"`
    Count   int   `json:"count"`
}

Node Factory:

func JSONPath(input JSONPathInput) *core.Node[JSONPathInput, JSONPathOutput]

Example:

jsonPathNode := transform.JSONPath(transform.JSONPathInput{
    Data:       core.Output("response"),
    Expression: "$.items[*].metadata.name",
})

Template

Applies a Go template to data.

Input:

type TemplateInput struct {
    Data     any    `json:"data"`
    Template string `json:"template"`
}

Output:

type TemplateOutput struct {
    Result string `json:"result"`
}

Node Factory:

func Template(input TemplateInput) *core.Node[TemplateInput, TemplateOutput]

Example:

templateNode := transform.Template(transform.TemplateInput{
    Data: core.Output("issues"),
    Template: `
# Issue Report

Total Issues: {{len .Items}}

{{range .Items}}
- [{{.Key}}] {{.Summary}} ({{.Status}})
{{end}}
`,
})

Aggregate

Performs aggregation operations on numeric data.

Input:

type AggregateInput struct {
    Items     []any    `json:"items"`
    Field     string   `json:"field"`
    Operation string   `json:"operation"` // "sum", "avg", "min", "max", "count"
}

Output:

type AggregateOutput struct {
    Result float64 `json:"result"`
}

Node Factory:

func Aggregate(input AggregateInput) *core.Node[AggregateInput, AggregateOutput]

Example:

avgNode := transform.Aggregate(transform.AggregateInput{
    Items:     core.Output("orders.Items"),
    Field:     "Total",
    Operation: "avg",
})

Usage Patterns

Data Processing Pipeline

flow := core.NewFlow("data-pipeline").
    TriggeredBy(core.Schedule("0 * * * *")).
    Then(fetchDataNode.As("raw")).
    Then(transform.Filter(transform.FilterInput{
        Items:      core.Output("raw.Items"),
        Expression: ".Status == 'active'",
    }).As("filtered")).
    Then(transform.Map(transform.MapInput{
        Items:    core.Output("filtered.Results"),
        Template: `{"id": "{{.ID}}", "value": "{{.Value}}"}`,
    }).As("mapped")).
    Then(transform.GroupBy(transform.GroupByInput{
        Items: core.Output("mapped.Results"),
        Key:   "category",
    }).As("grouped")).
    Then(storeResultsNode).
    Build()

Batch Processing with Chunking

flow := core.NewFlow("batch-processor").
    TriggeredBy(core.Manual("api")).
    Then(fetchAllRecordsNode.As("records")).
    Then(transform.Chunk(transform.ChunkInput{
        Items: core.Output("records.Items"),
        Size:  100,
    }).As("batches")).
    ForEach(core.Output("batches.Chunks")).
        Then(processBatchNode).
    EndForEach().
    Build()

Report Generation

flow := core.NewFlow("report-generator").
    TriggeredBy(core.Schedule("0 9 * * 1")).
    Then(jira.FetchIssues(jira.FetchInput{
        Project: "PLATFORM",
    }).As("issues")).
    Then(transform.GroupBy(transform.GroupByInput{
        Items: core.Output("issues.Items"),
        Key:   "Status",
    }).As("by-status")).
    Then(transform.Aggregate(transform.AggregateInput{
        Items:     core.Output("issues.Items"),
        Field:     "StoryPoints",
        Operation: "sum",
    }).As("total-points")).
    Then(transform.Template(transform.TemplateInput{
        Data: map[string]any{
            "groups": core.Output("by-status.Groups"),
            "total":  core.Output("total-points.Result"),
        },
        Template: weeklyReportTemplate,
    }).As("report")).
    Then(sendReportNode).
    Build()

Data Deduplication

flow := core.NewFlow("deduplicate").
    TriggeredBy(core.Manual("api")).
    Then(fetchFromSourceANode.As("source-a")).
    Then(fetchFromSourceBNode.As("source-b")).
    Then(transform.Merge(transform.MergeInput{
        Collections: [][]any{
            core.Output("source-a.Items"),
            core.Output("source-b.Items"),
        },
    }).As("merged")).
    Then(transform.Unique(transform.UniqueInput{
        Items: core.Output("merged.Results"),
        Key:   "id",
    }).As("unique")).
    Then(storeUniqueItemsNode).
    Build()

Field Projection

flow := core.NewFlow("field-projection").
    TriggeredBy(core.Manual("api")).
    Then(fetchUserDataNode.As("users")).
    Then(transform.Pick(transform.PickInput{
        Items:  core.Output("users.Items"),
        Fields: []string{"id", "email", "name"},
    }).As("public")).
    Then(transform.Omit(transform.OmitInput{
        Items:  core.Output("users.Items"),
        Fields: []string{"password", "ssn", "creditCard"},
    }).As("safe")).
    Build()

Complete Example

package main

import (
    "github.com/resolute/resolute/core"
    "github.com/resolute/resolute/providers/jira"
    "github.com/resolute/resolute/providers/transform"
)

func main() {
    // Configure providers
    jiraProvider := jira.NewProvider(jira.JiraConfig{
        BaseURL:  os.Getenv("JIRA_BASE_URL"),
        Email:    os.Getenv("JIRA_EMAIL"),
        APIToken: os.Getenv("JIRA_API_TOKEN"),
    })

    transformProvider := transform.NewProvider()

    // Build analytics flow
    flow := core.NewFlow("sprint-analytics").
        TriggeredBy(core.Schedule("0 18 * * 5")).
        // Fetch sprint issues
        Then(jira.FetchIssues(jira.FetchInput{
            JQL: "project = PLATFORM AND sprint in openSprints()",
        }).As("issues")).
        // Group by assignee
        Then(transform.GroupBy(transform.GroupByInput{
            Items: core.Output("issues.Items"),
            Key:   "Assignee",
        }).As("by-assignee")).
        // Group by status
        Then(transform.GroupBy(transform.GroupByInput{
            Items: core.Output("issues.Items"),
            Key:   "Status",
        }).As("by-status")).
        // Calculate total story points
        Then(transform.Aggregate(transform.AggregateInput{
            Items:     core.Output("issues.Items"),
            Field:     "StoryPoints",
            Operation: "sum",
        }).As("total-points")).
        // Filter completed issues
        Then(transform.Filter(transform.FilterInput{
            Items:      core.Output("issues.Items"),
            Expression: ".Status == 'Done'",
        }).As("completed")).
        // Calculate completed points
        Then(transform.Aggregate(transform.AggregateInput{
            Items:     core.Output("completed.Results"),
            Field:     "StoryPoints",
            Operation: "sum",
        }).As("completed-points")).
        // Generate report
        Then(transform.Template(transform.TemplateInput{
            Data: map[string]any{
                "total":     core.Output("issues.Count"),
                "byStatus":  core.Output("by-status.Groups"),
                "byAssignee": core.Output("by-assignee.Groups"),
                "totalPoints": core.Output("total-points.Result"),
                "completedPoints": core.Output("completed-points.Result"),
            },
            Template: sprintReportTemplate,
        }).As("report")).
        // Send report
        Then(sendSlackReportNode).
        Build()

    // Run worker
    err := core.NewWorker().
        WithConfig(core.WorkerConfig{
            TaskQueue: "analytics",
        }).
        WithFlow(flow).
        WithProviders(jiraProvider, transformProvider).
        Run()

    if err != nil {
        panic(err)
    }
}

const sprintReportTemplate = `
# Sprint Report

## Summary
- Total Issues: {{.total}}
- Total Story Points: {{.totalPoints}}
- Completed Story Points: {{.completedPoints}}
- Velocity: {{printf "%.1f" (div .completedPoints .totalPoints | mul 100)}}%

## By Status
{{range $status, $issues := .byStatus}}
### {{$status}}: {{len $issues}} issues
{{end}}

## By Assignee
{{range $assignee, $issues := .byAssignee}}
- {{$assignee}}: {{len $issues}} issues
{{end}}
`

See Also