projection

package
Version: v0.0.0-...-8cc5ba1
Warning

This package is not in the latest version of its module.

Published: Mar 1, 2026 License: MIT Imports: 9 Imported by: 0

Documentation

Overview

Package projection provides tools for managing and rebuilding projections.

Per-Projection Databases

Dir manages per-projection SQLite databases in a directory. Each projection gets its own .db file, enabling parallel rebuilds (no shared write lock), independent failure isolation, and atomic rebuild by deleting a single file.

dir, _ := projection.NewDir("/data/projections")
db, _ := dir.DB("orders")
checkpoint, _ := projection.NewCheckpoint(db)
view := sqlview.From[any](db, ordersConfig)

SQLiteCheckpoint stores checkpoints in the same database as the projection data. Deleting the .db file resets both the projection and its checkpoint.

Rebuild

Rebuild resets a projection's checkpoint, then replays all events from the store through the projection's handler in batches. Progress reporting and concurrent rebuilds of independent projections are supported.

Constants

const (
	// DefaultBatchSize is the default number of events per batch during rebuild.
	DefaultBatchSize = 500

	// MaxBatchSize prevents absurd batch sizes.
	MaxBatchSize = 50_000

	// MaxConcurrentRebuilds limits parallel rebuilds.
	MaxConcurrentRebuilds = 32
)

These constants are explicit bounds, following TigerStyle.
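The documentation does not say how an out-of-range BatchSize is treated. The sketch below shows one plausible policy: fall back to the default for unset values and cap at the maximum. The helper normalizeBatchSize is hypothetical and not part of this package; the real code may reject oversized values instead of clamping them.

```go
package main

import "fmt"

// Values copied from the documented constants.
const (
	DefaultBatchSize = 500
	MaxBatchSize     = 50_000
)

// normalizeBatchSize is a hypothetical helper (an assumption, not package API).
// Non-positive values fall back to the default; large values are capped.
func normalizeBatchSize(n int) int {
	if n <= 0 {
		return DefaultBatchSize
	}
	if n > MaxBatchSize {
		return MaxBatchSize
	}
	return n
}

func main() {
	for _, n := range []int{0, 100, 1_000_000} {
		fmt.Printf("%d -> %d\n", n, normalizeBatchSize(n))
	}
}
```

Clamping keeps a misconfigured caller running, whereas returning an error would surface the mistake earlier. Which trade-off the package makes is not stated here.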

Functions

func Rebuild

func Rebuild[E any](ctx context.Context, cfg RebuildConfig[E]) (uint64, error)

Rebuild replays all events from the store through the handler, resetting the checkpoint to 0 first. Events are processed in batches. Returns the total number of events processed.
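The reset-then-replay behavior described above can be sketched with in-memory stand-ins. Everything here is an assumption for illustration: event, memReader, readAfter, and rebuild are hypothetical names, and the real subscription.GlobalReader and subscription.Checkpoint interfaces are not shown in this documentation. The sketch also assumes events arrive in ascending sequence order.

```go
package main

import (
	"context"
	"fmt"
)

// event is a hypothetical stand-in for subscription.GlobalEvent[E].
type event struct {
	Seq uint64
}

// memReader is a hypothetical in-memory event source.
type memReader struct{ events []event }

// readAfter returns up to limit events whose Seq is greater than after.
func (r memReader) readAfter(after uint64, limit int) []event {
	var out []event
	for _, e := range r.events {
		if e.Seq > after && len(out) < limit {
			out = append(out, e)
		}
	}
	return out
}

// rebuild resets the position to 0, then feeds batches to handle until the
// reader is exhausted. It returns the number of events processed.
func rebuild(ctx context.Context, r memReader, batchSize int,
	handle func(context.Context, []event) error, save func(uint64)) (uint64, error) {
	var pos, processed uint64
	save(0) // reset the checkpoint before replaying
	for {
		if err := ctx.Err(); err != nil {
			return processed, err
		}
		batch := r.readAfter(pos, batchSize)
		if len(batch) == 0 {
			return processed, nil
		}
		if err := handle(ctx, batch); err != nil {
			return processed, err
		}
		pos = batch[len(batch)-1].Seq
		processed += uint64(len(batch))
		save(pos) // record progress after each successful batch
	}
}

// demo replays 7 events in batches of 3 and reports what happened.
func demo() (batches int, processed, checkpoint uint64, err error) {
	r := memReader{}
	for seq := uint64(1); seq <= 7; seq++ {
		r.events = append(r.events, event{Seq: seq})
	}
	handle := func(_ context.Context, b []event) error {
		batches++
		return nil
	}
	save := func(pos uint64) { checkpoint = pos }
	processed, err = rebuild(context.Background(), r, 3, handle, save)
	return batches, processed, checkpoint, err
}

func main() {
	batches, processed, checkpoint, err := demo()
	fmt.Println(batches, processed, checkpoint, err)
}
```

Saving the position only after the handler succeeds means a failed batch is replayed on the next attempt, so handlers in a design like this should tolerate seeing the same events twice.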

Types

type Dir

type Dir struct {
	// contains filtered or unexported fields
}

Dir manages per-projection SQLite databases in a directory. Each projection gets its own .db file: {dir}/{name}.db

This enables parallel rebuilds (no shared write lock), independent failure isolation, and atomic rebuild by deleting a single file.

func NewDir

func NewDir(path string) (*Dir, error)

NewDir creates a projection directory manager. Creates the directory if it doesn't exist.

func (*Dir) Close

func (d *Dir) Close() error

Close closes all open databases.

func (*Dir) DB

func (d *Dir) DB(name string) (*sql.DB, error)

DB returns (or creates) the SQLite database for a projection. The database is opened in WAL mode with a busy_timeout set, so that reads can proceed concurrently. File: {dir}/{name}.db

func (*Dir) List

func (d *Dir) List() ([]string, error)

List returns names of all projection databases in the directory.

func (*Dir) Remove

func (d *Dir) Remove(name string) error

Remove closes and deletes a projection's database file (for rebuild). The next call to DB will create a fresh database.

func (*Dir) RemoveAll

func (d *Dir) RemoveAll() error

RemoveAll closes and deletes all projection databases.
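The {dir}/{name}.db layout makes listing and removal plain file operations. The sketch below illustrates that mapping with the standard library only. dbPath, listProjections, and demo are hypothetical helpers, not this package's implementation, and the files are empty placeholders rather than real SQLite databases.

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
	"sort"
	"strings"
)

// dbPath maps a projection name to its file, following the documented
// {dir}/{name}.db layout.
func dbPath(dir, name string) string {
	return filepath.Join(dir, name+".db")
}

// listProjections returns the projection names found in dir, sorted.
func listProjections(dir string) ([]string, error) {
	matches, err := filepath.Glob(filepath.Join(dir, "*.db"))
	if err != nil {
		return nil, err
	}
	names := make([]string, 0, len(matches))
	for _, m := range matches {
		names = append(names, strings.TrimSuffix(filepath.Base(m), ".db"))
	}
	sort.Strings(names)
	return names, nil
}

// demo creates two placeholder files, removes one, and lists what is left.
func demo() ([]string, error) {
	dir, err := os.MkdirTemp("", "projections")
	if err != nil {
		return nil, err
	}
	defer os.RemoveAll(dir)
	for _, name := range []string{"orders", "customers"} {
		if err := os.WriteFile(dbPath(dir, name), nil, 0o600); err != nil {
			return nil, err
		}
	}
	if err := os.Remove(dbPath(dir, "orders")); err != nil {
		return nil, err
	}
	return listProjections(dir)
}

func main() {
	names, err := demo()
	fmt.Println(names, err)
}
```

A real implementation has more to do than this: the documented Remove closes the open handle first, and SQLite in WAL mode keeps -wal and -shm sidecar files next to the main database file.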

type Handler

type Handler[E any] func(ctx context.Context, events []subscription.GlobalEvent[E]) error

Handler processes a batch of events during rebuild.

type Progress

type Progress struct {
	// ProjectionName identifies which projection is being rebuilt.
	ProjectionName string

	// Processed is the number of events processed so far.
	Processed uint64

	// Total is the total number of events to process (approximate).
	Total uint64

	// Done is true when the rebuild is complete.
	Done bool

	// Err is set if the rebuild failed.
	Err error
}

Progress reports rebuild progress to the caller.

type ProgressFunc

type ProgressFunc func(Progress)

ProgressFunc receives progress updates during rebuild.
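As an illustration, a ProgressFunc could turn each update into a log line. The sketch below redeclares Progress locally with the documented fields so that it compiles on its own. formatProgress is a hypothetical helper. Because Total is documented as approximate, the percentage is clamped to 100.

```go
package main

import (
	"errors"
	"fmt"
)

// Progress mirrors the documented struct so the sketch is self-contained.
type Progress struct {
	ProjectionName string
	Processed      uint64
	Total          uint64
	Done           bool
	Err            error
}

// formatProgress renders one update as a log line (hypothetical helper).
func formatProgress(p Progress) string {
	switch {
	case p.Err != nil:
		return fmt.Sprintf("%s: failed after %d events: %v", p.ProjectionName, p.Processed, p.Err)
	case p.Done:
		return fmt.Sprintf("%s: done, %d events", p.ProjectionName, p.Processed)
	case p.Total == 0:
		// Avoid dividing by zero when no estimate is available.
		return fmt.Sprintf("%s: %d events", p.ProjectionName, p.Processed)
	}
	pct := p.Processed * 100 / p.Total
	if pct > 100 {
		pct = 100 // Total is approximate, so Processed may overshoot it
	}
	return fmt.Sprintf("%s: %d/%d (%d%%)", p.ProjectionName, p.Processed, p.Total, pct)
}

func main() {
	updates := []Progress{
		{ProjectionName: "orders", Processed: 250, Total: 1000},
		{ProjectionName: "orders", Processed: 1000, Total: 1000, Done: true},
		{ProjectionName: "orders", Processed: 10, Err: errors.New("handler failed")},
	}
	for _, u := range updates {
		fmt.Println(formatProgress(u))
	}
}
```

With the real package, a closure such as func(p projection.Progress) { log.Println(...) } would be assigned to RebuildConfig.OnProgress. How often the callback fires is not specified in this documentation.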

type RebuildConfig

type RebuildConfig[E any] struct {
	// Name identifies this projection. Required.
	Name string

	// Reader provides the global event stream. Required.
	Reader subscription.GlobalReader[E]

	// Checkpoint to reset and update. Required.
	Checkpoint subscription.Checkpoint

	// ConsumerID is the checkpoint consumer ID to reset. Required.
	ConsumerID string

	// Handler processes batches of events. Required.
	Handler Handler[E]

	// BatchSize controls how many events are read per batch. Default: 500.
	BatchSize int

	// OnProgress receives progress updates. Optional.
	OnProgress ProgressFunc
}

RebuildConfig configures a single projection rebuild.

type RebuildResult

type RebuildResult struct {
	Name      string
	Processed uint64
	Err       error
}

RebuildResult holds the outcome of a single projection rebuild.

func RebuildConcurrent

func RebuildConcurrent[E any](ctx context.Context, configs []RebuildConfig[E]) []RebuildResult

RebuildConcurrent rebuilds multiple independent projections in parallel. Each projection is rebuilt from scratch. Returns results for all projections. The function returns after all rebuilds complete (or fail).
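One common way to get this behavior is a buffered channel used as a semaphore plus a sync.WaitGroup. The sketch below is that general pattern, not the package's actual implementation. job, result, and rebuildAll are hypothetical names, and returning results in input order is an assumption the documentation does not confirm.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// Copied from the documented constant.
const MaxConcurrentRebuilds = 32

// result mirrors the documented RebuildResult fields.
type result struct {
	Name      string
	Processed uint64
	Err       error
}

// job is a hypothetical stand-in for one RebuildConfig and its rebuild call.
type job struct {
	Name string
	Run  func() (uint64, error)
}

// rebuildAll runs every job, at most MaxConcurrentRebuilds at a time, and
// returns one result per job in input order. One failure does not stop the rest.
func rebuildAll(jobs []job) []result {
	results := make([]result, len(jobs))
	sem := make(chan struct{}, MaxConcurrentRebuilds)
	var wg sync.WaitGroup
	for i, j := range jobs {
		wg.Add(1)
		sem <- struct{}{} // blocks once the limit is reached
		go func(i int, j job) {
			defer wg.Done()
			defer func() { <-sem }()
			n, err := j.Run()
			// Each goroutine writes only its own slot, so no lock is needed.
			results[i] = result{Name: j.Name, Processed: n, Err: err}
		}(i, j)
	}
	wg.Wait()
	return results
}

// demoJobs returns one succeeding and one failing job.
func demoJobs() []job {
	return []job{
		{Name: "orders", Run: func() (uint64, error) { return 3, nil }},
		{Name: "customers", Run: func() (uint64, error) { return 0, errors.New("handler failed") }},
	}
}

func main() {
	for _, r := range rebuildAll(demoJobs()) {
		fmt.Println(r.Name, r.Processed, r.Err)
	}
}
```

Since the real function returns a slice rather than an error, callers should check the Err field of every RebuildResult.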

type SQLiteCheckpoint

type SQLiteCheckpoint struct {
	// contains filtered or unexported fields
}

SQLiteCheckpoint implements subscription.Checkpoint for a single projection DB. The checkpoint table is colocated with the projection data, so deleting the .db file resets both the projection and its checkpoint atomically.

func NewCheckpoint

func NewCheckpoint(db *sql.DB) (*SQLiteCheckpoint, error)

NewCheckpoint creates a checkpoint backed by the given SQLite database. Creates the _checkpoint table if it doesn't exist.

func (*SQLiteCheckpoint) Load

func (c *SQLiteCheckpoint) Load(ctx context.Context, consumerID string) (uint64, error)

Load returns the last processed sequence for a consumer. Returns 0 if new.

func (*SQLiteCheckpoint) Save

func (c *SQLiteCheckpoint) Save(ctx context.Context, consumerID string, sequence uint64) error

Save persists the consumer's position atomically.
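For unit-testing handlers without SQLite, an in-memory type with the same Load and Save method shapes can be convenient. memCheckpoint below is a hypothetical sketch. It assumes subscription.Checkpoint requires only these two methods, which this documentation does not confirm.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// memCheckpoint is a hypothetical in-memory analogue of SQLiteCheckpoint.
type memCheckpoint struct {
	mu  sync.Mutex
	pos map[string]uint64
}

func newMemCheckpoint() *memCheckpoint {
	return &memCheckpoint{pos: make(map[string]uint64)}
}

// Load returns the last saved sequence, or 0 for an unknown consumer.
func (c *memCheckpoint) Load(ctx context.Context, consumerID string) (uint64, error) {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.pos[consumerID], nil // a missing key yields the zero value
}

// Save records the consumer's position.
func (c *memCheckpoint) Save(ctx context.Context, consumerID string, sequence uint64) error {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.pos[consumerID] = sequence
	return nil
}

// roundTrip loads a new consumer's position, saves 42, and loads again.
func roundTrip() (before, after uint64, err error) {
	ctx := context.Background()
	cp := newMemCheckpoint()
	if before, err = cp.Load(ctx, "orders"); err != nil {
		return 0, 0, err
	}
	if err = cp.Save(ctx, "orders", 42); err != nil {
		return 0, 0, err
	}
	after, err = cp.Load(ctx, "orders")
	return before, after, err
}

func main() {
	before, after, err := roundTrip()
	fmt.Println(before, after, err)
}
```

Unlike the SQLite-backed version, this stand-in loses its state when the process exits, so it is only suitable for tests.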
