clients.go 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112
  1. package server
  2. import (
  3. "net/http"
  4. "github.com/felamaslen/go-music-player/pkg/logger"
  5. "github.com/go-redis/redis/v7"
  6. )
  7. func getClientNameFromRequest(r *http.Request) string {
  8. return r.URL.Query().Get(CLIENT_QUERY_NAME)
  9. }
  10. func endSubscription(sub *redis.PubSub) error {
  11. if err := sub.Unsubscribe(); err != nil {
  12. return err
  13. }
  14. if err := sub.Close(); err != nil {
  15. return err
  16. }
  17. return nil
  18. }
  19. func (c *Client) exposeToNetwork(l *logger.Logger, rdb *redis.Client) error {
  20. // Expose the client to all pods running the server
  21. if _, err := rdb.SAdd(KEY_CLIENT_NAMES, c.name).Result(); err != nil {
  22. return err
  23. }
  24. allClients, err := rdb.SMembers(KEY_CLIENT_NAMES).Result()
  25. if err != nil {
  26. return err
  27. }
  28. if err := publishAction(rdb, &Action{
  29. Type: ClientConnected,
  30. Payload: allClients,
  31. }); err != nil {
  32. return err
  33. }
  34. return nil
  35. }
  36. func (c *Client) disposeFromNetwork(l *logger.Logger, rdb *redis.Client) error {
  37. // Make sure other clients know when one goes away
  38. if _, err := rdb.SRem(KEY_CLIENT_NAMES, c.name).Result(); err != nil {
  39. return err
  40. }
  41. allClients, err := rdb.SMembers(KEY_CLIENT_NAMES).Result()
  42. if err != nil {
  43. return err
  44. }
  45. if err := publishAction(rdb, &Action{
  46. Type: ClientDisconnected,
  47. Payload: allClients,
  48. }); err != nil {
  49. return err
  50. }
  51. return nil
  52. }
  53. func (c *Client) subscribeToMe(l *logger.Logger, rdb *redis.Client) {
  54. // Subscribe this pod to messages from the client. This pod is responsible for
  55. // onward publishing to other pods where necessary, via internal pubsub
  56. for {
  57. select {
  58. case <- c.closeChan:
  59. return
  60. default:
  61. var actionFromClient Action
  62. if err := c.conn.ReadJSON(&actionFromClient); err != nil {
  63. return
  64. }
  65. l.Debug("[->Client] %s (%s)\n", actionFromClient.Type, c.name)
  66. actionFromClient.FromClient = &c.name
  67. if err := publishAction(rdb, &actionFromClient); err != nil {
  68. l.Error("Error publishing action from client: %v\n", err)
  69. }
  70. }
  71. }
  72. }
  73. func (c *Client) onConnect(l *logger.Logger, rdb *redis.Client) error {
  74. if err := c.exposeToNetwork(l, rdb); err != nil {
  75. l.Error("Error exposing new client to network: %v\n", err)
  76. return err
  77. }
  78. c.subscribeToMe(l, rdb)
  79. return nil
  80. }
  81. func (c *Client) onDisconnect(l *logger.Logger, rdb *redis.Client) error {
  82. l.Verbose("[Client disconnected] %s\n", c.name)
  83. close(c.closeChan)
  84. if err := c.disposeFromNetwork(l, rdb); err != nil {
  85. l.Error("Error disposing client from network: %v\n", err)
  86. return err
  87. }
  88. return nil
  89. }