123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108 |
- package clickhouse
- import (
- "context"
- "fmt"
- "git.beejay.kim/tool/service"
- "github.com/ClickHouse/clickhouse-go/v2"
- "github.com/ClickHouse/clickhouse-go/v2/lib/driver"
- "github.com/google/uuid"
- "github.com/rs/zerolog/log"
- "strings"
- "sync"
- "time"
- )
- type _ch struct {
- config *Config
- cli clickhouse.Conn
- debug bool
- }
- func New(config Config, debug bool) (service.Clickhouse, error) {
- var err error
- ch := &_ch{
- config: &config,
- debug: debug,
- }
- if ch.cli, err = clickhouse.Open(&clickhouse.Options{
- Protocol: clickhouse.Native,
- Addr: []string{
- fmt.Sprintf("%s:%d", config.Host, config.Port),
- },
- Auth: clickhouse.Auth{
- Database: config.Database,
- Username: config.Username,
- Password: config.Password,
- },
- Debug: debug,
- Compression: &clickhouse.Compression{
- Method: clickhouse.CompressionLZ4,
- },
- DialTimeout: time.Duration(config.Params.Timeout) * time.Second,
- ReadTimeout: time.Duration(config.Params.ReadTimeout) * time.Second,
- }); err != nil {
- return nil, err
- }
- if err = ch.Ping(context.TODO()); err != nil {
- return nil, err
- }
- return ch, nil
- }
- func (ch *_ch) ID() uuid.UUID {
- return uuid.NewSHA1(uuid.NameSpaceDNS, []byte("db.clickhouse"))
- }
- func (ch *_ch) Close() error {
- var wg sync.WaitGroup
- wg.Add(1)
- go func() {
- defer wg.Done()
- if ch.cli != nil {
- _ = ch.cli.Close()
- }
- log.Debug().
- Str("clickhouse", "stop session").
- Send()
- }()
- wg.Wait()
- return nil
- }
- func (ch *_ch) Ping(ctx context.Context) error {
- return ch.cli.Ping(ctx)
- }
- func (ch *_ch) Exec(ctx context.Context, query string, args ...any) error {
- return ch.cli.Exec(ctx, query, args...)
- }
- func (ch *_ch) Query(ctx context.Context, query string, args ...any) (driver.Rows, error) {
- return ch.cli.Query(ctx, query, args...)
- }
- //goland:noinspection ALL
- func (ch *_ch) Batch(ctx context.Context, tblName string, release bool) (driver.Batch, error) {
- if tblName = strings.TrimSpace(tblName); tblName == "" {
- return nil, fmt.Errorf("illegal argumets: empty table name")
- }
- var (
- query = fmt.Sprintf("INSERT INTO `%s`", tblName)
- opts []driver.PrepareBatchOption
- )
- if release {
- opts = append(opts, driver.WithReleaseConnection())
- }
- return ch.cli.PrepareBatch(ctx, query, opts...)
- }
|