123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467 |
- package db
- import (
- "context"
- "fmt"
- "github.com/gshopify/service-wrapper/config"
- "github.com/gshopify/service-wrapper/db"
- "github.com/gshopify/service-wrapper/model"
- "github.com/gshopify/service-wrapper/server/middleware"
- "github.com/jellydator/ttlcache/v3"
- "github.com/mailru/dbr"
- _ "github.com/mailru/go-clickhouse"
- "gshopper.com/gshopify/products/cache"
- "gshopper.com/gshopify/products/graphql/generated"
- "gshopper.com/gshopify/products/relation"
- "net/url"
- "strings"
- "sync"
- )
- type clickhouse struct {
- ctx context.Context
- config *db.Config
- session *dbr.Session
- cache *ttlcache.Cache[string, any]
- }
- func New(ctx context.Context, forceDebug bool) (Database, error) {
- r := &clickhouse{
- ctx: ctx,
- config: db.New(),
- cache: ttlcache.New[string, any](
- ttlcache.WithTTL[string, any](cacheTimeout),
- ttlcache.WithCapacity[string, any](cacheCapacity),
- ),
- }
- if err := config.Instance().Load(ctx, r.config); err != nil {
- return nil, err
- }
- if forceDebug {
- r.config.Params.Debug = true
- }
- //goland:noinspection HttpUrlsUsage
- source, err := url.Parse(fmt.Sprintf("http://%s:%s@%s:%d/%s",
- url.QueryEscape(r.config.Username),
- url.QueryEscape(r.config.Password),
- r.config.Host,
- r.config.Port,
- r.config.Database))
- if err != nil {
- return nil, err
- }
- kv := make(url.Values)
- kv.Set("timeout", fmt.Sprintf("%ds", r.config.Params.Timeout))
- kv.Set("read_timeout", fmt.Sprintf("%ds", r.config.Params.ReadTimeout))
- kv.Set("write_timeout", fmt.Sprintf("%ds", r.config.Params.WriteTimeout))
- kv.Set("debug", fmt.Sprintf("%v", r.config.Params.Debug))
- source.RawQuery = kv.Encode()
- con, err := dbr.Open("clickhouse", source.String(), nil)
- if err != nil {
- return nil, fmt.Errorf("could not establish Clickhouse session: %v", err)
- }
- r.session = con.NewSessionContext(ctx, nil)
- if err = r.session.Ping(); err != nil {
- return nil, err
- }
- go r.cache.Start()
- return r, nil
- }
- func (db *clickhouse) Collection(ln model.LanguageCode, handle *string, id *string) (*generated.Collection, error) {
- var (
- clause = strings.Builder{}
- vars []any
- key *cache.SqlKey
- loader ttlcache.LoaderFunc[string, any]
- )
- if id != nil {
- clause.WriteString("id=?")
- vars = append(vars, *id)
- }
- if handle != nil {
- if clause.Len() > 0 {
- clause.WriteString(" AND ")
- }
- clause.WriteString("handle=?")
- vars = append(vars, *handle)
- }
- key = productCollectionKey(ln, clause.String(), vars...)
- loader = func(ttl *ttlcache.Cache[string, any], _ string) *ttlcache.Item[string, any] {
- var o relation.ProductCollection
- rows, err := db.session.
- Select(key.Selection()...).
- From(key.Table()).
- Where(key.Clause(), key.Args()...).
- Limit(1).
- Load(&o)
- if rows != 1 || err != nil {
- return nil
- }
- return ttl.Set(key.String(), o, key.TTL())
- }
- p := db.cache.Get(key.String(), ttlcache.WithLoader[string, any](loader))
- if p == nil {
- return nil, fmt.Errorf("not found")
- }
- collection := p.Value().(relation.ProductCollection)
- return collection.As(), nil
- }
- func (db *clickhouse) Collections(ln model.LanguageCode) ([]*generated.Collection, error) {
- var (
- collections []*generated.Collection
- key = productCollectionKey(ln, "list")
- l = ttlcache.LoaderFunc[string, any](
- func(ttl *ttlcache.Cache[string, any], _ string) *ttlcache.Item[string, any] {
- var o []relation.ProductCollection
- rows, err := db.session.
- Select(key.Selection()...).
- From(key.Table()).
- Load(&o)
- if rows < 1 || err != nil {
- return nil
- }
- return ttl.Set(key.String(), o, key.TTL())
- },
- )
- )
- if p := db.cache.Get(key.String(), ttlcache.WithLoader[string, any](l)); p != nil {
- for _, row := range p.Value().([]relation.ProductCollection) {
- collections = append(collections, row.As())
- }
- }
- return collections, nil
- }
- func (db *clickhouse) ProductCollections(ln model.LanguageCode, id string) ([]*generated.Collection, error) {
- var (
- collections []*generated.Collection
- key = productCollectionKey(ln, "product.id=?", id)
- l = ttlcache.LoaderFunc[string, any](
- func(ttl *ttlcache.Cache[string, any], _ string) *ttlcache.Item[string, any] {
- var o []relation.ProductCollection
- rows, err := db.session.SelectBySql("SELECT "+
- strings.Join(key.Selection(), ", ")+
- " FROM `"+key.Table()+"`"+
- " ARRAY JOIN (SELECT `collections` FROM `product` WHERE `id` = ?) AS cid"+
- " WHERE `id` = cid", key.Args()...).
- Load(&o)
- if rows < 1 || err != nil {
- return nil
- }
- return ttl.Set(key.String(), o, key.TTL())
- })
- )
- if p := db.cache.Get(key.String(), ttlcache.WithLoader[string, any](l)); p != nil {
- for _, row := range p.Value().([]relation.ProductCollection) {
- collections = append(collections, row.As())
- }
- }
- return collections, nil
- }
- func (db *clickhouse) Product(ln model.LanguageCode, handle *string, id *string) (*generated.Product, error) {
- var (
- clause = strings.Builder{}
- vars = []any{model.ProductStatusActive}
- )
- clause.WriteString("t.status=?")
- if id != nil {
- clause.WriteString(" AND t.id=?")
- vars = append(vars, *id)
- }
- if handle != nil {
- clause.WriteString(" AND t.handle=?")
- vars = append(vars, *handle)
- }
- var (
- key = productKey(ln, defaultCurrency, clause.String(), vars...)
- l = ttlcache.LoaderFunc[string, any](
- func(ttl *ttlcache.Cache[string, any], _ string) *ttlcache.Item[string, any] {
- o := relation.Product{}
- rows, err := db.session.
- Select(key.Selection()...).
- From(fmt.Sprintf("%s as t", key.Table())).
- LeftJoin("product_variant", "product_variant.product_id = t.id").
- LeftJoin("inventory_item", "inventory_item.id = product_variant.inventory_item_id").
- LeftJoin("inventory_level", "inventory_level.inventory_item_id = product_variant.inventory_item_id").
- Where(key.Clause(), key.Args()...).
- GroupBy("t.id").
- Limit(1).
- Load(&o)
- if rows < 1 || err != nil {
- return nil
- }
- return ttl.Set(key.String(), o, key.TTL())
- })
- )
- p := db.cache.Get(key.String(), ttlcache.WithLoader[string, any](l))
- if p == nil {
- return nil, fmt.Errorf("not found")
- }
- product := p.Value().(relation.Product)
- return product.As(), nil
- }
- func (db *clickhouse) ProductOptions(ln model.LanguageCode, id string) ([]*generated.ProductOption, error) {
- var (
- options []*generated.ProductOption
- key = productOptionKey(ln, "product_id=?", id)
- l = ttlcache.LoaderFunc[string, any](
- func(ttl *ttlcache.Cache[string, any], _ string) *ttlcache.Item[string, any] {
- var o []relation.ProductOption
- rows, err := db.session.SelectBySql("SELECT tuple('', 1) as opt, "+
- strings.Join(key.Selection(), ", ")+
- " FROM `"+key.Table()+"`"+
- " ARRAY JOIN (SELECT `options` FROM `product` WHERE `id` = ?) AS oid"+
- " WHERE `id` = oid ORDER BY `position`;", key.Args()).
- Load(&o)
- if rows < 1 || err != nil {
- return nil
- }
- return ttl.Set(key.String(), o, key.TTL())
- },
- )
- )
- if p := db.cache.Get(key.String(), ttlcache.WithLoader[string, any](l)); p != nil {
- for _, v := range p.Value().([]relation.ProductOption) {
- options = append(options, v.As())
- }
- }
- return options, nil
- }
- func (db *clickhouse) ProductVariants(ctx *middleware.GShopifyContext, id string) ([]*generated.ProductVariant, error) {
- var (
- variants []*generated.ProductVariant
- key = productVariantKey(ctx.Language, defaultCurrency, "t.product_id=?", id)
- l = ttlcache.LoaderFunc[string, any](
- func(ttl *ttlcache.Cache[string, any], _ string) *ttlcache.Item[string, any] {
- var o []relation.ProductVariant
- rows, err := db.session.
- Select(key.Selection()...).
- From(fmt.Sprintf("%s as t", key.Table())).
- LeftJoin("product", "product.id = t.product_id").
- LeftJoin("inventory_item", "inventory_item.id = t.inventory_item_id").
- LeftJoin("inventory_level", "inventory_level.inventory_item_id = t.inventory_item_id").
- Where(key.Clause(), key.Args()...).
- OrderBy("position").
- GroupBy("t.id").
- Load(&o)
- if rows < 1 || err != nil {
- return nil
- }
- return ttl.Set(key.String(), o, key.TTL())
- },
- )
- )
- p := db.cache.Get(key.String(), ttlcache.WithLoader[string, any](l))
- if p != nil {
- for _, v := range p.Value().([]relation.ProductVariant) {
- variants = append(variants, v.As())
- }
- }
- return variants, nil
- }
- func (db *clickhouse) Products(ln model.LanguageCode, collectionId *string) ([]*generated.Product, error) {
- var (
- clause = strings.Builder{}
- args []any
- )
- clause.WriteString("t.status = ?")
- args = append(args, model.ProductStatusActive)
- if collectionId != nil {
- clause.WriteString(" AND has(t.collections, ?)")
- args = append(args, *collectionId)
- }
- var (
- products []*generated.Product
- key = productKey(ln, defaultCurrency, clause.String(), args...)
- l = ttlcache.LoaderFunc[string, any](
- func(ttl *ttlcache.Cache[string, any], _ string) *ttlcache.Item[string, any] {
- var o []relation.Product
- rows, err := db.session.
- Select(key.Selection()...).
- From(fmt.Sprintf("%s as t", key.Table())).
- LeftJoin("product_variant", "product_variant.product_id = t.id").
- LeftJoin("inventory_item", "inventory_item.id = product_variant.inventory_item_id").
- LeftJoin("inventory_level", "inventory_level.inventory_item_id = product_variant.inventory_item_id").
- Where(key.Clause(), key.Args()...).
- GroupBy("t.id").
- Load(&o)
- if rows < 1 || err != nil {
- return nil
- }
- return ttl.Set(key.String(), o, key.TTL())
- },
- )
- )
- if p := db.cache.Get(key.String(), ttlcache.WithLoader[string, any](l)); p != nil {
- for _, row := range p.Value().([]relation.Product) {
- products = append(products, row.As())
- }
- }
- return products, nil
- }
- func (db *clickhouse) ProductVariantOptions(ln model.LanguageCode, id string) ([]*generated.SelectedOption, error) {
- var (
- options []*generated.SelectedOption
- key = productOptionKey(ln, "product_variant.id = ?", id)
- l = ttlcache.LoaderFunc[string, any](
- func(ttl *ttlcache.Cache[string, any], _ string) *ttlcache.Item[string, any] {
- var o []relation.ProductOption
- rows, err := db.session.SelectBySql("SELECT "+
- strings.Join(key.Selection(), ", ")+
- " FROM `"+key.Table()+"`"+
- " ARRAY JOIN (SELECT anyLast(options) as options from `product_variant` where `id` = ? GROUP BY id) as opt"+
- " WHERE id = tupleElement(opt, 1)"+
- " ORDER BY position", key.Args()).
- Load(&o)
- if rows < 1 || err != nil {
- return nil
- }
- return ttl.Set(key.String(), o, key.TTL())
- },
- )
- )
- if p := db.cache.Get(key.String(), ttlcache.WithLoader[string, any](l)); p != nil {
- for _, option := range p.Value().([]relation.ProductOption) {
- options = append(options, option.AsSelected())
- }
- }
- return options, nil
- }
- func (db *clickhouse) ProductTags() ([]*model.ProductTag, error) {
- var (
- tags []*model.ProductTag
- key = productTagKey("list")
- l = ttlcache.LoaderFunc[string, any](
- func(ttl *ttlcache.Cache[string, any], _ string) *ttlcache.Item[string, any] {
- var o []*model.ProductTag
- rows, err := db.session.
- Select(key.Selection()...).
- From(key.Table()).
- OrderBy("count DESC").
- Load(&o)
- if rows < 1 || err != nil {
- return nil
- }
- return ttl.Set(key.String(), o, key.TTL())
- },
- )
- )
- if p := db.cache.Get(key.String(), ttlcache.WithLoader[string, any](l)); p != nil {
- for _, s := range p.Value().([]*model.ProductTag) {
- tags = append(tags, s)
- }
- }
- return tags, nil
- }
- func (db *clickhouse) ProductTypes() ([]*model.ProductType, error) {
- var (
- types []*model.ProductType
- key = productTypeKey("list")
- l = ttlcache.LoaderFunc[string, any](
- func(ttl *ttlcache.Cache[string, any], _ string) *ttlcache.Item[string, any] {
- var o []*model.ProductType
- rows, err := db.session.
- Select(key.Selection()...).
- From(key.Table()).
- OrderBy("count DESC").
- Load(&o)
- if rows < 1 || err != nil {
- return nil
- }
- return ttl.Set(key.String(), o, key.TTL())
- },
- )
- )
- if p := db.cache.Get(key.String(), ttlcache.WithLoader[string, any](l)); p != nil {
- for _, s := range p.Value().([]*model.ProductType) {
- types = append(types, s)
- }
- }
- return types, nil
- }
- func (db *clickhouse) Ping() error {
- return db.session.Ping()
- }
- func (db *clickhouse) Close() error {
- var wg sync.WaitGroup
- wg.Add(2)
- go func() {
- defer wg.Done()
- if db.cache != nil {
- db.cache.DeleteAll()
- db.cache.Stop()
- }
- }()
- go func() {
- defer wg.Done()
- if db.session != nil {
- _ = db.session.Close()
- }
- }()
- return nil
- }
|