Merge c1f12d148dcdda283f479f7b84ddeb749620be99 into 6109bf519228b6cd6ae6b6a8b06a6a8c4e7a6cae

This commit is contained in:
Deluan Quintão 2026-04-01 16:11:35 +02:00 committed by GitHub
commit cf4e57b2c5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 355 additions and 45 deletions

View File

@ -58,6 +58,7 @@ type configOptions struct {
PlaylistsPath string PlaylistsPath string
SmartPlaylistRefreshDelay time.Duration SmartPlaylistRefreshDelay time.Duration
AutoTranscodeDownload bool AutoTranscodeDownload bool
MaxConcurrentTranscodes int
DefaultDownsamplingFormat string DefaultDownsamplingFormat string
Search searchOptions `json:",omitzero"` Search searchOptions `json:",omitzero"`
SimilarSongsMatchThreshold int SimilarSongsMatchThreshold int
@ -128,6 +129,8 @@ type configOptions struct {
DevArtworkMaxRequests int DevArtworkMaxRequests int
DevArtworkThrottleBacklogLimit int DevArtworkThrottleBacklogLimit int
DevArtworkThrottleBacklogTimeout time.Duration DevArtworkThrottleBacklogTimeout time.Duration
DevTranscodeThrottleBacklogLimit int
DevTranscodeThrottleBacklogTimeout time.Duration
DevArtistInfoTimeToLive time.Duration DevArtistInfoTimeToLive time.Duration
DevAlbumInfoTimeToLive time.Duration DevAlbumInfoTimeToLive time.Duration
DevExternalScanner bool DevExternalScanner bool
@ -703,6 +706,7 @@ func setViperDefaults() {
viper.SetDefault("enablem3uexternalalbumart", false) viper.SetDefault("enablem3uexternalalbumart", false)
viper.SetDefault("enablemediafilecoverart", true) viper.SetDefault("enablemediafilecoverart", true)
viper.SetDefault("autotranscodedownload", false) viper.SetDefault("autotranscodedownload", false)
viper.SetDefault("maxconcurrenttranscodes", max(4, runtime.NumCPU()))
viper.SetDefault("defaultdownsamplingformat", consts.DefaultDownsamplingFormat) viper.SetDefault("defaultdownsamplingformat", consts.DefaultDownsamplingFormat)
viper.SetDefault("search.fullstring", false) viper.SetDefault("search.fullstring", false)
viper.SetDefault("search.backend", "fts") viper.SetDefault("search.backend", "fts")
@ -810,6 +814,8 @@ func setViperDefaults() {
viper.SetDefault("devartworkmaxrequests", max(4, runtime.NumCPU())) viper.SetDefault("devartworkmaxrequests", max(4, runtime.NumCPU()))
viper.SetDefault("devartworkthrottlebackloglimit", consts.RequestThrottleBacklogLimit) viper.SetDefault("devartworkthrottlebackloglimit", consts.RequestThrottleBacklogLimit)
viper.SetDefault("devartworkthrottlebacklogtimeout", consts.RequestThrottleBacklogTimeout) viper.SetDefault("devartworkthrottlebacklogtimeout", consts.RequestThrottleBacklogTimeout)
viper.SetDefault("devtranscodethrottlebackloglimit", consts.RequestThrottleBacklogLimit)
viper.SetDefault("devtranscodethrottlebacklogtimeout", consts.RequestThrottleBacklogTimeout)
viper.SetDefault("devartistinfotimetolive", consts.ArtistInfoTimeToLive) viper.SetDefault("devartistinfotimetolive", consts.ArtistInfoTimeToLive)
viper.SetDefault("devalbuminfotimetolive", consts.AlbumInfoTimeToLive) viper.SetDefault("devalbuminfotimetolive", consts.AlbumInfoTimeToLive)
viper.SetDefault("devexternalscanner", true) viper.SetDefault("devexternalscanner", true)

View File

@ -91,6 +91,11 @@ func (ms *mediaStreamer) NewStream(ctx context.Context, mf *model.MediaFile, req
return s, nil return s, nil
} }
// Acquire throttle slot before accessing cache (which may spawn ffmpeg on miss)
if err := getTranscodingThrottle().Acquire(ctx); err != nil {
return nil, err
}
job := &streamJob{ job := &streamJob{
ms: ms, ms: ms,
mf: mf, mf: mf,
@ -104,12 +109,19 @@ func (ms *mediaStreamer) NewStream(ctx context.Context, mf *model.MediaFile, req
} }
r, err := ms.cache.Get(ctx, job) r, err := ms.cache.Get(ctx, job)
if err != nil { if err != nil {
getTranscodingThrottle().Release()
log.Error(ctx, "Error accessing transcoding cache", "id", mf.ID, err) log.Error(ctx, "Error accessing transcoding cache", "id", mf.ID, err)
return nil, err return nil, err
} }
cached = r.Cached cached = r.Cached
if cached {
// Cache hit — no ffmpeg process running, release the slot immediately
getTranscodingThrottle().Release()
s.ReadCloser = r s.ReadCloser = r
} else {
// Cache miss — slot released when stream is closed
s.ReadCloser = &releaseOnClose{ReadCloser: r, release: getTranscodingThrottle().Release}
}
s.Seeker = r.Seeker s.Seeker = r.Seeker
log.Debug(ctx, "Streaming TRANSCODED file", "id", mf.ID, "path", filePath, log.Debug(ctx, "Streaming TRANSCODED file", "id", mf.ID, "path", filePath,

View File

@ -31,6 +31,7 @@ var _ = Describe("MediaStreamer", func() {
}) })
testCache := stream.NewTranscodingCache() testCache := stream.NewTranscodingCache()
Eventually(func() bool { return testCache.Available(context.TODO()) }).Should(BeTrue()) Eventually(func() bool { return testCache.Available(context.TODO()) }).Should(BeTrue())
conf.Server.MaxConcurrentTranscodes = 0 // Disable throttling for general tests
streamer = stream.NewMediaStreamer(ds, ffmpeg, testCache) streamer = stream.NewMediaStreamer(ds, ffmpeg, testCache)
}) })
AfterEach(func() { AfterEach(func() {
@ -72,4 +73,20 @@ var _ = Describe("MediaStreamer", func() {
Expect(s.Seekable()).To(BeTrue()) Expect(s.Seekable()).To(BeTrue())
}) })
}) })
Context("NewStream with throttle", func() {
var mf *model.MediaFile
BeforeEach(func() {
var err error
mf, err = ds.MediaFile(ctx).Get("123")
Expect(err).ToNot(HaveOccurred())
})
It("does not throttle raw/direct-play requests even when throttle is saturated", func() {
s, err := streamer.NewStream(ctx, mf, stream.Request{Format: "raw"})
Expect(err).ToNot(HaveOccurred())
Expect(s).ToNot(BeNil())
_ = s.Close()
})
})
}) })

