From dce7705999bcc0cd23cf6bb2416f5f7f17268de6 Mon Sep 17 00:00:00 2001 From: Deluan Date: Sun, 29 Jun 2025 10:18:05 -0400 Subject: [PATCH] feat(ui): implement new event stream connection logic Added a new event stream connection method to enhance the handling of server events. This includes a reconnect mechanism for improved reliability in case of connection errors. The configuration now allows toggling the new event stream feature via `devNewEventStream`. Additionally, tests were added to ensure the new functionality works as expected, including reconnection behavior after an error. Signed-off-by: Deluan --- conf/configuration.go | 2 ++ server/serve_index.go | 1 + server/serve_index_test.go | 1 + ui/src/config.js | 1 + ui/src/eventStream.js | 66 +++++++++++++++++++++++++++++++++++--- ui/src/eventStream.test.js | 49 ++++++++++++++++++++++++++++ 6 files changed, 116 insertions(+), 4 deletions(-) create mode 100644 ui/src/eventStream.test.js diff --git a/conf/configuration.go b/conf/configuration.go index 132c12130..7ea16bf4b 100644 --- a/conf/configuration.go +++ b/conf/configuration.go @@ -116,6 +116,7 @@ type configOptions struct { DevSidebarPlaylists bool DevShowArtistPage bool DevUIShowConfig bool + DevNewEventStream bool DevOffsetOptimize int DevArtworkMaxRequests int DevArtworkThrottleBacklogLimit int @@ -586,6 +587,7 @@ func setViperDefaults() { viper.SetDefault("devsidebarplaylists", true) viper.SetDefault("devshowartistpage", true) viper.SetDefault("devuishowconfig", true) + viper.SetDefault("devneweventstream", true) viper.SetDefault("devoffsetoptimize", 50000) viper.SetDefault("devartworkmaxrequests", max(2, runtime.NumCPU()/3)) viper.SetDefault("devartworkthrottlebackloglimit", consts.RequestThrottleBacklogLimit) diff --git a/server/serve_index.go b/server/serve_index.go index 19ecc7b35..38e646982 100644 --- a/server/serve_index.go +++ b/server/serve_index.go @@ -67,6 +67,7 @@ func serveIndex(ds model.DataStore, fs fs.FS, shareInfo *model.Share) http.Handl "lastFMEnabled": conf.Server.LastFM.Enabled, "devShowArtistPage": conf.Server.DevShowArtistPage, "devUIShowConfig": conf.Server.DevUIShowConfig, + "devNewEventStream": conf.Server.DevNewEventStream, "listenBrainzEnabled": conf.Server.ListenBrainz.Enabled, "enableExternalServices": conf.Server.EnableExternalServices, "enableReplayGain": conf.Server.EnableReplayGain, diff --git a/server/serve_index_test.go b/server/serve_index_test.go index b8addf9d1..4f179f22a 100644 --- a/server/serve_index_test.go +++ b/server/serve_index_test.go @@ -102,6 +102,7 @@ var _ = Describe("serveIndex", func() { Entry("defaultDownsamplingFormat", func() { conf.Server.DefaultDownsamplingFormat = "mp3" }, "defaultDownsamplingFormat", "mp3"), Entry("enableUserEditing", func() { conf.Server.EnableUserEditing = false }, "enableUserEditing", false), Entry("enableSharing", func() { conf.Server.EnableSharing = true }, "enableSharing", true), + Entry("devNewEventStream", func() { conf.Server.DevNewEventStream = true }, "devNewEventStream", true), ) DescribeTable("sets other UI configuration values", diff --git a/ui/src/config.js b/ui/src/config.js index c94a6ffb9..a53a97de7 100644 --- a/ui/src/config.js +++ b/ui/src/config.js @@ -32,6 +32,7 @@ const defaultConfig = { enableNowPlaying: true, devShowArtistPage: true, devUIShowConfig: true, + devNewEventStream: false, enableReplayGain: true, defaultDownsamplingFormat: 'opus', publicBaseUrl: '/share', diff --git a/ui/src/eventStream.js b/ui/src/eventStream.js index 7ab91056e..3d8ddcd1e 100644 --- a/ui/src/eventStream.js +++ b/ui/src/eventStream.js @@ -12,6 +12,49 @@ const newEventStream = async () => { return new EventSource(url) } +let eventStream +let reconnectTimer +const RECONNECT_DELAY = 5000 + +const setupHandlers = (stream, dispatchFn) => { + stream.addEventListener('serverStart', eventHandler(dispatchFn)) + stream.addEventListener('scanStatus', throttledEventHandler(dispatchFn)) + stream.addEventListener('refreshResource', eventHandler(dispatchFn)) + if (config.enableNowPlaying) { + stream.addEventListener('nowPlayingCount', eventHandler(dispatchFn)) + } + stream.addEventListener('keepAlive', eventHandler(dispatchFn)) + stream.onerror = (e) => { + // eslint-disable-next-line no-console + console.log('EventStream error', e) + dispatchFn(serverDown()) + if (stream) stream.close() + scheduleReconnect(dispatchFn) + } +} + +const scheduleReconnect = (dispatchFn) => { + if (!reconnectTimer) { + reconnectTimer = setTimeout(() => { + reconnectTimer = null + connect(dispatchFn) + }, RECONNECT_DELAY) + } +} + +const connect = async (dispatchFn) => { + try { + const stream = await newEventStream() + eventStream = stream + setupHandlers(stream, dispatchFn) + return stream + } catch (e) { + // eslint-disable-next-line no-console + console.log(`Error connecting to server:`, e) + scheduleReconnect(dispatchFn) + } +} + const eventHandler = (dispatchFn) => (event) => { const data = JSON.parse(event.data) if (event.type !== 'keepAlive') { @@ -22,10 +65,7 @@ const eventHandler = (dispatchFn) => (event) => { const throttledEventHandler = (dispatchFn) => throttle(eventHandler(dispatchFn), 100, { trailing: true }) -const startEventStream = async (dispatchFn) => { - if (!localStorage.getItem('is-authenticated')) { - return Promise.resolve() - } +const startEventStreamLegacy = async (dispatchFn) => { return newEventStream() .then((newStream) => { newStream.addEventListener('serverStart', eventHandler(dispatchFn)) @@ -51,4 +91,22 @@ const startEventStream = async (dispatchFn) => { }) } +const startEventStreamNew = async (dispatchFn) => { + if (eventStream) { + eventStream.close() + eventStream = null + } + return connect(dispatchFn) +} + +const startEventStream = async (dispatchFn) => { + if (!localStorage.getItem('is-authenticated')) { + return Promise.resolve() + } + if (config.devNewEventStream) { + return startEventStreamNew(dispatchFn) + } + return startEventStreamLegacy(dispatchFn) +} + export { startEventStream } diff --git a/ui/src/eventStream.test.js b/ui/src/eventStream.test.js new file mode 100644 index 000000000..77d061c19 --- /dev/null +++ b/ui/src/eventStream.test.js @@ -0,0 +1,49 @@ +import { describe, it, beforeEach, vi, expect } from 'vitest' +import { startEventStream } from './eventStream' +import { serverDown } from './actions' +import config from './config' + +class MockEventSource { + constructor(url) { + this.url = url + this.readyState = 1 + this.listeners = {} + this.onerror = null + } + addEventListener(type, handler) { + this.listeners[type] = handler + } + close() { + this.readyState = 2 + } +} + +describe('startEventStream', () => { + vi.useFakeTimers() + let dispatch + let instance + + beforeEach(() => { + dispatch = vi.fn() + global.EventSource = vi.fn((url) => { + instance = new MockEventSource(url) + return instance + }) + localStorage.setItem('is-authenticated', 'true') + localStorage.setItem('token', 'abc') + config.devNewEventStream = true + }) + + afterEach(() => { + config.devNewEventStream = false + }) + + it('reconnects after an error', async () => { + await startEventStream(dispatch) + expect(global.EventSource).toHaveBeenCalledTimes(1) + instance.onerror(new Event('error')) + expect(dispatch).toHaveBeenCalledWith(serverDown()) + vi.advanceTimersByTime(5000) + expect(global.EventSource).toHaveBeenCalledTimes(2) + }) +})