|
@@ -1 +1,104 @@
|
|
|
package clickhouse
|
|
|
+
|
|
|
+import (
|
|
|
+ "context"
|
|
|
+ "fmt"
|
|
|
+ "git.beejay.kim/tool/service"
|
|
|
+ "github.com/ClickHouse/clickhouse-go/v2"
|
|
|
+ "github.com/ClickHouse/clickhouse-go/v2/lib/driver"
|
|
|
+ "github.com/google/uuid"
|
|
|
+ "github.com/rs/zerolog/log"
|
|
|
+ "strings"
|
|
|
+ "sync"
|
|
|
+ "time"
|
|
|
+)
|
|
|
+
|
|
|
+type _ch struct {
|
|
|
+ config *Config
|
|
|
+ cli clickhouse.Conn
|
|
|
+ debug bool
|
|
|
+}
|
|
|
+
|
|
|
+func New(config Config, debug bool) (service.Clickhouse, error) {
|
|
|
+ var err error
|
|
|
+ ch := &_ch{
|
|
|
+ config: &config,
|
|
|
+ debug: debug,
|
|
|
+ }
|
|
|
+
|
|
|
+ if ch.cli, err = clickhouse.Open(&clickhouse.Options{
|
|
|
+ Protocol: clickhouse.Native,
|
|
|
+ Addr: []string{
|
|
|
+ fmt.Sprintf("%s:%d", config.Host, config.Port),
|
|
|
+ },
|
|
|
+ Auth: clickhouse.Auth{
|
|
|
+ Database: config.Database,
|
|
|
+ Username: config.Username,
|
|
|
+ Password: config.Password,
|
|
|
+ },
|
|
|
+ Debug: debug,
|
|
|
+ Compression: &clickhouse.Compression{
|
|
|
+ Method: clickhouse.CompressionLZ4,
|
|
|
+ },
|
|
|
+ DialTimeout: time.Duration(config.Params.Timeout) * time.Second,
|
|
|
+ ReadTimeout: time.Duration(config.Params.ReadTimeout) * time.Second,
|
|
|
+ }); err != nil {
|
|
|
+ return nil, err
|
|
|
+ }
|
|
|
+
|
|
|
+ if err = ch.Ping(context.TODO()); err != nil {
|
|
|
+ return nil, err
|
|
|
+ }
|
|
|
+
|
|
|
+ return ch, nil
|
|
|
+}
|
|
|
+
|
|
|
+func (ch *_ch) ID() uuid.UUID {
|
|
|
+ return uuid.NewSHA1(uuid.NameSpaceDNS, []byte("db.clickhouse"))
|
|
|
+}
|
|
|
+
|
|
|
+func (ch *_ch) Close() error {
|
|
|
+ var wg sync.WaitGroup
|
|
|
+ wg.Add(1)
|
|
|
+
|
|
|
+ go func() {
|
|
|
+ defer wg.Done()
|
|
|
+
|
|
|
+ if ch.cli != nil {
|
|
|
+ _ = ch.cli.Close()
|
|
|
+ }
|
|
|
+
|
|
|
+ log.Debug().
|
|
|
+ Str("clickhouse", "stop session").
|
|
|
+ Send()
|
|
|
+ }()
|
|
|
+
|
|
|
+ wg.Wait()
|
|
|
+ return nil
|
|
|
+}
|
|
|
+
|
|
|
+func (ch *_ch) Ping(ctx context.Context) error {
|
|
|
+ return ch.cli.Ping(ctx)
|
|
|
+}
|
|
|
+
|
|
|
+func (ch *_ch) Query(ctx context.Context, query string, args ...any) (driver.Rows, error) {
|
|
|
+ return ch.cli.Query(ctx, query, args...)
|
|
|
+}
|
|
|
+
|
|
|
+//goland:noinspection ALL
|
|
|
+func (ch *_ch) Batch(ctx context.Context, tblName string, release bool) (driver.Batch, error) {
|
|
|
+ if tblName = strings.TrimSpace(tblName); tblName == "" {
|
|
|
+ return nil, fmt.Errorf("illegal argumets: empty table name")
|
|
|
+ }
|
|
|
+
|
|
|
+ var (
|
|
|
+ query = fmt.Sprintf("INSERT INTO `%s`", tblName)
|
|
|
+ opts []driver.PrepareBatchOption
|
|
|
+ )
|
|
|
+
|
|
|
+ if release {
|
|
|
+ opts = append(opts, driver.WithReleaseConnection())
|
|
|
+ }
|
|
|
+
|
|
|
+ return ch.cli.PrepareBatch(ctx, query, opts...)
|
|
|
+}
|