From 659fca62a80d41bdfcc61d0bac51113c181250a8 Mon Sep 17 00:00:00 2001
From: Tony Blyler <tony@blyler.cc>
Date: Mon, 16 May 2016 12:37:28 -0400
Subject: [PATCH] Fixes locked up downloads and enables removing torrents via
 .torrent removal

---
 queue/queue.go | 326 ++++++++++++++++++++++++++-----------------------
 1 file changed, 170 insertions(+), 156 deletions(-)

diff --git a/queue/queue.go b/queue/queue.go
index 70bdc53..b5be8d2 100644
--- a/queue/queue.go
+++ b/queue/queue.go
@@ -3,6 +3,7 @@ package queue
 import (
 	"bytes"
 	"errors"
+	"fmt"
 	"github.com/fsnotify/fsnotify"
 	"github.com/tblyler/easysftp"
 	"github.com/tblyler/go-rtorrent/rtorrent"
@@ -13,7 +14,6 @@ import (
 	"path/filepath"
 	"strconv"
 	"strings"
-	"sync"
 	"time"
 )
 
@@ -54,7 +54,6 @@ type Queue struct {
 	torrentList   map[string]rtorrent.Torrent
 	downloadQueue map[string]string
 	logger        *log.Logger
-	lock          sync.RWMutex
 }
 
 var prettyBytesValues = []float64{
@@ -182,10 +181,6 @@ func (q *Queue) updateTorrentList() error {
 }
 
 func (q *Queue) addTorrentFilePath(path string) error {
-	// lock for downloadQueue
-	q.lock.Lock()
-	defer q.lock.Unlock()
-
 	torrentData, err := ioutil.ReadFile(path)
 	if err != nil {
 		return err
@@ -203,18 +198,20 @@ func (q *Queue) addTorrentFilePath(path string) error {
 		return nil
 	}
 
-	return q.rtClient.AddTorrent(torrentData)
+	err = q.rtClient.AddTorrent(torrentData)
+	if err != nil {
+		return err
+	}
+
+	return q.updateTorrentList()
 }
 
 func (q *Queue) getFinishedTorrents() []rtorrent.Torrent {
-	q.lock.RLock()
 	torrents := []rtorrent.Torrent{}
 	for hash, torrentPath := range q.downloadQueue {
 		torrent, exists := q.torrentList[hash]
 		if !exists {
-			q.lock.RUnlock()
 			err := q.addTorrentFilePath(torrentPath)
-			q.lock.RLock()
 			if err == nil {
 				q.logger.Printf("Added torrent '%s' to rTorrent", torrentPath)
 			} else {
@@ -228,122 +225,73 @@ func (q *Queue) getFinishedTorrents() []rtorrent.Torrent {
 			continue
 		}
 
+		torrent.Hash = strings.ToUpper(torrent.Hash)
+
 		torrents = append(torrents, torrent)
 	}
 
 	return torrents
 }
 
-func (q *Queue) downloadTorrents(torrents []rtorrent.Torrent) {
-	q.lock.RLock()
-	defer q.lock.RUnlock()
-
-	// this will keep track of the torrent hashes that have finished
-	// will return an empty string on a failed download
-	done := make(chan string, q.config.ConcurrentDownloads)
-	var running uint
-	for _, torrent := range torrents {
-		if !torrent.Completed {
-			continue
-		}
-
-		torrent.Hash = strings.ToUpper(torrent.Hash)
-
-		torrentFilePath, exists := q.downloadQueue[torrent.Hash]
-		if !exists {
-			continue
-		}
-
-		if running >= q.config.ConcurrentDownloads {
-			finishedHash := <-done
-			if finishedHash != "" {
-				q.lock.RUnlock()
-				q.lock.Lock()
-				delete(q.downloadQueue, finishedHash)
-				q.lock.Unlock()
-				q.lock.RLock()
-			}
-
-			running--
-		}
-
-		var downloadPath string
-		destDownloadPath := q.config.WatchDownloadPaths[filepath.Dir(torrentFilePath)]
-
-		if q.config.TempDownloadPath == "" {
-			downloadPath = destDownloadPath
-		} else {
-			downloadPath = filepath.Join(q.config.TempDownloadPath, destDownloadPath)
-
-			if info, err := os.Stat(downloadPath); os.IsExist(err) {
-				if !info.IsDir() {
-					q.logger.Printf("Unable to downlaod to temp path '%s' since it is not a directory", downloadPath)
-					continue
-				}
-			}
-
-			err := os.MkdirAll(downloadPath, q.config.DownloadFileMode)
-			if err != nil {
-				q.logger.Printf("Unable to create temp download path '%s' error '%s'", downloadPath, err)
-				continue
-			}
-		}
-
-		go func(torrentFilePath string, downloadPath string, torrent rtorrent.Torrent) {
-			q.logger.Printf("Downloading '%s' (%s) to '%s' (%s) %s", torrent.Name, torrentFilePath, downloadPath, destDownloadPath, prettyBytes(float64(torrent.Size)))
-			err := q.sftpClient.Mirror(torrent.Path, downloadPath, q.config.ResumeDownloads)
-			if err != nil {
-				q.logger.Printf("Failed to download '%s' to '%s' error '%s'", torrent.Path, downloadPath, err)
-				done <- ""
-				return
-			}
-
-			// we need to move the downlaod from the temp directory to the destination
-			if downloadPath != destDownloadPath {
-				fileName := filepath.Base(torrent.Path)
-				downFullPath := filepath.Join(downloadPath, fileName)
-				destFullPath := filepath.Join(destDownloadPath, fileName)
-				err = os.Rename(downFullPath, destFullPath)
-				if err != nil {
-					q.logger.Printf("Failed to move temp path '%s' to dest path '%s' error '%s'", downFullPath, destFullPath, err)
-					done <- ""
-					return
-				}
-			}
-
-			parentTorrentPath := filepath.Dir(torrentFilePath)
-			if movePath, exists := q.config.FinishedTorrentFilePath[parentTorrentPath]; exists {
-				destFullPath := filepath.Join(movePath, filepath.Base(torrentFilePath))
-				err = os.Rename(torrentFilePath, destFullPath)
-				if err != nil {
-					q.logger.Printf("Failed to move torrent file from '%s' to '%s' error '%s'", torrentFilePath, destFullPath, err)
-				}
-			} else {
-				err = os.Remove(torrentFilePath)
-				if err != nil {
-					q.logger.Printf("Failed to remove torrent file '%s' error '%s'", torrentFilePath, err)
-				}
-			}
-
-			q.logger.Printf("Successfully downloaded '%s' (%s)", torrent.Name, torrentFilePath)
-
-			done <- torrent.Hash
-		}(torrentFilePath, downloadPath, torrent)
-		running++
+func (q *Queue) downloadTorrent(torrent rtorrent.Torrent, torrentFilePath string) error {
+	if !torrent.Completed {
+		return fmt.Errorf("'%s' is not a completed torrent, not downloading", torrentFilePath)
 	}
 
-	for running > 0 {
-		finishedHash := <-done
-		if finishedHash != "" {
-			q.lock.RUnlock()
-			q.lock.Lock()
-			delete(q.downloadQueue, finishedHash)
-			q.lock.Unlock()
-			q.lock.RLock()
+	var downloadPath string
+	destDownloadPath := q.config.WatchDownloadPaths[filepath.Dir(torrentFilePath)]
+
+	if q.config.TempDownloadPath == "" {
+		downloadPath = destDownloadPath
+	} else {
+		downloadPath = filepath.Join(q.config.TempDownloadPath, destDownloadPath)
+
+		if info, err := os.Stat(downloadPath); os.IsExist(err) {
+			if !info.IsDir() {
+				return fmt.Errorf("Unable to downlaod to temp path '%s' since it is not a directory", downloadPath)
+			}
 		}
 
-		running--
+		err := os.MkdirAll(downloadPath, q.config.DownloadFileMode)
+		if err != nil {
+			return fmt.Errorf("Unable to create temp download path '%s' error '%s'", downloadPath, err)
+		}
 	}
+
+	q.logger.Printf("Downloading '%s' (%s) to '%s' (%s) %s", torrent.Name, torrentFilePath, downloadPath, destDownloadPath, prettyBytes(float64(torrent.Size)))
+	err := q.sftpClient.Mirror(torrent.Path, downloadPath, q.config.ResumeDownloads)
+	if err != nil {
+		return fmt.Errorf("Failed to download '%s' to '%s' error '%s'", torrent.Path, downloadPath, err)
+	}
+
+	// we need to move the downlaod from the temp directory to the destination
+	if downloadPath != destDownloadPath {
+		fileName := filepath.Base(torrent.Path)
+		downFullPath := filepath.Join(downloadPath, fileName)
+		destFullPath := filepath.Join(destDownloadPath, fileName)
+		err = os.Rename(downFullPath, destFullPath)
+		if err != nil {
+			return fmt.Errorf("Failed to move temp path '%s' to dest path '%s' error '%s'", downFullPath, destFullPath, err)
+		}
+	}
+
+	parentTorrentPath := filepath.Dir(torrentFilePath)
+	if movePath, exists := q.config.FinishedTorrentFilePath[parentTorrentPath]; exists {
+		destFullPath := filepath.Join(movePath, filepath.Base(torrentFilePath))
+		err = os.Rename(torrentFilePath, destFullPath)
+		if err != nil && os.IsExist(err) {
+			return fmt.Errorf("Failed to move torrent file from '%s' to '%s' error '%s'", torrentFilePath, destFullPath, err)
+		}
+	} else {
+		err = os.Remove(torrentFilePath)
+		if err != nil && os.IsExist(err) {
+			return fmt.Errorf("Failed to remove torrent file '%s' error '%s'", torrentFilePath, err)
+		}
+	}
+
+	q.logger.Printf("Successfully downloaded '%s' (%s)", torrent.Name, torrentFilePath)
+
+	return nil
 }
 
 // Run executes all steps needed for looking at the queue, catching updates, and processing all work
@@ -373,55 +321,121 @@ func (q *Queue) Run(stop <-chan bool) {
 		}
 	}
 
-	go func() {
-		lastUpdateTime := time.Time{}
-		for {
-			if time.Now().Sub(lastUpdateTime) >= q.config.TorrentListUpdateInterval {
-				err := q.updateTorrentList()
-				if err == nil {
-					lastUpdateTime = time.Now()
-				} else {
-					q.logger.Printf("Failed to update torrent list from rTorrent: '%s'", err)
-				}
-			}
-
-			downloadTorrents := q.getFinishedTorrents()
-			if len(downloadTorrents) > 0 {
-				q.downloadTorrents(downloadTorrents)
-			}
-
-			time.Sleep(time.Second)
-		}
-	}()
-
 	// watch all directories for file changes
 	var err error
 	finished := false
-	for !finished {
-		select {
-		case event := <-q.fsWatcher.Events:
-			if event.Op&fsnotify.Write == fsnotify.Write ||
-				event.Op&fsnotify.Create == fsnotify.Create ||
-				event.Op&fsnotify.Rename == fsnotify.Rename {
-				// skip files that do not end with .torrent
-				if !strings.HasSuffix(event.Name, ".torrent") {
+	lastUpdateTime := time.Time{}
+	downloadedHashes := make(chan string, q.config.ConcurrentDownloads)
+	downloadsRunning := make(map[string]bool)
+	for {
+		cont := true
+		for cont {
+			select {
+			case event := <-q.fsWatcher.Events:
+				if event.Op&fsnotify.Write == fsnotify.Write ||
+					event.Op&fsnotify.Create == fsnotify.Create ||
+					event.Op&fsnotify.Rename == fsnotify.Rename {
+					// skip files that do not end with .torrent
+					if !strings.HasSuffix(event.Name, ".torrent") {
+						break
+					}
+
+					q.logger.Printf("Adding %s to download queue", event.Name)
+					err = q.addTorrentFilePath(event.Name)
+					if err == nil {
+						q.logger.Printf("Added torrent '%s' to rTorrent", event.Name)
+					} else {
+						q.logger.Printf("Failed to add '%s' error '%s'", event.Name, err)
+					}
+				} else if event.Op&fsnotify.Remove == fsnotify.Remove {
+					for hash, path := range q.downloadQueue {
+						if path == event.Name {
+							q.logger.Printf("Removing torrent '%s' from queue", path)
+							delete(q.downloadQueue, hash)
+							break
+						}
+					}
+				}
+				break
+
+			case err = <-q.fsWatcher.Errors:
+				q.logger.Printf("Error while watching folders '%s'", err)
+				cont = false
+				break
+
+			case <-stop:
+				finished = true
+				cont = false
+				break
+
+			default:
+				time.Sleep(time.Second)
+				cont = false
+				break
+			}
+		}
+
+		if finished {
+			break
+		}
+
+		if time.Now().Sub(lastUpdateTime) >= q.config.TorrentListUpdateInterval {
+			err := q.updateTorrentList()
+			if err == nil {
+				lastUpdateTime = time.Now()
+			} else {
+				q.logger.Printf("Failed to update torrent list from rTorrent: '%s'", err)
+			}
+		}
+
+		if len(downloadsRunning) > 0 {
+			cont = true
+			for cont {
+				select {
+				case finishedHash := <-downloadedHashes:
+					if finishedHash == "" {
+						break
+					}
+
+					if finishedHash[0] == '!' {
+						finishedHash = finishedHash[1:]
+					} else {
+						delete(q.downloadQueue, finishedHash)
+					}
+
+					delete(downloadsRunning, finishedHash)
+					break
+
+				default:
+					cont = false
 					break
 				}
+			}
+		}
 
-				q.logger.Printf("Adding %s to download queue", event.Name)
-				err = q.addTorrentFilePath(event.Name)
-				if err == nil {
-					q.logger.Printf("Added torrent '%s' to rTorrent", event.Name)
-				} else {
-					q.logger.Printf("Failed to add '%s' error '%s'", event.Name, err)
+		if uint(len(downloadsRunning)) < q.config.ConcurrentDownloads {
+			for _, torrent := range q.getFinishedTorrents() {
+				if _, exists := downloadsRunning[torrent.Hash]; exists {
+					continue
+				}
+
+				go func(torrent rtorrent.Torrent, torrentPath string, hashChan chan<- string) {
+					err := q.downloadTorrent(torrent, torrentPath)
+					if err != nil {
+						q.logger.Printf("Failed to download '%s' (%s): %s", torrent.Name, torrentPath, err)
+						hashChan <- "!" + torrent.Hash
+						return
+					}
+
+					hashChan <- torrent.Hash
+				}(torrent, q.downloadQueue[torrent.Hash], downloadedHashes)
+
+				downloadsRunning[torrent.Hash] = true
+
+				if uint(len(downloadsRunning)) == q.config.ConcurrentDownloads {
+					break
 				}
 			}
-
-		case err = <-q.fsWatcher.Errors:
-			q.logger.Printf("Error while watching folders '%s'", err)
-
-		case <-stop:
-			finished = true
 		}
 	}
 }