clickhouse.go 2.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108
  1. package clickhouse
  2. import (
  3. "context"
  4. "fmt"
  5. "git.beejay.kim/tool/service"
  6. "github.com/ClickHouse/clickhouse-go/v2"
  7. "github.com/ClickHouse/clickhouse-go/v2/lib/driver"
  8. "github.com/google/uuid"
  9. "github.com/rs/zerolog/log"
  10. "strings"
  11. "sync"
  12. "time"
  13. )
  14. type _ch struct {
  15. config *Config
  16. cli clickhouse.Conn
  17. debug bool
  18. }
  19. func New(config Config, debug bool) (service.Clickhouse, error) {
  20. var err error
  21. ch := &_ch{
  22. config: &config,
  23. debug: debug,
  24. }
  25. if ch.cli, err = clickhouse.Open(&clickhouse.Options{
  26. Protocol: clickhouse.Native,
  27. Addr: []string{
  28. fmt.Sprintf("%s:%d", config.Host, config.Port),
  29. },
  30. Auth: clickhouse.Auth{
  31. Database: config.Database,
  32. Username: config.Username,
  33. Password: config.Password,
  34. },
  35. Debug: debug,
  36. Compression: &clickhouse.Compression{
  37. Method: clickhouse.CompressionLZ4,
  38. },
  39. DialTimeout: time.Duration(config.Params.Timeout) * time.Second,
  40. ReadTimeout: time.Duration(config.Params.ReadTimeout) * time.Second,
  41. }); err != nil {
  42. return nil, err
  43. }
  44. if err = ch.Ping(context.TODO()); err != nil {
  45. return nil, err
  46. }
  47. return ch, nil
  48. }
  49. func (ch *_ch) ID() uuid.UUID {
  50. return uuid.NewSHA1(uuid.NameSpaceDNS, []byte("db.clickhouse"))
  51. }
  52. func (ch *_ch) Close() error {
  53. var wg sync.WaitGroup
  54. wg.Add(1)
  55. go func() {
  56. defer wg.Done()
  57. if ch.cli != nil {
  58. _ = ch.cli.Close()
  59. }
  60. log.Debug().
  61. Str("clickhouse", "stop session").
  62. Send()
  63. }()
  64. wg.Wait()
  65. return nil
  66. }
  67. func (ch *_ch) Ping(ctx context.Context) error {
  68. return ch.cli.Ping(ctx)
  69. }
  70. func (ch *_ch) Exec(ctx context.Context, query string, args ...any) error {
  71. return ch.cli.Exec(ctx, query, args...)
  72. }
  73. func (ch *_ch) Query(ctx context.Context, query string, args ...any) (driver.Rows, error) {
  74. return ch.cli.Query(ctx, query, args...)
  75. }
  76. //goland:noinspection ALL
  77. func (ch *_ch) Batch(ctx context.Context, tblName string, release bool) (driver.Batch, error) {
  78. if tblName = strings.TrimSpace(tblName); tblName == "" {
  79. return nil, fmt.Errorf("illegal argumets: empty table name")
  80. }
  81. var (
  82. query = fmt.Sprintf("INSERT INTO `%s`", tblName)
  83. opts []driver.PrepareBatchOption
  84. )
  85. if release {
  86. opts = append(opts, driver.WithReleaseConnection())
  87. }
  88. return ch.cli.PrepareBatch(ctx, query, opts...)
  89. }