clickhouse.go 8.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306
  1. package db
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/gshopify/service-wrapper/config"
  6. "github.com/gshopify/service-wrapper/db"
  7. "github.com/gshopify/service-wrapper/model"
  8. "github.com/gshopify/service-wrapper/server/middleware"
  9. "github.com/jellydator/ttlcache/v3"
  10. "github.com/mailru/dbr"
  11. _ "github.com/mailru/go-clickhouse"
  12. "gshopper.com/gshopify/products/graphql/generated"
  13. "gshopper.com/gshopify/products/relation"
  14. "net/url"
  15. "strings"
  16. "sync"
  17. )
  18. type clickhouse struct {
  19. ctx context.Context
  20. config *db.Config
  21. session *dbr.Session
  22. cache *ttlcache.Cache[string, any]
  23. }
  24. func New(ctx context.Context, forceDebug bool) (Database, error) {
  25. r := &clickhouse{
  26. ctx: ctx,
  27. config: db.New(),
  28. cache: ttlcache.New[string, any](
  29. ttlcache.WithTTL[string, any](cacheTimeout),
  30. ttlcache.WithCapacity[string, any](cacheCapacity),
  31. ),
  32. }
  33. if err := config.Instance().Load(ctx, r.config); err != nil {
  34. return nil, err
  35. }
  36. if forceDebug {
  37. r.config.Params.Debug = true
  38. }
  39. //goland:noinspection HttpUrlsUsage
  40. source, err := url.Parse(fmt.Sprintf("http://%s:%s@%s:%d/%s",
  41. url.QueryEscape(r.config.Username),
  42. url.QueryEscape(r.config.Password),
  43. r.config.Host,
  44. r.config.Port,
  45. r.config.Database))
  46. if err != nil {
  47. return nil, err
  48. }
  49. kv := make(url.Values)
  50. kv.Set("timeout", fmt.Sprintf("%ds", r.config.Params.Timeout))
  51. kv.Set("read_timeout", fmt.Sprintf("%ds", r.config.Params.ReadTimeout))
  52. kv.Set("write_timeout", fmt.Sprintf("%ds", r.config.Params.WriteTimeout))
  53. kv.Set("debug", fmt.Sprintf("%v", r.config.Params.Debug))
  54. source.RawQuery = kv.Encode()
  55. con, err := dbr.Open("clickhouse", source.String(), nil)
  56. if err != nil {
  57. return nil, fmt.Errorf("could not establish Clickhouse session: %v", err)
  58. }
  59. r.session = con.NewSessionContext(ctx, nil)
  60. if err = r.session.Ping(); err != nil {
  61. return nil, err
  62. }
  63. go r.cache.Start()
  64. return r, nil
  65. }
  66. func (db *clickhouse) ProductCollections(ln model.LanguageCode, id string) ([]*generated.Collection, error) {
  67. var (
  68. collections []*generated.Collection
  69. key = productCollectionKey("product.id=?", id)
  70. l = ttlcache.LoaderFunc[string, any](
  71. func(ttl *ttlcache.Cache[string, any], _ string) *ttlcache.Item[string, any] {
  72. var o []relation.ProductCollection
  73. rows, err := db.session.SelectBySql("SELECT "+
  74. strings.Join(productCollectionSelection(ln), ", ")+
  75. " FROM `"+key.Table()+"`"+
  76. " ARRAY JOIN (SELECT `collections` FROM `product` WHERE `id` = ?) AS cid"+
  77. " WHERE `id` = cid", key.Args()...).
  78. Load(&o)
  79. if rows < 1 || err != nil {
  80. return nil
  81. }
  82. return ttl.Set(key.String(), o, key.TTL())
  83. })
  84. )
  85. if p := db.cache.Get(key.String(), ttlcache.WithLoader[string, any](l)); p != nil {
  86. for _, row := range p.Value().([]relation.ProductCollection) {
  87. collections = append(collections, row.As())
  88. }
  89. }
  90. return collections, nil
  91. }
  92. func (db *clickhouse) Product(ln model.LanguageCode, handle *string, id *string) (*generated.Product, error) {
  93. var (
  94. clause = strings.Builder{}
  95. vars = []any{model.ProductStatusActive}
  96. )
  97. clause.WriteString("status=?")
  98. if id != nil {
  99. clause.WriteString(" AND id=?")
  100. vars = append(vars, *id)
  101. }
  102. if handle != nil {
  103. clause.WriteString(" AND handle=?")
  104. vars = append(vars, *handle)
  105. }
  106. var (
  107. key = productKey(clause.String(), vars...)
  108. l = ttlcache.LoaderFunc[string, any](
  109. func(ttl *ttlcache.Cache[string, any], _ string) *ttlcache.Item[string, any] {
  110. o := relation.Product{}
  111. rows, err := db.session.
  112. Select(productSelection(ln)...).
  113. From(key.Table()).
  114. Where(key.Clause(), key.Args()...).
  115. Limit(1).
  116. Load(&o)
  117. if rows < 1 || err != nil {
  118. return nil
  119. }
  120. return ttl.Set(key.String(), o, key.TTL())
  121. })
  122. )
  123. p := db.cache.Get(key.String(), ttlcache.WithLoader[string, any](l))
  124. if p == nil {
  125. return nil, fmt.Errorf("not found")
  126. }
  127. product := p.Value().(relation.Product)
  128. return product.As(), nil
  129. }
  130. func (db *clickhouse) ProductOptions(ln model.LanguageCode, id string) ([]*generated.ProductOption, error) {
  131. var (
  132. options []*generated.ProductOption
  133. key = productOptionKey("product_id=?", id)
  134. l = ttlcache.LoaderFunc[string, any](
  135. func(ttl *ttlcache.Cache[string, any], _ string) *ttlcache.Item[string, any] {
  136. var o []relation.ProductOption
  137. rows, err := db.session.SelectBySql("SELECT "+
  138. strings.Join(productOptionSelection(ln), ", ")+
  139. " FROM `"+key.Table()+"`"+
  140. " ARRAY JOIN (SELECT `options` FROM `product` WHERE `id` = ?) AS oid"+
  141. " WHERE `id` = oid", key.Args()).
  142. Load(&o)
  143. if rows < 1 || err != nil {
  144. return nil
  145. }
  146. return ttl.Set(key.String(), o, key.TTL())
  147. },
  148. )
  149. )
  150. if p := db.cache.Get(key.String(), ttlcache.WithLoader[string, any](l)); p != nil {
  151. for _, v := range p.Value().([]relation.ProductOption) {
  152. options = append(options, v.As())
  153. }
  154. }
  155. return options, nil
  156. }
  157. func (db *clickhouse) ProductVariants(ctx *middleware.GShopifyContext, id string) ([]*generated.ProductVariant, error) {
  158. var (
  159. variants []*generated.ProductVariant
  160. selection = []string{
  161. "any(t.id) as id",
  162. "any(t.product_id) as product_id",
  163. "any(t.inventory_item_id) as inventory_item_id",
  164. fmt.Sprintf("any(t.price['%s']) as price", defaultCurrency),
  165. fmt.Sprintf("any(t.compare_at_price['%s']) as compare_at_price", defaultCurrency),
  166. "any(t.position) as position",
  167. "any(t.image) as image",
  168. "any(t.inventory_management) as inventory_management",
  169. "any(t.inventory_policy) as inventory_policy",
  170. "any(t.options) as options",
  171. "any(t.grams) as grams",
  172. "any(t.weight) as weight",
  173. "any(t.weight_unit) as weight_unit",
  174. "any(t.created_at) as created_at",
  175. "any(t.updated_at) as updated_at",
  176. "any(t.published_at) as published_at",
  177. "any(t.deleted_at) as deleted_at",
  178. "sum(inventory_level.available) as available",
  179. fmt.Sprintf("any(product.title['%s']) as title", ctx.Language),
  180. "any(inventory_item.sku) as sku",
  181. "any(inventory_item.barcode) as barcode",
  182. "any(inventory_item.requires_shipping) as requires_shipping",
  183. "any(inventory_item.tracked) as tracked",
  184. "if(tracked, if(available > 0, true, if(inventory_policy == 'continue', true, false)), false) as for_sale",
  185. }
  186. key = productVariantKey("t.product_id=?", id)
  187. l = ttlcache.LoaderFunc[string, any](
  188. func(ttl *ttlcache.Cache[string, any], _ string) *ttlcache.Item[string, any] {
  189. var o []relation.ProductVariant
  190. rows, err := db.session.
  191. Select(selection...).
  192. From(fmt.Sprintf("%s as t", key.Table())).
  193. LeftJoin("product", "product.id = t.product_id").
  194. LeftJoin("inventory_item", "inventory_item.id = t.inventory_item_id").
  195. LeftJoin("inventory_level", "inventory_level.inventory_item_id = t.inventory_item_id").
  196. Where(key.Clause(), key.Args()...).
  197. OrderBy("position").
  198. GroupBy("t.id").
  199. Load(&o)
  200. if rows < 1 || err != nil {
  201. return nil
  202. }
  203. return ttl.Set(key.String(), o, key.TTL())
  204. },
  205. )
  206. )
  207. p := db.cache.Get(key.String(), ttlcache.WithLoader[string, any](l))
  208. if p != nil {
  209. for _, v := range p.Value().([]relation.ProductVariant) {
  210. variants = append(variants, v.As())
  211. }
  212. }
  213. return variants, nil
  214. }
  215. func (db *clickhouse) CollectionProducts(ln model.LanguageCode, id string) ([]*generated.Product, error) {
  216. var (
  217. products []*generated.Product
  218. key = productKey("has(collections, ?)", id)
  219. l = ttlcache.LoaderFunc[string, any](
  220. func(ttl *ttlcache.Cache[string, any], _ string) *ttlcache.Item[string, any] {
  221. var o []relation.Product
  222. rows, err := db.session.
  223. Select(productSelection(ln)...).
  224. From(key.Table()).
  225. Where(key.Clause(), key.Args()...).
  226. Load(&o)
  227. if rows < 1 || err != nil {
  228. return nil
  229. }
  230. return ttl.Set(key.String(), o, key.TTL())
  231. },
  232. )
  233. )
  234. p := db.cache.Get(key.String(), ttlcache.WithLoader[string, any](l))
  235. if p == nil {
  236. return nil, fmt.Errorf("not found")
  237. }
  238. for _, row := range p.Value().([]relation.Product) {
  239. products = append(products, row.As())
  240. }
  241. return products, nil
  242. }
  243. func (db *clickhouse) Ping() error {
  244. return db.session.Ping()
  245. }
  246. func (db *clickhouse) Close() error {
  247. var wg sync.WaitGroup
  248. wg.Add(2)
  249. go func() {
  250. defer wg.Done()
  251. if db.cache != nil {
  252. db.cache.DeleteAll()
  253. db.cache.Stop()
  254. }
  255. }()
  256. go func() {
  257. defer wg.Done()
  258. if db.session != nil {
  259. _ = db.session.Close()
  260. }
  261. }()
  262. return nil
  263. }