Alexey Kim 1 yıl önce
ebeveyn
işleme
ddabd631c5
4 değiştirilmiş dosya ile 105 ekleme ve 83 silme
  1. 29 36
      app/daemon/action.go
  2. 45 4
      app/daemon/command.go
  3. 7 28
      app/daemon/flags.go
  4. 24 15
      platform/afreeca.go

+ 29 - 36
app/daemon/action.go

@@ -1,9 +1,7 @@
 package daemon
 
 import (
-	"bytes"
 	"errors"
-	"fmt"
 	"git.beejay.kim/WatchDog/ward/internal/download"
 	"git.beejay.kim/WatchDog/ward/model"
 	"git.beejay.kim/WatchDog/ward/platform"
@@ -11,35 +9,32 @@ import (
 	"github.com/rs/zerolog/log"
 	"github.com/urfave/cli/v2"
 	"google.golang.org/protobuf/proto"
-	"io"
-	"os"
-	"path/filepath"
 	"time"
 )
 
 var action = func(ctx *cli.Context) error {
 	var (
-		cfg = config.Get[Configuration](ctx)
-		p   platform.Platform
-
+		cfg        = config.Get[Configuration](ctx)
+		p          platform.Platform
 		downloader *download.HLS
-		fOut       *os.File
 
-		exit      = make(chan error)
 		chMessage = make(chan proto.Message, 256)
 		chStream  chan []byte
 		err       error
 	)
 
 	defer func() {
-		if fOut != nil {
-			_ = fOut.Close()
-		}
+		for i := range players {
+			if players[i] == nil {
+				continue
+			}
 
-		if chMessage != nil {
-			close(chMessage)
+			_ = players[i].Close()
 		}
 
+		close(chMessage)
+		close(ctrlSignal)
+
 		if chStream != nil {
 			close(chStream)
 		}
@@ -53,26 +48,28 @@ var action = func(ctx *cli.Context) error {
 		return err
 	}
 
-	if fOut, err = os.OpenFile(
-		filepath.Join(path, fmt.Sprintf("%s_%s.TS", bid, time.Now().String())),
-		os.O_APPEND|os.O_WRONLY|os.O_CREATE,
-		0666); err != nil {
-		return err
-	}
-
 	go func() {
 		if err = p.Connect(chMessage); err != nil {
 			exit <- err
 		}
 	}()
 
-	// usually HLS stream segments have 2s length;
-	//
-	if downloader, err = download.NewHLS(p.HLSStream, time.Second*2); err != nil {
-		return err
-	}
+	if len(players) > 0 {
+		if downloader, err = download.NewHLS(p.HLSStream, download.HLSOptions{
+			Timeout:     time.Second * 10,
+			MaxAttempts: 9,
+		}); err != nil {
+			return err
+		}
 
-	chStream = downloader.Start()
+		for i := range players {
+			if err = players[i].Start(exit); err != nil {
+				return err
+			}
+		}
+
+		chStream = downloader.Start()
+	}
 
 	for {
 		select {
@@ -80,17 +77,13 @@ var action = func(ctx *cli.Context) error {
 			return err
 		case buff := <-chStream:
 			if buff == nil {
-				go func() {
-					<-time.After(time.Second * 5)
-					chStream = downloader.Start()
-				}()
-
-				log.Info().Msg("stream has been closed; we arranged next retry attempt")
 				continue
 			}
 
-			if _, err = io.Copy(fOut, bytes.NewBuffer(buff)); err != nil {
-				return err
+			for i := range players {
+				if err = players[i].Write(buff); err != nil {
+					return err
+				}
 			}
 		case m := <-chMessage:
 			switch t := m.(type) {

+ 45 - 4
app/daemon/command.go

@@ -1,19 +1,60 @@
 package daemon
 
 import (
+	"errors"
+	"git.beejay.kim/WatchDog/ward/player"
 	"github.com/urfave/cli/v2"
+	"os"
+	"os/signal"
+	"syscall"
 )
 
 var (
-	topic string
-	pid   string
-	bid   string
-	path  string
+	topic      string
+	pid        string
+	bid        string
+	pathRecord string
+	pathPlayer string
+
+	players    []player.Player
+	exit       = make(chan error, 8)
+	ctrlSignal = make(chan os.Signal)
 
 	Command = cli.Command{
 		Name:   "daemon",
 		Usage:  "run daemon",
 		Flags:  flags,
 		Action: action,
+		Before: func(ctx *cli.Context) error {
+			// Intercept Interrupt signals
+			signal.Notify(ctrlSignal, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
+			go func() {
+				<-ctrlSignal
+				exit <- errors.New("interrupted by user")
+			}()
+
+			var (
+				p   player.Player
+				err error
+			)
+
+			if pathRecord != "" {
+				if p, err = player.NewRecorder(pathRecord, bid); err != nil {
+					return err
+				}
+
+				players = append(players, p)
+			}
+
+			if pathPlayer == "vlc" {
+				if p, err = player.NewVLC(ctx.Context); err != nil {
+					return err
+				}
+
+				players = append(players, p)
+			}
+
+			return nil
+		},
 	}
 )

+ 7 - 28
app/daemon/flags.go

@@ -3,8 +3,6 @@ package daemon
 import (
 	"git.beejay.kim/tool/service/config"
 	"github.com/urfave/cli/v2"
-	"os"
-	"path/filepath"
 )
 
 var flags = []cli.Flag{
@@ -36,32 +34,13 @@ var flags = []cli.Flag{
 		Destination: &bid,
 	},
 	&cli.PathFlag{
-		Name:        "out",
+		Name:        "record",
 		TakesFile:   false,
-		Destination: &path,
-		Action: func(ctx *cli.Context, path cli.Path) error {
-			var (
-				f   *os.File
-				err error
-			)
-
-			defer func() {
-				if f == nil {
-					return
-				}
-
-				_ = f.Close()
-				_ = os.Remove(f.Name())
-			}()
-
-			if f, err = os.OpenFile(
-				filepath.Join(path, ".watchdog"),
-				os.O_APPEND|os.O_WRONLY|os.O_CREATE,
-				0666); err != nil {
-				return err
-			}
-
-			return nil
-		},
+		Destination: &pathRecord,
+	},
+	&cli.StringFlag{
+		Name:        "player",
+		Usage:       "whether we want to watch stream in realtime",
+		Destination: &pathPlayer,
 	},
 }

+ 24 - 15
platform/afreeca.go

@@ -29,8 +29,9 @@ const (
 )
 
 type Afreeca struct {
-	Username string `yaml:"username"`
-	Password string `yaml:"password"`
+	ForceAuth bool   `yaml:"force-auth"`
+	Username  string `yaml:"username"`
+	Password  string `yaml:"password"`
 
 	host string
 	rest *http.Client
@@ -65,6 +66,13 @@ func (p *Afreeca) Init(bid string) error {
 		Timeout: time.Second * 30,
 	}
 
+	// Authorize if forced
+	if p.ForceAuth {
+		if err = p.login(); err != nil {
+			return err
+		}
+	}
+
 	// Retrieve and validate channel data
 	if p.info, err = p.getChannelData(); err != nil {
 		return err
@@ -181,19 +189,20 @@ func (p *Afreeca) Connect(ch chan proto.Message) error {
 					return err
 				}
 			case afreeca.BridgeCommandUserCountExtended:
-				t := model.Online{
-					At:           timestamppb.New(time.Now()),
-					Platform:     p.Id(),
-					Broadcaster:  p.host,
-					Raw:          buf,
-					DevicePc:     cast.ToUint64(m.Data["uiJoinChPCUser"]),
-					DeviceMobile: cast.ToUint64(m.Data["uiJoinChMBUser"]),
-				}
-
-				t.Total = t.DeviceMobile + t.DevicePc + t.DeviceUnknown
-				if err = p.push(ch, &t); err != nil {
-					return err
-				}
+				//t := model.Online{
+				//	At:           timestamppb.New(time.Now()),
+				//	Platform:     p.Id(),
+				//	Broadcaster:  p.host,
+				//	Raw:          buf,
+				//	DevicePc:     cast.ToUint64(m.Data["uiJoinChPCUser"]),
+				//	DeviceMobile: cast.ToUint64(m.Data["uiJoinChMBUser"]),
+				//}
+				//
+				//t.Total = t.DeviceMobile + t.DevicePc + t.DeviceUnknown
+				//if err = p.push(ch, &t); err != nil {
+				//	return err
+				//}
+				continue
 			case afreeca.BridgeCommandClosed:
 				return p.onBridgeClosed(bridge, m.Data)
 			default: