| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108 | 
							- package clickhouse
 
- import (
 
- 	"context"
 
- 	"fmt"
 
- 	"git.beejay.kim/tool/service"
 
- 	"github.com/ClickHouse/clickhouse-go/v2"
 
- 	"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
 
- 	"github.com/google/uuid"
 
- 	"github.com/rs/zerolog/log"
 
- 	"strings"
 
- 	"sync"
 
- 	"time"
 
- )
 
- type _ch struct {
 
- 	config *Config
 
- 	cli    clickhouse.Conn
 
- 	debug  bool
 
- }
 
- func New(config Config, debug bool) (service.Clickhouse, error) {
 
- 	var err error
 
- 	ch := &_ch{
 
- 		config: &config,
 
- 		debug:  debug,
 
- 	}
 
- 	if ch.cli, err = clickhouse.Open(&clickhouse.Options{
 
- 		Protocol: clickhouse.Native,
 
- 		Addr: []string{
 
- 			fmt.Sprintf("%s:%d", config.Host, config.Port),
 
- 		},
 
- 		Auth: clickhouse.Auth{
 
- 			Database: config.Database,
 
- 			Username: config.Username,
 
- 			Password: config.Password,
 
- 		},
 
- 		Debug: debug,
 
- 		Compression: &clickhouse.Compression{
 
- 			Method: clickhouse.CompressionLZ4,
 
- 		},
 
- 		DialTimeout: time.Duration(config.Params.Timeout) * time.Second,
 
- 		ReadTimeout: time.Duration(config.Params.ReadTimeout) * time.Second,
 
- 	}); err != nil {
 
- 		return nil, err
 
- 	}
 
- 	if err = ch.Ping(context.TODO()); err != nil {
 
- 		return nil, err
 
- 	}
 
- 	return ch, nil
 
- }
 
- func (ch *_ch) ID() uuid.UUID {
 
- 	return uuid.NewSHA1(uuid.NameSpaceDNS, []byte("db.clickhouse"))
 
- }
 
- func (ch *_ch) Close() error {
 
- 	var wg sync.WaitGroup
 
- 	wg.Add(1)
 
- 	go func() {
 
- 		defer wg.Done()
 
- 		if ch.cli != nil {
 
- 			_ = ch.cli.Close()
 
- 		}
 
- 		log.Debug().
 
- 			Str("clickhouse", "stop session").
 
- 			Send()
 
- 	}()
 
- 	wg.Wait()
 
- 	return nil
 
- }
 
- func (ch *_ch) Ping(ctx context.Context) error {
 
- 	return ch.cli.Ping(ctx)
 
- }
 
- func (ch *_ch) Exec(ctx context.Context, query string, args ...any) error {
 
- 	return ch.cli.Exec(ctx, query, args...)
 
- }
 
- func (ch *_ch) Query(ctx context.Context, query string, args ...any) (driver.Rows, error) {
 
- 	return ch.cli.Query(ctx, query, args...)
 
- }
 
- //goland:noinspection ALL
 
- func (ch *_ch) Batch(ctx context.Context, tblName string, release bool) (driver.Batch, error) {
 
- 	if tblName = strings.TrimSpace(tblName); tblName == "" {
 
- 		return nil, fmt.Errorf("illegal argumets: empty table name")
 
- 	}
 
- 	var (
 
- 		query = fmt.Sprintf("INSERT INTO `%s`", tblName)
 
- 		opts  []driver.PrepareBatchOption
 
- 	)
 
- 	if release {
 
- 		opts = append(opts, driver.WithReleaseConnection())
 
- 	}
 
- 	return ch.cli.PrepareBatch(ctx, query, opts...)
 
- }
 
 
  |