clickhouse.go 9.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349
  1. package db
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/gshopify/service-wrapper/config"
  6. "github.com/gshopify/service-wrapper/db"
  7. "github.com/gshopify/service-wrapper/model"
  8. "github.com/gshopify/service-wrapper/server/middleware"
  9. "github.com/jellydator/ttlcache/v3"
  10. "github.com/mailru/dbr"
  11. _ "github.com/mailru/go-clickhouse"
  12. "gshopper.com/gshopify/products/graphql/generated"
  13. "gshopper.com/gshopify/products/relation"
  14. "net/url"
  15. "strings"
  16. "sync"
  17. )
  18. type clickhouse struct {
  19. ctx context.Context
  20. config *db.Config
  21. session *dbr.Session
  22. cache *ttlcache.Cache[string, any]
  23. }
  24. func New(ctx context.Context, forceDebug bool) (Database, error) {
  25. r := &clickhouse{
  26. ctx: ctx,
  27. config: db.New(),
  28. cache: ttlcache.New[string, any](
  29. ttlcache.WithTTL[string, any](cacheTimeout),
  30. ttlcache.WithCapacity[string, any](cacheCapacity),
  31. ),
  32. }
  33. if err := config.Instance().Load(ctx, r.config); err != nil {
  34. return nil, err
  35. }
  36. if forceDebug {
  37. r.config.Params.Debug = true
  38. }
  39. //goland:noinspection HttpUrlsUsage
  40. source, err := url.Parse(fmt.Sprintf("http://%s:%s@%s:%d/%s",
  41. url.QueryEscape(r.config.Username),
  42. url.QueryEscape(r.config.Password),
  43. r.config.Host,
  44. r.config.Port,
  45. r.config.Database))
  46. if err != nil {
  47. return nil, err
  48. }
  49. kv := make(url.Values)
  50. kv.Set("timeout", fmt.Sprintf("%ds", r.config.Params.Timeout))
  51. kv.Set("read_timeout", fmt.Sprintf("%ds", r.config.Params.ReadTimeout))
  52. kv.Set("write_timeout", fmt.Sprintf("%ds", r.config.Params.WriteTimeout))
  53. kv.Set("debug", fmt.Sprintf("%v", r.config.Params.Debug))
  54. source.RawQuery = kv.Encode()
  55. con, err := dbr.Open("clickhouse", source.String(), nil)
  56. if err != nil {
  57. return nil, fmt.Errorf("could not establish Clickhouse session: %v", err)
  58. }
  59. r.session = con.NewSessionContext(ctx, nil)
  60. if err = r.session.Ping(); err != nil {
  61. return nil, err
  62. }
  63. go r.cache.Start()
  64. return r, nil
  65. }
  66. func (db *clickhouse) Collections(ln model.LanguageCode) ([]*generated.Collection, error) {
  67. var (
  68. collections []*generated.Collection
  69. key = productCollectionKey("list")
  70. l = ttlcache.LoaderFunc[string, any](
  71. func(ttl *ttlcache.Cache[string, any], _ string) *ttlcache.Item[string, any] {
  72. var o []relation.ProductCollection
  73. rows, err := db.session.
  74. Select(productCollectionSelection(ln)...).
  75. From(key.Table()).
  76. Load(&o)
  77. if rows < 1 || err != nil {
  78. return nil
  79. }
  80. return ttl.Set(key.String(), o, key.TTL())
  81. },
  82. )
  83. )
  84. if p := db.cache.Get(key.String(), ttlcache.WithLoader[string, any](l)); p != nil {
  85. for _, row := range p.Value().([]relation.ProductCollection) {
  86. collections = append(collections, row.As())
  87. }
  88. }
  89. return collections, nil
  90. }
  91. func (db *clickhouse) ProductCollections(ln model.LanguageCode, id string) ([]*generated.Collection, error) {
  92. var (
  93. collections []*generated.Collection
  94. key = productCollectionKey("product.id=?", id)
  95. l = ttlcache.LoaderFunc[string, any](
  96. func(ttl *ttlcache.Cache[string, any], _ string) *ttlcache.Item[string, any] {
  97. var o []relation.ProductCollection
  98. rows, err := db.session.SelectBySql("SELECT "+
  99. strings.Join(productCollectionSelection(ln), ", ")+
  100. " FROM `"+key.Table()+"`"+
  101. " ARRAY JOIN (SELECT `collections` FROM `product` WHERE `id` = ?) AS cid"+
  102. " WHERE `id` = cid", key.Args()...).
  103. Load(&o)
  104. if rows < 1 || err != nil {
  105. return nil
  106. }
  107. return ttl.Set(key.String(), o, key.TTL())
  108. })
  109. )
  110. if p := db.cache.Get(key.String(), ttlcache.WithLoader[string, any](l)); p != nil {
  111. for _, row := range p.Value().([]relation.ProductCollection) {
  112. collections = append(collections, row.As())
  113. }
  114. }
  115. return collections, nil
  116. }
  117. func (db *clickhouse) Product(ln model.LanguageCode, handle *string, id *string) (*generated.Product, error) {
  118. var (
  119. clause = strings.Builder{}
  120. vars = []any{model.ProductStatusActive}
  121. )
  122. clause.WriteString("t.status=?")
  123. if id != nil {
  124. clause.WriteString(" AND t.id=?")
  125. vars = append(vars, *id)
  126. }
  127. if handle != nil {
  128. clause.WriteString(" AND t.handle=?")
  129. vars = append(vars, *handle)
  130. }
  131. var (
  132. key = productKey(clause.String(), vars...)
  133. l = ttlcache.LoaderFunc[string, any](
  134. func(ttl *ttlcache.Cache[string, any], _ string) *ttlcache.Item[string, any] {
  135. o := relation.Product{}
  136. rows, err := db.session.
  137. Select(productSelection(ln, defaultCurrency)...).
  138. From(fmt.Sprintf("%s as t", key.Table())).
  139. LeftJoin("product_variant", "product_variant.product_id = t.id").
  140. LeftJoin("inventory_item", "inventory_item.id = product_variant.inventory_item_id").
  141. LeftJoin("inventory_level", "inventory_level.inventory_item_id = product_variant.inventory_item_id").
  142. Where(key.Clause(), key.Args()...).
  143. GroupBy("t.id").
  144. Limit(1).
  145. Load(&o)
  146. if rows < 1 || err != nil {
  147. return nil
  148. }
  149. return ttl.Set(key.String(), o, key.TTL())
  150. })
  151. )
  152. p := db.cache.Get(key.String(), ttlcache.WithLoader[string, any](l))
  153. if p == nil {
  154. return nil, fmt.Errorf("not found")
  155. }
  156. product := p.Value().(relation.Product)
  157. return product.As(), nil
  158. }
  159. func (db *clickhouse) ProductOptions(ln model.LanguageCode, id string) ([]*generated.ProductOption, error) {
  160. var (
  161. options []*generated.ProductOption
  162. key = productOptionKey("product_id=?", id)
  163. l = ttlcache.LoaderFunc[string, any](
  164. func(ttl *ttlcache.Cache[string, any], _ string) *ttlcache.Item[string, any] {
  165. var o []relation.ProductOption
  166. rows, err := db.session.SelectBySql("SELECT "+
  167. strings.Join(productOptionSelection(ln), ", ")+
  168. " FROM `"+key.Table()+"`"+
  169. " ARRAY JOIN (SELECT `options` FROM `product` WHERE `id` = ?) AS oid"+
  170. " WHERE `id` = oid", key.Args()).
  171. Load(&o)
  172. if rows < 1 || err != nil {
  173. return nil
  174. }
  175. return ttl.Set(key.String(), o, key.TTL())
  176. },
  177. )
  178. )
  179. if p := db.cache.Get(key.String(), ttlcache.WithLoader[string, any](l)); p != nil {
  180. for _, v := range p.Value().([]relation.ProductOption) {
  181. options = append(options, v.As())
  182. }
  183. }
  184. return options, nil
  185. }
  186. func (db *clickhouse) ProductVariants(ctx *middleware.GShopifyContext, id string) ([]*generated.ProductVariant, error) {
  187. var (
  188. variants []*generated.ProductVariant
  189. key = productVariantKey("t.product_id=?", id)
  190. l = ttlcache.LoaderFunc[string, any](
  191. func(ttl *ttlcache.Cache[string, any], _ string) *ttlcache.Item[string, any] {
  192. var o []relation.ProductVariant
  193. rows, err := db.session.
  194. Select(productVariantSelection(ctx.Language, defaultCurrency)...).
  195. From(fmt.Sprintf("%s as t", key.Table())).
  196. LeftJoin("product", "product.id = t.product_id").
  197. LeftJoin("inventory_item", "inventory_item.id = t.inventory_item_id").
  198. LeftJoin("inventory_level", "inventory_level.inventory_item_id = t.inventory_item_id").
  199. Where(key.Clause(), key.Args()...).
  200. OrderBy("position").
  201. GroupBy("t.id").
  202. Load(&o)
  203. if rows < 1 || err != nil {
  204. return nil
  205. }
  206. return ttl.Set(key.String(), o, key.TTL())
  207. },
  208. )
  209. )
  210. p := db.cache.Get(key.String(), ttlcache.WithLoader[string, any](l))
  211. if p != nil {
  212. for _, v := range p.Value().([]relation.ProductVariant) {
  213. variants = append(variants, v.As())
  214. }
  215. }
  216. return variants, nil
  217. }
  218. func (db *clickhouse) CollectionProducts(ln model.LanguageCode, id string) ([]*generated.Product, error) {
  219. var (
  220. products []*generated.Product
  221. key = productKey("has(t.collections, ?) AND t.status = ?", id, model.ProductStatusActive)
  222. l = ttlcache.LoaderFunc[string, any](
  223. func(ttl *ttlcache.Cache[string, any], _ string) *ttlcache.Item[string, any] {
  224. var o []relation.Product
  225. rows, err := db.session.
  226. Select(productSelection(ln, defaultCurrency)...).
  227. From(fmt.Sprintf("%s as t", key.Table())).
  228. LeftJoin("product_variant", "product_variant.product_id = t.id").
  229. LeftJoin("inventory_item", "inventory_item.id = product_variant.inventory_item_id").
  230. LeftJoin("inventory_level", "inventory_level.inventory_item_id = product_variant.inventory_item_id").
  231. Where(key.Clause(), key.Args()...).
  232. GroupBy("t.id").
  233. Load(&o)
  234. if rows < 1 || err != nil {
  235. return nil
  236. }
  237. return ttl.Set(key.String(), o, key.TTL())
  238. },
  239. )
  240. )
  241. p := db.cache.Get(key.String(), ttlcache.WithLoader[string, any](l))
  242. if p == nil {
  243. return nil, fmt.Errorf("not found")
  244. }
  245. for _, row := range p.Value().([]relation.Product) {
  246. products = append(products, row.As())
  247. }
  248. return products, nil
  249. }
  250. func (db *clickhouse) ProductVariantOptions(ln model.LanguageCode, id string) ([]*generated.SelectedOption, error) {
  251. var (
  252. options []*generated.SelectedOption
  253. key = productOptionKey("product_variant.id = ?", id)
  254. l = ttlcache.LoaderFunc[string, any](
  255. func(ttl *ttlcache.Cache[string, any], _ string) *ttlcache.Item[string, any] {
  256. var o []relation.ProductOption
  257. rows, err := db.session.SelectBySql("SELECT "+
  258. strings.Join(productOptionSelectedSelection(ln), ", ")+
  259. " FROM `"+key.Table()+"`"+
  260. " ARRAY JOIN (SELECT `options` from `product_variant` where `id` = ?) as opt"+
  261. " WHERE id = tupleElement(opt, 1)"+
  262. " ORDER BY position", key.Args()).
  263. Load(&o)
  264. if rows < 1 || err != nil {
  265. return nil
  266. }
  267. return ttl.Set(key.String(), o, key.TTL())
  268. },
  269. )
  270. )
  271. if p := db.cache.Get(key.String(), ttlcache.WithLoader[string, any](l)); p != nil {
  272. for _, option := range p.Value().([]relation.ProductOption) {
  273. options = append(options, option.AsSelected())
  274. }
  275. }
  276. return options, nil
  277. }
  278. func (db *clickhouse) Ping() error {
  279. return db.session.Ping()
  280. }
  281. func (db *clickhouse) Close() error {
  282. var wg sync.WaitGroup
  283. wg.Add(2)
  284. go func() {
  285. defer wg.Done()
  286. if db.cache != nil {
  287. db.cache.DeleteAll()
  288. db.cache.Stop()
  289. }
  290. }()
  291. go func() {
  292. defer wg.Done()
  293. if db.session != nil {
  294. _ = db.session.Close()
  295. }
  296. }()
  297. return nil
  298. }