103
core/stream/throttle.go Normal file
View File

@ -0,0 +1,103 @@
package stream
import (
"context"
"errors"
"io"
"sync"
"sync/atomic"
"time"
"github.com/navidrome/navidrome/conf"
"github.com/navidrome/navidrome/log"
"github.com/navidrome/navidrome/utils/singleton"
"golang.org/x/sync/semaphore"
)
// ErrTranscodingBusy is returned when the transcoding throttle rejects a request
// because the concurrency limit and backlog are both full.
var ErrTranscodingBusy = errors.New("too many concurrent transcodes")
// transcodingThrottle limits the number of concurrent transcoding operations.
type transcodingThrottle struct {
sem *semaphore.Weighted
backlog atomic.Int64
maxBacklog int64
timeout time.Duration
disabled bool
}
func newTranscodingThrottle(maxConcurrent, maxBacklog int, timeout time.Duration) *transcodingThrottle {
if maxConcurrent <= 0 {
return &transcodingThrottle{disabled: true}
}
return &transcodingThrottle{
sem: semaphore.NewWeighted(int64(maxConcurrent)),
maxBacklog: int64(maxBacklog),
timeout: timeout,
}
}
// Acquire blocks until a transcoding slot is available, the backlog is full, or the timeout expires.
func (t *transcodingThrottle) Acquire(ctx context.Context) error {
if t.disabled {
return nil
}
// Fast path: try to acquire without touching the backlog counter
if t.sem.TryAcquire(1) {
return nil
}
// Slow path: semaphore is full, enter backlog queue
// Increment-then-check-then-rollback to avoid TOCTOU race
current := t.backlog.Add(1)
if current > t.maxBacklog {
t.backlog.Add(-1)
log.Warn(ctx, "Transcoding request rejected, throttle backlog full", "backlog", current-1)
return ErrTranscodingBusy
}
log.Info(ctx, "Transcoding request queued, waiting for slot", "backlog", current)
ctx, cancel := context.WithTimeout(ctx, t.timeout)
defer cancel()
err := t.sem.Acquire(ctx, 1)
t.backlog.Add(-1)
if err != nil {
log.Warn(ctx, "Transcoding request rejected, timeout waiting for slot")
return ErrTranscodingBusy
}
return nil
}
// Release frees a transcoding slot.
func (t *transcodingThrottle) Release() {
if t.disabled {
return
}
t.sem.Release(1)
}
// releaseOnClose wraps a ReadCloser to call a release function exactly once on Close.
type releaseOnClose struct {
io.ReadCloser
release func()
once sync.Once
}
func (r *releaseOnClose) Close() error {
err := r.ReadCloser.Close()
r.once.Do(r.release)
return err
}
// getTranscodingThrottle returns a singleton transcodingThrottle created from the current configuration.
func getTranscodingThrottle() *transcodingThrottle {
return singleton.GetInstance(func() *transcodingThrottle {
return newTranscodingThrottle(
conf.Server.MaxConcurrentTranscodes,
conf.Server.DevTranscodeThrottleBacklogLimit,
conf.Server.DevTranscodeThrottleBacklogTimeout,
)
})
}

