2
0

hls.go 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228
  1. package download
  2. import (
  3. "context"
  4. "errors"
  5. "github.com/grafov/m3u8"
  6. "github.com/rs/zerolog/log"
  7. "io"
  8. "net/http"
  9. "net/url"
  10. "strings"
  11. "sync/atomic"
  12. "time"
  13. )
  14. type HLSOptions struct {
  15. Timeout time.Duration
  16. MaxAttempts int
  17. }
  18. type HLS struct {
  19. ctx context.Context
  20. ctxClose context.CancelFunc
  21. exit chan any
  22. options HLSOptions
  23. client *http.Client
  24. request *http.Request
  25. retryAttempts atomic.Uint32
  26. }
  27. func NewHLS(r func() (*http.Request, error), opts HLSOptions) (*HLS, error) {
  28. if r == nil {
  29. return nil, errors.New("illegal state: empty requester")
  30. }
  31. var (
  32. d = HLS{
  33. options: opts,
  34. client: &http.Client{
  35. Transport: func() *http.Transport {
  36. transport := http.DefaultTransport.(*http.Transport)
  37. transport.Proxy = http.ProxyFromEnvironment
  38. return transport
  39. }(),
  40. Timeout: opts.Timeout,
  41. },
  42. }
  43. err error
  44. )
  45. if d.request, err = r(); err != nil {
  46. return nil, err
  47. }
  48. d.ctx, d.ctxClose = context.WithCancel(context.Background())
  49. return &d, nil
  50. }
  51. func (d *HLS) Close() error {
  52. log.Info().Msg("request to close HLS recorder")
  53. if d.ctxClose != nil {
  54. d.ctxClose()
  55. }
  56. d.exit <- 0
  57. return nil
  58. }
  59. func (d *HLS) Start() chan []byte {
  60. d.exit = make(chan any)
  61. var (
  62. c = make(chan []byte)
  63. medialist *m3u8.MediaPlaylist
  64. segments = make(map[uint64]any)
  65. err error
  66. )
  67. go func() {
  68. defer close(c)
  69. for {
  70. select {
  71. case <-d.exit:
  72. c <- nil
  73. return
  74. default:
  75. if medialist, err = d.getMediaList(); err != nil {
  76. log.Debug().
  77. Err(err).
  78. Msg("could not retrieve m3u8 playlist")
  79. c <- nil
  80. return
  81. }
  82. if medialist.Closed {
  83. c <- nil
  84. return
  85. }
  86. duration := time.NewTicker(time.Duration(medialist.TargetDuration) * time.Second)
  87. for _, seq := range medialist.Segments {
  88. if seq == nil {
  89. continue
  90. }
  91. if _, ok := segments[seq.SeqId]; ok {
  92. continue
  93. }
  94. segments[seq.SeqId] = 0
  95. var (
  96. uri = seq.URI
  97. request *http.Request
  98. response *http.Response
  99. buff []byte
  100. )
  101. // pre-handle relative segment URIs
  102. if !strings.HasPrefix(uri, "http") {
  103. if s, err := d.request.URL.Parse(uri); err != nil {
  104. log.Debug().Err(err).Msg("could not build segment abs segment URI")
  105. c <- nil
  106. return
  107. } else {
  108. if uri, err = url.QueryUnescape(s.String()); err != nil {
  109. log.Debug().Err(err).Msg("could not build segment abs segment URI")
  110. c <- nil
  111. return
  112. }
  113. }
  114. }
  115. request = d.request.Clone(d.ctx)
  116. request.Method = http.MethodGet
  117. if request.URL, err = request.URL.Parse(uri); err != nil {
  118. log.Debug().Err(err).Msg("could not parse segment URI")
  119. c <- nil
  120. return
  121. }
  122. if response, err = d.client.Do(request); err != nil {
  123. log.Debug().Err(err).Msg("could not retrieve segment data")
  124. c <- nil
  125. return
  126. }
  127. if buff, err = io.ReadAll(response.Body); err != nil {
  128. log.Debug().Err(err).Msg("could not read segment data")
  129. c <- nil
  130. return
  131. }
  132. _ = response.Body.Close()
  133. c <- buff
  134. }
  135. <-duration.C
  136. }
  137. }
  138. }()
  139. return c
  140. }
  141. func (d *HLS) requestMediaList(request *http.Request) (*http.Response, error) {
  142. var (
  143. response *http.Response
  144. err error
  145. )
  146. response, err = d.client.Do(request)
  147. if err != nil && errors.Is(err, http.ErrHandlerTimeout) {
  148. if d.retryAttempts.Load() == uint32(d.options.MaxAttempts) {
  149. return nil, err
  150. }
  151. select {
  152. case <-time.After(time.Second):
  153. }
  154. if d.options.MaxAttempts != -1 {
  155. d.retryAttempts.Add(1)
  156. }
  157. return d.requestMediaList(request)
  158. }
  159. if err != nil {
  160. return nil, err
  161. }
  162. d.retryAttempts.Store(0)
  163. return response, nil
  164. }
  165. func (d *HLS) getMediaList() (*m3u8.MediaPlaylist, error) {
  166. var (
  167. request *http.Request
  168. response *http.Response
  169. playlist m3u8.Playlist
  170. t m3u8.ListType
  171. err error
  172. )
  173. request = d.request.Clone(d.ctx)
  174. if response, err = d.requestMediaList(request); err != nil {
  175. return nil, err
  176. }
  177. if playlist, t, err = m3u8.DecodeFrom(response.Body, false); err != nil {
  178. return nil, err
  179. }
  180. if m3u8.MEDIA != t {
  181. return nil, errors.New("invalid media type")
  182. }
  183. return playlist.(*m3u8.MediaPlaylist), nil
  184. }