files.go 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214
  1. package read
  2. import (
  3. "io/ioutil"
  4. "path/filepath"
  5. config "github.com/felamaslen/go-music-player/pkg/config"
  6. "github.com/felamaslen/go-music-player/pkg/database"
  7. "github.com/felamaslen/go-music-player/pkg/logger"
  8. "github.com/jmoiron/sqlx"
  9. "github.com/lib/pq"
  10. )
  11. const BATCH_SIZE = 100
  12. func ReadMultipleFiles(basePath string, files chan *File) chan *Song {
  13. var l = logger.CreateLogger(config.GetConfig().LogLevel)
  14. songs := make(chan *Song)
  15. go func() {
  16. defer func() {
  17. l.Verbose("Finished reading files")
  18. close(songs)
  19. }()
  20. for {
  21. select {
  22. case file, more := <- files:
  23. if more {
  24. l.Verbose("Reading file: %s\n", file.RelativePath)
  25. song, err := ReadFile(basePath, file)
  26. if err == nil {
  27. songs <- song
  28. } else {
  29. l.Error("Error reading file (%s): %s\n", file.RelativePath, err)
  30. }
  31. } else {
  32. return
  33. }
  34. }
  35. }
  36. }()
  37. return songs
  38. }
  39. func isValidFile(file string) bool {
  40. // TODO: support FLAC/MP3
  41. return filepath.Ext(file) == ".ogg"
  42. }
  43. func recursiveDirScan(
  44. db *sqlx.DB,
  45. l *logger.Logger,
  46. inputBatch *[BATCH_SIZE]*File,
  47. fileBatches *chan [BATCH_SIZE]*File,
  48. rootDirectory string,
  49. relativePath string,
  50. isRoot bool,
  51. batchIndex int,
  52. ) int {
  53. directoryToScan := filepath.Join(rootDirectory, relativePath)
  54. if (isRoot) {
  55. l.Verbose("Scanning root directory: %s\n", directoryToScan)
  56. defer func() {
  57. var remainingItemsExist = batchIndex > 0
  58. if remainingItemsExist {
  59. *fileBatches <- *inputBatch
  60. }
  61. l.Verbose("Finished recursive directory scan")
  62. close(*fileBatches)
  63. }()
  64. } else {
  65. l.Debug("Scanning subdirectory: %s\n", directoryToScan)
  66. }
  67. files, err := ioutil.ReadDir(directoryToScan)
  68. if err != nil {
  69. l.Fatal("Error scanning directory: (%s): %s", directoryToScan, err)
  70. }
  71. for _, file := range(files) {
  72. fileRelativePath := filepath.Join(relativePath, file.Name())
  73. if file.IsDir() {
  74. batchIndex = recursiveDirScan(
  75. db,
  76. l,
  77. inputBatch,
  78. fileBatches,
  79. rootDirectory,
  80. fileRelativePath,
  81. false,
  82. batchIndex,
  83. )
  84. } else if isValidFile(file.Name()) {
  85. if batchIndex == BATCH_SIZE {
  86. *fileBatches <- *inputBatch
  87. batchIndex = 0
  88. }
  89. (*inputBatch)[batchIndex] = &File{
  90. RelativePath: fileRelativePath,
  91. ModifiedDate: file.ModTime().Unix(),
  92. }
  93. batchIndex++
  94. }
  95. }
  96. return batchIndex
  97. }
  98. func maybeReadFile(
  99. db *sqlx.DB,
  100. l *logger.Logger,
  101. output *chan *File,
  102. inputBatch *chan [BATCH_SIZE]*File,
  103. basePath string,
  104. ) {
  105. defer close(*output)
  106. for {
  107. select {
  108. case batch, more := <- *inputBatch:
  109. if !more {
  110. return
  111. }
  112. var relativePaths pq.StringArray
  113. var modifiedDates pq.Int64Array
  114. for _, s := range batch {
  115. if s == nil {
  116. break
  117. }
  118. relativePaths = append(relativePaths, s.RelativePath)
  119. modifiedDates = append(modifiedDates, s.ModifiedDate)
  120. }
  121. newOrUpdatedFiles, err := db.Queryx(
  122. `
  123. select r.relative_path, r.modified_date
  124. from (
  125. select * from unnest($1::varchar[], $2::bigint[])
  126. as t(relative_path, modified_date)
  127. ) r
  128. left join songs on
  129. songs.base_path = $3
  130. and songs.relative_path = r.relative_path
  131. and songs.modified_date = r.modified_date
  132. where songs.id is null
  133. `,
  134. relativePaths,
  135. modifiedDates,
  136. basePath,
  137. )
  138. if err != nil {
  139. l.Fatal("Error determining file eligibility: %v\n", err)
  140. }
  141. for newOrUpdatedFiles.Next() {
  142. var file File
  143. newOrUpdatedFiles.StructScan(&file)
  144. l.Verbose("New or updated file: %s\n", file.RelativePath)
  145. *output <- &file
  146. }
  147. newOrUpdatedFiles.Close()
  148. }
  149. }
  150. }
  151. func ScanDirectory(directory string) chan *File {
  152. db := database.GetConnection()
  153. l := logger.CreateLogger(config.GetConfig().LogLevel)
  154. output := make(chan *File)
  155. fileBatches := make(chan [BATCH_SIZE]*File)
  156. go func() {
  157. maybeReadFile(db, l, &output, &fileBatches, directory)
  158. }()
  159. var inputBatch [BATCH_SIZE]*File
  160. go func() {
  161. recursiveDirScan(
  162. db,
  163. l,
  164. &inputBatch,
  165. &fileBatches,
  166. directory,
  167. "",
  168. true,
  169. 0,
  170. )
  171. }()
  172. return output
  173. }