2
0

action.go 2.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121
  1. package daemon
  2. import (
  3. "bytes"
  4. "errors"
  5. "fmt"
  6. "git.beejay.kim/WatchDog/ward/internal/download"
  7. "git.beejay.kim/WatchDog/ward/model"
  8. "git.beejay.kim/WatchDog/ward/platform"
  9. "git.beejay.kim/tool/service/config"
  10. "github.com/rs/zerolog/log"
  11. "github.com/urfave/cli/v2"
  12. "google.golang.org/protobuf/proto"
  13. "io"
  14. "os"
  15. "path/filepath"
  16. "time"
  17. )
  18. var action = func(ctx *cli.Context) error {
  19. var (
  20. cfg = config.Get[Configuration](ctx)
  21. p platform.Platform
  22. downloader *download.HLS
  23. fOut *os.File
  24. exit = make(chan error)
  25. chMessage = make(chan proto.Message, 256)
  26. chStream chan []byte
  27. err error
  28. )
  29. defer func() {
  30. if fOut != nil {
  31. _ = fOut.Close()
  32. }
  33. if chMessage != nil {
  34. close(chMessage)
  35. }
  36. if chStream != nil {
  37. close(chStream)
  38. }
  39. }()
  40. if cfg == nil {
  41. return errors.New("could not load Configuration")
  42. }
  43. if p, err = cfg.Platform.Detect(pid, bid); err != nil {
  44. return err
  45. }
  46. if fOut, err = os.OpenFile(
  47. filepath.Join(path, fmt.Sprintf("%s_%s.TS", bid, time.Now().String())),
  48. os.O_APPEND|os.O_WRONLY|os.O_CREATE,
  49. 0666); err != nil {
  50. return err
  51. }
  52. go func() {
  53. if err = p.Connect(chMessage); err != nil {
  54. exit <- err
  55. }
  56. }()
  57. if downloader, err = download.NewHLS(p.HLSStream, time.Second*2); err != nil {
  58. return err
  59. }
  60. chStream = downloader.Start()
  61. for {
  62. select {
  63. case err := <-exit:
  64. return err
  65. case buff := <-chStream:
  66. if buff == nil {
  67. close(chStream)
  68. go func() {
  69. <-time.After(time.Second * 5)
  70. chStream = downloader.Start()
  71. }()
  72. log.Info().Msg("stream has been closed; we arranged next retry attempt")
  73. continue
  74. }
  75. if _, err = io.Copy(fOut, bytes.NewBuffer(buff)); err != nil {
  76. return err
  77. }
  78. case m := <-chMessage:
  79. switch t := m.(type) {
  80. case *model.Message:
  81. log.Info().
  82. Any("user", t.User).
  83. Str("text", t.Text).
  84. Bool("emoticon", t.Sticker).
  85. Msg("message")
  86. case *model.RosterChange:
  87. //TODO:
  88. case *model.Online:
  89. log.Info().
  90. Uint64("total", t.Total).
  91. Uint64("mobile", t.DeviceMobile).
  92. Uint64("pc", t.DevicePc).
  93. Msg("online")
  94. case *model.Donation:
  95. log.Info().
  96. Any("user", t.User).
  97. Uint64("amount", t.Amount).
  98. Msg("donation")
  99. default:
  100. log.Info().Any("data", m).Msg("message")
  101. }
  102. }
  103. }
  104. }