فهرست منبع

feat: batch filtering of scanned files

Fela Maslen 5 سال پیش
والد
کامیت
bb1cff6949

+ 1 - 0
music-player/go.mod

@@ -11,6 +11,7 @@ require (
 	github.com/jackc/pgx/v4 v4.9.2 // indirect
 	github.com/jmoiron/sqlx v1.2.0
 	github.com/joho/godotenv v1.3.0
+	github.com/lib/pq v1.3.0
 	github.com/onsi/ginkgo v1.14.2
 	github.com/onsi/gomega v1.10.1
 	github.com/stretchr/testify v1.5.1 // indirect

+ 2 - 0
music-player/migrations/000004_base_path_index.down.sql

@@ -0,0 +1,2 @@
+DROP INDEX songs_file_time_unique;
+CREATE INDEX filename_timestamp ON songs (base_path, relative_path, modified_date);

+ 2 - 0
music-player/migrations/000004_base_path_index.up.sql

@@ -0,0 +1,2 @@
+DROP INDEX filename_timestamp;
+CREATE UNIQUE INDEX songs_file_time_unique ON songs (base_path, relative_path, modified_date)

+ 101 - 24
music-player/pkg/read/files.go

@@ -8,8 +8,11 @@ import (
 	"github.com/felamaslen/go-music-player/pkg/database"
 	"github.com/felamaslen/go-music-player/pkg/logger"
 	"github.com/jmoiron/sqlx"
+	"github.com/lib/pq"
 )
 
+const BATCH_SIZE = 100
+
 func ReadMultipleFiles(basePath string, files chan *File) chan *Song {
   var l = logger.CreateLogger(config.GetConfig().LogLevel)
 
@@ -51,19 +54,26 @@ func isValidFile(file string) bool {
 func recursiveDirScan(
   db *sqlx.DB,
   l *logger.Logger,
-  output *chan *File,
+  inputBatch *[BATCH_SIZE]*File,
+  fileBatches *chan [BATCH_SIZE]*File,
   rootDirectory string,
   relativePath string,
   isRoot bool,
-) {
+  batchIndex int,
+) int {
   directoryToScan := filepath.Join(rootDirectory, relativePath)
 
   if (isRoot) {
     l.Verbose("Scanning root directory: %s\n", directoryToScan)
 
     defer func() {
+      var remainingItemsExist = batchIndex > 0
+      if remainingItemsExist {
+        *fileBatches <- *inputBatch
+      }
+
       l.Verbose("Finished recursive directory scan")
-      close(*output)
+      close(*fileBatches)
     }()
   } else {
     l.Debug("Scanning subdirectory: %s\n", directoryToScan)
@@ -73,45 +83,103 @@ func recursiveDirScan(
 
   if err != nil {
     l.Fatal("Error scanning directory: (%s): %s", directoryToScan, err)
-    return
   }
 
   for _, file := range(files) {
     fileRelativePath := filepath.Join(relativePath, file.Name())
 
     if file.IsDir() {
-      recursiveDirScan(
+      batchIndex = recursiveDirScan(
         db,
         l,
-        output,
+        inputBatch,
+        fileBatches,
         rootDirectory,
         fileRelativePath,
         false,
+        batchIndex,
       )
     } else if isValidFile(file.Name()) {
-      modifiedDate := file.ModTime().Unix()
 
-      var existingCount = 0
+      if batchIndex == BATCH_SIZE {
+        *fileBatches <- *inputBatch
+        batchIndex = 0
+      }
+
+      (*inputBatch)[batchIndex] = &File{
+        RelativePath: fileRelativePath,
+        ModifiedDate: file.ModTime().Unix(),
+      }
+
+      batchIndex++
+    }
+  }
+
+  return batchIndex
+}
+
+func maybeReadFile(
+  db *sqlx.DB,
+  l *logger.Logger,
+  output *chan *File,
+  inputBatch *chan [BATCH_SIZE]*File,
+  basePath string,
+) {
+  defer close(*output)
+
+  for {
+    select {
+    case batch, more := <- *inputBatch:
+      if !more {
+        return
+      }
+
+      var relativePaths pq.StringArray
+      var modifiedDates pq.Int64Array
+
+      for _, s := range batch {
+        if s == nil {
+          break
+        }
 
-      err := db.Get(
-        &existingCount,
+        relativePaths = append(relativePaths, s.RelativePath)
+        modifiedDates = append(modifiedDates, s.ModifiedDate)
+      }
+
+      newOrUpdatedFiles, err := db.Queryx(
         `
-        select count(*) from songs
-        where base_path = $1 and relative_path = $2 and modified_date = $3
+        select r.relative_path, r.modified_date
+        from (
+          select * from unnest($1::varchar[], $2::bigint[])
+          as t(relative_path, modified_date)
+        ) r
+
+        left join songs on
+          songs.base_path = $3
+          and songs.relative_path = r.relative_path
+          and songs.modified_date = r.modified_date
+
+        where songs.id is null
         `,
-        rootDirectory,
-        fileRelativePath,
-        modifiedDate,
+        relativePaths,
+        modifiedDates,
+        basePath,
       )
 
-      if err == nil && existingCount == 0 {
-        l.Verbose("Found file: %s\n", fileRelativePath)
+      if err != nil {
+        l.Fatal("Error determining file eligibility: %v\n", err)
+      }
 
-        *output <- &File{
-          RelativePath: fileRelativePath,
-          ModifiedDate: modifiedDate,
-        }
+      for newOrUpdatedFiles.Next() {
+        var file File
+        newOrUpdatedFiles.StructScan(&file)
+
+        l.Verbose("New or updated file: %s\n", file.RelativePath)
+
+        *output <- &file
       }
+
+      newOrUpdatedFiles.Close()
     }
   }
 }
@@ -120,18 +188,27 @@ func ScanDirectory(directory string) chan *File {
   db := database.GetConnection()
   l := logger.CreateLogger(config.GetConfig().LogLevel)
 
-  files := make(chan *File)
+  output := make(chan *File)
+  fileBatches := make(chan [BATCH_SIZE]*File)
+
+  go func() {
+    maybeReadFile(db, l, &output, &fileBatches, directory)
+  }()
+
+  var inputBatch [BATCH_SIZE]*File
   
   go func() {
     recursiveDirScan(
       db,
       l,
-      &files,
+      &inputBatch,
+      &fileBatches,
       directory,
       "",
       true,
+      0,
     )
   }()
 
-  return files
+  return output
 }

+ 8 - 2
music-player/pkg/read/files_test.go

@@ -87,8 +87,14 @@ var _ = Describe("reading files", func() {
 
       It("should return a channel with all the files in the directory", func() {
 	Expect(results).To(HaveLen(2))
-	Expect(results[0].RelativePath).To(Equal(read.TestSong.RelativePath))
-	Expect(results[1].RelativePath).To(Equal(read.TestSongNested.RelativePath))
+
+	if results[0].RelativePath == read.TestSong.RelativePath {
+	  Expect(results[0].RelativePath).To(Equal(read.TestSong.RelativePath))
+	  Expect(results[1].RelativePath).To(Equal(read.TestSongNested.RelativePath))
+	} else {
+	  Expect(results[1].RelativePath).To(Equal(read.TestSong.RelativePath))
+	  Expect(results[0].RelativePath).To(Equal(read.TestSongNested.RelativePath))
+	}
       })
     })
 

+ 2 - 2
music-player/pkg/read/types.go

@@ -12,6 +12,6 @@ type Song struct {
 }
 
 type File struct {
-  RelativePath string
-  ModifiedDate int64
+  RelativePath string 	`db:"relative_path"`
+  ModifiedDate int64 	`db:"modified_date"`
 }

+ 8 - 18
music-player/pkg/repository/scan_test.go

@@ -61,18 +61,15 @@ var _ = Describe("scanning repository", func() {
       })
 
       It("should insert both songs", func() {
-	var song read.Song
+	var songs []read.Song
 
-	rows, _ := db.Queryx(`
+	db.Select(&songs, `
 	select title, artist, album, duration, base_path, relative_path, modified_date
 	from songs
 	order by title
 	`)
 
-	rows.Next()
-	rows.StructScan(&song)
-
-	Expect(song).To(Equal(read.Song{
+	Expect(songs[0]).To(Equal(read.Song{
 	  Title: "Hey Jude",
 	  Artist: "The Beatles",
 	  Album: "",
@@ -82,10 +79,7 @@ var _ = Describe("scanning repository", func() {
 	  ModifiedDate: 8876,
 	}))
 
-	rows.Next()
-	rows.StructScan(&song)
-
-	Expect(song).To(Equal(read.Song{
+	Expect(songs[1]).To(Equal(read.Song{
 	  Title: "Starman",
 	  Artist: "David Bowie",
 	  Album: "The Rise and Fall of Ziggy Stardust and the Spiders from Mars",
@@ -94,8 +88,6 @@ var _ = Describe("scanning repository", func() {
 	  RelativePath: "otherFile.ogg",
 	  ModifiedDate: 11883,
 	}))
-
-	rows.Close()
       })
     })
 
@@ -128,22 +120,20 @@ var _ = Describe("scanning repository", func() {
       })
 
       It("should upsert the existing item", func() {
-	rows, _ := db.Queryx(`
+	var songs []read.Song
+	db.Select(&songs, `
 	select title, artist, album, duration, base_path, relative_path, modified_date from songs
 	where base_path = '/path/to' and relative_path = 'file.ogg'
 	`)
 
-	var song read.Song
-	rows.Next()
-	rows.StructScan(&song)
+	Expect(songs).To(HaveLen(1))
+	var song = songs[0]
 
 	Expect(song.Title).To(Equal("Hey Jude"))
 	Expect(song.Artist).To(Equal("The Beatles"))
 	Expect(song.Album).To(Equal(""))
 	Expect(song.Duration).To(Equal(431))
 	Expect(song.ModifiedDate).To(Equal(int64(8876)))
-
-	rows.Close()
       })
     })
   })

+ 14 - 9
music-player/pkg/services/scanner_test.go

@@ -21,27 +21,32 @@ var _ = Describe("music scanner (integration test)", func() {
 
     db := database.GetConnection()
 
-    rows, err := db.Queryx(`
+    var songs []read.Song
+    err := db.Select(&songs, `
       select title, artist, album, duration, base_path, relative_path
       from songs
     `)
 
     Expect(err).To(BeNil())
 
-    var song read.Song
+    Expect(songs).To(HaveLen(2))
 
-    rows.Next()
-    rows.StructScan(&song)
-
-    Expect(song).To(Equal(read.Song{
+    Expect(read.Song{
       Title: read.TestSong.Title,
       Artist: read.TestSong.Artist,
       Album: read.TestSong.Album,
       Duration: read.TestSong.Duration,
       BasePath: read.TestSong.BasePath,
       RelativePath: read.TestSong.RelativePath,
-    }))
-
-    rows.Close()
+    }).To(BeElementOf(songs))
+
+    Expect(read.Song{
+      Title: read.TestSongNested.Title,
+      Artist: read.TestSongNested.Artist,
+      Album: read.TestSongNested.Album,
+      Duration: read.TestSongNested.Duration,
+      BasePath: read.TestSongNested.BasePath,
+      RelativePath: read.TestSongNested.RelativePath,
+    }).To(BeElementOf(songs))
   })
 })