// Package db provides a ClickHouse-backed implementation of the Database
// interface, with a TTL cache in front of the session.
package db

import (
	"context"
	"fmt"
	"net/url"
	"sync"

	"github.com/gshopify/service-wrapper/config"
	"github.com/gshopify/service-wrapper/db"
	"github.com/gshopify/service-wrapper/model"
	"github.com/jellydator/ttlcache/v3"
	"github.com/mailru/dbr"
	_ "github.com/mailru/go-clickhouse"
	"gshopper.com/gshopify/shop/graphql/generated"
	"gshopper.com/gshopify/shop/relation"
)

// clickhouse bundles the ClickHouse session, its configuration and the TTL
// cache that fronts it. New returns it as a Database.
type clickhouse struct {
	ctx     context.Context
	config  *db.Config
	session *dbr.Session
	cache   *ttlcache.Cache[string, any]

	// closeOnce guards Close so that repeated calls neither block on the
	// cache's stop signal nor close the session twice; closeErr keeps the
	// result of the first call.
	closeOnce sync.Once
	closeErr  error
}

// New loads the database configuration, opens a ClickHouse session bound to
// ctx, verifies it with a ping and starts the cache's background loop.
//
// When forceDebug is true the driver's debug parameter is enabled regardless
// of the loaded configuration. The returned Database should be released with
// Close.
func New(ctx context.Context, forceDebug bool) (Database, error) {
	r := &clickhouse{
		ctx:    ctx,
		config: db.New(),
		cache: ttlcache.New[string, any](
			ttlcache.WithTTL[string, any](cacheTimeout),
			ttlcache.WithCapacity[string, any](cacheCapacity),
		),
	}

	if err := config.Instance().Load(ctx, r.config); err != nil {
		return nil, fmt.Errorf("loading database config: %w", err)
	}

	if forceDebug {
		r.config.Params.Debug = true
	}

	// The credentials are deliberately left out of the string handed to
	// url.Parse: they are attached via url.UserPassword below so that they
	// are escaped with userinfo rules (url.QueryEscape turns a space into
	// "+", which is not decoded back inside userinfo) and so that a parse
	// error cannot echo the password.
	//goland:noinspection HttpUrlsUsage
	source, err := url.Parse(fmt.Sprintf("http://%s:%d/%s",
		r.config.Host, r.config.Port, r.config.Database))
	if err != nil {
		return nil, fmt.Errorf("building Clickhouse DSN: %w", err)
	}
	source.User = url.UserPassword(r.config.Username, r.config.Password)

	kv := make(url.Values)
	kv.Set("timeout", fmt.Sprintf("%ds", r.config.Params.Timeout))
	kv.Set("read_timeout", fmt.Sprintf("%ds", r.config.Params.ReadTimeout))
	kv.Set("write_timeout", fmt.Sprintf("%ds", r.config.Params.WriteTimeout))
	kv.Set("debug", fmt.Sprintf("%v", r.config.Params.Debug))
	source.RawQuery = kv.Encode()

	con, err := dbr.Open("clickhouse", source.String(), nil)
	if err != nil {
		return nil, fmt.Errorf("could not establish Clickhouse session: %w", err)
	}

	r.session = con.NewSessionContext(ctx, nil)
	if err = r.session.Ping(); err != nil {
		// Do not leak the freshly opened connection when the server is
		// unreachable; the ping error is the one worth reporting.
		_ = r.session.Close()
		return nil, fmt.Errorf("pinging Clickhouse: %w", err)
	}

	// Start blocks until the cache is stopped, so it runs in its own
	// goroutine; Close stops it.
	go r.cache.Start()
	return r, nil
}

// Location returns the location identified by id, selecting columns for the
// language ln. The result is served from the cache when present; otherwise
// it is queried from ClickHouse and cached with the key's TTL.
//
// An error is returned when the query fails, when no row matches, or when
// the cached entry is not a relation.Location.
func (db *clickhouse) Location(ln model.LanguageCode, id string) (*generated.Location, error) {
	var (
		key = locationKey("id=?", id)
		// loadErr carries a query failure out of the loader, which can
		// only signal "no item" through its return value.
		loadErr error
	)

	loader := ttlcache.LoaderFunc[string, any](
		func(ttl *ttlcache.Cache[string, any], _ string) *ttlcache.Item[string, any] {
			var loc relation.Location

			rows, err := db.session.
				Select(locationSelection(ln)...).
				From(key.Table()).
				Where(key.Clause(), key.Args()...).
				Limit(1).
				Load(&loc)
			if err != nil {
				loadErr = err
				return nil
			}
			if rows < 1 {
				return nil
			}

			return ttl.Set(key.String(), loc, key.TTL())
		},
	)

	item := db.cache.Get(key.String(), ttlcache.WithLoader[string, any](loader))
	if item == nil {
		if loadErr != nil {
			return nil, fmt.Errorf("loading location %q: %w", id, loadErr)
		}
		return nil, fmt.Errorf("not found")
	}

	// The cache stores values as any, so verify the type instead of
	// panicking on an unexpected entry.
	loc, ok := item.Value().(relation.Location)
	if !ok {
		return nil, fmt.Errorf("unexpected cache entry type %T for key %q", item.Value(), key.String())
	}

	return loc.As(), nil
}

// Ping reports whether the underlying session can still reach the database.
func (db *clickhouse) Ping() error {
	return db.session.Ping()
}

// Close empties and stops the cache and closes the database session. The two
// steps run concurrently and Close waits for both before returning the error
// from closing the session, if any.
//
// Close is safe to call more than once; later calls return the result of the
// first.
func (db *clickhouse) Close() error {
	db.closeOnce.Do(func() {
		var wg sync.WaitGroup
		wg.Add(2)

		go func() {
			defer wg.Done()

			if db.cache != nil {
				db.cache.DeleteAll()
				db.cache.Stop()
			}
		}()

		go func() {
			defer wg.Done()

			if db.session != nil {
				db.closeErr = db.session.Close()
			}
		}()

		// Without this wait the function would return before either
		// resource had been released.
		wg.Wait()
	})

	return db.closeErr
}