| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124 |
- package server
- import (
- "encoding/json"
- "fmt"
- "net/http"
- "time"
- "github.com/felamaslen/gmus-backend/pkg/logger"
- "github.com/go-redis/redis"
- "github.com/gorilla/mux"
- "github.com/gorilla/websocket"
- )
- var upgrader = websocket.Upgrader{
- CheckOrigin: func(r *http.Request) bool {
- return true
- },
- }
- func handleClientSubscription(thisPodClients *map[string]*Client) RouteHandler {
- return func(l *logger.Logger, rdb redis.Cmdable, w http.ResponseWriter, r *http.Request) error {
- clientName := getClientNameFromRequest(r)
- if len(clientName) == 0 {
- w.WriteHeader(400)
- w.Write([]byte("Must set client name in query"))
- return nil
- }
- conn, err := upgrader.Upgrade(w, r, nil)
- if err != nil {
- w.WriteHeader(400)
- w.Write([]byte("Incorrect client config to upgrade WS"))
- return nil
- }
- client := Client{
- name: clientName,
- conn: conn,
- closeChan: make(chan bool),
- }
- (*thisPodClients)[client.name] = &client
- defer conn.Close()
- go func() {
- for {
- select {
- case <-client.closeChan:
- l.Verbose("Caught closeChan call, closing... %s\n", client.name)
- if _, ok := (*thisPodClients)[client.name]; ok {
- delete(*thisPodClients, client.name)
- }
- client.onDisconnect(l, rdb)
- return
- }
- }
- }()
- if err := client.onConnect(l, rdb); err != nil {
- l.Error("Error connecting client: %v\n", err)
- return err
- }
- return nil
- }
- }
- func subscribeToBroadcast(
- l *logger.Logger,
- rdb *redis.Client,
- thisPodClients *map[string]*Client,
- ) {
- // Subscribe all of this pod's clients to messages from any pod, via internal pubsub
- subscription := rdb.Subscribe(TOPIC_BROADCAST)
- for {
- select {
- case msg, ok := <-subscription.Channel():
- if !ok {
- return
- }
- var actionFromPubsub Action
- if err := json.Unmarshal([]byte(msg.Payload), &actionFromPubsub); err != nil {
- l.Error("Invalid action from pubsub: %v\n", err)
- } else {
- if actionFromPubsub.FromClient == nil {
- l.Debug("[<-Server] %s\n", actionFromPubsub.Type)
- } else {
- l.Debug("[<-Client] %s (%s)\n", actionFromPubsub.Type, *actionFromPubsub.FromClient)
- }
- errors := BroadcastAction(l, thisPodClients, &actionFromPubsub)
- if len(errors) > 0 {
- l.Warn("Error broadcasting: %v\n", errors)
- }
- }
- }
- }
- }
- func pruneDisappearedClients(l *logger.Logger, rdb redis.Cmdable) {
- for {
- now := time.Now().Unix()
- rdb.ZRemRangeByScore(KEY_CLIENT_NAMES, "0", fmt.Sprintf("%d", now-CLIENT_TTL_SEC))
- time.Sleep(CLIENT_TTL_SEC * time.Second)
- }
- }
- func initPubsub(l *logger.Logger, rdb *redis.Client, router *mux.Router) {
- thisPodClients := make(map[string]*Client)
- go subscribeToBroadcast(l, rdb, &thisPodClients)
- go pruneDisappearedClients(l, rdb)
- router.Path("/pubsub").Methods("GET").HandlerFunc(
- routeHandler(l, rdb, handleClientSubscription(&thisPodClients)),
- )
- }
|