clickhouse.go 2.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111
  1. package database
  2. import (
  3. "context"
  4. "crypto/tls"
  5. "fmt"
  6. "github.com/ClickHouse/clickhouse-go/v2"
  7. "github.com/ClickHouse/clickhouse-go/v2/lib/driver"
  8. "strings"
  9. "sync"
  10. "time"
  11. )
  12. type _ch struct {
  13. config *configCH
  14. conn clickhouse.Conn
  15. }
  16. //goland:noinspection GoUnusedExportedFunction
  17. func NewClickhouse(cfg Config, debug bool) (Database[driver.Rows, driver.Batch], error) {
  18. var tlcConfig *tls.Config
  19. if cfg.Clickhouse.Params.Secure {
  20. tlcConfig = &tls.Config{
  21. InsecureSkipVerify: true,
  22. }
  23. }
  24. session, err := clickhouse.Open(&clickhouse.Options{
  25. Protocol: clickhouse.Native,
  26. TLS: tlcConfig,
  27. Addr: []string{
  28. fmt.Sprintf("%s:%d", cfg.Clickhouse.Host, cfg.Clickhouse.Port),
  29. },
  30. Auth: clickhouse.Auth{
  31. Database: cfg.Clickhouse.Database,
  32. Username: cfg.Clickhouse.Username,
  33. Password: cfg.Clickhouse.Password,
  34. },
  35. Debug: debug,
  36. Compression: &clickhouse.Compression{
  37. Method: cfg.Clickhouse.Params.CompressionMethod(),
  38. },
  39. DialTimeout: time.Duration(cfg.Clickhouse.Params.Timeout) * time.Second,
  40. ReadTimeout: time.Duration(cfg.Clickhouse.Params.ReadTimeout) * time.Second,
  41. })
  42. if err != nil {
  43. return nil, err
  44. }
  45. s := &_ch{
  46. config: &cfg.Clickhouse,
  47. conn: session,
  48. }
  49. if err = s.Ping(context.TODO()); err != nil {
  50. return nil, err
  51. }
  52. return s, nil
  53. }
  54. func (db *_ch) String() string {
  55. return "clickhouse"
  56. }
  57. func (db *_ch) Close() error {
  58. var wg sync.WaitGroup
  59. wg.Add(1)
  60. go func() {
  61. defer wg.Done()
  62. if db.conn != nil {
  63. _ = db.conn.Close()
  64. }
  65. }()
  66. wg.Wait()
  67. return nil
  68. }
  69. func (db *_ch) Ping(ctx context.Context) error {
  70. return db.conn.Ping(ctx)
  71. }
  72. func (db *_ch) Exec(ctx context.Context, query string, args ...any) error {
  73. return db.conn.Exec(ctx, query, args...)
  74. }
  75. func (db *_ch) Query(ctx context.Context, query string, args ...any) (driver.Rows, error) {
  76. return db.conn.Query(ctx, query, args...)
  77. }
  78. //goland:noinspection SqlNoDataSourceInspection
  79. func (db *_ch) Batch(ctx context.Context, table string, release bool) (driver.Batch, error) {
  80. table = strings.TrimSpace(table)
  81. if table == "" {
  82. return nil, fmt.Errorf("illegal argumets: empty table name")
  83. }
  84. var (
  85. query = fmt.Sprintf("INSERT INTO `%s`", table)
  86. opts []driver.PrepareBatchOption
  87. )
  88. if release {
  89. opts = append(opts, driver.WithReleaseConnection())
  90. }
  91. return db.conn.PrepareBatch(ctx, query, opts...)
  92. }