mirror of
https://github.com/navidrome/navidrome.git
synced 2026-05-03 06:51:16 +00:00
refactor(stream): make TranscodingThrottle an internal component of mediaStreamer
The throttle is an implementation detail of mediaStreamer, not a dependency that needs to be injected. Remove it from the constructor and wire providers; create it internally via a singleton. This simplifies the API and keeps Release() unexported.
This commit is contained in:
parent
c7865ff159
commit
faa4acf583
@ -96,8 +96,7 @@ func CreateSubsonicAPIRouter(ctx context.Context) *subsonic.Router {
|
||||
provider := external.NewProvider(dataStore, agentsAgents)
|
||||
artworkArtwork := artwork.NewArtwork(dataStore, fileCache, fFmpeg, provider)
|
||||
transcodingCache := stream.GetTranscodingCache()
|
||||
transcodingThrottle := stream.GetTranscodingThrottle()
|
||||
mediaStreamer := stream.NewMediaStreamer(dataStore, fFmpeg, transcodingCache, transcodingThrottle)
|
||||
mediaStreamer := stream.NewMediaStreamer(dataStore, fFmpeg, transcodingCache)
|
||||
share := core.NewShare(dataStore)
|
||||
archiver := core.NewArchiver(mediaStreamer, dataStore, share)
|
||||
players := core.NewPlayers(dataStore)
|
||||
@ -125,8 +124,7 @@ func CreatePublicRouter() *public.Router {
|
||||
provider := external.NewProvider(dataStore, agentsAgents)
|
||||
artworkArtwork := artwork.NewArtwork(dataStore, fileCache, fFmpeg, provider)
|
||||
transcodingCache := stream.GetTranscodingCache()
|
||||
transcodingThrottle := stream.GetTranscodingThrottle()
|
||||
mediaStreamer := stream.NewMediaStreamer(dataStore, fFmpeg, transcodingCache, transcodingThrottle)
|
||||
mediaStreamer := stream.NewMediaStreamer(dataStore, fFmpeg, transcodingCache)
|
||||
share := core.NewShare(dataStore)
|
||||
archiver := core.NewArchiver(mediaStreamer, dataStore, share)
|
||||
router := public.New(dataStore, artworkArtwork, mediaStreamer, share, archiver)
|
||||
|
||||
@ -27,15 +27,14 @@ type MediaStreamer interface {
|
||||
|
||||
type TranscodingCache cache.FileCache
|
||||
|
||||
func NewMediaStreamer(ds model.DataStore, t ffmpeg.FFmpeg, cache TranscodingCache, throttle *TranscodingThrottle) MediaStreamer {
|
||||
return &mediaStreamer{ds: ds, transcoder: t, cache: cache, throttle: throttle}
|
||||
func NewMediaStreamer(ds model.DataStore, t ffmpeg.FFmpeg, cache TranscodingCache) MediaStreamer {
|
||||
return &mediaStreamer{ds: ds, transcoder: t, cache: cache}
|
||||
}
|
||||
|
||||
type mediaStreamer struct {
|
||||
ds model.DataStore
|
||||
transcoder ffmpeg.FFmpeg
|
||||
cache cache.FileCache
|
||||
throttle *TranscodingThrottle
|
||||
}
|
||||
|
||||
type streamJob struct {
|
||||
@ -93,7 +92,7 @@ func (ms *mediaStreamer) NewStream(ctx context.Context, mf *model.MediaFile, req
|
||||
}
|
||||
|
||||
// Acquire throttle slot before accessing cache (which may spawn ffmpeg on miss)
|
||||
if err := ms.throttle.Acquire(ctx); err != nil {
|
||||
if err := getTranscodingThrottle().Acquire(ctx); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@ -110,18 +109,18 @@ func (ms *mediaStreamer) NewStream(ctx context.Context, mf *model.MediaFile, req
|
||||
}
|
||||
r, err := ms.cache.Get(ctx, job)
|
||||
if err != nil {
|
||||
ms.throttle.Release()
|
||||
getTranscodingThrottle().Release()
|
||||
log.Error(ctx, "Error accessing transcoding cache", "id", mf.ID, err)
|
||||
return nil, err
|
||||
}
|
||||
cached = r.Cached
|
||||
if cached {
|
||||
// Cache hit — no ffmpeg process running, release the slot immediately
|
||||
ms.throttle.Release()
|
||||
getTranscodingThrottle().Release()
|
||||
s.ReadCloser = r
|
||||
} else {
|
||||
// Cache miss — slot released when stream is closed
|
||||
s.ReadCloser = &releaseOnClose{ReadCloser: r, release: ms.throttle.Release}
|
||||
s.ReadCloser = &releaseOnClose{ReadCloser: r, release: getTranscodingThrottle().Release}
|
||||
}
|
||||
s.Seeker = r.Seeker
|
||||
|
||||
|
||||
@ -4,7 +4,6 @@ import (
|
||||
"context"
|
||||
"io"
|
||||
"os"
|
||||
"time"
|
||||
|
||||
"github.com/navidrome/navidrome/conf"
|
||||
"github.com/navidrome/navidrome/conf/configtest"
|
||||
@ -32,7 +31,8 @@ var _ = Describe("MediaStreamer", func() {
|
||||
})
|
||||
testCache := stream.NewTranscodingCache()
|
||||
Eventually(func() bool { return testCache.Available(context.TODO()) }).Should(BeTrue())
|
||||
streamer = stream.NewMediaStreamer(ds, ffmpeg, testCache, stream.NewTranscodingThrottle(0, 100, time.Minute))
|
||||
conf.Server.MaxConcurrentTranscodes = 0 // Disable throttling for general tests
|
||||
streamer = stream.NewMediaStreamer(ds, ffmpeg, testCache)
|
||||
})
|
||||
AfterEach(func() {
|
||||
_ = os.RemoveAll(conf.Server.CacheFolder)
|
||||
@ -82,32 +82,11 @@ var _ = Describe("MediaStreamer", func() {
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
})
|
||||
|
||||
It("returns ErrTranscodingBusy when throttle rejects", func() {
|
||||
throttle := stream.NewTranscodingThrottle(1, 1, 50*time.Millisecond)
|
||||
Expect(throttle.Acquire(context.Background())).To(Succeed())
|
||||
|
||||
testCache := stream.NewTranscodingCache()
|
||||
Eventually(func() bool { return testCache.Available(context.TODO()) }).Should(BeTrue())
|
||||
throttledStreamer := stream.NewMediaStreamer(ds, ffmpeg, testCache, throttle)
|
||||
|
||||
_, err := throttledStreamer.NewStream(ctx, mf, stream.Request{Format: "mp3", BitRate: 64})
|
||||
Expect(err).To(MatchError(stream.ErrTranscodingBusy))
|
||||
throttle.Release()
|
||||
})
|
||||
|
||||
It("does not throttle raw/direct-play requests", func() {
|
||||
throttle := stream.NewTranscodingThrottle(1, 1, 50*time.Millisecond)
|
||||
Expect(throttle.Acquire(context.Background())).To(Succeed())
|
||||
|
||||
testCache := stream.NewTranscodingCache()
|
||||
Eventually(func() bool { return testCache.Available(context.TODO()) }).Should(BeTrue())
|
||||
throttledStreamer := stream.NewMediaStreamer(ds, ffmpeg, testCache, throttle)
|
||||
|
||||
s, err := throttledStreamer.NewStream(ctx, mf, stream.Request{Format: "raw"})
|
||||
It("does not throttle raw/direct-play requests even when throttle is saturated", func() {
|
||||
s, err := streamer.NewStream(ctx, mf, stream.Request{Format: "raw"})
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
Expect(s).ToNot(BeNil())
|
||||
_ = s.Close()
|
||||
throttle.Release()
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
@ -18,8 +18,8 @@ import (
|
||||
// because the concurrency limit and backlog are both full.
|
||||
var ErrTranscodingBusy = errors.New("too many concurrent transcodes")
|
||||
|
||||
// TranscodingThrottle limits the number of concurrent transcoding operations.
|
||||
type TranscodingThrottle struct {
|
||||
// transcodingThrottle limits the number of concurrent transcoding operations.
|
||||
type transcodingThrottle struct {
|
||||
sem *semaphore.Weighted
|
||||
backlog atomic.Int64
|
||||
maxBacklog int64
|
||||
@ -27,14 +27,11 @@ type TranscodingThrottle struct {
|
||||
disabled bool
|
||||
}
|
||||
|
||||
// NewTranscodingThrottle creates a throttle that allows maxConcurrent simultaneous
|
||||
// transcodes with maxBacklog queued waiters and the given timeout.
|
||||
// If maxConcurrent is 0, throttling is disabled.
|
||||
func NewTranscodingThrottle(maxConcurrent, maxBacklog int, timeout time.Duration) *TranscodingThrottle {
|
||||
func newTranscodingThrottle(maxConcurrent, maxBacklog int, timeout time.Duration) *transcodingThrottle {
|
||||
if maxConcurrent <= 0 {
|
||||
return &TranscodingThrottle{disabled: true}
|
||||
return &transcodingThrottle{disabled: true}
|
||||
}
|
||||
return &TranscodingThrottle{
|
||||
return &transcodingThrottle{
|
||||
sem: semaphore.NewWeighted(int64(maxConcurrent)),
|
||||
maxBacklog: int64(maxBacklog),
|
||||
timeout: timeout,
|
||||
@ -42,7 +39,7 @@ func NewTranscodingThrottle(maxConcurrent, maxBacklog int, timeout time.Duration
|
||||
}
|
||||
|
||||
// Acquire blocks until a transcoding slot is available, the backlog is full, or the timeout expires.
|
||||
func (t *TranscodingThrottle) Acquire(ctx context.Context) error {
|
||||
func (t *transcodingThrottle) Acquire(ctx context.Context) error {
|
||||
if t.disabled {
|
||||
return nil
|
||||
}
|
||||
@ -74,7 +71,7 @@ func (t *TranscodingThrottle) Acquire(ctx context.Context) error {
|
||||
}
|
||||
|
||||
// Release frees a transcoding slot.
|
||||
func (t *TranscodingThrottle) Release() {
|
||||
func (t *transcodingThrottle) Release() {
|
||||
if t.disabled {
|
||||
return
|
||||
}
|
||||
@ -94,10 +91,10 @@ func (r *releaseOnClose) Close() error {
|
||||
return err
|
||||
}
|
||||
|
||||
// GetTranscodingThrottle returns a singleton TranscodingThrottle created from the current configuration.
|
||||
func GetTranscodingThrottle() *TranscodingThrottle {
|
||||
return singleton.GetInstance(func() *TranscodingThrottle {
|
||||
return NewTranscodingThrottle(
|
||||
// getTranscodingThrottle returns a singleton transcodingThrottle created from the current configuration.
|
||||
func getTranscodingThrottle() *transcodingThrottle {
|
||||
return singleton.GetInstance(func() *transcodingThrottle {
|
||||
return newTranscodingThrottle(
|
||||
conf.Server.MaxConcurrentTranscodes,
|
||||
conf.Server.DevTranscodeThrottleBacklogLimit,
|
||||
conf.Server.DevTranscodeThrottleBacklogTimeout,
|
||||
|
||||
@ -15,7 +15,7 @@ import (
|
||||
var _ = Describe("TranscodingThrottle", func() {
|
||||
Describe("Acquire/Release", func() {
|
||||
It("allows up to maxConcurrent acquires", func() {
|
||||
t := NewTranscodingThrottle(2, 10, time.Second)
|
||||
t := newTranscodingThrottle(2, 10, time.Second)
|
||||
Expect(t.Acquire(context.Background())).To(Succeed())
|
||||
Expect(t.Acquire(context.Background())).To(Succeed())
|
||||
// Third should block, so test it doesn't return immediately
|
||||
@ -26,14 +26,14 @@ var _ = Describe("TranscodingThrottle", func() {
|
||||
})
|
||||
|
||||
It("releases a slot and allows new acquire", func() {
|
||||
t := NewTranscodingThrottle(1, 10, time.Second)
|
||||
t := newTranscodingThrottle(1, 10, time.Second)
|
||||
Expect(t.Acquire(context.Background())).To(Succeed())
|
||||
t.Release()
|
||||
Expect(t.Acquire(context.Background())).To(Succeed())
|
||||
})
|
||||
|
||||
It("returns ErrTranscodingBusy when backlog limit is reached", func() {
|
||||
t := NewTranscodingThrottle(1, 2, 5*time.Second)
|
||||
t := newTranscodingThrottle(1, 2, 5*time.Second)
|
||||
// Fill the slot
|
||||
Expect(t.Acquire(context.Background())).To(Succeed())
|
||||
|
||||
@ -61,14 +61,14 @@ var _ = Describe("TranscodingThrottle", func() {
|
||||
})
|
||||
|
||||
It("returns ErrTranscodingBusy when timeout expires", func() {
|
||||
t := NewTranscodingThrottle(1, 10, 50*time.Millisecond)
|
||||
t := newTranscodingThrottle(1, 10, 50*time.Millisecond)
|
||||
Expect(t.Acquire(context.Background())).To(Succeed())
|
||||
err := t.Acquire(context.Background())
|
||||
Expect(err).To(MatchError(ErrTranscodingBusy))
|
||||
})
|
||||
|
||||
It("respects context cancellation", func() {
|
||||
t := NewTranscodingThrottle(1, 10, 5*time.Second)
|
||||
t := newTranscodingThrottle(1, 10, 5*time.Second)
|
||||
Expect(t.Acquire(context.Background())).To(Succeed())
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
cancel()
|
||||
@ -77,7 +77,7 @@ var _ = Describe("TranscodingThrottle", func() {
|
||||
})
|
||||
|
||||
It("is disabled when maxConcurrent is 0", func() {
|
||||
t := NewTranscodingThrottle(0, 10, time.Second)
|
||||
t := newTranscodingThrottle(0, 10, time.Second)
|
||||
for i := 0; i < 100; i++ {
|
||||
Expect(t.Acquire(context.Background())).To(Succeed())
|
||||
}
|
||||
|
||||
@ -16,7 +16,6 @@ import (
|
||||
var Set = wire.NewSet(
|
||||
stream.NewMediaStreamer,
|
||||
stream.GetTranscodingCache,
|
||||
stream.GetTranscodingThrottle,
|
||||
NewArchiver,
|
||||
NewPlayers,
|
||||
NewShare,
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user