Documentation
¶
Index ¶
- Constants
- func CreatePeerNoValidate(ctx context.Context, pool shared.CatalogPool, peer *protos.Peer, ...) (*protos.CreatePeerResponse, error)
- func CreateS3Client(ctx context.Context, credsProvider AWSCredentialsProvider) (*s3.Client, error)
- func DefaultOnRecord(ls *lua.LState) int
- func FileURLForS3Service(endpoint string, region string, bucket string, filePath string) string
- func FormatTableSize(bytes int64) string
- func GetRDSToken(ctx context.Context, connConfig RDSConnectionConfig, rdsAuth *RDSAuth, ...) (string, error)
- func GetSSHClientConfig(config *protos.SSHConfig) (*ssh.ClientConfig, error)
- func InitialiseTableRowsMap(tableMaps []*protos.TableMapping) map[string]*model.RecordTypeCounts
- func IsLower(s string) bool
- func IsUpper(s string) bool
- func KeysToString(m map[string]struct{}) string
- func LVAsReadOnlyBytes(ls *lua.LState, v lua.LValue) ([]byte, error)
- func LVAsStringOrNil(ls *lua.LState, v lua.LValue) (string, error)
- func LoadScript(ctx context.Context, script string, printfn lua.LGFunction) (*lua.LState, error)
- func LuaPrintFn(fn func(string)) lua.LGFunction
- func NewPeerDBOCFWriter(stream *model.QRecordStream, avroSchema *model.QRecordAvroSchemaDefinition, ...) *peerDBOCFWriter
- func PutAndRemoveS3(ctx context.Context, client *s3.Client, bucket string, prefix string) error
- func QuoteLiteral(literal string) string
- func RecordsToRawTableStream[Items model.Items](req *model.RecordsToStreamRequest[Items], ...) (*model.QRecordStream, error)
- func RemoveSpacesTabsNewlines(s string) string
- type AWSCredentials
- type AWSCredentialsProvider
- type AWSSecrets
- type AssumeRoleBasedAWSCredentialsProvider
- func (a *AssumeRoleBasedAWSCredentialsProvider) GetEndpointURL() string
- func (a *AssumeRoleBasedAWSCredentialsProvider) GetRegion() string
- func (a *AssumeRoleBasedAWSCredentialsProvider) GetTlsConfig() (*string, string)
- func (a *AssumeRoleBasedAWSCredentialsProvider) GetUnderlyingProvider() aws.CredentialsProvider
- func (a *AssumeRoleBasedAWSCredentialsProvider) Retrieve(ctx context.Context) (AWSCredentials, error)
- type AvroFile
- type AvroStorageLocation
- type CDCStore
- type ClickHouseS3Credentials
- type ConfigBasedAWSCredentialsProvider
- func (r *ConfigBasedAWSCredentialsProvider) GetEndpointURL() string
- func (r *ConfigBasedAWSCredentialsProvider) GetRegion() string
- func (r *ConfigBasedAWSCredentialsProvider) GetTlsConfig() (*string, string)
- func (r *ConfigBasedAWSCredentialsProvider) GetUnderlyingProvider() aws.CredentialsProvider
- func (r *ConfigBasedAWSCredentialsProvider) Retrieve(ctx context.Context) (AWSCredentials, error)
- type GcpServiceAccount
- func (sa *GcpServiceAccount) CreateBigQueryClient(ctx context.Context) (*bigquery.Client, error)
- func (sa *GcpServiceAccount) CreatePubSubClient(ctx context.Context) (*pubsub.Client, error)
- func (sa *GcpServiceAccount) CreateStorageClient(ctx context.Context) (*storage.Client, error)
- func (sa *GcpServiceAccount) Validate() error
- type LPool
- type LPoolMessage
- type MeteredConn
- type MeteredDialer
- type NoDeadlineConn
- type PartitionHelper
- type PartitionRangeForComparison
- type PartitionRangeType
- type PeerAWSCredentials
- type RDSAuth
- type RDSConnectionConfig
- type RecalculateV4Signature
- type S3BucketAndPrefix
- type SSHTunnel
- type StaticAWSCredentialsProvider
- func (s *StaticAWSCredentialsProvider) GetEndpointURL() string
- func (s *StaticAWSCredentialsProvider) GetRegion() string
- func (s *StaticAWSCredentialsProvider) GetTlsConfig() (*string, string)
- func (s *StaticAWSCredentialsProvider) GetUnderlyingProvider() aws.CredentialsProvider
- func (s *StaticAWSCredentialsProvider) Retrieve(ctx context.Context) (AWSCredentials, error)
Constants ¶
const ( AvroLocalStorage = iota AvroS3Storage AvroGCSStorage )
const FullTablePartitionID = "full-table-partition-id"
const MaxAWSSessionNameLength = 63 // Docs mention 64 as limit, but always good to stay under
const RDSAuthTokenTTL = 10 * time.Minute
RDSAuthTokenTTL is the cache TTL for RDS auth tokens. RDS tokens live for 15 minutes by default.
const SSHKeepaliveInterval = 15 * time.Second
Variables ¶
This section is empty.
Functions ¶
func CreatePeerNoValidate ¶
func CreatePeerNoValidate( ctx context.Context, pool shared.CatalogPool, peer *protos.Peer, allowUpdate bool, ) (*protos.CreatePeerResponse, error)
func CreateS3Client ¶
func DefaultOnRecord ¶
func FileURLForS3Service ¶
func FormatTableSize ¶
FormatTableSize converts a byte count to a human-readable format.
func GetRDSToken ¶
func GetSSHClientConfig ¶
func GetSSHClientConfig(config *protos.SSHConfig) (*ssh.ClientConfig, error)
GetSSHClientConfig returns an *ssh.ClientConfig based on provided credentials.
func InitialiseTableRowsMap ¶
func InitialiseTableRowsMap(tableMaps []*protos.TableMapping) map[string]*model.RecordTypeCounts
func KeysToString ¶
func LoadScript ¶
func LuaPrintFn ¶
func LuaPrintFn(fn func(string)) lua.LGFunction
func NewPeerDBOCFWriter ¶
func NewPeerDBOCFWriter( stream *model.QRecordStream, avroSchema *model.QRecordAvroSchemaDefinition, avroCompressionCodec ocf.CodecName, targetDWH protos.DBType, sizeTracker *model.QRecordAvroChunkSizeTracker, ) *peerDBOCFWriter
func PutAndRemoveS3 ¶
PutAndRemoveS3 writes an empty file and then deletes it to check whether we have write permissions.
func QuoteLiteral ¶
QuoteLiteral quotes a 'literal' (e.g. a parameter, often used to pass literal to DDL and other statements that do not accept parameters) to be used as part of an SQL statement. For example:
exp_date := pq.QuoteLiteral("2023-01-05 15:00:00Z")
err := db.Exec(fmt.Sprintf("CREATE ROLE my_user VALID UNTIL %s", exp_date))
Any single quotes in literal will be escaped. Any backslashes (i.e. "\") will be replaced by two backslashes (i.e. "\\") and the C-style escape identifier that PostgreSQL provides ('E') will be prepended to the string.
func RecordsToRawTableStream ¶
func RecordsToRawTableStream[Items model.Items]( req *model.RecordsToStreamRequest[Items], numericTruncator model.StreamNumericTruncator, ) (*model.QRecordStream, error)
Types ¶
type AWSCredentials ¶
type AWSCredentials struct {
EndpointUrl *string
AWS aws.Credentials
}
type AWSCredentialsProvider ¶
type AWSCredentialsProvider interface {
Retrieve(ctx context.Context) (AWSCredentials, error)
GetUnderlyingProvider() aws.CredentialsProvider
GetRegion() string
GetEndpointURL() string
GetTlsConfig() (*string, string)
}
func GetAWSCredentialsProvider ¶
func GetAWSCredentialsProvider(ctx context.Context, connectorName string, peerCredentials PeerAWSCredentials) (AWSCredentialsProvider, error)
type AWSSecrets ¶
type AssumeRoleBasedAWSCredentialsProvider ¶
type AssumeRoleBasedAWSCredentialsProvider struct {
Provider aws.CredentialsProvider // New Credentials
// contains filtered or unexported fields
}
func (*AssumeRoleBasedAWSCredentialsProvider) GetEndpointURL ¶
func (a *AssumeRoleBasedAWSCredentialsProvider) GetEndpointURL() string
func (*AssumeRoleBasedAWSCredentialsProvider) GetRegion ¶
func (a *AssumeRoleBasedAWSCredentialsProvider) GetRegion() string
func (*AssumeRoleBasedAWSCredentialsProvider) GetTlsConfig ¶
func (a *AssumeRoleBasedAWSCredentialsProvider) GetTlsConfig() (*string, string)
func (*AssumeRoleBasedAWSCredentialsProvider) GetUnderlyingProvider ¶
func (a *AssumeRoleBasedAWSCredentialsProvider) GetUnderlyingProvider() aws.CredentialsProvider
func (*AssumeRoleBasedAWSCredentialsProvider) Retrieve ¶
func (a *AssumeRoleBasedAWSCredentialsProvider) Retrieve(ctx context.Context) (AWSCredentials, error)
type AvroFile ¶
type AvroFile struct {
FilePath string `json:"filePath"`
StorageLocation AvroStorageLocation `json:"storageLocation"`
NumRecords int64 `json:"numRecords"`
}
type AvroStorageLocation ¶
type AvroStorageLocation int64
type CDCStore ¶
func NewCDCStore ¶
type ClickHouseS3Credentials ¶
type ClickHouseS3Credentials struct {
Provider AWSCredentialsProvider
BucketPath string
}
type ConfigBasedAWSCredentialsProvider ¶
type ConfigBasedAWSCredentialsProvider struct {
// contains filtered or unexported fields
}
func NewConfigBasedAWSCredentialsProvider ¶
func NewConfigBasedAWSCredentialsProvider(config aws.Config) *ConfigBasedAWSCredentialsProvider
func (*ConfigBasedAWSCredentialsProvider) GetEndpointURL ¶
func (r *ConfigBasedAWSCredentialsProvider) GetEndpointURL() string
func (*ConfigBasedAWSCredentialsProvider) GetRegion ¶
func (r *ConfigBasedAWSCredentialsProvider) GetRegion() string
func (*ConfigBasedAWSCredentialsProvider) GetTlsConfig ¶
func (r *ConfigBasedAWSCredentialsProvider) GetTlsConfig() (*string, string)
func (*ConfigBasedAWSCredentialsProvider) GetUnderlyingProvider ¶
func (r *ConfigBasedAWSCredentialsProvider) GetUnderlyingProvider() aws.CredentialsProvider
func (*ConfigBasedAWSCredentialsProvider) Retrieve ¶
func (r *ConfigBasedAWSCredentialsProvider) Retrieve(ctx context.Context) (AWSCredentials, error)
Retrieve should be called as late as possible in order to have credentials with the latest expiry.
type GcpServiceAccount ¶
type GcpServiceAccount struct {
Type string `json:"type"`
ProjectID string `json:"project_id"`
PrivateKeyID string `json:"private_key_id"`
PrivateKey string `json:"private_key"`
ClientEmail string `json:"client_email"`
ClientID string `json:"client_id"`
AuthURI string `json:"auth_uri"`
TokenURI string `json:"token_uri"`
AuthProviderX509CertURL string `json:"auth_provider_x509_cert_url"`
ClientX509CertURL string `json:"client_x509_cert_url"`
}
func GcpServiceAccountFromProto ¶
func GcpServiceAccountFromProto(sa *protos.GcpServiceAccount) *GcpServiceAccount
func (*GcpServiceAccount) CreateBigQueryClient ¶
CreateBigQueryClient creates a new BigQuery client from a GcpServiceAccount.
func (*GcpServiceAccount) CreatePubSubClient ¶
CreatePubSubClient creates a new PubSub client from a GcpServiceAccount.
func (*GcpServiceAccount) CreateStorageClient ¶
CreateStorageClient creates a new Storage client from a GcpServiceAccount.
func (*GcpServiceAccount) Validate ¶
func (sa *GcpServiceAccount) Validate() error
Validate checks a GcpServiceAccount, ensuring that none of the fields are empty.
type LPoolMessage ¶
type LPoolMessage[T any] struct { // contains filtered or unexported fields }
type MeteredConn ¶
type MeteredDialer ¶
type MeteredDialer struct {
// contains filtered or unexported fields
}
func NewMeteredDialer ¶
func (*MeteredDialer) DialContext ¶
type NoDeadlineConn ¶
see: https://github.com/jackc/pgx/issues/382#issuecomment-1496586216
func (*NoDeadlineConn) SetDeadline ¶
func (c *NoDeadlineConn) SetDeadline(t time.Time) error
func (*NoDeadlineConn) SetReadDeadline ¶
func (c *NoDeadlineConn) SetReadDeadline(t time.Time) error
func (*NoDeadlineConn) SetWriteDeadline ¶
func (c *NoDeadlineConn) SetWriteDeadline(t time.Time) error
type PartitionHelper ¶
type PartitionHelper struct {
// contains filtered or unexported fields
}
func NewPartitionHelper ¶
func NewPartitionHelper(logger log.Logger) *PartitionHelper
func (*PartitionHelper) AddPartition ¶
func (p *PartitionHelper) AddPartition(start any, end any) error
func (*PartitionHelper) AddPartitionsWithRange ¶
func (p *PartitionHelper) AddPartitionsWithRange(start any, end any, numPartitions int64) error
func (*PartitionHelper) GetPartitions ¶
func (p *PartitionHelper) GetPartitions() []*protos.QRepPartition
type PartitionRangeForComparison ¶
type PartitionRangeForComparison struct {
// contains filtered or unexported fields
}
type PartitionRangeType ¶
type PartitionRangeType string
const ( PartitionEndRangeType PartitionRangeType = "end" PartitionStartRangeType PartitionRangeType = "start" )
type PeerAWSCredentials ¶
type PeerAWSCredentials struct {
Credentials aws.Credentials
RoleArn *string
ChainedRoleArn *string
EndpointUrl *string
Region string
RootCAs *string
TlsHost string
}
func BuildPeerAWSCredentials ¶
func BuildPeerAWSCredentials(awsAuth *protos.AwsAuthenticationConfig) PeerAWSCredentials
func NewPeerAWSCredentials ¶
func NewPeerAWSCredentials(s3 *protos.S3Config) PeerAWSCredentials
type RDSAuth ¶
type RDSAuth struct {
AwsAuthConfig *protos.AwsAuthenticationConfig
// contains filtered or unexported fields
}
func (*RDSAuth) VerifyAuthConfig ¶
type RDSConnectionConfig ¶
type RecalculateV4Signature ¶
type RecalculateV4Signature struct {
// contains filtered or unexported fields
}
RecalculateV4Signature allows using GCS over the S3 API by removing the Accept-Encoding header from the request signature. See https://stackoverflow.com/a/74382598/1204665 and https://github.com/aws/aws-sdk-go-v2/issues/1816
type S3BucketAndPrefix ¶
func NewS3BucketAndPrefix ¶
func NewS3BucketAndPrefix(s3Path string) (*S3BucketAndPrefix, error)
The s3Path argument is expected to look like s3://bucket/prefix.
type SSHTunnel ¶
func NewSSHTunnel ¶
func (*SSHTunnel) GetKeepaliveChan ¶
GetKeepaliveChan returns a channel that will receive a value if the SSH keepalive fails; it returns nil if no SSH tunnel is configured.
type StaticAWSCredentialsProvider ¶
type StaticAWSCredentialsProvider struct {
// contains filtered or unexported fields
}
func LoadPeerDBAWSEnvConfigProvider ¶
func LoadPeerDBAWSEnvConfigProvider(connectorName string) *StaticAWSCredentialsProvider
func NewStaticAWSCredentialsProvider ¶
func NewStaticAWSCredentialsProvider(credentials AWSCredentials, region string, rootCAs *string, tlsHost string) *StaticAWSCredentialsProvider
func (*StaticAWSCredentialsProvider) GetEndpointURL ¶
func (s *StaticAWSCredentialsProvider) GetEndpointURL() string
func (*StaticAWSCredentialsProvider) GetRegion ¶
func (s *StaticAWSCredentialsProvider) GetRegion() string
func (*StaticAWSCredentialsProvider) GetTlsConfig ¶
func (s *StaticAWSCredentialsProvider) GetTlsConfig() (*string, string)
func (*StaticAWSCredentialsProvider) GetUnderlyingProvider ¶
func (s *StaticAWSCredentialsProvider) GetUnderlyingProvider() aws.CredentialsProvider
func (*StaticAWSCredentialsProvider) Retrieve ¶
func (s *StaticAWSCredentialsProvider) Retrieve(ctx context.Context) (AWSCredentials, error)