package db import ( "context" "fmt" "github.com/gshopify/service-wrapper/config" "github.com/gshopify/service-wrapper/db" "github.com/gshopify/service-wrapper/model" "github.com/gshopify/service-wrapper/server/middleware" "github.com/jellydator/ttlcache/v3" "github.com/mailru/dbr" _ "github.com/mailru/go-clickhouse" "gshopper.com/gshopify/products/graphql/generated" "gshopper.com/gshopify/products/relation" "net/url" "strings" "sync" ) type clickhouse struct { ctx context.Context config *db.Config session *dbr.Session cache *ttlcache.Cache[string, any] } func New(ctx context.Context, forceDebug bool) (Database, error) { r := &clickhouse{ ctx: ctx, config: db.New(), cache: ttlcache.New[string, any]( ttlcache.WithTTL[string, any](cacheTimeout), ttlcache.WithCapacity[string, any](cacheCapacity), ), } if err := config.Instance().Load(ctx, r.config); err != nil { return nil, err } if forceDebug { r.config.Params.Debug = true } //goland:noinspection HttpUrlsUsage source, err := url.Parse(fmt.Sprintf("http://%s:%s@%s:%d/%s", url.QueryEscape(r.config.Username), url.QueryEscape(r.config.Password), r.config.Host, r.config.Port, r.config.Database)) if err != nil { return nil, err } kv := make(url.Values) kv.Set("timeout", fmt.Sprintf("%ds", r.config.Params.Timeout)) kv.Set("read_timeout", fmt.Sprintf("%ds", r.config.Params.ReadTimeout)) kv.Set("write_timeout", fmt.Sprintf("%ds", r.config.Params.WriteTimeout)) kv.Set("debug", fmt.Sprintf("%v", r.config.Params.Debug)) source.RawQuery = kv.Encode() con, err := dbr.Open("clickhouse", source.String(), nil) if err != nil { return nil, fmt.Errorf("could not establish Clickhouse session: %v", err) } r.session = con.NewSessionContext(ctx, nil) if err = r.session.Ping(); err != nil { return nil, err } go r.cache.Start() return r, nil } func (db *clickhouse) ProductCollections(ln model.LanguageCode, id string) ([]*generated.Collection, error) { var ( collections []*generated.Collection key = productCollectionKey("product.id=?", id) l = ttlcache.LoaderFunc[string, any]( func(ttl *ttlcache.Cache[string, any], _ string) *ttlcache.Item[string, any] { var o []relation.ProductCollection rows, err := db.session.SelectBySql("SELECT "+ strings.Join(productCollectionSelection(ln), ", ")+ " FROM `"+key.Table()+"`"+ " ARRAY JOIN (SELECT `collections` FROM `product` WHERE `id` = ?) AS cid"+ " WHERE `id` = cid", key.Args()...). Load(&o) if rows < 1 || err != nil { return nil } return ttl.Set(key.String(), o, key.TTL()) }) ) if p := db.cache.Get(key.String(), ttlcache.WithLoader[string, any](l)); p != nil { for _, row := range p.Value().([]relation.ProductCollection) { collections = append(collections, row.As()) } } return collections, nil } func (db *clickhouse) Product(ln model.LanguageCode, handle *string, id *string) (*generated.Product, error) { var ( clause = strings.Builder{} vars = []any{model.ProductStatusActive} ) clause.WriteString("t.status=?") if id != nil { clause.WriteString(" AND t.id=?") vars = append(vars, *id) } if handle != nil { clause.WriteString(" AND t.handle=?") vars = append(vars, *handle) } var ( key = productKey(clause.String(), vars...) l = ttlcache.LoaderFunc[string, any]( func(ttl *ttlcache.Cache[string, any], _ string) *ttlcache.Item[string, any] { o := relation.Product{} rows, err := db.session. Select(productSelection(ln, defaultCurrency)...). From(fmt.Sprintf("%s as t", key.Table())). LeftJoin("product_variant", "product_variant.product_id = t.id"). LeftJoin("inventory_item", "inventory_item.id = product_variant.inventory_item_id"). LeftJoin("inventory_level", "inventory_level.inventory_item_id = product_variant.inventory_item_id"). Where(key.Clause(), key.Args()...). GroupBy("t.id"). Limit(1). Load(&o) if rows < 1 || err != nil { return nil } return ttl.Set(key.String(), o, key.TTL()) }) ) p := db.cache.Get(key.String(), ttlcache.WithLoader[string, any](l)) if p == nil { return nil, fmt.Errorf("not found") } product := p.Value().(relation.Product) return product.As(), nil } func (db *clickhouse) ProductOptions(ln model.LanguageCode, id string) ([]*generated.ProductOption, error) { var ( options []*generated.ProductOption key = productOptionKey("product_id=?", id) l = ttlcache.LoaderFunc[string, any]( func(ttl *ttlcache.Cache[string, any], _ string) *ttlcache.Item[string, any] { var o []relation.ProductOption rows, err := db.session.SelectBySql("SELECT "+ strings.Join(productOptionSelection(ln), ", ")+ " FROM `"+key.Table()+"`"+ " ARRAY JOIN (SELECT `options` FROM `product` WHERE `id` = ?) AS oid"+ " WHERE `id` = oid", key.Args()). Load(&o) if rows < 1 || err != nil { return nil } return ttl.Set(key.String(), o, key.TTL()) }, ) ) if p := db.cache.Get(key.String(), ttlcache.WithLoader[string, any](l)); p != nil { for _, v := range p.Value().([]relation.ProductOption) { options = append(options, v.As()) } } return options, nil } func (db *clickhouse) ProductVariants(ctx *middleware.GShopifyContext, id string) ([]*generated.ProductVariant, error) { var ( variants []*generated.ProductVariant key = productVariantKey("t.product_id=?", id) l = ttlcache.LoaderFunc[string, any]( func(ttl *ttlcache.Cache[string, any], _ string) *ttlcache.Item[string, any] { var o []relation.ProductVariant rows, err := db.session. Select(productVariantSelection(ctx.Language, defaultCurrency)...). From(fmt.Sprintf("%s as t", key.Table())). LeftJoin("product", "product.id = t.product_id"). LeftJoin("inventory_item", "inventory_item.id = t.inventory_item_id"). LeftJoin("inventory_level", "inventory_level.inventory_item_id = t.inventory_item_id"). Where(key.Clause(), key.Args()...). OrderBy("position"). GroupBy("t.id"). Load(&o) if rows < 1 || err != nil { return nil } return ttl.Set(key.String(), o, key.TTL()) }, ) ) p := db.cache.Get(key.String(), ttlcache.WithLoader[string, any](l)) if p != nil { for _, v := range p.Value().([]relation.ProductVariant) { variants = append(variants, v.As()) } } return variants, nil } func (db *clickhouse) CollectionProducts(ln model.LanguageCode, id string) ([]*generated.Product, error) { var ( products []*generated.Product key = productKey("has(t.collections, ?)", id) l = ttlcache.LoaderFunc[string, any]( func(ttl *ttlcache.Cache[string, any], _ string) *ttlcache.Item[string, any] { var o []relation.Product rows, err := db.session. Select(productSelection(ln, defaultCurrency)...). From(fmt.Sprintf("%s as t", key.Table())). LeftJoin("product_variant", "product_variant.product_id = t.id"). LeftJoin("inventory_item", "inventory_item.id = product_variant.inventory_item_id"). LeftJoin("inventory_level", "inventory_level.inventory_item_id = product_variant.inventory_item_id"). Where(key.Clause(), key.Args()...). GroupBy("t.id"). Load(&o) if rows < 1 || err != nil { return nil } return ttl.Set(key.String(), o, key.TTL()) }, ) ) p := db.cache.Get(key.String(), ttlcache.WithLoader[string, any](l)) if p == nil { return nil, fmt.Errorf("not found") } for _, row := range p.Value().([]relation.Product) { products = append(products, row.As()) } return products, nil } func (db *clickhouse) ProductVariantOptions(ln model.LanguageCode, id string) ([]*generated.SelectedOption, error) { var ( options []*generated.SelectedOption key = productOptionKey("product_variant.id = ?", id) l = ttlcache.LoaderFunc[string, any]( func(ttl *ttlcache.Cache[string, any], _ string) *ttlcache.Item[string, any] { var o []relation.ProductOption rows, err := db.session.SelectBySql("SELECT "+ strings.Join(productOptionSelectedSelection(ln), ", ")+ " FROM `"+key.Table()+"`"+ " ARRAY JOIN (SELECT `options` from `product_variant` where `id` = ?) as opt"+ " WHERE id = tupleElement(opt, 1)"+ " ORDER BY position", key.Args()). Load(&o) if rows < 1 || err != nil { return nil } return ttl.Set(key.String(), o, key.TTL()) }, ) ) if p := db.cache.Get(key.String(), ttlcache.WithLoader[string, any](l)); p != nil { for _, option := range p.Value().([]relation.ProductOption) { options = append(options, option.AsSelected()) } } return options, nil } func (db *clickhouse) Ping() error { return db.session.Ping() } func (db *clickhouse) Close() error { var wg sync.WaitGroup wg.Add(2) go func() { defer wg.Done() if db.cache != nil { db.cache.DeleteAll() db.cache.Stop() } }() go func() { defer wg.Done() if db.session != nil { _ = db.session.Close() } }() return nil }