store

package
v1.2.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 19, 2026 License: MIT Imports: 16 Imported by: 0

README

internal/store

Logic overview

The store package manages shared metric state and distributed locks.

  • MemoryStore provides an in-process implementation for tests/local runs.
  • RedisStore provides shared HA-compatible storage over Redis.
  • Both implementations enforce role/source write guards and support dedup/job locks.
  • Both implementations persist org/repo checkpoints used for scrape window advancement.
  • Redis indexes metric series by shard (IndexShards) to avoid single-set hotspots at high cardinality.
  • Metric retention is handled by GC (memory) and key TTL + index cleanup (Redis).

API reference

Types
  • RuntimeRole: writer role enum (leader, follower).
  • WriteSource: write-path enum (leader_scrape, worker_backfill).
  • MetricPoint: one metric sample (Name, Labels, Value, UpdatedAt).
  • SnapshotDeltaEvent: one incremental series change (SeriesID, Point, Deleted).
  • SnapshotDelta: incremental change batch with NextCursor.
  • MemoryStore: in-memory shared store.
  • RedisStoreConfig: Redis store settings (Namespace, Retention, MaxSeries, IndexShards).
  • RedisStore: Redis-backed shared store.
Functions
  • NewMemoryStore(retention time.Duration, maxSeries int) *MemoryStore: creates memory store.
  • NewRedisStore(client redis.UniversalClient, cfg RedisStoreConfig) *RedisStore: creates Redis-backed store.
Methods
MemoryStore
  • UpsertMetric(role RuntimeRole, source WriteSource, point MetricPoint) error: validates and upserts one series sample.
  • AcquireJobLock(jobID string, ttl time.Duration, now time.Time) bool: acquires idempotency lock.
  • AcquireDedupLock(key string, ttl time.Duration, now time.Time) bool: acquires dedup lock.
  • Acquire(key string, ttl time.Duration, now time.Time) bool: deduper adapter alias.
  • SetCheckpoint(org, repo string, checkpoint time.Time) error: persists last successful scrape timestamp for one repo.
  • GetCheckpoint(org, repo string) (time.Time, bool, error): returns last checkpoint for one repo.
  • Healthy(ctx context.Context) bool: reports backend health for readiness probes.
  • GC(now time.Time): removes expired metrics and expired lock entries.
  • Snapshot() []MetricPoint: returns sorted metric snapshot.
RedisStore
  • Close() error: closes underlying Redis client.
  • UpsertMetric(role RuntimeRole, source WriteSource, point MetricPoint) error: validates/upserts series to Redis hash + index.
  • AcquireJobLock(jobID string, ttl time.Duration, now time.Time) bool: acquires Redis lock key with TTL.
  • AcquireDedupLock(key string, ttl time.Duration, now time.Time) bool: acquires dedup lock key with TTL.
  • Acquire(key string, ttl time.Duration, now time.Time) bool: deduper adapter alias.
  • SetCheckpoint(org, repo string, checkpoint time.Time) error: writes checkpoint with retention TTL.
  • GetCheckpoint(org, repo string) (time.Time, bool, error): reads checkpoint for one repo.
  • Healthy(ctx context.Context) bool: checks Redis command health for active dependency probes.
  • GC(now time.Time): cleans stale index members whose metric keys have expired.
  • Snapshot() []MetricPoint: reads indexed series from Redis and returns sorted snapshot.
  • SnapshotCursor() (uint64, error): returns current incremental cursor sequence.
  • SnapshotDelta(cursor uint64) (SnapshotDelta, error): returns changed/deleted series after a cursor for incremental exporter refresh.

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type MemoryStore

type MemoryStore struct {
	// contains filtered or unexported fields
}

MemoryStore is an in-memory shared metric store.

func NewMemoryStore

func NewMemoryStore(retention time.Duration, maxSeries int) *MemoryStore

NewMemoryStore creates a memory store.

func (*MemoryStore) Acquire

func (s *MemoryStore) Acquire(key string, ttl time.Duration, now time.Time) bool

Acquire acquires a dedup lock for a key. It is an adapter for queue deduper interfaces.

func (*MemoryStore) AcquireDedupLock

func (s *MemoryStore) AcquireDedupLock(key string, ttl time.Duration, now time.Time) bool

AcquireDedupLock acquires a dedup lock for a key.

func (*MemoryStore) AcquireJobLock

func (s *MemoryStore) AcquireJobLock(jobID string, ttl time.Duration, now time.Time) bool

AcquireJobLock acquires an idempotency lock for a job id.

func (*MemoryStore) GC

func (s *MemoryStore) GC(now time.Time)

GC deletes expired metrics and locks.

func (*MemoryStore) GetCheckpoint

func (s *MemoryStore) GetCheckpoint(org, repo string) (time.Time, bool, error)

