types

package
v0.0.0-...-d81d4e0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 22, 2025 License: MIT Imports: 9 Imported by: 0

Documentation

Index

Constants

View Source
// Package-wide default values.
//
// The duration-like defaults (delays, timeouts, intervals) are untyped string
// constants such as "5m", not time.Duration values, so they must be parsed
// (for example with time.ParseDuration) before being used as durations.
// The numeric defaults are untyped constants and adopt the type of the
// variable they are assigned to.
const (
	// Default retry settings
	DefaultMaxRetries    = 3
	DefaultRetryBackoff  = "exponential" // exponential, linear, fixed
	DefaultInitialDelay  = "1s"
	DefaultMaxDelay      = "5m"
	DefaultBackoffFactor = 2.0

	// Default timeouts
	DefaultTaskTimeout     = "5m"
	DefaultDequeueTimeout  = "30s"
	DefaultShutdownTimeout = "30s"

	// Default queue settings
	DefaultQueueName    = "default"
	// DefaultPriority is the only typed constant in this group: it takes the
	// Priority type from PriorityNormal.
	DefaultPriority     = PriorityNormal
	DefaultBatchSize    = 10
	DefaultMaxQueueSize = 10000

	// Default worker settings
	DefaultWorkerConcurrency = 5
	DefaultHeartbeatInterval = "30s"
	DefaultWorkerTimeout     = "5m"

	// Circuit breaker defaults
	DefaultCircuitBreakerThreshold   = 5     // failures before opening
	DefaultCircuitBreakerTimeout     = "60s" // how long to stay open
	DefaultCircuitBreakerMaxRequests = 3     // max requests in half-open state

	// Rate limiting defaults
	DefaultRateLimit = 100 // requests per second
	DefaultRateBurst = 10  // burst size

	// Monitoring defaults
	DefaultMetricsPort = 9090
	DefaultHealthPath  = "/health"
	DefaultMetricsPath = "/metrics"
	DefaultReadyPath   = "/ready"
)

Constants for default values

Variables

View Source
// Sentinel errors, grouped by subsystem.
//
// Each value is created once with errors.New, so it is a distinct error
// identity: compare with errors.Is(err, ErrXxx), which also matches when the
// sentinel has been wrapped with fmt.Errorf("...: %w", ErrXxx). Do not compare
// message strings.
var (
	// Task errors
	ErrTaskNotFound      = errors.New("task not found")
	ErrTaskInvalidStatus = errors.New("invalid task status")
	ErrTaskTimeout       = errors.New("task execution timeout")
	ErrTaskCancelled     = errors.New("task was cancelled")
	ErrTaskDeadline      = errors.New("task deadline exceeded")
	ErrTaskRetryExceeded = errors.New("maximum retries exceeded")
	ErrTaskDuplicate     = errors.New("duplicate task detected")

	// Queue errors
	ErrQueueNotFound = errors.New("queue not found")
	ErrQueueFull     = errors.New("queue is full")
	ErrQueueEmpty    = errors.New("queue is empty")
	ErrQueueClosed   = errors.New("queue is closed")

	// Worker errors
	ErrWorkerNotFound   = errors.New("worker not found")
	ErrWorkerOffline    = errors.New("worker is offline")
	ErrWorkerOverloaded = errors.New("worker is overloaded")
	ErrWorkerShutdown   = errors.New("worker is shutting down")

	// Processor errors
	ErrProcessorNotFound = errors.New("no processor found for task type")
	ErrProcessorFailed   = errors.New("task processor failed")

	// Validation errors
	ErrInvalidPayload  = errors.New("invalid task payload")
	ErrInvalidPriority = errors.New("invalid task priority")
	ErrInvalidTaskType = errors.New("invalid task type")
	ErrMissingRequired = errors.New("missing required field")

	// Backend errors
	ErrBackendUnavailable = errors.New("backend is unavailable")
	ErrBackendTimeout     = errors.New("backend operation timeout")
	ErrBackendConnection  = errors.New("backend connection error")

	// Rate limiting errors
	ErrRateLimitExceeded = errors.New("rate limit exceeded")

	// Circuit breaker errors
	ErrCircuitBreakerOpen = errors.New("circuit breaker is open")
)

Common error variables

QueuePriorities defines the processing order for different priority levels

RetryableErrorCodes lists all error codes that should trigger retries

SupportedTaskTypes lists all supported task types

Functions

func CalculateNextRetry

func CalculateNextRetry(currentRetry int, strategy string, initialDelay, maxDelay time.Duration, factor float64) time.Time

CalculateNextRetry calculates the next retry time based on backoff strategy

func GenerateCorrelationID

func GenerateCorrelationID() string

GenerateCorrelationID generates a correlation ID for request tracing

func GenerateDedupeKey

func GenerateDedupeKey(taskType TaskType, payload []byte) string

GenerateDedupeKey generates a deduplication key based on task content

func GetPriorityWeight

func GetPriorityWeight(priority Priority) int

GetPriorityWeight returns a numeric weight for priority-based sorting

func IsTerminalStatus

func IsTerminalStatus(status TaskStatus) bool

