Sfoglia il codice sorgente

feat: separate deployment for file watcher

    * feat: manifest for watcher
    * feat: run health server on watcher
    * feat: documented watcher
    * feat: better logging
    * feat: command to watch library directory
    * feat: print log level when logging
Fela Maslen 4 anni fa
parent
commit
2ee38186c2

+ 5 - 1
README.md

@@ -29,7 +29,11 @@ This is an HTTP API written in Go. It is responsible for implementing the PubSub
 
 ### Scanner
 
-This keeps the database up-to-date, based on a directory containing music files.
+This is a cronjob which periodically scans the library directory, keeping the database up to date.
+
+### Watcher
+
+This watches the library directory in real time, updating the database when it changes.
 
 ### Clients
 

+ 5 - 0
gmus-backend/Makefile

@@ -4,6 +4,8 @@ IMG 	:= ${IMAGE}:${TAG}
 
 build.scan:
 	go build -o bin/gmus.scan ./cmd/gmus.scan
+build.watch:
+	go build -o bin/gmus.watch ./cmd/gmus.watch
 build.migrate:
 	go build -o bin/gmus.migrate ./cmd/gmus.migrate
 build.server:
@@ -11,6 +13,7 @@ build.server:
 
 build:
 	make build.scan
+	make build.watch
 	make build.migrate
 	make build.server
 
@@ -43,5 +46,7 @@ migrate.make:
 
 run.scan:
 	GO_ENV=development go run ./cmd/gmus.scan
+run.watch:
+	GO_ENV=development go run ./cmd/gmus.watch
 run.server:
 	GO_ENV=development go run ./cmd/gmus.server

+ 21 - 0
gmus-backend/cmd/gmus.watch/main.go

@@ -0,0 +1,21 @@
+package main
+
+import (
+	"github.com/felamaslen/gmus-backend/pkg/config"
+	"github.com/felamaslen/gmus-backend/pkg/logger"
+	"github.com/felamaslen/gmus-backend/pkg/read"
+	"github.com/felamaslen/gmus-backend/pkg/server"
+)
+
+func main() {
+	conf := config.GetConfig()
+	l := logger.CreateLogger(conf.LogLevel)
+
+	l.Info("Watching library for changes")
+	go read.WatchLibraryRecursive(l, conf.LibraryDirectory)
+
+	// This is necessary for the liveness and readiness probes
+	srv := server.Server{}
+	srv.Init()
+	srv.Listen()
+}

+ 0 - 6
gmus-backend/pkg/config/config.go

@@ -142,16 +142,10 @@ func getAllowedOrigins() []string {
 	return strings.Split(origins, ",")
 }
 
