clickhouse.go 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245
  1. package db
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/gshopify/service-wrapper/config"
  6. "github.com/gshopify/service-wrapper/db"
  7. "github.com/gshopify/service-wrapper/model"
  8. "github.com/jellydator/ttlcache/v3"
  9. "github.com/mailru/dbr"
  10. _ "github.com/mailru/go-clickhouse"
  11. "gshopper.com/gshopify/products/graphql/generated"
  12. "gshopper.com/gshopify/products/relation"
  13. "net/url"
  14. "strings"
  15. "sync"
  16. )
  17. type clickhouse struct {
  18. ctx context.Context
  19. config *db.Config
  20. session *dbr.Session
  21. cache *ttlcache.Cache[string, any]
  22. }
  23. func New(ctx context.Context, forceDebug bool) (Database, error) {
  24. r := &clickhouse{
  25. ctx: ctx,
  26. config: db.New(),
  27. cache: ttlcache.New[string, any](
  28. ttlcache.WithTTL[string, any](cacheTimeout),
  29. ttlcache.WithCapacity[string, any](cacheCapacity),
  30. ),
  31. }
  32. if err := config.Instance().Load(ctx, r.config); err != nil {
  33. return nil, err
  34. }
  35. if forceDebug {
  36. r.config.Params.Debug = true
  37. }
  38. //goland:noinspection HttpUrlsUsage
  39. source, err := url.Parse(fmt.Sprintf("http://%s:%s@%s:%d/%s",
  40. url.QueryEscape(r.config.Username),
  41. url.QueryEscape(r.config.Password),
  42. r.config.Host,
  43. r.config.Port,
  44. r.config.Database))
  45. if err != nil {
  46. return nil, err
  47. }
  48. kv := make(url.Values)
  49. kv.Set("timeout", fmt.Sprintf("%ds", r.config.Params.Timeout))
  50. kv.Set("read_timeout", fmt.Sprintf("%ds", r.config.Params.ReadTimeout))
  51. kv.Set("write_timeout", fmt.Sprintf("%ds", r.config.Params.WriteTimeout))
  52. kv.Set("debug", fmt.Sprintf("%v", r.config.Params.Debug))
  53. source.RawQuery = kv.Encode()
  54. con, err := dbr.Open("clickhouse", source.String(), nil)
  55. if err != nil {
  56. return nil, fmt.Errorf("could not establish Clickhouse session: %v", err)
  57. }
  58. r.session = con.NewSessionContext(ctx, nil)
  59. if err = r.session.Ping(); err != nil {
  60. return nil, err
  61. }
  62. go r.cache.Start()
  63. return r, nil
  64. }
  65. func (db *clickhouse) ProductCollections(ln model.LanguageCode, id string) ([]*generated.Collection, error) {
  66. var (
  67. collections []*generated.Collection
  68. key = productCollectionKey("product.id=?", id)
  69. l = ttlcache.LoaderFunc[string, any](
  70. func(ttl *ttlcache.Cache[string, any], _ string) *ttlcache.Item[string, any] {
  71. var o []relation.Collection
  72. rows, err := db.session.SelectBySql("SELECT "+
  73. ln.SqlFieldSelection("title")+", "+ln.SqlFieldSelection("description")+", `id`, `handle`, `thumbnail`, "+
  74. "`created_at`, `updated_at`, `deleted_at` "+
  75. "FROM `product_collection` "+
  76. "ARRAY JOIN (SELECT `collection_ids` FROM `product` WHERE `id` = ?) AS cid "+
  77. "WHERE `id` = cid "+
  78. "ORDER BY `created_at` ASC;", key.Args()...).
  79. Load(&o)
  80. if rows < 1 || err != nil {
  81. return nil
  82. }
  83. return ttl.Set(key.String(), o, key.TTL())
  84. })
  85. )
  86. p := db.cache.Get(key.String(), ttlcache.WithLoader[string, any](l))
  87. if p == nil {
  88. return nil, fmt.Errorf("not found")
  89. }
  90. for _, row := range p.Value().([]relation.Collection) {
  91. collections = append(collections, row.As())
  92. }
  93. return collections, nil
  94. }
  95. func (db *clickhouse) Product(ln model.LanguageCode, handle *string, id *string) (*generated.Product, error) {
  96. var (
  97. clause = strings.Builder{}
  98. vars = []any{relation.ProductStatusPublished}
  99. )
  100. clause.WriteString("status=?")
  101. if id != nil {
  102. clause.WriteString(" AND id=?")
  103. vars = append(vars, *id)
  104. }
  105. if handle != nil {
  106. clause.WriteString(" AND handle=?")
  107. vars = append(vars, *handle)
  108. }
  109. var (
  110. key = productKey(clause.String(), vars...)
  111. l = ttlcache.LoaderFunc[string, any](
  112. func(ttl *ttlcache.Cache[string, any], _ string) *ttlcache.Item[string, any] {
  113. o := relation.Product{}
  114. rows, err := db.session.
  115. Select(productSelection(ln)...).
  116. From(key.Table()).
  117. Where(key.Clause(), key.Args()...).
  118. OrderBy("created_at").
  119. Limit(1).
  120. Load(&o)
  121. if rows < 1 || err != nil {
  122. return nil
  123. }
  124. return ttl.Set(key.String(), o, key.TTL())
  125. })
  126. )
  127. p := db.cache.Get(key.String(), ttlcache.WithLoader[string, any](l))
  128. if p == nil {
  129. return nil, fmt.Errorf("not found")
  130. }
  131. product := p.Value().(relation.Product)
  132. return product.As(), nil
  133. }
  134. func (db *clickhouse) ProductOptions(ln model.LanguageCode, id string) ([]*generated.ProductOption, error) {
  135. var options []*generated.ProductOption
  136. var o []relation.ProductOption
  137. _, err := db.session.
  138. Select(
  139. "id",
  140. "product_id",
  141. "created_at", "updated_at", "deleted_at",
  142. ln.SqlFieldSelection("name"),
  143. ln.SqlArraySelection("values")).
  144. From("product_option").
  145. Where("product_id=?", id).
  146. OrderBy("created_at").
  147. Load(&o)
  148. if err != nil {
  149. return nil, err
  150. }
  151. for _, v := range o {
  152. options = append(options, v.As())
  153. }
  154. return options, nil
  155. }
  156. func (db *clickhouse) CollectionProducts(ln model.LanguageCode, id string) ([]*generated.Product, error) {
  157. var (
  158. products []*generated.Product
  159. key = productKey("has(collection_ids, ?)", id)
  160. l = ttlcache.LoaderFunc[string, any](
  161. func(ttl *ttlcache.Cache[string, any], _ string) *ttlcache.Item[string, any] {
  162. var o []relation.Product
  163. rows, err := db.session.
  164. Select(productSelection(ln)...).
  165. From(key.Table()).
  166. Where(key.Clause(), key.Args()...).
  167. OrderBy("created_at").
  168. Load(&o)
  169. if rows < 1 || err != nil {
  170. return nil
  171. }
  172. return ttl.Set(key.String(), o, key.TTL())
  173. },
  174. )
  175. )
  176. p := db.cache.Get(key.String(), ttlcache.WithLoader[string, any](l))
  177. if p == nil {
  178. return nil, fmt.Errorf("not found")
  179. }
  180. for _, row := range p.Value().([]relation.Product) {
  181. products = append(products, row.As())
  182. }
  183. return products, nil
  184. }
  185. func (db *clickhouse) Ping() error {
  186. return db.session.Ping()
  187. }
  188. func (db *clickhouse) Close() error {
  189. var wg sync.WaitGroup
  190. wg.Add(2)
  191. go func() {
  192. defer wg.Done()
  193. if db.cache != nil {
  194. db.cache.DeleteAll()
  195. db.cache.Stop()
  196. }
  197. }()
  198. go func() {
  199. defer wg.Done()
  200. if db.session != nil {
  201. _ = db.session.Close()
  202. }
  203. }()
  204. return nil
  205. }