files.go 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216
  1. package read
  2. import (
  3. "io/ioutil"
  4. "path/filepath"
  5. config "github.com/felamaslen/gmus-backend/pkg/config"
  6. "github.com/felamaslen/gmus-backend/pkg/database"
  7. "github.com/felamaslen/gmus-backend/pkg/logger"
  8. "github.com/jmoiron/sqlx"
  9. "github.com/lib/pq"
  10. )
  // Tuning constants for the directory scan / filter pipeline.
  const (
  	// BATCH_SIZE is the capacity of the buffer that batchFilterFiles fills
  	// before checking the buffered files against the songs table in one query.
  	BATCH_SIZE = 100

  	// LOG_EVERY is the interval, in files received, at which batchFilterFiles
  	// emits a progress message.
  	LOG_EVERY = 100
  )
  13. func ReadMultipleFiles(basePath string, files chan *File) chan *Song {
  14. var l = logger.CreateLogger(config.GetConfig().LogLevel)
  15. songs := make(chan *Song)
  16. go func() {
  17. defer func() {
  18. l.Verbose("[READ] Finished reading files")
  19. close(songs)
  20. }()
  21. for {
  22. select {
  23. case file, more := <-files:
  24. if more {
  25. l.Debug("[READ] %s\n", file.RelativePath)
  26. song, err := ReadFile(basePath, file)
  27. if err == nil {
  28. songs <- song
  29. } else {
  30. l.Error("[READ] Error (%s): %v\n", file.RelativePath, err)
  31. }
  32. } else {
  33. return
  34. }
  35. }
  36. }
  37. }()
  38. return songs
  39. }
  // isValidFile reports whether file has an extension that the scanner accepts.
  // The comparison is exact, so only a lower-case ".ogg" extension matches.
  func isValidFile(file string) bool {
  	// TODO: support FLAC/MP3
  	switch filepath.Ext(file) {
  	case ".ogg":
  		return true
  	default:
  		return false
  	}
  }
  44. func recursiveDirScan(
  45. db *sqlx.DB,
  46. l *logger.Logger,
  47. allFiles *chan *File,
  48. rootDirectory string,
  49. relativePath string,
  50. isRoot bool,
  51. ) {
  52. directoryToScan := filepath.Join(rootDirectory, relativePath)
  53. if isRoot {
  54. l.Verbose("[SCAN] (root): %s\n", directoryToScan)
  55. defer func() {
  56. l.Verbose("[SCAN] Finished scanning directory")
  57. close(*allFiles)
  58. }()
  59. } else {
  60. l.Debug("[SCAN] %s\n", directoryToScan)
  61. }
  62. files, err := ioutil.ReadDir(directoryToScan)
  63. if err != nil {
  64. l.Error("[SCAN] Error (%s): %v", directoryToScan, err)
  65. return // TODO: add this to a table of failed directories
  66. }
  67. for _, file := range files {
  68. fileRelativePath := filepath.Join(relativePath, file.Name())
  69. if file.IsDir() {
  70. recursiveDirScan(
  71. db,
  72. l,
  73. allFiles,
  74. rootDirectory,
  75. fileRelativePath,
  76. false,
  77. )
  78. } else if isValidFile(file.Name()) {
  79. *allFiles <- &File{
  80. RelativePath: fileRelativePath,
  81. ModifiedDate: file.ModTime().Unix(),
  82. }
  83. }
  84. }
  85. }
  86. func batchFilterFiles(
  87. db *sqlx.DB,
  88. l *logger.Logger,
  89. filteredOutput *chan *File,
  90. allFiles *chan *File,
  91. basePath string,
  92. ) {
  93. defer close(*filteredOutput)
  94. var batch [BATCH_SIZE]*File
  95. var batchSize = 0
  96. var numFiltered = 0
  97. var processBatch = func() {
  98. if batchSize == 0 {
  99. return
  100. }
  101. l.Debug("[FILTER] Processing batch\n")
  102. var relativePaths pq.StringArray
  103. var modifiedDates pq.Int64Array
  104. for i := 0; i < batchSize; i++ {
  105. relativePaths = append(relativePaths, batch[i].RelativePath)
  106. modifiedDates = append(modifiedDates, batch[i].ModifiedDate)
  107. }
  108. newOrUpdatedFiles, err := db.Queryx(
  109. `
  110. select r.relative_path, r.modified_date
  111. from (
  112. select * from unnest($1::varchar[], $2::bigint[])
  113. as t(relative_path, modified_date)
  114. ) r
  115. left join songs on
  116. songs.base_path = $3
  117. and songs.relative_path = r.relative_path
  118. and songs.modified_date = r.modified_date
  119. where songs.id is null
  120. `,
  121. relativePaths,
  122. modifiedDates,
  123. basePath,
  124. )
  125. if err != nil {
  126. l.Fatal("[FILTER] Fatal error! %v\n", err)
  127. }
  128. for newOrUpdatedFiles.Next() {
  129. var file File
  130. newOrUpdatedFiles.StructScan(&file)
  131. l.Verbose("[NEW] %s\n", file.RelativePath)
  132. *filteredOutput <- &file
  133. }
  134. batchSize = 0
  135. newOrUpdatedFiles.Close()
  136. }
  137. for {
  138. select {
  139. case file, more := <-*allFiles:
  140. if !more {
  141. processBatch()
  142. l.Verbose("[FILTER] Finished filtering %d files\n", numFiltered)
  143. return
  144. }
  145. batch[batchSize] = file
  146. batchSize++
  147. numFiltered++
  148. if numFiltered%LOG_EVERY == 0 {
  149. l.Verbose("[FILTER] Processed %d\n", numFiltered)
  150. }
  151. if batchSize >= BATCH_SIZE {
  152. processBatch()
  153. }
  154. }
  155. }
  156. }
  157. func ScanDirectory(directory string) chan *File {
  158. db := database.GetConnection()
  159. l := logger.CreateLogger(config.GetConfig().LogLevel)
  160. filteredOutput := make(chan *File)
  161. allFiles := make(chan *File)
  162. go func() {
  163. batchFilterFiles(db, l, &filteredOutput, &allFiles, directory)
  164. }()
  165. go func() {
  166. recursiveDirScan(
  167. db,
  168. l,
  169. &allFiles,
  170. directory,
  171. "",
  172. true,
  173. )
  174. }()
  175. return filteredOutput
  176. }