Fixes locked up downloads and enables removing torrents via .torrent removal

This commit is contained in:
Tony Blyler 2016-05-16 12:37:28 -04:00
parent 80929d25ed
commit 659fca62a8
No known key found for this signature in database
GPG key ID: 25C9D3A655D1A65C

View file

@ -3,6 +3,7 @@ package queue
import ( import (
"bytes" "bytes"
"errors" "errors"
"fmt"
"github.com/fsnotify/fsnotify" "github.com/fsnotify/fsnotify"
"github.com/tblyler/easysftp" "github.com/tblyler/easysftp"
"github.com/tblyler/go-rtorrent/rtorrent" "github.com/tblyler/go-rtorrent/rtorrent"
@ -13,7 +14,6 @@ import (
"path/filepath" "path/filepath"
"strconv" "strconv"
"strings" "strings"
"sync"
"time" "time"
) )
@ -54,7 +54,6 @@ type Queue struct {
torrentList map[string]rtorrent.Torrent torrentList map[string]rtorrent.Torrent
downloadQueue map[string]string downloadQueue map[string]string
logger *log.Logger logger *log.Logger
lock sync.RWMutex
} }
var prettyBytesValues = []float64{ var prettyBytesValues = []float64{
@ -182,10 +181,6 @@ func (q *Queue) updateTorrentList() error {
} }
func (q *Queue) addTorrentFilePath(path string) error { func (q *Queue) addTorrentFilePath(path string) error {
// lock for downloadQueue
q.lock.Lock()
defer q.lock.Unlock()
torrentData, err := ioutil.ReadFile(path) torrentData, err := ioutil.ReadFile(path)
if err != nil { if err != nil {
return err return err
@ -203,18 +198,20 @@ func (q *Queue) addTorrentFilePath(path string) error {
return nil return nil
} }
return q.rtClient.AddTorrent(torrentData) err = q.rtClient.AddTorrent(torrentData)
if err != nil {
return err
}
return q.updateTorrentList()
} }
func (q *Queue) getFinishedTorrents() []rtorrent.Torrent { func (q *Queue) getFinishedTorrents() []rtorrent.Torrent {
q.lock.RLock()
torrents := []rtorrent.Torrent{} torrents := []rtorrent.Torrent{}
for hash, torrentPath := range q.downloadQueue { for hash, torrentPath := range q.downloadQueue {
torrent, exists := q.torrentList[hash] torrent, exists := q.torrentList[hash]
if !exists { if !exists {
q.lock.RUnlock()
err := q.addTorrentFilePath(torrentPath) err := q.addTorrentFilePath(torrentPath)
q.lock.RLock()
if err == nil { if err == nil {
q.logger.Printf("Added torrent '%s' to rTorrent", torrentPath) q.logger.Printf("Added torrent '%s' to rTorrent", torrentPath)
} else { } else {
@ -228,43 +225,17 @@ func (q *Queue) getFinishedTorrents() []rtorrent.Torrent {
continue continue
} }
torrent.Hash = strings.ToUpper(torrent.Hash)
torrents = append(torrents, torrent) torrents = append(torrents, torrent)
} }
return torrents return torrents
} }
func (q *Queue) downloadTorrents(torrents []rtorrent.Torrent) { func (q *Queue) downloadTorrent(torrent rtorrent.Torrent, torrentFilePath string) error {
q.lock.RLock()
defer q.lock.RUnlock()
// this will keep track of the torrent hashes that have finished
// will return an empty string on a failed download
done := make(chan string, q.config.ConcurrentDownloads)
var running uint
for _, torrent := range torrents {
if !torrent.Completed { if !torrent.Completed {
continue return fmt.Errorf("'%s' is not a completed torrent, not downloading", torrentFilePath)
}
torrent.Hash = strings.ToUpper(torrent.Hash)
torrentFilePath, exists := q.downloadQueue[torrent.Hash]
if !exists {
continue
}
if running >= q.config.ConcurrentDownloads {
finishedHash := <-done
if finishedHash != "" {
q.lock.RUnlock()
q.lock.Lock()
delete(q.downloadQueue, finishedHash)
q.lock.Unlock()
q.lock.RLock()
}
running--
} }
var downloadPath string var downloadPath string
@ -277,25 +248,20 @@ func (q *Queue) downloadTorrents(torrents []rtorrent.Torrent) {
if info, err := os.Stat(downloadPath); os.IsExist(err) { if info, err := os.Stat(downloadPath); os.IsExist(err) {
if !info.IsDir() { if !info.IsDir() {
q.logger.Printf("Unable to downlaod to temp path '%s' since it is not a directory", downloadPath) return fmt.Errorf("Unable to downlaod to temp path '%s' since it is not a directory", downloadPath)
continue
} }
} }
err := os.MkdirAll(downloadPath, q.config.DownloadFileMode) err := os.MkdirAll(downloadPath, q.config.DownloadFileMode)
if err != nil { if err != nil {
q.logger.Printf("Unable to create temp download path '%s' error '%s'", downloadPath, err) return fmt.Errorf("Unable to create temp download path '%s' error '%s'", downloadPath, err)
continue
} }
} }
go func(torrentFilePath string, downloadPath string, torrent rtorrent.Torrent) {
q.logger.Printf("Downloading '%s' (%s) to '%s' (%s) %s", torrent.Name, torrentFilePath, downloadPath, destDownloadPath, prettyBytes(float64(torrent.Size))) q.logger.Printf("Downloading '%s' (%s) to '%s' (%s) %s", torrent.Name, torrentFilePath, downloadPath, destDownloadPath, prettyBytes(float64(torrent.Size)))
err := q.sftpClient.Mirror(torrent.Path, downloadPath, q.config.ResumeDownloads) err := q.sftpClient.Mirror(torrent.Path, downloadPath, q.config.ResumeDownloads)
if err != nil { if err != nil {
q.logger.Printf("Failed to download '%s' to '%s' error '%s'", torrent.Path, downloadPath, err) return fmt.Errorf("Failed to download '%s' to '%s' error '%s'", torrent.Path, downloadPath, err)
done <- ""
return
} }
// we need to move the downlaod from the temp directory to the destination // we need to move the downlaod from the temp directory to the destination
@ -305,9 +271,7 @@ func (q *Queue) downloadTorrents(torrents []rtorrent.Torrent) {
destFullPath := filepath.Join(destDownloadPath, fileName) destFullPath := filepath.Join(destDownloadPath, fileName)
err = os.Rename(downFullPath, destFullPath) err = os.Rename(downFullPath, destFullPath)
if err != nil { if err != nil {
q.logger.Printf("Failed to move temp path '%s' to dest path '%s' error '%s'", downFullPath, destFullPath, err) return fmt.Errorf("Failed to move temp path '%s' to dest path '%s' error '%s'", downFullPath, destFullPath, err)
done <- ""
return
} }
} }
@ -315,35 +279,19 @@ func (q *Queue) downloadTorrents(torrents []rtorrent.Torrent) {
if movePath, exists := q.config.FinishedTorrentFilePath[parentTorrentPath]; exists { if movePath, exists := q.config.FinishedTorrentFilePath[parentTorrentPath]; exists {
destFullPath := filepath.Join(movePath, filepath.Base(torrentFilePath)) destFullPath := filepath.Join(movePath, filepath.Base(torrentFilePath))
err = os.Rename(torrentFilePath, destFullPath) err = os.Rename(torrentFilePath, destFullPath)
if err != nil { if err != nil && os.IsExist(err) {
q.logger.Printf("Failed to move torrent file from '%s' to '%s' error '%s'", torrentFilePath, destFullPath, err) return fmt.Errorf("Failed to move torrent file from '%s' to '%s' error '%s'", torrentFilePath, destFullPath, err)
} }
} else { } else {
err = os.Remove(torrentFilePath) err = os.Remove(torrentFilePath)
if err != nil { if err != nil && os.IsExist(err) {
q.logger.Printf("Failed to remove torrent file '%s' error '%s'", torrentFilePath, err) return fmt.Errorf("Failed to remove torrent file '%s' error '%s'", torrentFilePath, err)
} }
} }
q.logger.Printf("Successfully downloaded '%s' (%s)", torrent.Name, torrentFilePath) q.logger.Printf("Successfully downloaded '%s' (%s)", torrent.Name, torrentFilePath)
done <- torrent.Hash return nil
}(torrentFilePath, downloadPath, torrent)
running++
}
for running > 0 {
finishedHash := <-done
if finishedHash != "" {
q.lock.RUnlock()
q.lock.Lock()
delete(q.downloadQueue, finishedHash)
q.lock.Unlock()
q.lock.RLock()
}
running--
}
} }
// Run executes all steps needed for looking at the queue, catching updates, and processing all work // Run executes all steps needed for looking at the queue, catching updates, and processing all work
@ -373,31 +321,15 @@ func (q *Queue) Run(stop <-chan bool) {
} }
} }
go func() {
lastUpdateTime := time.Time{}
for {
if time.Now().Sub(lastUpdateTime) >= q.config.TorrentListUpdateInterval {
err := q.updateTorrentList()
if err == nil {
lastUpdateTime = time.Now()
} else {
q.logger.Printf("Failed to update torrent list from rTorrent: '%s'", err)
}
}
downloadTorrents := q.getFinishedTorrents()
if len(downloadTorrents) > 0 {
q.downloadTorrents(downloadTorrents)
}
time.Sleep(time.Second)
}
}()
// watch all directories for file changes // watch all directories for file changes
var err error var err error
finished := false finished := false
for !finished { lastUpdateTime := time.Time{}
downloadedHashes := make(chan string, q.config.ConcurrentDownloads)
downloadsRunning := make(map[string]bool)
for {
cont := true
for cont {
select { select {
case event := <-q.fsWatcher.Events: case event := <-q.fsWatcher.Events:
if event.Op&fsnotify.Write == fsnotify.Write || if event.Op&fsnotify.Write == fsnotify.Write ||
@ -415,13 +347,95 @@ func (q *Queue) Run(stop <-chan bool) {
} else { } else {
q.logger.Printf("Failed to add '%s' error '%s'", event.Name, err) q.logger.Printf("Failed to add '%s' error '%s'", event.Name, err)
} }
} else if event.Op&fsnotify.Remove == fsnotify.Remove {
for hash, path := range q.downloadQueue {
if path == event.Name {
q.logger.Printf("Removing torrent '%s' from queue", path)
delete(q.downloadQueue, hash)
break
} }
}
}
break
case err = <-q.fsWatcher.Errors: case err = <-q.fsWatcher.Errors:
q.logger.Printf("Error while watching folders '%s'", err) q.logger.Printf("Error while watching folders '%s'", err)
cont = false
break
case <-stop: case <-stop:
finished = true finished = true
cont = false
break
default:
time.Sleep(time.Second)
cont = false
break
}
}
if finished {
break
}
if time.Now().Sub(lastUpdateTime) >= q.config.TorrentListUpdateInterval {
err := q.updateTorrentList()
if err == nil {
lastUpdateTime = time.Now()
} else {
q.logger.Printf("Failed to update torrent list from rTorrent: '%s'", err)
}
}
if len(downloadsRunning) > 0 {
cont = true
for cont {
select {
case finishedHash := <-downloadedHashes:
if finishedHash == "" {
break
}
if finishedHash[0] == '!' {
finishedHash = finishedHash[1:]
} else {
delete(q.downloadQueue, finishedHash)
}
delete(downloadsRunning, finishedHash)
break
default:
cont = false
break
}
}
}
if uint(len(downloadsRunning)) < q.config.ConcurrentDownloads {
for _, torrent := range q.getFinishedTorrents() {
if _, exists := downloadsRunning[torrent.Hash]; exists {
continue
}
go func(torrent rtorrent.Torrent, torrentPath string, hashChan chan<- string) {
err := q.downloadTorrent(torrent, torrentPath)
if err != nil {
q.logger.Printf("Failed to download '%s' (%s): %s", torrent.Name, torrentPath, err)
hashChan <- "!" + torrent.Hash
return
}
hashChan <- torrent.Hash
}(torrent, q.downloadQueue[torrent.Hash], downloadedHashes)
downloadsRunning[torrent.Hash] = true
if uint(len(downloadsRunning)) == q.config.ConcurrentDownloads {
break
}
}
} }
} }
} }