Documentation
¶
Index ¶
- Variables
- type Config
- type DatabaseQueue
- func (q *DatabaseQueue) Done(ctx context.Context, id string, exitStatus model.StatusValue) error
- func (q *DatabaseQueue) Error(ctx context.Context, id string, err error) error
- func (q *DatabaseQueue) ErrorAtOnce(ctx context.Context, ids []string, err error) error
- func (q *DatabaseQueue) Evict(ctx context.Context, taskID string) error
- func (q *DatabaseQueue) EvictAtOnce(ctx context.Context, taskIDs []string) error
- func (q *DatabaseQueue) Extend(ctx context.Context, agentID int64, taskID string) error
- func (q *DatabaseQueue) GetRunningWorkflowIDs() []string
- func (q *DatabaseQueue) Info(ctx context.Context) InfoT
- func (q *DatabaseQueue) KickAgentWorkers(agentID int64)
- func (q *DatabaseQueue) Pause()
- func (q *DatabaseQueue) Poll(ctx context.Context, agentID int64, filter FilterFn) (*model.Task, error)
- func (q *DatabaseQueue) Push(ctx context.Context, task *model.Task) error
- func (q *DatabaseQueue) PushAtOnce(ctx context.Context, tasks []*model.Task) error
- func (q *DatabaseQueue) Reclaim(_ context.Context, task *model.Task) error
- func (q *DatabaseQueue) Resume()
- func (q *DatabaseQueue) Wait(ctx context.Context, taskID string) error
- type FilterFn
- type InfoT
- type LockService
- type Queue
- type Type
Constants ¶
This section is empty.
Variables ¶
var ( // ErrCancel indicates the task was canceled. ErrCancel = errors.New("queue: task canceled") // ErrNotFound indicates the task was not found in the queue. ErrNotFound = errors.New("queue: task not found") // ErrAgentMissMatch indicates a task is assigned to a different agent. ErrAgentMissMatch = errors.New("task assigned to different agent") )
var ErrWorkerKicked = fmt.Errorf("worker was kicked")
Functions ¶
This section is empty.
Types ¶
type Config ¶
type Config struct {
Backend Type
Store store.Store
LockService LockService
InstanceID string
LeaderLockTTL time.Duration // TTL for leader election lock
}
Config holds the configuration for the queue.
type DatabaseQueue ¶
type DatabaseQueue struct {
// contains filtered or unexported fields
}
DatabaseQueue implements a distributed queue using the database as the backend. This replaces the in-memory FIFO queue to enable horizontal scaling.
func (*DatabaseQueue) Done ¶
func (q *DatabaseQueue) Done(ctx context.Context, id string, exitStatus model.StatusValue) error
Done marks a task as completed and removes it from the database.
func (*DatabaseQueue) ErrorAtOnce ¶
ErrorAtOnce marks multiple tasks as failed.
func (*DatabaseQueue) Evict ¶
func (q *DatabaseQueue) Evict(ctx context.Context, taskID string) error
Evict removes a pending task from the queue.
func (*DatabaseQueue) EvictAtOnce ¶
func (q *DatabaseQueue) EvictAtOnce(ctx context.Context, taskIDs []string) error
EvictAtOnce removes multiple pending tasks from the queue.
func (*DatabaseQueue) Extend ¶
Extend extends the task execution deadline by updating assignment time.
func (*DatabaseQueue) GetRunningWorkflowIDs ¶
func (q *DatabaseQueue) GetRunningWorkflowIDs() []string
GetRunningWorkflowIDs returns workflow IDs that are currently assigned.
func (*DatabaseQueue) Info ¶
func (q *DatabaseQueue) Info(ctx context.Context) InfoT
Info returns queue information from the database.
func (*DatabaseQueue) KickAgentWorkers ¶
func (q *DatabaseQueue) KickAgentWorkers(agentID int64)
KickAgentWorkers removes tasks assigned to a specific agent.
func (*DatabaseQueue) Pause ¶
func (q *DatabaseQueue) Pause()
Pause stops the queue from processing new tasks.
func (*DatabaseQueue) Poll ¶
func (q *DatabaseQueue) Poll(ctx context.Context, agentID int64, filter FilterFn) (*model.Task, error)
Poll retrieves and assigns a task to an agent.
func (*DatabaseQueue) PushAtOnce ¶
PushAtOnce pushes multiple tasks to the queue.
func (*DatabaseQueue) Reclaim ¶
Reclaim re-adds a task to the running state after agent restart. For the database queue, this is a no-op since the database already has the task.
func (*DatabaseQueue) Resume ¶
func (q *DatabaseQueue) Resume()
Resume starts the queue processing again.
type FilterFn ¶
Filter filters tasks in the queue. If the Filter returns false, the Task is skipped and not returned to the subscriber. The int return value represents the matching score (higher is better). The int32 return value represents the agent priority (0 = no priority).
type InfoT ¶
type InfoT struct {
Pending []*model.Task `json:"pending"`
WaitingOnDeps []*model.Task `json:"waiting_on_deps"`
Running []*model.Task `json:"running"`
Stats struct {
Workers int `json:"worker_count"`
Pending int `json:"pending_count"`
WaitingOnDeps int `json:"waiting_on_deps_count"`
Running int `json:"running_count"`
} `json:"stats"`
Paused bool `json:"paused"`
} // @name InfoT
InfoT provides runtime information.
type LockService ¶
type LockService interface {
AcquireLock(ctx context.Context, lockName string, ttl time.Duration) (context.Context, context.CancelFunc, error)
}
LockService defines the interface for distributed locking.
type Queue ¶
type Queue interface {
// Push pushes a task to the tail of this queue.
Push(c context.Context, task *model.Task) error
// PushAtOnce pushes multiple tasks to the tail of this queue.
PushAtOnce(c context.Context, tasks []*model.Task) error
// Poll retrieves and removes a task head of this queue.
Poll(c context.Context, agentID int64, f FilterFn) (*model.Task, error)
// Extend extends the deadline for a task.
Extend(c context.Context, agentID int64, workflowID string) error
// Done signals the task is complete.
Done(c context.Context, id string, exitStatus model.StatusValue) error
// Error signals the task is done with an error.
Error(c context.Context, id string, err error) error
// ErrorAtOnce signals multiple done are complete with an error.
ErrorAtOnce(c context.Context, ids []string, err error) error
// Evict removes a pending task from the queue.
Evict(c context.Context, id string) error
// EvictAtOnce removes multiple pending tasks from the queue.
EvictAtOnce(c context.Context, ids []string) error
// Wait waits until the task is complete.
Wait(c context.Context, id string) error
// Reclaim re-adds a task to the running state after agent restart.
// This is used when an agent reclaims an orphaned task that it was previously running.
Reclaim(c context.Context, task *model.Task) error
// Info returns internal queue information.
Info(c context.Context) InfoT
// GetRunningWorkflowIDs returns a list of workflow IDs that are currently running
// This is useful for cleanup operations
GetRunningWorkflowIDs() []string
// Pause stops the queue from handing out new work items in Poll
Pause()
// Resume starts the queue again.
Resume()
// KickAgentWorkers kicks all workers for a given agent.
KickAgentWorkers(agentID int64)
}
Queue defines a task queue for scheduling tasks among a pool of workers.
func NewDatabaseQueue ¶
func NewDatabaseQueue(ctx context.Context, store store.Store, lockService LockService, instanceID string, leaderLockTTL time.Duration) Queue
NewDatabaseQueue creates a new database-backed queue for HA deployments.
func NewMemoryQueue ¶
NewMemoryQueue returns a new fifo queue.