View File

@ -0,0 +1,109 @@
package stream
import (
"context"
"errors"
"io"
"strings"
"sync"
"time"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)
var _ = Describe("TranscodingThrottle", func() {
Describe("Acquire/Release", func() {
It("allows up to maxConcurrent acquires", func() {
t := newTranscodingThrottle(2, 10, time.Second)
Expect(t.Acquire(GinkgoT().Context())).To(Succeed())
Expect(t.Acquire(GinkgoT().Context())).To(Succeed())
// Third should block, so test it doesn't return immediately
ctx, cancel := context.WithTimeout(GinkgoT().Context(), 50*time.Millisecond)
defer cancel()
err := t.Acquire(ctx)
Expect(err).To(MatchError(ErrTranscodingBusy))
})
It("releases a slot and allows new acquire", func() {
t := newTranscodingThrottle(1, 10, time.Second)
Expect(t.Acquire(GinkgoT().Context())).To(Succeed())
t.Release()
Expect(t.Acquire(GinkgoT().Context())).To(Succeed())
})
It("returns ErrTranscodingBusy when backlog limit is reached", func() {
t := newTranscodingThrottle(1, 2, 5*time.Second)
// Fill the slot
Expect(t.Acquire(GinkgoT().Context())).To(Succeed())
// Fill the backlog (2 waiters) — they block in goroutines
var wg sync.WaitGroup
for i := 0; i < 2; i++ {
wg.Go(func() { _ = t.Acquire(GinkgoT().Context()) })
}
// Wait until both goroutines are in the backlog
Eventually(func() int64 { return t.backlog.Load() }).Should(BeNumerically(">=", 2))
// Third waiter should be rejected immediately (backlog full)
err := t.Acquire(GinkgoT().Context())
Expect(err).To(MatchError(ErrTranscodingBusy))
// Clean up: release all
t.Release()
t.Release()
t.Release()
wg.Wait()
})
It("returns ErrTranscodingBusy when timeout expires", func() {
t := newTranscodingThrottle(1, 10, 50*time.Millisecond)
Expect(t.Acquire(GinkgoT().Context())).To(Succeed())
Expect(t.Acquire(GinkgoT().Context())).To(MatchError(ErrTranscodingBusy))
})
It("respects context cancellation", func() {
t := newTranscodingThrottle(1, 10, 5*time.Second)
Expect(t.Acquire(GinkgoT().Context())).To(Succeed())
ctx, cancel := context.WithCancel(GinkgoT().Context())
cancel()
err := t.Acquire(ctx)
Expect(err).To(MatchError(ErrTranscodingBusy))
})
It("is disabled when maxConcurrent is 0", func() {
t := newTranscodingThrottle(0, 10, time.Second)
for i := 0; i < 100; i++ {
Expect(t.Acquire(GinkgoT().Context())).To(Succeed())
}
})
})
})
var _ = Describe("releaseOnClose", func() {
It("calls release exactly once on Close", func() {
var count int
rc := &releaseOnClose{
ReadCloser: io.NopCloser(strings.NewReader("data")),
release: func() { count++ },
}
Expect(rc.Close()).To(Succeed())
Expect(rc.Close()).To(Succeed()) // double close
Expect(count).To(Equal(1))
})
It("propagates close error from underlying ReadCloser", func() {
rc := &releaseOnClose{
ReadCloser: &failCloser{},
release: func() {},
}
err := rc.Close()
Expect(err).To(MatchError("close failed"))
})
})
// failCloser is a ReadCloser whose Close always returns an error
type failCloser struct{ io.Reader }
func (f *failCloser) Read(p []byte) (int, error) { return 0, io.EOF }
func (f *failCloser) Close() error { return errors.New("close failed") }

View File

@ -1,12 +1,14 @@
package subsonic package subsonic
import ( import (
"errors"
"fmt" "fmt"
"net/http" "net/http"
"strconv" "strconv"
"strings" "strings"
"github.com/navidrome/navidrome/conf" "github.com/navidrome/navidrome/conf"
"github.com/navidrome/navidrome/core/stream"
"github.com/navidrome/navidrome/log" "github.com/navidrome/navidrome/log"
"github.com/navidrome/navidrome/model" "github.com/navidrome/navidrome/model"
"github.com/navidrome/navidrome/model/request" "github.com/navidrome/navidrome/model/request"
@ -31,22 +33,25 @@ func (api *Router) Stream(w http.ResponseWriter, r *http.Request) (*responses.Su
} }
streamReq := api.transcodeDecision.ResolveRequest(ctx, mf, format, maxBitRate, timeOffset) streamReq := api.transcodeDecision.ResolveRequest(ctx, mf, format, maxBitRate, timeOffset)
stream, err := api.streamer.NewStream(ctx, mf, streamReq) s, err := api.streamer.NewStream(ctx, mf, streamReq)
if err != nil { if err != nil {
if errors.Is(err, stream.ErrTranscodingBusy) {
return nil, newError(responses.ErrorGeneric, "too many concurrent transcodes, try again later")
}
return nil, err return nil, err
} }
// Make sure the stream will be closed at the end, to avoid leakage // Make sure the stream will be closed at the end, to avoid leakage
defer func() { defer func() {
if err := stream.Close(); err != nil && log.IsGreaterOrEqualTo(log.LevelDebug) { if err := s.Close(); err != nil && log.IsGreaterOrEqualTo(log.LevelDebug) {
log.Error("Error closing stream", "id", id, "file", stream.Name(), err) log.Error("Error closing stream", "id", id, "file", s.Name(), err)
} }
}() }()
w.Header().Set("X-Content-Type-Options", "nosniff") w.Header().Set("X-Content-Type-Options", "nosniff")
w.Header().Set("X-Content-Duration", strconv.FormatFloat(float64(stream.Duration()), 'G', -1, 32)) w.Header().Set("X-Content-Duration", strconv.FormatFloat(float64(s.Duration()), 'G', -1, 32))
_, err = stream.Serve(ctx, w, r) _, err = s.Serve(ctx, w, r)
return nil, err return nil, err
} }
@ -100,22 +105,25 @@ func (api *Router) Download(w http.ResponseWriter, r *http.Request) (*responses.
switch v := entity.(type) { switch v := entity.(type) {
case *model.MediaFile: case *model.MediaFile:
streamReq := api.transcodeDecision.ResolveRequest(ctx, v, format, maxBitRate, 0) streamReq := api.transcodeDecision.ResolveRequest(ctx, v, format, maxBitRate, 0)
stream, err := api.streamer.NewStream(ctx, v, streamReq) s, err := api.streamer.NewStream(ctx, v, streamReq)
if err != nil { if err != nil {
if errors.Is(err, stream.ErrTranscodingBusy) {
return nil, newError(responses.ErrorGeneric, "too many concurrent transcodes, try again later")
}
return nil, err return nil, err
} }
// Make sure the stream will be closed at the end, to avoid leakage // Make sure the stream will be closed at the end, to avoid leakage
defer func() { defer func() {
if err := stream.Close(); err != nil && log.IsGreaterOrEqualTo(log.LevelDebug) { if err := s.Close(); err != nil && log.IsGreaterOrEqualTo(log.LevelDebug) {
log.Error("Error closing stream", "id", id, "file", stream.Name(), err) log.Error("Error closing stream", "id", id, "file", s.Name(), err)
} }
}() }()
disposition := fmt.Sprintf("attachment; filename=\"%s\"", stream.Name()) disposition := fmt.Sprintf("attachment; filename=\"%s\"", s.Name())
w.Header().Set("Content-Disposition", disposition) w.Header().Set("Content-Disposition", disposition)
_, err = stream.Serve(ctx, w, r) _, err = s.Serve(ctx, w, r)
return nil, err return nil, err
case *model.Album: case *model.Album:
setHeaders(v.Name) setHeaders(v.Name)

View File

@ -379,8 +379,13 @@ func (api *Router) GetTranscodeStream(w http.ResponseWriter, r *http.Request) (*
} }
// Create stream // Create stream
stream, err := api.streamer.NewStream(ctx, mf, streamReq) s, err := api.streamer.NewStream(ctx, mf, streamReq)
if err != nil { if err != nil {
if errors.Is(err, stream.ErrTranscodingBusy) {
log.Warn(ctx, "Transcoding throttle full", "mediaID", mediaID)
http.Error(w, "Service Unavailable", http.StatusServiceUnavailable)
return nil, nil
}
log.Error(ctx, "Error creating stream", "mediaID", mediaID, err) log.Error(ctx, "Error creating stream", "mediaID", mediaID, err)
http.Error(w, "Internal Server Error", http.StatusInternalServerError) http.Error(w, "Internal Server Error", http.StatusInternalServerError)
return nil, nil return nil, nil
@ -388,14 +393,14 @@ func (api *Router) GetTranscodeStream(w http.ResponseWriter, r *http.Request) (*
// Make sure the stream will be closed at the end // Make sure the stream will be closed at the end
defer func() { defer func() {
if err := stream.Close(); err != nil && log.IsGreaterOrEqualTo(log.LevelDebug) { if err := s.Close(); err != nil && log.IsGreaterOrEqualTo(log.LevelDebug) {
log.Error("Error closing stream", "id", mediaID, "file", stream.Name(), err) log.Error("Error closing stream", "id", mediaID, "file", s.Name(), err)
} }
}() }()
w.Header().Set("X-Content-Type-Options", "nosniff") w.Header().Set("X-Content-Type-Options", "nosniff")
n, err := stream.Serve(ctx, w, r) n, err := s.Serve(ctx, w, r)
if err != nil || n == 0 { if err != nil || n == 0 {
http.Error(w, "Internal Server Error", http.StatusInternalServerError) http.Error(w, "Internal Server Error", http.StatusInternalServerError)
} }

View File

@ -7,6 +7,8 @@ import (
"net/http" "net/http"
"net/http/httptest" "net/http/httptest"
"github.com/navidrome/navidrome/conf"
"github.com/navidrome/navidrome/conf/configtest"
"github.com/navidrome/navidrome/core/stream" "github.com/navidrome/navidrome/core/stream"
"github.com/navidrome/navidrome/model" "github.com/navidrome/navidrome/model"
"github.com/navidrome/navidrome/tests" "github.com/navidrome/navidrome/tests"
@ -322,6 +324,47 @@ var _ = Describe("Transcode endpoints", func() {
Expect(fakeStreamer.captured.Channels).To(Equal(2)) Expect(fakeStreamer.captured.Channels).To(Equal(2))
Expect(fakeStreamer.captured.Offset).To(Equal(10)) Expect(fakeStreamer.captured.Offset).To(Equal(10))
}) })
It("returns 503 when transcoding throttle is full", func() {
busyStreamer := &busyMediaStreamer{}
router = New(ds, nil, busyStreamer, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, mockTD)
mockMFRepo.SetData(model.MediaFiles{{ID: "song-1"}})
mockTD.resolvedReq = stream.Request{Format: "mp3", BitRate: 128}
r := newGetRequest("mediaId=song-1", "mediaType=song", "transcodeParams=valid-token")
resp, err := router.GetTranscodeStream(w, r)
Expect(err).ToNot(HaveOccurred())
Expect(resp).To(BeNil())
Expect(w.Code).To(Equal(http.StatusServiceUnavailable))
})
})
Describe("Stream - throttle", func() {
It("returns error when transcoding throttle is full", func() {
busyStreamer := &busyMediaStreamer{}
router = New(ds, nil, busyStreamer, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, mockTD)
mockMFRepo.SetData(model.MediaFiles{{ID: "song-1"}})
r := newGetRequest("id=song-1")
_, err := router.Stream(w, r)
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("too many concurrent transcodes"))
})
})
Describe("Download - throttle", func() {
It("returns error when transcoding throttle is full", func() {
DeferCleanup(configtest.SetupConfig())
conf.Server.EnableDownloads = true
busyStreamer := &busyMediaStreamer{}
router = New(ds, nil, busyStreamer, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, mockTD)
mockMFRepo.SetData(model.MediaFiles{{ID: "song-1"}})
r := newGetRequest("id=song-1")
_, err := router.Download(w, r)
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("too many concurrent transcodes"))
})
}) })
Describe("bpsToKbps", func() { Describe("bpsToKbps", func() {
@ -422,3 +465,10 @@ func (f *fakeMediaStreamer) NewStream(_ context.Context, _ *model.MediaFile, req
f.captured = &req f.captured = &req
return nil, errStreamCaptured return nil, errStreamCaptured
} }
// busyMediaStreamer always returns ErrTranscodingBusy from NewStream
type busyMediaStreamer struct{}
func (b *busyMediaStreamer) NewStream(_ context.Context, _ *model.MediaFile, _ stream.Request) (*stream.Stream, error) {
return nil, stream.ErrTranscodingBusy
}