pubsub.go 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111
  1. package server
  2. import (
  3. "encoding/json"
  4. "net/http"
  5. "github.com/felamaslen/go-music-player/pkg/logger"
  6. "github.com/go-redis/redis/v7"
  7. "github.com/gorilla/mux"
  8. "github.com/gorilla/websocket"
  9. )
  10. var upgrader = websocket.Upgrader{
  11. CheckOrigin: func(r *http.Request) bool {
  12. return true
  13. },
  14. }
  15. func handleClientSubscription(
  16. l *logger.Logger,
  17. rdb *redis.Client,
  18. thisPodClients *map[string]*Client,
  19. ) func(http.ResponseWriter, *http.Request) {
  20. return func(w http.ResponseWriter, r *http.Request) {
  21. clientName := getClientNameFromRequest(r)
  22. if len(clientName) == 0 {
  23. w.WriteHeader(400)
  24. w.Write([]byte("Must set client name in query"))
  25. return
  26. }
  27. l.Verbose("[Client connected] %s\n", clientName)
  28. conn, err := upgrader.Upgrade(w, r, nil)
  29. if err != nil {
  30. l.Error("Error upgrading client: %v\n", err)
  31. return
  32. }
  33. client := Client{
  34. name: clientName,
  35. conn: conn,
  36. closeChan: make(chan bool),
  37. }
  38. (*thisPodClients)[client.name] = &client
  39. defer conn.Close()
  40. conn.SetCloseHandler(func(code int, text string) error {
  41. if _, ok := (*thisPodClients)[client.name]; ok {
  42. delete(*thisPodClients, client.name)
  43. }
  44. if err := client.onDisconnect(l, rdb); err != nil {
  45. return err
  46. }
  47. return nil
  48. })
  49. if err := client.onConnect(l, rdb); err != nil {
  50. l.Error("Error connecting client: %v\n", err)
  51. }
  52. }
  53. }
  54. func subscribeToBroadcast(
  55. l *logger.Logger,
  56. rdb *redis.Client,
  57. thisPodClients *map[string]*Client,
  58. ) {
  59. // Subscribe all of this pod's clients to messages from any pod, via internal pubsub
  60. subscription := rdb.Subscribe(TOPIC_BROADCAST)
  61. for {
  62. select {
  63. case msg, ok := <- subscription.Channel():
  64. if !ok {
  65. return
  66. }
  67. var actionFromPubsub Action
  68. if err := json.Unmarshal([]byte(msg.Payload), &actionFromPubsub); err != nil {
  69. l.Error("Invalid action from pubsub: %v\n", err)
  70. } else {
  71. if actionFromPubsub.FromClient == nil {
  72. l.Debug("[<-Server] %s\n", actionFromPubsub.Type)
  73. } else {
  74. l.Debug("[<-Client] %s (%s)\n", actionFromPubsub.Type, *actionFromPubsub.FromClient)
  75. }
  76. errors := broadcastAction(thisPodClients, &actionFromPubsub)
  77. if len(errors) > 0 {
  78. l.Warn("Error broadcasting: %v\n", errors)
  79. }
  80. }
  81. }
  82. }
  83. }
  84. func initPubsub(l *logger.Logger, rdb *redis.Client, router *mux.Router) {
  85. thisPodClients := make(map[string]*Client)
  86. go subscribeToBroadcast(l, rdb, &thisPodClients)
  87. router.Path("/pubsub").Methods("GET").HandlerFunc(handleClientSubscription(l, rdb, &thisPodClients))
  88. }