clickhouse.go 7.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297
  1. package db
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/gshopify/service-wrapper/config"
  6. "github.com/gshopify/service-wrapper/db"
  7. "github.com/gshopify/service-wrapper/model"
  8. "github.com/gshopify/service-wrapper/server/middleware"
  9. "github.com/jellydator/ttlcache/v3"
  10. "github.com/mailru/dbr"
  11. _ "github.com/mailru/go-clickhouse"
  12. "gshopper.com/gshopify/products/graphql/generated"
  13. "gshopper.com/gshopify/products/relation"
  14. "net/url"
  15. "strings"
  16. "sync"
  17. )
  18. type clickhouse struct {
  19. ctx context.Context
  20. config *db.Config
  21. session *dbr.Session
  22. cache *ttlcache.Cache[string, any]
  23. }
  24. func New(ctx context.Context, forceDebug bool) (Database, error) {
  25. r := &clickhouse{
  26. ctx: ctx,
  27. config: db.New(),
  28. cache: ttlcache.New[string, any](
  29. ttlcache.WithTTL[string, any](cacheTimeout),
  30. ttlcache.WithCapacity[string, any](cacheCapacity),
  31. ),
  32. }
  33. if err := config.Instance().Load(ctx, r.config); err != nil {
  34. return nil, err
  35. }
  36. if forceDebug {
  37. r.config.Params.Debug = true
  38. }
  39. //goland:noinspection HttpUrlsUsage
  40. source, err := url.Parse(fmt.Sprintf("http://%s:%s@%s:%d/%s",
  41. url.QueryEscape(r.config.Username),
  42. url.QueryEscape(r.config.Password),
  43. r.config.Host,
  44. r.config.Port,
  45. r.config.Database))
  46. if err != nil {
  47. return nil, err
  48. }
  49. kv := make(url.Values)
  50. kv.Set("timeout", fmt.Sprintf("%ds", r.config.Params.Timeout))
  51. kv.Set("read_timeout", fmt.Sprintf("%ds", r.config.Params.ReadTimeout))
  52. kv.Set("write_timeout", fmt.Sprintf("%ds", r.config.Params.WriteTimeout))
  53. kv.Set("debug", fmt.Sprintf("%v", r.config.Params.Debug))
  54. source.RawQuery = kv.Encode()
  55. con, err := dbr.Open("clickhouse", source.String(), nil)
  56. if err != nil {
  57. return nil, fmt.Errorf("could not establish Clickhouse session: %v", err)
  58. }
  59. r.session = con.NewSessionContext(ctx, nil)
  60. if err = r.session.Ping(); err != nil {
  61. return nil, err
  62. }
  63. go r.cache.Start()
  64. return r, nil
  65. }
  66. func (db *clickhouse) ProductCollections(ln model.LanguageCode, id string) ([]*generated.Collection, error) {
  67. var (
  68. collections []*generated.Collection
  69. key = productCollectionKey("product.id=?", id)
  70. l = ttlcache.LoaderFunc[string, any](
  71. func(ttl *ttlcache.Cache[string, any], _ string) *ttlcache.Item[string, any] {
  72. var o []relation.Collection
  73. rows, err := db.session.SelectBySql("SELECT "+
  74. ln.SqlFieldSelection("title")+", "+ln.SqlFieldSelection("description")+
  75. ", `id`, `handle`, `thumbnail`, "+
  76. "`created_at`, `updated_at`, `deleted_at` "+
  77. "FROM `product_collection` "+
  78. "ARRAY JOIN (SELECT `collection_ids` FROM `product` WHERE `id` = ?) AS cid "+
  79. "WHERE `id` = cid "+
  80. "ORDER BY `created_at` ASC;", key.Args()...).
  81. Load(&o)
  82. if rows < 1 || err != nil {
  83. return nil
  84. }
  85. return ttl.Set(key.String(), o, key.TTL())
  86. })
  87. )
  88. p := db.cache.Get(key.String(), ttlcache.WithLoader[string, any](l))
  89. if p == nil {
  90. return nil, fmt.Errorf("not found")
  91. }
  92. for _, row := range p.Value().([]relation.Collection) {
  93. collections = append(collections, row.As())
  94. }
  95. return collections, nil
  96. }
  97. func (db *clickhouse) Product(ln model.LanguageCode, handle *string, id *string) (*generated.Product, error) {
  98. var (
  99. clause = strings.Builder{}
  100. vars = []any{relation.ProductStatusPublished}
  101. )
  102. clause.WriteString("status=?")
  103. if id != nil {
  104. clause.WriteString(" AND id=?")
  105. vars = append(vars, *id)
  106. }
  107. if handle != nil {
  108. clause.WriteString(" AND handle=?")
  109. vars = append(vars, *handle)
  110. }
  111. var (
  112. key = productKey(clause.String(), vars...)
  113. l = ttlcache.LoaderFunc[string, any](
  114. func(ttl *ttlcache.Cache[string, any], _ string) *ttlcache.Item[string, any] {
  115. o := relation.Product{}
  116. rows, err := db.session.
  117. Select(productSelection(ln)...).
  118. From(key.Table()).
  119. Where(key.Clause(), key.Args()...).
  120. OrderBy("created_at").
  121. Limit(1).
  122. Load(&o)
  123. if rows < 1 || err != nil {
  124. return nil
  125. }
  126. return ttl.Set(key.String(), o, key.TTL())
  127. })
  128. )
  129. p := db.cache.Get(key.String(), ttlcache.WithLoader[string, any](l))
  130. if p == nil {
  131. return nil, fmt.Errorf("not found")
  132. }
  133. product := p.Value().(relation.Product)
  134. return product.As(), nil
  135. }
  136. func (db *clickhouse) ProductOptions(ln model.LanguageCode, id string) ([]*generated.ProductOption, error) {
  137. var (
  138. options []*generated.ProductOption
  139. key = productOptionKey("product_id=?", id)
  140. l = ttlcache.LoaderFunc[string, any](
  141. func(ttl *ttlcache.Cache[string, any], _ string) *ttlcache.Item[string, any] {
  142. var o []relation.ProductOption
  143. rows, err := db.session.
  144. Select(productOptionSelection(ln)...).
  145. From(key.Table()).
  146. Where(key.Clause(), key.Args()...).
  147. OrderBy("created_at").
  148. Load(&o)
  149. if rows < 1 || err != nil {
  150. return nil
  151. }
  152. return ttl.Set(key.String(), o, key.TTL())
  153. },
  154. )
  155. )
  156. p := db.cache.Get(key.String(), ttlcache.WithLoader[string, any](l))
  157. if p == nil {
  158. return nil, fmt.Errorf("not found")
  159. }
  160. for _, v := range p.Value().([]relation.ProductOption) {
  161. options = append(options, v.As())
  162. }
  163. return options, nil
  164. }
  165. func (db *clickhouse) ProductVariants(ctx *middleware.GShopifyContext, id string) ([]*generated.ProductVariant, error) {
  166. var (
  167. variants []*generated.ProductVariant
  168. key = productVariantKey("product_id=?", id)
  169. l = ttlcache.LoaderFunc[string, any](
  170. func(ttl *ttlcache.Cache[string, any], _ string) *ttlcache.Item[string, any] {
  171. var o []relation.ProductVariant
  172. rows, err := db.session.
  173. Select(
  174. "id", "product_id", "options",
  175. fmt.Sprintf("price['%s'] as price", defaultCurrency),
  176. fmt.Sprintf("unit_price['%s'] as unit_price", defaultCurrency),
  177. fmt.Sprintf("compare_at_price['%s'] as compare_at_price", defaultCurrency),
  178. "barcode", "sku", "hs_code", "origin_country",
  179. "allow_backorder", "manage_inventory", "unit_price_measurement",
  180. "weight", "wight_unit", "created_at", "updated_at", "deleted_at",
  181. ctx.Language.SqlFieldSelection("title"),
  182. ).
  183. From(key.Table()).
  184. Where(key.Clause(), key.Args()...).
  185. OrderBy("created_at").
  186. Load(&o)
  187. if rows < 1 || err != nil {
  188. return nil
  189. }
  190. return ttl.Set(key.String(), o, key.TTL())
  191. },
  192. )
  193. )
  194. p := db.cache.Get(key.String(), ttlcache.WithLoader[string, any](l))
  195. if p != nil {
  196. for _, v := range p.Value().([]relation.ProductVariant) {
  197. variants = append(variants, v.As())
  198. }
  199. }
  200. return variants, nil
  201. }
  202. func (db *clickhouse) CollectionProducts(ln model.LanguageCode, id string) ([]*generated.Product, error) {
  203. var (
  204. products []*generated.Product
  205. key = productKey("has(collection_ids, ?)", id)
  206. l = ttlcache.LoaderFunc[string, any](
  207. func(ttl *ttlcache.Cache[string, any], _ string) *ttlcache.Item[string, any] {
  208. var o []relation.Product
  209. rows, err := db.session.
  210. Select(productSelection(ln)...).
  211. From(key.Table()).
  212. Where(key.Clause(), key.Args()...).
  213. OrderBy("created_at").
  214. Load(&o)
  215. if rows < 1 || err != nil {
  216. return nil
  217. }
  218. return ttl.Set(key.String(), o, key.TTL())
  219. },
  220. )
  221. )
  222. p := db.cache.Get(key.String(), ttlcache.WithLoader[string, any](l))
  223. if p == nil {
  224. return nil, fmt.Errorf("not found")
  225. }
  226. for _, row := range p.Value().([]relation.Product) {
  227. products = append(products, row.As())
  228. }
  229. return products, nil
  230. }
  231. func (db *clickhouse) Ping() error {
  232. return db.session.Ping()
  233. }
  234. func (db *clickhouse) Close() error {
  235. var wg sync.WaitGroup
  236. wg.Add(2)
  237. go func() {
  238. defer wg.Done()
  239. if db.cache != nil {
  240. db.cache.DeleteAll()
  241. db.cache.Stop()
  242. }
  243. }()
  244. go func() {
  245. defer wg.Done()
  246. if db.session != nil {
  247. _ = db.session.Close()
  248. }
  249. }()
  250. return nil
  251. }