Compare commits

...

15 Commits

Author SHA1 Message Date
Deluan Quintão
93bd8bab37
Merge c1f12d148dcdda283f479f7b84ddeb749620be99 into 23f3556371321faf199866989b906f2ef06a8034 2026-04-03 11:41:02 +02:00
Deluan
23f3556371 fix(subsonic): strip OpenSubsonic extensions from playlists for legacy clients
buildOSPlaylist was the only OpenSubsonic builder function missing the
LegacyClients guard, causing attributes like `validUntil` and `readonly`
to appear in playlist XML responses for legacy clients like DSub2000.
This caused a crash when DSub2000 tried to parse evaluated smart
playlists containing the `validUntil` attribute.
2026-04-02 16:37:52 -04:00
Deluan
c60637de24 fix(subsonic): return proper artwork ID format in getInternetRadioStations
The coverArt field was returning the raw uploaded image filename instead
of the standard ra-{id} artwork ID format. This caused getCoverArt to
fail when clients passed the coverArt value directly. Now uses
CoverArtID().String() consistent with how albums, artists, and playlists
return their coverArt values. Fixes #5293.
2026-04-02 15:44:20 -04:00
Deluan
220019a9f1 fix: add missing viper defaults for mpvpath, artistimagefolder, and plugins.loglevel
Fix #5284

Several configOptions struct fields were missing corresponding
viper.SetDefault entries, making them invisible to environment variable
overrides and config file parsing. Added defaults for mpvpath (consistent
with ffmpegpath), artistimagefolder, and plugins.loglevel.

Signed-off-by: Deluan <deluan@navidrome.org>
2026-04-01 18:20:01 -04:00
Deluan
c1f12d148d test(stream): replace time.Sleep with Eventually assertion in throttle test 2026-03-25 08:35:32 -04:00
Deluan
6fe2b76ac1 feat(configuration): set maxconcurrenttranscodes to the maximum of 4 or available CPU cores
Signed-off-by: Deluan <deluan@navidrome.org>
2026-03-25 08:30:44 -04:00
Deluan
faa4acf583 refactor(stream): make TranscodingThrottle an internal component of mediaStreamer
The throttle is an implementation detail of mediaStreamer, not a
dependency that needs to be injected. Remove it from the constructor
and wire providers; create it internally via a singleton. This
simplifies the API and keeps Release() unexported.
2026-03-25 08:27:23 -04:00
Deluan
c7865ff159 fix(stream): make TranscodingThrottle a singleton and optimize hot path
- Use sync.Once to ensure a single shared throttle instance across
  all wire-injected consumers (subsonic + public routers)
- Optimize Acquire() fast path: TryAcquire before touching the atomic
  backlog counter, avoiding unnecessary atomic ops when slots are free

refactor(stream): use singleton package for TranscodingThrottle
2026-03-25 08:21:37 -04:00
Deluan
179ba1e600 test(subsonic): add handler tests for ErrTranscodingBusy in all endpoints 2026-03-25 08:08:55 -04:00
Deluan
803d96bf8c feat(subsonic): handle ErrTranscodingBusy in GetTranscodeStream with HTTP 503 2026-03-25 08:05:45 -04:00
Deluan
87e02a3c2d feat(subsonic): handle ErrTranscodingBusy in Stream and Download endpoints 2026-03-25 08:05:41 -04:00
Deluan
4d14ad812c feat(stream): integrate TranscodingThrottle into NewStream 2026-03-25 08:03:21 -04:00
Deluan
b018d7b634 feat(stream): wire TranscodingThrottle into MediaStreamer 2026-03-25 07:58:44 -04:00
Deluan
9219a8c2a4 feat(conf): add MaxConcurrentTranscodes and dev throttle config options 2026-03-25 07:58:41 -04:00
Deluan
58f7959204 feat(stream): add TranscodingThrottle with tests 2026-03-25 07:55:44 -04:00
12 changed files with 403 additions and 48 deletions

View File

