files.go 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215
  1. package read
  2. import (
  3. "io/ioutil"
  4. "path/filepath"
  5. config "github.com/felamaslen/go-music-player/pkg/config"
  6. "github.com/felamaslen/go-music-player/pkg/database"
  7. "github.com/felamaslen/go-music-player/pkg/logger"
  8. "github.com/jmoiron/sqlx"
  9. "github.com/lib/pq"
  10. )
  11. const BATCH_SIZE = 100
  12. func ReadMultipleFiles(basePath string, files chan *File) chan *Song {
  13. var l = logger.CreateLogger(config.GetConfig().LogLevel)
  14. songs := make(chan *Song)
  15. go func() {
  16. defer func() {
  17. l.Verbose("Finished reading files")
  18. close(songs)
  19. }()
  20. for {
  21. select {
  22. case file, more := <- files:
  23. if more {
  24. l.Verbose("Reading file: %s\n", file.RelativePath)
  25. song, err := ReadFile(basePath, file)
  26. if err == nil {
  27. songs <- song
  28. } else {
  29. l.Error("Error reading file (%s): %s\n", file.RelativePath, err)
  30. }
  31. } else {
  32. return
  33. }
  34. }
  35. }
  36. }()
  37. return songs
  38. }
  39. func isValidFile(file string) bool {
  40. // TODO: support FLAC/MP3
  41. return filepath.Ext(file) == ".ogg"
  42. }
  43. func recursiveDirScan(
  44. db *sqlx.DB,
  45. l *logger.Logger,
  46. inputBatch *[BATCH_SIZE]*File,
  47. fileBatches *chan [BATCH_SIZE]*File,
  48. rootDirectory string,
  49. relativePath string,
  50. isRoot bool,
  51. batchIndex int,
  52. ) int {
  53. directoryToScan := filepath.Join(rootDirectory, relativePath)
  54. if (isRoot) {
  55. l.Verbose("Scanning root directory: %s\n", directoryToScan)
  56. defer func() {
  57. var remainingItemsExist = batchIndex > 0
  58. if remainingItemsExist {
  59. *fileBatches <- *inputBatch
  60. }
  61. l.Verbose("Finished recursive directory scan")
  62. close(*fileBatches)
  63. }()
  64. } else {
  65. l.Debug("Scanning subdirectory: %s\n", directoryToScan)
  66. }
  67. files, err := ioutil.ReadDir(directoryToScan)
  68. if err != nil {
  69. l.Error("Error scanning directory: (%s): %s", directoryToScan, err)
  70. return batchIndex
  71. }
  72. for _, file := range(files) {
  73. fileRelativePath := filepath.Join(relativePath, file.Name())
  74. if file.IsDir() {
  75. batchIndex = recursiveDirScan(
  76. db,
  77. l,
  78. inputBatch,
  79. fileBatches,
  80. rootDirectory,
  81. fileRelativePath,
  82. false,
  83. batchIndex,
  84. )
  85. } else if isValidFile(file.Name()) {
  86. if batchIndex == BATCH_SIZE {
  87. *fileBatches <- *inputBatch
  88. batchIndex = 0
  89. }
  90. (*inputBatch)[batchIndex] = &File{
  91. RelativePath: fileRelativePath,
  92. ModifiedDate: file.ModTime().Unix(),
  93. }
  94. batchIndex++
  95. }
  96. }
  97. return batchIndex
  98. }
  99. func maybeReadFile(
  100. db *sqlx.DB,
  101. l *logger.Logger,
  102. output *chan *File,
  103. inputBatch *chan [BATCH_SIZE]*File,
  104. basePath string,
  105. ) {
  106. defer close(*output)
  107. for {
  108. select {
  109. case batch, more := <- *inputBatch:
  110. if !more {
  111. return
  112. }
  113. var relativePaths pq.StringArray
  114. var modifiedDates pq.Int64Array
  115. for _, s := range batch {
  116. if s == nil {
  117. break
  118. }
  119. relativePaths = append(relativePaths, s.RelativePath)
  120. modifiedDates = append(modifiedDates, s.ModifiedDate)
  121. }
  122. newOrUpdatedFiles, err := db.Queryx(
  123. `
  124. select r.relative_path, r.modified_date
  125. from (
  126. select * from unnest($1::varchar[], $2::bigint[])
  127. as t(relative_path, modified_date)
  128. ) r
  129. left join songs on
  130. songs.base_path = $3
  131. and songs.relative_path = r.relative_path
  132. and songs.modified_date = r.modified_date
  133. where songs.id is null
  134. `,
  135. relativePaths,
  136. modifiedDates,
  137. basePath,
  138. )
  139. if err != nil {
  140. l.Fatal("Error determining file eligibility: %v\n", err)
  141. }
  142. for newOrUpdatedFiles.Next() {
  143. var file File
  144. newOrUpdatedFiles.StructScan(&file)
  145. l.Verbose("New or updated file: %s\n", file.RelativePath)
  146. *output <- &file
  147. }
  148. newOrUpdatedFiles.Close()
  149. }
  150. }
  151. }
  152. func ScanDirectory(directory string) chan *File {
  153. db := database.GetConnection()
  154. l := logger.CreateLogger(config.GetConfig().LogLevel)
  155. output := make(chan *File)
  156. fileBatches := make(chan [BATCH_SIZE]*File)
  157. go func() {
  158. maybeReadFile(db, l, &output, &fileBatches, directory)
  159. }()
  160. var inputBatch [BATCH_SIZE]*File
  161. go func() {
  162. recursiveDirScan(
  163. db,
  164. l,
  165. &inputBatch,
  166. &fileBatches,
  167. directory,
  168. "",
  169. true,
  170. 0,
  171. )
  172. }()
  173. return output
  174. }