package database

import (
	"context"
	"crypto/tls"
	"fmt"
	"strings"
	"sync"
	"time"

	"github.com/ClickHouse/clickhouse-go/v2"
	"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
)

// _ch is the ClickHouse-backed value returned by NewClickhouse. Every method
// delegates to the underlying driver connection.
type _ch struct {
	// config points at the ClickHouse section of the Config given to NewClickhouse.
	config *configCH
	// conn is the driver connection opened by NewClickhouse.
	conn clickhouse.Conn
}

// NewClickhouse opens a native-protocol ClickHouse connection described by
// cfg.Clickhouse and verifies it with a ping before returning it.
//
// The address is built from cfg.Clickhouse.Host and Port; the database and
// credentials come from cfg.Clickhouse as well. Params.Timeout and
// Params.ReadTimeout are interpreted as whole seconds. debug is forwarded to
// the driver's Debug option.
//
// It returns an error if the driver rejects the options or if the initial
// ping fails; in the latter case the freshly opened connection is closed
// before returning so it is not leaked.
//
//goland:noinspection GoUnusedExportedFunction
func NewClickhouse(cfg Config, debug bool) (Database[driver.Rows, driver.Batch], error) {
	var tlsConfig *tls.Config
	if cfg.Clickhouse.Params.Secure {
		// NOTE(review): certificate verification is disabled whenever TLS is
		// enabled, so the server identity is not authenticated. Confirm this
		// is intentional (e.g. self-signed deployments) before relying on it.
		tlsConfig = &tls.Config{
			InsecureSkipVerify: true,
		}
	}

	session, err := clickhouse.Open(&clickhouse.Options{
		Protocol: clickhouse.Native,
		TLS:      tlsConfig,
		Addr: []string{
			fmt.Sprintf("%s:%d", cfg.Clickhouse.Host, cfg.Clickhouse.Port),
		},
		Auth: clickhouse.Auth{
			Database: cfg.Clickhouse.Database,
			Username: cfg.Clickhouse.Username,
			Password: cfg.Clickhouse.Password,
		},
		Debug: debug,
		Compression: &clickhouse.Compression{
			Method: cfg.Clickhouse.Params.CompressionMethod(),
		},
		DialTimeout: time.Duration(cfg.Clickhouse.Params.Timeout) * time.Second,
		ReadTimeout: time.Duration(cfg.Clickhouse.Params.ReadTimeout) * time.Second,
	})
	if err != nil {
		return nil, err
	}

	s := &_ch{
		config: &cfg.Clickhouse,
		conn:   session,
	}
	if err = s.Ping(context.TODO()); err != nil {
		// The caller never receives the handle on this path, so close it here.
		// The close error is dropped on purpose: the ping failure is the error
		// worth reporting.
		_ = session.Close()
		return nil, err
	}
	return s, nil
}

// String returns the fixed backend name "clickhouse".
func (db *_ch) String() string {
	return "clickhouse"
}

// Close closes the underlying connection if one is set and always returns nil.
//
// NOTE(review): the close runs in a goroutine that is awaited immediately, so
// it is equivalent to a direct call, and the driver's close error is
// deliberately discarded.
func (db *_ch) Close() error {
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		if db.conn != nil {
			_ = db.conn.Close()
		}
	}()
	wg.Wait()
	return nil
}

// Ping forwards to the driver connection's Ping using ctx.
func (db *_ch) Ping(ctx context.Context) error {
	return db.conn.Ping(ctx)
}

// Exec forwards query and args to the driver connection's Exec and returns
// its error.
func (db *_ch) Exec(ctx context.Context, query string, args ...any) error {
	return db.conn.Exec(ctx, query, args...)
}

// Query forwards query and args to the driver connection's Query and returns
// the resulting rows and error unchanged.
func (db *_ch) Query(ctx context.Context, query string, args ...any) (driver.Rows, error) {
	return db.conn.Query(ctx, query, args...)
}

// Batch prepares an insert batch for table via the driver's PrepareBatch.
//
// table is trimmed of surrounding whitespace; an empty result yields an error
// without touching the connection. When release is true the
// driver.WithReleaseConnection option is passed to PrepareBatch.
//
// NOTE(review): the table name is interpolated verbatim between backticks, so
// it must be a trusted identifier. A qualified name such as "db.table" would
// presumably be quoted as a single identifier - confirm against callers.
//
//goland:noinspection SqlNoDataSourceInspection
func (db *_ch) Batch(ctx context.Context, table string, release bool) (driver.Batch, error) {
	table = strings.TrimSpace(table)
	if table == "" {
		return nil, fmt.Errorf("illegal arguments: empty table name")
	}

	var (
		query = fmt.Sprintf("INSERT INTO `%s`", table)
		opts  []driver.PrepareBatchOption
	)
	if release {
		opts = append(opts, driver.WithReleaseConnection())
	}
	return db.conn.PrepareBatch(ctx, query, opts...)
}