From 0c2ca2a5e48ff2b458b15940da67632f7c38d3dc Mon Sep 17 00:00:00 2001 From: Deluan Date: Thu, 1 Jul 2021 10:58:41 -0400 Subject: [PATCH] Assign event ids in the main loop, to avoid out-of-order events --- server/events/sse.go | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/server/events/sse.go b/server/events/sse.go index 30c5d404d..96429ad67 100644 --- a/server/events/sse.go +++ b/server/events/sse.go @@ -95,7 +95,6 @@ func (b *broker) SendMessage(ctx context.Context, evt Event) { func (b *broker) prepareMessage(ctx context.Context, event Event) message { msg := message{} - msg.id = atomic.AddUint32(&eventId, 1) msg.data = event.Data(event) msg.event = event.Name(event) msg.senderCtx = ctx @@ -212,8 +211,9 @@ func (b *broker) listen() { log.Debug("Client added to event broker", "numClients", len(clients), "newClient", c.String()) // Send a serverStart event to new client - c.diode.put(b.prepareMessage(context.Background(), - &ServerStart{StartTime: consts.ServerStart, Version: consts.Version()})) + msg := b.prepareMessage(context.Background(), + &ServerStart{StartTime: consts.ServerStart, Version: consts.Version()}) + c.diode.put(msg) case c := <-b.unsubscribing: // A client has detached and we want to @@ -221,14 +221,15 @@ func (b *broker) listen() { delete(clients, c) log.Debug("Removed client from event broker", "numClients", len(clients), "client", c.String()) - case event := <-b.publish: - log.Trace("Got new published event", "event", event) + case msg := <-b.publish: + msg.id = atomic.AddUint32(&eventId, 1) + log.Trace("Got new published event", "event", msg) // We got a new event from the outside! // Send event to all connected clients for c := range clients { - if b.shouldSend(event, c) { - log.Trace("Putting event on client's queue", "client", c.String(), "event", event) - c.diode.put(event) + if b.shouldSend(msg, c) { + log.Trace("Putting event on client's queue", "client", c.String(), "event", msg) + c.diode.put(msg) } }