Documentation
¶
Index ¶
- Constants
- Variables
- func GetAllJobStates() ([]State, State)
- func Sanitize[E constraints.Ordered](element E, all func() ([]E, E)) (E, bool)
- func UID() (string, error)
- type Config
- type Definition
- type Executor
- type Handler
- type Job
- type Priority
- type Progress
- type ProgressReporter
- type Scheduler
- func (s *Scheduler) AddRecurring(ctx context.Context, jobUID, jobType, cronDef string, maxDur time.Duration) error
- func (s *Scheduler) CancelJob(ctx context.Context, jobUID string) error
- func (s *Scheduler) GetJobProgress(ctx context.Context, jobUID string) (Progress, error)
- func (s *Scheduler) GetJobProgressForGroup(ctx context.Context, jobGroupUID string) ([]Progress, error)
- func (s *Scheduler) PurgeJobByUID(ctx context.Context, jobUID string) error
- func (s *Scheduler) PurgeJobsByGroupID(ctx context.Context, jobGroupID string) (int64, error)
- func (s *Scheduler) Run(ctx context.Context) error
- func (s *Scheduler) RunJob(ctx context.Context, def Definition) error
- func (s *Scheduler) RunJobs(ctx context.Context, groupID string, defs []Definition) error
- func (s *Scheduler) WaitJobsDone(ctx context.Context)
- type State
- type StateChange
- type Store
Constants ¶
const ( ProgressMin = 0 ProgressMax = 100 )
const ( PubSubTopicCancelJob = "gitfox:job:cancel_job" PubSubTopicStateChange = "gitfox:job:state_change" )
Variables ¶
var WireSet = wire.NewSet( ProvideExecutor, ProvideScheduler, )
Functions ¶
func GetAllJobStates ¶
func Sanitize ¶
func Sanitize[E constraints.Ordered](element E, all func() ([]E, E)) (E, bool)
Types ¶
type Config ¶
type Config struct {
// InstanceID specifis the ID of the instance.
InstanceID string `envconfig:"INSTANCE_ID"`
// MaxRunning is maximum number of jobs that can be running at once.
BackgroundJobsMaxRunning int `envconfig:"JOBS_MAX_RUNNING" default:"10"`
// RetentionTime is the duration after which non-recurring,
// finished and failed jobs will be purged from the DB.
BackgroundJobsRetentionTime time.Duration `envconfig:"JOBS_RETENTION_TIME" default:"120h"` // 5 days
}
type Definition ¶
func (*Definition) Validate ¶
func (def *Definition) Validate() error
type Executor ¶
type Executor struct {
// contains filtered or unexported fields
}
Executor holds map of Handler objects per each job type registered. The Scheduler uses the Executor to start execution of jobs.
func NewExecutor ¶
NewExecutor creates new Executor.
type Handler ¶
type Handler interface {
Handle(ctx context.Context, input string, fn ProgressReporter) (result string, err error)
}
Handler is a job executor for a specific job type. An implementation should try to honor the context and try to abort the execution as soon as the context is done.
type Job ¶
type Job struct {
UID string `db:"job_uid" gorm:"column:job_uid;primaryKey"`
Created int64 `db:"job_created" gorm:"column:job_created"`
Updated int64 `db:"job_updated" gorm:"column:job_updated"`
Type string `db:"job_type" gorm:"column:job_type"`
Priority Priority `db:"job_priority" gorm:"column:job_priority"`
Data string `db:"job_data" gorm:"column:job_data"`
Result string `db:"job_result" gorm:"column:job_result"`
MaxDurationSeconds int `db:"job_max_duration_seconds" gorm:"column:job_max_duration_seconds"`
MaxRetries int `db:"job_max_retries" gorm:"column:job_max_retries"`
State State `db:"job_state" gorm:"column:job_state"`
Scheduled int64 `db:"job_scheduled" gorm:"column:job_scheduled"`
TotalExecutions int `db:"job_total_executions" gorm:"column:job_total_executions"`
RunBy string `db:"job_run_by" gorm:"column:job_run_by"`
RunDeadline int64 `db:"job_run_deadline" gorm:"column:job_run_deadline"`
RunProgress int `db:"job_run_progress" gorm:"column:job_run_progress"`
LastExecuted int64 `db:"job_last_executed" gorm:"column:job_last_executed"`
IsRecurring bool `db:"job_is_recurring" gorm:"column:job_is_recurring"`
RecurringCron string `db:"job_recurring_cron" gorm:"column:job_recurring_cron"`
ConsecutiveFailures int `db:"job_consecutive_failures" gorm:"column:job_consecutive_failures"`
LastFailureError string `db:"job_last_failure_error" gorm:"column:job_last_failure_error"`
GroupID string `db:"job_group_id" gorm:"column:job_group_id"`
}
type Progress ¶
type Progress struct {
State State `json:"state"`
Progress int `json:"progress"`
Result string `json:"result,omitempty"`
Failure string `json:"failure,omitempty"`
}
func FailProgress ¶
func FailProgress() Progress
type ProgressReporter ¶
ProgressReporter can be used by a job Handler to report back the execution progress.
type Scheduler ¶
type Scheduler struct {
// contains filtered or unexported fields
}
Scheduler controls execution of background jobs.
func NewScheduler ¶
func ProvideScheduler ¶
func (*Scheduler) AddRecurring ¶
func (*Scheduler) GetJobProgress ¶
func (*Scheduler) GetJobProgressForGroup ¶
func (*Scheduler) PurgeJobByUID ¶
func (*Scheduler) PurgeJobsByGroupID ¶
func (*Scheduler) Run ¶
Run runs the background job scheduler. It's a blocking call. It blocks until the provided context is done.
func (*Scheduler) RunJob ¶
func (s *Scheduler) RunJob(ctx context.Context, def Definition) error
RunJob runs a single job of the type Definition.Type. All parameters a job Handler receives must be inside the Definition.Data string (as JSON or whatever the job Handler can interpret).
func (*Scheduler) RunJobs ¶
RunJobs runs a several jobs. It's more efficient than calling RunJob several times because it locks the DB only once.
func (*Scheduler) WaitJobsDone ¶
WaitJobsDone waits until execution of all jobs has finished. It is intended to be used for graceful shutdown, after the Run method has finished.
type State ¶
type State string
State represents state of a background job.
const ( JobStateScheduled State = "scheduled" JobStateRunning State = "running" JobStateFinished State = "finished" JobStateFailed State = "failed" JobStateCanceled State = "canceled" )
State enumeration.
func (State) IsCompleted ¶
type StateChange ¶
type StateChange struct {
UID string `json:"uid"`
Type string `json:"type"`
State State `json:"state"`
Progress int `json:"progress"`
Result string `json:"result"`
Failure string `json:"failure"`
}
func DecodeStateChange ¶
func DecodeStateChange(payload []byte) (*StateChange, error)
type Store ¶
type Store interface {
// Find fetches a job by its unique identifier.
Find(ctx context.Context, uid string) (*Job, error)
// ListByGroupID fetches all jobs for a group id
ListByGroupID(ctx context.Context, groupID string) ([]*Job, error)
// DeleteByGroupID deletes all jobs for a group id
DeleteByGroupID(ctx context.Context, groupID string) (int64, error)
// Create is used to create a new job.
Create(ctx context.Context, job *Job) error
// Upsert will insert the job in the database if the job didn't already exist,
// or it will update the existing one but only if its definition has changed.
Upsert(ctx context.Context, job *Job) error
// UpdateDefinition is used to update a job definition.
UpdateDefinition(ctx context.Context, job *Job) error
// UpdateExecution is used to update a job before and after execution.
UpdateExecution(ctx context.Context, job *Job) error
// UpdateProgress is used to update a job progress data.
UpdateProgress(ctx context.Context, job *Job) error
// CountRunning returns number of jobs that are currently being run.
CountRunning(ctx context.Context) (int, error)
// ListReady returns a list of jobs that are ready for execution.
ListReady(ctx context.Context, now time.Time, limit int) ([]*Job, error)
// ListDeadlineExceeded returns a list of jobs that have exceeded their execution deadline.
ListDeadlineExceeded(ctx context.Context, now time.Time) ([]*Job, error)
// NextScheduledTime returns a scheduled time of the next ready job.
NextScheduledTime(ctx context.Context, now time.Time) (time.Time, error)
// DeleteOld removes non-recurring jobs that have finished execution or have failed.
DeleteOld(ctx context.Context, olderThan time.Time) (int64, error)
// DeleteByUID deletes a job by its unique identifier.
DeleteByUID(ctx context.Context, jobUID string) error
}