Worker
Worker
The Worker API provides a fluent builder for constructing and running Temporal workers with Resolute flows.
Types
WorkerConfig
type WorkerConfig struct {
TemporalHost string // Default: TEMPORAL_HOST env or "localhost:7233"
TaskQueue string // Required - no default
Namespace string // Default: TEMPORAL_NAMESPACE env or "default"
MaxConcurrent int // Default: 0 (unlimited)
}
Configuration for connecting to Temporal and running the worker.
WorkerBuilder
type WorkerBuilder struct {
// unexported fields
}
Fluent API for constructing and running workers.
Constructor
NewWorker
func NewWorker() *WorkerBuilder
Creates a new worker builder with environment defaults loaded.
Returns: *WorkerBuilder for method chaining
Example:
worker := core.NewWorker()
WorkerConfig Methods
Validate
func (c *WorkerConfig) Validate() error
Checks that required fields are set.
Returns: Error if TaskQueue is empty
WorkerBuilder Methods
WithConfig
func (b *WorkerBuilder) WithConfig(cfg WorkerConfig) *WorkerBuilder
Sets the worker configuration. Empty fields are populated from environment variables or defaults.
Parameters:
cfg - Worker configuration
Returns: *WorkerBuilder for method chaining
Example:
worker := core.NewWorker().
WithConfig(core.WorkerConfig{
TaskQueue: "my-queue",
TemporalHost: "localhost:7233",
Namespace: "production",
MaxConcurrent: 50,
})
WithFlow
func (b *WorkerBuilder) WithFlow(f *Flow) *WorkerBuilder
Sets the flow to be executed by this worker.
Parameters:
f - Flow to register
Returns: *WorkerBuilder for method chaining
Example:
worker := core.NewWorker().
WithConfig(cfg).
WithFlow(myFlow)
WithProviders
func (b *WorkerBuilder) WithProviders(providers ...Provider) *WorkerBuilder
Adds providers whose activities will be registered with the worker.
Parameters:
providers - One or more providers
Returns: *WorkerBuilder for method chaining
Example:
worker := core.NewWorker().
WithConfig(cfg).
WithFlow(myFlow).
WithProviders(
jira.Provider(),
slack.Provider(),
github.Provider(),
)
WithWebhookServer
func (b *WorkerBuilder) WithWebhookServer(addr string) *WorkerBuilder
Enables the webhook server on the specified address. If the flow has a webhook trigger, incoming webhooks will start workflow executions.
Parameters:
addr - Server address (e.g., ":8080")
Returns: *WorkerBuilder for method chaining
Example:
worker := core.NewWorker().
WithConfig(cfg).
WithFlow(webhookFlow).
WithWebhookServer(":8080")
Build
func (b *WorkerBuilder) Build() error
Creates the Temporal client and worker without starting them. Useful for testing or custom lifecycle management.
Returns: Error if configuration invalid or connection fails
Example:
worker := core.NewWorker().
WithConfig(cfg).
WithFlow(flow)
if err := worker.Build(); err != nil {
log.Fatal(err)
}
// Access underlying objects
client := worker.Client()
temporalWorker := worker.Worker()
Run
func (b *WorkerBuilder) Run() error
Builds and runs the worker, blocking until interrupted. This is the typical entry point for a worker process.
Returns: Error if startup or execution fails
Example:
err := core.NewWorker().
WithConfig(core.WorkerConfig{
TaskQueue: "my-queue",
}).
WithFlow(myFlow).
WithProviders(jira.Provider()).
Run()
if err != nil {
log.Fatal(err)
}
RunAsync
func (b *WorkerBuilder) RunAsync() (shutdown func(), err error)
Builds and starts the worker in the background. Returns a shutdown function for graceful termination.
Returns:
shutdown - Function to call for graceful shutdown
err - Error if startup fails
Example:
worker := core.NewWorker().
WithConfig(cfg).
WithFlow(flow)
shutdown, err := worker.RunAsync()
if err != nil {
log.Fatal(err)
}
defer shutdown()
// Do other work...
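// Wait for signal (the channel must be registered with signal.Notify,
// otherwise the receive below blocks forever)
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
<-sigCh
Client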
func (b *WorkerBuilder) Client() client.Client
Returns the underlying Temporal client after Build() has been called.
Returns: client.Client or nil if Build() not called
Worker
func (b *WorkerBuilder) Worker() worker.Worker
Returns the underlying Temporal worker after Build() has been called.
Returns: worker.Worker or nil if Build() not called
WebhookServer
func (b *WorkerBuilder) WebhookServer() *WebhookServer
Returns the webhook server if configured.
Returns: *WebhookServer or nil if not enabled or Build() not called
Environment Variables
| Variable | Description | Default |
|---|---|---|
| TEMPORAL_HOST | Temporal server address | localhost:7233 |
| TEMPORAL_NAMESPACE | Temporal namespace | default |
Usage Patterns
Basic Worker
func main() {
err := core.NewWorker().
WithConfig(core.WorkerConfig{
TaskQueue: "my-queue",
}).
WithFlow(myFlow).
WithProviders(myProvider).
Run()
if err != nil {
log.Fatal(err)
}
}
Worker with Webhook Server
func main() {
err := core.NewWorker().
WithConfig(core.WorkerConfig{
TaskQueue: "webhooks",
}).
WithFlow(webhookFlow).
WithWebhookServer(":8080").
Run()
if err != nil {
log.Fatal(err)
}
}
Multiple Providers
func main() {
// Configure providers
jiraProvider := jira.NewProvider(jira.Config{
BaseURL: os.Getenv("JIRA_BASE_URL"),
APIToken: os.Getenv("JIRA_API_TOKEN"),
})
slackProvider := slack.NewProvider(slack.Config{
Token: os.Getenv("SLACK_TOKEN"),
})
// Run worker
err := core.NewWorker().
WithConfig(core.WorkerConfig{
TaskQueue: "notifications",
}).
WithFlow(notificationFlow).
WithProviders(jiraProvider, slackProvider).
Run()
if err != nil {
log.Fatal(err)
}
}
Custom Lifecycle
func main() {
// The builder is not named "worker" so that it does not shadow the
// Temporal SDK worker package used for worker.InterruptCh() below.
builder := core.NewWorker().
WithConfig(cfg).
WithFlow(flow)
if err := builder.Build(); err != nil {
log.Fatal(err)
}
// Access Temporal primitives
c := builder.Client()
w := builder.Worker()
// Register additional workflows
w.RegisterWorkflow(anotherWorkflow)
// Custom startup logic
log.Println("Starting worker...")
// Run with custom interrupt handling
if err := w.Run(worker.InterruptCh()); err != nil {
log.Fatal(err)
}
c.Close()
}
Background Worker with Shutdown
func main() {
worker := core.NewWorker().
WithConfig(cfg).
WithFlow(flow)
shutdown, err := worker.RunAsync()
if err != nil {
log.Fatal(err)
}
// Handle shutdown signal
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
<-sigCh
log.Println("Shutting down...")
shutdown()
}
Complete Example
package main
import (
"log"
"os"
"github.com/resolute/resolute/core"
"myapp/flows"
"myapp/providers/jira"
"myapp/providers/slack"
)
func main() {
// Load configuration
cfg := core.WorkerConfig{
TemporalHost: os.Getenv("TEMPORAL_HOST"),
Namespace: os.Getenv("TEMPORAL_NAMESPACE"),
TaskQueue: os.Getenv("TASK_QUEUE"),
MaxConcurrent: 50,
}
// Configure providers
jiraProvider := jira.NewProvider(jira.Config{
BaseURL: os.Getenv("JIRA_BASE_URL"),
Email: os.Getenv("JIRA_EMAIL"),
APIToken: os.Getenv("JIRA_API_TOKEN"),
})
slackProvider := slack.NewProvider(slack.Config{
Token: os.Getenv("SLACK_TOKEN"),
})
// Build and run worker
err := core.NewWorker().
WithConfig(cfg).
WithFlow(flows.DataSyncFlow).
WithProviders(jiraProvider, slackProvider).
WithWebhookServer(":8080").
Run()
if err != nil {
log.Fatal(err)
}
}
See Also
- Flow - Flow builder
- Providers - Provider reference
- Worker Configuration - Deployment guide
- Temporal Cloud - Cloud deployment