clickhouse.go 2.0 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798
  1. package db
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/gshopify/service-wrapper/config"
  6. "github.com/gshopify/service-wrapper/db"
  7. "github.com/jellydator/ttlcache/v3"
  8. "github.com/mailru/dbr"
  9. _ "github.com/mailru/go-clickhouse"
  10. "net/url"
  11. "sync"
  12. )
  13. type clickhouse struct {
  14. ctx context.Context
  15. config *db.Config
  16. session *dbr.Session
  17. cache *ttlcache.Cache[string, any]
  18. }
  19. func Clickhouse(ctx context.Context, forceDebug bool) (Database, error) {
  20. r := &clickhouse{
  21. ctx: ctx,
  22. config: db.New(),
  23. cache: ttlcache.New[string, any](
  24. ttlcache.WithTTL[string, any](cacheTimeout),
  25. ttlcache.WithCapacity[string, any](cacheCapacity),
  26. ),
  27. }
  28. if err := config.Instance().Load(ctx, r.config); err != nil {
  29. return nil, err
  30. }
  31. if forceDebug {
  32. r.config.Params.Debug = true
  33. }
  34. //goland:noinspection HttpUrlsUsage
  35. source, err := url.Parse(fmt.Sprintf("http://%s:%s@%s:%d/%s",
  36. url.QueryEscape(r.config.Username),
  37. url.QueryEscape(r.config.Password),
  38. r.config.Host,
  39. r.config.Port,
  40. r.config.Database))
  41. if err != nil {
  42. return nil, err
  43. }
  44. kv := make(url.Values)
  45. kv.Set("timeout", fmt.Sprintf("%ds", r.config.Params.Timeout))
  46. kv.Set("read_timeout", fmt.Sprintf("%ds", r.config.Params.ReadTimeout))
  47. kv.Set("write_timeout", fmt.Sprintf("%ds", r.config.Params.WriteTimeout))
  48. kv.Set("debug", fmt.Sprintf("%v", r.config.Params.Debug))
  49. source.RawQuery = kv.Encode()
  50. con, err := dbr.Open("clickhouse", source.String(), nil)
  51. if err != nil {
  52. return nil, fmt.Errorf("could not establish Clickhouse session: %v", err)
  53. }
  54. r.session = con.NewSessionContext(ctx, nil)
  55. if err = r.session.Ping(); err != nil {
  56. return nil, err
  57. }
  58. go r.cache.Start()
  59. return r, nil
  60. }
  61. func (db *clickhouse) Ping() error {
  62. return db.session.Ping()
  63. }
  64. func (db *clickhouse) Close() error {
  65. var wg sync.WaitGroup
  66. wg.Add(2)
  67. go func() {
  68. defer wg.Done()
  69. if db.cache != nil {
  70. db.cache.DeleteAll()
  71. db.cache.Stop()
  72. }
  73. }()
  74. go func() {
  75. defer wg.Done()
  76. if db.session != nil {
  77. _ = db.session.Close()
  78. }
  79. }()
  80. return nil
  81. }