2
0

clickhouse.go 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132
  1. package db
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/gshopify/service-wrapper/config"
  6. "github.com/gshopify/service-wrapper/db"
  7. "github.com/gshopify/service-wrapper/model"
  8. "github.com/jellydator/ttlcache/v3"
  9. "github.com/mailru/dbr"
  10. _ "github.com/mailru/go-clickhouse"
  11. "gshopper.com/gshopify/shop/graphql/generated"
  12. "gshopper.com/gshopify/shop/relation"
  13. "net/url"
  14. "sync"
  15. )
  16. type clickhouse struct {
  17. ctx context.Context
  18. config *db.Config
  19. session *dbr.Session
  20. cache *ttlcache.Cache[string, any]
  21. }
  22. func New(ctx context.Context, forceDebug bool) (Database, error) {
  23. r := &clickhouse{
  24. ctx: ctx,
  25. config: db.New(),
  26. cache: ttlcache.New[string, any](
  27. ttlcache.WithTTL[string, any](cacheTimeout),
  28. ttlcache.WithCapacity[string, any](cacheCapacity),
  29. ),
  30. }
  31. if err := config.Instance().Load(ctx, r.config); err != nil {
  32. return nil, err
  33. }
  34. if forceDebug {
  35. r.config.Params.Debug = true
  36. }
  37. //goland:noinspection HttpUrlsUsage
  38. source, err := url.Parse(fmt.Sprintf("http://%s:%s@%s:%d/%s",
  39. url.QueryEscape(r.config.Username),
  40. url.QueryEscape(r.config.Password),
  41. r.config.Host,
  42. r.config.Port,
  43. r.config.Database))
  44. if err != nil {
  45. return nil, err
  46. }
  47. kv := make(url.Values)
  48. kv.Set("timeout", fmt.Sprintf("%ds", r.config.Params.Timeout))
  49. kv.Set("read_timeout", fmt.Sprintf("%ds", r.config.Params.ReadTimeout))
  50. kv.Set("write_timeout", fmt.Sprintf("%ds", r.config.Params.WriteTimeout))
  51. kv.Set("debug", fmt.Sprintf("%v", r.config.Params.Debug))
  52. source.RawQuery = kv.Encode()
  53. con, err := dbr.Open("clickhouse", source.String(), nil)
  54. if err != nil {
  55. return nil, fmt.Errorf("could not establish Clickhouse session: %v", err)
  56. }
  57. r.session = con.NewSessionContext(ctx, nil)
  58. if err = r.session.Ping(); err != nil {
  59. return nil, err
  60. }
  61. go r.cache.Start()
  62. return r, nil
  63. }
  64. func (db *clickhouse) Location(ln model.LanguageCode, id string) (*generated.Location, error) {
  65. var (
  66. key = locationKey("id=?", id)
  67. l = ttlcache.LoaderFunc[string, any](
  68. func(ttl *ttlcache.Cache[string, any], _ string) *ttlcache.Item[string, any] {
  69. var loc relation.Location
  70. rows, err := db.session.
  71. Select(locationSelection(ln)...).
  72. From(key.Table()).
  73. Where(key.Clause(), key.Args()...).
  74. Limit(1).
  75. Load(&loc)
  76. if rows < 1 || err != nil {
  77. return nil
  78. }
  79. return ttl.Set(key.String(), loc, key.TTL())
  80. },
  81. )
  82. )
  83. p := db.cache.Get(key.String(), ttlcache.WithLoader[string, any](l))
  84. if p == nil {
  85. return nil, fmt.Errorf("not found")
  86. }
  87. loc := p.Value().(relation.Location)
  88. return loc.As(), nil
  89. }
  90. func (db *clickhouse) Ping() error {
  91. return db.session.Ping()
  92. }
  93. func (db *clickhouse) Close() error {
  94. var wg sync.WaitGroup
  95. wg.Add(2)
  96. go func() {
  97. defer wg.Done()
  98. if db.cache != nil {
  99. db.cache.DeleteAll()
  100. db.cache.Stop()
  101. }
  102. }()
  103. go func() {
  104. defer wg.Done()
  105. if db.session != nil {
  106. _ = db.session.Close()
  107. }
  108. }()
  109. return nil
  110. }