Skip to content

Commit

Permalink
Simplify stream lookup logic
Browse files Browse the repository at this point in the history
  • Loading branch information
nukeop committed Feb 23, 2025
1 parent ec8b887 commit adbf429
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 97 deletions.
162 changes: 91 additions & 71 deletions packages/app/app/actions/queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { createStandardAction } from 'typesafe-actions';
import { v4 } from 'uuid';

import { rest, StreamProvider } from '@nuclear/core';
import { getTrackArtist } from '@nuclear/ui';
import { getTrackArtist, getTrackTitle } from '@nuclear/ui';
import { Track } from '@nuclear/ui/lib/types';

import { safeAddUuid } from './helpers';
Expand Down Expand Up @@ -98,10 +98,6 @@ export const queueDrop = (paths) => ({
payload: paths
});

export const streamFailed = () => ({
type: Queue.STREAM_FAILED
});

export function repositionSong(itemFrom, itemTo) {
return {
type: Queue.REPOSITION_TRACK,
Expand Down Expand Up @@ -167,79 +163,103 @@ export const selectNewStream = (index: number, streamId: string) => async (dispa
}
};

const verifyStreamWithService = async (
track: QueueItem,
streamData: TrackStream[],
selectedStreamProvider: StreamProvider,
settings: RootState['settings']
): Promise<TrackStream[]> => {
if (!settings.useStreamVerification) {
return streamData;
}

try {
const StreamMappingsService = rest.NuclearStreamMappingsService.get(process.env.NUCLEAR_VERIFICATION_SERVICE_URL);
const topStream = await StreamMappingsService.getTopStream(
getTrackArtist(track),
getTrackTitle(track),
selectedStreamProvider.sourceName,
settings?.userId
);

if (!isSuccessCacheEntry(topStream)) {
return streamData;
}

const verifiedStream = streamData.find(stream => stream.id === topStream.value.stream_id);
const otherStreams = streamData.filter(stream => stream.id !== topStream.value.stream_id);
return verifiedStream ? [verifiedStream, ...otherStreams] : streamData;
} catch (e) {
logger.error('Failed to get top stream', e);
return streamData;
}
};

const resolveStreams = async (
track: QueueItem,
selectedStreamProvider: StreamProvider
): Promise<TrackStream[]> => {
const streamData = await getTrackStreams(track, selectedStreamProvider);

if (streamData.length === 0) {
return [];
}

return resolveSourceUrlForTheFirstStream(streamData, selectedStreamProvider);
};

export const findStreamsForTrack = (index: number) => async (dispatch, getState) => {
const {queue, settings}: RootState = getState();
const track = queue.queueItems[index];

if (track && !track.local && trackHasNoFirstStream(track)) {
if (!track.loading) {
dispatch(updateQueueItem({
...track,
loading: true
}));
if (!track || track.local || !trackHasNoFirstStream(track)) {
return;
}

if (!track.loading) {
dispatch(updateQueueItem({ ...track, loading: true }));
}

const selectedStreamProvider = getSelectedStreamProvider(getState);

try {
let streamData = await resolveStreams(track, selectedStreamProvider);

if (streamData.length === 0) {
dispatch(removeFromQueue(index));
return;
}
const selectedStreamProvider = getSelectedStreamProvider(getState);
try {
let streamData: TrackStream[] = await getTrackStreams(track, selectedStreamProvider);

if (settings.useStreamVerification) {
try {
const StreamMappingsService = rest.NuclearStreamMappingsService.get(process.env.NUCLEAR_VERIFICATION_SERVICE_URL);
const topStream = await StreamMappingsService.getTopStream(
track.artist,
track.name,
selectedStreamProvider.sourceName,
settings?.userId
);
// Use the top stream ID and put it at the top of the list
if (isSuccessCacheEntry(topStream)) {
streamData = [
streamData.find(stream => stream.id === topStream.value.stream_id),
...streamData.filter(stream => stream.id !== topStream.value.stream_id)
];
}
} catch (e) {
logger.error('Failed to get top stream', e);
}
}

if (streamData?.length === 0) {
dispatch(removeFromQueue(index));
} else {
streamData = await resolveSourceUrlForTheFirstStream(streamData, selectedStreamProvider);

const firstStream = streamData[0];
if (!firstStream?.stream) {
const remainingStreams = streamData.slice(1);
removeFirstStream(track, index, remainingStreams, dispatch);
return;
}

dispatch(
updateQueueItem({
...track,
loading: false,
error: false,
streams: streamData
})
);
}
} catch (e) {
logger.error(
`An error has occurred when searching for streams with ${selectedStreamProvider.sourceName} for "${track.artist} - ${track.name}."`
);
logger.error(e);
dispatch(
updateQueueItem({
...track,
loading: false,
error: {
message: `An error has occurred when searching for streams with ${selectedStreamProvider.sourceName}.`,
details: e.message
}
})
);
streamData = await verifyStreamWithService(track, streamData, selectedStreamProvider, settings);

const firstStream = streamData[0];
if (!firstStream?.stream) {
const remainingStreams = streamData.slice(1);
removeFirstStream(track, index, remainingStreams, dispatch);
return;
}

dispatch(updateQueueItem({
...track,
loading: false,
error: false,
streams: streamData
}));
} catch (e) {
logger.error(
`An error has occurred when searching for streams with ${selectedStreamProvider.sourceName} for "${track.artist} - ${track.name}". Retrying...`
);
logger.error(e);

dispatch(updateQueueItem({
...track,
loading: false,
streamLookupRetries: (track.streamLookupRetries ?? 0) + 1,
error: {
message: 'Stream lookup error. Retrying...',
details: e.message
}
}));
}
};

Expand Down
2 changes: 1 addition & 1 deletion packages/app/app/containers/PlayerBarContainer/hooks.ts
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ export const useStreamLookup = () => {
const nextTrackWithNoStream = queue.queueItems.findIndex((item, index) =>
index !== queue.currentSong &&
!item.loading &&
isEmpty(item.streams)
isEmpty(item.streams) && (item.streamLookupRetries < 3 || item.streamLookupRetries === undefined)
);

if (nextTrackWithNoStream !== -1) {
Expand Down
29 changes: 4 additions & 25 deletions packages/app/app/reducers/queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import _ from 'lodash';
import { Queue } from '../actions/actionTypes';
import { SELECT_STREAM_PROVIDER } from '../actions/plugins';
import { removeFromQueue } from '../actions/queue';
import { logger } from '@nuclear/core';

export type TrackStream = {
id: string;
Expand All @@ -24,6 +25,7 @@ export type QueueItem = {
message: string;
details: string;
};
streamLookupRetries?: number;
local?: boolean;
artist: string | { name: string };
name: string;
Expand All @@ -38,10 +40,6 @@ export class QueueStore {

const defaultState = {...new QueueStore()};

const findQueueItemIndex = (queueItems: QueueItem[], item: QueueItem) => {
return _.findIndex(queueItems, i => i.uuid === item.uuid);
};

function reduceRemoveFromQueue(state, action: ReturnType<typeof removeFromQueue>) {
const removeIx = action.payload.index;
let newCurrent = state.currentSong;
Expand Down Expand Up @@ -106,24 +104,6 @@ function reducePreviousSong(state) {
});
}

function reduceStreamFailed(state) {
return {
...state,
queueItems: state.queueItems.map((item, index) => {
if (index === state.currentSong) {
return {
...item,
error: {
message: 'Could not find a working stream using this source.',
details: 'Try re-rolling.'
}
};
}
return item;
})
};
}

function reduceSelectStreamProviders(state) {
return {
...state,
Expand Down Expand Up @@ -157,7 +137,8 @@ const reduceAddPlayNextItem = (state, action) => {
};
};

export default function QueueReducer(state = defaultState, action) {
export default function QueueReducer(state = defaultState, action): QueueStore {
logger.log('QueueReducer', action);
switch (action.type) {
case Queue.ADD_QUEUE_ITEM:
return {
Expand All @@ -183,8 +164,6 @@ export default function QueueReducer(state = defaultState, action) {
return reduceSelectSong(state, action);
case Queue.REPOSITION_TRACK:
return reduceRepositionSong(state, action);
case Queue.STREAM_FAILED:
return reduceStreamFailed(state);
case SELECT_STREAM_PROVIDER:
return reduceSelectStreamProviders(state);
default:
Expand Down

0 comments on commit adbf429

Please sign in to comment.