server.go 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160
  1. package server
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "log"
  6. "net/http"
  7. "github.com/felamaslen/go-music-player/pkg/config"
  8. "github.com/felamaslen/go-music-player/pkg/logger"
  9. "github.com/go-redis/redis/v7"
  10. "github.com/gorilla/mux"
  11. "github.com/gorilla/websocket"
  12. )
  13. var upgrader = websocket.Upgrader{
  14. CheckOrigin: func(r *http.Request) bool {
  15. return true
  16. },
  17. }
  18. func getClientList(l *logger.Logger, rdb *redis.Client) []string {
  19. result, err := rdb.SMembers(KEY_CLIENT_NAMES).Result()
  20. if err != nil {
  21. l.Fatal("Error fetching client list: %v\n", err)
  22. }
  23. return result
  24. }
  25. func onClientSubscribe(
  26. l *logger.Logger,
  27. rdb *redis.Client,
  28. thisPodClients *map[string]*Client,
  29. ) func(http.ResponseWriter, *http.Request) {
  30. return func(w http.ResponseWriter, r *http.Request) {
  31. clientName := getClientNameFromRequest(r)
  32. l.Debug("got request, name=%v\n", clientName)
  33. conn, err := upgrader.Upgrade(w, r, nil)
  34. if err != nil {
  35. l.Error("Error upgrading client: %v\n", err)
  36. return
  37. }
  38. client := Client{
  39. name: clientName,
  40. conn: conn,
  41. disconnected: make(chan bool),
  42. }
  43. (*thisPodClients)[client.name] = &client
  44. err = client.onConnect(l, rdb)
  45. if err != nil {
  46. l.Error("Error connecting client: %v\n", err)
  47. }
  48. closeChan := make(chan bool)
  49. conn.SetCloseHandler(func(code int, text string) error {
  50. if err := client.onDisconnect(l, rdb); err != nil {
  51. return err
  52. }
  53. close(closeChan)
  54. return nil
  55. })
  56. defer conn.Close()
  57. // Subscribe to messages from the client
  58. for {
  59. select {
  60. case <- closeChan:
  61. return
  62. default:
  63. var actionFromClient action
  64. if err := conn.ReadJSON(&actionFromClient); err != nil {
  65. l.Error("Error reading message from client: %v\n", err)
  66. return
  67. }
  68. l.Debug("Received action from client, action=%v\n", actionFromClient)
  69. switch actionFromClient.ActionType {
  70. case ACTION_TYPE_STATE_SET:
  71. var updatedState MusicPlayer
  72. payload := []byte(actionFromClient.Payload.(string))
  73. if err := json.Unmarshal(payload, &updatedState); err != nil {
  74. l.Error("Error parsing action from client: %v\n", err)
  75. }
  76. if _, err := rdb.Publish(TOPIC_STATE, payload).Result(); err != nil {
  77. l.Error("Error publishing to state topic: %v\n", err)
  78. }
  79. }
  80. }
  81. }
  82. }
  83. }
  84. func broadcastClientList(
  85. l *logger.Logger,
  86. rdb *redis.Client,
  87. thisPodClients *map[string]*Client,
  88. ) {
  89. subClientsChanged := rdb.Subscribe(
  90. TOPIC_CLIENT_CONNECTED,
  91. TOPIC_CLIENT_DISCONNECTED,
  92. )
  93. go func() {
  94. for {
  95. select {
  96. case _, ok := <- subClientsChanged.Channel():
  97. if !ok {
  98. return
  99. }
  100. allClientNames := getClientList(l, rdb)
  101. for _, client := range(*thisPodClients) {
  102. client.conn.WriteJSON(action{
  103. ActionType: ACTION_TYPE_CLIENTS_UPDATED,
  104. Payload: allClientNames,
  105. })
  106. }
  107. }
  108. }
  109. }()
  110. }
  111. func StartServer() {
  112. conf := config.GetConfig()
  113. l := logger.CreateLogger(conf.LogLevel)
  114. rdb := redis.NewClient(&redis.Options{ Addr: conf.RedisUrl })
  115. defer rdb.Close()
  116. thisPodClients := make(map[string]*Client)
  117. broadcastClientList(l, rdb, &thisPodClients)
  118. router := mux.NewRouter()
  119. router.Path("/pubsub").Methods("GET").HandlerFunc(onClientSubscribe(
  120. l,
  121. rdb,
  122. &thisPodClients,
  123. ))
  124. port := config.GetConfig().Port
  125. l.Info("Starting server on port %d\n", port)
  126. log.Fatal(http.ListenAndServe(fmt.Sprintf("localhost:%d", port), router))
  127. }