clickhouse.go 8.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320
  1. package db
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/gshopify/service-wrapper/config"
  6. "github.com/gshopify/service-wrapper/db"
  7. "github.com/gshopify/service-wrapper/model"
  8. "github.com/gshopify/service-wrapper/server/middleware"
  9. "github.com/jellydator/ttlcache/v3"
  10. "github.com/mailru/dbr"
  11. _ "github.com/mailru/go-clickhouse"
  12. "gshopper.com/gshopify/products/graphql/generated"
  13. "gshopper.com/gshopify/products/relation"
  14. "net/url"
  15. "strings"
  16. "sync"
  17. )
  18. type clickhouse struct {
  19. ctx context.Context
  20. config *db.Config
  21. session *dbr.Session
  22. cache *ttlcache.Cache[string, any]
  23. }
  24. func New(ctx context.Context, forceDebug bool) (Database, error) {
  25. r := &clickhouse{
  26. ctx: ctx,
  27. config: db.New(),
  28. cache: ttlcache.New[string, any](
  29. ttlcache.WithTTL[string, any](cacheTimeout),
  30. ttlcache.WithCapacity[string, any](cacheCapacity),
  31. ),
  32. }
  33. if err := config.Instance().Load(ctx, r.config); err != nil {
  34. return nil, err
  35. }
  36. if forceDebug {
  37. r.config.Params.Debug = true
  38. }
  39. //goland:noinspection HttpUrlsUsage
  40. source, err := url.Parse(fmt.Sprintf("http://%s:%s@%s:%d/%s",
  41. url.QueryEscape(r.config.Username),
  42. url.QueryEscape(r.config.Password),
  43. r.config.Host,
  44. r.config.Port,
  45. r.config.Database))
  46. if err != nil {
  47. return nil, err
  48. }
  49. kv := make(url.Values)
  50. kv.Set("timeout", fmt.Sprintf("%ds", r.config.Params.Timeout))
  51. kv.Set("read_timeout", fmt.Sprintf("%ds", r.config.Params.ReadTimeout))
  52. kv.Set("write_timeout", fmt.Sprintf("%ds", r.config.Params.WriteTimeout))
  53. kv.Set("debug", fmt.Sprintf("%v", r.config.Params.Debug))
  54. source.RawQuery = kv.Encode()
  55. con, err := dbr.Open("clickhouse", source.String(), nil)
  56. if err != nil {
  57. return nil, fmt.Errorf("could not establish Clickhouse session: %v", err)
  58. }
  59. r.session = con.NewSessionContext(ctx, nil)
  60. if err = r.session.Ping(); err != nil {
  61. return nil, err
  62. }
  63. go r.cache.Start()
  64. return r, nil
  65. }
  66. func (db *clickhouse) ProductCollections(ln model.LanguageCode, id string) ([]*generated.Collection, error) {
  67. var (
  68. collections []*generated.Collection
  69. key = productCollectionKey("product.id=?", id)
  70. l = ttlcache.LoaderFunc[string, any](
  71. func(ttl *ttlcache.Cache[string, any], _ string) *ttlcache.Item[string, any] {
  72. var o []relation.ProductCollection
  73. rows, err := db.session.SelectBySql("SELECT "+
  74. strings.Join(productCollectionSelection(ln), ", ")+
  75. " FROM `"+key.Table()+"`"+
  76. " ARRAY JOIN (SELECT `collections` FROM `product` WHERE `id` = ?) AS cid"+
  77. " WHERE `id` = cid", key.Args()...).
  78. Load(&o)
  79. if rows < 1 || err != nil {
  80. return nil
  81. }
  82. return ttl.Set(key.String(), o, key.TTL())
  83. })
  84. )
  85. if p := db.cache.Get(key.String(), ttlcache.WithLoader[string, any](l)); p != nil {
  86. for _, row := range p.Value().([]relation.ProductCollection) {
  87. collections = append(collections, row.As())
  88. }
  89. }
  90. return collections, nil
  91. }
  92. func (db *clickhouse) Product(ln model.LanguageCode, handle *string, id *string) (*generated.Product, error) {
  93. var (
  94. clause = strings.Builder{}
  95. vars = []any{model.ProductStatusActive}
  96. )
  97. clause.WriteString("t.status=?")
  98. if id != nil {
  99. clause.WriteString(" AND t.id=?")
  100. vars = append(vars, *id)
  101. }
  102. if handle != nil {
  103. clause.WriteString(" AND t.handle=?")
  104. vars = append(vars, *handle)
  105. }
  106. var (
  107. key = productKey(clause.String(), vars...)
  108. l = ttlcache.LoaderFunc[string, any](
  109. func(ttl *ttlcache.Cache[string, any], _ string) *ttlcache.Item[string, any] {
  110. o := relation.Product{}
  111. rows, err := db.session.
  112. Select(productSelection(ln, defaultCurrency)...).
  113. From(fmt.Sprintf("%s as t", key.Table())).
  114. LeftJoin("product_variant", "product_variant.product_id = t.id").
  115. LeftJoin("inventory_item", "inventory_item.id = product_variant.inventory_item_id").
  116. LeftJoin("inventory_level", "inventory_level.inventory_item_id = product_variant.inventory_item_id").
  117. Where(key.Clause(), key.Args()...).
  118. GroupBy("t.id").
  119. Limit(1).
  120. Load(&o)
  121. if rows < 1 || err != nil {
  122. return nil
  123. }
  124. return ttl.Set(key.String(), o, key.TTL())
  125. })
  126. )
  127. p := db.cache.Get(key.String(), ttlcache.WithLoader[string, any](l))
  128. if p == nil {
  129. return nil, fmt.Errorf("not found")
  130. }
  131. product := p.Value().(relation.Product)
  132. return product.As(), nil
  133. }
  134. func (db *clickhouse) ProductOptions(ln model.LanguageCode, id string) ([]*generated.ProductOption, error) {
  135. var (
  136. options []*generated.ProductOption
  137. key = productOptionKey("product_id=?", id)
  138. l = ttlcache.LoaderFunc[string, any](
  139. func(ttl *ttlcache.Cache[string, any], _ string) *ttlcache.Item[string, any] {
  140. var o []relation.ProductOption
  141. rows, err := db.session.SelectBySql("SELECT "+
  142. strings.Join(productOptionSelection(ln), ", ")+
  143. " FROM `"+key.Table()+"`"+
  144. " ARRAY JOIN (SELECT `options` FROM `product` WHERE `id` = ?) AS oid"+
  145. " WHERE `id` = oid", key.Args()).
  146. Load(&o)
  147. if rows < 1 || err != nil {
  148. return nil
  149. }
  150. return ttl.Set(key.String(), o, key.TTL())
  151. },
  152. )
  153. )
  154. if p := db.cache.Get(key.String(), ttlcache.WithLoader[string, any](l)); p != nil {
  155. for _, v := range p.Value().([]relation.ProductOption) {
  156. options = append(options, v.As())
  157. }
  158. }
  159. return options, nil
  160. }
  161. func (db *clickhouse) ProductVariants(ctx *middleware.GShopifyContext, id string) ([]*generated.ProductVariant, error) {
  162. var (
  163. variants []*generated.ProductVariant
  164. key = productVariantKey("t.product_id=?", id)
  165. l = ttlcache.LoaderFunc[string, any](
  166. func(ttl *ttlcache.Cache[string, any], _ string) *ttlcache.Item[string, any] {
  167. var o []relation.ProductVariant
  168. rows, err := db.session.
  169. Select(productVariantSelection(ctx.Language, defaultCurrency)...).
  170. From(fmt.Sprintf("%s as t", key.Table())).
  171. LeftJoin("product", "product.id = t.product_id").
  172. LeftJoin("inventory_item", "inventory_item.id = t.inventory_item_id").
  173. LeftJoin("inventory_level", "inventory_level.inventory_item_id = t.inventory_item_id").
  174. Where(key.Clause(), key.Args()...).
  175. OrderBy("position").
  176. GroupBy("t.id").
  177. Load(&o)
  178. if rows < 1 || err != nil {
  179. return nil
  180. }
  181. return ttl.Set(key.String(), o, key.TTL())
  182. },
  183. )
  184. )
  185. p := db.cache.Get(key.String(), ttlcache.WithLoader[string, any](l))
  186. if p != nil {
  187. for _, v := range p.Value().([]relation.ProductVariant) {
  188. variants = append(variants, v.As())
  189. }
  190. }
  191. return variants, nil
  192. }
  193. func (db *clickhouse) CollectionProducts(ln model.LanguageCode, id string) ([]*generated.Product, error) {
  194. var (
  195. products []*generated.Product
  196. key = productKey("has(t.collections, ?)", id)
  197. l = ttlcache.LoaderFunc[string, any](
  198. func(ttl *ttlcache.Cache[string, any], _ string) *ttlcache.Item[string, any] {
  199. var o []relation.Product
  200. rows, err := db.session.
  201. Select(productSelection(ln, defaultCurrency)...).
  202. From(fmt.Sprintf("%s as t", key.Table())).
  203. LeftJoin("product_variant", "product_variant.product_id = t.id").
  204. LeftJoin("inventory_item", "inventory_item.id = product_variant.inventory_item_id").
  205. LeftJoin("inventory_level", "inventory_level.inventory_item_id = product_variant.inventory_item_id").
  206. Where(key.Clause(), key.Args()...).
  207. GroupBy("t.id").
  208. Load(&o)
  209. if rows < 1 || err != nil {
  210. return nil
  211. }
  212. return ttl.Set(key.String(), o, key.TTL())
  213. },
  214. )
  215. )
  216. p := db.cache.Get(key.String(), ttlcache.WithLoader[string, any](l))
  217. if p == nil {
  218. return nil, fmt.Errorf("not found")
  219. }
  220. for _, row := range p.Value().([]relation.Product) {
  221. products = append(products, row.As())
  222. }
  223. return products, nil
  224. }
  225. func (db *clickhouse) ProductVariantOptions(ln model.LanguageCode, id string) ([]*generated.SelectedOption, error) {
  226. var (
  227. options []*generated.SelectedOption
  228. key = productOptionKey("product_variant.id = ?", id)
  229. l = ttlcache.LoaderFunc[string, any](
  230. func(ttl *ttlcache.Cache[string, any], _ string) *ttlcache.Item[string, any] {
  231. var o []relation.ProductOption
  232. rows, err := db.session.SelectBySql("SELECT "+
  233. strings.Join(productOptionSelectedSelection(ln), ", ")+
  234. " FROM `"+key.Table()+"`"+
  235. " ARRAY JOIN (SELECT `options` from `product_variant` where `id` = ?) as opt"+
  236. " WHERE id = tupleElement(opt, 1)"+
  237. " ORDER BY position", key.Args()).
  238. Load(&o)
  239. if rows < 1 || err != nil {
  240. return nil
  241. }
  242. return ttl.Set(key.String(), o, key.TTL())
  243. },
  244. )
  245. )
  246. if p := db.cache.Get(key.String(), ttlcache.WithLoader[string, any](l)); p != nil {
  247. for _, option := range p.Value().([]relation.ProductOption) {
  248. options = append(options, option.AsSelected())
  249. }
  250. }
  251. return options, nil
  252. }
  253. func (db *clickhouse) Ping() error {
  254. return db.session.Ping()
  255. }
  256. func (db *clickhouse) Close() error {
  257. var wg sync.WaitGroup
  258. wg.Add(2)
  259. go func() {
  260. defer wg.Done()
  261. if db.cache != nil {
  262. db.cache.DeleteAll()
  263. db.cache.Stop()
  264. }
  265. }()
  266. go func() {
  267. defer wg.Done()
  268. if db.session != nil {
  269. _ = db.session.Close()
  270. }
  271. }()
  272. return nil
  273. }