hls.go 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187
  1. package download
  2. import (
  3. "context"
  4. "errors"
  5. "github.com/grafov/m3u8"
  6. "github.com/rs/zerolog/log"
  7. "io"
  8. "net/http"
  9. "net/url"
  10. "strings"
  11. "time"
  12. )
  13. type HLS struct {
  14. ctx context.Context
  15. ctxClose context.CancelFunc
  16. runtimeClose chan any
  17. client *http.Client
  18. request *http.Request
  19. }
  20. func NewHLS(r func() (*http.Request, error), t time.Duration) (*HLS, error) {
  21. if r == nil {
  22. return nil, errors.New("illegal state: empty requester")
  23. }
  24. var (
  25. d = HLS{
  26. client: &http.Client{
  27. Transport: func() *http.Transport {
  28. transport := http.DefaultTransport.(*http.Transport)
  29. transport.Proxy = http.ProxyFromEnvironment
  30. return transport
  31. }(),
  32. Timeout: t,
  33. },
  34. }
  35. err error
  36. )
  37. if d.request, err = r(); err != nil {
  38. return nil, err
  39. }
  40. d.ctx, d.ctxClose = context.WithCancel(context.Background())
  41. return &d, nil
  42. }
  43. func (d *HLS) Close() error {
  44. log.Info().Msg("request to close HLS recorder")
  45. if d.ctxClose != nil {
  46. d.ctxClose()
  47. }
  48. if d.runtimeClose != nil {
  49. d.runtimeClose <- 0
  50. }
  51. return nil
  52. }
  53. func (d *HLS) Start() chan []byte {
  54. d.runtimeClose = make(chan any)
  55. var (
  56. c = make(chan []byte)
  57. medialist *m3u8.MediaPlaylist
  58. segments = make(map[uint64]any)
  59. err error
  60. )
  61. go func() {
  62. defer close(c)
  63. for {
  64. select {
  65. case <-d.runtimeClose:
  66. c <- nil
  67. return
  68. default:
  69. if medialist, err = d.getMediaList(); err != nil {
  70. log.Debug().Err(err).Msg("could not retrieve m3u8 playlist")
  71. c <- nil
  72. return
  73. }
  74. if medialist.Closed {
  75. c <- nil
  76. return
  77. }
  78. duration := time.NewTicker(time.Duration(medialist.TargetDuration) * time.Second)
  79. for _, seq := range medialist.Segments {
  80. if seq == nil {
  81. continue
  82. }
  83. if _, ok := segments[seq.SeqId]; ok {
  84. continue
  85. }
  86. segments[seq.SeqId] = 0
  87. var (
  88. uri = seq.URI
  89. request *http.Request
  90. response *http.Response
  91. buff []byte
  92. )
  93. // pre-handle relative segment URIs
  94. if !strings.HasPrefix(uri, "http") {
  95. if s, err := d.request.URL.Parse(uri); err != nil {
  96. log.Debug().Err(err).Msg("could not build segment abs segment URI")
  97. c <- nil
  98. return
  99. } else {
  100. if uri, err = url.QueryUnescape(s.String()); err != nil {
  101. log.Debug().Err(err).Msg("could not build segment abs segment URI")
  102. c <- nil
  103. return
  104. }
  105. }
  106. }
  107. request = d.request.Clone(d.ctx)
  108. request.Method = http.MethodGet
  109. if request.URL, err = request.URL.Parse(uri); err != nil {
  110. log.Debug().Err(err).Msg("could not parse segment URI")
  111. c <- nil
  112. return
  113. }
  114. if response, err = d.client.Do(request); err != nil {
  115. log.Debug().Err(err).Msg("could not retrieve segment data")
  116. c <- nil
  117. return
  118. }
  119. if buff, err = io.ReadAll(response.Body); err != nil {
  120. log.Debug().Err(err).Msg("could not read segment data")
  121. c <- nil
  122. return
  123. }
  124. _ = response.Body.Close()
  125. c <- buff
  126. }
  127. <-duration.C
  128. }
  129. }
  130. }()
  131. return c
  132. }
  133. func (d *HLS) getMediaList() (*m3u8.MediaPlaylist, error) {
  134. var (
  135. request *http.Request
  136. response *http.Response
  137. playlist m3u8.Playlist
  138. t m3u8.ListType
  139. err error
  140. )
  141. request = d.request.Clone(d.ctx)
  142. if response, err = d.client.Do(request); err != nil {
  143. return nil, err
  144. }
  145. if playlist, t, err = m3u8.DecodeFrom(response.Body, false); err != nil {
  146. return nil, err
  147. }
  148. if m3u8.MEDIA != t {
  149. return nil, errors.New("invalid media type")
  150. }
  151. return playlist.(*m3u8.MediaPlaylist), nil
  152. }