IsTerminalStatus checks if a task status is terminal (won't change)

func IsValidPriority

func IsValidPriority(priority Priority) bool

IsValidPriority checks if a priority is valid

func IsValidTaskStatus

func IsValidTaskStatus(status TaskStatus) bool

IsValidTaskStatus checks if a task status is valid

func IsValidTaskType

func IsValidTaskType(taskType TaskType) bool

IsValidTaskType checks if a task type is valid

Types

type APIConfig

// APIConfig groups the API server settings. Every field is exposed under the
// same key in JSON and YAML.
//
// NOTE(review): the time.Duration fields have no custom marshalling here, so
// encoding/json reads and writes them as integer nanoseconds. Whether the
// config loader also accepts strings such as "30s" is not visible — confirm.
type APIConfig struct {
	Enabled bool      `json:"enabled" yaml:"enabled"`
	Host    string    `json:"host" yaml:"host"`
	Port    int       `json:"port" yaml:"port"`
	// TLS is embedded by value as a nested "tls" object.
	TLS     TLSConfig `json:"tls" yaml:"tls"`

	// Timeouts
	ReadTimeout  time.Duration `json:"read_timeout" yaml:"read_timeout"`
	WriteTimeout time.Duration `json:"write_timeout" yaml:"write_timeout"`
	IdleTimeout  time.Duration `json:"idle_timeout" yaml:"idle_timeout"`

	// Middleware
	EnableCORS    bool `json:"enable_cors" yaml:"enable_cors"`
	EnableMetrics bool `json:"enable_metrics" yaml:"enable_metrics"`
	EnableAuth    bool `json:"enable_auth" yaml:"enable_auth"`

	// Rate limiting
	RateLimit int `json:"rate_limit" yaml:"rate_limit"` // requests per second
	RateBurst int `json:"rate_burst" yaml:"rate_burst"` // burst size
}

APIConfig contains API server configuration

type APIKeyConfig

// APIKeyConfig holds API-key settings: a header name and a list of keys.
//
// NOTE(review): Keys is serialized in plain text under "keys" in both JSON
// and YAML, so any dumped or logged configuration contains the raw keys.
type APIKeyConfig struct {
	// HeaderName is presumably the HTTP header the key is read from — confirm
	// against the middleware that consumes this config.
	HeaderName string   `json:"header_name" yaml:"header_name"`
	Keys       []string `json:"keys" yaml:"keys"`
}

APIKeyConfig contains API key configuration

type AppConfig

// AppConfig carries general application identity and mode settings, exposed
// under the same keys in JSON and YAML.
type AppConfig struct {
	Name        string `json:"name" yaml:"name"`
	Version     string `json:"version" yaml:"version"`
	// Environment is a free-form string; nothing in this type restricts it to
	// the values listed in the trailing comment.
	Environment string `json:"environment" yaml:"environment"` // dev, staging, prod
	Debug       bool   `json:"debug" yaml:"debug"`
}

AppConfig contains general application settings

type AuthConfig

// AuthConfig selects an authentication method (Type) and carries two
// independent boolean switches.
//
// NOTE(review): how Enabled and Required interact (for example Enabled=false
// with Required=true) is not defined by this type — confirm with the consumer.
type AuthConfig struct {
	// Type is one of the AuthType* constants; the field itself accepts any
	// string value.
	Type     AuthType `json:"type" yaml:"type"`
	Enabled  bool     `json:"enabled" yaml:"enabled"`
	Required bool     `json:"required" yaml:"required"`
}

AuthConfig contains authentication configuration

type AuthType

// AuthType is a string-backed enumeration of authentication methods; its
// declared values are the AuthType* constants.
type AuthType string

AuthType represents the authentication method

// Declared AuthType values. The string on the right is the exact text used
// when an AuthType is serialized (e.g. in the "type" key of AuthConfig).
const (
	AuthTypeNone   AuthType = "none"
	AuthTypeBasic  AuthType = "basic"
	AuthTypeJWT    AuthType = "jwt"
	AuthTypeAPIKey AuthType = "api_key"
)

type BatchPayload

// BatchPayload is the JSON payload for a batch of tasks.
//
// The `validate` tags are plain struct tags; they have no effect unless a
// validator library reads them (the syntax looks like go-playground/validator,
// but the validator in use is not visible here — confirm).
type BatchPayload struct {
	// Tasks is tagged as required with at least one element.
	Tasks       []BatchTask `json:"tasks" validate:"required,min=1"`
	Sequential  bool        `json:"sequential,omitempty"`    // Process tasks in sequence
	StopOnError bool        `json:"stop_on_error,omitempty"` // Stop batch if any task fails

	// Progress tracking
	CallbackURL     string            `json:"callback_url,omitempty"` // Progress webhook
	CallbackHeaders map[string]string `json:"callback_headers,omitempty"`
}

BatchPayload represents the payload for batch operations

type BatchTask

// BatchTask is one entry of BatchPayload.Tasks.
type BatchTask struct {
	ID       string      `json:"id"` // Unique ID within batch
	Type     TaskType    `json:"type" validate:"required"`
	// Payload is untyped: after json.Unmarshal it holds the generic decoding
	// (map[string]interface{}, []interface{}, string, float64, bool or nil),
	// not one of the typed payload structs.
	Payload  interface{} `json:"payload" validate:"required"`
	Priority Priority    `json:"priority,omitempty"`
	// NOTE(review): if TaskOptions is a struct type (its declaration is not
	// visible here), omitempty has no effect and "options" is always emitted
	// by encoding/json.
	Options  TaskOptions `json:"options,omitempty"`
}

BatchTask represents a single task within a batch

type CircuitBreaker

// CircuitBreaker is the contract for a circuit-breaker implementation. No
// implementation is visible here, so the comments below describe only what
// the signatures guarantee.
type CircuitBreaker interface {
	// Execute runs the function with circuit breaker protection
	// and returns an error. NOTE(review): presumably fn's own error is passed
	// through and ErrCircuitBreakerOpen is returned when the call is rejected
	// — confirm against the implementation.
	Execute(fn func() error) error

	// State returns the current circuit breaker state
	// as one of the CircuitBreakerState constants.
	State() CircuitBreakerState

	// Reset manually resets the circuit breaker
	Reset()
}

CircuitBreaker defines the interface for circuit breaker pattern

type CircuitBreakerConfig

// CircuitBreakerConfig holds global circuit-breaker settings plus optional
// per-entry overrides in Services.
//
// NOTE(review): Timeout and ResetTimeout are time.Duration values and are
// therefore encoded by encoding/json as integer nanoseconds.
type CircuitBreakerConfig struct {
	Enabled      bool          `json:"enabled" yaml:"enabled"`
	Threshold    int           `json:"threshold" yaml:"threshold"`         // failures before opening
	Timeout      time.Duration `json:"timeout" yaml:"timeout"`             // how long to stay open
	MaxRequests  int           `json:"max_requests" yaml:"max_requests"`   // max requests in half-open
	ResetTimeout time.Duration `json:"reset_timeout" yaml:"reset_timeout"` // time to reset counters

	// Per-service settings
	// (map key is presumably the service name — confirm with the consumer).
	// ServiceCircuitConfig has no Enabled or ResetTimeout field, so those two
	// settings cannot be overridden per entry.
	Services map[string]ServiceCircuitConfig `json:"services" yaml:"services"`
}

CircuitBreakerConfig contains circuit breaker configuration

type CircuitBreakerState

// CircuitBreakerState is a string-backed enumeration; its declared values
// are CircuitBreakerClosed, CircuitBreakerOpen and CircuitBreakerHalfOpen.
type CircuitBreakerState string

CircuitBreakerState represents the state of a circuit breaker

// Declared CircuitBreakerState values. Note the underscore in "half_open":
// that exact text is what a CircuitBreakerState serializes to.
const (
	CircuitBreakerClosed   CircuitBreakerState = "closed"    // Normal operation
	CircuitBreakerOpen     CircuitBreakerState = "open"      // Failing fast
	CircuitBreakerHalfOpen CircuitBreakerState = "half_open" // Testing if service recovered
)

type Config

// Config is the root configuration object. It embeds every sub-configuration
// by value (no pointers), so a zero Config contains zero-valued sections
// rather than nil ones, and copying a Config copies all sections (maps inside
// the sections are still shared between copies).
type Config struct {
	// Application settings
	App AppConfig `json:"app" yaml:"app"`

	// Queue backend configuration
	Queue QueueConfig `json:"queue" yaml:"queue"`

	// Worker configuration
	Worker WorkerConfig `json:"worker" yaml:"worker"`

	// Scheduler configuration
	Scheduler SchedulerConfig `json:"scheduler" yaml:"scheduler"`

	// API server configuration
	API APIConfig `json:"api" yaml:"api"`

	// Monitoring and observability
	Metrics MetricsConfig `json:"metrics" yaml:"metrics"`
	Logging LoggingConfig `json:"logging" yaml:"logging"`

	// Circuit breaker configuration
	CircuitBreaker CircuitBreakerConfig `json:"circuit_breaker" yaml:"circuit_breaker"`

	// Rate limiting configuration
	RateLimit RateLimitConfig `json:"rate_limit" yaml:"rate_limit"`

	// Security configuration
	Security SecurityConfig `json:"security" yaml:"security"`
}

Config represents the main configuration for TaskForge

func DefaultConfig

func DefaultConfig() *Config

DefaultConfig returns a configuration with sensible defaults

type DataOperation

// DataOperation pairs an operation kind with a free-form parameter bag.
type DataOperation struct {
	// Type is one of the DataOp* constants and is tagged as required.
	Type       DataOperationType      `json:"type" validate:"required"`
	// Parameters is untyped; which keys each operation expects is not defined
	// in this type.
	Parameters map[string]interface{} `json:"parameters,omitempty"`
}

DataOperation represents a data transformation operation

type DataOperationType

// DataOperationType is a string-backed enumeration; its declared values are
// the DataOp* constants.
type DataOperationType string

DataOperationType defines available data operations

// Declared DataOperationType values. "filter" is also the value of
// ImageOpFilter, but the two constants have different types and are not
// interchangeable without an explicit conversion.
const (
	DataOpFilter    DataOperationType = "filter"    // Filter rows
	DataOpTransform DataOperationType = "transform" // Transform columns
	DataOpAggregate DataOperationType = "aggregate" // Group and aggregate
	DataOpJoin      DataOperationType = "join"      // Join with other data
	DataOpSort      DataOperationType = "sort"      // Sort data
	DataOpDedupe    DataOperationType = "dedupe"    // Remove duplicates
	DataOpValidate  DataOperationType = "validate"  // Validate data quality
)

type DataProcessPayload

// DataProcessPayload describes a data-processing job: a source, a target,
// and an ordered list of operations.
//
// NOTE(review): SourceConfig and TargetConfig are struct values, so a
// "required" validate tag on them can only be meaningful if the validator
// treats an all-zero struct as missing — confirm with the validator in use.
type DataProcessPayload struct {
	SourceType   DataSourceType   `json:"source_type" validate:"required"`
	SourceConfig DataSourceConfig `json:"source_config" validate:"required"`

	TargetType   DataTargetType   `json:"target_type" validate:"required"`
	TargetConfig DataTargetConfig `json:"target_config" validate:"required"`

	Operations []DataOperation `json:"operations,omitempty"`

	// Processing options
	// (a zero value is omitted from JSON, so 0 and "not set" are
	// indistinguishable on the wire)
	BatchSize int `json:"batch_size,omitempty"` // Records per batch
	MaxErrors int `json:"max_errors,omitempty"` // Max errors before failing

	// Progress tracking
	CallbackURL string `json:"callback_url,omitempty"` // Progress updates
}

DataProcessPayload represents the payload for data processing tasks

type DataSourceConfig

// DataSourceConfig is a union-style bag of source settings; which fields are
// relevant presumably depends on the DataSourceType it is paired with —
// confirm with the processor.
type DataSourceConfig struct {
	URL        string                 `json:"url,omitempty"`
	Path       string                 `json:"path,omitempty"`
	Query      string                 `json:"query,omitempty"`      // SQL query for database
	Headers    map[string]string      `json:"headers,omitempty"`    // HTTP headers for API
	// NOTE(review): AuthConfig is a struct, and encoding/json's omitempty never
	// omits struct values, so "auth" is always emitted.
	Auth       AuthConfig             `json:"auth,omitempty"`       // Authentication config
	Parameters map[string]interface{} `json:"parameters,omitempty"` // Source-specific params
}

DataSourceConfig contains source-specific configuration

type DataSourceType

// DataSourceType is a string-backed enumeration; its declared values are the
// DataSource* constants.
type DataSourceType string

DataSourceType defines supported data sources

// Declared DataSourceType values. They use the same strings as the
// DataTarget* constants but are a distinct type.
const (
	DataSourceCSV      DataSourceType = "csv"
	DataSourceJSON     DataSourceType = "json"
	DataSourceDatabase DataSourceType = "database"
	DataSourceS3       DataSourceType = "s3"
	DataSourceAPI      DataSourceType = "api"
)

type DataTargetConfig

// DataTargetConfig is a union-style bag of target settings. It has the same
// fields as DataSourceConfig except that it carries Table where the source
// config carries Query.
type DataTargetConfig struct {
	URL        string                 `json:"url,omitempty"`
	Path       string                 `json:"path,omitempty"`
	Table      string                 `json:"table,omitempty"`      // Database table
	Headers    map[string]string      `json:"headers,omitempty"`    // HTTP headers for API
	// NOTE(review): AuthConfig is a struct, and encoding/json's omitempty never
	// omits struct values, so "auth" is always emitted.
	Auth       AuthConfig             `json:"auth,omitempty"`       // Authentication config
	Parameters map[string]interface{} `json:"parameters,omitempty"` // Target-specific params
}

DataTargetConfig contains target-specific configuration

type DataTargetType

// DataTargetType is a string-backed enumeration; its declared values are the
// DataTarget* constants.
type DataTargetType string

DataTargetType defines supported data targets

// Declared DataTargetType values. They use the same strings as the
// DataSource* constants but are a distinct type.
const (
	DataTargetCSV      DataTargetType = "csv"
	DataTargetJSON     DataTargetType = "json"
	DataTargetDatabase DataTargetType = "database"
	DataTargetS3       DataTargetType = "s3"
	DataTargetAPI      DataTargetType = "api"
)

type EmailAttachment

// EmailAttachment describes one attachment, supplied either inline (Content)
// or by reference (ContentURL).
//
// NOTE(review): nothing in this type enforces that exactly one of Content and
// ContentURL is set, or that Size matches len(Content) — confirm where that
// is validated.
type EmailAttachment struct {
	Filename    string `json:"filename" validate:"required"`
	ContentType string `json:"content_type,omitempty"`
	Content     []byte `json:"content,omitempty"`     // Raw bytes in Go; encoding/json reads/writes them as a base64 string
	ContentURL  string `json:"content_url,omitempty"` // URL to fetch content
	Size        int64  `json:"size,omitempty"`        // Size in bytes
}

EmailAttachment represents an email attachment

type EmailPayload

// EmailPayload is the JSON payload for an email task.
//
// Per the validate tags, To (at least one entry), From (email format) and
// Subject are required; the tags only take effect when a validator library
// processes the struct. None of the body fields (TextBody, HTMLBody,
// TemplateID) is tagged as required, so a payload with no content passes tag
// validation.
type EmailPayload struct {
	// NOTE(review): only the slice length of To is constrained; the individual
	// addresses in To, CC, BCC and ReplyTo carry no format tag.
	To      []string `json:"to" validate:"required,min=1"`
	CC      []string `json:"cc,omitempty"`
	BCC     []string `json:"bcc,omitempty"`
	From    string   `json:"from" validate:"required,email"`
	ReplyTo string   `json:"reply_to,omitempty"`
	Subject string   `json:"subject" validate:"required"`

	// Content
	TextBody     string                 `json:"text_body,omitempty"`
	HTMLBody     string                 `json:"html_body,omitempty"`
	TemplateID   string                 `json:"template_id,omitempty"`   // Email template identifier
	TemplateData map[string]interface{} `json:"template_data,omitempty"` // Data for template rendering

	// Attachments
	Attachments []EmailAttachment `json:"attachments,omitempty"`

	// Email provider settings
	Provider string            `json:"provider,omitempty"` // sendgrid, ses, smtp, etc.
	Tags     []string          `json:"tags,omitempty"`     // For categorization/analytics
	Metadata map[string]string `json:"metadata,omitempty"` // Provider-specific metadata

	// Tracking
	TrackOpens  bool `json:"track_opens,omitempty"`
	TrackClicks bool `json:"track_clicks,omitempty"`
}

EmailPayload represents the payload for email processing tasks

type EncryptionConfig

// EncryptionConfig holds encryption settings.
type EncryptionConfig struct {
	Enabled   bool   `json:"enabled" yaml:"enabled"`
	// Algorithm is a free-form string; "AES-256-GCM" in the trailing comment
	// is an example value, not something this type enforces.
	Algorithm string `json:"algorithm" yaml:"algorithm"` // AES-256-GCM
	// KeyFile is a path string; the key material itself is not stored in
	// this struct.
	KeyFile   string `json:"key_file" yaml:"key_file"`
}

EncryptionConfig contains encryption configuration

type ErrorCode

// ErrorCode is a string-backed enumeration of failure categories; its
// declared values are the ErrorCode* constants.
type ErrorCode string

ErrorCode represents specific error types with retry behavior

// Declared ErrorCode values, grouped by intended retry behavior.
//
// NOTE(review): the grouping below is documentation only. The actual retry
// decision is made by ErrorCode.IsRetryable, whose body is not visible here;
// in particular, how it treats the "System errors" group should be confirmed
// there.
const (
	// Retryable errors (temporary failures)
	ErrorCodeTimeout            ErrorCode = "timeout"
	ErrorCodeNetworkError       ErrorCode = "network_error"
	ErrorCodeBackendUnavailable ErrorCode = "backend_unavailable"
	ErrorCodeRateLimited        ErrorCode = "rate_limited"
	ErrorCodeCircuitOpen        ErrorCode = "circuit_open"
	ErrorCodeWorkerOverloaded   ErrorCode = "worker_overloaded"
	ErrorCodeTemporaryFailure   ErrorCode = "temporary_failure"

	// Non-retryable errors (permanent failures)
	ErrorCodeInvalidPayload   ErrorCode = "invalid_payload"
	ErrorCodeValidationFailed ErrorCode = "validation_failed"
	ErrorCodeUnauthorized     ErrorCode = "unauthorized"
	ErrorCodeForbidden        ErrorCode = "forbidden"
	ErrorCodeNotFound         ErrorCode = "not_found"
	ErrorCodeDuplicateTask    ErrorCode = "duplicate_task"
	ErrorCodeDeadlineExceeded ErrorCode = "deadline_exceeded"
	ErrorCodeCancelled        ErrorCode = "cancelled"
	ErrorCodePermanentFailure ErrorCode = "permanent_failure"

	// System errors
	ErrorCodeInternalError ErrorCode = "internal_error"
	ErrorCodeConfigError   ErrorCode = "config_error"
	ErrorCodeUnknownError  ErrorCode = "unknown_error"
)

func (ErrorCode) IsRetryable

func (e ErrorCode) IsRetryable() bool

IsRetryable returns true if the error code indicates a retryable failure

type Field

// Field is a key/value pair passed to the Logger methods. It has no struct
// tags, so it serializes under the Go field names "Key" and "Value".
type Field struct {
	Key   string
	// Value may hold any type; how it is rendered is up to the Logger
	// implementation.
	Value interface{}
}

Field represents a structured log field

type ImageOperation

// ImageOperation pairs an operation kind with a free-form parameter bag.
type ImageOperation struct {
	// Type is one of the ImageOp* constants and is tagged as required.
	Type       ImageOperationType     `json:"type" validate:"required"`
	// Parameters is untyped; which keys each operation expects is not defined
	// in this type.
	Parameters map[string]interface{} `json:"parameters,omitempty"`
}

ImageOperation represents a single image processing operation

type ImageOperationType

// ImageOperationType is a string-backed enumeration; its declared values are
// the ImageOp* constants.
type ImageOperationType string

ImageOperationType defines available image operations

// Declared ImageOperationType values. "filter" is also the value of
// DataOpFilter, but the two constants have different types and are not
// interchangeable without an explicit conversion.
const (
	ImageOpResize    ImageOperationType = "resize"    // Resize image
	ImageOpCrop      ImageOperationType = "crop"      // Crop image
	ImageOpWatermark ImageOperationType = "watermark" // Add watermark
	ImageOpFilter    ImageOperationType = "filter"    // Apply filters (blur, sharpen, etc.)
	ImageOpRotate    ImageOperationType = "rotate"    // Rotate image
	ImageOpFlip      ImageOperationType = "flip"      // Flip horizontal/vertical
	ImageOpCompress  ImageOperationType = "compress"  // Compress/optimize
)

type ImageProcessPayload

// ImageProcessPayload is the JSON payload for an image-processing task.
//
// The image can be supplied three ways (SourceURL, SourceData, SourcePath).
// NOTE(review): all three are optional in the tags, so nothing here enforces
// that at least one is set or defines which wins when several are — confirm
// with the processor.
type ImageProcessPayload struct {
	SourceURL  string `json:"source_url,omitempty"`  // URL to source image
	SourceData []byte `json:"source_data,omitempty"` // Raw image data (a base64 string in JSON)
	SourcePath string `json:"source_path,omitempty"` // File system path

	// Operations is tagged as required with at least one element.
	Operations []ImageOperation `json:"operations" validate:"required,min=1"`
	OutputPath string           `json:"output_path,omitempty"` // Where to save result
	OutputURL  string           `json:"output_url,omitempty"`  // Where to upload result

	// Processing options
	// (the 1-100 range for Quality is stated in the comment only; no validate
	// tag enforces it)
	Quality     int    `json:"quality,omitempty"`     // JPEG quality (1-100)
	Format      string `json:"format,omitempty"`      // Output format (jpg, png, webp)
	Progressive bool   `json:"progressive,omitempty"` // Progressive JPEG

	// Metadata preservation
	PreserveMetadata bool `json:"preserve_metadata,omitempty"`

	// Callback
	CallbackURL string `json:"callback_url,omitempty"` // Webhook when complete
}

ImageProcessPayload represents the payload for image processing tasks

type JWTConfig

// JWTConfig holds JWT settings.
//
// NOTE(review): SecretKey is serialized in plain text under "secret_key" in
// both JSON and YAML, so any dumped or logged configuration exposes it.
type JWTConfig struct {
	SecretKey      string        `json:"secret_key" yaml:"secret_key"`
	// ExpirationTime is a time.Duration; encoding/json encodes it as integer
	// nanoseconds.
	ExpirationTime time.Duration `json:"expiration_time" yaml:"expiration_time"`
	Issuer         string        `json:"issuer" yaml:"issuer"`
	Audience       string        `json:"audience" yaml:"audience"`
}

JWTConfig contains JWT configuration

type LimitConfig

// LimitConfig is one rate/burst pair. It is the value type of
// RateLimitConfig.ClientLimits and RateLimitConfig.TaskTypeLimits.
type LimitConfig struct {
	Limit int `json:"limit" yaml:"limit"` // requests per second
	Burst int `json:"burst" yaml:"burst"` // burst size
}

LimitConfig contains specific rate limit settings

type Logger

// Logger is the structured-logging contract: four level methods that take a
// message plus zero or more Field values, and With, which returns a Logger.
type Logger interface {
	Debug(msg string, fields ...Field)
	Info(msg string, fields ...Field)
	Warn(msg string, fields ...Field)
	Error(msg string, fields ...Field)
	// With returns a Logger; presumably a child logger that attaches the given
	// fields to every subsequent entry — confirm with the implementation.
	With(fields ...Field) Logger
}

Logger defines the interface for structured logging

type LoggingConfig

// LoggingConfig holds logging settings. Level, Format and Output are
// free-form strings; the values listed in the trailing comments are not
// enforced by this type.
type LoggingConfig struct {
	Level      string `json:"level" yaml:"level"`       // debug, info, warn, error
	Format     string `json:"format" yaml:"format"`     // json, text
	Output     string `json:"output" yaml:"output"`     // stdout, stderr, file
	File       string `json:"file" yaml:"file"`         // log file path
	// MaxSize, MaxBackups, MaxAge and Compress are presumably log-rotation
	// settings for the file output — confirm with the logger setup code.
	MaxSize    int    `json:"max_size" yaml:"max_size"` // MB
	MaxBackups int    `json:"max_backups" yaml:"max_backups"`
	MaxAge     int    `json:"max_age" yaml:"max_age"` // days
	Compress   bool   `json:"compress" yaml:"compress"`

	// Structured logging fields
	Fields map[string]interface{} `json:"fields" yaml:"fields"`
}

LoggingConfig contains logging configuration

type MetricsCollector

// MetricsCollector is the metrics-recording contract. No method returns a
// value or an error, so implementations have no way to report a recording
// failure to the caller.
type MetricsCollector interface {
	// Task metrics
	// (completed/failed additionally carry the task duration; failed also
	// carries errorType as a plain string rather than an ErrorCode)
	RecordTaskEnqueued(taskType TaskType, priority Priority, queue string)
	RecordTaskStarted(taskType TaskType, priority Priority, queue string)
	RecordTaskCompleted(taskType TaskType, priority Priority, queue string, duration time.Duration)
	RecordTaskFailed(taskType TaskType, priority Priority, queue string, duration time.Duration, errorType string)

	// Queue metrics
	UpdateQueueDepth(queue string, depth int64)
	UpdateActiveWorkers(queue string, count int64)

	// Worker metrics
	RecordWorkerRegistered(workerID string, queues []string)
	RecordWorkerUnregistered(workerID string)
	UpdateWorkerStatus(workerID string, status WorkerStatus)

	// Circuit breaker metrics
	// (one method per CircuitBreakerState value)
	RecordCircuitBreakerOpen(service string)
	RecordCircuitBreakerClosed(service string)
	RecordCircuitBreakerHalfOpen(service string)
}

MetricsCollector defines the interface for metrics collection

type MetricsConfig

// MetricsConfig holds metrics and health-endpoint settings.
//
// NOTE(review): the package declares DefaultMetricsPort, DefaultMetricsPath,
// DefaultHealthPath and DefaultReadyPath; whether DefaultConfig applies them
// to Port, Path, HealthPath and ReadyPath is not visible here — confirm.
type MetricsConfig struct {
	Enabled bool   `json:"enabled" yaml:"enabled"`
	Port    int    `json:"port" yaml:"port"`
	Path    string `json:"path" yaml:"path"`

	// Prometheus settings
	Namespace string `json:"namespace" yaml:"namespace"`
	Subsystem string `json:"subsystem" yaml:"subsystem"`

	// Collection intervals
	// (time.Duration; encoded by encoding/json as integer nanoseconds)
	CollectInterval time.Duration `json:"collect_interval" yaml:"collect_interval"`

	// Health check endpoints
	HealthPath string `json:"health_path" yaml:"health_path"`
	ReadyPath  string `json:"ready_path" yaml:"ready_path"`
}

MetricsConfig contains monitoring configuration

type PostgresConfig

// PostgresConfig holds PostgreSQL connection and pool settings.
//
// NOTE(review): Password is serialized in plain text under "password" in both
// JSON and YAML, so any dumped or logged configuration exposes it.
type PostgresConfig struct {
	Host           string        `json:"host" yaml:"host"`
	Port           int           `json:"port" yaml:"port"`
	Database       string        `json:"database" yaml:"database"`
	Username       string        `json:"username" yaml:"username"`
	Password       string        `json:"password" yaml:"password"`
	// SSLMode is a free-form string; presumably passed through as the
	// connection's sslmode parameter — confirm with the backend code.
	SSLMode        string        `json:"ssl_mode" yaml:"ssl_mode"`
	MaxConnections int           `json:"max_connections" yaml:"max_connections"`
	// The three durations below are time.Duration values; encoding/json
	// encodes them as integer nanoseconds.
	MaxIdleTime    time.Duration `json:"max_idle_time" yaml:"max_idle_time"`
	MaxLifetime    time.Duration `json:"max_lifetime" yaml:"max_lifetime"`
	ConnectTimeout time.Duration `json:"connect_timeout" yaml:"connect_timeout"`
}

PostgresConfig contains PostgreSQL-specific configuration

type PrefixedIDGenerator

// PrefixedIDGenerator carries the prefix used by its Generate method. The
// method has a pointer receiver, so call it on a *PrefixedIDGenerator (or an
// addressable value).
type PrefixedIDGenerator struct {
	// Prefix is exported and unvalidated; the format of the generated ID and
	// the separator, if any, are defined in Generate, which is not visible here.
	Prefix string
}

PrefixedIDGenerator generates IDs with a prefix

func (*PrefixedIDGenerator) Generate

func (g *PrefixedIDGenerator) Generate() string

type Priority

// Priority is a string-backed enumeration of task priorities; its declared
// values are the Priority* constants.
type Priority string

Priority defines task execution priority levels

// Declared Priority values, listed from most to least urgent.
//
// The underlying strings do not sort in urgency order ("critical" < "high" <
// "low" < "normal" lexically), so never order tasks by comparing Priority
// values directly; the package exposes GetPriorityWeight for sorting.
const (
	PriorityCritical Priority = "critical" // Financial calculations, payment processing
	PriorityHigh     Priority = "high"     // User-facing operations, webhooks
	PriorityNormal   Priority = "normal"   // Background processing, reports
	PriorityLow      Priority = "low"      // Analytics, cleanup tasks
)

type QueueBackend

// QueueBackend is the storage/transport contract a queue implementation must
// satisfy. Every method except Close takes a context.Context as its first
// parameter. Semantics beyond the signatures (ordering, atomicity of batch
// calls, which sentinel errors are returned) are implementation-defined and
// not visible here.
type QueueBackend interface {
	// Task operations
	// NOTE(review): what Dequeue returns when the timeout elapses with no task
	// (nil task, ErrQueueEmpty, or a context error) is not specified by the
	// signature — confirm per implementation.
	Enqueue(ctx context.Context, task *Task) error
	Dequeue(ctx context.Context, queue string, timeout time.Duration) (*Task, error)
	Ack(ctx context.Context, taskID string) error
	Nack(ctx context.Context, taskID string, reason string) error

	// Batch operations for efficiency
	EnqueueBatch(ctx context.Context, tasks []*Task) error
	DequeueBatch(ctx context.Context, queue string, count int, timeout time.Duration) ([]*Task, error)

	// Task management
	GetTask(ctx context.Context, taskID string) (*Task, error)
	UpdateTask(ctx context.Context, task *Task) error
	DeleteTask(ctx context.Context, taskID string) error

	// Queue management
	GetQueueStats(ctx context.Context, queue string) (*QueueStats, error)
	ListQueues(ctx context.Context) ([]string, error)
	PurgeQueue(ctx context.Context, queue string) error

	// Dead letter queue operations
	MoveToDLQ(ctx context.Context, taskID string, reason string) error
	RequeueFromDLQ(ctx context.Context, taskID string) error

	// Retry and scheduling
	// (GetScheduledTasks here takes a cutoff time and a limit; the Scheduler
	// interface has a method of the same name with a from/to range instead)
	ScheduleRetry(ctx context.Context, taskID string, retryAt time.Time) error
	GetScheduledTasks(ctx context.Context, before time.Time, limit int) ([]*Task, error)

	// Health and monitoring
	HealthCheck(ctx context.Context) error
	Close() error
}

QueueBackend defines the interface for different queue implementations. This allows us to swap between Redis, PostgreSQL, NATS, etc.

type QueueConfig

// QueueConfig holds queue-backend settings. The Redis and Postgres sections
// are both always present (embedded by value); Backend presumably selects
// which one is used — confirm with the backend factory.
//
// NOTE(review): the trailing comment on Backend lists "nats", but this struct
// has no NATS-specific section.
type QueueConfig struct {
	Backend    string        `json:"backend" yaml:"backend"`         // redis, postgres, nats
	URL        string        `json:"url" yaml:"url"`                 // Connection URL
	MaxRetries int           `json:"max_retries" yaml:"max_retries"` // Connection retries
	Timeout    time.Duration `json:"timeout" yaml:"timeout"`         // Operation timeout

	// Redis-specific settings
	Redis RedisConfig `json:"redis" yaml:"redis"`

	// PostgreSQL-specific settings
	Postgres PostgresConfig `json:"postgres" yaml:"postgres"`

	// Default queue settings
	DefaultQueue string `json:"default_queue" yaml:"default_queue"`
	MaxQueueSize int    `json:"max_queue_size" yaml:"max_queue_size"`

	// Task retention
	// (time.Duration; encoded by encoding/json as integer nanoseconds)
	CompletedTaskTTL time.Duration `json:"completed_task_ttl" yaml:"completed_task_ttl"`
	FailedTaskTTL    time.Duration `json:"failed_task_ttl" yaml:"failed_task_ttl"`
}

QueueConfig contains queue backend configuration

type QueueStats

// QueueStats is a point-in-time snapshot of one queue, returned by
// QueueBackend.GetQueueStats. It has json tags only (no yaml tags).
type QueueStats struct {
	QueueName       string             `json:"queue_name"`
	PendingTasks    int64              `json:"pending_tasks"`
	RunningTasks    int64              `json:"running_tasks"`
	CompletedTasks  int64              `json:"completed_tasks"`
	FailedTasks     int64              `json:"failed_tasks"`
	DeadLetterTasks int64              `json:"dead_letter_tasks"`
	// The two maps are keyed by string-backed types, so encoding/json emits
	// them as objects whose keys are the Priority / TaskType strings. A nil
	// map is emitted as null, not {}.
	TasksByPriority map[Priority]int64 `json:"tasks_by_priority"`
	TasksByType     map[TaskType]int64 `json:"tasks_by_type"`
	LastUpdated     time.Time          `json:"last_updated"`
}

QueueStats provides statistics about a queue

type RateLimitConfig

// RateLimitConfig holds a default rate/burst pair plus optional overrides.
//
// NOTE(review): when both a client override and a task-type override apply to
// the same request, which one wins is not defined by this type — confirm with
// the rate limiter.
type RateLimitConfig struct {
	Enabled         bool          `json:"enabled" yaml:"enabled"`
	DefaultLimit    int           `json:"default_limit" yaml:"default_limit"` // requests per second
	DefaultBurst    int           `json:"default_burst" yaml:"default_burst"` // burst size
	CleanupInterval time.Duration `json:"cleanup_interval" yaml:"cleanup_interval"`

	// Per-client and per-task-type limits
	// (ClientLimits key is presumably a client identifier — confirm)
	ClientLimits   map[string]LimitConfig   `json:"client_limits" yaml:"client_limits"`
	TaskTypeLimits map[TaskType]LimitConfig `json:"task_type_limits" yaml:"task_type_limits"`
}

RateLimitConfig contains rate limiting configuration

type RateLimiter

// RateLimiter is the rate-limiting contract. Limits are tracked per key, and
// every method takes a context and can fail with an error.
type RateLimiter interface {
	// Allow checks if an operation is allowed under the rate limit
	// for key. The bool is the decision; the error reports a failure of the
	// limiter itself. NOTE(review): whether a denial is (false, nil) or
	// (false, ErrRateLimitExceeded) is not fixed by the signature — confirm.
	Allow(ctx context.Context, key string) (bool, error)

	// AllowN checks if N operations are allowed
	AllowN(ctx context.Context, key string, n int) (bool, error)

	// Reset resets the rate limiter for a specific key
	Reset(ctx context.Context, key string) error
}

RateLimiter defines the interface for rate limiting

type RedisConfig

// RedisConfig holds Redis client settings. It has no host/address field; the
// address presumably comes from QueueConfig.URL — confirm with the backend.
//
// NOTE(review): Password is serialized in plain text under "password" in both
// JSON and YAML.
type RedisConfig struct {
	DB           int           `json:"db" yaml:"db"`
	Password     string        `json:"password" yaml:"password"`
	MaxRetries   int           `json:"max_retries" yaml:"max_retries"`
	PoolSize     int           `json:"pool_size" yaml:"pool_size"`
	MinIdleConns int           `json:"min_idle_conns" yaml:"min_idle_conns"`
	// The four timeouts are time.Duration values; encoding/json encodes them
	// as integer nanoseconds.
	DialTimeout  time.Duration `json:"dial_timeout" yaml:"dial_timeout"`
	ReadTimeout  time.Duration `json:"read_timeout" yaml:"read_timeout"`
	WriteTimeout time.Duration `json:"write_timeout" yaml:"write_timeout"`
	PoolTimeout  time.Duration `json:"pool_timeout" yaml:"pool_timeout"`
}

RedisConfig contains Redis-specific configuration

type ScheduledPayload

// ScheduledPayload wraps another task (TaskType/TaskPayload/TaskOptions) with
// scheduling information.
//
// Every scheduling field is optional in the tags, including CronExpression.
type ScheduledPayload struct {
	CronExpression string     `json:"cron_expression,omitempty"` // Cron expression for recurring
	// Timezone is a free-form string; presumably an IANA zone name usable with
	// time.LoadLocation — confirm with the scheduler.
	Timezone       string     `json:"timezone,omitempty"`        // Timezone for execution
	MaxRuns        int        `json:"max_runs,omitempty"`        // Limit number of executions
	// StartDate and EndDate are pointers so that nil (omitted in JSON) can be
	// told apart from a set time.
	StartDate      *time.Time `json:"start_date,omitempty"`      // When to start schedule
	EndDate        *time.Time `json:"end_date,omitempty"`        // When to end schedule

	// The actual task to execute
	TaskType    TaskType    `json:"task_type" validate:"required"`
	TaskPayload interface{} `json:"task_payload" validate:"required"`
	// NOTE(review): if TaskOptions is a struct type (its declaration is not
	// visible here), omitempty has no effect on this field in encoding/json.
	TaskOptions TaskOptions `json:"task_options,omitempty"`
}

ScheduledPayload represents the payload for scheduled/cron tasks

type Scheduler

// Scheduler is the task-scheduling contract. Every method takes a context and
// returns an error; behavior beyond the signatures is implementation-defined.
type Scheduler interface {
	// Start begins the scheduling process
	// NOTE(review): whether Start blocks until ctx is cancelled or returns
	// immediately is not fixed by the signature — confirm per implementation.
	Start(ctx context.Context) error

	// Stop gracefully stops the scheduler
	Stop(ctx context.Context) error

	// ScheduleTask adds a task to be executed at a specific time
	ScheduleTask(ctx context.Context, task *Task, executeAt time.Time) error

	// ScheduleCron adds a recurring task with cron expression
	// taskTemplate is passed by pointer; whether the scheduler copies it for
	// each run is not visible here.
	ScheduleCron(ctx context.Context, cronExpr string, taskTemplate *Task) error

	// CancelScheduledTask removes a scheduled task
	CancelScheduledTask(ctx context.Context, taskID string) error

	// GetScheduledTasks returns upcoming scheduled tasks
	// in the from/to range. QueueBackend has a method of the same name with a
	// different parameter list (before, limit).
	GetScheduledTasks(ctx context.Context, from, to time.Time) ([]*Task, error)
}

Scheduler defines the interface for task scheduling

type SchedulerConfig

// SchedulerConfig holds scheduler settings.
//
// NOTE(review): all three interval/age fields are time.Duration values, which
// encoding/json encodes as integer nanoseconds.
type SchedulerConfig struct {
	Enabled           bool          `json:"enabled" yaml:"enabled"`
	CheckInterval     time.Duration `json:"check_interval" yaml:"check_interval"`
	MaxScheduledTasks int           `json:"max_scheduled_tasks" yaml:"max_scheduled_tasks"`
	// Timezone is a free-form string; presumably an IANA zone name — confirm
	// with the scheduler.
	Timezone          string        `json:"timezone" yaml:"timezone"`

	// Cleanup settings
	CleanupInterval time.Duration `json:"cleanup_interval" yaml:"cleanup_interval"`
	CleanupAge      time.Duration `json:"cleanup_age" yaml:"cleanup_age"`
}

SchedulerConfig contains scheduler configuration

type SecurityConfig

// SecurityConfig aggregates the security sections by value. The JWT and
// APIKeys sections are always present regardless of Auth.Type; they contain
// secrets (JWTConfig.SecretKey, APIKeyConfig.Keys) that serialize in plain
// text, so treat a serialized SecurityConfig as sensitive.
type SecurityConfig struct {
	// Authentication
	Auth AuthConfig `json:"auth" yaml:"auth"`

	// JWT settings
	JWT JWTConfig `json:"jwt" yaml:"jwt"`

	// API keys
	APIKeys APIKeyConfig `json:"api_keys" yaml:"api_keys"`

	// Encryption
	Encryption EncryptionConfig `json:"encryption" yaml:"encryption"`
}

SecurityConfig contains security-related configuration

type ServiceCircuitConfig

// ServiceCircuitConfig is the value type of CircuitBreakerConfig.Services.
// Its three fields have the same names, types and keys as the corresponding
// fields of CircuitBreakerConfig.
//
// NOTE(review): whether a zero field means "inherit the global value" or
// literally zero is not defined by this type — confirm with the consumer.
type ServiceCircuitConfig struct {
	Threshold   int           `json:"threshold" yaml:"threshold"`
	Timeout     time.Duration `json:"timeout" yaml:"timeout"`
	MaxRequests int           `json:"max_requests" yaml:"max_requests"`
}

ServiceCircuitConfig contains service-specific circuit breaker settings

type TLSConfig

// TLSConfig holds TLS settings as file paths; certificate and key material
// are not stored in the struct itself. It is embedded in APIConfig as "tls".
type TLSConfig struct {
	Enabled  bool   `json:"enabled" yaml:"enabled"`
	CertFile string `json:"cert_file" yaml:"cert_file"`
	KeyFile  string `json:"key_file" yaml:"key_file"`
	// CAFile is presumably the CA bundle used to verify peers — confirm how
	// the server setup code uses it.
	CAFile   string `json:"ca_file" yaml:"ca_file"`
}

TLSConfig contains TLS configuration

type Task

// Task represents a unit of work in the TaskForge system.
//
// Serialization notes (from the struct tags below):
//   - ID is stored under the BSON key "_id".
//   - The pointer timestamps ScheduledAt, StartedAt, CompletedAt, NextRetryAt
//     and DeadlineAt have no omitempty in their JSON tags, so encoding/json
//     writes them as null when nil rather than omitting the key.
//   - Timeout is a *time.Duration; encoding/json encodes a Duration as an
//     integer number of nanoseconds, not a string such as "5m".
type Task struct {
	// Core identifiers
	ID       string     `json:"id" bson:"_id"`            // Unique task identifier
	Type     TaskType   `json:"type" bson:"type"`         // Type of task
	Priority Priority   `json:"priority" bson:"priority"` // Execution priority
	Status   TaskStatus `json:"status" bson:"status"`     // Current status
	Queue    string     `json:"queue" bson:"queue"`       // Target queue name

	// Payload and metadata
	// Payload contains task-specific data as a raw JSON object.
	// The expected structure of Payload depends on the Type field and should match
	// the corresponding typed payload struct defined in payloads.go (e.g., WebhookPayload, EmailPayload, etc.).
	// When creating or processing a Task, marshal/unmarshal Payload to/from the appropriate struct
	// based on Task.Type. See payloads.go for definitions and expected formats.
	Payload  json.RawMessage        `json:"payload" bson:"payload"`   // Task-specific data
	Metadata map[string]interface{} `json:"metadata" bson:"metadata"` // Additional metadata

	// Scheduling and timing
	CreatedAt   time.Time  `json:"created_at" bson:"created_at"`     // When task was created
	ScheduledAt *time.Time `json:"scheduled_at" bson:"scheduled_at"` // When to execute (nil = immediate)
	StartedAt   *time.Time `json:"started_at" bson:"started_at"`     // When processing began
	CompletedAt *time.Time `json:"completed_at" bson:"completed_at"` // When processing finished

	// Retry and error handling
	MaxRetries     int        `json:"max_retries" bson:"max_retries"`         // Maximum retry attempts
	CurrentRetries int        `json:"current_retries" bson:"current_retries"` // Current retry count
	LastError      string     `json:"last_error,omitempty" bson:"last_error"` // Last error message
	NextRetryAt    *time.Time `json:"next_retry_at" bson:"next_retry_at"`     // When to retry next

	// Processing context
	WorkerID      string `json:"worker_id,omitempty" bson:"worker_id"`           // ID of processing worker
	CorrelationID string `json:"correlation_id,omitempty" bson:"correlation_id"` // For request tracing

	// Multi-tenancy and isolation
	TenantID string `json:"tenant_id,omitempty" bson:"tenant_id"` // Tenant identifier

	// Timeout and deadlines
	Timeout    *time.Duration `json:"timeout,omitempty" bson:"timeout"` // Max execution time
	DeadlineAt *time.Time     `json:"deadline_at" bson:"deadline_at"`   // Hard deadline

	// Deduplication
	DedupeKey string `json:"dedupe_key,omitempty" bson:"dedupe_key"` // For preventing duplicates
}

Task represents a unit of work in the TaskForge system

func (*Task) Age

func (t *Task) Age() time.Duration

Age returns how long ago the task was created

func (*Task) CanRetry

func (t *Task) CanRetry() bool

CanRetry checks if a task can be retried

func (*Task) Clone

func (t *Task) Clone() *Task

Clone creates a deep copy of a task

func (*Task) Duration

func (t *Task) Duration() time.Duration

Duration returns how long the task took to complete

func (*Task) IsExpired

func (t *Task) IsExpired() bool

IsExpired checks if a task has exceeded its deadline

func (*Task) QueueTime

func (t *Task) QueueTime() time.Duration

QueueTime returns how long the task waited in the queue before processing

func (*Task) SetPayload

func (t *Task) SetPayload(v interface{}) error

SetPayload marshals the provided value and sets it as the task payload

func (*Task) UnmarshalPayload

func (t *Task) UnmarshalPayload(v interface{}) error

UnmarshalPayload unmarshals the task payload into the provided interface. Example:

var webhook WebhookPayload
err := task.UnmarshalPayload(&webhook)

type TaskBuilder

// TaskBuilder provides a fluent interface for building tasks.
// Its With* methods each return *TaskBuilder so calls can be chained;
// Build returns (*Task, error) and MustBuild returns *Task or panics.
type TaskBuilder struct {
	// contains filtered or unexported fields
}

TaskBuilder provides a fluent interface for building tasks

func NewTaskBuilder

func NewTaskBuilder(taskType TaskType) *TaskBuilder

NewTaskBuilder creates a new task builder

func (*TaskBuilder) Build

func (b *TaskBuilder) Build() (*Task, error)

Build returns the constructed task or an error if building failed

func (*TaskBuilder) MustBuild

func (b *TaskBuilder) MustBuild() *Task

MustBuild returns the constructed task or panics if there's an error. Use this only in tests or when you're certain the build will succeed.

func (*TaskBuilder) WithCorrelationID

func (b *TaskBuilder) WithCorrelationID(correlationID string) *TaskBuilder

WithCorrelationID sets the correlation ID

func (*TaskBuilder) WithDeadline

func (b *TaskBuilder) WithDeadline(deadline time.Time) *TaskBuilder

WithDeadline sets the task deadline

func (*TaskBuilder) WithDedupeKey

func (b *TaskBuilder) WithDedupeKey(key string) *TaskBuilder

WithDedupeKey sets the deduplication key

func (*TaskBuilder) WithID

func (b *TaskBuilder) WithID(id string) *TaskBuilder

WithID sets the task ID

func (*TaskBuilder) WithMaxRetries

func (b *TaskBuilder) WithMaxRetries(maxRetries int) *TaskBuilder

WithMaxRetries sets the maximum retry attempts

func (*TaskBuilder) WithMetadata

func (b *TaskBuilder) WithMetadata(key string, value interface{}) *TaskBuilder

WithMetadata adds metadata to the task

func (*TaskBuilder) WithPayload

func (b *TaskBuilder) WithPayload(payload interface{}) *TaskBuilder

WithPayload sets the task payload. The payload must be a valid JSON-serializable value.

func (*TaskBuilder) WithPriority

func (b *TaskBuilder) WithPriority(priority Priority) *TaskBuilder

WithPriority sets the task priority

func (*TaskBuilder) WithQueue

func (b *TaskBuilder) WithQueue(queue string) *TaskBuilder

WithQueue sets the target queue

func (*TaskBuilder) WithRawPayload

func (b *TaskBuilder) WithRawPayload(payload json.RawMessage) *TaskBuilder

WithRawPayload sets the task payload from already-serialized JSON

func (*TaskBuilder) WithScheduledAt

func (b *TaskBuilder) WithScheduledAt(scheduledAt time.Time) *TaskBuilder

WithScheduledAt sets when the task should be executed

func (*TaskBuilder) WithTenantID

func (b *TaskBuilder) WithTenantID(tenantID string) *TaskBuilder

WithTenantID sets the tenant ID

func (*TaskBuilder) WithTimeout

func (b *TaskBuilder) WithTimeout(timeout time.Duration) *TaskBuilder

WithTimeout sets the task timeout

type TaskError

// TaskError represents a task-specific error with additional context.
// *TaskError declares both Error() string and Unwrap() error.
// NOTE(review): Unwrap presumably returns Cause so errors.Is / errors.As can
// see the underlying error — confirm in the implementation.
type TaskError struct {
	TaskID    string    `json:"task_id"`
	TaskType  TaskType  `json:"task_type"`
	Code      ErrorCode `json:"code"`
	Message   string    `json:"message"`
	Details   string    `json:"details,omitempty"`
	// Retryable is always serialized (no omitempty).
	// NOTE(review): presumably consulted by the retry logic — confirm.
	Retryable bool      `json:"retryable"`
	Cause     error     `json:"-"` // Original error (not serialized)
}

TaskError represents a task-specific error with additional context

func NewTaskError

func NewTaskError(taskID string, taskType TaskType, code ErrorCode, message string) *TaskError

NewTaskError creates a new TaskError

func NewTaskErrorWithCause

func NewTaskErrorWithCause(taskID string, taskType TaskType, code ErrorCode, message string, cause error) *TaskError

NewTaskErrorWithCause creates a new TaskError with an underlying cause

func (*TaskError) Error

func (e *TaskError) Error() string

func (*TaskError) Unwrap

func (e *TaskError) Unwrap() error

type TaskIDGenerator

// TaskIDGenerator generates unique task IDs.
type TaskIDGenerator interface {
	// Generate returns a new task ID as a string. It takes no arguments and
	// has no error return, so implementations cannot report failure to the caller.
	Generate() string
}

TaskIDGenerator generates unique task IDs

type TaskOptions

// TaskOptions provides configuration for task creation.
// Every field is tagged omitempty, so zero values and nil pointers are left
// out of the JSON encoding.
// NOTE(review): MaxRetries and Timeout are pointers, presumably so "not set"
// (nil) can be told apart from an explicit zero — confirm where options are applied.
type TaskOptions struct {
	Priority      Priority               `json:"priority,omitempty"`
	Queue         string                 `json:"queue,omitempty"`
	MaxRetries    *int                   `json:"max_retries,omitempty"`
	Timeout       *time.Duration         `json:"timeout,omitempty"`
	ScheduledAt   *time.Time             `json:"scheduled_at,omitempty"`
	DeadlineAt    *time.Time             `json:"deadline_at,omitempty"`
	DedupeKey     string                 `json:"dedupe_key,omitempty"`
	Metadata      map[string]interface{} `json:"metadata,omitempty"`
	CorrelationID string                 `json:"correlation_id,omitempty"`
	TenantID      string                 `json:"tenant_id,omitempty"`
}

TaskOptions provides configuration for task creation

type TaskProcessor

// TaskProcessor defines the interface for task execution.
type TaskProcessor interface {
	// Process executes a task and returns the result
	// The context is the first parameter; the outcome is a *TaskResult plus an error.
	Process(ctx context.Context, task *Task) (*TaskResult, error)

	// GetSupportedTypes returns the task types this processor can handle
	GetSupportedTypes() []TaskType

	// GetCapabilities returns additional capabilities (e.g., "image-resize", "webhook-v2")
	GetCapabilities() []string
}

TaskProcessor defines the interface for task execution

type TaskResult

// TaskResult represents the outcome of task execution.
// Duration is a time.Duration, which encoding/json writes as an integer
// number of nanoseconds.
type TaskResult struct {
	TaskID      string                 `json:"task_id"`
	Status      TaskStatus             `json:"status"`
	Result      json.RawMessage        `json:"result,omitempty"`   // Success result data
	Error       string                 `json:"error,omitempty"`    // Error message if failed
	Duration    time.Duration          `json:"duration"`           // Execution time
	Metadata    map[string]interface{} `json:"metadata,omitempty"` // Additional result metadata
	CompletedAt time.Time              `json:"completed_at"`
	WorkerID    string                 `json:"worker_id"`
}

TaskResult represents the outcome of task execution

type TaskStatus

// TaskStatus represents the current state of a task.
type TaskStatus string

TaskStatus represents the current state of a task

// Known TaskStatus values. TaskStatus is a string type, so these literals are
// also the serialized form of the status.
const (
	TaskStatusPending    TaskStatus = "pending"     // Queued, awaiting processing
	TaskStatusRunning    TaskStatus = "running"     // Currently being processed
	TaskStatusCompleted  TaskStatus = "completed"   // Successfully completed
	TaskStatusFailed     TaskStatus = "failed"      // Failed (will retry if attempts remain)
	TaskStatusDeadLetter TaskStatus = "dead_letter" // Failed permanently, moved to DLQ
	TaskStatusCancelled  TaskStatus = "cancelled"   // Cancelled by user
)

type TaskTransformer

// TaskTransformer defines the interface for task transformation.
type TaskTransformer interface {
	// Transform modifies a task before it's enqueued (e.g., add defaults, enrich data)
	// NOTE(review): the signature does not say whether the returned *Task is
	// the same pointer as the input or a copy — confirm per implementation.
	Transform(task *Task) (*Task, error)

	// GetSupportedTypes returns the task types this transformer handles
	GetSupportedTypes() []TaskType
}

TaskTransformer defines the interface for task transformation

type TaskType

// TaskType defines the type of task to be executed.
type TaskType string

TaskType defines the type of task to be executed

// Known TaskType values. TaskType is a string type, so these literals are
// also the serialized form of the type.
// NOTE(review): presumably each value pairs with a typed payload struct
// (e.g. TaskTypeWebhook with WebhookPayload) — confirm the full mapping.
const (
	TaskTypeWebhook      TaskType = "webhook"       // HTTP webhook delivery
	TaskTypeEmail        TaskType = "email"         // Email processing
	TaskTypeImageProcess TaskType = "image_process" // Image operations
	TaskTypeDataProcess  TaskType = "data_process"  // Data transformations
	TaskTypeScheduled    TaskType = "scheduled"     // Cron/scheduled tasks
	TaskTypeBatch        TaskType = "batch"         // Bulk operations
)

type TaskValidator

// TaskValidator defines the interface for task validation.
type TaskValidator interface {
	// Validate checks if a task is valid and can be processed
	Validate(task *Task) error

	// ValidatePayload validates task-specific payload
	// The payload is passed as raw bytes alongside the task type it belongs to.
	ValidatePayload(taskType TaskType, payload []byte) error
}

TaskValidator defines the interface for task validation

type UUIDGenerator

// UUIDGenerator generates UUID-based task IDs.
// It is an empty struct. Generate() string is declared on the pointer
// receiver, so *UUIDGenerator (not the value type) has the TaskIDGenerator
// method set.
type UUIDGenerator struct{}

UUIDGenerator generates UUID-based task IDs

func (*UUIDGenerator) Generate

func (g *UUIDGenerator) Generate() string

type ValidationError

// ValidationError represents field validation errors.
// Error() string is declared on *ValidationError.
type ValidationError struct {
	Field   string      `json:"field"`
	Value   interface{} `json:"value"`
	// Tag: NOTE(review): presumably the name of the validation rule that
	// failed (compare the `validate:"..."` struct tags in this package) — confirm.
	Tag     string      `json:"tag"`
	Message string      `json:"message"`
}

ValidationError represents field validation errors

func (*ValidationError) Error

func (e *ValidationError) Error() string

type ValidationErrors

// ValidationErrors represents multiple validation errors.
// Error() string is declared on the value receiver, so a ValidationErrors
// slice can be returned directly as an error.
type ValidationErrors []*ValidationError

ValidationErrors represents multiple validation errors

func (ValidationErrors) Error

func (e ValidationErrors) Error() string

type WebhookPayload

// WebhookPayload represents the payload for HTTP webhook delivery tasks.
// URL and Method carry `validate` tags: both are required, URL must be a URL,
// and Method must be one of GET, POST, PUT, PATCH or DELETE.
type WebhookPayload struct {
	URL     string            `json:"url" validate:"required,url"`
	Method  string            `json:"method" validate:"required,oneof=GET POST PUT PATCH DELETE"`
	Headers map[string]string `json:"headers,omitempty"`
	Body    interface{}       `json:"body,omitempty"`

	// Webhook-specific options
	// NOTE(review): FollowRedirects and VerifySSL are plain bools, so a payload
	// that omits these keys decodes them as false. Confirm that the delivery
	// code does not read a missing verify_ssl as "skip certificate verification".
	Timeout         time.Duration `json:"timeout,omitempty"`          // Request timeout
	FollowRedirects bool          `json:"follow_redirects,omitempty"` // Follow HTTP redirects
	VerifySSL       bool          `json:"verify_ssl,omitempty"`       // Verify SSL certificates
	ExpectedStatus  []int         `json:"expected_status,omitempty"`  // Expected HTTP status codes
	Secret          string        `json:"secret,omitempty"`           // For HMAC signature
	SignatureHeader string        `json:"signature_header,omitempty"` // Header name for signature

	// Circuit breaker settings
	CircuitBreakerKey string `json:"circuit_breaker_key,omitempty"` // Group webhooks for circuit breaking
}

WebhookPayload represents the payload for HTTP webhook delivery tasks

type Worker

// Worker defines the interface for task workers.
// Start, Stop and Heartbeat take a context.Context as their first parameter.
type Worker interface {
	// Start begins processing tasks from specified queues
	Start(ctx context.Context, queues []string) error

	// Stop gracefully stops the worker, finishing current tasks
	Stop(ctx context.Context) error

	// RegisterProcessor adds a task processor for specific task types
	// One TaskType is bound per call.
	RegisterProcessor(taskType TaskType, processor TaskProcessor) error

	// GetInfo returns current worker information
	GetInfo() *WorkerInfo

	// Heartbeat updates the worker's status and metadata
	Heartbeat(ctx context.Context) error
}

Worker defines the interface for task workers

type WorkerConfig

// WorkerConfig contains worker configuration.
// Every field uses the same key for its JSON and YAML tags.
//
// NOTE(review): the package-level duration defaults (DefaultWorkerTimeout,
// DefaultHeartbeatInterval, DefaultShutdownTimeout, DefaultInitialDelay,
// DefaultMaxDelay) are string constants such as "30s", while the fields here
// are time.Duration. They must be parsed before being assigned; where that
// happens is not visible in this section. Also, encoding/json reads a
// time.Duration as integer nanoseconds, so a JSON value like "30s" will not
// decode directly — confirm how the config loader handles this.
type WorkerConfig struct {
	ID                string        `json:"id" yaml:"id"`                   // Worker identifier
	Queues            []string      `json:"queues" yaml:"queues"`           // Queues to process
	Concurrency       int           `json:"concurrency" yaml:"concurrency"` // Concurrent tasks
	Timeout           time.Duration `json:"timeout" yaml:"timeout"`         // Task timeout
	HeartbeatInterval time.Duration `json:"heartbeat_interval" yaml:"heartbeat_interval"`
	ShutdownTimeout   time.Duration `json:"shutdown_timeout" yaml:"shutdown_timeout"`

	// Retry configuration
	MaxRetries    int           `json:"max_retries" yaml:"max_retries"`
	RetryBackoff  string        `json:"retry_backoff" yaml:"retry_backoff"` // exponential, linear, fixed
	InitialDelay  time.Duration `json:"initial_delay" yaml:"initial_delay"`
	MaxDelay      time.Duration `json:"max_delay" yaml:"max_delay"`
	BackoffFactor float64       `json:"backoff_factor" yaml:"backoff_factor"`

	// Resource limits
	// NOTE(review): whether 0 means "no limit" is not visible here — confirm.
	MaxMemoryMB   int `json:"max_memory_mb" yaml:"max_memory_mb"`
	MaxCPUPercent int `json:"max_cpu_percent" yaml:"max_cpu_percent"`

	// Task type filters
	SupportedTypes []TaskType `json:"supported_types" yaml:"supported_types"`
	Capabilities   []string   `json:"capabilities" yaml:"capabilities"`
}

WorkerConfig contains worker configuration

type WorkerInfo

// WorkerInfo represents information about a worker.
// No field uses omitempty, so nil slices and maps encode as JSON null.
type WorkerInfo struct {
	ID            string            `json:"id"`
	Hostname      string            `json:"hostname"`
	Version       string            `json:"version"`
	Queues        []string          `json:"queues"` // Queues this worker processes
	Status        WorkerStatus      `json:"status"`
	RegisteredAt  time.Time         `json:"registered_at"`
	LastHeartbeat time.Time         `json:"last_heartbeat"`
	CurrentTasks  []string          `json:"current_tasks"` // Currently processing task IDs
	// NOTE(review): the inline comment below says "task types", but the field
	// is []string rather than []TaskType, and TaskProcessor.GetCapabilities
	// describes capabilities as labels such as "image-resize". Confirm which
	// meaning is intended and align the wording.
	Capabilities  []string          `json:"capabilities"`  // Task types this worker can handle
	Metadata      map[string]string `json:"metadata"`
}

WorkerInfo represents information about a worker

type WorkerStatus

// WorkerStatus represents the current state of a worker.
type WorkerStatus string

WorkerStatus represents the current state of a worker

// Known WorkerStatus values. WorkerStatus is a string type, so these literals
// are also the serialized form of the status.
const (
	WorkerStatusIdle        WorkerStatus = "idle"        // Available for work
	WorkerStatusBusy        WorkerStatus = "busy"        // Processing tasks
	WorkerStatusDraining    WorkerStatus = "draining"    // Finishing current tasks, no new ones
	WorkerStatusOffline     WorkerStatus = "offline"     // Disconnected
	WorkerStatusMaintenance WorkerStatus = "maintenance" // Under maintenance
)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL