Worker Configuration
Worker Configuration
Workers execute workflows and activities. This guide covers configuration options for production deployments.
WorkerBuilder
Resolute provides a fluent builder for worker configuration:
package main
import (
"log"
"github.com/resolute/resolute/core"
"myapp/flows"
"myapp/providers/jira"
"myapp/providers/slack"
)
func main() {
err := core.NewWorker().
WithConfig(core.WorkerConfig{
TaskQueue: "data-sync",
TemporalHost: "localhost:7233",
Namespace: "production",
MaxConcurrent: 50,
}).
WithFlow(flows.DataSyncFlow).
WithProviders(
jira.Provider(),
slack.Provider(),
).
Run()
if err != nil {
log.Fatal(err)
}
}Configuration Options
WorkerConfig
type WorkerConfig struct {
// TemporalHost is the Temporal server address.
// Default: TEMPORAL_HOST env or "localhost:7233"
TemporalHost string
// TaskQueue identifies the work this worker handles.
// Required - no default.
TaskQueue string
// Namespace partitions workflows in Temporal.
// Default: TEMPORAL_NAMESPACE env or "default"
Namespace string
// MaxConcurrent limits concurrent activity executions.
// Default: 0 (unlimited)
MaxConcurrent int
}Environment Variables
| Variable | Description | Default |
|---|---|---|
TEMPORAL_HOST | Temporal server address | localhost:7233 |
TEMPORAL_NAMESPACE | Temporal namespace | default |
// Config loads from environment automatically
err := core.NewWorker().
WithConfig(core.WorkerConfig{
TaskQueue: "my-queue",
// TemporalHost and Namespace loaded from env
}).
Run()Builder Methods
WithConfig
Sets worker configuration:
worker := core.NewWorker().
WithConfig(core.WorkerConfig{
TaskQueue: "my-queue",
TemporalHost: "temporal.example.com:7233",
Namespace: "production",
MaxConcurrent: 100,
})WithFlow
Registers a flow (workflow) with the worker:
worker := core.NewWorker().
WithConfig(cfg).
WithFlow(myFlow)WithProviders
Registers provider activities:
worker := core.NewWorker().
WithConfig(cfg).
WithFlow(myFlow).
WithProviders(
jira.Provider(),
slack.Provider(),
github.Provider(),
)WithWebhookServer
Enables HTTP server for webhook triggers:
worker := core.NewWorker().
WithConfig(cfg).
WithFlow(webhookTriggeredFlow).
WithWebhookServer(":8080")Lifecycle Methods
Run (Blocking)
Starts the worker and blocks until interrupted:
func main() {
err := core.NewWorker().
WithConfig(cfg).
WithFlow(flow).
Run()
if err != nil {
log.Fatal(err)
}
}RunAsync (Non-Blocking)
Starts the worker in the background:
func main() {
worker := core.NewWorker().
WithConfig(cfg).
WithFlow(flow)
shutdown, err := worker.RunAsync()
if err != nil {
log.Fatal(err)
}
defer shutdown()
// Do other work...
// Wait for signal
<-make(chan os.Signal, 1)
}Build (Manual Lifecycle)
Creates client and worker without starting:
func main() {
worker := core.NewWorker().
WithConfig(cfg).
WithFlow(flow)
if err := worker.Build(); err != nil {
log.Fatal(err)
}
// Access underlying Temporal objects
client := worker.Client()
temporalWorker := worker.Worker()
// Custom startup logic...
temporalWorker.Run(worker.InterruptCh())
}Production Configuration
Concurrency Tuning
cfg := core.WorkerConfig{
TaskQueue: "high-throughput",
MaxConcurrent: 100, // Tune based on activity characteristics
}Consider:
- CPU-bound activities: Lower concurrency (matches cores)
- I/O-bound activities: Higher concurrency (10x-100x cores)
- Memory-intensive: Limit based on available RAM
- External rate limits: Match provider API limits
Multiple Task Queues
Run specialized workers for different workloads:
// High-priority worker
go func() {
core.NewWorker().
WithConfig(core.WorkerConfig{
TaskQueue: "priority-high",
MaxConcurrent: 10,
}).
WithFlow(criticalFlow).
Run()
}()
// Bulk processing worker
go func() {
core.NewWorker().
WithConfig(core.WorkerConfig{
TaskQueue: "bulk-processing",
MaxConcurrent: 100,
}).
WithFlow(bulkFlow).
Run()
}()Graceful Shutdown
Workers handle SIGINT/SIGTERM automatically:
// Run() blocks until interrupt signal
err := core.NewWorker().
WithConfig(cfg).
WithFlow(flow).
Run()
// Worker drains in-flight work before exitingFor custom shutdown handling:
shutdown, err := worker.RunAsync()
if err != nil {
log.Fatal(err)
}
// Custom signal handling
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
<-sigCh
log.Println("Shutting down gracefully...")
shutdown() // Drains in-flight workAccessing Underlying Objects
After Build() or Run(), access Temporal primitives:
worker := core.NewWorker().
WithConfig(cfg).
WithFlow(flow)
if err := worker.Build(); err != nil {
log.Fatal(err)
}
// Temporal client for starting workflows programmatically
client := worker.Client()
// Temporal worker for advanced configuration
temporalWorker := worker.Worker()
// Webhook server if enabled
webhookServer := worker.WebhookServer()Health Checks
Implement health endpoints for orchestrators:
package main
import (
"net/http"
"sync/atomic"
"github.com/resolute/resolute/core"
)
var healthy int32 = 1
func main() {
// Health endpoint
http.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) {
if atomic.LoadInt32(&healthy) == 1 {
w.WriteHeader(http.StatusOK)
w.Write([]byte("ok"))
} else {
w.WriteHeader(http.StatusServiceUnavailable)
}
})
go http.ListenAndServe(":8081", nil)
// Run worker
err := core.NewWorker().
WithConfig(cfg).
WithFlow(flow).
Run()
atomic.StoreInt32(&healthy, 0)
if err != nil {
log.Fatal(err)
}
}Logging
Workers log key events automatically:
Starting worker for task queue: data-sync (host: localhost:7233, namespace: default)
Starting webhook server on :8080For structured logging, configure Temporal SDK:
import (
"go.temporal.io/sdk/client"
"log/slog"
)
// Custom logger adapter
type slogAdapter struct {
logger *slog.Logger
}
func (a *slogAdapter) Debug(msg string, keyvals ...interface{}) {
a.logger.Debug(msg, keyvals...)
}
// ... implement other methods
// Use with Temporal client
c, err := client.Dial(client.Options{
Logger: &slogAdapter{logger: slog.Default()},
})Common Patterns
Environment-Based Configuration
package main
import (
"os"
"strconv"
"github.com/resolute/resolute/core"
)
func configFromEnv() core.WorkerConfig {
maxConcurrent := 50
if v := os.Getenv("WORKER_MAX_CONCURRENT"); v != "" {
if n, err := strconv.Atoi(v); err == nil {
maxConcurrent = n
}
}
return core.WorkerConfig{
TaskQueue: os.Getenv("TASK_QUEUE"),
TemporalHost: os.Getenv("TEMPORAL_HOST"),
Namespace: os.Getenv("TEMPORAL_NAMESPACE"),
MaxConcurrent: maxConcurrent,
}
}
func main() {
err := core.NewWorker().
WithConfig(configFromEnv()).
WithFlow(flow).
Run()
if err != nil {
log.Fatal(err)
}
}Multiple Flows Per Worker
// Register multiple workflows with underlying worker
worker := core.NewWorker().
WithConfig(cfg)
if err := worker.Build(); err != nil {
log.Fatal(err)
}
// Register additional workflows
worker.Worker().RegisterWorkflow(flow1.Execute)
worker.Worker().RegisterWorkflow(flow2.Execute)
worker.Worker().RegisterWorkflow(flow3.Execute)
worker.Worker().Run(worker.InterruptCh())Provider Configuration
// Configure providers with credentials
jiraProvider := jira.NewProvider(jira.Config{
BaseURL: os.Getenv("JIRA_BASE_URL"),
Email: os.Getenv("JIRA_EMAIL"),
APIToken: os.Getenv("JIRA_API_TOKEN"),
})
slackProvider := slack.NewProvider(slack.Config{
Token: os.Getenv("SLACK_TOKEN"),
})
err := core.NewWorker().
WithConfig(cfg).
WithFlow(flow).
WithProviders(jiraProvider, slackProvider).
Run()See Also
- Temporal Cloud - Cloud deployment
- Self-Hosted - Self-hosted deployment
- Workers - Worker concepts
- Provider Registration - Activity registration