Documentation ¶
Index ¶
- func ShouldDropMessageByAge(msg Message, now time.Time, maxAge time.Duration) bool
- type ConsumerConfig
- type Handler
- type InMemoryBroker
- func (b *InMemoryBroker) Consume(ctx context.Context, queue string, cfg ConsumerConfig, handler Handler)
- func (b *InMemoryBroker) Depth(queue string) int
- func (b *InMemoryBroker) Health(_ context.Context, queue string) error
- func (b *InMemoryBroker) OldestAge(queue string, now time.Time) time.Duration
- func (b *InMemoryBroker) Publish(_ context.Context, queue string, msg Message) error
- type Message
- type RabbitMQHTTPBroker
- func (b *RabbitMQHTTPBroker) Consume(ctx context.Context, queueName string, cfg ConsumerConfig, handler Handler)
- func (b *RabbitMQHTTPBroker) Depth(queueName string) int
- func (b *RabbitMQHTTPBroker) EnsureTopology(ctx context.Context, cfg TopologyConfig) error
- func (b *RabbitMQHTTPBroker) Health(ctx context.Context, queueName string) error
- func (b *RabbitMQHTTPBroker) OldestAge(queueName string, now time.Time) time.Duration
- func (b *RabbitMQHTTPBroker) Publish(ctx context.Context, queueName string, msg Message) error
- type RabbitMQHTTPConfig
- type RetryPolicy
- type RetryQueueSpec
- type TopologyConfig
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
Types ¶
type ConsumerConfig ¶
type ConsumerConfig struct {
MaxMessageAge time.Duration
RetryPolicy RetryPolicy
RetryQueues []string
DeadLetterQueue string
Now func() time.Time
Sleep func(time.Duration)
}
ConsumerConfig controls consumer behavior; it is accepted by both InMemoryBroker.Consume and RabbitMQHTTPBroker.Consume.
type InMemoryBroker ¶
type InMemoryBroker struct {
// contains filtered or unexported fields
}
InMemoryBroker is a simple named-queue broker for tests and local development.
func NewInMemoryBroker ¶
func NewInMemoryBroker(buffer int) *InMemoryBroker
NewInMemoryBroker creates an in-memory broker.
func (*InMemoryBroker) Consume ¶
func (b *InMemoryBroker) Consume(ctx context.Context, queue string, cfg ConsumerConfig, handler Handler)
Consume processes messages from the named queue until context cancellation.
func (*InMemoryBroker) Depth ¶
func (b *InMemoryBroker) Depth(queue string) int
Depth returns the queued item count for one queue.
func (*InMemoryBroker) Health ¶
func (b *InMemoryBroker) Health(_ context.Context, queue string) error
Health reports broker readiness for the requested queue.
type Message ¶
type Message struct {
ID string
Body []byte
Headers map[string]string
CreatedAt time.Time
Attempt int
}
Message is a queue payload with retry metadata.
type RabbitMQHTTPBroker ¶
type RabbitMQHTTPBroker struct {
// contains filtered or unexported fields
}
RabbitMQHTTPBroker uses RabbitMQ management endpoints for queue operations.
func NewRabbitMQHTTPBroker ¶
func NewRabbitMQHTTPBroker(cfg RabbitMQHTTPConfig) (*RabbitMQHTTPBroker, error)
NewRabbitMQHTTPBroker builds a RabbitMQ management API broker.
func (*RabbitMQHTTPBroker) Consume ¶
func (b *RabbitMQHTTPBroker) Consume(ctx context.Context, queueName string, cfg ConsumerConfig, handler Handler)
Consume reads messages from the named queue until context cancellation.
func (*RabbitMQHTTPBroker) Depth ¶
func (b *RabbitMQHTTPBroker) Depth(queueName string) int
Depth returns the queued item count for one queue.
func (*RabbitMQHTTPBroker) EnsureTopology ¶
func (b *RabbitMQHTTPBroker) EnsureTopology(ctx context.Context, cfg TopologyConfig) error
EnsureTopology declares the exchange, queues, and bindings required by backfill processing.
func (*RabbitMQHTTPBroker) Health ¶
func (b *RabbitMQHTTPBroker) Health(ctx context.Context, queueName string) error
Health reports broker connectivity by checking queue metadata.
type RabbitMQHTTPConfig ¶
type RabbitMQHTTPConfig struct {
ManagementURL string
VHost string
Exchange string
Username string
//nolint:gosec // Password is required for RabbitMQ basic auth.
Password string
PollInterval time.Duration
HTTPClient *http.Client
Now func() time.Time
Sleep func(time.Duration)
}
RabbitMQHTTPConfig configures the RabbitMQ management API broker.
func RabbitMQHTTPConfigFromAMQPURL ¶
func RabbitMQHTTPConfigFromAMQPURL(amqpURL, exchange string) (RabbitMQHTTPConfig, error)
RabbitMQHTTPConfigFromAMQPURL derives management API config from an AMQP connection URL.
type RetryPolicy ¶
RetryPolicy controls consumer retry behavior.
type RetryQueueSpec ¶
RetryQueueSpec configures one delayed retry queue.
type TopologyConfig ¶
type TopologyConfig struct {
MainQueue string
DeadLetterQueue string
RetryQueues []RetryQueueSpec
}
TopologyConfig declares AMQP exchange/queue resources required by the worker.