123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111 |
- package database
- import (
- "context"
- "crypto/tls"
- "fmt"
- "github.com/ClickHouse/clickhouse-go/v2"
- "github.com/ClickHouse/clickhouse-go/v2/lib/driver"
- "strings"
- "sync"
- "time"
- )
- type _ch struct {
- config *configCH
- conn clickhouse.Conn
- }
- //goland:noinspection GoUnusedExportedFunction
- func NewClickhouse(cfg Config, debug bool) (Database[driver.Rows, driver.Batch], error) {
- var tlcConfig *tls.Config
- if cfg.Clickhouse.Params.Secure {
- tlcConfig = &tls.Config{
- InsecureSkipVerify: true,
- }
- }
- session, err := clickhouse.Open(&clickhouse.Options{
- Protocol: clickhouse.Native,
- TLS: tlcConfig,
- Addr: []string{
- fmt.Sprintf("%s:%d", cfg.Clickhouse.Host, cfg.Clickhouse.Port),
- },
- Auth: clickhouse.Auth{
- Database: cfg.Clickhouse.Database,
- Username: cfg.Clickhouse.Username,
- Password: cfg.Clickhouse.Password,
- },
- Debug: debug,
- Compression: &clickhouse.Compression{
- Method: cfg.Clickhouse.Params.CompressionMethod(),
- },
- DialTimeout: time.Duration(cfg.Clickhouse.Params.Timeout) * time.Second,
- ReadTimeout: time.Duration(cfg.Clickhouse.Params.ReadTimeout) * time.Second,
- })
- if err != nil {
- return nil, err
- }
- s := &_ch{
- config: &cfg.Clickhouse,
- conn: session,
- }
- if err = s.Ping(context.TODO()); err != nil {
- return nil, err
- }
- return s, nil
- }
- func (db *_ch) String() string {
- return "clickhouse"
- }
- func (db *_ch) Close() error {
- var wg sync.WaitGroup
- wg.Add(1)
- go func() {
- defer wg.Done()
- if db.conn != nil {
- _ = db.conn.Close()
- }
- }()
- wg.Wait()
- return nil
- }
- func (db *_ch) Ping(ctx context.Context) error {
- return db.conn.Ping(ctx)
- }
- func (db *_ch) Exec(ctx context.Context, query string, args ...any) error {
- return db.conn.Exec(ctx, query, args...)
- }
- func (db *_ch) Query(ctx context.Context, query string, args ...any) (driver.Rows, error) {
- return db.conn.Query(ctx, query, args...)
- }
- //goland:noinspection SqlNoDataSourceInspection
- func (db *_ch) Batch(ctx context.Context, table string, release bool) (driver.Batch, error) {
- table = strings.TrimSpace(table)
- if table == "" {
- return nil, fmt.Errorf("illegal argumets: empty table name")
- }
- var (
- query = fmt.Sprintf("INSERT INTO `%s`", table)
- opts []driver.PrepareBatchOption
- )
- if release {
- opts = append(opts, driver.WithReleaseConnection())
- }
- return db.conn.PrepareBatch(ctx, query, opts...)
- }
|