From 4d14ad812cc2536032a468939039b08caeb041b1 Mon Sep 17 00:00:00 2001 From: Deluan Date: Wed, 25 Mar 2026 08:03:21 -0400 Subject: [PATCH] feat(stream): integrate TranscodingThrottle into NewStream --- core/stream/media_streamer.go | 16 +++++++++++-- core/stream/media_streamer_test.go | 37 ++++++++++++++++++++++++++++++ 2 files changed, 51 insertions(+), 2 deletions(-) diff --git a/core/stream/media_streamer.go b/core/stream/media_streamer.go index 84fa84321..da605fb8e 100644 --- a/core/stream/media_streamer.go +++ b/core/stream/media_streamer.go @@ -92,6 +92,11 @@ func (ms *mediaStreamer) NewStream(ctx context.Context, mf *model.MediaFile, req return s, nil } + // Acquire throttle slot before accessing cache (which may spawn ffmpeg on miss) + if err := ms.throttle.Acquire(ctx); err != nil { + return nil, err + } + job := &streamJob{ ms: ms, mf: mf, @@ -105,12 +110,19 @@ func (ms *mediaStreamer) NewStream(ctx context.Context, mf *model.MediaFile, req } r, err := ms.cache.Get(ctx, job) if err != nil { + ms.throttle.Release() log.Error(ctx, "Error accessing transcoding cache", "id", mf.ID, err) return nil, err } cached = r.Cached - - s.ReadCloser = r + if cached { + // Cache hit — no ffmpeg process running, release the slot immediately + ms.throttle.Release() + s.ReadCloser = r + } else { + // Cache miss — slot released when stream is closed + s.ReadCloser = &releaseOnClose{ReadCloser: r, release: ms.throttle.Release} + } s.Seeker = r.Seeker log.Debug(ctx, "Streaming TRANSCODED file", "id", mf.ID, "path", filePath, diff --git a/core/stream/media_streamer_test.go b/core/stream/media_streamer_test.go index c7c7268fd..a5380324b 100644 --- a/core/stream/media_streamer_test.go +++ b/core/stream/media_streamer_test.go @@ -73,4 +73,41 @@ var _ = Describe("MediaStreamer", func() { Expect(s.Seekable()).To(BeTrue()) }) }) + + Context("NewStream with throttle", func() { + var mf *model.MediaFile + BeforeEach(func() { + var err error + mf, err = ds.MediaFile(ctx).Get("123") + Expect(err).ToNot(HaveOccurred()) + }) + + It("returns ErrTranscodingBusy when throttle rejects", func() { + throttle := stream.NewTranscodingThrottle(1, 1, 50*time.Millisecond) + Expect(throttle.Acquire(context.Background())).To(Succeed()) + + testCache := stream.NewTranscodingCache() + Eventually(func() bool { return testCache.Available(context.TODO()) }).Should(BeTrue()) + throttledStreamer := stream.NewMediaStreamer(ds, ffmpeg, testCache, throttle) + + _, err := throttledStreamer.NewStream(ctx, mf, stream.Request{Format: "mp3", BitRate: 64}) + Expect(err).To(MatchError(stream.ErrTranscodingBusy)) + throttle.Release() + }) + + It("does not throttle raw/direct-play requests", func() { + throttle := stream.NewTranscodingThrottle(1, 1, 50*time.Millisecond) + Expect(throttle.Acquire(context.Background())).To(Succeed()) + + testCache := stream.NewTranscodingCache() + Eventually(func() bool { return testCache.Available(context.TODO()) }).Should(BeTrue()) + throttledStreamer := stream.NewMediaStreamer(ds, ffmpeg, testCache, throttle) + + s, err := throttledStreamer.NewStream(ctx, mf, stream.Request{Format: "raw"}) + Expect(err).ToNot(HaveOccurred()) + Expect(s).ToNot(BeNil()) + _ = s.Close() + throttle.Release() + }) + }) })