pubsub.go 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124
  1. package server
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "net/http"
  6. "time"
  7. "github.com/felamaslen/gmus-backend/pkg/logger"
  8. "github.com/go-redis/redis/v7"
  9. "github.com/gorilla/mux"
  10. "github.com/gorilla/websocket"
  11. )
  12. var upgrader = websocket.Upgrader{
  13. CheckOrigin: func(r *http.Request) bool {
  14. return true
  15. },
  16. }
  17. func handleClientSubscription(thisPodClients *map[string]*Client) RouteHandler {
  18. return func(l *logger.Logger, rdb *redis.Client, w http.ResponseWriter, r *http.Request) error {
  19. clientName := getClientNameFromRequest(r)
  20. if len(clientName) == 0 {
  21. w.WriteHeader(400)
  22. w.Write([]byte("Must set client name in query"))
  23. return nil
  24. }
  25. conn, err := upgrader.Upgrade(w, r, nil)
  26. if err != nil {
  27. w.WriteHeader(400)
  28. w.Write([]byte("Incorrect client config to upgrade WS"))
  29. return nil
  30. }
  31. client := Client{
  32. name: clientName,
  33. conn: conn,
  34. closeChan: make(chan bool),
  35. }
  36. (*thisPodClients)[client.name] = &client
  37. defer conn.Close()
  38. go func() {
  39. for {
  40. select {
  41. case <-client.closeChan:
  42. l.Verbose("Caught closeChan call, closing... %s\n", client.name)
  43. if _, ok := (*thisPodClients)[client.name]; ok {
  44. delete(*thisPodClients, client.name)
  45. }
  46. client.onDisconnect(l, rdb)
  47. return
  48. }
  49. }
  50. }()
  51. if err := client.onConnect(l, rdb); err != nil {
  52. l.Error("Error connecting client: %v\n", err)
  53. return err
  54. }
  55. return nil
  56. }
  57. }
  58. func subscribeToBroadcast(
  59. l *logger.Logger,
  60. rdb *redis.Client,
  61. thisPodClients *map[string]*Client,
  62. ) {
  63. // Subscribe all of this pod's clients to messages from any pod, via internal pubsub
  64. subscription := rdb.Subscribe(TOPIC_BROADCAST)
  65. for {
  66. select {
  67. case msg, ok := <-subscription.Channel():
  68. if !ok {
  69. return
  70. }
  71. var actionFromPubsub Action
  72. if err := json.Unmarshal([]byte(msg.Payload), &actionFromPubsub); err != nil {
  73. l.Error("Invalid action from pubsub: %v\n", err)
  74. } else {
  75. if actionFromPubsub.FromClient == nil {
  76. l.Debug("[<-Server] %s\n", actionFromPubsub.Type)
  77. } else {
  78. l.Debug("[<-Client] %s (%s)\n", actionFromPubsub.Type, *actionFromPubsub.FromClient)
  79. }
  80. errors := broadcastAction(l, thisPodClients, &actionFromPubsub)
  81. if len(errors) > 0 {
  82. l.Warn("Error broadcasting: %v\n", errors)
  83. }
  84. }
  85. }
  86. }
  87. }
  88. func pruneDisappearedClients(l *logger.Logger, rdb *redis.Client) {
  89. for {
  90. now := time.Now().Unix()
  91. rdb.ZRemRangeByScore(KEY_CLIENT_NAMES, "0", fmt.Sprintf("%d", now-CLIENT_TTL_SEC))
  92. time.Sleep(CLIENT_TTL_SEC * time.Second)
  93. }
  94. }
  95. func initPubsub(l *logger.Logger, rdb *redis.Client, router *mux.Router) {
  96. thisPodClients := make(map[string]*Client)
  97. go subscribeToBroadcast(l, rdb, &thisPodClients)
  98. go pruneDisappearedClients(l, rdb)
  99. router.Path("/pubsub").Methods("GET").HandlerFunc(
  100. routeHandler(l, rdb, handleClientSubscription(&thisPodClients)),
  101. )
  102. }