123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187 |
- package download
- import (
- "context"
- "errors"
- "github.com/grafov/m3u8"
- "github.com/rs/zerolog/log"
- "io"
- "net/http"
- "net/url"
- "strings"
- "time"
- )
// HLS records an HTTP Live Streaming media playlist by polling it and
// downloading the segments it lists.
type HLS struct {
	// ctx is attached to every outgoing playlist and segment request.
	ctx context.Context
	// ctxClose cancels ctx; Close invokes it when it is set.
	ctxClose context.CancelFunc
	// runtimeClose is created by Start; Close sends on it to ask the
	// download loop to stop.
	runtimeClose chan any
	// client performs all HTTP requests issued by the recorder.
	client *http.Client
	// request is the playlist request obtained from the requester given to
	// NewHLS. It is cloned for every fetch and its URL is the base against
	// which segment URIs are resolved.
	request *http.Request
}
- func NewHLS(r func() (*http.Request, error), t time.Duration) (*HLS, error) {
- if r == nil {
- return nil, errors.New("illegal state: empty requester")
- }
- var (
- d = HLS{
- client: &http.Client{
- Transport: func() *http.Transport {
- transport := http.DefaultTransport.(*http.Transport)
- transport.Proxy = http.ProxyFromEnvironment
- return transport
- }(),
- Timeout: t,
- },
- }
- err error
- )
- if d.request, err = r(); err != nil {
- return nil, err
- }
- d.ctx, d.ctxClose = context.WithCancel(context.Background())
- return &d, nil
- }
- func (d *HLS) Close() error {
- log.Info().Msg("request to close HLS recorder")
- if d.ctxClose != nil {
- d.ctxClose()
- }
- if d.runtimeClose != nil {
- d.runtimeClose <- 0
- }
- return nil
- }
- func (d *HLS) Start() chan []byte {
- d.runtimeClose = make(chan any)
- var (
- c = make(chan []byte)
- medialist *m3u8.MediaPlaylist
- segments = make(map[uint64]any)
- err error
- )
- go func() {
- defer close(c)
- for {
- select {
- case <-d.runtimeClose:
- c <- nil
- return
- default:
- if medialist, err = d.getMediaList(); err != nil {
- log.Debug().Err(err).Msg("could not retrieve m3u8 playlist")
- c <- nil
- return
- }
- if medialist.Closed {
- c <- nil
- return
- }
- duration := time.NewTicker(time.Duration(medialist.TargetDuration) * time.Second)
- for _, seq := range medialist.Segments {
- if seq == nil {
- continue
- }
- if _, ok := segments[seq.SeqId]; ok {
- continue
- }
- segments[seq.SeqId] = 0
- var (
- uri = seq.URI
- request *http.Request
- response *http.Response
- buff []byte
- )
- // pre-handle relative segment URIs
- if !strings.HasPrefix(uri, "http") {
- if s, err := d.request.URL.Parse(uri); err != nil {
- log.Debug().Err(err).Msg("could not build segment abs segment URI")
- c <- nil
- return
- } else {
- if uri, err = url.QueryUnescape(s.String()); err != nil {
- log.Debug().Err(err).Msg("could not build segment abs segment URI")
- c <- nil
- return
- }
- }
- }
- request = d.request.Clone(d.ctx)
- request.Method = http.MethodGet
- if request.URL, err = request.URL.Parse(uri); err != nil {
- log.Debug().Err(err).Msg("could not parse segment URI")
- c <- nil
- return
- }
- if response, err = d.client.Do(request); err != nil {
- log.Debug().Err(err).Msg("could not retrieve segment data")
- c <- nil
- return
- }
- if buff, err = io.ReadAll(response.Body); err != nil {
- log.Debug().Err(err).Msg("could not read segment data")
- c <- nil
- return
- }
- _ = response.Body.Close()
- c <- buff
- }
- <-duration.C
- }
- }
- }()
- return c
- }
- func (d *HLS) getMediaList() (*m3u8.MediaPlaylist, error) {
- var (
- request *http.Request
- response *http.Response
- playlist m3u8.Playlist
- t m3u8.ListType
- err error
- )
- request = d.request.Clone(d.ctx)
- if response, err = d.client.Do(request); err != nil {
- return nil, err
- }
- if playlist, t, err = m3u8.DecodeFrom(response.Body, false); err != nil {
- return nil, err
- }
- if m3u8.MEDIA != t {
- return nil, errors.New("invalid media type")
- }
- return playlist.(*m3u8.MediaPlaylist), nil
- }
|