package server import ( "encoding/json" "fmt" "log" "net/http" "github.com/felamaslen/go-music-player/pkg/config" "github.com/felamaslen/go-music-player/pkg/logger" "github.com/go-redis/redis/v7" "github.com/gorilla/mux" "github.com/gorilla/websocket" ) var upgrader = websocket.Upgrader{ CheckOrigin: func(r *http.Request) bool { return true }, } func getClientList(l *logger.Logger, rdb *redis.Client) []string { result, err := rdb.SMembers(KEY_CLIENT_NAMES).Result() if err != nil { l.Fatal("Error fetching client list: %v\n", err) } return result } func onClientSubscribe( l *logger.Logger, rdb *redis.Client, thisPodClients *map[string]*Client, ) func(http.ResponseWriter, *http.Request) { return func(w http.ResponseWriter, r *http.Request) { clientName := getClientNameFromRequest(r) l.Debug("got request, name=%v\n", clientName) conn, err := upgrader.Upgrade(w, r, nil) if err != nil { l.Error("Error upgrading client: %v\n", err) return } client := Client{ name: clientName, conn: conn, disconnected: make(chan bool), } (*thisPodClients)[client.name] = &client err = client.onConnect(l, rdb) if err != nil { l.Error("Error connecting client: %v\n", err) } closeChan := make(chan bool) conn.SetCloseHandler(func(code int, text string) error { if err := client.onDisconnect(l, rdb); err != nil { return err } close(closeChan) return nil }) defer conn.Close() // Subscribe to messages from the client for { select { case <- closeChan: return default: var actionFromClient action if err := conn.ReadJSON(&actionFromClient); err != nil { l.Error("Error reading message from client: %v\n", err) return } l.Debug("Received action from client, action=%v\n", actionFromClient) switch actionFromClient.ActionType { case ACTION_TYPE_STATE_SET: var updatedState MusicPlayer payload := []byte(actionFromClient.Payload.(string)) if err := json.Unmarshal(payload, &updatedState); err != nil { l.Error("Error parsing action from client: %v\n", err) } if _, err := rdb.Publish(TOPIC_STATE, payload).Result(); err != nil { l.Error("Error publishing to state topic: %v\n", err) } } } } } } func broadcastClientList( l *logger.Logger, rdb *redis.Client, thisPodClients *map[string]*Client, ) { subClientsChanged := rdb.Subscribe( TOPIC_CLIENT_CONNECTED, TOPIC_CLIENT_DISCONNECTED, ) go func() { for { select { case _, ok := <- subClientsChanged.Channel(): if !ok { return } allClientNames := getClientList(l, rdb) for _, client := range(*thisPodClients) { client.conn.WriteJSON(action{ ActionType: ACTION_TYPE_CLIENTS_UPDATED, Payload: allClientNames, }) } } } }() } func StartServer() { conf := config.GetConfig() l := logger.CreateLogger(conf.LogLevel) rdb := redis.NewClient(&redis.Options{ Addr: conf.RedisUrl }) defer rdb.Close() thisPodClients := make(map[string]*Client) broadcastClientList(l, rdb, &thisPodClients) router := mux.NewRouter() router.Path("/pubsub").Methods("GET").HandlerFunc(onClientSubscribe( l, rdb, &thisPodClients, )) port := config.GetConfig().Port l.Info("Starting server on port %d\n", port) log.Fatal(http.ListenAndServe(fmt.Sprintf("localhost:%d", port), router)) }