@ -58,6 +58,7 @@ type configOptions struct {
PlaylistsPath string
SmartPlaylistRefreshDelay time.Duration
AutoTranscodeDownload bool
MaxConcurrentTranscodes int
DefaultDownsamplingFormat string
Search searchOptions `json:",omitzero"`
SimilarSongsMatchThreshold int
@ -128,6 +129,8 @@ type configOptions struct {
DevArtworkMaxRequests int
DevArtworkThrottleBacklogLimit int
DevArtworkThrottleBacklogTimeout time.Duration
DevTranscodeThrottleBacklogLimit int
DevTranscodeThrottleBacklogTimeout time.Duration
DevArtistInfoTimeToLive time.Duration
DevAlbumInfoTimeToLive time.Duration
DevExternalScanner bool
@ -703,6 +706,7 @@ func setViperDefaults() {
viper.SetDefault("enablem3uexternalalbumart", false)
viper.SetDefault("enablemediafilecoverart", true)
viper.SetDefault("autotranscodedownload", false)
viper.SetDefault("maxconcurrenttranscodes", max(4, runtime.NumCPU()))
viper.SetDefault("defaultdownsamplingformat", consts.DefaultDownsamplingFormat)
viper.SetDefault("search.fullstring", false)
viper.SetDefault("search.backend", "fts")
@ -712,10 +716,12 @@ func setViperDefaults() {
viper.SetDefault("ignoredarticles", "The El La Los Las Le Les Os As O A")
viper.SetDefault("indexgroups", "A B C D E F G H I J K L M N O P Q R S T U V W X-Z(XYZ) [Unknown]([)")
viper.SetDefault("ffmpegpath", "")
viper.SetDefault("mpvpath", "")
viper.SetDefault("mpvcmdtemplate", "mpv --audio-device=%d --no-audio-display %f --input-ipc-server=%s")
viper.SetDefault("coverartpriority", "cover.*, folder.*, front.*, embedded, external")
viper.SetDefault("coverartquality", 75)
viper.SetDefault("artistartpriority", "artist.*, album/artist.*, external")
viper.SetDefault("artistimagefolder", "")
viper.SetDefault("discartpriority", "disc*.*, cd*.*, cover.*, folder.*, front.*, discsubtitle, embedded")
viper.SetDefault("lyricspriority", ".lrc,.txt,embedded")
viper.SetDefault("enablegravatar", false)
@ -794,6 +800,7 @@ func setViperDefaults() {
viper.SetDefault("plugins.enabled", true)
viper.SetDefault("plugins.cachesize", "200MB")
viper.SetDefault("plugins.autoreload", false)
viper.SetDefault("plugins.loglevel", "")
// DevFlags. These are used to enable/disable debugging and incomplete features
viper.SetDefault("devlogsourceline", false)
@ -810,6 +817,8 @@ func setViperDefaults() {
viper.SetDefault("devartworkmaxrequests", max(4, runtime.NumCPU()))
viper.SetDefault("devartworkthrottlebackloglimit", consts.RequestThrottleBacklogLimit)
viper.SetDefault("devartworkthrottlebacklogtimeout", consts.RequestThrottleBacklogTimeout)
viper.SetDefault("devtranscodethrottlebackloglimit", consts.RequestThrottleBacklogLimit)
viper.SetDefault("devtranscodethrottlebacklogtimeout", consts.RequestThrottleBacklogTimeout)
viper.SetDefault("devartistinfotimetolive", consts.ArtistInfoTimeToLive)
viper.SetDefault("devalbuminfotimetolive", consts.AlbumInfoTimeToLive)
viper.SetDefault("devexternalscanner", true)

View File

@ -91,6 +91,11 @@ func (ms *mediaStreamer) NewStream(ctx context.Context, mf *model.MediaFile, req
return s, nil
}
// Acquire throttle slot before accessing cache (which may spawn ffmpeg on miss)
if err := getTranscodingThrottle().Acquire(ctx); err != nil {
return nil, err
}
job := &streamJob{
ms: ms,
mf: mf,
@ -104,12 +109,19 @@ func (ms *mediaStreamer) NewStream(ctx context.Context, mf *model.MediaFile, req
}
r, err := ms.cache.Get(ctx, job)
if err != nil {
getTranscodingThrottle().Release()
log.Error(ctx, "Error accessing transcoding cache", "id", mf.ID, err)
return nil, err
}
cached = r.Cached
if cached {
// Cache hit — no ffmpeg process running, release the slot immediately
getTranscodingThrottle().Release()
s.ReadCloser = r
} else {
// Cache miss — slot released when stream is closed
s.ReadCloser = &releaseOnClose{ReadCloser: r, release: getTranscodingThrottle().Release}
}
s.Seeker = r.Seeker
log.Debug(ctx, "Streaming TRANSCODED file", "id", mf.ID, "path", filePath,

View File

@ -31,6 +31,7 @@ var _ = Describe("MediaStreamer", func() {
})
testCache := stream.NewTranscodingCache()
Eventually(func() bool { return testCache.Available(context.TODO()) }).Should(BeTrue())
conf.Server.MaxConcurrentTranscodes = 0 // Disable throttling for general tests
streamer = stream.NewMediaStreamer(ds, ffmpeg, testCache)
})
AfterEach(func() {
@ -72,4 +73,20 @@ var _ = Describe("MediaStreamer", func() {
Expect(s.Seekable()).To(BeTrue())
})
})
Context("NewStream with throttle", func() {
var mf *model.MediaFile
BeforeEach(func() {
var err error
mf, err = ds.MediaFile(ctx).Get("123")
Expect(err).ToNot(HaveOccurred())
})
It("does not throttle raw/direct-play requests even when throttle is saturated", func() {
s, err := streamer.NewStream(ctx, mf, stream.Request{Format: "raw"})
Expect(err).ToNot(HaveOccurred())
Expect(s).ToNot(BeNil())
_ = s.Close()
})
})
})

103
core/stream/throttle.go Normal file
View File

@ -0,0 +1,103 @@
package stream
import (
"context"
"errors"
"io"
"sync"
"sync/atomic"
"time"
"github.com/navidrome/navidrome/conf"
"github.com/navidrome/navidrome/log"
"github.com/navidrome/navidrome/utils/singleton"
"golang.org/x/sync/semaphore"
)
// ErrTranscodingBusy is returned when the transcoding throttle rejects a request
// because the concurrency limit and backlog are both full.
var ErrTranscodingBusy = errors.New("too many concurrent transcodes")
// transcodingThrottle limits the number of concurrent transcoding operations.
type transcodingThrottle struct {
sem *semaphore.Weighted
backlog atomic.Int64
maxBacklog int64
timeout time.Duration
disabled bool
}
func newTranscodingThrottle(maxConcurrent, maxBacklog int, timeout time.Duration) *transcodingThrottle {
if maxConcurrent <= 0 {
return &transcodingThrottle{disabled: true}
}
return &transcodingThrottle{
sem: semaphore.NewWeighted(int64(maxConcurrent)),
maxBacklog: int64(maxBacklog),
timeout: timeout,
}
}
// Acquire blocks until a transcoding slot is available, the backlog is full, or the timeout expires.
func (t *transcodingThrottle) Acquire(ctx context.Context) error {
if t.disabled {
return nil
}
// Fast path: try to acquire without touching the backlog counter
if t.sem.TryAcquire(1) {
return nil
}
// Slow path: semaphore is full, enter backlog queue
// Increment-then-check-then-rollback to avoid TOCTOU race
current := t.backlog.Add(1)
if current > t.maxBacklog {
t.backlog.Add(-1)
log.Warn(ctx, "Transcoding request rejected, throttle backlog full", "backlog", current-1)
return ErrTranscodingBusy
}
log.Info(ctx, "Transcoding request queued, waiting for slot", "backlog", current)
ctx, cancel := context.WithTimeout(ctx, t.timeout)
defer cancel()
err := t.sem.Acquire(ctx, 1)
t.backlog.Add(-1)
if err != nil {
log.Warn(ctx, "Transcoding request rejected, timeout waiting for slot")
return ErrTranscodingBusy
}
return nil
}
// Release frees a transcoding slot.
func (t *transcodingThrottle) Release() {
if t.disabled {
return
}
t.sem.Release(1)
}
// releaseOnClose wraps a ReadCloser to call a release function exactly once on Close.
type releaseOnClose struct {
io.ReadCloser
release func()
once sync.Once
}
func (r *releaseOnClose) Close() error {
err := r.ReadCloser.Close()
r.once.Do(r.release)
return err
}
// getTranscodingThrottle returns a singleton transcodingThrottle created from the current configuration.
func getTranscodingThrottle() *transcodingThrottle {
return singleton.GetInstance(func() *transcodingThrottle {
return newTranscodingThrottle(
conf.Server.MaxConcurrentTranscodes,
conf.Server.DevTranscodeThrottleBacklogLimit,
conf.Server.DevTranscodeThrottleBacklogTimeout,
)
})
}

View File

@ -0,0 +1,109 @@
package stream
import (
"context"
"errors"
"io"
"strings"
"sync"
"time"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)
var _ = Describe("TranscodingThrottle", func() {
Describe("Acquire/Release", func() {
It("allows up to maxConcurrent acquires", func() {
t := newTranscodingThrottle(2, 10, time.Second)
Expect(t.Acquire(GinkgoT().Context())).To(Succeed())
Expect(t.Acquire(GinkgoT().Context())).To(Succeed())
// Third should block, so test it doesn't return immediately
ctx, cancel := context.WithTimeout(GinkgoT().Context(), 50*time.Millisecond)
defer cancel()
err := t.Acquire(ctx)
Expect(err).To(MatchError(ErrTranscodingBusy))
})
It("releases a slot and allows new acquire", func() {
t := newTranscodingThrottle(1, 10, time.Second)
Expect(t.Acquire(GinkgoT().Context())).To(Succeed())
t.Release()
Expect(t.Acquire(GinkgoT().Context())).To(Succeed())
})
It("returns ErrTranscodingBusy when backlog limit is reached", func() {
t := newTranscodingThrottle(1, 2, 5*time.Second)
// Fill the slot
Expect(t.Acquire(GinkgoT().Context())).To(Succeed())
// Fill the backlog (2 waiters) — they block in goroutines
var wg sync.WaitGroup
for i := 0; i < 2; i++ {
wg.Go(func() { _ = t.Acquire(GinkgoT().Context()) })
}
// Wait until both goroutines are in the backlog
Eventually(func() int64 { return t.backlog.Load() }).Should(BeNumerically(">=", 2))
// Third waiter should be rejected immediately (backlog full)
err := t.Acquire(GinkgoT().Context())
Expect(err).To(MatchError(ErrTranscodingBusy))
// Clean up: release all
t.Release()
t.Release()
t.Release()
wg.Wait()
})
It("returns ErrTranscodingBusy when timeout expires", func() {
t := newTranscodingThrottle(1, 10, 50*time.Millisecond)
Expect(t.Acquire(GinkgoT().Context())).To(Succeed())
Expect(t.Acquire(GinkgoT().Context())).To(MatchError(ErrTranscodingBusy))
})
It("respects context cancellation", func() {
t := newTranscodingThrottle(1, 10, 5*time.Second)
Expect(t.Acquire(GinkgoT().Context())).To(Succeed())
ctx, cancel := context.WithCancel(GinkgoT().Context())
cancel()
err := t.Acquire(ctx)
Expect(err).To(MatchError(ErrTranscodingBusy))
})
It("is disabled when maxConcurrent is 0", func() {
t := newTranscodingThrottle(0, 10, time.Second)
for i := 0; i < 100; i++ {
Expect(t.Acquire(GinkgoT().Context())).To(Succeed())
}
})
})
})
var _ = Describe("releaseOnClose", func() {
It("calls release exactly once on Close", func() {
var count int
rc := &releaseOnClose{
ReadCloser: io.NopCloser(strings.NewReader("data")),
release: func() { count++ },
}
Expect(rc.Close()).To(Succeed())
Expect(rc.Close()).To(Succeed()) // double close
Expect(count).To(Equal(1))
})
It("propagates close error from underlying ReadCloser", func() {
rc := &releaseOnClose{
ReadCloser: &failCloser{},
release: func() {},
}
err := rc.Close()
Expect(err).To(MatchError("close failed"))
})
})
// failCloser is a ReadCloser whose Close always returns an error
type failCloser struct{ io.Reader }
func (f *failCloser) Read(p []byte) (int, error) { return 0, io.EOF }
func (f *failCloser) Close() error { return errors.New("close failed") }

View File

@ -159,6 +159,10 @@ func (api *Router) buildPlaylist(ctx context.Context, p model.Playlist) response
}
func buildOSPlaylist(ctx context.Context, p model.Playlist) *responses.OpenSubsonicPlaylist {
player, ok := request.PlayerFrom(ctx)
if ok && isClientInList(conf.Server.Subsonic.LegacyClients, player.Client) {
return nil
}
pls := responses.OpenSubsonicPlaylist{}
if p.IsSmartPlaylist() {

View File

@ -128,6 +128,23 @@ var _ = Describe("buildPlaylist", func() {
})
})
Context("with legacy client", func() {
BeforeEach(func() {
conf.Server.Subsonic.LegacyClients = "legacy-client"
player := model.Player{Client: "legacy-client"}
ctx = request.WithPlayer(ctx, player)
})
It("returns all standard fields but no OpenSubsonic extensions", func() {
result := router.buildPlaylist(ctx, playlist)
Expect(result.Comment).To(Equal("Test comment"))
Expect(result.Owner).To(Equal("admin"))
Expect(result.Public).To(BeTrue())
Expect(result.OpenSubsonicPlaylist).To(BeNil())
})
})
Context("when no player in context", func() {
It("returns all fields", func() {
result := router.buildPlaylist(ctx, playlist)
@ -213,6 +230,23 @@ var _ = Describe("buildPlaylist", func() {
Expect(result.ValidUntil).To(Equal(&validUntil))
})
})
Context("with legacy client", func() {
BeforeEach(func() {
conf.Server.Subsonic.LegacyClients = "legacy-client"
player := model.Player{Client: "legacy-client"}
ctx = request.WithPlayer(ctx, player)
})
It("returns standard fields but no OpenSubsonic extensions", func() {
result := router.buildPlaylist(ctx, playlist)
Expect(result.Comment).To(Equal("Test comment"))
Expect(result.Owner).To(Equal("admin"))
Expect(result.Public).To(BeTrue())
Expect(result.OpenSubsonicPlaylist).To(BeNil())
})
})
})
})

View File

@ -75,8 +75,12 @@ func (api *Router) GetInternetRadios(r *http.Request) (*responses.Subsonic, erro
continue
}
// Add coverArt if not legacy client
var coverArt string
if g.UploadedImage != "" {
coverArt = g.CoverArtID().String()
}
res[i].OpenSubsonicRadio = &responses.OpenSubsonicRadio{
CoverArt: g.UploadedImage,
CoverArt: coverArt,
}
}

View File

@ -71,7 +71,7 @@ var _ = Describe("Radio", func() {
Expect(err).ToNot(HaveOccurred())
Expect(response.InternetRadioStations.Radios).To(HaveLen(2))
Expect(response.InternetRadioStations.Radios[0].OpenSubsonicRadio).ToNot(BeNil())
Expect(response.InternetRadioStations.Radios[0].CoverArt).To(Equal("rd-1_cover.jpg"))
Expect(response.InternetRadioStations.Radios[0].CoverArt).To(Equal("ra-rd-1_0"))
Expect(response.InternetRadioStations.Radios[1].OpenSubsonicRadio).ToNot(BeNil())
Expect(response.InternetRadioStations.Radios[1].CoverArt).To(BeEmpty())
})
@ -129,7 +129,7 @@ var _ = Describe("Radio", func() {
Expect(err).ToNot(HaveOccurred())
Expect(response.InternetRadioStations.Radios[0].OpenSubsonicRadio).ToNot(BeNil())
Expect(response.InternetRadioStations.Radios[0].CoverArt).To(Equal("rd-1_cover.jpg"))
Expect(response.InternetRadioStations.Radios[0].CoverArt).To(Equal("ra-rd-1_0"))
})
})

View File

@ -1,12 +1,14 @@
package subsonic
import (
"errors"
"fmt"
"net/http"
"strconv"
"strings"
"github.com/navidrome/navidrome/conf"
"github.com/navidrome/navidrome/core/stream"
"github.com/navidrome/navidrome/log"
"github.com/navidrome/navidrome/model"
"github.com/navidrome/navidrome/model/request"
@ -31,22 +33,25 @@ func (api *Router) Stream(w http.ResponseWriter, r *http.Request) (*responses.Su
}
streamReq := api.transcodeDecision.ResolveRequest(ctx, mf, format, maxBitRate, timeOffset)
stream, err := api.streamer.NewStream(ctx, mf, streamReq)
s, err := api.streamer.NewStream(ctx, mf, streamReq)
if err != nil {
if errors.Is(err, stream.ErrTranscodingBusy) {
return nil, newError(responses.ErrorGeneric, "too many concurrent transcodes, try again later")
}
return nil, err
}
// Make sure the stream will be closed at the end, to avoid leakage
defer func() {
if err := stream.Close(); err != nil && log.IsGreaterOrEqualTo(log.LevelDebug) {
log.Error("Error closing stream", "id", id, "file", stream.Name(), err)
if err := s.Close(); err != nil && log.IsGreaterOrEqualTo(log.LevelDebug) {
log.Error("Error closing stream", "id", id, "file", s.Name(), err)
}
}()
w.Header().Set("X-Content-Type-Options", "nosniff")
w.Header().Set("X-Content-Duration", strconv.FormatFloat(float64(stream.Duration()), 'G', -1, 32))
w.Header().Set("X-Content-Duration", strconv.FormatFloat(float64(s.Duration()), 'G', -1, 32))
_, err = stream.Serve(ctx, w, r)
_, err = s.Serve(ctx, w, r)
return nil, err
}
@ -100,22 +105,25 @@ func (api *Router) Download(w http.ResponseWriter, r *http.Request) (*responses.
switch v := entity.(type) {
case *model.MediaFile:
streamReq := api.transcodeDecision.ResolveRequest(ctx, v, format, maxBitRate, 0)
stream, err := api.streamer.NewStream(ctx, v, streamReq)
s, err := api.streamer.NewStream(ctx, v, streamReq)
if err != nil {
if errors.Is(err, stream.ErrTranscodingBusy) {
return nil, newError(responses.ErrorGeneric, "too many concurrent transcodes, try again later")
}
return nil, err
}
// Make sure the stream will be closed at the end, to avoid leakage
defer func() {
if err := stream.Close(); err != nil && log.IsGreaterOrEqualTo(log.LevelDebug) {
log.Error("Error closing stream", "id", id, "file", stream.Name(), err)
if err := s.Close(); err != nil && log.IsGreaterOrEqualTo(log.LevelDebug) {
log.Error("Error closing stream", "id", id, "file", s.Name(), err)
}
}()
disposition := fmt.Sprintf("attachment; filename=\"%s\"", stream.Name())
disposition := fmt.Sprintf("attachment; filename=\"%s\"", s.Name())
w.Header().Set("Content-Disposition", disposition)
_, err = stream.Serve(ctx, w, r)
_, err = s.Serve(ctx, w, r)
return nil, err
case *model.Album:
setHeaders(v.Name)

View File

@ -379,8 +379,13 @@ func (api *Router) GetTranscodeStream(w http.ResponseWriter, r *http.Request) (*
}
// Create stream
stream, err := api.streamer.NewStream(ctx, mf, streamReq)
s, err := api.streamer.NewStream(ctx, mf, streamReq)
if err != nil {
if errors.Is(err, stream.ErrTranscodingBusy) {
log.Warn(ctx, "Transcoding throttle full", "mediaID", mediaID)
http.Error(w, "Service Unavailable", http.StatusServiceUnavailable)
return nil, nil
}
log.Error(ctx, "Error creating stream", "mediaID", mediaID, err)
http.Error(w, "Internal Server Error", http.StatusInternalServerError)
return nil, nil
@ -388,14 +393,14 @@ func (api *Router) GetTranscodeStream(w http.ResponseWriter, r *http.Request) (*
// Make sure the stream will be closed at the end
defer func() {
if err := stream.Close(); err != nil && log.IsGreaterOrEqualTo(log.LevelDebug) {
log.Error("Error closing stream", "id", mediaID, "file", stream.Name(), err)
if err := s.Close(); err != nil && log.IsGreaterOrEqualTo(log.LevelDebug) {
log.Error("Error closing stream", "id", mediaID, "file", s.Name(), err)
}
}()
w.Header().Set("X-Content-Type-Options", "nosniff")
n, err := stream.Serve(ctx, w, r)
n, err := s.Serve(ctx, w, r)
if err != nil || n == 0 {
http.Error(w, "Internal Server Error", http.StatusInternalServerError)
}

View File

@ -7,6 +7,8 @@ import (
"net/http"
"net/http/httptest"
"github.com/navidrome/navidrome/conf"
"github.com/navidrome/navidrome/conf/configtest"
"github.com/navidrome/navidrome/core/stream"
"github.com/navidrome/navidrome/model"
"github.com/navidrome/navidrome/tests"
@ -322,6 +324,47 @@ var _ = Describe("Transcode endpoints", func() {
Expect(fakeStreamer.captured.Channels).To(Equal(2))
Expect(fakeStreamer.captured.Offset).To(Equal(10))
})
It("returns 503 when transcoding throttle is full", func() {
busyStreamer := &busyMediaStreamer{}
router = New(ds, nil, busyStreamer, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, mockTD)
mockMFRepo.SetData(model.MediaFiles{{ID: "song-1"}})
mockTD.resolvedReq = stream.Request{Format: "mp3", BitRate: 128}
r := newGetRequest("mediaId=song-1", "mediaType=song", "transcodeParams=valid-token")
resp, err := router.GetTranscodeStream(w, r)
Expect(err).ToNot(HaveOccurred())
Expect(resp).To(BeNil())
Expect(w.Code).To(Equal(http.StatusServiceUnavailable))
})
})
Describe("Stream - throttle", func() {
It("returns error when transcoding throttle is full", func() {
busyStreamer := &busyMediaStreamer{}
router = New(ds, nil, busyStreamer, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, mockTD)
mockMFRepo.SetData(model.MediaFiles{{ID: "song-1"}})
r := newGetRequest("id=song-1")
_, err := router.Stream(w, r)
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("too many concurrent transcodes"))
})
})
Describe("Download - throttle", func() {
It("returns error when transcoding throttle is full", func() {
DeferCleanup(configtest.SetupConfig())
conf.Server.EnableDownloads = true
busyStreamer := &busyMediaStreamer{}
router = New(ds, nil, busyStreamer, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, mockTD)
mockMFRepo.SetData(model.MediaFiles{{ID: "song-1"}})
r := newGetRequest("id=song-1")
_, err := router.Download(w, r)
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("too many concurrent transcodes"))
})
})
Describe("bpsToKbps", func() {
@ -422,3 +465,10 @@ func (f *fakeMediaStreamer) NewStream(_ context.Context, _ *model.MediaFile, req
f.captured = &req
return nil, errStreamCaptured
}
// busyMediaStreamer always returns ErrTranscodingBusy from NewStream
type busyMediaStreamer struct{}
func (b *busyMediaStreamer) NewStream(_ context.Context, _ *model.MediaFile, _ stream.Request) (*stream.Stream, error) {
return nil, stream.ErrTranscodingBusy
}