فهرست منبع

chore: moved batching logic into subscriber with better logging and naming

Fela Maslen 5 سال پیش
والد
کامیت
15866124e7
1 فایل‌های تغییر یافته به همراه 90 افزوده شده و 89 حذف شده
  1. +90 −89
      music-player/pkg/read/files.go

+ 90 - 89
music-player/pkg/read/files.go

@@ -12,6 +12,7 @@ import (
 )
 
 const BATCH_SIZE = 100
+const LOG_EVERY = 100
 
 func ReadMultipleFiles(basePath string, files chan *File) chan *Song {
   var l = logger.CreateLogger(config.GetConfig().LogLevel)
@@ -20,7 +21,7 @@ func ReadMultipleFiles(basePath string, files chan *File) chan *Song {
 
   go func() {
     defer func() {
-      l.Verbose("Finished reading files")
+      l.Verbose("[READ] Finished reading files")
       close(songs)
     }()
 
@@ -28,13 +29,13 @@ func ReadMultipleFiles(basePath string, files chan *File) chan *Song {
       select {
       case file, more := <- files:
         if more {
-          l.Verbose("Reading file: %s\n", file.RelativePath)
+          l.Debug("[READ] %s\n", file.RelativePath)
           song, err := ReadFile(basePath, file)
 
           if err == nil {
             songs <- song
           } else {
-            l.Error("Error reading file (%s): %s\n", file.RelativePath, err)
+            l.Error("[READ] Error (%s): %v\n", file.RelativePath, err)
           }
         } else {
           return
@@ -54,133 +55,137 @@ func isValidFile(file string) bool {
 func recursiveDirScan(
   db *sqlx.DB,
   l *logger.Logger,
-  inputBatch *[BATCH_SIZE]*File,
-  fileBatches *chan [BATCH_SIZE]*File,
+  allFiles *chan *File,
   rootDirectory string,
   relativePath string,
   isRoot bool,
-  batchIndex int,
-) int {
+) {
   directoryToScan := filepath.Join(rootDirectory, relativePath)
 
   if (isRoot) {
-    l.Verbose("Scanning root directory: %s\n", directoryToScan)
+    l.Verbose("[SCAN] (root): %s\n", directoryToScan)
 
     defer func() {
-      var remainingItemsExist = batchIndex > 0
-      if remainingItemsExist {
-        *fileBatches <- *inputBatch
-      }
-
-      l.Verbose("Finished recursive directory scan")
-      close(*fileBatches)
+      l.Verbose("[SCAN] Finished scanning directory")
+      close(*allFiles)
     }()
   } else {
-    l.Debug("Scanning subdirectory: %s\n", directoryToScan)
+    l.Debug("[SCAN] %s\n", directoryToScan)
   }
 
   files, err := ioutil.ReadDir(directoryToScan)
 
   if err != nil {
-    l.Error("Error scanning directory: (%s): %s", directoryToScan, err)
-    return batchIndex
+    l.Error("[SCAN] Error (%s): %v", directoryToScan, err)
+    return // TODO: add this to a table of failed directories
   }
 
   for _, file := range(files) {
     fileRelativePath := filepath.Join(relativePath, file.Name())
 
     if file.IsDir() {
-      batchIndex = recursiveDirScan(
+      recursiveDirScan(
         db,
         l,
-        inputBatch,
-        fileBatches,
+        allFiles,
         rootDirectory,
         fileRelativePath,
         false,
-        batchIndex,
       )
     } else if isValidFile(file.Name()) {
-
-      if batchIndex == BATCH_SIZE {
-        *fileBatches <- *inputBatch
-        batchIndex = 0
-      }
-
-      (*inputBatch)[batchIndex] = &File{
+      *allFiles <- &File{
         RelativePath: fileRelativePath,
         ModifiedDate: file.ModTime().Unix(),
       }
-
-      batchIndex++
     }
   }
-
-  return batchIndex
 }
 
-func maybeReadFile(
+func batchFilterFiles(
   db *sqlx.DB,
   l *logger.Logger,
-  output *chan *File,
-  inputBatch *chan [BATCH_SIZE]*File,
+  filteredOutput *chan *File,
+  allFiles *chan *File,
   basePath string,
 ) {
-  defer close(*output)
+  defer close(*filteredOutput)
 
-  for {
-    select {
-    case batch, more := <- *inputBatch:
-      if !more {
-        return
-      }
+  var batch [BATCH_SIZE]*File
+  var batchSize = 0
+  var numFiltered = 0
 
-      var relativePaths pq.StringArray
-      var modifiedDates pq.Int64Array
+  var processBatch = func() {
+    if batchSize == 0 {
+      l.Verbose("[FILTER] Finished filtering files\n")
+      return
+    }
 
-      for _, s := range batch {
-        if s == nil {
-          break
-        }
+    l.Debug("[FILTER] Processing batch\n")
 
-        relativePaths = append(relativePaths, s.RelativePath)
-        modifiedDates = append(modifiedDates, s.ModifiedDate)
-      }
+    var relativePaths pq.StringArray
+    var modifiedDates pq.Int64Array
 
-      newOrUpdatedFiles, err := db.Queryx(
-        `
-        select r.relative_path, r.modified_date
-        from (
-          select * from unnest($1::varchar[], $2::bigint[])
-          as t(relative_path, modified_date)
-        ) r
-
-        left join songs on
-          songs.base_path = $3
-          and songs.relative_path = r.relative_path
-          and songs.modified_date = r.modified_date
-
-        where songs.id is null
-        `,
-        relativePaths,
-        modifiedDates,
-        basePath,
-      )
+    for i := 0; i < batchSize; i++ {
+      relativePaths = append(relativePaths, batch[i].RelativePath)
+      modifiedDates = append(modifiedDates, batch[i].ModifiedDate)
+    }
 
-      if err != nil {
-        l.Fatal("Error determining file eligibility: %v\n", err)
-      }
+    newOrUpdatedFiles, err := db.Queryx(
+      `
+      select r.relative_path, r.modified_date
+      from (
+        select * from unnest($1::varchar[], $2::bigint[])
+        as t(relative_path, modified_date)
+      ) r
+
+      left join songs on
+        songs.base_path = $3
+        and songs.relative_path = r.relative_path
+        and songs.modified_date = r.modified_date
+
+      where songs.id is null
+      `,
+      relativePaths,
+      modifiedDates,
+      basePath,
+    )
+
+    if err != nil {
+      l.Fatal("[FILTER] Fatal error! %v\n", err)
+    }
 
-      for newOrUpdatedFiles.Next() {
-        var file File
-        newOrUpdatedFiles.StructScan(&file)
+    for newOrUpdatedFiles.Next() {
+      var file File
+      newOrUpdatedFiles.StructScan(&file)
 
-        l.Verbose("New or updated file: %s\n", file.RelativePath)
+      l.Debug("[FILTER] %s\n", file.RelativePath)
+
+      *filteredOutput <- &file
+    }
+
+    batchSize = 0
+    newOrUpdatedFiles.Close()
+  }
 
-        *output <- &file
+  for {
+    select {
+    case file, more := <- *allFiles:
+      if !more {
+        processBatch()
+        return
       }
 
-      newOrUpdatedFiles.Close()
+      batch[batchSize] = file
+      batchSize++
+
+      numFiltered++
+      if numFiltered % LOG_EVERY == 0 {
+        l.Verbose("[FILTER] Processed %d\n", numFiltered)
+      }
+
+      if batchSize >= BATCH_SIZE {
+        processBatch()
+      }
     }
   }
 }
@@ -189,27 +194,23 @@ func ScanDirectory(directory string) chan *File {
   db := database.GetConnection()
   l := logger.CreateLogger(config.GetConfig().LogLevel)
 
-  output := make(chan *File)
-  fileBatches := make(chan [BATCH_SIZE]*File)
+  filteredOutput := make(chan *File)
+  allFiles := make(chan *File)
 
   go func() {
-    maybeReadFile(db, l, &output, &fileBatches, directory)
+    batchFilterFiles(db, l, &filteredOutput, &allFiles, directory)
   }()
 
-  var inputBatch [BATCH_SIZE]*File
-  
   go func() {
     recursiveDirScan(
       db,
       l,
-      &inputBatch,
-      &fileBatches,
+      &allFiles,
       directory,
       "",
       true,
-      0,
     )
   }()
 
-  return output
+  return filteredOutput
 }