|
@@ -0,0 +1,245 @@
|
|
|
+package db
|
|
|
+
|
|
|
+import (
|
|
|
+ "context"
|
|
|
+ "fmt"
|
|
|
+ "github.com/gshopify/service-wrapper/config"
|
|
|
+ "github.com/gshopify/service-wrapper/db"
|
|
|
+ "github.com/gshopify/service-wrapper/model"
|
|
|
+ "github.com/jellydator/ttlcache/v3"
|
|
|
+ "github.com/mailru/dbr"
|
|
|
+ _ "github.com/mailru/go-clickhouse"
|
|
|
+ "gshopper.com/gshopify/products/graphql/generated"
|
|
|
+ "gshopper.com/gshopify/products/relation"
|
|
|
+ "net/url"
|
|
|
+ "strings"
|
|
|
+ "sync"
|
|
|
+)
|
|
|
+
|
|
|
+type clickhouse struct {
|
|
|
+ ctx context.Context
|
|
|
+ config *db.Config
|
|
|
+ session *dbr.Session
|
|
|
+ cache *ttlcache.Cache[string, any]
|
|
|
+}
|
|
|
+
|
|
|
+func New(ctx context.Context, forceDebug bool) (Database, error) {
|
|
|
+ r := &clickhouse{
|
|
|
+ ctx: ctx,
|
|
|
+ config: db.New(),
|
|
|
+ cache: ttlcache.New[string, any](
|
|
|
+ ttlcache.WithTTL[string, any](cacheTimeout),
|
|
|
+ ttlcache.WithCapacity[string, any](cacheCapacity),
|
|
|
+ ),
|
|
|
+ }
|
|
|
+
|
|
|
+ if err := config.Instance().Load(ctx, r.config); err != nil {
|
|
|
+ return nil, err
|
|
|
+ }
|
|
|
+
|
|
|
+ if forceDebug {
|
|
|
+ r.config.Params.Debug = true
|
|
|
+ }
|
|
|
+
|
|
|
+ //goland:noinspection HttpUrlsUsage
|
|
|
+ source, err := url.Parse(fmt.Sprintf("http://%s:%s@%s:%d/%s",
|
|
|
+ url.QueryEscape(r.config.Username),
|
|
|
+ url.QueryEscape(r.config.Password),
|
|
|
+ r.config.Host,
|
|
|
+ r.config.Port,
|
|
|
+ r.config.Database))
|
|
|
+ if err != nil {
|
|
|
+ return nil, err
|
|
|
+ }
|
|
|
+
|
|
|
+ kv := make(url.Values)
|
|
|
+ kv.Set("timeout", fmt.Sprintf("%ds", r.config.Params.Timeout))
|
|
|
+ kv.Set("read_timeout", fmt.Sprintf("%ds", r.config.Params.ReadTimeout))
|
|
|
+ kv.Set("write_timeout", fmt.Sprintf("%ds", r.config.Params.WriteTimeout))
|
|
|
+ kv.Set("debug", fmt.Sprintf("%v", r.config.Params.Debug))
|
|
|
+ source.RawQuery = kv.Encode()
|
|
|
+
|
|
|
+ con, err := dbr.Open("clickhouse", source.String(), nil)
|
|
|
+ if err != nil {
|
|
|
+ return nil, fmt.Errorf("could not establish Clickhouse session: %v", err)
|
|
|
+ }
|
|
|
+
|
|
|
+ r.session = con.NewSessionContext(ctx, nil)
|
|
|
+
|
|
|
+ if err = r.session.Ping(); err != nil {
|
|
|
+ return nil, err
|
|
|
+ }
|
|
|
+
|
|
|
+ go r.cache.Start()
|
|
|
+ return r, nil
|
|
|
+}
|
|
|
+
|
|
|
+func (db *clickhouse) ProductCollections(ln model.LanguageCode, id string) ([]*generated.Collection, error) {
|
|
|
+ var (
|
|
|
+ collections []*generated.Collection
|
|
|
+ key = productCollectionKey("product.id=?", id)
|
|
|
+ l = ttlcache.LoaderFunc[string, any](
|
|
|
+ func(ttl *ttlcache.Cache[string, any], _ string) *ttlcache.Item[string, any] {
|
|
|
+ var o []relation.Collection
|
|
|
+
|
|
|
+ rows, err := db.session.SelectBySql("SELECT "+
|
|
|
+ ln.SqlFieldSelection("title")+", "+ln.SqlFieldSelection("description")+", `id`, `handle`, `thumbnail`, "+
|
|
|
+ "`created_at`, `updated_at`, `deleted_at` "+
|
|
|
+ "FROM `product_collection` "+
|
|
|
+ "ARRAY JOIN (SELECT `collection_ids` FROM `product` WHERE `id` = ?) AS cid "+
|
|
|
+ "WHERE `id` = cid "+
|
|
|
+ "ORDER BY `created_at` ASC;", key.Args()...).
|
|
|
+ Load(&o)
|
|
|
+ if rows < 1 || err != nil {
|
|
|
+ return nil
|
|
|
+ }
|
|
|
+
|
|
|
+ return ttl.Set(key.String(), o, key.TTL())
|
|
|
+ })
|
|
|
+ )
|
|
|
+
|
|
|
+ p := db.cache.Get(key.String(), ttlcache.WithLoader[string, any](l))
|
|
|
+ if p == nil {
|
|
|
+ return nil, fmt.Errorf("not found")
|
|
|
+ }
|
|
|
+
|
|
|
+ for _, row := range p.Value().([]relation.Collection) {
|
|
|
+ collections = append(collections, row.As())
|
|
|
+ }
|
|
|
+
|
|
|
+ return collections, nil
|
|
|
+}
|
|
|
+
|
|
|
+func (db *clickhouse) Product(ln model.LanguageCode, handle *string, id *string) (*generated.Product, error) {
|
|
|
+ var (
|
|
|
+ clause = strings.Builder{}
|
|
|
+ vars = []any{relation.ProductStatusPublished}
|
|
|
+ )
|
|
|
+
|
|
|
+ clause.WriteString("status=?")
|
|
|
+
|
|
|
+ if id != nil {
|
|
|
+ clause.WriteString(" AND id=?")
|
|
|
+ vars = append(vars, *id)
|
|
|
+ }
|
|
|
+
|
|
|
+ if handle != nil {
|
|
|
+ clause.WriteString(" AND handle=?")
|
|
|
+ vars = append(vars, *handle)
|
|
|
+ }
|
|
|
+
|
|
|
+ var (
|
|
|
+ key = productKey(clause.String(), vars...)
|
|
|
+ l = ttlcache.LoaderFunc[string, any](
|
|
|
+ func(ttl *ttlcache.Cache[string, any], _ string) *ttlcache.Item[string, any] {
|
|
|
+ o := relation.Product{}
|
|
|
+ rows, err := db.session.
|
|
|
+ Select(productSelection(ln)...).
|
|
|
+ From(key.Table()).
|
|
|
+ Where(key.Clause(), key.Args()...).
|
|
|
+ OrderBy("created_at").
|
|
|
+ Limit(1).
|
|
|
+ Load(&o)
|
|
|
+ if rows < 1 || err != nil {
|
|
|
+ return nil
|
|
|
+ }
|
|
|
+
|
|
|
+ return ttl.Set(key.String(), o, key.TTL())
|
|
|
+ })
|
|
|
+ )
|
|
|
+
|
|
|
+ p := db.cache.Get(key.String(), ttlcache.WithLoader[string, any](l))
|
|
|
+ if p == nil {
|
|
|
+ return nil, fmt.Errorf("not found")
|
|
|
+ }
|
|
|
+
|
|
|
+ product := p.Value().(relation.Product)
|
|
|
+ return product.As(), nil
|
|
|
+}
|
|
|
+
|
|
|
+func (db *clickhouse) ProductOptions(ln model.LanguageCode, id string) ([]*generated.ProductOption, error) {
|
|
|
+ var options []*generated.ProductOption
|
|
|
+ var o []relation.ProductOption
|
|
|
+ _, err := db.session.
|
|
|
+ Select(
|
|
|
+ "id",
|
|
|
+ "product_id",
|
|
|
+ "created_at", "updated_at", "deleted_at",
|
|
|
+ ln.SqlFieldSelection("name"),
|
|
|
+ ln.SqlArraySelection("values")).
|
|
|
+ From("product_option").
|
|
|
+ Where("product_id=?", id).
|
|
|
+ OrderBy("created_at").
|
|
|
+ Load(&o)
|
|
|
+ if err != nil {
|
|
|
+ return nil, err
|
|
|
+ }
|
|
|
+
|
|
|
+ for _, v := range o {
|
|
|
+ options = append(options, v.As())
|
|
|
+ }
|
|
|
+
|
|
|
+ return options, nil
|
|
|
+}
|
|
|
+
|
|
|
+func (db *clickhouse) CollectionProducts(ln model.LanguageCode, id string) ([]*generated.Product, error) {
|
|
|
+ var (
|
|
|
+ products []*generated.Product
|
|
|
+ key = productKey("has(collection_ids, ?)", id)
|
|
|
+ l = ttlcache.LoaderFunc[string, any](
|
|
|
+ func(ttl *ttlcache.Cache[string, any], _ string) *ttlcache.Item[string, any] {
|
|
|
+ var o []relation.Product
|
|
|
+ rows, err := db.session.
|
|
|
+ Select(productSelection(ln)...).
|
|
|
+ From(key.Table()).
|
|
|
+ Where(key.Clause(), key.Args()...).
|
|
|
+ OrderBy("created_at").
|
|
|
+ Load(&o)
|
|
|
+ if rows < 1 || err != nil {
|
|
|
+ return nil
|
|
|
+ }
|
|
|
+
|
|
|
+ return ttl.Set(key.String(), o, key.TTL())
|
|
|
+ },
|
|
|
+ )
|
|
|
+ )
|
|
|
+
|
|
|
+ p := db.cache.Get(key.String(), ttlcache.WithLoader[string, any](l))
|
|
|
+ if p == nil {
|
|
|
+ return nil, fmt.Errorf("not found")
|
|
|
+ }
|
|
|
+
|
|
|
+ for _, row := range p.Value().([]relation.Product) {
|
|
|
+ products = append(products, row.As())
|
|
|
+ }
|
|
|
+
|
|
|
+ return products, nil
|
|
|
+}
|
|
|
+
|
|
|
+func (db *clickhouse) Ping() error {
|
|
|
+ return db.session.Ping()
|
|
|
+}
|
|
|
+
|
|
|
+func (db *clickhouse) Close() error {
|
|
|
+ var wg sync.WaitGroup
|
|
|
+ wg.Add(2)
|
|
|
+
|
|
|
+ go func() {
|
|
|
+ defer wg.Done()
|
|
|
+
|
|
|
+ if db.cache != nil {
|
|
|
+ db.cache.DeleteAll()
|
|
|
+ db.cache.Stop()
|
|
|
+ }
|
|
|
+ }()
|
|
|
+
|
|
|
+ go func() {
|
|
|
+ defer wg.Done()
|
|
|
+
|
|
|
+ if db.session != nil {
|
|
|
+ _ = db.session.Close()
|
|
|
+ }
|
|
|
+ }()
|
|
|
+
|
|
|
+ return nil
|
|
|
+}
|