Преглед изворни кода

feat: refactored server with generic pubsub actions

Fela Maslen пре 5 година
родитељ
комит
53e94a4caf

+ 46 - 0
music-player/pkg/server/actions.go

@@ -0,0 +1,46 @@
+package server
+
+import (
+	"encoding/json"
+
+	"github.com/go-redis/redis/v7"
+)
+
+type ActionType string
+
+const (
+  StateSet ActionType = "STATE_SET"
+  ClientConnected ActionType = "CLIENT_CONNECTED"
+  ClientDisconnected ActionType = "CLIENT_DISCONNECTED"
+)
+
+type Action struct {
+  Type ActionType 	`json:"type"`
+  FromClient *string 	`json:"fromClient"`
+  Payload interface{} 	`json:"payload"`
+}
+
+func broadcastAction(thisPodClients *map[string]*Client, action *Action) []error {
+  var errors []error
+
+  for _, client := range(*thisPodClients) {
+    if client.name != *action.FromClient {
+      if err := client.conn.WriteJSON(action); err != nil {
+	errors = append(errors, err)
+      }
+    }
+  }
+
+  return errors
+}
+
+func publishAction(rdb *redis.Client, action *Action) error {
+  payload, err := json.Marshal(action)
+  if err != nil {
+    return err
+  }
+  if _, err := rdb.Publish(TOPIC_BROADCAST, payload).Result(); err != nil {
+    return err
+  }
+  return nil
+}

+ 69 - 32
music-player/pkg/server/clients.go

