files.go 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192
  1. package read
  2. import (
  3. "io/ioutil"
  4. "os"
  5. "path/filepath"
  6. "time"
  7. config "github.com/felamaslen/gmus-backend/pkg/config"
  8. "github.com/felamaslen/gmus-backend/pkg/database"
  9. "github.com/felamaslen/gmus-backend/pkg/logger"
  10. "github.com/felamaslen/gmus-backend/pkg/repository"
  11. "github.com/felamaslen/gmus-backend/pkg/types"
  12. "github.com/jmoiron/sqlx"
  13. "github.com/lib/pq"
  14. )
  15. func ReadMultipleFiles(basePath string, files chan *types.File) chan *types.Song {
  16. var db = database.GetConnection()
  17. var l = logger.CreateLogger(config.GetConfig().LogLevel)
  18. songs := make(chan *types.Song)
  19. go func() {
  20. defer func() {
  21. l.Verbose("[READ] Finished reading files")
  22. close(songs)
  23. }()
  24. for {
  25. select {
  26. case file, more := <-files:
  27. if more {
  28. l.Debug("[READ] %s\n", file.RelativePath)
  29. song, err := ReadFile(basePath, file)
  30. if err == nil {
  31. songs <- song
  32. } else {
  33. l.Error("[READ] Error (%s): %v\n", file.RelativePath, err)
  34. repository.InsertScanError(db, &types.ScanError{
  35. CreatedAt: time.Now(),
  36. BasePath: basePath,
  37. RelativePath: file.RelativePath,
  38. Error: err.Error(),
  39. })
  40. }
  41. } else {
  42. return
  43. }
  44. }
  45. }
  46. }()
  47. return songs
  48. }
  // isValidFile reports whether the file name carries an extension the reader
  // currently supports. Only ".ogg" is accepted, compared case-sensitively.
  func isValidFile(file string) bool {
  	// TODO: support FLAC/MP3
  	switch filepath.Ext(file) {
  	case ".ogg":
  		return true
  	default:
  		return false
  	}
  }
  53. func GetFileInfo(file os.FileInfo, relativePath string) *types.File {
  54. if !isValidFile(file.Name()) {
  55. return nil
  56. }
  57. return &types.File{
  58. RelativePath: relativePath,
  59. ModifiedDate: file.ModTime().Unix(),
  60. }
  61. }
  62. // Utilities to aid directory reading
  63. func recursiveDirScan(
  64. db *sqlx.DB,
  65. l *logger.Logger,
  66. allFiles *chan *types.File,
  67. rootDirectory string,
  68. relativePath string,
  69. isRoot bool,
  70. ) {
  71. directoryToScan := filepath.Join(rootDirectory, relativePath)
  72. if isRoot {
  73. l.Verbose("[SCAN] (root): %s\n", directoryToScan)
  74. defer func() {
  75. l.Verbose("[SCAN] Finished scanning directory")
  76. close(*allFiles)
  77. }()
  78. } else {
  79. l.Debug("[SCAN] %s\n", directoryToScan)
  80. }
  81. files, err := ioutil.ReadDir(directoryToScan)
  82. if err != nil {
  83. l.Error("[SCAN] Error (%s): %v", directoryToScan, err)
  84. return // TODO: add this to a table of failed directories
  85. }
  86. for _, file := range files {
  87. fileRelativePath := filepath.Join(relativePath, file.Name())
  88. if file.IsDir() {
  89. recursiveDirScan(
  90. db,
  91. l,
  92. allFiles,
  93. rootDirectory,
  94. fileRelativePath,
  95. false,
  96. )
  97. } else {
  98. validFile := GetFileInfo(file, fileRelativePath)
  99. if validFile != nil {
  100. *allFiles <- validFile
  101. }
  102. }
  103. }
  104. }
  105. func batchFilterFiles(
  106. db *sqlx.DB,
  107. l *logger.Logger,
  108. filteredOutput *chan *types.File,
  109. allFiles *chan *types.File,
  110. basePath string,
  111. ) {
  112. defer close(*filteredOutput)
  113. var batch [BATCH_SIZE]*types.File
  114. var batchSize = 0
  115. var numFiltered = 0
  116. var processBatch = func() {
  117. if batchSize == 0 {
  118. return
  119. }
  120. l.Debug("[FILTER] Processing batch\n")
  121. var relativePaths pq.StringArray
  122. var modifiedDates pq.Int64Array
  123. for i := 0; i < batchSize; i++ {
  124. relativePaths = append(relativePaths, batch[i].RelativePath)
  125. modifiedDates = append(modifiedDates, batch[i].ModifiedDate)
  126. }
  127. newOrUpdatedFiles, err := repository.SelectNewOrUpdatedFiles(db, relativePaths, modifiedDates, basePath)
  128. if err != nil {
  129. l.Fatal("[FILTER] Fatal error! %v\n", err)
  130. }
  131. for newOrUpdatedFiles.Next() {
  132. var file types.File
  133. newOrUpdatedFiles.StructScan(&file)
  134. l.Verbose("[NEW] %s\n", file.RelativePath)
  135. *filteredOutput <- &file
  136. }
  137. batchSize = 0
  138. newOrUpdatedFiles.Close()
  139. }
  140. for {
  141. select {
  142. case file, more := <-*allFiles:
  143. if !more {
  144. processBatch()
  145. l.Verbose("[FILTER] Finished filtering %d files\n", numFiltered)
  146. return
  147. }
  148. batch[batchSize] = file
  149. batchSize++
  150. numFiltered++
  151. if numFiltered%LOG_EVERY == 0 {
  152. l.Verbose("[FILTER] Processed %d\n", numFiltered)
  153. }
  154. if batchSize >= BATCH_SIZE {
  155. processBatch()
  156. }
  157. }
  158. }
  159. }