Browse Source

Daemon

- draft implementation
Alexey Kim 10 months ago
parent
commit
a3a9087acc
6 changed files with 260 additions and 39 deletions
  1. 121 0
      app/daemon/action.go
  2. 0 35
      app/daemon/cmd.go
  3. 19 0
      app/daemon/command.go
  4. 49 1
      app/daemon/config.go
  5. 67 0
      app/daemon/flags.go
  6. 4 3
      app/main.go

+ 121 - 0
app/daemon/action.go

@@ -0,0 +1,121 @@
+package daemon
+
+import (
+	"bytes"
+	"errors"
+	"fmt"
+	"git.beejay.kim/WatchDog/ward/internal/download"
+	"git.beejay.kim/WatchDog/ward/model"
+	"git.beejay.kim/WatchDog/ward/platform"
+	"git.beejay.kim/tool/service/config"
+	"github.com/rs/zerolog/log"
+	"github.com/urfave/cli/v2"
+	"google.golang.org/protobuf/proto"
+	"io"
+	"os"
+	"path/filepath"
+	"time"
+)
+
+var action = func(ctx *cli.Context) error {
+	var (
+		cfg = config.Get[Configuration](ctx)
+		p   platform.Platform
+
+		downloader *download.HLS
+		fOut       *os.File
+
+		exit      = make(chan error)
+		chMessage = make(chan proto.Message, 256)
+		chStream  chan []byte
+		err       error
+	)
+
+	defer func() {
+		if fOut != nil {
+			_ = fOut.Close()
+		}
+
+		if chMessage != nil {
+			close(chMessage)
+		}
+
+		if chStream != nil {
+			close(chStream)
+		}
+	}()
+
+	if cfg == nil {
+		return errors.New("could not load Configuration")
+	}
+
+	if p, err = cfg.Platform.Detect(pid, bid); err != nil {
+		return err
+	}
+
+	if fOut, err = os.OpenFile(
+		filepath.Join(path, fmt.Sprintf("%s_%s.TS", bid, time.Now().String())),
+		os.O_APPEND|os.O_WRONLY|os.O_CREATE,
+		0666); err != nil {
+		return err
+	}
+
+	go func() {
+		if err = p.Connect(chMessage); err != nil {
+			exit <- err
+		}
+	}()
+
+	if downloader, err = download.NewHLS(p.HLSStream, time.Second*2); err != nil {
+		return err
+	}
+
+	chStream = downloader.Start()
+
+	for {
+		select {
+		case err := <-exit:
+			return err
+		case buff := <-chStream:
+			if buff == nil {
+				close(chStream)
+
+				go func() {
+					<-time.After(time.Second * 5)
+					chStream = downloader.Start()
+				}()
+
+				log.Info().Msg("stream has been closed; we arranged next retry attempt")
+				continue
+			}
+
+			if _, err = io.Copy(fOut, bytes.NewBuffer(buff)); err != nil {
+				return err
+			}
+		case m := <-chMessage:
+			switch t := m.(type) {
+			case *model.Message:
+				log.Info().
+					Any("user", t.User).
+					Str("text", t.Text).
+					Bool("emoticon", t.Sticker).
+					Msg("message")
+			case *model.RosterChange:
+			//TODO:
+			case *model.Online:
+				log.Info().
+					Uint64("total", t.Total).
+					Uint64("mobile", t.DeviceMobile).
+					Uint64("pc", t.DevicePc).
+					Msg("online")
+			case *model.Donation:
+				log.Info().
+					Any("user", t.User).
+					Uint64("amount", t.Amount).
+					Msg("donation")
+			default:
+				log.Info().Any("data", m).Msg("message")
+			}
+		}
+	}
+}

+ 0 - 35
app/daemon/cmd.go

@@ -1,35 +0,0 @@
-package daemon
-
-import (
-	"fmt"
-	"git.beejay.kim/tool/service/config"
-	"github.com/urfave/cli/v2"
-)
-
-var (
-	CMD = cli.Command{
-		Name:  "daemon",
-		Usage: "run daemon",
-		Action: func(ctx *cli.Context) error {
-			var (
-				cfg = config.Get[Configuration](ctx)
-			)
-
-			if cfg == nil {
-				return fmt.Errorf("could not load Configuration")
-			}
-
-			return fmt.Errorf("implement me")
-		},
-		Flags: []cli.Flag{
-			&cli.PathFlag{
-				Name:      "config",
-				Usage:     "config file path",
-				Required:  true,
-				Aliases:   []string{"C"},
-				TakesFile: false,
-				Action:    config.Load[Configuration],
-			},
-		},
-	}
-)

+ 19 - 0
app/daemon/command.go

@@ -0,0 +1,19 @@
+package daemon
+
+import (
+	"github.com/urfave/cli/v2"
+)
+
+var (
+	topic string
+	pid   string
+	bid   string
+	path  string
+
+	Command = cli.Command{
+		Name:   "daemon",
+		Usage:  "run daemon",
+		Flags:  flags,
+		Action: action,
+	}
+)