-func getLibraryWatch() bool {
-	watch, _ := os.LookupEnv("LIBRARY_WATCH")
-	return watch != "false"
-}
-
 type config struct {
 	DatabaseUrl      string
 	LogLevel         logger.LogLevel
 	LibraryDirectory string
-	LibraryWatch     bool
 	Host             string
 	Port             int
 	RedisUrl         string

+ 0 - 1
gmus-backend/pkg/config/main.go

@@ -11,7 +11,6 @@ func GetConfig() config {
 		DatabaseUrl:      getDatabaseUrl(),
 		LogLevel:         getLogLevel(),
 		LibraryDirectory: os.Getenv("LIBRARY_DIRECTORY"),
-		LibraryWatch:     getLibraryWatch(),
 		Host:             getListenHost(),
 		Port:             getPort(),
 		RedisUrl:         getRedisUrl(),

+ 8 - 7
gmus-backend/pkg/logger/logger.go

@@ -1,6 +1,7 @@
 package logger
 
 import (
+	"fmt"
 	"log"
 )
 
@@ -19,8 +20,8 @@ type Logger struct {
 	Level LogLevel
 }
 
-func (l *Logger) Printf(str string, args ...interface{}) {
-	log.Printf(str, args...)
+func (l *Logger) printf(level string, str string, args ...interface{}) {
+	log.Printf(fmt.Sprintf("[%s] %s", level, str), args...)
 }
 
 func (l *Logger) Fatal(str string, args ...interface{}) {
@@ -29,31 +30,31 @@ func (l *Logger) Fatal(str string, args ...interface{}) {
 
 func (l *Logger) Error(str string, args ...interface{}) {
 	if l.Level >= LevelError {
-		l.Printf(str, args...)
+		l.printf("error", str, args...)
 	}
 }
 
 func (l *Logger) Warn(str string, args ...interface{}) {
 	if l.Level >= LevelWarn {
-		l.Printf(str, args...)
+		l.printf("warn", str, args...)
 	}
 }
 
 func (l *Logger) Info(str string, args ...interface{}) {
 	if l.Level >= LevelInfo {
-		l.Printf(str, args...)
+		l.printf("info", str, args...)
 	}
 }
 
 func (l *Logger) Verbose(str string, args ...interface{}) {
 	if l.Level >= LevelVerbose {
-		l.Printf(str, args...)
+		l.printf("verbose", str, args...)
 	}
 }
 
 func (l *Logger) Debug(str string, args ...interface{}) {
 	if l.Level >= LevelDebug {
-		l.Printf(str, args...)
+		l.printf("debug", str, args...)
 	}
 }
 

+ 24 - 10
gmus-backend/pkg/read/watcher.go

@@ -20,6 +20,9 @@ import (
 var watcher *fsnotify.Watcher
 
 func watchDir(path string, fi os.FileInfo, err error) error {
+	if err != nil {
+		return err
+	}
 	if fi.Mode().IsDir() {
 		return watcher.Add(path)
 	}
@@ -38,15 +41,20 @@ func getRelativePath(fileName string, rootDirectory string) (relativePath string
 }
 
 func handleFileRemoveOrRename(l *logger.Logger, db *sqlx.DB, rootDirectory string, event fsnotify.Event) {
-	relativePath, err := getRelativePath(event.Name, rootDirectory)
+	filePath := event.Name
+	relativePath, err := getRelativePath(filePath, rootDirectory)
 	if err != nil {
-		l.Warn("[WATCH] delete error: %v\n", err)
+		l.Error("[WATCH] delete invalid: %v\n", err)
 		return
 	}
 
-	l.Verbose("[WATCH] delete: basePath=%s, relativePath=%s\n", rootDirectory, relativePath)
+	l.Info("[WATCH] delete: basePath=%s, relativePath=%s\n", rootDirectory, relativePath)
+
 	watcher.Remove(event.Name)
-	repository.DeleteSongByPath(db, rootDirectory, relativePath)
+
+	if _, err = repository.DeleteSongByPath(db, rootDirectory, relativePath); err != nil {
+		l.Error("[WATCH] delete error: %v\n", err)
+	}
 	return
 }
 
@@ -59,13 +67,13 @@ var writeWaitMap = map[string]*time.Timer{}
 
 func handleFileOnceWritten(l *logger.Logger, db *sqlx.DB, rootDirectory string, filePath string) {
 	relativePath, err := getRelativePath(filePath, rootDirectory)
-	l.Verbose("[WATCH] create: basePath=%s, relativePath=%s\n", rootDirectory, relativePath)
-
 	if err != nil {
-		l.Warn("[WATCH] invalid path: %v\n", err)
+		l.Error("[WATCH] create invalid: %v\n", err)
 		return
 	}
 
+	l.Info("[WATCH] create: basePath=%s, relativePath=%s\n", rootDirectory, relativePath)
+
 	file, err := os.Stat(filePath)
 	if err != nil {
 		l.Warn("[WATCH] error reading file: %v\n", err)
@@ -116,8 +124,11 @@ func handleFileCreateEvent(l *logger.Logger, db *sqlx.DB, rootDirectory string,
 		return
 	}
 	if file.IsDir() {
-		l.Verbose("[WATCH] adding directory to watcher: %s\n", filePath)
-		watcher.Add(filePath)
+		l.Info("[WATCH] directory added: %s\n", filePath)
+
+		if err = watcher.Add(filePath); err != nil {
+			l.Error("[WATCH] error adding directory: %v\n", err)
+		}
 	} else {
 		waitUntilFileIsWritten(l, db, rootDirectory, event)
 	}
@@ -136,12 +147,13 @@ func handleWatcherEvent(l *logger.Logger, db *sqlx.DB, rootDirectory string, eve
 	}
 }
 
-func WatchLibraryRecursive(l *logger.Logger, rootDirectory string) {
+func WatchLibraryRecursive(l *logger.Logger, rootDirectory string) error {
 	watcher, _ = fsnotify.NewWatcher()
 	defer watcher.Close()
 
 	if err := filepath.Walk(rootDirectory, watchDir); err != nil {
 		l.Error("[WATCH] walk error: %s\n", err.Error())
+		return err
 	}
 
 	db := database.GetConnection()
@@ -159,4 +171,6 @@ func WatchLibraryRecursive(l *logger.Logger, rootDirectory string) {
 	}()
 
 	<-done
+
+	return fmt.Errorf("Ended")
 }

+ 32 - 30
gmus-backend/pkg/server/server.go

@@ -7,51 +7,53 @@ import (
 
 	"github.com/felamaslen/gmus-backend/pkg/config"
 	"github.com/felamaslen/gmus-backend/pkg/logger"
-	"github.com/felamaslen/gmus-backend/pkg/read"
 	"github.com/go-redis/redis"
 	"github.com/gorilla/mux"
 	"github.com/rs/cors"
 )
 
-func StartServer() {
-	conf := config.GetConfig()
-	l := logger.CreateLogger(conf.LogLevel)
+func (s *Server) Init() {
+	s.l = logger.CreateLogger(config.GetConfig().LogLevel)
+	s.router = mux.NewRouter()
 
-	rdb := redis.NewClient(&redis.Options{Addr: conf.RedisUrl})
-	defer rdb.Close()
+	healthRoutes(s.l, s.router)
+}
+
+func (s *Server) Listen() {
+	conf := config.GetConfig()
 
-	if conf.LibraryWatch {
-		l.Info("Watching library for changes")
-		go read.WatchLibraryRecursive(l, conf.LibraryDirectory)
-	} else {
-		l.Verbose("Not watching library for changes")
-	}
+	handler := cors.New(cors.Options{
+		AllowedOrigins:   conf.AllowedOrigins,
+		AllowCredentials: true,
+	}).Handler(s.router)
 
-	router := mux.NewRouter()
+	s.l.Info("Starting server on %s:%d\n", conf.Host, conf.Port)
+	log.Fatal(http.ListenAndServe(fmt.Sprintf("%s:%d", conf.Host, conf.Port), handler))
+}
 
-	healthRoutes(l, router)
+func StartServer() {
+	conf := config.GetConfig()
+	l := logger.CreateLogger(conf.LogLevel)
 
-	initPubsub(l, rdb, router)
+	server := Server{}
+	server.Init()
 
-	router.Path("/stream").Methods("GET").HandlerFunc(routeHandler(l, rdb, streamSong))
+	rdb := redis.NewClient(&redis.Options{Addr: conf.RedisUrl})
+	defer rdb.Close()
 
-	router.Path("/artists").Methods("GET").HandlerFunc(routeHandler(l, rdb, routeFetchArtists))
-	router.Path("/albums").Methods("GET").HandlerFunc(routeHandler(l, rdb, routeFetchAlbums))
-	router.Path("/songs").Methods("GET").HandlerFunc(routeHandler(l, rdb, routeFetchSongs))
+	initPubsub(l, rdb, server.router)
 
-	router.Path("/song-info").Methods("GET").HandlerFunc(routeHandler(l, rdb, routeFetchSongInfo))
-	router.Path("/multi-song-info").Methods("GET").HandlerFunc(routeHandler(l, rdb, routeFetchMultiSongInfo))
+	server.router.Path("/stream").Methods("GET").HandlerFunc(routeHandler(l, rdb, streamSong))
 
-	router.Path("/next-song").Methods("GET").HandlerFunc(routeHandler(l, rdb, routeFetchNextSong))
-	router.Path("/prev-song").Methods("GET").HandlerFunc(routeHandler(l, rdb, routeFetchPrevSong))
+	server.router.Path("/artists").Methods("GET").HandlerFunc(routeHandler(l, rdb, routeFetchArtists))
+	server.router.Path("/albums").Methods("GET").HandlerFunc(routeHandler(l, rdb, routeFetchAlbums))
+	server.router.Path("/songs").Methods("GET").HandlerFunc(routeHandler(l, rdb, routeFetchSongs))
 
-	port := conf.Port
+	server.router.Path("/song-info").Methods("GET").HandlerFunc(routeHandler(l, rdb, routeFetchSongInfo))
+	server.router.Path("/multi-song-info").Methods("GET").HandlerFunc(routeHandler(l, rdb, routeFetchMultiSongInfo))
 
-	handler := cors.New(cors.Options{
-		AllowedOrigins:   conf.AllowedOrigins,
-		AllowCredentials: true,
-	}).Handler(router)
+	server.router.Path("/next-song").Methods("GET").HandlerFunc(routeHandler(l, rdb, routeFetchNextSong))
+	server.router.Path("/prev-song").Methods("GET").HandlerFunc(routeHandler(l, rdb, routeFetchPrevSong))
 
-	l.Info("Starting server on %s:%d\n", conf.Host, port)
-	log.Fatal(http.ListenAndServe(fmt.Sprintf("%s:%d", conf.Host, port), handler))
+	server.Listen()
 }

+ 8 - 0
gmus-backend/pkg/server/types.go

@@ -3,9 +3,17 @@ package server
 import (
 	"sync"
 
+	"github.com/felamaslen/gmus-backend/pkg/logger"
+
+	"github.com/gorilla/mux"
 	"github.com/gorilla/websocket"
 )
 
+type Server struct {
+	router *mux.Router
+	l      *logger.Logger
+}
+
 type Client struct {
 	name      string
 	conn      *websocket.Conn

+ 59 - 0
k8s/manifest.yml

@@ -286,3 +286,62 @@ spec:
             - name: gmus-library
               hostPath:
                 path: LIBRARY_DIRECTORY
+---
+apiVersion: apps/v1
+kind: Deployment
+metadata:
+  name: gmus-file-watcher
+  labels:
+    app: gmus-file-watcher
+spec:
+  replicas: 1
+  selector:
+    matchLabels:
+      app: gmus-file-watcher
+  template:
+    metadata:
+      labels:
+        app: gmus-file-watcher
+    spec:
+      imagePullSecrets:
+        - name: regcred
+      containers:
+        - name: gmus-file-watcher
+          image: docker.fela.space/gmus-backend:0
+          ports:
+            - containerPort: 8081
+          args:
+            - gmus.watch
+          envFrom:
+            - configMapRef:
+                name: gmus-backend
+          env:
+            - name: GO_ENV
+              value: production
+            - name: PORT
+              value: "8081"
+            - name: POSTGRES_PASSWORD
+              valueFrom:
+                secretKeyRef:
+                  name: postgres-pass
+                  key: password
+          volumeMounts:
+            - name: gmus-library
+              mountPath: /music
+              readOnly: true
+          livenessProbe:
+            initialDelaySeconds: 5
+            periodSeconds: 5
+            httpGet:
+              path: /liveness
+              port: 8081
+          readinessProbe:
+            initialDelaySeconds: 5
+            periodSeconds: 5
+            httpGet:
+              path: /readiness
+              port: 8081
+      volumes:
+        - name: gmus-library
+          hostPath:
+            path: LIBRARY_DIRECTORY