Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add helper class to watch for beatmap metadata changes #37

Merged
merged 8 commits into from
Mar 17, 2025

Conversation

peppy
Copy link
Member

@peppy peppy commented Mar 13, 2025

No description provided.

@peppy peppy force-pushed the beatmap-metadata-watcher branch 2 times, most recently from 21805ea to 525fb9a Compare March 13, 2025 07:47
@peppy peppy force-pushed the beatmap-metadata-watcher branch from 525fb9a to 1149f5d Compare March 13, 2025 07:47
@peppy peppy force-pushed the beatmap-metadata-watcher branch from 1149f5d to 71c9f95 Compare March 13, 2025 07:51
@peppy
Copy link
Member Author

peppy commented Mar 13, 2025

Sample implementation in osu-server-spectator:

diff --git a/osu.Server.Spectator/Database/DatabaseAccess.cs b/osu.Server.Spectator/Database/DatabaseAccess.cs
index ec574fc..161315d 100644
--- a/osu.Server.Spectator/Database/DatabaseAccess.cs
+++ b/osu.Server.Spectator/Database/DatabaseAccess.cs
@@ -1,7 +1,6 @@
 // Copyright (c) ppy Pty Ltd <[email protected]>. Licensed under the MIT Licence.
 // See the LICENCE file in the repository root for full licence text.
 
-using System;
 using System.Collections.Generic;
 using System.Diagnostics;
 using System.Linq;
@@ -10,7 +9,6 @@ using Dapper;
 using Microsoft.Extensions.Logging;
 using Microsoft.IdentityModel.JsonWebTokens;
 using MySqlConnector;
-using osu.Game.Online.Metadata;
 using osu.Game.Online.Multiplayer;
 using osu.Game.Scoring;
 using osu.Server.Spectator.Database.Models;
@@ -327,26 +325,6 @@ namespace osu.Server.Spectator.Database
             return (await connection.QueryAsync<multiplayer_playlist_item>("SELECT * FROM multiplayer_playlist_items WHERE room_id = @RoomId", new { RoomId = roomId })).ToArray();
         }
 