GetCheckpoint returns the last processed timestamp for one org/repo.

func (*MemoryStore) Healthy

func (s *MemoryStore) Healthy(_ context.Context) bool

Healthy reports memory store availability.

func (*MemoryStore) SetCheckpoint

func (s *MemoryStore) SetCheckpoint(org, repo string, checkpoint time.Time) error

SetCheckpoint stores the latest processed timestamp for one org/repo.

func (*MemoryStore) Snapshot

func (s *MemoryStore) Snapshot() []MetricPoint

Snapshot returns all non-expired metrics.

func (*MemoryStore) UpsertMetric

func (s *MemoryStore) UpsertMetric(role RuntimeRole, source WriteSource, point MetricPoint) error

UpsertMetric inserts or updates a metric point with role/source write guards.

type MetricPoint

type MetricPoint struct {
	Name      string
	Labels    map[string]string
	Value     float64
	UpdatedAt time.Time
}

MetricPoint is a single metric sample.

type RedisStore

type RedisStore struct {
	// contains filtered or unexported fields
}

RedisStore stores shared metrics and lock state in Redis.

func NewRedisStore

func NewRedisStore(client redis.UniversalClient, cfg RedisStoreConfig) *RedisStore

NewRedisStore creates a Redis-backed metric store.

func (*RedisStore) Acquire

func (s *RedisStore) Acquire(key string, ttl time.Duration, now time.Time) bool

Acquire acquires a dedup lock for a key. It is an adapter for queue deduper interfaces.

func (*RedisStore) AcquireDedupLock

func (s *RedisStore) AcquireDedupLock(key string, ttl time.Duration, now time.Time) bool

AcquireDedupLock acquires a dedup lock for a key.

func (*RedisStore) AcquireJobLock

func (s *RedisStore) AcquireJobLock(jobID string, ttl time.Duration, now time.Time) bool

AcquireJobLock acquires an idempotency lock for a job id.

func (*RedisStore) Close

func (s *RedisStore) Close() error

Close closes the underlying Redis client.

func (*RedisStore) GC

func (s *RedisStore) GC(_ time.Time)

GC removes stale metric index references where series keys have already expired.

func (*RedisStore) GetCheckpoint

func (s *RedisStore) GetCheckpoint(org, repo string) (time.Time, bool, error)

GetCheckpoint returns the latest processed timestamp for one org/repo.

func (*RedisStore) Healthy

func (s *RedisStore) Healthy(ctx context.Context) bool

Healthy reports Redis connectivity for dependency health probing.

func (*RedisStore) SetCheckpoint

func (s *RedisStore) SetCheckpoint(org, repo string, checkpoint time.Time) error

SetCheckpoint stores the latest processed timestamp for one org/repo.

func (*RedisStore) Snapshot

func (s *RedisStore) Snapshot() []MetricPoint

Snapshot returns all currently available metric series from Redis.

func (*RedisStore) SnapshotCursor

func (s *RedisStore) SnapshotCursor() (uint64, error)

SnapshotCursor returns the current incremental snapshot cursor.

func (*RedisStore) SnapshotDelta

func (s *RedisStore) SnapshotDelta(cursor uint64) (SnapshotDelta, error)

SnapshotDelta returns incremental series changes after the provided cursor.

func (*RedisStore) UpsertMetric

func (s *RedisStore) UpsertMetric(role RuntimeRole, source WriteSource, point MetricPoint) error

UpsertMetric inserts or updates a metric point with role/source write guards.

type RedisStoreConfig

type RedisStoreConfig struct {
	Namespace   string
	Retention   time.Duration
	MaxSeries   int
	IndexShards int
}

RedisStoreConfig configures the Redis-backed shared metric store.

type RuntimeRole

type RuntimeRole string

RuntimeRole represents the current runtime role.

const (
	// RoleLeader is the leader role.
	RoleLeader RuntimeRole = "leader"
	// RoleFollower is the follower role.
	RoleFollower RuntimeRole = "follower"
)

type SnapshotDelta

type SnapshotDelta struct {
	NextCursor uint64
	Events     []SnapshotDeltaEvent
}

SnapshotDelta contains a set of incremental changes after a cursor.

type SnapshotDeltaEvent

type SnapshotDeltaEvent struct {
	SeriesID string
	Point    MetricPoint
	Deleted  bool
}

SnapshotDeltaEvent is one incremental snapshot change for a series.

type WriteSource

type WriteSource string

WriteSource represents the source path for a metric write.

const (
	// SourceLeaderScrape writes originate from scheduled leader scrape.
	SourceLeaderScrape WriteSource = "leader_scrape"
	// SourceWorkerBackfill writes originate from backfill workers.
	SourceWorkerBackfill WriteSource = "worker_backfill"
)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL