Browse Source

HLS Stream

- recorder
Alexey Kim 10 months ago
parent
commit
42b30b5881
1 changed files with 187 additions and 0 deletions
  1. 187 0
      internal/download/hls.go

+ 187 - 0
internal/download/hls.go

@@ -0,0 +1,187 @@
+package download
+
+import (
+	"context"
+	"errors"
+	"github.com/grafov/m3u8"
+	"github.com/rs/zerolog/log"
+	"io"
+	"net/http"
+	"net/url"
+	"strings"
+	"time"
+)
+
+type HLS struct {
+	ctx          context.Context
+	ctxClose     context.CancelFunc
+	runtimeClose chan any
+
+	client  *http.Client
+	request *http.Request
+}
+
+func NewHLS(r func() (*http.Request, error), t time.Duration) (*HLS, error) {
+	if r == nil {
+		return nil, errors.New("illegal state: empty requester")
+	}
+
+	var (
+		d = HLS{
+			client: &http.Client{
+				Transport: func() *http.Transport {
+					transport := http.DefaultTransport.(*http.Transport)
+					transport.Proxy = http.ProxyFromEnvironment
+					return transport
+				}(),
+				Timeout: t,
+			},
+		}
+		err error
+	)
+
+	if d.request, err = r(); err != nil {
+		return nil, err
+	}
+
+	d.ctx, d.ctxClose = context.WithCancel(context.Background())
+	return &d, nil
+}
+
+func (d *HLS) Close() error {
+	log.Info().Msg("request to close HLS recorder")
+
+	if d.ctxClose != nil {
+		d.ctxClose()
+	}
+
+	if d.runtimeClose != nil {
+		d.runtimeClose <- 0
+	}
+
+	return nil
+}
+
+func (d *HLS) Start() chan []byte {
+	d.runtimeClose = make(chan any)
+
+	var (
+		c         = make(chan []byte)
+		medialist *m3u8.MediaPlaylist
+		segments  = make(map[uint64]any)
+		err       error
+	)
+
+	go func() {
+		defer close(c)
+
+		for {
+			select {
+			case <-d.runtimeClose:
+				c <- nil
+				return
+			default:
+				if medialist, err = d.getMediaList(); err != nil {
+					log.Debug().Err(err).Msg("could not retrieve m3u8 playlist")
+					c <- nil
+					return
+				}
+
+				if medialist.Closed {
+					c <- nil
+					return
+				}
+
+				duration := time.NewTicker(time.Duration(medialist.TargetDuration) * time.Second)
+
+				for _, seq := range medialist.Segments {
+					if seq == nil {
+						continue
+					}
+
+					if _, ok := segments[seq.SeqId]; ok {
+						continue
+					}
+
+					segments[seq.SeqId] = 0
+
+					var (
+						uri      = seq.URI
+						request  *http.Request
+						response *http.Response
+						buff     []byte
+					)
+
+					// pre-handle relative segment URIs
+					if !strings.HasPrefix(uri, "http") {
+						if s, err := d.request.URL.Parse(uri); err != nil {
+							log.Debug().Err(err).Msg("could not build segment abs segment URI")
+							c <- nil
+							return
+						} else {
+							if uri, err = url.QueryUnescape(s.String()); err != nil {
+								log.Debug().Err(err).Msg("could not build segment abs segment URI")
+								c <- nil
+								return
+							}
+						}
+					}
+
+					request = d.request.Clone(d.ctx)
+					request.Method = http.MethodGet
+
+					if request.URL, err = request.URL.Parse(uri); err != nil {
+						log.Debug().Err(err).Msg("could not parse segment URI")
+						c <- nil
+						return
+					}
+
+					if response, err = d.client.Do(request); err != nil {
+						log.Debug().Err(err).Msg("could not retrieve segment data")
+						c <- nil
+						return
+					}
+
+					if buff, err = io.ReadAll(response.Body); err != nil {
+						log.Debug().Err(err).Msg("could not read segment data")
+						c <- nil
+						return
+					}
+
+					_ = response.Body.Close()
+					c <- buff
+				}
+
+				<-duration.C
+			}
+		}
+	}()
+
+	return c
+}
+
+func (d *HLS) getMediaList() (*m3u8.MediaPlaylist, error) {
+	var (
+		request  *http.Request
+		response *http.Response
+
+		playlist m3u8.Playlist
+		t        m3u8.ListType
+		err      error
+	)
+
+	request = d.request.Clone(d.ctx)
+	if response, err = d.client.Do(request); err != nil {
+		return nil, err
+	}
+
+	if playlist, t, err = m3u8.DecodeFrom(response.Body, false); err != nil {
+		return nil, err
+	}
+
+	if m3u8.MEDIA != t {
+		return nil, errors.New("invalid media type")
+	}
+
+	return playlist.(*m3u8.MediaPlaylist), nil
+}