-        public async Task<BeatmapUpdates> GetUpdatedBeatmapSets(int? lastQueueId, int limit = 50)
-        {
-            var connection = await getConnectionAsync();
-
-            if (lastQueueId.HasValue)
-            {
-                var items = (await connection.QueryAsync<bss_process_queue_item>("SELECT * FROM bss_process_queue WHERE queue_id > @lastQueueId LIMIT @limit", new
-                {
-                    lastQueueId,
-                    limit
-                })).ToArray();
-
-                return new BeatmapUpdates(items.Select(i => i.beatmapset_id).ToArray(), items.LastOrDefault()?.queue_id ?? lastQueueId.Value);
-            }
-
-            var lastEntry = await connection.QueryFirstOrDefaultAsync<bss_process_queue_item>("SELECT * FROM bss_process_queue ORDER BY queue_id DESC LIMIT 1");
-
-            return new BeatmapUpdates(Array.Empty<int>(), lastEntry?.queue_id ?? 0);
-        }
-
         public async Task MarkScoreHasReplay(Score score)
         {
             var connection = await getConnectionAsync();
diff --git a/osu.Server.Spectator/Database/IDatabaseAccess.cs b/osu.Server.Spectator/Database/IDatabaseAccess.cs
index c7681d2..fd0272c 100644
--- a/osu.Server.Spectator/Database/IDatabaseAccess.cs
+++ b/osu.Server.Spectator/Database/IDatabaseAccess.cs
@@ -5,7 +5,6 @@ using System;
 using System.Collections.Generic;
 using System.Threading.Tasks;
 using Microsoft.IdentityModel.JsonWebTokens;
-using osu.Game.Online.Metadata;
 using osu.Game.Online.Multiplayer;
 using osu.Game.Scoring;
 using osu.Server.Spectator.Database.Models;
@@ -128,14 +127,6 @@ namespace osu.Server.Spectator.Database
         /// <param name="roomId">The room to retrieve playlist items from.</param>
         Task<multiplayer_playlist_item[]> GetAllPlaylistItemsAsync(long roomId);
 
-        /// <summary>
-        /// Retrieves any changed beatmap set IDs since last call.
-        /// </summary>
-        /// <param name="lastQueueId">A queue ID to fetch updated items since</param>
-        /// <param name="limit">Maximum number of entries to return. Defaults to 50.</param>
-        /// <returns>Update metadata.</returns>
-        Task<BeatmapUpdates> GetUpdatedBeatmapSets(int? lastQueueId, int limit = 50);
-
         /// <summary>
         /// Mark a score as having a replay available.
         /// </summary>
diff --git a/osu.Server.Spectator/Hubs/Metadata/MetadataBroadcaster.cs b/osu.Server.Spectator/Hubs/Metadata/MetadataBroadcaster.cs
index f45d63a..ac3913d 100644
--- a/osu.Server.Spectator/Hubs/Metadata/MetadataBroadcaster.cs
+++ b/osu.Server.Spectator/Hubs/Metadata/MetadataBroadcaster.cs
@@ -3,13 +3,12 @@
 
 using System;
 using System.Linq;
-using System.Threading;
-using System.Timers;
 using Microsoft.AspNetCore.SignalR;
 using Microsoft.Extensions.Logging;
 using osu.Game.Online.Metadata;
+using osu.Server.QueueProcessor;
 using osu.Server.Spectator.Database;
-using Timer = System.Timers.Timer;
+using BeatmapUpdates = osu.Server.QueueProcessor.BeatmapUpdates;
 
 namespace osu.Server.Spectator.Hubs.Metadata
 {
@@ -21,12 +20,10 @@ namespace osu.Server.Spectator.Hubs.Metadata
         private readonly IDatabaseFactory databaseFactory;
         private readonly IHubContext<MetadataHub> metadataHubContext;
 
-        private readonly Timer timer;
-        private readonly CancellationTokenSource timerCancellationSource;
-        private readonly CancellationToken timerCancellationToken;
         private readonly ILogger logger;
 
         private int? lastQueueId;
+        private readonly IDisposable poller;
 
         public MetadataBroadcaster(
             ILoggerFactory loggerFactory,
@@ -37,51 +34,25 @@ namespace osu.Server.Spectator.Hubs.Metadata
             this.metadataHubContext = metadataHubContext;
             this.logger = loggerFactory.CreateLogger(nameof(MetadataBroadcaster));
 
-            timerCancellationSource = new CancellationTokenSource();
-            timerCancellationToken = timerCancellationSource.Token;
-
-            timer = new Timer(5000);
-            timer.AutoReset = false;
-            timer.Elapsed += pollForChanges;
-            timer.Start();
+            poller = BeatmapStatusWatcher.StartPollingAsync(handleUpdates, 5000).Result;
         }
 
         // ReSharper disable once AsyncVoidMethod
-        private async void pollForChanges(object? sender, ElapsedEventArgs args)
+        private async void handleUpdates(BeatmapUpdates updates)
         {
-            try
-            {
-                using (var db = databaseFactory.GetInstance())
-                {
-                    var updates = await db.GetUpdatedBeatmapSets(lastQueueId);
-
-                    lastQueueId = updates.LastProcessedQueueID;
-                    logger.LogInformation("Polled beatmap changes up to last queue id {lastProcessedQueueID}", updates.LastProcessedQueueID);
+            lastQueueId = updates.LastProcessedQueueID;
+            logger.LogInformation("Polled beatmap changes up to last queue id {lastProcessedQueueID}", updates.LastProcessedQueueID);
 
-                    if (updates.BeatmapSetIDs.Any())
-                    {
-                        logger.LogInformation("Broadcasting new beatmaps to client: {beatmapIds}", string.Join(',', updates.BeatmapSetIDs.Select(i => i.ToString())));
-                        await metadataHubContext.Clients.All.SendAsync(nameof(IMetadataClient.BeatmapSetsUpdated), updates, cancellationToken: timerCancellationToken);
-                    }
-                }
-            }
-            catch (Exception e)
-            {
-                logger.LogError(e, $"Error during beatmap update polling");
-            }
-            finally
+            if (updates.BeatmapSetIDs.Any())
             {
-                if (timerCancellationToken.IsCancellationRequested)
-                    timer.Dispose();
-                else
-                    timer.Start();
+                logger.LogInformation("Broadcasting new beatmaps to client: {beatmapIds}", string.Join(',', updates.BeatmapSetIDs.Select(i => i.ToString())));
+                await metadataHubContext.Clients.All.SendAsync(nameof(IMetadataClient.BeatmapSetsUpdated), updates);
             }
         }
 
         public void Dispose()
         {
-            timerCancellationSource.Cancel();
-            timerCancellationSource.Dispose();
+            poller.Dispose();
         }
     }
 }
