pubsub.go 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108
  1. package server
  2. import (
  3. "encoding/json"
  4. "net/http"
  5. "github.com/felamaslen/go-music-player/pkg/config"
  6. "github.com/felamaslen/go-music-player/pkg/logger"
  7. "github.com/go-redis/redis/v7"
  8. "github.com/gorilla/mux"
  9. )
  10. func handleClientSubscription(
  11. l *logger.Logger,
  12. rdb *redis.Client,
  13. thisPodClients *map[string]*Client,
  14. ) func(http.ResponseWriter, *http.Request) {
  15. return func(w http.ResponseWriter, r *http.Request) {
  16. clientName := getClientNameFromRequest(r)
  17. if len(clientName) == 0 {
  18. w.WriteHeader(400)
  19. w.Write([]byte("Must set client name in query"))
  20. return
  21. }
  22. l.Debug("got request, name=%v\n", clientName)
  23. conn, err := upgrader.Upgrade(w, r, nil)
  24. if err != nil {
  25. l.Error("Error upgrading client: %v\n", err)
  26. return
  27. }
  28. client := Client{
  29. name: clientName,
  30. conn: conn,
  31. closeChan: make(chan bool),
  32. }
  33. (*thisPodClients)[client.name] = &client
  34. defer conn.Close()
  35. conn.SetCloseHandler(func(code int, text string) error {
  36. if _, ok := (*thisPodClients)[client.name]; ok {
  37. delete(*thisPodClients, client.name)
  38. }
  39. if err := client.onDisconnect(l, rdb); err != nil {
  40. return err
  41. }
  42. return nil
  43. })
  44. if err := client.onConnect(l, rdb); err != nil {
  45. l.Error("Error connecting client: %v\n", err)
  46. }
  47. }
  48. }
  49. func subscribeToBroadcast(
  50. l *logger.Logger,
  51. rdb *redis.Client,
  52. thisPodClients *map[string]*Client,
  53. ) {
  54. // Subscribe all of this pod's clients to messages from any pod, via internal pubsub
  55. subscription := rdb.Subscribe(TOPIC_BROADCAST)
  56. for {
  57. select {
  58. case msg, ok := <- subscription.Channel():
  59. if !ok {
  60. return
  61. }
  62. var actionFromPubsub Action
  63. if err := json.Unmarshal([]byte(msg.Payload), &actionFromPubsub); err != nil {
  64. l.Error("Invalid action from pubsub: %v\n", err)
  65. } else {
  66. if actionFromPubsub.FromClient == nil {
  67. l.Debug("[<-Server] %s\n", actionFromPubsub.Type)
  68. } else {
  69. l.Debug("[<-Client] %s (%s)\n", actionFromPubsub.Type, *actionFromPubsub.FromClient)
  70. }
  71. errors := broadcastAction(thisPodClients, &actionFromPubsub)
  72. if len(errors) > 0 {
  73. l.Warn("Error broadcasting: %v\n", errors)
  74. }
  75. }
  76. }
  77. }
  78. }
  79. func initPubsub(l *logger.Logger, router *mux.Router) {
  80. rdb := redis.NewClient(&redis.Options{ Addr: config.GetConfig().RedisUrl })
  81. defer rdb.Close()
  82. thisPodClients := make(map[string]*Client)
  83. go subscribeToBroadcast(l, rdb, &thisPodClients)
  84. router.Path("/pubsub").Methods("GET").HandlerFunc(handleClientSubscription(l, rdb, &thisPodClients))
  85. }