| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111 | package databaseimport (	"context"	"crypto/tls"	"fmt"	"github.com/ClickHouse/clickhouse-go/v2"	"github.com/ClickHouse/clickhouse-go/v2/lib/driver"	"strings"	"sync"	"time")type _ch struct {	config *configCH	conn   clickhouse.Conn}//goland:noinspection GoUnusedExportedFunctionfunc NewClickhouse(cfg Config, debug bool) (Database[driver.Rows, driver.Batch], error) {	var tlcConfig *tls.Config	if cfg.Clickhouse.Params.Secure {		tlcConfig = &tls.Config{			InsecureSkipVerify: true,		}	}	session, err := clickhouse.Open(&clickhouse.Options{		Protocol: clickhouse.Native,		TLS:      tlcConfig,		Addr: []string{			fmt.Sprintf("%s:%d", cfg.Clickhouse.Host, cfg.Clickhouse.Port),		},		Auth: clickhouse.Auth{			Database: cfg.Clickhouse.Database,			Username: cfg.Clickhouse.Username,			Password: cfg.Clickhouse.Password,		},		Debug: debug,		Compression: &clickhouse.Compression{			Method: cfg.Clickhouse.Params.CompressionMethod(),		},		DialTimeout: time.Duration(cfg.Clickhouse.Params.Timeout) * time.Second,		ReadTimeout: time.Duration(cfg.Clickhouse.Params.ReadTimeout) * time.Second,	})	if err != nil {		return nil, err	}	s := &_ch{		config: &cfg.Clickhouse,		conn:   session,	}	if err = s.Ping(context.TODO()); err != nil {		return nil, err	}	return s, nil}func (db *_ch) String() string {	return "clickhouse"}func (db *_ch) Close() error {	var wg sync.WaitGroup	wg.Add(1)	go func() {		defer wg.Done()		if db.conn != nil {			_ = db.conn.Close()		}	}()	wg.Wait()	return nil}func (db *_ch) Ping(ctx context.Context) error {	return db.conn.Ping(ctx)}func (db *_ch) Exec(ctx context.Context, query string, args ...any) error {	return db.conn.Exec(ctx, query, args...)}func (db *_ch) Query(ctx context.Context, query string, args ...any) (driver.Rows, error) {	return db.conn.Query(ctx, query, args...)}//goland:noinspection SqlNoDataSourceInspectionfunc (db *_ch) Batch(ctx context.Context, table string, release bool) (driver.Batch, error) {	table = strings.TrimSpace(table)	if table == "" {		return nil, fmt.Errorf("illegal argumets: empty table name")	}	var (		query = fmt.Sprintf("INSERT INTO `%s`", table)		opts  []driver.PrepareBatchOption	)	if release {		opts = append(opts, driver.WithReleaseConnection())	}	return db.conn.PrepareBatch(ctx, query, opts...)}
 |