package db import ( "context" "fmt" "github.com/gshopify/service-wrapper/config" "github.com/gshopify/service-wrapper/db" "github.com/gshopify/service-wrapper/model" "github.com/gshopify/service-wrapper/server/middleware" "github.com/jellydator/ttlcache/v3" "github.com/mailru/dbr" _ "github.com/mailru/go-clickhouse" "gshopper.com/gshopify/products/cache" "gshopper.com/gshopify/products/graphql/generated" "gshopper.com/gshopify/products/relation" "net/url" "strings" "sync" ) type clickhouse struct { ctx context.Context config *db.Config session *dbr.Session cache *ttlcache.Cache[string, any] } func New(ctx context.Context, forceDebug bool) (Database, error) { r := &clickhouse{ ctx: ctx, config: db.New(), cache: ttlcache.New[string, any]( ttlcache.WithTTL[string, any](cacheTimeout), ttlcache.WithCapacity[string, any](cacheCapacity), ), } if err := config.Instance().Load(ctx, r.config); err != nil { return nil, err } if forceDebug { r.config.Params.Debug = true } //goland:noinspection HttpUrlsUsage source, err := url.Parse(fmt.Sprintf("http://%s:%s@%s:%d/%s", url.QueryEscape(r.config.Username), url.QueryEscape(r.config.Password), r.config.Host, r.config.Port, r.config.Database)) if err != nil { return nil, err } kv := make(url.Values) kv.Set("timeout", fmt.Sprintf("%ds", r.config.Params.Timeout)) kv.Set("read_timeout", fmt.Sprintf("%ds", r.config.Params.ReadTimeout)) kv.Set("write_timeout", fmt.Sprintf("%ds", r.config.Params.WriteTimeout)) kv.Set("debug", fmt.Sprintf("%v", r.config.Params.Debug)) source.RawQuery = kv.Encode() con, err := dbr.Open("clickhouse", source.String(), nil) if err != nil { return nil, fmt.Errorf("could not establish Clickhouse session: %v", err) } r.session = con.NewSessionContext(ctx, nil) if err = r.session.Ping(); err != nil { return nil, err } go r.cache.Start() return r, nil } func (db *clickhouse) Collection(ln model.LanguageCode, handle *string, id *string) (*generated.Collection, error) { var ( clause = strings.Builder{} vars []any key *cache.SqlKey loader ttlcache.LoaderFunc[string, any] ) if id != nil { clause.WriteString("id=?") vars = append(vars, *id) } if handle != nil { if clause.Len() > 0 { clause.WriteString(" AND ") } clause.WriteString("handle=?") vars = append(vars, *handle) } key = productCollectionKey(ln, clause.String(), vars...) loader = func(ttl *ttlcache.Cache[string, any], _ string) *ttlcache.Item[string, any] { var o relation.ProductCollection rows, err := db.session. Select(key.Selection()...). From(key.Table()). Where(key.Clause(), key.Args()...). Limit(1). Load(&o) if rows != 1 || err != nil { return nil } return ttl.Set(key.String(), o, key.TTL()) } p := db.cache.Get(key.String(), ttlcache.WithLoader[string, any](loader)) if p == nil { return nil, fmt.Errorf("not found") } collection := p.Value().(relation.ProductCollection) return collection.As(), nil } func (db *clickhouse) Collections(ln model.LanguageCode) ([]*generated.Collection, error) { var ( collections []*generated.Collection key = productCollectionKey(ln, "list") l = ttlcache.LoaderFunc[string, any]( func(ttl *ttlcache.Cache[string, any], _ string) *ttlcache.Item[string, any] { var o []relation.ProductCollection rows, err := db.session. Select(key.Selection()...). From(key.Table()). Load(&o) if rows < 1 || err != nil { return nil } return ttl.Set(key.String(), o, key.TTL()) }, ) ) if p := db.cache.Get(key.String(), ttlcache.WithLoader[string, any](l)); p != nil { for _, row := range p.Value().([]relation.ProductCollection) { collections = append(collections, row.As()) } } return collections, nil } func (db *clickhouse) ProductCollections(ln model.LanguageCode, id string) ([]*generated.Collection, error) { var ( collections []*generated.Collection key = productCollectionKey(ln, "product.id=?", id) l = ttlcache.LoaderFunc[string, any]( func(ttl *ttlcache.Cache[string, any], _ string) *ttlcache.Item[string, any] { var o []relation.ProductCollection rows, err := db.session.SelectBySql("SELECT "+ strings.Join(key.Selection(), ", ")+ " FROM `"+key.Table()+"`"+ " ARRAY JOIN (SELECT `collections` FROM `product` WHERE `id` = ?) AS cid"+ " WHERE `id` = cid", key.Args()...). Load(&o) if rows < 1 || err != nil { return nil } return ttl.Set(key.String(), o, key.TTL()) }) ) if p := db.cache.Get(key.String(), ttlcache.WithLoader[string, any](l)); p != nil { for _, row := range p.Value().([]relation.ProductCollection) { collections = append(collections, row.As()) } } return collections, nil } func (db *clickhouse) Product(ln model.LanguageCode, handle *string, id *string) (*generated.Product, error) { var ( clause = strings.Builder{} vars = []any{model.ProductStatusActive} ) clause.WriteString("t.status=?") if id != nil { clause.WriteString(" AND t.id=?") vars = append(vars, *id) } if handle != nil { clause.WriteString(" AND t.handle=?") vars = append(vars, *handle) } var ( key = productKey(ln, defaultCurrency, clause.String(), vars...) l = ttlcache.LoaderFunc[string, any]( func(ttl *ttlcache.Cache[string, any], _ string) *ttlcache.Item[string, any] { o := relation.Product{} rows, err := db.session. Select(key.Selection()...). From(fmt.Sprintf("%s as t", key.Table())). LeftJoin("product_variant", "product_variant.product_id = t.id"). LeftJoin("inventory_item", "inventory_item.id = product_variant.inventory_item_id"). LeftJoin("inventory_level", "inventory_level.inventory_item_id = product_variant.inventory_item_id"). Where(key.Clause(), key.Args()...). GroupBy("t.id"). Limit(1). Load(&o) if rows < 1 || err != nil { return nil } return ttl.Set(key.String(), o, key.TTL()) }) ) p := db.cache.Get(key.String(), ttlcache.WithLoader[string, any](l)) if p == nil { return nil, fmt.Errorf("not found") } product := p.Value().(relation.Product) return product.As(), nil } func (db *clickhouse) ProductOptions(ln model.LanguageCode, id string) ([]*generated.ProductOption, error) { var ( options []*generated.ProductOption key = productOptionKey(ln, "product_id=?", id) l = ttlcache.LoaderFunc[string, any]( func(ttl *ttlcache.Cache[string, any], _ string) *ttlcache.Item[string, any] { var o []relation.ProductOption rows, err := db.session.SelectBySql("SELECT tuple('', 1) as opt, "+ strings.Join(key.Selection(), ", ")+ " FROM `"+key.Table()+"`"+ " ARRAY JOIN (SELECT `options` FROM `product` WHERE `id` = ?) AS oid"+ " WHERE `id` = oid ORDER BY `position`;", key.Args()). Load(&o) if rows < 1 || err != nil { return nil } return ttl.Set(key.String(), o, key.TTL()) }, ) ) if p := db.cache.Get(key.String(), ttlcache.WithLoader[string, any](l)); p != nil { for _, v := range p.Value().([]relation.ProductOption) { options = append(options, v.As()) } } return options, nil } func (db *clickhouse) ProductVariants(ctx *middleware.GShopifyContext, id string) ([]*generated.ProductVariant, error) { var ( variants []*generated.ProductVariant key = productVariantKey(ctx.Language, defaultCurrency, "t.product_id=?", id) l = ttlcache.LoaderFunc[string, any]( func(ttl *ttlcache.Cache[string, any], _ string) *ttlcache.Item[string, any] { var o []relation.ProductVariant rows, err := db.session. Select(key.Selection()...). From(fmt.Sprintf("%s as t", key.Table())). LeftJoin("product", "product.id = t.product_id"). LeftJoin("inventory_item", "inventory_item.id = t.inventory_item_id"). LeftJoin("inventory_level", "inventory_level.inventory_item_id = t.inventory_item_id"). Where(key.Clause(), key.Args()...). OrderBy("position"). GroupBy("t.id"). Load(&o) if rows < 1 || err != nil { return nil } return ttl.Set(key.String(), o, key.TTL()) }, ) ) p := db.cache.Get(key.String(), ttlcache.WithLoader[string, any](l)) if p != nil { for _, v := range p.Value().([]relation.ProductVariant) { variants = append(variants, v.As()) } } return variants, nil } func (db *clickhouse) Products(ln model.LanguageCode, collectionId *string) ([]*generated.Product, error) { var ( clause = strings.Builder{} args []any ) clause.WriteString("t.status = ?") args = append(args, model.ProductStatusActive) if collectionId != nil { clause.WriteString(" AND has(t.collections, ?)") args = append(args, *collectionId) } var ( products []*generated.Product key = productKey(ln, defaultCurrency, clause.String(), args...) l = ttlcache.LoaderFunc[string, any]( func(ttl *ttlcache.Cache[string, any], _ string) *ttlcache.Item[string, any] { var o []relation.Product rows, err := db.session. Select(key.Selection()...). From(fmt.Sprintf("%s as t", key.Table())). LeftJoin("product_variant", "product_variant.product_id = t.id"). LeftJoin("inventory_item", "inventory_item.id = product_variant.inventory_item_id"). LeftJoin("inventory_level", "inventory_level.inventory_item_id = product_variant.inventory_item_id"). Where(key.Clause(), key.Args()...). GroupBy("t.id"). Load(&o) if rows < 1 || err != nil { return nil } return ttl.Set(key.String(), o, key.TTL()) }, ) ) if p := db.cache.Get(key.String(), ttlcache.WithLoader[string, any](l)); p != nil { for _, row := range p.Value().([]relation.Product) { products = append(products, row.As()) } } return products, nil } func (db *clickhouse) ProductVariantOptions(ln model.LanguageCode, id string) ([]*generated.SelectedOption, error) { var ( options []*generated.SelectedOption key = productOptionKey(ln, "product_variant.id = ?", id) l = ttlcache.LoaderFunc[string, any]( func(ttl *ttlcache.Cache[string, any], _ string) *ttlcache.Item[string, any] { var o []relation.ProductOption rows, err := db.session.SelectBySql("SELECT "+ strings.Join(key.Selection(), ", ")+ " FROM `"+key.Table()+"`"+ " ARRAY JOIN (SELECT anyLast(options) as options from `product_variant` where `id` = ? GROUP BY id) as opt"+ " WHERE id = tupleElement(opt, 1)"+ " ORDER BY position", key.Args()). Load(&o) if rows < 1 || err != nil { return nil } return ttl.Set(key.String(), o, key.TTL()) }, ) ) if p := db.cache.Get(key.String(), ttlcache.WithLoader[string, any](l)); p != nil { for _, option := range p.Value().([]relation.ProductOption) { options = append(options, option.AsSelected()) } } return options, nil } func (db *clickhouse) ProductTags() ([]*model.ProductTag, error) { var ( tags []*model.ProductTag key = productTagKey("list") l = ttlcache.LoaderFunc[string, any]( func(ttl *ttlcache.Cache[string, any], _ string) *ttlcache.Item[string, any] { var o []*model.ProductTag rows, err := db.session. Select(key.Selection()...). From(key.Table()). OrderBy("count DESC"). Load(&o) if rows < 1 || err != nil { return nil } return ttl.Set(key.String(), o, key.TTL()) }, ) ) if p := db.cache.Get(key.String(), ttlcache.WithLoader[string, any](l)); p != nil { for _, s := range p.Value().([]*model.ProductTag) { tags = append(tags, s) } } return tags, nil } func (db *clickhouse) ProductTypes() ([]*model.ProductType, error) { var ( types []*model.ProductType key = productTypeKey("list") l = ttlcache.LoaderFunc[string, any]( func(ttl *ttlcache.Cache[string, any], _ string) *ttlcache.Item[string, any] { var o []*model.ProductType rows, err := db.session. Select(key.Selection()...). From(key.Table()). OrderBy("count DESC"). Load(&o) if rows < 1 || err != nil { return nil } return ttl.Set(key.String(), o, key.TTL()) }, ) ) if p := db.cache.Get(key.String(), ttlcache.WithLoader[string, any](l)); p != nil { for _, s := range p.Value().([]*model.ProductType) { types = append(types, s) } } return types, nil } func (db *clickhouse) Ping() error { return db.session.Ping() } func (db *clickhouse) Close() error { var wg sync.WaitGroup wg.Add(2) go func() { defer wg.Done() if db.cache != nil { db.cache.DeleteAll() db.cache.Stop() } }() go func() { defer wg.Done() if db.session != nil { _ = db.session.Close() } }() return nil }