utils

package
v0.0.0-...-69105db Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 14, 2026 License: AGPL-3.0 Imports: 61 Imported by: 0

Documentation

Index

Constants

View Source
const (
	AvroLocalStorage = iota
	AvroS3Storage
	AvroGCSStorage
)
View Source
const FullTablePartitionID = "full-table-partition-id"
View Source
const MaxAWSSessionNameLength = 63 // Docs mention 64 as limit, but always good to stay under
View Source
const RDSAuthTokenTTL = 10 * time.Minute

RDSAuthTokenTTL is the cache TTL for RDS auth tokens. RDS Tokens Live for 15 minutes by default

View Source
const SSHKeepaliveInterval = 15 * time.Second

Variables

This section is empty.

Functions

func CreatePeerNoValidate

func CreatePeerNoValidate(
	ctx context.Context,
	pool shared.CatalogPool,
	peer *protos.Peer,
	allowUpdate bool,
) (*protos.CreatePeerResponse, error)

func CreateS3Client

func CreateS3Client(ctx context.Context, credsProvider AWSCredentialsProvider) (*s3.Client, error)

func DefaultOnRecord

func DefaultOnRecord(ls *lua.LState) int

func FileURLForS3Service

func FileURLForS3Service(endpoint string, region string, bucket string, filePath string) string

func FormatTableSize

func FormatTableSize(bytes int64) string

FormatTableSize converts bytes to human-readable format

func GetRDSToken

func GetRDSToken(ctx context.Context, connConfig RDSConnectionConfig, rdsAuth *RDSAuth, connectorName string) (string, error)

func GetSSHClientConfig

func GetSSHClientConfig(config *protos.SSHConfig) (*ssh.ClientConfig, error)

GetSSHClientConfig returns an *ssh.ClientConfig based on provided credentials.

func InitialiseTableRowsMap

func InitialiseTableRowsMap(tableMaps []*protos.TableMapping) map[string]*model.RecordTypeCounts

func IsLower

func IsLower(s string) bool

I think these only work with ASCII?

func IsUpper

func IsUpper(s string) bool

I think these only work with ASCII?

func KeysToString

func KeysToString(m map[string]struct{}) string

func LVAsReadOnlyBytes

func LVAsReadOnlyBytes(ls *lua.LState, v lua.LValue) ([]byte, error)

func LVAsStringOrNil

func LVAsStringOrNil(ls *lua.LState, v lua.LValue) (string, error)

func LoadScript

func LoadScript(ctx context.Context, script string, printfn lua.LGFunction) (*lua.LState, error)

func LuaPrintFn

func LuaPrintFn(fn func(string)) lua.LGFunction

func NewPeerDBOCFWriter

func NewPeerDBOCFWriter(
	stream *model.QRecordStream,
	avroSchema *model.QRecordAvroSchemaDefinition,
	avroCompressionCodec ocf.CodecName,
	targetDWH protos.DBType,
	sizeTracker *model.QRecordAvroChunkSizeTracker,
) *peerDBOCFWriter

func PutAndRemoveS3

func PutAndRemoveS3(ctx context.Context, client *s3.Client, bucket string, prefix string) error

Write an empty file and then delete it to check if we have write permissions

func QuoteLiteral

func QuoteLiteral(literal string) string

QuoteLiteral quotes a 'literal' (e.g. a parameter, often used to pass literal to DDL and other statements that do not accept parameters) to be used as part of an SQL statement. For example:

exp_date := pq.QuoteLiteral("2023-01-05 15:00:00Z")
err := db.Exec(fmt.Sprintf("CREATE ROLE my_user VALID UNTIL %s", exp_date))

