diff --git a/queue/queue.go b/queue/queue.go index ee1acdd..3fa5c93 100644 --- a/queue/queue.go +++ b/queue/queue.go @@ -39,15 +39,14 @@ type Config struct { // uploads them to the given rTorrent server, // and then downloads them over SSH to the given download path. type Queue struct { - rtClient *rtorrent.RTorrent - sftpClient *easysftp.Client - fsWatcher *fsnotify.Watcher - config *Config - torrentList map[string]rtorrent.Torrent - torrentListUpdate time.Time - downloadQueue map[string]string - logger *log.Logger - lock sync.RWMutex + rtClient *rtorrent.RTorrent + sftpClient *easysftp.Client + fsWatcher *fsnotify.Watcher + config *Config + torrentList map[string]rtorrent.Torrent + downloadQueue map[string]string + logger *log.Logger + lock sync.RWMutex } // NewQueue establishes all connections and watchers @@ -138,7 +137,6 @@ func (q *Queue) updateTorrentList() error { } q.torrentList = torrentList - q.torrentListUpdate = time.Now() return nil } @@ -237,6 +235,7 @@ func (q *Queue) downloadTorrents(torrents []rtorrent.Torrent) { } go func(torrentFilePath string, downloadPath string, torrent rtorrent.Torrent) { + q.logger.Printf("Downloading '%s' (%s) to '%s'", torrent.Name, torrentFilePath, downloadPath) err := q.sftpClient.Mirror(torrent.Path, downloadPath) if err != nil { q.logger.Printf("Failed to download '%s' to '%s' error '%s'", torrent.Path, downloadPath, err) @@ -325,6 +324,26 @@ func (q *Queue) Run(stop <-chan bool) { } }() + go func() { + for { + downloadTorrents := []rtorrent.Torrent{} + for torrentHash := range q.downloadQueue { + torrent, exists := q.torrentList[torrentHash] + if !exists && !torrent.Completed { + continue + } + + downloadTorrents = append(downloadTorrents, torrent) + } + + if len(downloadTorrents) > 0 { + q.downloadTorrents(downloadTorrents) + } + + time.Sleep(time.Second) + } + }() + // watch all directories for file changes var err error finished := false