pubsub.go 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126
  1. package server
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "net/http"
  6. "time"
  7. "github.com/felamaslen/go-music-player/pkg/logger"
  8. "github.com/go-redis/redis/v7"
  9. "github.com/gorilla/mux"
  10. "github.com/gorilla/websocket"
  11. )
  12. var upgrader = websocket.Upgrader{
  13. CheckOrigin: func(r *http.Request) bool {
  14. return true
  15. },
  16. }
  17. func handleClientSubscription(thisPodClients *map[string]*Client) RouteHandler {
  18. return func(l *logger.Logger, rdb *redis.Client, w http.ResponseWriter, r *http.Request) error {
  19. clientName := getClientNameFromRequest(r)
  20. if len(clientName) == 0 {
  21. w.WriteHeader(400)
  22. w.Write([]byte("Must set client name in query"))
  23. return nil
  24. }
  25. l.Verbose("[Client connected] %s\n", clientName)
  26. conn, err := upgrader.Upgrade(w, r, nil)
  27. if err != nil {
  28. w.WriteHeader(400)
  29. w.Write([]byte("Incorrect client config to upgrade WS"))
  30. return nil
  31. }
  32. client := Client{
  33. name: clientName,
  34. conn: conn,
  35. closeChan: make(chan bool),
  36. }
  37. (*thisPodClients)[client.name] = &client
  38. defer conn.Close()
  39. go func() {
  40. for {
  41. select {
  42. case <- client.closeChan:
  43. l.Verbose("Caught closeChan call, closing... %s\n", client.name)
  44. if _, ok := (*thisPodClients)[client.name]; ok {
  45. delete(*thisPodClients, client.name)
  46. }
  47. client.onDisconnect(l, rdb)
  48. return
  49. }
  50. }
  51. }()
  52. if err := client.onConnect(l, rdb); err != nil {
  53. l.Error("Error connecting client: %v\n", err)
  54. return err
  55. }
  56. return nil
  57. }
  58. }
  59. func subscribeToBroadcast(
  60. l *logger.Logger,
  61. rdb *redis.Client,
  62. thisPodClients *map[string]*Client,
  63. ) {
  64. // Subscribe all of this pod's clients to messages from any pod, via internal pubsub
  65. subscription := rdb.Subscribe(TOPIC_BROADCAST)
  66. for {
  67. select {
  68. case msg, ok := <- subscription.Channel():
  69. if !ok {
  70. return
  71. }
  72. var actionFromPubsub Action
  73. if err := json.Unmarshal([]byte(msg.Payload), &actionFromPubsub); err != nil {
  74. l.Error("Invalid action from pubsub: %v\n", err)
  75. } else {
  76. if actionFromPubsub.FromClient == nil {
  77. l.Debug("[<-Server] %s\n", actionFromPubsub.Type)
  78. } else {
  79. l.Debug("[<-Client] %s (%s)\n", actionFromPubsub.Type, *actionFromPubsub.FromClient)
  80. }
  81. errors := broadcastAction(thisPodClients, &actionFromPubsub)
  82. if len(errors) > 0 {
  83. l.Warn("Error broadcasting: %v\n", errors)
  84. }
  85. }
  86. }
  87. }
  88. }
  89. func pruneDisappearedClients(l *logger.Logger, rdb *redis.Client) {
  90. for {
  91. now := time.Now().Unix()
  92. rdb.ZRemRangeByScore(KEY_CLIENT_NAMES, "0", fmt.Sprintf("%d", now - CLIENT_TTL_SEC))
  93. time.Sleep(CLIENT_TTL_SEC * time.Second)
  94. }
  95. }
  96. func initPubsub(l *logger.Logger, rdb *redis.Client, router *mux.Router) {
  97. thisPodClients := make(map[string]*Client)
  98. go subscribeToBroadcast(l, rdb, &thisPodClients)
  99. go pruneDisappearedClients(l, rdb)
  100. router.Path("/pubsub").Methods("GET").HandlerFunc(
  101. routeHandler(l, rdb, handleClientSubscription(&thisPodClients)),
  102. )
  103. }