Any single quotes in name will be escaped. Any backslashes (i.e. "\") will be replaced by two backslashes (i.e. "\\") and the C-style escape identifier that PostgreSQL provides ('E') will be prepended to the string.

func RecordsToRawTableStream

func RecordsToRawTableStream[Items model.Items](
	req *model.RecordsToStreamRequest[Items], numericTruncator model.StreamNumericTruncator,
) (*model.QRecordStream, error)

func RemoveSpacesTabsNewlines

func RemoveSpacesTabsNewlines(s string) string

Types

type AWSCredentials

type AWSCredentials struct {
	EndpointUrl *string
	AWS         aws.Credentials
}

type AWSCredentialsProvider

type AWSCredentialsProvider interface {
	Retrieve(ctx context.Context) (AWSCredentials, error)
	GetUnderlyingProvider() aws.CredentialsProvider
	GetRegion() string
	GetEndpointURL() string
	GetTlsConfig() (*string, string)
}

func GetAWSCredentialsProvider

func GetAWSCredentialsProvider(ctx context.Context, connectorName string, peerCredentials PeerAWSCredentials) (AWSCredentialsProvider, error)

type AWSSecrets

type AWSSecrets struct {
	AccessKeyID     string
	SecretAccessKey string
	AwsRoleArn      string
	Region          string
	Endpoint        string
	SessionToken    string
}

type AssumeRoleBasedAWSCredentialsProvider

type AssumeRoleBasedAWSCredentialsProvider struct {
	Provider aws.CredentialsProvider // New Credentials
	// contains filtered or unexported fields
}

func NewAssumeRoleBasedAWSCredentialsProvider

func NewAssumeRoleBasedAWSCredentialsProvider(
	ctx context.Context,
	config aws.Config,
	roleArn string,
	sessionName string,
) (*AssumeRoleBasedAWSCredentialsProvider, error)

func (*AssumeRoleBasedAWSCredentialsProvider) GetEndpointURL

func (a *AssumeRoleBasedAWSCredentialsProvider) GetEndpointURL() string

func (*AssumeRoleBasedAWSCredentialsProvider) GetRegion

func (*AssumeRoleBasedAWSCredentialsProvider) GetTlsConfig

func (a *AssumeRoleBasedAWSCredentialsProvider) GetTlsConfig() (*string, string)

func (*AssumeRoleBasedAWSCredentialsProvider) GetUnderlyingProvider

func (*AssumeRoleBasedAWSCredentialsProvider) Retrieve

type AvroFile

type AvroFile struct {
	FilePath        string              `json:"filePath"`
	StorageLocation AvroStorageLocation `json:"storageLocation"`
	NumRecords      int64               `json:"numRecords"`
}

func (*AvroFile) Cleanup

func (l *AvroFile) Cleanup(ctx context.Context)

type AvroStorageLocation

type AvroStorageLocation int64

type CDCStore

type CDCStore[Items model.Items] struct {
	// contains filtered or unexported fields
}

func NewCDCStore

func NewCDCStore[Items model.Items](ctx context.Context, env map[string]string, flowJobName string) (*CDCStore[Items], error)

func (*CDCStore[T]) Close

func (c *CDCStore[T]) Close() error

func (*CDCStore[T]) Get

func (c *CDCStore[T]) Get(key model.TableWithPkey) (model.Record[T], bool, error)

bool is to indicate if a record is found or not [similar to ok]

func (*CDCStore[T]) Set

func (c *CDCStore[T]) Set(key model.TableWithPkey, rec model.Record[T]) error

type ClickHouseS3Credentials

type ClickHouseS3Credentials struct {
	Provider   AWSCredentialsProvider
	BucketPath string
}

type ConfigBasedAWSCredentialsProvider

type ConfigBasedAWSCredentialsProvider struct {
	// contains filtered or unexported fields
}

func NewConfigBasedAWSCredentialsProvider

func NewConfigBasedAWSCredentialsProvider(config aws.Config) *ConfigBasedAWSCredentialsProvider

func (*ConfigBasedAWSCredentialsProvider) GetEndpointURL

func (r *ConfigBasedAWSCredentialsProvider) GetEndpointURL() string

func (*ConfigBasedAWSCredentialsProvider) GetRegion

func (*ConfigBasedAWSCredentialsProvider) GetTlsConfig

func (r *ConfigBasedAWSCredentialsProvider) GetTlsConfig() (*string, string)

func (*ConfigBasedAWSCredentialsProvider) GetUnderlyingProvider

func (r *ConfigBasedAWSCredentialsProvider) GetUnderlyingProvider() aws.CredentialsProvider

func (*ConfigBasedAWSCredentialsProvider) Retrieve

Retrieve should be called as late as possible in order to have credentials with latest expiry

type GcpServiceAccount

type GcpServiceAccount struct {
	Type                    string `json:"type"`
	ProjectID               string `json:"project_id"`
	PrivateKeyID            string `json:"private_key_id"`
	PrivateKey              string `json:"private_key"`
	ClientEmail             string `json:"client_email"`
	ClientID                string `json:"client_id"`
	AuthURI                 string `json:"auth_uri"`
	TokenURI                string `json:"token_uri"`
	AuthProviderX509CertURL string `json:"auth_provider_x509_cert_url"`
	ClientX509CertURL       string `json:"client_x509_cert_url"`
}

func GcpServiceAccountFromProto

func GcpServiceAccountFromProto(sa *protos.GcpServiceAccount) *GcpServiceAccount

func (*GcpServiceAccount) CreateBigQueryClient

func (sa *GcpServiceAccount) CreateBigQueryClient(ctx context.Context) (*bigquery.Client, error)

CreateBigQueryClient creates a new BigQuery client from a GcpServiceAccount.

func (*GcpServiceAccount) CreatePubSubClient

func (sa *GcpServiceAccount) CreatePubSubClient(ctx context.Context) (*pubsub.Client, error)

CreatePubSubClient creates a new PubSub client from a GcpServiceAccount.

func (*GcpServiceAccount) CreateStorageClient

func (sa *GcpServiceAccount) CreateStorageClient(ctx context.Context) (*storage.Client, error)

CreateStorageClient creates a new Storage client from a GcpServiceAccount.

func (*GcpServiceAccount) Validate

func (sa *GcpServiceAccount) Validate() error

Validates a GcpServiceAccount, that none of the fields are empty.

type LPool

type LPool[T any] struct {
	// contains filtered or unexported fields
}

func LuaPool

func LuaPool[T any](maxSize int, cons func() (*lua.LState, error), merge func(T)) (*LPool[T], error)

func (*LPool[T]) Close

func (pool *LPool[T]) Close()

func (*LPool[T]) Run

func (pool *LPool[T]) Run(f func(*lua.LState) T)

func (*LPool[T]) Spawn

func (pool *LPool[T]) Spawn() error

func (*LPool[T]) Wait

func (pool *LPool[T]) Wait(ctx context.Context) error

type LPoolMessage

type LPoolMessage[T any] struct {
	// contains filtered or unexported fields
}

type MeteredConn

type MeteredConn struct {
	net.Conn
	// contains filtered or unexported fields
}

func (*MeteredConn) Read

func (mc *MeteredConn) Read(b []byte) (int, error)

type MeteredDialer

type MeteredDialer struct {
	// contains filtered or unexported fields
}

func NewMeteredDialer

func NewMeteredDialer(totalBytesRead *atomic.Int64, deltaBytesRead *atomic.Int64,
	innerDialer innerDialer, noDeadlineRequired bool,
) MeteredDialer

func (*MeteredDialer) DialContext

func (md *MeteredDialer) DialContext(ctx context.Context, network, address string) (net.Conn, error)

type NoDeadlineConn

type NoDeadlineConn struct{ net.Conn }

see: https://github.com/jackc/pgx/issues/382#issuecomment-1496586216

func (*NoDeadlineConn) SetDeadline

func (c *NoDeadlineConn) SetDeadline(t time.Time) error

func (*NoDeadlineConn) SetReadDeadline

func (c *NoDeadlineConn) SetReadDeadline(t time.Time) error

func (*NoDeadlineConn) SetWriteDeadline

func (c *NoDeadlineConn) SetWriteDeadline(t time.Time) error

type PartitionHelper

type PartitionHelper struct {
	// contains filtered or unexported fields
}

func NewPartitionHelper

func NewPartitionHelper(logger log.Logger) *PartitionHelper

func (*PartitionHelper) AddPartition

func (p *PartitionHelper) AddPartition(start any, end any) error

func (*PartitionHelper) AddPartitionsWithRange

func (p *PartitionHelper) AddPartitionsWithRange(start any, end any, numPartitions int64) error

func (*PartitionHelper) GetPartitions

func (p *PartitionHelper) GetPartitions() []*protos.QRepPartition

type PartitionRangeForComparison

type PartitionRangeForComparison struct {
	// contains filtered or unexported fields
}

type PartitionRangeType

type PartitionRangeType string
const (
	PartitionEndRangeType   PartitionRangeType = "end"
	PartitionStartRangeType PartitionRangeType = "start"
)

type PeerAWSCredentials

type PeerAWSCredentials struct {
	Credentials    aws.Credentials
	RoleArn        *string
	ChainedRoleArn *string
	EndpointUrl    *string
	Region         string
	RootCAs        *string
	TlsHost        string
}

func BuildPeerAWSCredentials

func BuildPeerAWSCredentials(awsAuth *protos.AwsAuthenticationConfig) PeerAWSCredentials

func NewPeerAWSCredentials

func NewPeerAWSCredentials(s3 *protos.S3Config) PeerAWSCredentials

type RDSAuth

type RDSAuth struct {
	AwsAuthConfig *protos.AwsAuthenticationConfig
	// contains filtered or unexported fields
}

func (*RDSAuth) VerifyAuthConfig

func (r *RDSAuth) VerifyAuthConfig() error

type RDSConnectionConfig

type RDSConnectionConfig struct {
	Host string
	User string
	Port uint32
}

type RecalculateV4Signature

type RecalculateV4Signature struct {
	// contains filtered or unexported fields
}

RecalculateV4Signature allow GCS over S3, removing Accept-Encoding header from sign https://stackoverflow.com/a/74382598/1204665 https://github.com/aws/aws-sdk-go-v2/issues/1816

func (*RecalculateV4Signature) RoundTrip

func (lt *RecalculateV4Signature) RoundTrip(req *http.Request) (*http.Response, error)

type S3BucketAndPrefix

type S3BucketAndPrefix struct {
	Bucket string
	Prefix string
}

func NewS3BucketAndPrefix

func NewS3BucketAndPrefix(s3Path string) (*S3BucketAndPrefix, error)

path would be something like s3://bucket/prefix

type SSHTunnel

type SSHTunnel struct {
	*ssh.Client
	// contains filtered or unexported fields
}

func NewSSHTunnel

func NewSSHTunnel(
	ctx context.Context,
	sshConfig *protos.SSHConfig,
) (*SSHTunnel, error)

func (*SSHTunnel) Close

func (tunnel *SSHTunnel) Close() error

func (*SSHTunnel) GetKeepaliveChan

func (tunnel *SSHTunnel) GetKeepaliveChan(ctx context.Context) <-chan struct{}

returns a channel that will receive a value if the SSH keepalive fails or nil if no SSH tunnel is configured

type StaticAWSCredentialsProvider

type StaticAWSCredentialsProvider struct {
	// contains filtered or unexported fields
}

func LoadPeerDBAWSEnvConfigProvider

func LoadPeerDBAWSEnvConfigProvider(connectorName string) *StaticAWSCredentialsProvider

func NewStaticAWSCredentialsProvider

func NewStaticAWSCredentialsProvider(credentials AWSCredentials, region string, rootCAs *string, tlsHost string) *StaticAWSCredentialsProvider

func (*StaticAWSCredentialsProvider) GetEndpointURL

func (s *StaticAWSCredentialsProvider) GetEndpointURL() string

func (*StaticAWSCredentialsProvider) GetRegion

func (s *StaticAWSCredentialsProvider) GetRegion() string

func (*StaticAWSCredentialsProvider) GetTlsConfig

func (s *StaticAWSCredentialsProvider) GetTlsConfig() (*string, string)

func (*StaticAWSCredentialsProvider) GetUnderlyingProvider

func (s *StaticAWSCredentialsProvider) GetUnderlyingProvider() aws.CredentialsProvider

func (*StaticAWSCredentialsProvider) Retrieve

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL