pubsub.go 2.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111
  1. package server
  2. import (
  3. "encoding/json"
  4. "net/http"
  5. "github.com/felamaslen/go-music-player/pkg/logger"
  6. "github.com/go-redis/redis/v7"
  7. "github.com/gorilla/mux"
  8. "github.com/gorilla/websocket"
  9. )
  10. var upgrader = websocket.Upgrader{
  11. CheckOrigin: func(r *http.Request) bool {
  12. return true
  13. },
  14. }
  15. func handleClientSubscription(thisPodClients *map[string]*Client) RouteHandler {
  16. return func(l *logger.Logger, rdb *redis.Client, w http.ResponseWriter, r *http.Request) error {
  17. clientName := getClientNameFromRequest(r)
  18. if len(clientName) == 0 {
  19. w.WriteHeader(400)
  20. w.Write([]byte("Must set client name in query"))
  21. return nil
  22. }
  23. l.Verbose("[Client connected] %s\n", clientName)
  24. conn, err := upgrader.Upgrade(w, r, nil)
  25. if err != nil {
  26. w.WriteHeader(400)
  27. w.Write([]byte("Incorrect client config to upgrade WS"))
  28. return nil
  29. }
  30. client := Client{
  31. name: clientName,
  32. conn: conn,
  33. closeChan: make(chan bool),
  34. }
  35. (*thisPodClients)[client.name] = &client
  36. defer conn.Close()
  37. conn.SetCloseHandler(func(code int, text string) error {
  38. if _, ok := (*thisPodClients)[client.name]; ok {
  39. delete(*thisPodClients, client.name)
  40. }
  41. if err := client.onDisconnect(l, rdb); err != nil {
  42. return err
  43. }
  44. return nil
  45. })
  46. if err := client.onConnect(l, rdb); err != nil {
  47. l.Error("Error connecting client: %v\n", err)
  48. return err
  49. }
  50. return nil
  51. }
  52. }
  53. func subscribeToBroadcast(
  54. l *logger.Logger,
  55. rdb *redis.Client,
  56. thisPodClients *map[string]*Client,
  57. ) {
  58. // Subscribe all of this pod's clients to messages from any pod, via internal pubsub
  59. subscription := rdb.Subscribe(TOPIC_BROADCAST)
  60. for {
  61. select {
  62. case msg, ok := <- subscription.Channel():
  63. if !ok {
  64. return
  65. }
  66. var actionFromPubsub Action
  67. if err := json.Unmarshal([]byte(msg.Payload), &actionFromPubsub); err != nil {
  68. l.Error("Invalid action from pubsub: %v\n", err)
  69. } else {
  70. if actionFromPubsub.FromClient == nil {
  71. l.Debug("[<-Server] %s\n", actionFromPubsub.Type)
  72. } else {
  73. l.Debug("[<-Client] %s (%s)\n", actionFromPubsub.Type, *actionFromPubsub.FromClient)
  74. }
  75. errors := broadcastAction(thisPodClients, &actionFromPubsub)
  76. if len(errors) > 0 {
  77. l.Warn("Error broadcasting: %v\n", errors)
  78. }
  79. }
  80. }
  81. }
  82. }
  83. func initPubsub(l *logger.Logger, rdb *redis.Client, router *mux.Router) {
  84. thisPodClients := make(map[string]*Client)
  85. go subscribeToBroadcast(l, rdb, &thisPodClients)
  86. router.Path("/pubsub").Methods("GET").HandlerFunc(
  87. routeHandler(l, rdb, handleClientSubscription(&thisPodClients)),
  88. )
  89. }