2
0

// clickhouse.go (8.0 KB)
  1. package db
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/gshopify/service-wrapper/config"
  6. "github.com/gshopify/service-wrapper/db"
  7. "github.com/gshopify/service-wrapper/model"
  8. "github.com/gshopify/service-wrapper/server/middleware"
  9. "github.com/jellydator/ttlcache/v3"
  10. "github.com/mailru/dbr"
  11. _ "github.com/mailru/go-clickhouse"
  12. "gshopper.com/gshopify/products/graphql/generated"
  13. "gshopper.com/gshopify/products/relation"
  14. "net/url"
  15. "strings"
  16. "sync"
  17. )
  18. type clickhouse struct {
  19. ctx context.Context
  20. config *db.Config
  21. session *dbr.Session
  22. cache *ttlcache.Cache[string, any]
  23. }
  24. func New(ctx context.Context, forceDebug bool) (Database, error) {
  25. r := &clickhouse{
  26. ctx: ctx,
  27. config: db.New(),
  28. cache: ttlcache.New[string, any](
  29. ttlcache.WithTTL[string, any](cacheTimeout),
  30. ttlcache.WithCapacity[string, any](cacheCapacity),
  31. ),
  32. }
  33. if err := config.Instance().Load(ctx, r.config); err != nil {
  34. return nil, err
  35. }
  36. if forceDebug {
  37. r.config.Params.Debug = true
  38. }
  39. //goland:noinspection HttpUrlsUsage
  40. source, err := url.Parse(fmt.Sprintf("http://%s:%s@%s:%d/%s",
  41. url.QueryEscape(r.config.Username),
  42. url.QueryEscape(r.config.Password),
  43. r.config.Host,
  44. r.config.Port,
  45. r.config.Database))
  46. if err != nil {
  47. return nil, err
  48. }
  49. kv := make(url.Values)
  50. kv.Set("timeout", fmt.Sprintf("%ds", r.config.Params.Timeout))
  51. kv.Set("read_timeout", fmt.Sprintf("%ds", r.config.Params.ReadTimeout))
  52. kv.Set("write_timeout", fmt.Sprintf("%ds", r.config.Params.WriteTimeout))
  53. kv.Set("debug", fmt.Sprintf("%v", r.config.Params.Debug))
  54. source.RawQuery = kv.Encode()
  55. con, err := dbr.Open("clickhouse", source.String(), nil)
  56. if err != nil {
  57. return nil, fmt.Errorf("could not establish Clickhouse session: %v", err)
  58. }
  59. r.session = con.NewSessionContext(ctx, nil)
  60. if err = r.session.Ping(); err != nil {
  61. return nil, err
  62. }
  63. go r.cache.Start()
  64. return r, nil
  65. }
  66. func (db *clickhouse) ProductCollections(ln model.LanguageCode, id string) ([]*generated.Collection, error) {
  67. var (
  68. collections []*generated.Collection
  69. key = productCollectionKey("product.id=?", id)
  70. l = ttlcache.LoaderFunc[string, any](
  71. func(ttl *ttlcache.Cache[string, any], _ string) *ttlcache.Item[string, any] {
  72. var o []relation.Collection
  73. rows, err := db.session.SelectBySql("SELECT "+
  74. ln.SqlFieldSelection("title", "")+
  75. ", "+
  76. ln.SqlFieldSelection("description", "")+
  77. ", `id`, `handle`, `thumbnail`, "+
  78. "`created_at`, `updated_at`, `deleted_at` "+
  79. "FROM `product_collection` "+
  80. "ARRAY JOIN (SELECT `collection_ids` FROM `product` WHERE `id` = ?) AS cid "+
  81. "WHERE `id` = cid "+
  82. "ORDER BY `created_at` ASC;", key.Args()...).
  83. Load(&o)
  84. if rows < 1 || err != nil {
  85. return nil
  86. }
  87. return ttl.Set(key.String(), o, key.TTL())
  88. })
  89. )
  90. p := db.cache.Get(key.String(), ttlcache.WithLoader[string, any](l))
  91. if p == nil {
  92. return nil, fmt.Errorf("not found")
  93. }
  94. for _, row := range p.Value().([]relation.Collection) {
  95. collections = append(collections, row.As())
  96. }
  97. return collections, nil
  98. }
  99. func (db *clickhouse) Product(ln model.LanguageCode, handle *string, id *string) (*generated.Product, error) {
  100. var (
  101. clause = strings.Builder{}
  102. vars = []any{relation.ProductStatusPublished}
  103. )
  104. clause.WriteString("status=?")
  105. if id != nil {
  106. clause.WriteString(" AND id=?")
  107. vars = append(vars, *id)
  108. }
  109. if handle != nil {
  110. clause.WriteString(" AND handle=?")
  111. vars = append(vars, *handle)
  112. }
  113. var (
  114. key = productKey(clause.String(), vars...)
  115. l = ttlcache.LoaderFunc[string, any](
  116. func(ttl *ttlcache.Cache[string, any], _ string) *ttlcache.Item[string, any] {
  117. o := relation.Product{}
  118. rows, err := db.session.
  119. Select(productSelection(ln)...).
  120. From(key.Table()).
  121. Where(key.Clause(), key.Args()...).
  122. OrderBy("created_at").
  123. Limit(1).
  124. Load(&o)
  125. if rows < 1 || err != nil {
  126. return nil
  127. }
  128. return ttl.Set(key.String(), o, key.TTL())
  129. })
  130. )
  131. p := db.cache.Get(key.String(), ttlcache.WithLoader[string, any](l))
  132. if p == nil {
  133. return nil, fmt.Errorf("not found")
  134. }
  135. product := p.Value().(relation.Product)
  136. return product.As(), nil
  137. }
  138. func (db *clickhouse) ProductOptions(ln model.LanguageCode, id string) ([]*generated.ProductOption, error) {
  139. var (
  140. options []*generated.ProductOption
  141. key = productOptionKey("product_id=?", id)
  142. l = ttlcache.LoaderFunc[string, any](
  143. func(ttl *ttlcache.Cache[string, any], _ string) *ttlcache.Item[string, any] {
  144. var o []relation.ProductOption
  145. rows, err := db.session.
  146. Select(productOptionSelection(ln)...).
  147. From(key.Table()).
  148. Where(key.Clause(), key.Args()...).
  149. OrderBy("created_at").
  150. Load(&o)
  151. if rows < 1 || err != nil {
  152. return nil
  153. }
  154. return ttl.Set(key.String(), o, key.TTL())
  155. },
  156. )
  157. )
  158. p := db.cache.Get(key.String(), ttlcache.WithLoader[string, any](l))
  159. if p == nil {
  160. return nil, fmt.Errorf("not found")
  161. }
  162. for _, v := range p.Value().([]relation.ProductOption) {
  163. options = append(options, v.As())
  164. }
  165. return options, nil
  166. }
  167. func (db *clickhouse) ProductVariants(ctx *middleware.GShopifyContext, id string) ([]*generated.ProductVariant, error) {
  168. var (
  169. variants []*generated.ProductVariant
  170. key = productVariantKey("product_id=?", id)
  171. l = ttlcache.LoaderFunc[string, any](
  172. func(ttl *ttlcache.Cache[string, any], _ string) *ttlcache.Item[string, any] {
  173. var o []relation.ProductVariant
  174. rows, err := db.session.
  175. Select(
  176. "t.id as id", "t.product_id as product_id", "t.options as options",
  177. fmt.Sprintf("t.price['%s'] as price", defaultCurrency),
  178. fmt.Sprintf("t.unit_price['%s'] as unit_price", defaultCurrency),
  179. fmt.Sprintf("t.compare_at_price['%s'] as compare_at_price", defaultCurrency),
  180. "t.barcode as barcode", "t.sku as sku", "t.hs_code as hs_code", "t.origin_country as origin_country",
  181. "t.allow_backorder as allow_backorder", "t.manage_inventory as manage_inventory", "t.unit_price_measurement as unit_price_measurement",
  182. "t.weight as weight", "t.wight_unit as weight_unit",
  183. "t.created_at as created_at", "t.updated_at as updated_at", "t.deleted_at as deleted_at",
  184. "shopping_profile.type as type",
  185. ctx.Language.SqlFieldSelection("title", "t"),
  186. ).
  187. From(fmt.Sprintf("%s as t", key.Table())).
  188. Where(key.Clause(), key.Args()...).
  189. LeftJoin("product", "product.id = t.product_id").
  190. LeftJoin("shopping_profile", "product.profile_id = shopping_profile.id").
  191. OrderBy("t.created_at").
  192. Load(&o)
  193. if rows < 1 || err != nil {
  194. return nil
  195. }
  196. return ttl.Set(key.String(), o, key.TTL())
  197. },
  198. )
  199. )
  200. p := db.cache.Get(key.String(), ttlcache.WithLoader[string, any](l))
  201. if p != nil {
  202. for _, v := range p.Value().([]relation.ProductVariant) {
  203. variants = append(variants, v.As())
  204. }
  205. }
  206. return variants, nil
  207. }
  208. func (db *clickhouse) CollectionProducts(ln model.LanguageCode, id string) ([]*generated.Product, error) {
  209. var (
  210. products []*generated.Product
  211. key = productKey("has(collection_ids, ?)", id)
  212. l = ttlcache.LoaderFunc[string, any](
  213. func(ttl *ttlcache.Cache[string, any], _ string) *ttlcache.Item[string, any] {
  214. var o []relation.Product
  215. rows, err := db.session.
  216. Select(productSelection(ln)...).
  217. From(key.Table()).
  218. Where(key.Clause(), key.Args()...).
  219. OrderBy("created_at").
  220. Load(&o)
  221. if rows < 1 || err != nil {
  222. return nil
  223. }
  224. return ttl.Set(key.String(), o, key.TTL())
  225. },
  226. )
  227. )
  228. p := db.cache.Get(key.String(), ttlcache.WithLoader[string, any](l))
  229. if p == nil {
  230. return nil, fmt.Errorf("not found")
  231. }
  232. for _, row := range p.Value().([]relation.Product) {
  233. products = append(products, row.As())
  234. }
  235. return products, nil
  236. }
  237. func (db *clickhouse) Ping() error {
  238. return db.session.Ping()
  239. }
  240. func (db *clickhouse) Close() error {
  241. var wg sync.WaitGroup
  242. wg.Add(2)
  243. go func() {
  244. defer wg.Done()
  245. if db.cache != nil {
  246. db.cache.DeleteAll()
  247. db.cache.Stop()
  248. }
  249. }()
  250. go func() {
  251. defer wg.Done()
  252. if db.session != nil {
  253. _ = db.session.Close()
  254. }
  255. }()
  256. return nil
  257. }