1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798 |
- package db
- import (
- "context"
- "fmt"
- "github.com/gshopify/service-wrapper/config"
- "github.com/gshopify/service-wrapper/db"
- "github.com/jellydator/ttlcache/v3"
- "github.com/mailru/dbr"
- _ "github.com/mailru/go-clickhouse"
- "net/url"
- "sync"
- )
- type clickhouse struct {
- ctx context.Context
- config *db.Config
- session *dbr.Session
- cache *ttlcache.Cache[string, any]
- }
- func Clickhouse(ctx context.Context, forceDebug bool) (Database, error) {
- r := &clickhouse{
- ctx: ctx,
- config: db.New(),
- cache: ttlcache.New[string, any](
- ttlcache.WithTTL[string, any](cacheTimeout),
- ttlcache.WithCapacity[string, any](cacheCapacity),
- ),
- }
- if err := config.Instance().Load(ctx, r.config); err != nil {
- return nil, err
- }
- if forceDebug {
- r.config.Params.Debug = true
- }
- //goland:noinspection HttpUrlsUsage
- source, err := url.Parse(fmt.Sprintf("http://%s:%s@%s:%d/%s",
- url.QueryEscape(r.config.Username),
- url.QueryEscape(r.config.Password),
- r.config.Host,
- r.config.Port,
- r.config.Database))
- if err != nil {
- return nil, err
- }
- kv := make(url.Values)
- kv.Set("timeout", fmt.Sprintf("%ds", r.config.Params.Timeout))
- kv.Set("read_timeout", fmt.Sprintf("%ds", r.config.Params.ReadTimeout))
- kv.Set("write_timeout", fmt.Sprintf("%ds", r.config.Params.WriteTimeout))
- kv.Set("debug", fmt.Sprintf("%v", r.config.Params.Debug))
- source.RawQuery = kv.Encode()
- con, err := dbr.Open("clickhouse", source.String(), nil)
- if err != nil {
- return nil, fmt.Errorf("could not establish Clickhouse session: %v", err)
- }
- r.session = con.NewSessionContext(ctx, nil)
- if err = r.session.Ping(); err != nil {
- return nil, err
- }
- go r.cache.Start()
- return r, nil
- }
- func (db *clickhouse) Ping() error {
- return db.session.Ping()
- }
- func (db *clickhouse) Close() error {
- var wg sync.WaitGroup
- wg.Add(2)
- go func() {
- defer wg.Done()
- if db.cache != nil {
- db.cache.DeleteAll()
- db.cache.Stop()
- }
- }()
- go func() {
- defer wg.Done()
- if db.session != nil {
- _ = db.session.Close()
- }
- }()
- return nil
- }
|