浏览代码

feat: basic audio playback demo

    * feat: working demo of playing a song (based on id)
    * fix: dispatch side effects to server
    * feat: TTL for clients in redis, with refactored disconnection logic
    * feat: unique name per client
Fela Maslen 5 年之前
父节点
当前提交
2f4dd0051c

+ 2 - 0
gmus/package.json

@@ -28,6 +28,7 @@
     ]
   },
   "dependencies": {
+    "@react-hook/throttle": "^2.2.0",
     "@testing-library/jest-dom": "^5.11.4",
     "@testing-library/react": "^11.1.0",
     "@testing-library/user-event": "^12.1.10",
@@ -40,6 +41,7 @@
     "eslint-plugin-prettier": "^3.3.0",
     "jest-websocket-mock": "^2.2.0",
     "mock-socket": "^9.0.3",
+    "nanoid": "^3.1.20",
     "prettier": "^2.2.1",
     "react": "^17.0.1",
     "react-dom": "^17.0.1",

+ 15 - 9
gmus/src/actions/actions.ts

@@ -1,4 +1,4 @@
-import { MusicPlayer } from '../types/state';
+import { Member, MusicPlayer } from '../types/state';
 
 interface Action<T extends string = string, P = unknown> {
   type: T;
@@ -7,30 +7,36 @@ interface Action<T extends string = string, P = unknown> {
 
 export enum ActionTypeRemote {
   StateSet = 'STATE_SET',
-  ClientConnected = 'CLIENT_CONNECTED',
-  ClientDisconnected = 'CLIENT_DISCONNECTED',
+  ClientListUpdated = 'CLIENT_LIST_UPDATED',
 }
 
 // Remote actions - these only come FROM the socket
 export type ActionStateSetRemote = Action<ActionTypeRemote.StateSet, MusicPlayer | null>;
 
-export type ActionClientConnected = Action<ActionTypeRemote.ClientConnected, string[]>;
-export type ActionClientDisconnected = Action<ActionTypeRemote.ClientDisconnected, string[]>;
+export type ActionClientListUpdated = Action<ActionTypeRemote.ClientListUpdated, Member[]>;
 
-export type RemoteAction = ActionStateSetRemote | ActionClientConnected | ActionClientDisconnected;
+export type RemoteAction = ActionStateSetRemote | ActionClientListUpdated;
 
 // Local actions - these are dispatched from this client
 export enum ActionTypeLocal {
+  NameSet = '@@local/NAME_SET',
   StateSet = '@@local/STATE_SET',
 }
 
-export type ActionStateSetLocal = Action<ActionTypeLocal.StateSet, MusicPlayer>;
+export type ActionNameSet = Action<ActionTypeLocal.NameSet, string>;
 
-export const stateSet = (state: MusicPlayer): ActionStateSetLocal => ({
+export const nameSet = (name: string): ActionNameSet => ({
+  type: ActionTypeLocal.NameSet,
+  payload: name,
+});
+
+export type ActionStateSetLocal = Action<ActionTypeLocal.StateSet, Partial<MusicPlayer>>;
+
+export const stateSet = (state: Partial<MusicPlayer> = {}): ActionStateSetLocal => ({
   type: ActionTypeLocal.StateSet,
   payload: state,
 });
 
-export type LocalAction = ActionStateSetLocal;
+export type LocalAction = ActionNameSet | ActionStateSetLocal;
 
 export type AnyAction = LocalAction | RemoteAction;

+ 8 - 4
gmus/src/components/client-list/index.tsx

@@ -1,8 +1,11 @@
 import React, { useCallback } from 'react';
+
 import { useCTA } from '../../hooks/cta';
+import { Member } from '../../types/state';
 
 export type Props = {
-  clients: string[];
+  myClientName: string;
+  clients: Member[];
 };
 
 type PropsListItem = {
@@ -20,7 +23,7 @@ const ClientListItem: React.FC<PropsListItem> = ({ name, onSelect }) => {
   return <li {...eventProps}>{name}</li>;
 };
 
-export const ClientList: React.FC<Props> = ({ clients }) => {
+export const ClientList: React.FC<Props> = ({ myClientName, clients }) => {
   const onSelectClient = useCallback((name: string) => {
     console.log('Selected client!', name);
     // TODO
@@ -28,9 +31,10 @@ export const ClientList: React.FC<Props> = ({ clients }) => {
 
   return (
     <div>
-      <h6>Client list</h6>
+      <h5>Client list</h5>
+      <h6>My name: {myClientName}</h6>
       <ul>
-        {clients.map((name) => (
+        {clients.map(({ name }) => (
           <ClientListItem key={name} name={name} onSelect={onSelectClient} />
         ))}
       </ul>

+ 52 - 17
gmus/src/components/gmus/index.tsx

@@ -1,19 +1,42 @@
-import React, { useCallback, useState } from 'react';
+import { useThrottleCallback } from '@react-hook/throttle';
+import React, { Dispatch, useCallback, useEffect, useRef, useState } from 'react';
 
-import { stateSet } from '../../actions';
-import { useKeepalive } from '../../hooks/socket';
-import { useGlobalState } from '../../hooks/state';
+import { AnyAction, stateSet } from '../../actions';
+import { masterStateUpdateTimeout } from '../../constants/system';
+import { useDispatchEffects, useKeepalive } from '../../hooks/socket';
+import { GlobalState } from '../../reducer/types';
 import { ClientList } from '../client-list';
+import { Player } from '../player';
 
 export type Props = {
-  myClientName: string;
   socket: WebSocket;
+  state: GlobalState;
+  dispatch: Dispatch<AnyAction>;
 };
 
-export const Gmus: React.FC<Props> = ({ myClientName, socket }) => {
+function useMaster(dispatch: Dispatch<AnyAction>, isMaster: boolean): void {
+  const masterUpdateTimer = useRef<number>(0);
+  useEffect(() => {
+    if (isMaster) {
+      masterUpdateTimer.current = window.setInterval(() => {
+        dispatch(stateSet());
+      }, masterStateUpdateTimeout);
+    }
+
+    return (): void => {
+      window.clearInterval(masterUpdateTimer.current);
+    };
+  }, [dispatch, isMaster]);
+}
+
+export const Gmus: React.FC<Props> = ({ socket, state, dispatch }) => {
   useKeepalive(socket);
+  useDispatchEffects(socket, state);
 
-  const [{ clientList, player }, dispatch] = useGlobalState(socket);
+  const { clientList, player, myClientName } = state;
+
+  const isMaster = player.master === myClientName;
+  useMaster(dispatch, isMaster);
 
   const [tempSongId, setTempSongId] = useState<number>(0);
 
@@ -25,21 +48,26 @@ export const Gmus: React.FC<Props> = ({ myClientName, socket }) => {
     dispatch(
       stateSet({
         songId: tempSongId,
-        playTimeSeconds: 0,
+        currentTime: 0,
         playing: true,
-        currentClient: myClientName,
+        master: myClientName,
       }),
     );
   }, [dispatch, tempSongId, myClientName]);
 
   const playPause = useCallback(() => {
-    dispatch(
-      stateSet({
-        ...player,
-        playing: !player.playing,
-      }),
-    );
-  }, [dispatch, player]);
+    dispatch(stateSet({ playing: !player.playing }));
+  }, [dispatch, player.playing]);
+
+  const onTimeUpdate = useCallback(
+    (currentTime: number): void => {
+      if (isMaster) {
+        dispatch(stateSet({ currentTime }));
+      }
+    },
+    [dispatch, isMaster],
+  );
+  const onTimeUpdateThrottled = useThrottleCallback(onTimeUpdate, 1000);
 
   return (
     <div>
@@ -55,11 +83,18 @@ export const Gmus: React.FC<Props> = ({ myClientName, socket }) => {
         />
         <button onClick={playSong}>Change track</button>
       </div>
-      <ClientList clients={clientList} />
+      <ClientList myClientName={myClientName} clients={clientList} />
       <div>
         <h6>Player State</h6>
         <pre>{JSON.stringify(player, null, 2)}</pre>
       </div>
+      {isMaster && !!player.songId && (
+        <Player
+          playing={player.playing}
+          onTimeUpdate={onTimeUpdateThrottled}
+          songId={player.songId}
+        />
+      )}
     </div>
   );
 };

+ 36 - 0
gmus/src/components/player/index.tsx

@@ -0,0 +1,36 @@
+/* eslint-disable jsx-a11y/media-has-caption */
+import React, { CSSProperties, useEffect, useRef } from 'react';
+import { getSongUrl } from '../../utils/url';
+
+type Props = {
+  playing: boolean;
+  onTimeUpdate: (time: number) => void;
+  songId: number;
+};
+
+const hidden: CSSProperties = { visibility: 'hidden' };
+
+export const Player: React.FC<Props> = ({ playing, onTimeUpdate, songId }) => {
+  const audio = useRef<HTMLAudioElement | null>(null);
+
+  useEffect(() => {
+    if (audio.current) {
+      audio.current.ontimeupdate = (): void => {
+        onTimeUpdate(audio.current?.currentTime ?? 0);
+      };
+    }
+  }, [onTimeUpdate]);
+
+  useEffect(() => {
+    if (!audio.current) {
+      return;
+    }
+    if (playing) {
+      audio.current.play();
+    } else {
+      audio.current.pause();
+    }
+  }, [playing]);
+
+  return <audio ref={audio} src={getSongUrl(songId)} style={hidden} />;
+};

+ 21 - 4
gmus/src/components/root/index.tsx

@@ -1,15 +1,32 @@
-import React from 'react';
+import React, { Reducer, useCallback, useReducer } from 'react';
+import { AnyAction, nameSet } from '../../actions';
 
-import { useSocket } from '../../hooks/socket';
+import { useOnMessage, useSocket } from '../../hooks/socket';
+import { composedGlobalReducer, GlobalState, init, initialState } from '../../reducer';
 import { Gmus } from '../gmus';
 import { Identify } from '../identify';
 
 export const Root: React.FC = () => {
-  const { name, onIdentify, socket, connecting, connected, error } = useSocket();
+  const [state, dispatch] = useReducer<Reducer<GlobalState, AnyAction>, GlobalState>(
+    composedGlobalReducer,
+    initialState,
+    init,
+  );
+
+  const onMessage = useOnMessage(dispatch);
+
+  const onLogin = useCallback(
+    (name: string): void => {
+      dispatch(nameSet(name));
+    },
+    [dispatch],
+  );
+
+  const { name, onIdentify, socket, connecting, connected, error } = useSocket(onMessage, onLogin);
 
   if (!(socket && connected && name) || error) {
     return <Identify connecting={connecting} onIdentify={onIdentify} />;
   }
 
-  return <Gmus myClientName={name} socket={socket} />;
+  return <Gmus socket={socket} state={state} dispatch={dispatch} />;
 };

+ 2 - 0
gmus/src/constants/system.ts

@@ -1 +1,3 @@
 export const socketKeepaliveTimeout = 20000;
+
+export const masterStateUpdateTimeout = 5000;

+ 4 - 3
gmus/src/effects/effects.spec.ts

@@ -1,4 +1,5 @@
 import { ActionStateSetRemote, ActionTypeLocal, ActionTypeRemote, stateSet } from '../actions';
+import { composedGlobalReducer, initialState } from '../reducer';
 import { MusicPlayer } from '../types/state';
 import { globalEffects } from './effects';
 
@@ -10,11 +11,11 @@ describe(globalEffects.name, () => {
       const localPlayer: MusicPlayer = {
         songId: 123,
         playing: false,
-        playTimeSeconds: 83,
-        currentClient: 'my-client',
+        currentTime: 83,
+        master: 'my-client',
       };
 
-      const result = globalEffects(stateSet(localPlayer));
+      const result = globalEffects(composedGlobalReducer(initialState, stateSet(localPlayer)));
 
       expect(result).toStrictEqual<ActionStateSetRemote>({
         type: ActionTypeRemote.StateSet,

+ 6 - 5
gmus/src/effects/effects.ts

@@ -1,13 +1,14 @@
-import { ActionTypeLocal, ActionTypeRemote, AnyAction, RemoteAction } from '../actions';
+import { ActionTypeLocal, ActionTypeRemote, RemoteAction } from '../actions';
+import { GlobalState } from '../reducer';
 
-export function globalEffects(lastAction: AnyAction | null): RemoteAction | null {
-  if (!lastAction) {
+export function globalEffects(state: GlobalState): RemoteAction | null {
+  if (!state.lastAction) {
     return null;
   }
 
-  switch (lastAction.type) {
+  switch (state.lastAction.type) {
     case ActionTypeLocal.StateSet:
-      return { type: ActionTypeRemote.StateSet, payload: lastAction.payload };
+      return { type: ActionTypeRemote.StateSet, payload: state.player };
 
     default:
       return null;

+ 132 - 7
gmus/src/hooks/socket.spec.tsx

@@ -1,15 +1,93 @@
 import { act, fireEvent, render, RenderResult } from '@testing-library/react';
 import WS from 'jest-websocket-mock';
-import React from 'react';
+import React, { Dispatch } from 'react';
 import * as storageHooks from 'react-storage-hooks';
 
-import { useSocket } from './socket';
+import { AnyAction, RemoteAction } from '../actions';
+import * as effects from '../effects/effects';
+import { GlobalState } from '../reducer';
+
+import { useDispatchEffects, useOnMessage, useSocket } from './socket';
+
+jest.mock('nanoid', () => ({
+  nanoid: (): string => 'A5v3D',
+}));
+
+describe(useOnMessage.name, () => {
+  const dispatch: Dispatch<AnyAction> = jest.fn();
+
+  const testMessage = {
+    data: JSON.stringify({
+      type: 'SOME_ACTION_FROM_SOCKET',
+      payload: {
+        some: 'thing',
+      },
+    }),
+  } as MessageEvent<unknown>;
+
+  const TestComponent: React.FC = () => {
+    const onMessage = useOnMessage(dispatch);
+
+    return <button onClick={(): void => onMessage(testMessage)}>Simulate message!</button>;
+  };
+
+  it('should return a function which dispatches actions', () => {
+    expect.assertions(2);
+
+    const { getByText } = render(<TestComponent />);
+    act(() => {
+      fireEvent.click(getByText('Simulate message!'));
+    });
+
+    expect(dispatch).toHaveBeenCalledTimes(1);
+    expect(dispatch).toHaveBeenCalledWith({
+      type: 'SOME_ACTION_FROM_SOCKET',
+      payload: {
+        some: 'thing',
+      },
+    });
+  });
+});
+
+describe(useDispatchEffects.name, () => {
+  const someAction = ({
+    type: 'SOME_ACTION',
+    payload: 'yes',
+  } as unknown) as RemoteAction;
+
+  const state = {} as GlobalState;
+
+  const socket = ({
+    send: jest.fn(),
+  } as unknown) as WebSocket;
+
+  const TestComponent: React.FC = () => {
+    useDispatchEffects(socket, state);
+    return null;
+  };
+
+  describe('when an action is dispatched locally which produces an effect', () => {
+    it('should send the effect action to the socket', async () => {
+      expect.assertions(2);
+
+      jest.spyOn(effects, 'globalEffects').mockReturnValueOnce(someAction);
+
+      render(<TestComponent />);
+
+      expect(socket.send).toHaveBeenCalledTimes(1);
+      expect(socket.send).toHaveBeenCalledWith(JSON.stringify(someAction));
+    });
+  });
+});
 
 describe(useSocket.name, () => {
   afterEach(WS.clean);
 
+  const onMessage = jest.fn();
+  const onLogin = jest.fn();
+
   const TestComponent: React.FC = () => {
-    const { onIdentify, socket, ...hookResult } = useSocket();
+    const { onIdentify, socket, ...hookResult } = useSocket(onMessage, onLogin);
 
     return (
       <>
@@ -56,12 +134,12 @@ describe(useSocket.name, () => {
       return renderResult;
     };
 
-    it('should create a new connection to the socket, using the client name in the query', async () => {
+    it('should create a new connection to the socket, using a unique client name in the query', async () => {
       expect.assertions(1);
       setupIdentify();
 
       const res = await server.connected;
-      expect(res.url).toBe('ws://my-api.url:1234/pubsub?client-name=my-client-name');
+      expect(res.url).toBe('ws://my-api.url:1234/pubsub?client-name=my-client-name-A5v3D');
     });
 
     it('should open a new socket', async () => {
@@ -94,6 +172,20 @@ describe(useSocket.name, () => {
       );
     });
 
+    it('should return the unique name', async () => {
+      expect.assertions(1);
+      const { getByTestId } = setupIdentify();
+      await act(async () => {
+        await server.connected;
+      });
+
+      expect(JSON.parse(getByTestId('hook-result').innerHTML)).toStrictEqual(
+        expect.objectContaining({
+          name: 'my-client-name-A5v3D',
+        }),
+      );
+    });
+
     it('should save the client name', async () => {
       expect.assertions(2);
       setupIdentify();
@@ -104,6 +196,17 @@ describe(useSocket.name, () => {
       expect(saveName).toHaveBeenCalledTimes(1);
       expect(saveName).toHaveBeenCalledWith('my-client-name');
     });
+
+    it('should call onLogin', async () => {
+      expect.assertions(2);
+      setupIdentify();
+      await act(async () => {
+        await server.connected;
+      });
+
+      expect(onLogin).toHaveBeenCalledTimes(1);
+      expect(onLogin).toHaveBeenCalledWith('my-client-name-A5v3D');
+    });
   });
 
   describe('when the name is stored in localStorage', () => {
@@ -128,13 +231,13 @@ describe(useSocket.name, () => {
       );
     });
 
-    it('should open a socket immediately, using the stored name', async () => {
+    it('should open a socket immediately, using a unique version of the stored name', async () => {
       expect.assertions(3);
       const { getByText } = render(<TestComponent />);
 
       const res = await server.connected;
 
-      expect(res.url).toBe('ws://my-api.url:1234/pubsub?client-name=my-stored-name');
+      expect(res.url).toBe('ws://my-api.url:1234/pubsub?client-name=my-stored-name-A5v3D');
 
       act(() => {
         fireEvent.click(getByText('Say hello!'));
@@ -158,4 +261,26 @@ describe(useSocket.name, () => {
       );
     });
   });
+
+  describe('when a message is received from the server', () => {
+    let server: WS;
+    beforeEach(() => {
+      server = new WS('ws://my-api.url:1234/pubsub');
+    });
+
+    it('should call onMessage', async () => {
+      expect.assertions(2);
+      const { getByText } = render(<TestComponent />);
+      act(() => {
+        fireEvent.click(getByText('Identify!'));
+      });
+
+      await server.connected;
+
+      server.send('foo');
+
+      expect(onMessage).toHaveBeenCalledTimes(1);
+      expect(onMessage).toHaveBeenCalledWith(expect.objectContaining({ data: 'foo' }));
+    });
+  });
 });

+ 52 - 8
gmus/src/hooks/socket.ts

@@ -1,10 +1,44 @@
-import { useEffect, useRef, useState } from 'react';
+import { nanoid } from 'nanoid';
+import { Dispatch, useCallback, useEffect, useRef, useState } from 'react';
 import { useStorageState } from 'react-storage-hooks';
 
+import { AnyAction } from '../actions';
 import { socketKeepaliveTimeout } from '../constants/system';
+import { globalEffects } from '../effects';
+import { GlobalState } from '../reducer';
 import { getPubsubUrl } from '../utils/url';
 
-export function useSocket(): {
+const getUniqueName = (name: string): string => (name.length ? `${name}-${nanoid(5)}` : '');
+
+export type OnMessage = (message: MessageEvent<unknown>) => void;
+
+export function useOnMessage(dispatch: Dispatch<AnyAction>): OnMessage {
+  return useCallback<OnMessage>(
+    ({ data }: MessageEvent<unknown>): void => {
+      try {
+        const action = JSON.parse(data as string) as AnyAction;
+        dispatch(action);
+      } catch (err) {
+        console.warn('Error parsing message from websocket', err.message);
+      }
+    },
+    [dispatch],
+  );
+}
+
+export function useDispatchEffects(socket: WebSocket, state: GlobalState): void {
+  useEffect(() => {
+    const remoteEffect = globalEffects(state);
+    if (remoteEffect) {
+      socket.send(JSON.stringify(remoteEffect));
+    }
+  }, [socket, state]);
+}
+
+export function useSocket(
+  onMessage: OnMessage,
+  onLogin: (name: string) => void,
+): {
   name: string | null;
   onIdentify: (name: string) => void;
   socket: WebSocket | null;
@@ -12,21 +46,27 @@ export function useSocket(): {
   connecting: boolean;
   connected: boolean;
 } {
-  const [name, saveName] = useStorageState<string>(localStorage, 'client-name', '');
-  const [tempName, setTempName] = useState<string>(name);
+  const [storedName, saveName] = useStorageState<string>(localStorage, 'client-name', '');
+  const [uniqueName, setUniqueName] = useState<string>(getUniqueName(storedName));
+  const [tempName, setTempName] = useState<string>(storedName);
 
   const [socket, setSocket] = useState<WebSocket | null>(null);
   const [error, setError] = useState<boolean>(false);
 
   const [connecting, setConnecting] = useState<boolean>(false);
 
+  const onIdentify = useCallback((newName: string) => {
+    setTempName(newName);
+  }, []);
+
   useEffect(() => {
     let cancelled = false;
     let ws: WebSocket | undefined;
     if (tempName) {
       setConnecting(true);
 
-      ws = new WebSocket(`${getPubsubUrl()}?client-name=${tempName}`);
+      const uniqueTempName = getUniqueName(tempName);
+      ws = new WebSocket(`${getPubsubUrl()}?client-name=${uniqueTempName}`);
 
       ws.onopen = (): void => {
         if (!cancelled && ws && ws.readyState === ws.OPEN) {
@@ -34,11 +74,15 @@ export function useSocket(): {
           setConnecting(false);
 
           saveName(tempName);
+          setUniqueName(uniqueTempName);
 
           setSocket(ws);
+          onLogin(uniqueTempName);
         }
       };
 
+      ws.onmessage = onMessage;
+
       ws.onclose = (): void => {
         if (cancelled) {
           return;
@@ -56,11 +100,11 @@ export function useSocket(): {
       cancelled = true;
       ws?.close();
     };
-  }, [tempName, saveName]);
+  }, [onMessage, onLogin, tempName, saveName]);
 
   return {
-    name,
-    onIdentify: setTempName,
+    name: uniqueName,
+    onIdentify,
     socket,
     error,
     connecting,

+ 0 - 91
gmus/src/hooks/state.spec.tsx

@@ -1,91 +0,0 @@
-import { act, fireEvent, render, RenderResult } from '@testing-library/react';
-import WS from 'jest-websocket-mock';
-import React from 'react';
-
-import { AnyAction, RemoteAction } from '../actions';
-import * as effects from '../effects/effects';
-import * as reducer from '../reducer/reducer';
-
-import { useGlobalState } from './state';
-
-describe(useGlobalState.name, () => {
-  afterEach(WS.clean);
-
-  let server: WS;
-  beforeEach(() => {
-    server = new WS('ws://my.api:1234');
-  });
-
-  const someAction = ({
-    type: 'SOME_ACTION',
-    payload: 'yes',
-  } as unknown) as AnyAction;
-
-  const otherAction = ({
-    type: 'OTHER_ACTION',
-    payload: {
-      three: Infinity,
-    },
-  } as unknown) as RemoteAction;
-
-  const TestComponent: React.FC<{ socket: WebSocket }> = ({ socket }) => {
-    const [state, dispatch] = useGlobalState(socket);
-
-    return (
-      <>
-        <div data-testid="state">{JSON.stringify(state)}</div>
-        <button onClick={(): void => dispatch(someAction)}>Dispatch!</button>
-      </>
-    );
-  };
-
-  const setup = async (): Promise<RenderResult> => {
-    const socket = new WebSocket('ws://my.api:1234');
-    await server.connected;
-    return render(<TestComponent socket={socket} />);
-  };
-
-  describe('when a message comes in from the socket', () => {
-    it('should dispatch the action to the global reducer', async () => {
-      expect.assertions(1);
-
-      jest.spyOn(reducer, 'composedGlobalReducer').mockImplementationOnce((state, action) => {
-        if (((action as unknown) as Record<string, unknown>).type === 'OTHER_ACTION') {
-          return { ...state, it: 'worked' } as reducer.GlobalState;
-        }
-        return state;
-      });
-
-      const { getByTestId } = await setup();
-
-      act(() => {
-        server.send(JSON.stringify(otherAction));
-      });
-
-      expect(JSON.parse(getByTestId('state').innerHTML)).toStrictEqual(
-        expect.objectContaining({ it: 'worked' }),
-      );
-    });
-  });
-
-  describe('when an action is dispatched locally which produces an effect', () => {
-    it('should send the effect action to the socket', async () => {
-      expect.assertions(2);
-
-      jest.spyOn(reducer, 'globalReducer').mockReturnValueOnce({
-        ...reducer.initialState,
-        lastAction: someAction,
-      });
-
-      jest.spyOn(effects, 'globalEffects').mockReturnValueOnce(otherAction);
-
-      const { getByText } = await setup();
-      act(() => {
-        fireEvent.click(getByText('Dispatch!'));
-      });
-
-      await expect(server).toReceiveMessage(JSON.stringify(otherAction));
-      expect(server).toHaveReceivedMessages([JSON.stringify(otherAction)]);
-    });
-  });
-});

+ 0 - 41
gmus/src/hooks/state.ts

@@ -1,41 +0,0 @@
-import { Dispatch, Reducer, useCallback, useEffect, useReducer } from 'react';
-
-import { AnyAction } from '../actions';
-import { globalEffects } from '../effects';
-
-import { composedGlobalReducer, GlobalState, initialState } from '../reducer';
-
-function init(state: GlobalState): GlobalState {
-  return state;
-}
-
-export function useGlobalState(socket: WebSocket): [GlobalState, Dispatch<AnyAction>] {
-  const [state, dispatch] = useReducer<Reducer<GlobalState, AnyAction>, GlobalState>(
-    composedGlobalReducer,
-    initialState,
-    init,
-  );
-
-  const onMessage = useCallback(({ data }: { data: string }): void => {
-    try {
-      const action = JSON.parse(data) as AnyAction;
-      dispatch(action);
-    } catch (err) {
-      console.warn('Error parsing message from websocket', err.message);
-    }
-  }, []);
-
-  useEffect(() => {
-    // eslint-disable-next-line no-param-reassign
-    socket.onmessage = onMessage;
-  }, [socket, onMessage]);
-
-  useEffect(() => {
-    const remoteEffect = globalEffects(state.lastAction);
-    if (remoteEffect) {
-      socket.send(JSON.stringify(remoteEffect));
-    }
-  }, [socket, state.lastAction]);
-
-  return [state, dispatch];
-}

+ 7 - 0
gmus/src/reducer/index.ts

@@ -1 +1,8 @@
+import { GlobalState } from './types';
+
 export * from './reducer';
+export * from './types';
+
+export function init(state: GlobalState): GlobalState {
+  return state;
+}

+ 109 - 34
gmus/src/reducer/reducer.spec.ts

@@ -1,12 +1,15 @@
 import {
-  ActionClientConnected,
-  ActionClientDisconnected,
+  ActionClientListUpdated,
   ActionStateSetLocal,
   ActionStateSetRemote,
   ActionTypeLocal,
   ActionTypeRemote,
+  nameSet,
+  stateSet,
 } from '../actions';
-import { composedGlobalReducer, globalReducer, initialState } from './reducer';
+import { MusicPlayer } from '../types/state';
+import { composedGlobalReducer, globalReducer, initialState, nullPlayer } from './reducer';
+import { GlobalState } from './types';
 
 describe(globalReducer.name, () => {
   describe(ActionTypeRemote.StateSet, () => {
@@ -15,8 +18,8 @@ describe(globalReducer.name, () => {
       payload: {
         songId: 123,
         playing: true,
-        playTimeSeconds: 75,
-        currentClient: 'some-client',
+        currentTime: 75,
+        master: 'some-client',
       },
     };
 
@@ -27,22 +30,67 @@ describe(globalReducer.name, () => {
       expect(result.player).toStrictEqual({
         songId: 123,
         playing: true,
-        playTimeSeconds: 75,
-        currentClient: 'some-client',
+        currentTime: 75,
+        master: 'some-client',
+      });
+    });
+
+    describe('when the client is master', () => {
+      const stateMaster: GlobalState = {
+        ...initialState,
+        player: {
+          ...nullPlayer,
+          master: 'some-client',
+          currentTime: 31,
+        },
+        myClientName: 'some-client',
+      };
+
+      it('should not update the currentTime', () => {
+        expect.assertions(1);
+        const result = globalReducer(stateMaster, action);
+
+        expect(result.player).toStrictEqual<MusicPlayer>({
+          songId: 123,
+          playing: true,
+          currentTime: 31, // not updated from the action
+          master: 'some-client',
+        });
+      });
+    });
+
+    describe('when the client was the master but no longer', () => {
+      const stateDifferentMaster: GlobalState = {
+        ...initialState,
+        player: {
+          ...nullPlayer,
+          master: 'a-client-16b3',
+          currentTime: 31,
+        },
+        myClientName: 'a-client-16b3',
+      };
+
+      it('should update the currentTime', () => {
+        expect.assertions(1);
+        const result = globalReducer(stateDifferentMaster, action);
+
+        expect(result.player).toStrictEqual<MusicPlayer>({
+          songId: 123,
+          playing: true,
+          currentTime: 75,
+          master: 'some-client',
+        });
       });
     });
   });
 
   describe(ActionTypeLocal.StateSet, () => {
-    const action: ActionStateSetLocal = {
-      type: ActionTypeLocal.StateSet,
-      payload: {
-        songId: 123,
-        playing: true,
-        playTimeSeconds: 75,
-        currentClient: 'some-client',
-      },
-    };
+    const action = stateSet({
+      songId: 123,
+      playing: true,
+      currentTime: 75,
+      master: 'some-client',
+    });
 
     it('should set the player state', () => {
       expect.assertions(1);
@@ -51,32 +99,59 @@ describe(globalReducer.name, () => {
       expect(result.player).toStrictEqual({
         songId: 123,
         playing: true,
-        playTimeSeconds: 75,
-        currentClient: 'some-client',
+        currentTime: 75,
+        master: 'some-client',
+      });
+    });
+
+    describe('when the state update is partial', () => {
+      const actionPartial: ActionStateSetLocal = stateSet({
+        playing: false,
+      });
+
+      it('should update the given part of the state', () => {
+        expect.assertions(1);
+
+        const result = globalReducer(globalReducer(initialState, action), actionPartial);
+
+        expect(result.player).toStrictEqual({
+          songId: 123,
+          playing: false,
+          currentTime: 75,
+          master: 'some-client',
+        });
       });
     });
   });
 
-  const actionClientConnected: ActionClientConnected = {
-    type: ActionTypeRemote.ClientConnected,
-    payload: ['client1', 'client2'],
-  };
-
-  const actionClientDisconnected: ActionClientDisconnected = {
-    type: ActionTypeRemote.ClientDisconnected,
-    payload: ['client1'],
-  };
-
-  describe.each`
-    actionType                             | action                      | expectedClientList
-    ${ActionTypeRemote.ClientConnected}    | ${actionClientConnected}    | ${['client1', 'client2']}
-    ${ActionTypeRemote.ClientDisconnected} | ${actionClientDisconnected} | ${['client1']}
-  `('$actionType', ({ action, expectedClientList }) => {
+  describe(ActionTypeLocal.NameSet, () => {
+    it('should set the name', () => {
+      expect.assertions(1);
+
+      expect(globalReducer(initialState, nameSet('foo')).myClientName).toBe('foo');
+    });
+  });
+
+  describe(ActionTypeRemote.ClientListUpdated, () => {
+    const action: ActionClientListUpdated = {
+      type: ActionTypeRemote.ClientListUpdated,
+      payload: [
+        {
+          name: 'client1-ab54x',
+          lastPing: 1665912239,
+        },
+        {
+          name: 'client1-ab54x',
+          lastPing: 1665912262,
+        },
+      ],
+    };
+
     it('should update the client list', () => {
       expect.assertions(1);
       const result = globalReducer(initialState, action);
 
-      expect(result.clientList).toStrictEqual(expectedClientList);
+      expect(result.clientList).toStrictEqual(action.payload);
     });
   });
 });

+ 25 - 13
gmus/src/reducer/reducer.ts

@@ -1,33 +1,45 @@
-import { ActionTypeLocal, ActionTypeRemote, AnyAction } from '../actions';
+import { ActionStateSetRemote, ActionTypeLocal, ActionTypeRemote, AnyAction } from '../actions';
+import { isMaster } from '../selectors';
 import { MusicPlayer } from '../types/state';
+import { GlobalState } from './types';
 
-export type GlobalState = {
-  lastAction: AnyAction | null;
-  player: MusicPlayer;
-  clientList: string[];
-};
-
-const nullPlayer: MusicPlayer = {
+export const nullPlayer: MusicPlayer = {
   songId: null,
   playing: false,
-  playTimeSeconds: 0,
-  currentClient: '',
+  currentTime: 0,
+  master: '',
 };
 
 export const initialState: GlobalState = {
   lastAction: null,
   player: nullPlayer,
   clientList: [],
+  myClientName: '',
 };
 
+function onRemoteStateSet(state: GlobalState, action: ActionStateSetRemote): GlobalState {
+  const isAndWillBeMaster =
+    isMaster(state) && !(action.payload?.master && action.payload.master !== state.player.master);
+
+  const currentTime = isAndWillBeMaster
+    ? state.player.currentTime
+    : action.payload?.currentTime ?? state.player.currentTime;
+
+  return { ...state, player: { ...(action.payload ?? nullPlayer), currentTime } };
+}
+
 export function globalReducer(state: GlobalState, action: AnyAction): GlobalState {
   switch (action.type) {
     case ActionTypeRemote.StateSet:
+      return onRemoteStateSet(state, action);
+
     case ActionTypeLocal.StateSet:
-      return { ...state, player: action.payload ?? nullPlayer };
+      return { ...state, player: { ...state.player, ...action.payload } };
+
+    case ActionTypeLocal.NameSet:
+      return { ...state, myClientName: action.payload };
 
-    case ActionTypeRemote.ClientConnected:
-    case ActionTypeRemote.ClientDisconnected:
+    case ActionTypeRemote.ClientListUpdated:
       return { ...state, clientList: action.payload };
 
     default:

+ 9 - 0
gmus/src/reducer/types.ts

@@ -0,0 +1,9 @@
+import { AnyAction } from '../actions';
+import { Member, MusicPlayer } from '../types/state';
+
+export type GlobalState = {
+  lastAction: AnyAction | null;
+  player: MusicPlayer;
+  clientList: Member[];
+  myClientName: string;
+};

+ 3 - 0
gmus/src/selectors.ts

@@ -0,0 +1,3 @@
+import { GlobalState } from './reducer/types';
+
+export const isMaster = (state: GlobalState): boolean => state.player.master === state.myClientName;

+ 7 - 2
gmus/src/types/state.ts

@@ -1,6 +1,11 @@
+export type Member = {
+  name: string;
+  lastPing: number;
+};
+
 export type MusicPlayer = {
   songId: number | null;
   playing: boolean;
-  playTimeSeconds: number;
-  currentClient: string;
+  currentTime: number;
+  master: string;
 };

+ 8 - 1
gmus/src/utils/url.spec.ts

@@ -1,4 +1,4 @@
-import { getPubsubUrl } from './url';
+import { getPubsubUrl, getSongUrl } from './url';
 
 describe(getPubsubUrl.name, () => {
   it('should return a websocket URL', () => {
@@ -25,3 +25,10 @@ describe(getPubsubUrl.name, () => {
     });
   });
 });
+
+describe(getSongUrl.name, () => {
+  it('should return a correct URL', () => {
+    expect.assertions(1);
+    expect(getSongUrl(12372)).toBe('http://my-api.url:1234/stream?songid=12372');
+  });
+});

+ 4 - 0
gmus/src/utils/url.ts

@@ -4,3 +4,7 @@ export function getPubsubUrl(): string {
     apiUrl.port ? `:${apiUrl.port}` : ''
   }/pubsub`;
 }
+
+export function getSongUrl(songId: number): string {
+  return `${process.env.REACT_APP_API_URL}/stream?songid=${songId}`;
+}

+ 17 - 0
gmus/yarn.lock

@@ -1500,6 +1500,18 @@
     schema-utils "^2.6.5"
     source-map "^0.7.3"
 
+"@react-hook/latest@^1.0.2":
+  version "1.0.3"
+  resolved "https://registry.yarnpkg.com/@react-hook/latest/-/latest-1.0.3.tgz#c2d1d0b0af8b69ec6e2b3a2412ba0768ac82db80"
+  integrity sha512-dy6duzl+JnAZcDbNTfmaP3xHiKtbXYOaz3G51MGVljh548Y8MWzTr+PHLOfvpypEVW9zwvl+VyKjbWKEVbV1Rg==
+
+"@react-hook/throttle@^2.2.0":
+  version "2.2.0"
+  resolved "https://registry.yarnpkg.com/@react-hook/throttle/-/throttle-2.2.0.tgz#d0402714a06e1ba0bc1da1fdf5c3c5cd0e08d45a"
+  integrity sha512-LJ5eg+yMV8lXtqK3lR+OtOZ2WH/EfWvuiEEu0M3bhR7dZRfTyEJKxH1oK9uyBxiXPtWXiQggWbZirMCXam51tg==
+  dependencies:
+    "@react-hook/latest" "^1.0.2"
+
 "@rollup/plugin-node-resolve@^7.1.1":
   version "7.1.3"
   resolved "https://registry.yarnpkg.com/@rollup/plugin-node-resolve/-/plugin-node-resolve-7.1.3.tgz#80de384edfbd7bfc9101164910f86078151a3eca"
@@ -7444,6 +7456,11 @@ nanoid@^3.1.18:
   resolved "https://registry.yarnpkg.com/nanoid/-/nanoid-3.1.18.tgz#0680db22ab01c372e89209f5d18283d98de3e96d"
   integrity sha512-rndlDjbbHbcV3xi+R2fpJ+PbGMdfBxz5v1fATIQFq0DP64FsicQdwnKLy47K4kZHdRpmQXtz24eGsxQqamzYTA==
 
+nanoid@^3.1.20:
+  version "3.1.20"
+  resolved "https://registry.yarnpkg.com/nanoid/-/nanoid-3.1.20.tgz#badc263c6b1dcf14b71efaa85f6ab4c1d6cfc788"
+  integrity sha512-a1cQNyczgKbLX9jwbS/+d7W8fX/RfgYR7lVWwWOGIPNgK2m0MWvrGF6/m4kk6U3QcFMnZf3RIhL0v2Jgh/0Uxw==
+
 nanomatch@^1.2.9:
   version "1.2.13"
   resolved "https://registry.yarnpkg.com/nanomatch/-/nanomatch-1.2.13.tgz#b87a8aa4fc0de8fe6be88895b38983ff265bd119"

+ 1 - 2
music-player/pkg/server/actions.go

@@ -10,8 +10,7 @@ type ActionType string
 
 const (
   StateSet ActionType = "STATE_SET"
-  ClientConnected ActionType = "CLIENT_CONNECTED"
-  ClientDisconnected ActionType = "CLIENT_DISCONNECTED"
+  ClientListUpdated = "CLIENT_LIST_UPDATED"
 )
 
 type Action struct {

+ 44 - 30
music-player/pkg/server/clients.go

@@ -2,6 +2,7 @@ package server
 
 import (
 	"net/http"
+	"time"
 
 	"github.com/felamaslen/go-music-player/pkg/logger"
 	"github.com/go-redis/redis/v7"
@@ -21,45 +22,55 @@ func endSubscription(sub *redis.PubSub) error {
   return nil
 }
 
-func (c *Client) exposeToNetwork(l *logger.Logger, rdb *redis.Client) error {
-  // Expose the client to all pods running the server
-  if _, err := rdb.SAdd(KEY_CLIENT_NAMES, c.name).Result(); err != nil {
+func publishClientList(l *logger.Logger, rdb *redis.Client) error {
+  clients, err := rdb.ZRangeWithScores(KEY_CLIENT_NAMES, 0, -1).Result()
+  if err != nil {
     return err
   }
 
-  allClients, err := rdb.SMembers(KEY_CLIENT_NAMES).Result()
-  if err != nil {
-    return err
+  var members []*Member
+  for _, m := range(clients) {
+    members = append(members, &Member{
+      Name: m.Member.(string),
+      LastPing: int64(m.Score),
+    })
   }
 
-  if err := publishAction(rdb, &Action{
-    Type: ClientConnected,
-    Payload: allClients,
-  }); err != nil {
-    return err
+  actionClientListUpdated := Action{
+    Type: ClientListUpdated,
+    Payload: members,
   }
 
+  if err := publishAction(rdb, &actionClientListUpdated); err != nil {
+    return err
+  }
   return nil
 }
 
-func (c *Client) disposeFromNetwork(l *logger.Logger, rdb *redis.Client) error {
-  // Make sure other clients know when one goes away
-  if _, err := rdb.SRem(KEY_CLIENT_NAMES, c.name).Result(); err != nil {
+func (c *Client) exposeToNetwork(l *logger.Logger, rdb *redis.Client) error {
+  // Expose the client to all pods running the server
+  now := time.Now().Unix()
+
+  if _, err := rdb.ZAdd(KEY_CLIENT_NAMES, &redis.Z{
+    Score: float64(now),
+    Member: c.name,
+  }).Result(); err != nil {
     return err
   }
-
-  allClients, err := rdb.SMembers(KEY_CLIENT_NAMES).Result()
-  if err != nil {
+  if err := publishClientList(l, rdb); err != nil {
     return err
   }
+  return nil
+}
 
-  if err := publishAction(rdb, &Action{
-    Type: ClientDisconnected,
-    Payload: allClients,
-  }); err != nil {
+func (c *Client) disposeFromNetwork(l *logger.Logger, rdb *redis.Client) error {
+  // Make sure other clients know when one goes away
+  if _, err := rdb.ZRem(KEY_CLIENT_NAMES, c.name).Result(); err != nil {
+    return err
+  }
+  if err := publishClientList(l, rdb); err != nil {
     return err
   }
-
   return nil
 }
 
@@ -68,15 +79,19 @@ func (c *Client) subscribeToMe(l *logger.Logger, rdb *redis.Client) {
   // onward publishing to other pods where necessary, via internal pubsub
 
   for {
-    select {
-    case <- c.closeChan:
+    var actionFromClient Action
+    if err := c.conn.ReadJSON(&actionFromClient); err != nil {
+      l.Verbose("calling close(c.closeChan) %s\n", c.name)
+      close(c.closeChan)
       return
-    default:
-      var actionFromClient Action
-      if err := c.conn.ReadJSON(&actionFromClient); err != nil {
-	return
-      }
+    }
 
+    if actionFromClient.Type == "PING" {
+      c.conn.WriteJSON(Action{
+	Type: "PONG",
+      })
+      c.exposeToNetwork(l, rdb)
+    } else {
       l.Debug("[->Client] %s (%s)\n", actionFromClient.Type, c.name)
 
       actionFromClient.FromClient = &c.name
@@ -101,7 +116,6 @@ func (c *Client) onConnect(l *logger.Logger, rdb *redis.Client) error {
 
 func (c *Client) onDisconnect(l *logger.Logger, rdb *redis.Client) error {
   l.Verbose("[Client disconnected] %s\n", c.name)
-  close(c.closeChan)
 
   if err := c.disposeFromNetwork(l, rdb); err != nil {
     l.Error("Error disposing client from network: %v\n", err)

+ 4 - 2
music-player/pkg/server/constants.go

@@ -2,6 +2,8 @@ package server
 
 const TOPIC_BROADCAST = "GMUSIC_BROADCAST"
 
-const KEY_CLIENT_NAMES = "GMUSIC_CLIENTS"
-
 const CLIENT_QUERY_NAME = "client-name"
+
+// Redis constants
+const KEY_CLIENT_NAMES = "GMUSIC_CLIENTS"
+const CLIENT_TTL_SEC = 30

+ 24 - 9
music-player/pkg/server/pubsub.go

@@ -2,7 +2,9 @@ package server
 
 import (
 	"encoding/json"
+	"fmt"
 	"net/http"
+	"time"
 
 	"github.com/felamaslen/go-music-player/pkg/logger"
 	"github.com/go-redis/redis/v7"
@@ -44,16 +46,19 @@ func handleClientSubscription(thisPodClients *map[string]*Client) RouteHandler {
 
     defer conn.Close()
 
-    conn.SetCloseHandler(func(code int, text string) error {
-      if _, ok := (*thisPodClients)[client.name]; ok {
-	delete(*thisPodClients, client.name)
+    go func() {
+      for {
+	select {
+	case <- client.closeChan:
+	  l.Verbose("Caught closeChan call, closing... %s\n", client.name)
+	  if _, ok := (*thisPodClients)[client.name]; ok {
+	    delete(*thisPodClients, client.name)
+	  }
+	  client.onDisconnect(l, rdb)
+	  return
+	}
       }
-
-      if err := client.onDisconnect(l, rdb); err != nil {
-	return err
-      }
-      return nil
-    })
+    }()
 
     if err := client.onConnect(l, rdb); err != nil {
       l.Error("Error connecting client: %v\n", err)
@@ -101,9 +106,19 @@ func subscribeToBroadcast(
   }
 }
 
+func pruneDisappearedClients(l *logger.Logger, rdb *redis.Client) {
+  for {
+    now := time.Now().Unix()
+    rdb.ZRemRangeByScore(KEY_CLIENT_NAMES, "0", fmt.Sprintf("%d", now - CLIENT_TTL_SEC))
+
+    time.Sleep(CLIENT_TTL_SEC * time.Second)
+  }
+}
+
 func initPubsub(l *logger.Logger, rdb *redis.Client, router *mux.Router) {
   thisPodClients := make(map[string]*Client)
   go subscribeToBroadcast(l, rdb, &thisPodClients)
+  go pruneDisappearedClients(l, rdb)
 
   router.Path("/pubsub").Methods("GET").HandlerFunc(
     routeHandler(l, rdb, handleClientSubscription(&thisPodClients)),

+ 18 - 8
music-player/pkg/server/state.go

@@ -10,17 +10,27 @@ type Client struct {
   closeChan chan bool
 }
 
-// This state lives on the server and is common to all clients.
-// It describes the state of the music player, including who is currently
-// responsible for playing the sound.
-// Only one client is allowed to play the music at any one time (this could change later).
-// Each client should have a fairly up-to-date (~2s) copy of this state, in order to
-// accurately reflect the current state to the frontend.
+type Member struct {
+  Name string 		`json:"name"`
+  LastPing int64 	`json:"lastPing"`
+}
+
+// Except for the client list, the application is stateless server-side.
+// The source of truth for the current state of the player is that of the
+// master client.
+//
+// If more than one client thinks that they are master, whichever sends
+// an action across first should cause the other to obey the instruction
+// and treat the first as master.
+//
+// The master client is responsible for:
+// 1. Playing the music
+// 2. Keeping the server updated regularly about the current state
 
 type MusicPlayer struct {
   SongId int 		`json:"songId"`
   Playing bool 		`json:"playing"`
-  PlayTimeSeconds int 	`json:"playTimeSeconds"`
+  PlayTimeSeconds int 	`json:"currentTime"`
 
-  CurrentClient string 	`json:"currentClient"`
+  Master string 	`json:"master"`
 }