@@ -1,7 +1,6 @@
 package server
 
 import (
-	"encoding/json"
 	"net/http"
 
 	"github.com/felamaslen/go-music-player/pkg/logger"
@@ -22,54 +21,92 @@ func endSubscription(sub *redis.PubSub) error {
   return nil
 }
 
-func (c *Client) onConnect(l *logger.Logger, rdb *redis.Client) error {
+func (c *Client) exposeToNetwork(l *logger.Logger, rdb *redis.Client) error {
+  // Expose the client to all pods running the server
   if _, err := rdb.SAdd(KEY_CLIENT_NAMES, c.name).Result(); err != nil {
-    l.Fatal("Error adding client to set: %v\n", err)
+    return err
+  }
+
+  allClients, err := rdb.SMembers(KEY_CLIENT_NAMES).Result()
+  if err != nil {
+    return err
+  }
+
+  if err := publishAction(rdb, &Action{
+    Type: ClientConnected,
+    Payload: allClients,
+  }); err != nil {
+    return err
   }
 
-  rdb.Publish(TOPIC_CLIENT_CONNECTED, c.name)
+  return nil
+}
 
-  l.Printf("subscribing to TOPIC_STATE\n")
-  stateSub := rdb.Subscribe(TOPIC_STATE)
+func (c *Client) disposeFromNetwork(l *logger.Logger, rdb *redis.Client) error {
+  // Make sure other clients know when one goes away
+  if _, err := rdb.SRem(KEY_CLIENT_NAMES, c.name).Result(); err != nil {
+    return err
+  }
 
-  go func() {
-    for {
-      select {
-      case <- c.disconnected:
-	endSubscription(stateSub)
-	return
+  allClients, err := rdb.SMembers(KEY_CLIENT_NAMES).Result()
+  if err != nil {
+    return err
+  }
 
-      case msg, ok := <- stateSub.Channel():
-	l.Debug("Received message from state pubsub: %v\n", msg)
-	if !ok {
-	  return
-	}
+  if err := publishAction(rdb, &Action{
+    Type: ClientDisconnected,
+    Payload: allClients,
+  }); err != nil {
+    return err
+  }
+
+  return nil
+}
+
+func (c *Client) subscribeToMe(l *logger.Logger, rdb *redis.Client) {
+  // Subscribe this pod to messages from the client. This pod is responsible for
+  // onward publishing to other pods where necessary, via internal pubsub
 
-	var state MusicPlayer
-	if err := json.Unmarshal([]byte(msg.Payload), &state); err != nil {
-	  l.Error("Error parsing state from pubsub: %v\n", err)
-	  return
+  for {
+    select {
+    case <- c.closeChan:
+      return
+    default:
+      var actionFromClient Action
+      if err := c.conn.ReadJSON(&actionFromClient); err != nil {
+	l.Error("Error reading message from client: %v\n", err)
+
+      } else {
+	l.Debug("[->Client] %s (%s)\n", actionFromClient.Type, c.name)
+
+	actionFromClient.FromClient = &c.name
+
+	if err := publishAction(rdb, &actionFromClient); err != nil {
+	  l.Error("Error publishing action from client: %v\n", err)
 	}
-	c.conn.WriteJSON(action{
-	  ActionType: ACTION_TYPE_STATE_SET,
-	  Payload: state,
-	})
       }
     }
-  }()
+  }
+}
+
+func (c *Client) onConnect(l *logger.Logger, rdb *redis.Client) error {
+  if err := c.exposeToNetwork(l, rdb); err != nil {
+    l.Error("Error exposing new client to network: %v\n", err)
+    return err
+  }
+
+  go c.subscribeToMe(l, rdb)
 
   return nil
 }
 
 func (c *Client) onDisconnect(l *logger.Logger, rdb *redis.Client) error {
-  close(c.disconnected)
+  close(c.closeChan)
 
-  _, err := rdb.SRem(KEY_CLIENT_NAMES, c.name).Result()
-  if err != nil {
-    l.Fatal("Error removing client from set: %v\n", err)
+  if err := c.disposeFromNetwork(l, rdb); err != nil {
+    l.Error("Error disposing client from network: %v\n", err)
+    return err
   }
 
-  rdb.Publish(TOPIC_CLIENT_DISCONNECTED, c.name)
-
   return nil
 }

+ 1 - 7
music-player/pkg/server/constants.go

@@ -1,13 +1,7 @@
 package server
 
-const TOPIC_STATE = "GMUSIC_STATE"
-
-const TOPIC_CLIENT_CONNECTED = "GMUSIC_CLIENT_CONNECTED"
-const TOPIC_CLIENT_DISCONNECTED = "GMUSIC_CLIENT_DISCONNECTED"
+const TOPIC_BROADCAST = "GMUSIC_BROADCAST"
 
 const KEY_CLIENT_NAMES = "GMUSIC_CLIENTS"
 
 const CLIENT_QUERY_NAME = "client-name"
-
-const ACTION_TYPE_STATE_SET = "STATE_SET"
-const ACTION_TYPE_CLIENTS_UPDATED = "CLIENTS_UPDATED"

+ 108 - 0
music-player/pkg/server/pubsub.go

@@ -0,0 +1,108 @@
+package server
+
+import (
+	"encoding/json"
+	"net/http"
+
+	"github.com/felamaslen/go-music-player/pkg/config"
+	"github.com/felamaslen/go-music-player/pkg/logger"
+	"github.com/go-redis/redis/v7"
+	"github.com/gorilla/mux"
+)
+
+func handleClientSubscription(
+  l *logger.Logger,
+  rdb *redis.Client,
+  thisPodClients *map[string]*Client,
+) func(http.ResponseWriter, *http.Request) {
+
+  return func(w http.ResponseWriter, r *http.Request) {
+    clientName := getClientNameFromRequest(r)
+    if len(clientName) == 0 {
+      w.WriteHeader(400)
+      w.Write([]byte("Must set client name in query"))
+
+      return
+    }
+
+    l.Debug("got request, name=%v\n", clientName)
+
+    conn, err := upgrader.Upgrade(w, r, nil)
+    if err != nil {
+      l.Error("Error upgrading client: %v\n", err)
+      return
+    }
+
+    client := Client{
+      name: clientName,
+      conn: conn,
+      closeChan: make(chan bool),
+    }
+
+    (*thisPodClients)[client.name] = &client
+
+    defer conn.Close()
+
+    conn.SetCloseHandler(func(code int, text string) error {
+      if _, ok := (*thisPodClients)[client.name]; ok {
+	delete(*thisPodClients, client.name)
+      }
+
+      if err := client.onDisconnect(l, rdb); err != nil {
+	return err
+      }
+      return nil
+    })
+
+    if err := client.onConnect(l, rdb); err != nil {
+      l.Error("Error connecting client: %v\n", err)
+    }
+  }
+}
+
+func subscribeToBroadcast(
+  l *logger.Logger,
+  rdb *redis.Client,
+  thisPodClients *map[string]*Client,
+) {
+  // Subscribe all of this pod's clients to messages from any pod, via internal pubsub
+  subscription := rdb.Subscribe(TOPIC_BROADCAST)
+
+  for {
+    select {
+    case msg, ok := <- subscription.Channel():
+      if !ok {
+	return
+      }
+
+      var actionFromPubsub Action
+
+      if err := json.Unmarshal([]byte(msg.Payload), &actionFromPubsub); err != nil {
+	l.Error("Invalid action from pubsub: %v\n", err)
+
+      } else {
+	if actionFromPubsub.FromClient == nil {
+	  l.Debug("[<-Server] %s\n", actionFromPubsub.Type)
+	} else {
+	  l.Debug("[<-Client] %s (%s)\n", actionFromPubsub.Type, *actionFromPubsub.FromClient)
+	}
+
+	errors := broadcastAction(thisPodClients, &actionFromPubsub)
+
+	if len(errors) > 0 {
+	  l.Warn("Error broadcasting: %v\n", errors)
+	}
+      }
+    }
+  }
+}
+
+func initPubsub(l *logger.Logger, router *mux.Router) {
+  rdb := redis.NewClient(&redis.Options{ Addr: config.GetConfig().RedisUrl })
+  defer rdb.Close()
+
+  thisPodClients := make(map[string]*Client)
+  go subscribeToBroadcast(l, rdb, &thisPodClients)
+
+  router.Path("/pubsub").Methods("GET").HandlerFunc(handleClientSubscription(l, rdb, &thisPodClients))
+}

+ 2 - 130
music-player/pkg/server/server.go

@@ -1,14 +1,12 @@
 package server
 
 import (
-	"encoding/json"
 	"fmt"
 	"log"
 	"net/http"
 
 	"github.com/felamaslen/go-music-player/pkg/config"
 	"github.com/felamaslen/go-music-player/pkg/logger"
-	"github.com/go-redis/redis/v7"
 	"github.com/gorilla/mux"
 	"github.com/gorilla/websocket"
 )
@@ -19,141 +17,15 @@ var upgrader = websocket.Upgrader{
   },
 }
 
-func getClientList(l *logger.Logger, rdb *redis.Client) []string {
-  result, err := rdb.SMembers(KEY_CLIENT_NAMES).Result()
-  if err != nil {
-    l.Fatal("Error fetching client list: %v\n", err)
-  }
-
-  return result
-}
-
-func onClientSubscribe(
-  l *logger.Logger,
-  rdb *redis.Client,
-  thisPodClients *map[string]*Client,
-) func(http.ResponseWriter, *http.Request) {
-
-  return func(w http.ResponseWriter, r *http.Request) {
-    clientName := getClientNameFromRequest(r)
-
-    l.Debug("got request, name=%v\n", clientName)
-
-    conn, err := upgrader.Upgrade(w, r, nil)
-    if err != nil {
-      l.Error("Error upgrading client: %v\n", err)
-      return
-    }
-
-    client := Client{
-      name: clientName,
-      conn: conn,
-      disconnected: make(chan bool),
-    }
-
-    (*thisPodClients)[client.name] = &client
-
-    err = client.onConnect(l, rdb)
-    if err != nil {
-      l.Error("Error connecting client: %v\n", err)
-    }
-
-    closeChan := make(chan bool)
-
-    conn.SetCloseHandler(func(code int, text string) error {
-      if err := client.onDisconnect(l, rdb); err != nil {
-	return err
-      }
-      close(closeChan)
-      return nil
-    })
-
-    defer conn.Close()
-
-    // Subscribe to messages from the client
-    for {
-      select {
-      case <- closeChan:
-	return
-      default:
-	var actionFromClient action
-	if err := conn.ReadJSON(&actionFromClient); err != nil {
-	  l.Error("Error reading message from client: %v\n", err)
-	  return
-	}
-
-	l.Debug("Received action from client, action=%v\n", actionFromClient)
-
-	switch actionFromClient.ActionType {
-	case ACTION_TYPE_STATE_SET:
-
-	  var updatedState MusicPlayer
-	  payload := []byte(actionFromClient.Payload.(string))
-
-	  if err := json.Unmarshal(payload, &updatedState); err != nil {
-	    l.Error("Error parsing action from client: %v\n", err)
-	  }
-
-	  if _, err := rdb.Publish(TOPIC_STATE, payload).Result(); err != nil {
-	    l.Error("Error publishing to state topic: %v\n", err)
-	  }
-	}
-      }
-    }
-  }
-}
-
-func broadcastClientList(
-  l *logger.Logger,
-  rdb *redis.Client,
-  thisPodClients *map[string]*Client,
-) {
-  subClientsChanged := rdb.Subscribe(
-    TOPIC_CLIENT_CONNECTED,
-    TOPIC_CLIENT_DISCONNECTED,
-  )
-
-  go func() {
-    for {
-      select {
-      case _, ok := <- subClientsChanged.Channel():
-	if !ok {
-	  return
-	}
-
-	allClientNames := getClientList(l, rdb)
-
-	for _, client := range(*thisPodClients) {
-	  client.conn.WriteJSON(action{
-	    ActionType: ACTION_TYPE_CLIENTS_UPDATED,
-	    Payload: allClientNames,
-	  })
-	}
-      }
-    }
-  }()
-}
-
 func StartServer() {
   conf := config.GetConfig()
   l := logger.CreateLogger(conf.LogLevel)
 
-  rdb := redis.NewClient(&redis.Options{ Addr: conf.RedisUrl })
-  defer rdb.Close()
-
-  thisPodClients := make(map[string]*Client)
-
-  broadcastClientList(l, rdb, &thisPodClients)
-
   router := mux.NewRouter()
 
-  router.Path("/pubsub").Methods("GET").HandlerFunc(onClientSubscribe(
-    l,
-    rdb,
-    &thisPodClients,
-  ))
+  initPubsub(l, router)
 
-  port := config.GetConfig().Port
+  port := conf.Port
 
   l.Info("Starting server on port %d\n", port)
   log.Fatal(http.ListenAndServe(fmt.Sprintf("localhost:%d", port), router))

+ 1 - 6
music-player/pkg/server/state.go

@@ -7,7 +7,7 @@ import (
 type Client struct {
   name string
   conn *websocket.Conn
-  disconnected chan bool
+  closeChan chan bool
 }
 
 // This state lives on the server and is common to all clients.
@@ -24,8 +24,3 @@ type MusicPlayer struct {
 
   CurrentClient string 	`json:"currentClient"`
 }
-
-type action struct {
-  ActionType string 	`json:"type"`
-  Payload interface{} 	`json:"payload"`
-}