2
0

clickhouse.go 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255
  1. package db
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/gshopify/service-wrapper/config"
  6. "github.com/gshopify/service-wrapper/db"
  7. "github.com/gshopify/service-wrapper/model"
  8. "github.com/jellydator/ttlcache/v3"
  9. "github.com/mailru/dbr"
  10. _ "github.com/mailru/go-clickhouse"
  11. "gshopper.com/gshopify/products/graphql/generated"
  12. "gshopper.com/gshopify/products/relation"
  13. "net/url"
  14. "strings"
  15. "sync"
  16. )
  17. type clickhouse struct {
  18. ctx context.Context
  19. config *db.Config
  20. session *dbr.Session
  21. cache *ttlcache.Cache[string, any]
  22. }
  23. func New(ctx context.Context, forceDebug bool) (Database, error) {
  24. r := &clickhouse{
  25. ctx: ctx,
  26. config: db.New(),
  27. cache: ttlcache.New[string, any](
  28. ttlcache.WithTTL[string, any](cacheTimeout),
  29. ttlcache.WithCapacity[string, any](cacheCapacity),
  30. ),
  31. }
  32. if err := config.Instance().Load(ctx, r.config); err != nil {
  33. return nil, err
  34. }
  35. if forceDebug {
  36. r.config.Params.Debug = true
  37. }
  38. //goland:noinspection HttpUrlsUsage
  39. source, err := url.Parse(fmt.Sprintf("http://%s:%s@%s:%d/%s",
  40. url.QueryEscape(r.config.Username),
  41. url.QueryEscape(r.config.Password),
  42. r.config.Host,
  43. r.config.Port,
  44. r.config.Database))
  45. if err != nil {
  46. return nil, err
  47. }
  48. kv := make(url.Values)
  49. kv.Set("timeout", fmt.Sprintf("%ds", r.config.Params.Timeout))
  50. kv.Set("read_timeout", fmt.Sprintf("%ds", r.config.Params.ReadTimeout))
  51. kv.Set("write_timeout", fmt.Sprintf("%ds", r.config.Params.WriteTimeout))
  52. kv.Set("debug", fmt.Sprintf("%v", r.config.Params.Debug))
  53. source.RawQuery = kv.Encode()
  54. con, err := dbr.Open("clickhouse", source.String(), nil)
  55. if err != nil {
  56. return nil, fmt.Errorf("could not establish Clickhouse session: %v", err)
  57. }
  58. r.session = con.NewSessionContext(ctx, nil)
  59. if err = r.session.Ping(); err != nil {
  60. return nil, err
  61. }
  62. go r.cache.Start()
  63. return r, nil
  64. }
  65. func (db *clickhouse) ProductCollections(ln model.LanguageCode, id string) ([]*generated.Collection, error) {
  66. var (
  67. collections []*generated.Collection
  68. key = productCollectionKey("product.id=?", id)
  69. l = ttlcache.LoaderFunc[string, any](
  70. func(ttl *ttlcache.Cache[string, any], _ string) *ttlcache.Item[string, any] {
  71. var o []relation.Collection
  72. rows, err := db.session.SelectBySql("SELECT "+
  73. ln.SqlFieldSelection("title")+", "+ln.SqlFieldSelection("description")+
  74. ", `id`, `handle`, `thumbnail`, "+
  75. "`created_at`, `updated_at`, `deleted_at` "+
  76. "FROM `product_collection` "+
  77. "ARRAY JOIN (SELECT `collection_ids` FROM `product` WHERE `id` = ?) AS cid "+
  78. "WHERE `id` = cid "+
  79. "ORDER BY `created_at` ASC;", key.Args()...).
  80. Load(&o)
  81. if rows < 1 || err != nil {
  82. return nil
  83. }
  84. return ttl.Set(key.String(), o, key.TTL())
  85. })
  86. )
  87. p := db.cache.Get(key.String(), ttlcache.WithLoader[string, any](l))
  88. if p == nil {
  89. return nil, fmt.Errorf("not found")
  90. }
  91. for _, row := range p.Value().([]relation.Collection) {
  92. collections = append(collections, row.As())
  93. }
  94. return collections, nil
  95. }
  96. func (db *clickhouse) Product(ln model.LanguageCode, handle *string, id *string) (*generated.Product, error) {
  97. var (
  98. clause = strings.Builder{}
  99. vars = []any{relation.ProductStatusPublished}
  100. )
  101. clause.WriteString("status=?")
  102. if id != nil {
  103. clause.WriteString(" AND id=?")
  104. vars = append(vars, *id)
  105. }
  106. if handle != nil {
  107. clause.WriteString(" AND handle=?")
  108. vars = append(vars, *handle)
  109. }
  110. var (
  111. key = productKey(clause.String(), vars...)
  112. l = ttlcache.LoaderFunc[string, any](
  113. func(ttl *ttlcache.Cache[string, any], _ string) *ttlcache.Item[string, any] {
  114. o := relation.Product{}
  115. rows, err := db.session.
  116. Select(productSelection(ln)...).
  117. From(key.Table()).
  118. Where(key.Clause(), key.Args()...).
  119. OrderBy("created_at").
  120. Limit(1).
  121. Load(&o)
  122. if rows < 1 || err != nil {
  123. return nil
  124. }
  125. return ttl.Set(key.String(), o, key.TTL())
  126. })
  127. )
  128. p := db.cache.Get(key.String(), ttlcache.WithLoader[string, any](l))
  129. if p == nil {
  130. return nil, fmt.Errorf("not found")
  131. }
  132. product := p.Value().(relation.Product)
  133. return product.As(), nil
  134. }
  135. func (db *clickhouse) ProductOptions(ln model.LanguageCode, id string) ([]*generated.ProductOption, error) {
  136. var (
  137. options []*generated.ProductOption
  138. key = productOptionKey("product_id=?", id)
  139. l = ttlcache.LoaderFunc[string, any](
  140. func(ttl *ttlcache.Cache[string, any], _ string) *ttlcache.Item[string, any] {
  141. var o []relation.ProductOption
  142. rows, err := db.session.
  143. Select(productOptionSelection(ln)...).
  144. From(key.Table()).
  145. Where(key.Clause(), key.Args()...).
  146. OrderBy("created_at").
  147. Load(&o)
  148. if rows < 1 || err != nil {
  149. return nil
  150. }
  151. return ttl.Set(key.String(), o, key.TTL())
  152. },
  153. )
  154. )
  155. p := db.cache.Get(key.String(), ttlcache.WithLoader[string, any](l))
  156. if p == nil {
  157. return nil, fmt.Errorf("not found")
  158. }
  159. for _, v := range p.Value().([]relation.ProductOption) {
  160. options = append(options, v.As())
  161. }
  162. return options, nil
  163. }
  164. func (db *clickhouse) CollectionProducts(ln model.LanguageCode, id string) ([]*generated.Product, error) {
  165. var (
  166. products []*generated.Product
  167. key = productKey("has(collection_ids, ?)", id)
  168. l = ttlcache.LoaderFunc[string, any](
  169. func(ttl *ttlcache.Cache[string, any], _ string) *ttlcache.Item[string, any] {
  170. var o []relation.Product
  171. rows, err := db.session.
  172. Select(productSelection(ln)...).
  173. From(key.Table()).
  174. Where(key.Clause(), key.Args()...).
  175. OrderBy("created_at").
  176. Load(&o)
  177. if rows < 1 || err != nil {
  178. return nil
  179. }
  180. return ttl.Set(key.String(), o, key.TTL())
  181. },
  182. )
  183. )
  184. p := db.cache.Get(key.String(), ttlcache.WithLoader[string, any](l))
  185. if p == nil {
  186. return nil, fmt.Errorf("not found")
  187. }
  188. for _, row := range p.Value().([]relation.Product) {
  189. products = append(products, row.As())
  190. }
  191. return products, nil
  192. }
  193. func (db *clickhouse) Ping() error {
  194. return db.session.Ping()
  195. }
  196. func (db *clickhouse) Close() error {
  197. var wg sync.WaitGroup
  198. wg.Add(2)
  199. go func() {
  200. defer wg.Done()
  201. if db.cache != nil {
  202. db.cache.DeleteAll()
  203. db.cache.Stop()
  204. }
  205. }()
  206. go func() {
  207. defer wg.Done()
  208. if db.session != nil {
  209. _ = db.session.Close()
  210. }
  211. }()
  212. return nil
  213. }