| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160 |
- package server
- import (
- "encoding/json"
- "fmt"
- "log"
- "net/http"
- "github.com/felamaslen/go-music-player/pkg/config"
- "github.com/felamaslen/go-music-player/pkg/logger"
- "github.com/go-redis/redis/v7"
- "github.com/gorilla/mux"
- "github.com/gorilla/websocket"
- )
- var upgrader = websocket.Upgrader{
- CheckOrigin: func(r *http.Request) bool {
- return true
- },
- }
- func getClientList(l *logger.Logger, rdb *redis.Client) []string {
- result, err := rdb.SMembers(KEY_CLIENT_NAMES).Result()
- if err != nil {
- l.Fatal("Error fetching client list: %v\n", err)
- }
- return result
- }
- func onClientSubscribe(
- l *logger.Logger,
- rdb *redis.Client,
- thisPodClients *map[string]*Client,
- ) func(http.ResponseWriter, *http.Request) {
- return func(w http.ResponseWriter, r *http.Request) {
- clientName := getClientNameFromRequest(r)
- l.Debug("got request, name=%v\n", clientName)
- conn, err := upgrader.Upgrade(w, r, nil)
- if err != nil {
- l.Error("Error upgrading client: %v\n", err)
- return
- }
- client := Client{
- name: clientName,
- conn: conn,
- disconnected: make(chan bool),
- }
- (*thisPodClients)[client.name] = &client
- err = client.onConnect(l, rdb)
- if err != nil {
- l.Error("Error connecting client: %v\n", err)
- }
- closeChan := make(chan bool)
- conn.SetCloseHandler(func(code int, text string) error {
- if err := client.onDisconnect(l, rdb); err != nil {
- return err
- }
- close(closeChan)
- return nil
- })
- defer conn.Close()
- // Subscribe to messages from the client
- for {
- select {
- case <- closeChan:
- return
- default:
- var actionFromClient action
- if err := conn.ReadJSON(&actionFromClient); err != nil {
- l.Error("Error reading message from client: %v\n", err)
- return
- }
- l.Debug("Received action from client, action=%v\n", actionFromClient)
- switch actionFromClient.ActionType {
- case ACTION_TYPE_STATE_SET:
- var updatedState MusicPlayer
- payload := []byte(actionFromClient.Payload.(string))
- if err := json.Unmarshal(payload, &updatedState); err != nil {
- l.Error("Error parsing action from client: %v\n", err)
- }
- if _, err := rdb.Publish(TOPIC_STATE, payload).Result(); err != nil {
- l.Error("Error publishing to state topic: %v\n", err)
- }
- }
- }
- }
- }
- }
- func broadcastClientList(
- l *logger.Logger,
- rdb *redis.Client,
- thisPodClients *map[string]*Client,
- ) {
- subClientsChanged := rdb.Subscribe(
- TOPIC_CLIENT_CONNECTED,
- TOPIC_CLIENT_DISCONNECTED,
- )
- go func() {
- for {
- select {
- case _, ok := <- subClientsChanged.Channel():
- if !ok {
- return
- }
- allClientNames := getClientList(l, rdb)
- for _, client := range(*thisPodClients) {
- client.conn.WriteJSON(action{
- ActionType: ACTION_TYPE_CLIENTS_UPDATED,
- Payload: allClientNames,
- })
- }
- }
- }
- }()
- }
- func StartServer() {
- conf := config.GetConfig()
- l := logger.CreateLogger(conf.LogLevel)
- rdb := redis.NewClient(&redis.Options{ Addr: conf.RedisUrl })
- defer rdb.Close()
- thisPodClients := make(map[string]*Client)
- broadcastClientList(l, rdb, &thisPodClients)
- router := mux.NewRouter()
- router.Path("/pubsub").Methods("GET").HandlerFunc(onClientSubscribe(
- l,
- rdb,
- &thisPodClients,
- ))
- port := config.GetConfig().Port
- l.Info("Starting server on port %d\n", port)
- log.Fatal(http.ListenAndServe(fmt.Sprintf("localhost:%d", port), router))
- }
|