queue

package
v5.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 6, 2026 License: Apache-2.0, BSD-3-Clause Imports: 12 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
// Sentinel errors exported by the queue package. Compare against them with
// errors.Is rather than matching on the message text.
var (
	// ErrCancel indicates the task was canceled.
	ErrCancel = errors.New("queue: task canceled")

	// ErrNotFound indicates the task was not found in the queue.
	ErrNotFound = errors.New("queue: task not found")

	// ErrAgentMissMatch indicates a task is assigned to a different agent.
	//
	// NOTE(review): unlike the two messages above, this one carries no
	// "queue: " prefix, and the identifier spells "MissMatch". Both are left
	// as-is because callers may depend on the exact name and text.
	ErrAgentMissMatch = errors.New("task assigned to different agent")
)
View Source
// ErrWorkerKicked is the sentinel error reporting that a worker was kicked.
//
// NOTE(review): the message contains no format verbs, so errors.New would be
// the idiomatic constructor and would match the other sentinel errors in this
// package — confirm the declaring file's imports before changing it.
var ErrWorkerKicked = fmt.Errorf("worker was kicked")

Functions

This section is empty.

Types

type Config

// Config holds the configuration for the queue. It is the argument accepted
// by New.
type Config struct {
	// Backend is the queue Type; TypeMemory and TypeDatabase are the declared
	// values.
	Backend       Type
	// Store is the store.Store made available to the queue.
	Store         store.Store
	// LockService provides distributed locking (see the LockService
	// interface).
	LockService   LockService
	// InstanceID presumably identifies this server instance among several
	// replicas — TODO confirm against the database queue implementation.
	InstanceID    string
	LeaderLockTTL time.Duration // TTL for leader election lock
}

Config holds the configuration for the queue.

type DatabaseQueue

// DatabaseQueue implements a distributed queue using the database as the
// backend. This replaces the in-memory FIFO queue to enable horizontal
// scaling.
type DatabaseQueue struct {
	// contains filtered or unexported fields
}

DatabaseQueue implements a distributed queue using the database as the backend. This replaces the in-memory FIFO queue to enable horizontal scaling.

func (*DatabaseQueue) Done

func (q *DatabaseQueue) Done(ctx context.Context, id string, exitStatus model.StatusValue) error

Done marks a task as completed and removes it from the database.

func (*DatabaseQueue) Error

func (q *DatabaseQueue) Error(ctx context.Context, id string, err error) error

Error marks a task as failed and removes it from the database.

func (*DatabaseQueue) ErrorAtOnce

func (q *DatabaseQueue) ErrorAtOnce(ctx context.Context, ids []string, err error) error

ErrorAtOnce marks multiple tasks as failed.

func (*DatabaseQueue) Evict

func (q *DatabaseQueue) Evict(ctx context.Context, taskID string) error

Evict removes a pending task from the queue.

func (*DatabaseQueue) EvictAtOnce

func (q *DatabaseQueue) EvictAtOnce(ctx context.Context, taskIDs []string) error

EvictAtOnce removes multiple pending tasks from the queue.

func (*DatabaseQueue) Extend

func (q *DatabaseQueue) Extend(ctx context.Context, agentID int64, taskID string) error

Extend extends the task execution deadline by updating assignment time.

func (*DatabaseQueue) GetRunningWorkflowIDs

func (q *DatabaseQueue) GetRunningWorkflowIDs() []string

GetRunningWorkflowIDs returns workflow IDs that are currently assigned.

func (*DatabaseQueue) Info

func (q *DatabaseQueue) Info(ctx context.Context) InfoT

Info returns queue information from the database.

func (*DatabaseQueue) KickAgentWorkers

func (q *DatabaseQueue) KickAgentWorkers(agentID int64)

KickAgentWorkers removes tasks assigned to a specific agent.

func (*DatabaseQueue) Pause

func (q *DatabaseQueue) Pause()

Pause stops the queue from processing new tasks.

func (*DatabaseQueue) Poll

func (q *DatabaseQueue) Poll(ctx context.Context, agentID int64, filter FilterFn) (*model.Task, error)

Poll retrieves and assigns a task to an agent.

func (*DatabaseQueue) Push

func (q *DatabaseQueue) Push(ctx context.Context, task *model.Task) error

Push pushes a task to the queue by storing it in the database.

func (*DatabaseQueue) PushAtOnce

func (q *DatabaseQueue) PushAtOnce(ctx context.Context, tasks []*model.Task) error

PushAtOnce pushes multiple tasks to the queue.

func (*DatabaseQueue) Reclaim

func (q *DatabaseQueue) Reclaim(_ context.Context, task *model.Task) error

Reclaim re-adds a task to the running state after agent restart. For the database queue, this is a no-op since the database already has the task.

func (*DatabaseQueue) Resume

func (q *DatabaseQueue) Resume()

Resume starts the queue processing again.

func (*DatabaseQueue) Wait

func (q *DatabaseQueue) Wait(ctx context.Context, taskID string) error

Wait waits until a task is completed (removed from database).

type FilterFn

// FilterFn filters tasks in the queue. If it returns false, the task is
// skipped and not returned to the subscriber. The int result is the matching
// score (higher is better); the int32 result is the agent priority
// (0 = no priority).
type FilterFn func(*model.Task) (bool, int, int32)

FilterFn filters tasks in the queue. If the filter returns false, the task is skipped and not returned to the subscriber. The int return value is the matching score (higher is better). The int32 return value is the agent priority (0 = no priority).

type InfoT

// InfoT provides runtime information about the queue. It is the value
// returned by Queue.Info and is serialized to JSON using the tags below.
type InfoT struct {
	// Pending, WaitingOnDeps and Running list tasks grouped by queue state.
	Pending       []*model.Task `json:"pending"`
	WaitingOnDeps []*model.Task `json:"waiting_on_deps"`
	Running       []*model.Task `json:"running"`
	// Stats carries integer counters, serialized under the "stats" key.
	// NOTE(review): presumably the task counters mirror the lengths of the
	// slices above — confirm against the code that fills them.
	Stats         struct {
		Workers       int `json:"worker_count"`
		Pending       int `json:"pending_count"`
		WaitingOnDeps int `json:"waiting_on_deps_count"`
		Running       int `json:"running_count"`
	} `json:"stats"`
	// Paused presumably reflects the state toggled by Pause and Resume —
	// TODO confirm.
	Paused bool `json:"paused"`

} //	@name	InfoT

InfoT provides runtime information.

func (*InfoT) String

func (t *InfoT) String() string

type LockService

// LockService defines the interface for distributed locking.
type LockService interface {
	// AcquireLock requests the lock identified by lockName with the given ttl.
	// It returns a context, a cancel function, and an error.
	//
	// NOTE(review): presumably the returned context is canceled when the lock
	// is lost and the cancel function releases the lock — confirm against the
	// concrete implementation.
	AcquireLock(ctx context.Context, lockName string, ttl time.Duration) (context.Context, context.CancelFunc, error)
}

LockService defines the interface for distributed locking.

type Queue

// Queue defines a task queue for scheduling tasks among a pool of workers.
type Queue interface {
	// Push pushes a task to the tail of this queue.
	Push(c context.Context, task *model.Task) error

	// PushAtOnce pushes multiple tasks to the tail of this queue.
	PushAtOnce(c context.Context, tasks []*model.Task) error

	// Poll retrieves and removes the task at the head of this queue.
	// The FilterFn f decides which tasks are eligible for the given agent.
	Poll(c context.Context, agentID int64, f FilterFn) (*model.Task, error)

	// Extend extends the deadline for a task.
	//
	// NOTE(review): the third parameter is named workflowID here, while
	// DatabaseQueue.Extend names it taskID — confirm which identifier is
	// actually expected.
	Extend(c context.Context, agentID int64, workflowID string) error

	// Done signals the task is complete.
	Done(c context.Context, id string, exitStatus model.StatusValue) error

	// Error signals the task is done with an error.
	Error(c context.Context, id string, err error) error

	// ErrorAtOnce signals that multiple tasks are done with an error.
	ErrorAtOnce(c context.Context, ids []string, err error) error

	// Evict removes a pending task from the queue.
	Evict(c context.Context, id string) error

	// EvictAtOnce removes multiple pending tasks from the queue.
	EvictAtOnce(c context.Context, ids []string) error

	// Wait waits until the task is complete.
	Wait(c context.Context, id string) error

	// Reclaim re-adds a task to the running state after agent restart.
	// This is used when an agent reclaims an orphaned task that it was previously running.
	Reclaim(c context.Context, task *model.Task) error

	// Info returns internal queue information.
	Info(c context.Context) InfoT

	// GetRunningWorkflowIDs returns a list of workflow IDs that are currently running.
	// This is useful for cleanup operations.
	GetRunningWorkflowIDs() []string

	// Pause stops the queue from handing out new work items in Poll.
	Pause()

	// Resume starts the queue again.
	Resume()

	// KickAgentWorkers kicks all workers for a given agent.
	KickAgentWorkers(agentID int64)
}

Queue defines a task queue for scheduling tasks among a pool of workers.

func New

func New(ctx context.Context, config Config) (Queue, error)

New creates a new queue based on the provided configuration.

func NewDatabaseQueue

func NewDatabaseQueue(ctx context.Context, store store.Store, lockService LockService, instanceID string, leaderLockTTL time.Duration) Queue

NewDatabaseQueue creates a new database-backed queue for HA deployments.

func NewMemoryQueue

func NewMemoryQueue(ctx context.Context, store store.Store) Queue

NewMemoryQueue returns a new FIFO queue.

func WithTaskStore

func WithTaskStore(ctx context.Context, q Queue, s store.Store) Queue

WithTaskStore returns a queue that is backed by the TaskStore. This ensures the task Queue can be restored when the system starts.

type Type

// Type is the queue backend type, as used by Config.Backend.
type Type string

Type is the queue backend type.

// Declared queue Type values and the default leader lock TTL.
const (
	// TypeMemory names the "memory" backend — presumably the queue built by
	// NewMemoryQueue; confirm in New.
	TypeMemory   Type = "memory"
	// TypeDatabase names the "database" backend — presumably the queue built
	// by NewDatabaseQueue; confirm in New.
	TypeDatabase Type = "database"

	// DefaultLeaderLockTTL is the default TTL for leader election locks.
	DefaultLeaderLockTTL = 30 * time.Second
)

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL