2
0

action.go 2.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121
  1. package daemon
  2. import (
  3. "bytes"
  4. "errors"
  5. "fmt"
  6. "git.beejay.kim/WatchDog/ward/internal/download"
  7. "git.beejay.kim/WatchDog/ward/model"
  8. "git.beejay.kim/WatchDog/ward/platform"
  9. "git.beejay.kim/tool/service/config"
  10. "github.com/rs/zerolog/log"
  11. "github.com/urfave/cli/v2"
  12. "google.golang.org/protobuf/proto"
  13. "io"
  14. "os"
  15. "path/filepath"
  16. "time"
  17. )
  18. var action = func(ctx *cli.Context) error {
  19. var (
  20. cfg = config.Get[Configuration](ctx)
  21. p platform.Platform
  22. downloader *download.HLS
  23. fOut *os.File
  24. exit = make(chan error)
  25. chMessage = make(chan proto.Message, 256)
  26. chStream chan []byte
  27. err error
  28. )
  29. defer func() {
  30. if fOut != nil {
  31. _ = fOut.Close()
  32. }
  33. if chMessage != nil {
  34. close(chMessage)
  35. }
  36. if chStream != nil {
  37. close(chStream)
  38. }
  39. }()
  40. if cfg == nil {
  41. return errors.New("could not load Configuration")
  42. }
  43. if p, err = cfg.Platform.Detect(pid, bid); err != nil {
  44. return err
  45. }
  46. if fOut, err = os.OpenFile(
  47. filepath.Join(path, fmt.Sprintf("%s_%s.TS", bid, time.Now().String())),
  48. os.O_APPEND|os.O_WRONLY|os.O_CREATE,
  49. 0666); err != nil {
  50. return err
  51. }
  52. go func() {
  53. if err = p.Connect(chMessage); err != nil {
  54. exit <- err
  55. }
  56. }()
  57. // usually HLS stream segments have 2s length;
  58. //
  59. if downloader, err = download.NewHLS(p.HLSStream, time.Second*2); err != nil {
  60. return err
  61. }
  62. chStream = downloader.Start()
  63. for {
  64. select {
  65. case err := <-exit:
  66. return err
  67. case buff := <-chStream:
  68. if buff == nil {
  69. go func() {
  70. <-time.After(time.Second * 5)
  71. chStream = downloader.Start()
  72. }()
  73. log.Info().Msg("stream has been closed; we arranged next retry attempt")
  74. continue
  75. }
  76. if _, err = io.Copy(fOut, bytes.NewBuffer(buff)); err != nil {
  77. return err
  78. }
  79. case m := <-chMessage:
  80. switch t := m.(type) {
  81. case *model.Message:
  82. log.Info().
  83. Any("user", t.User).
  84. Str("text", t.Text).
  85. Bool("emoticon", t.Sticker).
  86. Msg("message")
  87. case *model.RosterChange:
  88. //TODO:
  89. case *model.Online:
  90. log.Info().
  91. Uint64("total", t.Total).
  92. Uint64("mobile", t.DeviceMobile).
  93. Uint64("pc", t.DevicePc).
  94. Msg("online")
  95. case *model.Donation:
  96. log.Info().
  97. Any("user", t.User).
  98. Uint64("amount", t.Amount).
  99. Msg("donation")
  100. default:
  101. log.Info().Any("data", m).Msg("message")
  102. }
  103. }
  104. }
  105. }