feat(stream): integrate TranscodingThrottle into NewStream

This commit is contained in:
Deluan 2026-03-25 08:03:21 -04:00
parent b018d7b634
commit 4d14ad812c
2 changed files with 51 additions and 2 deletions

View File

@ -92,6 +92,11 @@ func (ms *mediaStreamer) NewStream(ctx context.Context, mf *model.MediaFile, req
return s, nil
}
// Acquire throttle slot before accessing cache (which may spawn ffmpeg on miss)
if err := ms.throttle.Acquire(ctx); err != nil {
return nil, err
}
job := &streamJob{
ms: ms,
mf: mf,
@ -105,12 +110,19 @@ func (ms *mediaStreamer) NewStream(ctx context.Context, mf *model.MediaFile, req
}
r, err := ms.cache.Get(ctx, job)
if err != nil {
ms.throttle.Release()
log.Error(ctx, "Error accessing transcoding cache", "id", mf.ID, err)
return nil, err
}
cached = r.Cached
s.ReadCloser = r
if cached {
// Cache hit — no ffmpeg process running, release the slot immediately
ms.throttle.Release()
s.ReadCloser = r
} else {
// Cache miss — slot released when stream is closed
s.ReadCloser = &releaseOnClose{ReadCloser: r, release: ms.throttle.Release}
}
s.Seeker = r.Seeker
log.Debug(ctx, "Streaming TRANSCODED file", "id", mf.ID, "path", filePath,

View File

@ -73,4 +73,41 @@ var _ = Describe("MediaStreamer", func() {
Expect(s.Seekable()).To(BeTrue())
})
})
Context("NewStream with throttle", func() {
var mf *model.MediaFile
BeforeEach(func() {
var err error
mf, err = ds.MediaFile(ctx).Get("123")
Expect(err).ToNot(HaveOccurred())
})
It("returns ErrTranscodingBusy when throttle rejects", func() {
throttle := stream.NewTranscodingThrottle(1, 1, 50*time.Millisecond)
Expect(throttle.Acquire(context.Background())).To(Succeed())
testCache := stream.NewTranscodingCache()
Eventually(func() bool { return testCache.Available(context.TODO()) }).Should(BeTrue())
throttledStreamer := stream.NewMediaStreamer(ds, ffmpeg, testCache, throttle)
_, err := throttledStreamer.NewStream(ctx, mf, stream.Request{Format: "mp3", BitRate: 64})
Expect(err).To(MatchError(stream.ErrTranscodingBusy))
throttle.Release()
})
It("does not throttle raw/direct-play requests", func() {
throttle := stream.NewTranscodingThrottle(1, 1, 50*time.Millisecond)
Expect(throttle.Acquire(context.Background())).To(Succeed())
testCache := stream.NewTranscodingCache()
Eventually(func() bool { return testCache.Available(context.TODO()) }).Should(BeTrue())
throttledStreamer := stream.NewMediaStreamer(ds, ffmpeg, testCache, throttle)
s, err := throttledStreamer.NewStream(ctx, mf, stream.Request{Format: "raw"})
Expect(err).ToNot(HaveOccurred())
Expect(s).ToNot(BeNil())
_ = s.Close()
throttle.Release()
})
})
})