clickhouse.go 1.7 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889
  1. package database
  2. import (
  3. "context"
  4. "crypto/tls"
  5. "fmt"
  6. "github.com/ClickHouse/clickhouse-go/v2"
  7. "github.com/ClickHouse/clickhouse-go/v2/lib/driver"
  8. "sync"
  9. "time"
  10. )
  11. type _ch struct {
  12. Database[driver.Rows, driver.Batch]
  13. config *configCH
  14. conn clickhouse.Conn
  15. }
  16. //goland:noinspection GoUnusedExportedFunction
  17. func NewClickhouse(cfg Config, debug bool) (Database[driver.Rows, driver.Batch], error) {
  18. var tlcConfig *tls.Config
  19. if cfg.Clickhouse.Params.Secure {
  20. tlcConfig = &tls.Config{
  21. InsecureSkipVerify: true,
  22. }
  23. }
  24. session, err := clickhouse.Open(&clickhouse.Options{
  25. Protocol: clickhouse.Native,
  26. TLS: tlcConfig,
  27. Addr: []string{
  28. fmt.Sprintf("%s:%d", cfg.Clickhouse.Host, cfg.Clickhouse.Port),
  29. },
  30. Auth: clickhouse.Auth{
  31. Database: cfg.Clickhouse.Database,
  32. Username: cfg.Clickhouse.Username,
  33. Password: cfg.Clickhouse.Password,
  34. },
  35. Debug: debug,
  36. Compression: &clickhouse.Compression{
  37. Method: cfg.Clickhouse.Params.CompressionMethod(),
  38. },
  39. DialTimeout: time.Duration(cfg.Clickhouse.Params.Timeout) * time.Second,
  40. ReadTimeout: time.Duration(cfg.Clickhouse.Params.ReadTimeout) * time.Second,
  41. })
  42. if err != nil {
  43. return nil, err
  44. }
  45. s := &_ch{
  46. config: &cfg.Clickhouse,
  47. conn: session,
  48. }
  49. if err = s.Ping(context.TODO()); err != nil {
  50. return nil, err
  51. }
  52. return s, nil
  53. }
  54. func (db *_ch) String() string {
  55. return "clickhouse"
  56. }
  57. func (db *_ch) Close() error {
  58. var wg sync.WaitGroup
  59. wg.Add(1)
  60. go func() {
  61. defer wg.Done()
  62. if db.conn != nil {
  63. _ = db.conn.Close()
  64. }
  65. }()
  66. wg.Wait()
  67. return nil
  68. }
  69. func (db *_ch) Ping(ctx context.Context) error {
  70. return db.conn.Ping(ctx)
  71. }
  72. func (db *_ch) Query(ctx context.Context, query string, args ...any) (driver.Rows, error) {
  73. return db.conn.Query(ctx, query, args...)
  74. }