factory

package
v0.0.0-...-83d1dff Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 26, 2026 License: MIT Imports: 12 Imported by: 0

Documentation

Overview

Package factory provides configuration-based creation of NATS connections and JetStream consumer configurations.

It integrates with config.Nats and config.NatsConsumer to create NATS connections with NKey, token, or username/password authentication, optional TLS, automatic reconnection with unlimited buffer, and compression.

JetStream consumer configurations are produced by Factory.ConsumerConfigFromConfig, which maps config.NatsConsumer fields to jetstream.ConsumerConfig including delivery policy, ack policy, replay policy, backoff schedules, and performance tuning.

When a health.Coordinator is provided via WithHealthCoordinator, the factory registers a health checker that reports the connection status.

Example:

f := factory.New(factory.WithLogger(logger))
conn, err := f.CreateConnectionFromConfig(cfg.Nats)
if err != nil {
    log.Fatal(err)
}
defer conn.Close()

js, err := f.CreateJetStream(conn)
if err != nil {
    log.Fatal(err)
}

Index

Constants

View Source
const (
	// UnlimitedReconnects indicates that reconnection attempts should continue
	// indefinitely. Passed to [nats.MaxReconnects] when no explicit limit is
	// configured.
	UnlimitedReconnects = -1

	// UnlimitedReconnectBuffer indicates that the reconnect buffer size is
	// unlimited, allowing messages to be buffered during reconnection without
	// dropping. Passed to [nats.ReconnectBufSize].
	UnlimitedReconnectBuffer = -1
)

Variables

This section is empty.

Functions

This section is empty.

Types

type Factory

type Factory struct {
	corefactory.Base
	// contains filtered or unexported fields
}

Factory creates NATS connections and JetStream consumer configurations from config.Nats and config.NatsConsumer. It delegates TLS setup to tlsfactory.Factory when configured.

func New

func New(opts ...Option) *Factory

New creates a new Factory with the given functional options. By default the factory uses a discard logger and no TLS factory; provide WithLogger and WithTlsFactory to override.

func (*Factory) ConsumerConfigFromConfig

func (f *Factory) ConsumerConfigFromConfig(
	cfg *config.NatsConsumer,
) (*jetstream.ConsumerConfig, error)

ConsumerConfigFromConfig converts config.NatsConsumer to jetstream.ConsumerConfig. It maps delivery policy, ack policy, replay policy, filter subjects, backoff schedules, and performance tuning. When the delivery policy is config.DeliverPolicyByStartSequence or config.DeliverPolicyByStartTime, the corresponding OptStartSeq or OptStartTime field is set.

func (*Factory) CreateConnectionFromConfig

func (f *Factory) CreateConnectionFromConfig(cfg *config.Nats) (*nats.Conn, error)

CreateConnectionFromConfig creates a NATS connection from configuration. When config.Nats.ConnectionURI is set, it is used directly as the connection URL. Otherwise, the URL is built by joining the configured Hosts. If a health.Coordinator was provided via WithHealthCoordinator, a health checker is registered under the service name "nats".

func (*Factory) NatsOptionsFromConfig

func (f *Factory) NatsOptionsFromConfig(cfg *config.Nats) ([]nats.Option, error)

NatsOptionsFromConfig creates nats.Option values from configuration. It configures timeouts, reconnection behavior (unlimited by default), compression, and logging handlers for disconnect/reconnect/error events.

Authentication is selected automatically based on which credential fields are set in config.Nats: NKey seed, token, or username/password. When config.Nats.ConnectionURI is set, authentication fields are ignored because they are embedded in the URI.

type Option

type Option func(o *options)

Option is a functional option for configuring options.

func WithHealthCoordinator

func WithHealthCoordinator(v *health.Coordinator) Option

WithHealthCoordinator sets the healthCoordinator option.

func WithLogger

func WithLogger(v *slog.Logger) Option

WithLogger sets the logger option.

func WithTlsFactory

func WithTlsFactory(v *tlsfactory.Factory) Option

WithTlsFactory sets the tlsFactory option.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL