clickhouse.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467
  1. package db
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/gshopify/service-wrapper/config"
  6. "github.com/gshopify/service-wrapper/db"
  7. "github.com/gshopify/service-wrapper/model"
  8. "github.com/gshopify/service-wrapper/server/middleware"
  9. "github.com/jellydator/ttlcache/v3"
  10. "github.com/mailru/dbr"
  11. _ "github.com/mailru/go-clickhouse"
  12. "gshopper.com/gshopify/products/cache"
  13. "gshopper.com/gshopify/products/graphql/generated"
  14. "gshopper.com/gshopify/products/relation"
  15. "net/url"
  16. "strings"
  17. "sync"
  18. )
  19. type clickhouse struct {
  20. ctx context.Context
  21. config *db.Config
  22. session *dbr.Session
  23. cache *ttlcache.Cache[string, any]
  24. }
  25. func New(ctx context.Context, forceDebug bool) (Database, error) {
  26. r := &clickhouse{
  27. ctx: ctx,
  28. config: db.New(),
  29. cache: ttlcache.New[string, any](
  30. ttlcache.WithTTL[string, any](cacheTimeout),
  31. ttlcache.WithCapacity[string, any](cacheCapacity),
  32. ),
  33. }
  34. if err := config.Instance().Load(ctx, r.config); err != nil {
  35. return nil, err
  36. }
  37. if forceDebug {
  38. r.config.Params.Debug = true
  39. }
  40. //goland:noinspection HttpUrlsUsage
  41. source, err := url.Parse(fmt.Sprintf("http://%s:%s@%s:%d/%s",
  42. url.QueryEscape(r.config.Username),
  43. url.QueryEscape(r.config.Password),
  44. r.config.Host,
  45. r.config.Port,
  46. r.config.Database))
  47. if err != nil {
  48. return nil, err
  49. }
  50. kv := make(url.Values)
  51. kv.Set("timeout", fmt.Sprintf("%ds", r.config.Params.Timeout))
  52. kv.Set("read_timeout", fmt.Sprintf("%ds", r.config.Params.ReadTimeout))
  53. kv.Set("write_timeout", fmt.Sprintf("%ds", r.config.Params.WriteTimeout))
  54. kv.Set("debug", fmt.Sprintf("%v", r.config.Params.Debug))
  55. source.RawQuery = kv.Encode()
  56. con, err := dbr.Open("clickhouse", source.String(), nil)
  57. if err != nil {
  58. return nil, fmt.Errorf("could not establish Clickhouse session: %v", err)
  59. }
  60. r.session = con.NewSessionContext(ctx, nil)
  61. if err = r.session.Ping(); err != nil {
  62. return nil, err
  63. }
  64. go r.cache.Start()
  65. return r, nil
  66. }
  67. func (db *clickhouse) Collection(ln model.LanguageCode, handle *string, id *string) (*generated.Collection, error) {
  68. var (
  69. clause = strings.Builder{}
  70. vars []any
  71. key *cache.SqlKey
  72. loader ttlcache.LoaderFunc[string, any]
  73. )
  74. if id != nil {
  75. clause.WriteString("id=?")
  76. vars = append(vars, *id)
  77. }
  78. if handle != nil {
  79. if clause.Len() > 0 {
  80. clause.WriteString(" AND ")
  81. }
  82. clause.WriteString("handle=?")
  83. vars = append(vars, *handle)
  84. }
  85. key = productCollectionKey(ln, clause.String(), vars...)
  86. loader = func(ttl *ttlcache.Cache[string, any], _ string) *ttlcache.Item[string, any] {
  87. var o relation.ProductCollection
  88. rows, err := db.session.
  89. Select(key.Selection()...).
  90. From(key.Table()).
  91. Where(key.Clause(), key.Args()...).
  92. Limit(1).
  93. Load(&o)
  94. if rows != 1 || err != nil {
  95. return nil
  96. }
  97. return ttl.Set(key.String(), o, key.TTL())
  98. }
  99. p := db.cache.Get(key.String(), ttlcache.WithLoader[string, any](loader))
  100. if p == nil {
  101. return nil, fmt.Errorf("not found")
  102. }
  103. collection := p.Value().(relation.ProductCollection)
  104. return collection.As(), nil
  105. }
  106. func (db *clickhouse) Collections(ln model.LanguageCode) ([]*generated.Collection, error) {
  107. var (
  108. collections []*generated.Collection
  109. key = productCollectionKey(ln, "list")
  110. l = ttlcache.LoaderFunc[string, any](
  111. func(ttl *ttlcache.Cache[string, any], _ string) *ttlcache.Item[string, any] {
  112. var o []relation.ProductCollection
  113. rows, err := db.session.
  114. Select(key.Selection()...).
  115. From(key.Table()).
  116. Load(&o)
  117. if rows < 1 || err != nil {
  118. return nil
  119. }
  120. return ttl.Set(key.String(), o, key.TTL())
  121. },
  122. )
  123. )
  124. if p := db.cache.Get(key.String(), ttlcache.WithLoader[string, any](l)); p != nil {
  125. for _, row := range p.Value().([]relation.ProductCollection) {
  126. collections = append(collections, row.As())
  127. }
  128. }
  129. return collections, nil
  130. }
  131. func (db *clickhouse) ProductCollections(ln model.LanguageCode, id string) ([]*generated.Collection, error) {
  132. var (
  133. collections []*generated.Collection
  134. key = productCollectionKey(ln, "product.id=?", id)
  135. l = ttlcache.LoaderFunc[string, any](
  136. func(ttl *ttlcache.Cache[string, any], _ string) *ttlcache.Item[string, any] {
  137. var o []relation.ProductCollection
  138. rows, err := db.session.SelectBySql("SELECT "+
  139. strings.Join(key.Selection(), ", ")+
  140. " FROM `"+key.Table()+"`"+
  141. " ARRAY JOIN (SELECT `collections` FROM `product` WHERE `id` = ?) AS cid"+
  142. " WHERE `id` = cid", key.Args()...).
  143. Load(&o)
  144. if rows < 1 || err != nil {
  145. return nil
  146. }
  147. return ttl.Set(key.String(), o, key.TTL())
  148. })
  149. )
  150. if p := db.cache.Get(key.String(), ttlcache.WithLoader[string, any](l)); p != nil {
  151. for _, row := range p.Value().([]relation.ProductCollection) {
  152. collections = append(collections, row.As())
  153. }
  154. }
  155. return collections, nil
  156. }
  157. func (db *clickhouse) Product(ln model.LanguageCode, handle *string, id *string) (*generated.Product, error) {
  158. var (
  159. clause = strings.Builder{}
  160. vars = []any{model.ProductStatusActive}
  161. )
  162. clause.WriteString("t.status=?")
  163. if id != nil {
  164. clause.WriteString(" AND t.id=?")
  165. vars = append(vars, *id)
  166. }
  167. if handle != nil {
  168. clause.WriteString(" AND t.handle=?")
  169. vars = append(vars, *handle)
  170. }
  171. var (
  172. key = productKey(ln, defaultCurrency, clause.String(), vars...)
  173. l = ttlcache.LoaderFunc[string, any](
  174. func(ttl *ttlcache.Cache[string, any], _ string) *ttlcache.Item[string, any] {
  175. o := relation.Product{}
  176. rows, err := db.session.
  177. Select(key.Selection()...).
  178. From(fmt.Sprintf("%s as t", key.Table())).
  179. LeftJoin("product_variant", "product_variant.product_id = t.id").
  180. LeftJoin("inventory_item", "inventory_item.id = product_variant.inventory_item_id").
  181. LeftJoin("inventory_level", "inventory_level.inventory_item_id = product_variant.inventory_item_id").
  182. Where(key.Clause(), key.Args()...).
  183. GroupBy("t.id").
  184. Limit(1).
  185. Load(&o)
  186. if rows < 1 || err != nil {
  187. return nil
  188. }
  189. return ttl.Set(key.String(), o, key.TTL())
  190. })
  191. )
  192. p := db.cache.Get(key.String(), ttlcache.WithLoader[string, any](l))
  193. if p == nil {
  194. return nil, fmt.Errorf("not found")
  195. }
  196. product := p.Value().(relation.Product)
  197. return product.As(), nil
  198. }
  199. func (db *clickhouse) ProductOptions(ln model.LanguageCode, id string) ([]*generated.ProductOption, error) {
  200. var (
  201. options []*generated.ProductOption
  202. key = productOptionKey(ln, "product_id=?", id)
  203. l = ttlcache.LoaderFunc[string, any](
  204. func(ttl *ttlcache.Cache[string, any], _ string) *ttlcache.Item[string, any] {
  205. var o []relation.ProductOption
  206. rows, err := db.session.SelectBySql("SELECT tuple('', 1) as opt, "+
  207. strings.Join(key.Selection(), ", ")+
  208. " FROM `"+key.Table()+"`"+
  209. " ARRAY JOIN (SELECT `options` FROM `product` WHERE `id` = ?) AS oid"+
  210. " WHERE `id` = oid ORDER BY `position`;", key.Args()).
  211. Load(&o)
  212. if rows < 1 || err != nil {
  213. return nil
  214. }
  215. return ttl.Set(key.String(), o, key.TTL())
  216. },
  217. )
  218. )
  219. if p := db.cache.Get(key.String(), ttlcache.WithLoader[string, any](l)); p != nil {
  220. for _, v := range p.Value().([]relation.ProductOption) {
  221. options = append(options, v.As())
  222. }
  223. }
  224. return options, nil
  225. }
  226. func (db *clickhouse) ProductVariants(ctx *middleware.GShopifyContext, id string) ([]*generated.ProductVariant, error) {
  227. var (
  228. variants []*generated.ProductVariant
  229. key = productVariantKey(ctx.Language, defaultCurrency, "t.product_id=?", id)
  230. l = ttlcache.LoaderFunc[string, any](
  231. func(ttl *ttlcache.Cache[string, any], _ string) *ttlcache.Item[string, any] {
  232. var o []relation.ProductVariant
  233. rows, err := db.session.
  234. Select(key.Selection()...).
  235. From(fmt.Sprintf("%s as t", key.Table())).
  236. LeftJoin("product", "product.id = t.product_id").
  237. LeftJoin("inventory_item", "inventory_item.id = t.inventory_item_id").
  238. LeftJoin("inventory_level", "inventory_level.inventory_item_id = t.inventory_item_id").
  239. Where(key.Clause(), key.Args()...).
  240. OrderBy("position").
  241. GroupBy("t.id").
  242. Load(&o)
  243. if rows < 1 || err != nil {
  244. return nil
  245. }
  246. return ttl.Set(key.String(), o, key.TTL())
  247. },
  248. )
  249. )
  250. p := db.cache.Get(key.String(), ttlcache.WithLoader[string, any](l))
  251. if p != nil {
  252. for _, v := range p.Value().([]relation.ProductVariant) {
  253. variants = append(variants, v.As())
  254. }
  255. }
  256. return variants, nil
  257. }
  258. func (db *clickhouse) Products(ln model.LanguageCode, collectionId *string) ([]*generated.Product, error) {
  259. var (
  260. clause = strings.Builder{}
  261. args []any
  262. )
  263. clause.WriteString("t.status = ?")
  264. args = append(args, model.ProductStatusActive)
  265. if collectionId != nil {
  266. clause.WriteString(" AND has(t.collections, ?)")
  267. args = append(args, *collectionId)
  268. }
  269. var (
  270. products []*generated.Product
  271. key = productKey(ln, defaultCurrency, clause.String(), args...)
  272. l = ttlcache.LoaderFunc[string, any](
  273. func(ttl *ttlcache.Cache[string, any], _ string) *ttlcache.Item[string, any] {
  274. var o []relation.Product
  275. rows, err := db.session.
  276. Select(key.Selection()...).
  277. From(fmt.Sprintf("%s as t", key.Table())).
  278. LeftJoin("product_variant", "product_variant.product_id = t.id").
  279. LeftJoin("inventory_item", "inventory_item.id = product_variant.inventory_item_id").
  280. LeftJoin("inventory_level", "inventory_level.inventory_item_id = product_variant.inventory_item_id").
  281. Where(key.Clause(), key.Args()...).
  282. GroupBy("t.id").
  283. Load(&o)
  284. if rows < 1 || err != nil {
  285. return nil
  286. }
  287. return ttl.Set(key.String(), o, key.TTL())
  288. },
  289. )
  290. )
  291. if p := db.cache.Get(key.String(), ttlcache.WithLoader[string, any](l)); p != nil {
  292. for _, row := range p.Value().([]relation.Product) {
  293. products = append(products, row.As())
  294. }
  295. }
  296. return products, nil
  297. }
  298. func (db *clickhouse) ProductVariantOptions(ln model.LanguageCode, id string) ([]*generated.SelectedOption, error) {
  299. var (
  300. options []*generated.SelectedOption
  301. key = productOptionKey(ln, "product_variant.id = ?", id)
  302. l = ttlcache.LoaderFunc[string, any](
  303. func(ttl *ttlcache.Cache[string, any], _ string) *ttlcache.Item[string, any] {
  304. var o []relation.ProductOption
  305. rows, err := db.session.SelectBySql("SELECT "+
  306. strings.Join(key.Selection(), ", ")+
  307. " FROM `"+key.Table()+"`"+
  308. " ARRAY JOIN (SELECT anyLast(options) as options from `product_variant` where `id` = ? GROUP BY id) as opt"+
  309. " WHERE id = tupleElement(opt, 1)"+
  310. " ORDER BY position", key.Args()).
  311. Load(&o)
  312. if rows < 1 || err != nil {
  313. return nil
  314. }
  315. return ttl.Set(key.String(), o, key.TTL())
  316. },
  317. )
  318. )
  319. if p := db.cache.Get(key.String(), ttlcache.WithLoader[string, any](l)); p != nil {
  320. for _, option := range p.Value().([]relation.ProductOption) {
  321. options = append(options, option.AsSelected())
  322. }
  323. }
  324. return options, nil
  325. }
  326. func (db *clickhouse) ProductTags() ([]*model.ProductTag, error) {
  327. var (
  328. tags []*model.ProductTag
  329. key = productTagKey("list")
  330. l = ttlcache.LoaderFunc[string, any](
  331. func(ttl *ttlcache.Cache[string, any], _ string) *ttlcache.Item[string, any] {
  332. var o []*model.ProductTag
  333. rows, err := db.session.
  334. Select(key.Selection()...).
  335. From(key.Table()).
  336. OrderBy("count DESC").
  337. Load(&o)
  338. if rows < 1 || err != nil {
  339. return nil
  340. }
  341. return ttl.Set(key.String(), o, key.TTL())
  342. },
  343. )
  344. )
  345. if p := db.cache.Get(key.String(), ttlcache.WithLoader[string, any](l)); p != nil {
  346. for _, s := range p.Value().([]*model.ProductTag) {
  347. tags = append(tags, s)
  348. }
  349. }
  350. return tags, nil
  351. }
  352. func (db *clickhouse) ProductTypes() ([]*model.ProductType, error) {
  353. var (
  354. types []*model.ProductType
  355. key = productTypeKey("list")
  356. l = ttlcache.LoaderFunc[string, any](
  357. func(ttl *ttlcache.Cache[string, any], _ string) *ttlcache.Item[string, any] {
  358. var o []*model.ProductType
  359. rows, err := db.session.
  360. Select(key.Selection()...).
  361. From(key.Table()).
  362. OrderBy("count DESC").
  363. Load(&o)
  364. if rows < 1 || err != nil {
  365. return nil
  366. }
  367. return ttl.Set(key.String(), o, key.TTL())
  368. },
  369. )
  370. )
  371. if p := db.cache.Get(key.String(), ttlcache.WithLoader[string, any](l)); p != nil {
  372. for _, s := range p.Value().([]*model.ProductType) {
  373. types = append(types, s)
  374. }
  375. }
  376. return types, nil
  377. }
  378. func (db *clickhouse) Ping() error {
  379. return db.session.Ping()
  380. }
  381. func (db *clickhouse) Close() error {
  382. var wg sync.WaitGroup
  383. wg.Add(2)
  384. go func() {
  385. defer wg.Done()
  386. if db.cache != nil {
  387. db.cache.DeleteAll()
  388. db.cache.Stop()
  389. }
  390. }()
  391. go func() {
  392. defer wg.Done()
  393. if db.session != nil {
  394. _ = db.session.Close()
  395. }
  396. }()
  397. return nil
  398. }