feat(stream): wire TranscodingThrottle into MediaStreamer

This commit is contained in:
Deluan 2026-03-25 07:58:44 -04:00
parent 9219a8c2a4
commit b018d7b634
5 changed files with 20 additions and 5 deletions

View File

@ -96,7 +96,8 @@ func CreateSubsonicAPIRouter(ctx context.Context) *subsonic.Router {
provider := external.NewProvider(dataStore, agentsAgents) provider := external.NewProvider(dataStore, agentsAgents)
artworkArtwork := artwork.NewArtwork(dataStore, fileCache, fFmpeg, provider) artworkArtwork := artwork.NewArtwork(dataStore, fileCache, fFmpeg, provider)
transcodingCache := stream.GetTranscodingCache() transcodingCache := stream.GetTranscodingCache()
mediaStreamer := stream.NewMediaStreamer(dataStore, fFmpeg, transcodingCache) transcodingThrottle := stream.GetTranscodingThrottle()
mediaStreamer := stream.NewMediaStreamer(dataStore, fFmpeg, transcodingCache, transcodingThrottle)
share := core.NewShare(dataStore) share := core.NewShare(dataStore)
archiver := core.NewArchiver(mediaStreamer, dataStore, share) archiver := core.NewArchiver(mediaStreamer, dataStore, share)
players := core.NewPlayers(dataStore) players := core.NewPlayers(dataStore)
@ -124,7 +125,8 @@ func CreatePublicRouter() *public.Router {
provider := external.NewProvider(dataStore, agentsAgents) provider := external.NewProvider(dataStore, agentsAgents)
artworkArtwork := artwork.NewArtwork(dataStore, fileCache, fFmpeg, provider) artworkArtwork := artwork.NewArtwork(dataStore, fileCache, fFmpeg, provider)
transcodingCache := stream.GetTranscodingCache() transcodingCache := stream.GetTranscodingCache()
mediaStreamer := stream.NewMediaStreamer(dataStore, fFmpeg, transcodingCache) transcodingThrottle := stream.GetTranscodingThrottle()
mediaStreamer := stream.NewMediaStreamer(dataStore, fFmpeg, transcodingCache, transcodingThrottle)
share := core.NewShare(dataStore) share := core.NewShare(dataStore)
archiver := core.NewArchiver(mediaStreamer, dataStore, share) archiver := core.NewArchiver(mediaStreamer, dataStore, share)
router := public.New(dataStore, artworkArtwork, mediaStreamer, share, archiver) router := public.New(dataStore, artworkArtwork, mediaStreamer, share, archiver)

View File

@ -27,14 +27,15 @@ type MediaStreamer interface {
type TranscodingCache cache.FileCache type TranscodingCache cache.FileCache
func NewMediaStreamer(ds model.DataStore, t ffmpeg.FFmpeg, cache TranscodingCache) MediaStreamer { func NewMediaStreamer(ds model.DataStore, t ffmpeg.FFmpeg, cache TranscodingCache, throttle *TranscodingThrottle) MediaStreamer {
return &mediaStreamer{ds: ds, transcoder: t, cache: cache} return &mediaStreamer{ds: ds, transcoder: t, cache: cache, throttle: throttle}
} }
type mediaStreamer struct { type mediaStreamer struct {
ds model.DataStore ds model.DataStore
transcoder ffmpeg.FFmpeg transcoder ffmpeg.FFmpeg
cache cache.FileCache cache cache.FileCache
throttle *TranscodingThrottle
} }
type streamJob struct { type streamJob struct {

View File

@ -4,6 +4,7 @@ import (
"context" "context"
"io" "io"
"os" "os"
"time"
"github.com/navidrome/navidrome/conf" "github.com/navidrome/navidrome/conf"
"github.com/navidrome/navidrome/conf/configtest" "github.com/navidrome/navidrome/conf/configtest"
@ -31,7 +32,7 @@ var _ = Describe("MediaStreamer", func() {
}) })
testCache := stream.NewTranscodingCache() testCache := stream.NewTranscodingCache()
Eventually(func() bool { return testCache.Available(context.TODO()) }).Should(BeTrue()) Eventually(func() bool { return testCache.Available(context.TODO()) }).Should(BeTrue())
streamer = stream.NewMediaStreamer(ds, ffmpeg, testCache) streamer = stream.NewMediaStreamer(ds, ffmpeg, testCache, stream.NewTranscodingThrottle(0, 100, time.Minute))
}) })
AfterEach(func() { AfterEach(func() {
_ = os.RemoveAll(conf.Server.CacheFolder) _ = os.RemoveAll(conf.Server.CacheFolder)

View File

@ -8,6 +8,7 @@ import (
"sync/atomic" "sync/atomic"
"time" "time"
"github.com/navidrome/navidrome/conf"
"github.com/navidrome/navidrome/log" "github.com/navidrome/navidrome/log"
"golang.org/x/sync/semaphore" "golang.org/x/sync/semaphore"
) )
@ -90,3 +91,12 @@ func (r *releaseOnClose) Close() error {
r.once.Do(r.release) r.once.Do(r.release)
return err return err
} }
// GetTranscodingThrottle creates a TranscodingThrottle from the current configuration.
func GetTranscodingThrottle() *TranscodingThrottle {
return NewTranscodingThrottle(
conf.Server.MaxConcurrentTranscodes,
conf.Server.DevTranscodeThrottleBacklogLimit,
conf.Server.DevTranscodeThrottleBacklogTimeout,
)
}

View File

@ -16,6 +16,7 @@ import (
var Set = wire.NewSet( var Set = wire.NewSet(
stream.NewMediaStreamer, stream.NewMediaStreamer,
stream.GetTranscodingCache, stream.GetTranscodingCache,
stream.GetTranscodingThrottle,
NewArchiver, NewArchiver,
NewPlayers, NewPlayers,
NewShare, NewShare,