package db import ( "context" "fmt" "github.com/gshopify/service-wrapper/config" "github.com/gshopify/service-wrapper/db" "github.com/gshopify/service-wrapper/model" "github.com/gshopify/service-wrapper/server/middleware" "github.com/jellydator/ttlcache/v3" "github.com/mailru/dbr" _ "github.com/mailru/go-clickhouse" "gshopper.com/gshopify/products/graphql/generated" "gshopper.com/gshopify/products/relation" "net/url" "strings" "sync" ) type clickhouse struct { ctx context.Context config *db.Config session *dbr.Session cache *ttlcache.Cache[string, any] } func New(ctx context.Context, forceDebug bool) (Database, error) { r := &clickhouse{ ctx: ctx, config: db.New(), cache: ttlcache.New[string, any]( ttlcache.WithTTL[string, any](cacheTimeout), ttlcache.WithCapacity[string, any](cacheCapacity), ), } if err := config.Instance().Load(ctx, r.config); err != nil { return nil, err } if forceDebug { r.config.Params.Debug = true } //goland:noinspection HttpUrlsUsage source, err := url.Parse(fmt.Sprintf("http://%s:%s@%s:%d/%s", url.QueryEscape(r.config.Username), url.QueryEscape(r.config.Password), r.config.Host, r.config.Port, r.config.Database)) if err != nil { return nil, err } kv := make(url.Values) kv.Set("timeout", fmt.Sprintf("%ds", r.config.Params.Timeout)) kv.Set("read_timeout", fmt.Sprintf("%ds", r.config.Params.ReadTimeout)) kv.Set("write_timeout", fmt.Sprintf("%ds", r.config.Params.WriteTimeout)) kv.Set("debug", fmt.Sprintf("%v", r.config.Params.Debug)) source.RawQuery = kv.Encode() con, err := dbr.Open("clickhouse", source.String(), nil) if err != nil { return nil, fmt.Errorf("could not establish Clickhouse session: %v", err) } r.session = con.NewSessionContext(ctx, nil) if err = r.session.Ping(); err != nil { return nil, err } go r.cache.Start() return r, nil } func (db *clickhouse) ProductCollections(ln model.LanguageCode, id string) ([]*generated.Collection, error) { var ( collections []*generated.Collection key = productCollectionKey("product.id=?", id) l = ttlcache.LoaderFunc[string, any]( func(ttl *ttlcache.Cache[string, any], _ string) *ttlcache.Item[string, any] { var o []relation.ProductCollection rows, err := db.session.SelectBySql("SELECT "+ strings.Join(productCollectionSelection(ln), ", ")+ " FROM `"+key.Table()+"`"+ " ARRAY JOIN (SELECT `collections` FROM `product` WHERE `id` = ?) AS cid"+ " WHERE `id` = cid", key.Args()...). Load(&o) if rows < 1 || err != nil { return nil } return ttl.Set(key.String(), o, key.TTL()) }) ) if p := db.cache.Get(key.String(), ttlcache.WithLoader[string, any](l)); p != nil { for _, row := range p.Value().([]relation.ProductCollection) { collections = append(collections, row.As()) } } return collections, nil } func (db *clickhouse) Product(ln model.LanguageCode, handle *string, id *string) (*generated.Product, error) { var ( clause = strings.Builder{} vars = []any{model.ProductStatusActive} ) clause.WriteString("status=?") if id != nil { clause.WriteString(" AND id=?") vars = append(vars, *id) } if handle != nil { clause.WriteString(" AND handle=?") vars = append(vars, *handle) } var ( key = productKey(clause.String(), vars...) l = ttlcache.LoaderFunc[string, any]( func(ttl *ttlcache.Cache[string, any], _ string) *ttlcache.Item[string, any] { o := relation.Product{} rows, err := db.session. Select(productSelection(ln)...). From(key.Table()). Where(key.Clause(), key.Args()...). Limit(1). Load(&o) if rows < 1 || err != nil { return nil } return ttl.Set(key.String(), o, key.TTL()) }) ) p := db.cache.Get(key.String(), ttlcache.WithLoader[string, any](l)) if p == nil { return nil, fmt.Errorf("not found") } product := p.Value().(relation.Product) return product.As(), nil } func (db *clickhouse) ProductOptions(ln model.LanguageCode, id string) ([]*generated.ProductOption, error) { var ( options []*generated.ProductOption key = productOptionKey("product_id=?", id) l = ttlcache.LoaderFunc[string, any]( func(ttl *ttlcache.Cache[string, any], _ string) *ttlcache.Item[string, any] { var o []relation.ProductOption rows, err := db.session.SelectBySql("SELECT "+ strings.Join(productOptionSelection(ln), ", ")+ " FROM `"+key.Table()+"`"+ " ARRAY JOIN (SELECT `options` FROM `product` WHERE `id` = ?) AS oid"+ " WHERE `id` = oid", key.Args()). Load(&o) if rows < 1 || err != nil { return nil } return ttl.Set(key.String(), o, key.TTL()) }, ) ) if p := db.cache.Get(key.String(), ttlcache.WithLoader[string, any](l)); p != nil { for _, v := range p.Value().([]relation.ProductOption) { options = append(options, v.As()) } } return options, nil } func (db *clickhouse) ProductVariants(ctx *middleware.GShopifyContext, id string) ([]*generated.ProductVariant, error) { var ( variants []*generated.ProductVariant key = productVariantKey("product_id=?", id) l = ttlcache.LoaderFunc[string, any]( func(ttl *ttlcache.Cache[string, any], _ string) *ttlcache.Item[string, any] { var o []relation.ProductVariant rows, err := db.session. Select( "t.id as id", "t.product_id as product_id", "t.options as options", fmt.Sprintf("t.price['%s'] as price", defaultCurrency), fmt.Sprintf("t.unit_price['%s'] as unit_price", defaultCurrency), fmt.Sprintf("t.compare_at_price['%s'] as compare_at_price", defaultCurrency), "t.barcode as barcode", "t.sku as sku", "t.hs_code as hs_code", "t.origin_country as origin_country", "t.allow_backorder as allow_backorder", "t.manage_inventory as manage_inventory", "t.unit_price_measurement as unit_price_measurement", "t.weight as weight", "t.wight_unit as weight_unit", "t.created_at as created_at", "t.updated_at as updated_at", "t.deleted_at as deleted_at", "shopping_profile.type as type", ctx.Language.SqlFieldSelection("title", "t"), ). From(fmt.Sprintf("%s as t", key.Table())). Where(key.Clause(), key.Args()...). LeftJoin("product", "product.id = t.product_id"). LeftJoin("shopping_profile", "product.profile_id = shopping_profile.id"). OrderBy("t.created_at"). Load(&o) if rows < 1 || err != nil { return nil } return ttl.Set(key.String(), o, key.TTL()) }, ) ) p := db.cache.Get(key.String(), ttlcache.WithLoader[string, any](l)) if p != nil { for _, v := range p.Value().([]relation.ProductVariant) { variants = append(variants, v.As()) } } return variants, nil } func (db *clickhouse) CollectionProducts(ln model.LanguageCode, id string) ([]*generated.Product, error) { var ( products []*generated.Product key = productKey("has(collections, ?)", id) l = ttlcache.LoaderFunc[string, any]( func(ttl *ttlcache.Cache[string, any], _ string) *ttlcache.Item[string, any] { var o []relation.Product rows, err := db.session. Select(productSelection(ln)...). From(key.Table()). Where(key.Clause(), key.Args()...). Load(&o) if rows < 1 || err != nil { return nil } return ttl.Set(key.String(), o, key.TTL()) }, ) ) p := db.cache.Get(key.String(), ttlcache.WithLoader[string, any](l)) if p == nil { return nil, fmt.Errorf("not found") } for _, row := range p.Value().([]relation.Product) { products = append(products, row.As()) } return products, nil } func (db *clickhouse) Ping() error { return db.session.Ping() } func (db *clickhouse) Close() error { var wg sync.WaitGroup wg.Add(2) go func() { defer wg.Done() if db.cache != nil { db.cache.DeleteAll() db.cache.Stop() } }() go func() { defer wg.Done() if db.session != nil { _ = db.session.Close() } }() return nil }