+ 49 - 1
app/daemon/config.go

@@ -1,11 +1,59 @@
 package daemon
 
-import "git.beejay.kim/tool/service/config"
+import (
+	"errors"
+	"git.beejay.kim/WatchDog/ward/platform"
+	"git.beejay.kim/tool/service/config"
+	"git.beejay.kim/tool/service/consumer"
+	"git.beejay.kim/tool/service/producer"
+	"reflect"
+	"strings"
+)
 
 type Configuration struct {
 	config.Configuration `yaml:",inline"`
+
+	Consumer consumer.Config `yaml:"consumer"`
+	Producer producer.Config `yaml:"producer"`
+
+	Platform _platform `yaml:"platform"`
 }
 
 func (c Configuration) Invalidate() error {
 	return config.Invalidate(c)
 }
+
+type _platform struct {
+	Afreeca *platform.Afreeca `yaml:"afreeca"`
+}
+
+func (r *_platform) Detect(pid, bid string) (platform.Platform, error) {
+	if pid = strings.TrimSpace(pid); pid == "" {
+		return nil, errors.New("empty platform")
+	}
+
+	val := reflect.ValueOf(r).Elem()
+	if val.Kind() != reflect.Struct {
+		if val.IsZero() {
+			return nil, errors.New("config is empty")
+		}
+
+		val = reflect.ValueOf(r).Elem()
+	}
+
+	for k := 0; k < val.NumField(); k++ {
+		if !val.Field(k).CanInterface() || val.Field(k).IsZero() {
+			continue
+		}
+
+		if elm, ok := val.Field(k).Interface().(platform.Platform); ok && elm.Id() == pid {
+			if err := elm.Init(bid); err != nil {
+				return nil, err
+			}
+
+			return elm, nil
+		}
+	}
+
+	return nil, errors.New("platform not found")
+}

+ 67 - 0
app/daemon/flags.go

@@ -0,0 +1,67 @@
+package daemon
+
+import (
+	"git.beejay.kim/tool/service/config"
+	"github.com/urfave/cli/v2"
+	"os"
+	"path/filepath"
+)
+
+var flags = []cli.Flag{
+	&cli.PathFlag{
+		Name:      "config",
+		Usage:     "config file path",
+		Required:  true,
+		Aliases:   []string{"C"},
+		TakesFile: false,
+		Action:    config.Load[Configuration],
+	},
+	&cli.StringFlag{
+		Name:        "topic",
+		Usage:       "kafka producer topic name",
+		Required:    true,
+		Destination: &topic,
+		Aliases:     []string{"T"},
+	},
+	&cli.StringFlag{
+		Name:        "pid",
+		Usage:       "platform ID",
+		Required:    true,
+		Destination: &pid,
+	},
+	&cli.StringFlag{
+		Name:        "bid",
+		Usage:       "broadcaster ID",
+		Required:    true,
+		Destination: &bid,
+	},
+	&cli.PathFlag{
+		Name:        "out",
+		TakesFile:   false,
+		Destination: &path,
+		Action: func(ctx *cli.Context, path cli.Path) error {
+			var (
+				f   *os.File
+				err error
+			)
+
+			defer func() {
+				if f == nil {
+					return
+				}
+
+				_ = f.Close()
+				_ = os.Remove(f.Name())
+			}()
+
+			if f, err = os.OpenFile(
+				filepath.Join(path, ".watchdog"),
+				os.O_APPEND|os.O_WRONLY|os.O_CREATE,
+				0666); err != nil {
+				return err
+			}
+
+			return nil
+		},
+	},
+}

+ 4 - 3
app/main.go

@@ -16,6 +16,7 @@ func init() {
 	// Default log level is info, unless debug flag is present
 	zerolog.SetGlobalLevel(zerolog.InfoLevel)
 	zerolog.TimeFieldFormat = zerolog.TimeFormatUnix
+	log.Logger = log.Output(zerolog.ConsoleWriter{Out: os.Stderr})
 
 	app = &cli.App{
 		Name:                 version.Name,
@@ -33,7 +34,7 @@ func init() {
 			&cli.BoolFlag{
 				Name:    "debug",
 				Usage:   "sets log level to debug",
-				Aliases: []string{"d"},
+				Aliases: []string{"D"},
 				Action: func(ctx *cli.Context, debug bool) error {
 					zerolog.SetGlobalLevel(zerolog.DebugLevel)
 					return nil
@@ -41,13 +42,13 @@ func init() {
 			},
 		},
 		Commands: []*cli.Command{
-			&daemon.CMD,
+			&daemon.Command,
 		},
 	}
 }
 
 func main() {
 	if err := app.Run(os.Args); err != nil {
-		log.Fatal().Err(err).Send()
+		log.Err(err).Send()
 	}
 }