// files.go
  1. package read
  2. import (
  3. "io/ioutil"
  4. "path/filepath"
  5. "time"
  6. config "github.com/felamaslen/gmus-backend/pkg/config"
  7. "github.com/felamaslen/gmus-backend/pkg/database"
  8. "github.com/felamaslen/gmus-backend/pkg/logger"
  9. "github.com/felamaslen/gmus-backend/pkg/repository"
  10. "github.com/felamaslen/gmus-backend/pkg/types"
  11. "github.com/jmoiron/sqlx"
  12. "github.com/lib/pq"
  13. )
// BATCH_SIZE is the number of scanned files buffered before a single
// database query checks which of them are new or updated.
const BATCH_SIZE = 100

// LOG_EVERY controls how often filtering progress is logged (every N files).
const LOG_EVERY = 100
  16. func ReadMultipleFiles(basePath string, files chan *types.File) chan *types.Song {
  17. var db = database.GetConnection()
  18. var l = logger.CreateLogger(config.GetConfig().LogLevel)
  19. songs := make(chan *types.Song)
  20. go func() {
  21. defer func() {
  22. l.Verbose("[READ] Finished reading files")
  23. close(songs)
  24. }()
  25. for {
  26. select {
  27. case file, more := <-files:
  28. if more {
  29. l.Debug("[READ] %s\n", file.RelativePath)
  30. song, err := ReadFile(basePath, file)
  31. if err == nil {
  32. songs <- song
  33. } else {
  34. l.Error("[READ] Error (%s): %v\n", file.RelativePath, err)
  35. repository.InsertScanError(db, &types.ScanError{
  36. CreatedAt: time.Now(),
  37. BasePath: basePath,
  38. RelativePath: file.RelativePath,
  39. Error: err.Error(),
  40. })
  41. }
  42. } else {
  43. return
  44. }
  45. }
  46. }
  47. }()
  48. return songs
  49. }
  50. func isValidFile(file string) bool {
  51. // TODO: support FLAC/MP3
  52. return filepath.Ext(file) == ".ogg"
  53. }
  54. func recursiveDirScan(
  55. db *sqlx.DB,
  56. l *logger.Logger,
  57. allFiles *chan *types.File,
  58. rootDirectory string,
  59. relativePath string,
  60. isRoot bool,
  61. ) {
  62. directoryToScan := filepath.Join(rootDirectory, relativePath)
  63. if isRoot {
  64. l.Verbose("[SCAN] (root): %s\n", directoryToScan)
  65. defer func() {
  66. l.Verbose("[SCAN] Finished scanning directory")
  67. close(*allFiles)
  68. }()
  69. } else {
  70. l.Debug("[SCAN] %s\n", directoryToScan)
  71. }
  72. files, err := ioutil.ReadDir(directoryToScan)
  73. if err != nil {
  74. l.Error("[SCAN] Error (%s): %v", directoryToScan, err)
  75. return // TODO: add this to a table of failed directories
  76. }
  77. for _, file := range files {
  78. fileRelativePath := filepath.Join(relativePath, file.Name())
  79. if file.IsDir() {
  80. recursiveDirScan(
  81. db,
  82. l,
  83. allFiles,
  84. rootDirectory,
  85. fileRelativePath,
  86. false,
  87. )
  88. } else if isValidFile(file.Name()) {
  89. *allFiles <- &types.File{
  90. RelativePath: fileRelativePath,
  91. ModifiedDate: file.ModTime().Unix(),
  92. }
  93. }
  94. }
  95. }
  96. func batchFilterFiles(
  97. db *sqlx.DB,
  98. l *logger.Logger,
  99. filteredOutput *chan *types.File,
  100. allFiles *chan *types.File,
  101. basePath string,
  102. ) {
  103. defer close(*filteredOutput)
  104. var batch [BATCH_SIZE]*types.File
  105. var batchSize = 0
  106. var numFiltered = 0
  107. var processBatch = func() {
  108. if batchSize == 0 {
  109. return
  110. }
  111. l.Debug("[FILTER] Processing batch\n")
  112. var relativePaths pq.StringArray
  113. var modifiedDates pq.Int64Array
  114. for i := 0; i < batchSize; i++ {
  115. relativePaths = append(relativePaths, batch[i].RelativePath)
  116. modifiedDates = append(modifiedDates, batch[i].ModifiedDate)
  117. }
  118. newOrUpdatedFiles, err := repository.SelectNewOrUpdatedFiles(db, relativePaths, modifiedDates, basePath)
  119. if err != nil {
  120. l.Fatal("[FILTER] Fatal error! %v\n", err)
  121. }
  122. for newOrUpdatedFiles.Next() {
  123. var file types.File
  124. newOrUpdatedFiles.StructScan(&file)
  125. l.Verbose("[NEW] %s\n", file.RelativePath)
  126. *filteredOutput <- &file
  127. }
  128. batchSize = 0
  129. newOrUpdatedFiles.Close()
  130. }
  131. for {
  132. select {
  133. case file, more := <-*allFiles:
  134. if !more {
  135. processBatch()
  136. l.Verbose("[FILTER] Finished filtering %d files\n", numFiltered)
  137. return
  138. }
  139. batch[batchSize] = file
  140. batchSize++
  141. numFiltered++
  142. if numFiltered%LOG_EVERY == 0 {
  143. l.Verbose("[FILTER] Processed %d\n", numFiltered)
  144. }
  145. if batchSize >= BATCH_SIZE {
  146. processBatch()
  147. }
  148. }
  149. }
  150. }
  151. func ScanDirectory(directory string) chan *types.File {
  152. db := database.GetConnection()
  153. l := logger.CreateLogger(config.GetConfig().LogLevel)
  154. filteredOutput := make(chan *types.File)
  155. allFiles := make(chan *types.File)
  156. go func() {
  157. batchFilterFiles(db, l, &filteredOutput, &allFiles, directory)
  158. }()
  159. go func() {
  160. recursiveDirScan(
  161. db,
  162. l,
  163. &allFiles,
  164. directory,
  165. "",
  166. true,
  167. )
  168. }()
  169. return filteredOutput
  170. }