Documentation
¶
Overview ¶
Package factory provides configuration-based creation of NATS connections and JetStream consumer configurations.
It integrates with config.Nats and config.NatsConsumer to create NATS connections with NKey, token, or username/password authentication, optional TLS, automatic reconnection with unlimited buffer, and compression.
JetStream consumer configurations are produced by Factory.ConsumerConfigFromConfig, which maps config.NatsConsumer fields to jetstream.ConsumerConfig including delivery policy, ack policy, replay policy, backoff schedules, and performance tuning.
When a health.Coordinator is provided via WithHealthCoordinator, the factory registers a health checker that reports the connection status.
Example:
f := factory.New(factory.WithLogger(logger))
conn, err := f.CreateConnectionFromConfig(cfg.Nats)
if err != nil {
log.Fatal(err)
}
defer conn.Close()
js, err := f.CreateJetStream(conn)
if err != nil {
log.Fatal(err)
}
Index ¶
Constants ¶
const ( // UnlimitedReconnects indicates that reconnection attempts should continue // indefinitely. Passed to [nats.MaxReconnects] when no explicit limit is // configured. UnlimitedReconnects = -1 // UnlimitedReconnectBuffer indicates that the reconnect buffer size is // unlimited, allowing messages to be buffered during reconnection without // dropping. Passed to [nats.ReconnectBufSize]. UnlimitedReconnectBuffer = -1 )
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Factory ¶
type Factory struct {
corefactory.Base
// contains filtered or unexported fields
}
Factory creates NATS connections and JetStream consumer configurations from config.Nats and config.NatsConsumer. It delegates TLS setup to tlsfactory.Factory when configured.
func New ¶
New creates a new Factory with the given functional options. By default the factory uses a discard logger and no TLS factory; provide WithLogger and WithTlsFactory to override.
func (*Factory) ConsumerConfigFromConfig ¶
func (f *Factory) ConsumerConfigFromConfig( cfg *config.NatsConsumer, ) (*jetstream.ConsumerConfig, error)
ConsumerConfigFromConfig converts config.NatsConsumer to jetstream.ConsumerConfig. It maps delivery policy, ack policy, replay policy, filter subjects, backoff schedules, and performance tuning. When the delivery policy is config.DeliverPolicyByStartSequence or config.DeliverPolicyByStartTime, the corresponding OptStartSeq or OptStartTime field is set.
func (*Factory) CreateConnectionFromConfig ¶
CreateConnectionFromConfig creates a NATS connection from configuration. When config.Nats.ConnectionURI is set, it is used directly as the connection URL. Otherwise, the URL is built by joining the configured Hosts. If a health.Coordinator was provided via WithHealthCoordinator, a health checker is registered under the service name "nats".
func (*Factory) NatsOptionsFromConfig ¶
NatsOptionsFromConfig creates nats.Option values from configuration. It configures timeouts, reconnection behavior (unlimited by default), compression, and logging handlers for disconnect/reconnect/error events.
Authentication is selected automatically based on which credential fields are set in config.Nats: NKey seed, token, or username/password. When config.Nats.ConnectionURI is set, authentication fields are ignored because they are embedded in the URI.
type Option ¶
type Option func(o *options)
Option is a functional option for configuring options.
func WithHealthCoordinator ¶
func WithHealthCoordinator(v *health.Coordinator) Option
WithHealthCoordinator sets the healthCoordinator option.
func WithTlsFactory ¶
func WithTlsFactory(v *tlsfactory.Factory) Option
WithTlsFactory sets the tlsFactory option.