diff --git a/osu.Server.Spectator/Hubs/Metadata/MetadataHub.cs b/osu.Server.Spectator/Hubs/Metadata/MetadataHub.cs
index 9c2f942..230bc70 100644
--- a/osu.Server.Spectator/Hubs/Metadata/MetadataHub.cs
+++ b/osu.Server.Spectator/Hubs/Metadata/MetadataHub.cs
@@ -13,11 +13,13 @@ using Microsoft.Extensions.Logging;
 using osu.Game.Online;
 using osu.Game.Online.Metadata;
 using osu.Game.Users;
+using osu.Server.QueueProcessor;
 using osu.Server.Spectator.Database;
 using osu.Server.Spectator.Database.Models;
 using osu.Server.Spectator.Entities;
 using osu.Server.Spectator.Extensions;
 using osu.Server.Spectator.Hubs.Spectator;
+using BeatmapUpdates = osu.Game.Online.Metadata.BeatmapUpdates;
 
 namespace osu.Server.Spectator.Hubs.Metadata
 {
@@ -107,8 +109,7 @@ namespace osu.Server.Spectator.Hubs.Metadata
 
         public async Task<BeatmapUpdates> GetChangesSince(int queueId)
         {
-            using (var db = databaseFactory.GetInstance())
-                return await db.GetUpdatedBeatmapSets(queueId);
+            return await BeatmapStatusWatcher.GetUpdatedBeatmapSetsAsync(queueId);
         }
 
         public async Task BeginWatchingUserPresence()

Two things to note:

  • BeatmapUpdates either needs to be in osu-queue-processor and osu-queue-processor needs to be pulled by osu.Game, or vice-versa. Or we change this to use a tuple and avoid a custom type?

JetBrains Rider-EAP 2025-03-13 at 08 05 17

  • The new method is using DatabaseAccess for database connection retrieval, which differs from how server-spectator is doing things. I don't know if this is going to be perceived as an issue or blocker, please discuss.

@bdach
Copy link
Contributor

bdach commented Mar 13, 2025

BeatmapUpdates either needs to be in osu-queue-processor and osu-queue-processor needs to be pulled by osu.Game, or vice-versa.

Both of these hypotheticals sound really bad. I think having a copy local to this project is completely fine honestly. I've gotten a lot more skittish on sharing models over the years, yes it may be stupid boilerplate but reuse has much more insidious ways of biting back.

The new method is using DatabaseAccess for database connection retrieval, which differs from how server-spectator is doing things. I don't know if this is going to be perceived as an issue or blocker, please discuss.

I don't see it as an issue, they're all using mysqlconnector underneath. Could even say that spectator server could be seemingly quite easily made to use this project's DatabaseAccess (with the subtle caveat that some of the envvar defaults would change, like DB_USER).

@peppy
Copy link
Member Author

peppy commented Mar 13, 2025

I think having a copy local to this project is completely fine honestly.

It's used in osu.Game though, which is why I brought this up. Are you proposing we have two version of the same class?

@bdach
Copy link
Contributor

bdach commented Mar 13, 2025

Are you proposing we have two version of the same class?

Yes, the same way that you could say we have 2 versions of various interop classes. There is precedent for this, we already do this sort of thing with the redis-pushed data I believe:

I really don't want some wild dependency chains for this. You could even argue that the setup we have with osu-server-spectator and osu is borderline pathological, bumps are way too much of a pain in the ass with that model sharing done there (it's only excused by us needing more out of the game nuget in server spectator than just the data structures).

The "proper" way to do this would be to have a lightweight nuget with just data contracts but I know you're on record hating on complex nuget setups, so duplication is the second best option as far as I'm concerned.

@peppy
Copy link
Member Author

peppy commented Mar 13, 2025

I'd be fine with moving all models to this project and renaming it for the record.

Will duplicate for now though.

@peppy peppy requested a review from bdach March 17, 2025 02:46
Copy link
Contributor

@bdach bdach left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably fine


if (lastQueueId.HasValue)
{
var items = (await connection.QueryAsync<bss_process_queue_item>("SELECT * FROM bss_process_queue WHERE queue_id > @lastQueueId LIMIT @limit", new
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure how much it matters but here's a loose thought: the way this is written sort of seems to implicitly rely on this query being ORDER BY queue_id ASC. Which it is probably going to be, but I'm not sure anything in the SQL standard actually guarantees that a SELECT will respect primary key ordering, and reading stuff like https://dev.mysql.com/doc/refman/8.4/en/limit-optimization.html having LIMIT in can apparently alter row ordering in some cases.

Maybe worth specifying explicitly, maybe not. Dunno.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's explicitly enforce it. I'll make the change.

@bdach bdach merged commit 299e0f6 into ppy:master Mar 17, 2025
3 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants