Add the ability to do everything in Go
This commit is contained in:
parent
d088e413b7
commit
a6dfb7e698
6 changed files with 476 additions and 258 deletions
3
.gitmodules
vendored
3
.gitmodules
vendored
|
@ -1,3 +0,0 @@
|
||||||
[submodule "metainfo/vendor/github.com/anacrolix/torrent"]
|
|
||||||
path = metainfo/vendor/github.com/anacrolix/torrent
|
|
||||||
url = ssh://git@github.com/anacrolix/torrent.git
|
|
80
README.md
80
README.md
|
@ -1,19 +1,69 @@
|
||||||
Hoarder
|
# Hoarder
|
||||||
========
|
|
||||||
Uploads .torrent files from a local "blackhole" to a remote (SSH) rtorrent watch folder. From there, rtorrent is polled over XMLRPC as to whether the torrent is completed. Finally, the files are downloaded over a multithreaded SSH connection and saved to the local machine. The blackhole is used as a queue and will have its .torrent files deleted.
|
Uploads .torrent files from a local "blackhole" to a remote (SSH) rtorrent watch folder. From there, rtorrent is polled over XMLRPC as to whether the torrent is completed. Finally, the files are downloaded over a multithreaded SSH connection and saved to the local machine. The blackhole is used as a queue and will have its .torrent files deleted.
|
||||||
|
|
||||||
Requirements
|
# Installation
|
||||||
------------
|
## Manual
|
||||||
* bash >= 4.0 (support for associative arrays)
|
1. Install [Go](https://golang.org) for your Operating System
|
||||||
* python2
|
2. Run `$ go get -u github.com/tblyler/hoarder/cmd`
|
||||||
* curl
|
3. If your `GOPATH` is in your `PATH`, run `$ hoarder -config $PATH_TO_HOARDER_CONF`
|
||||||
* rsync
|
|
||||||
* scp
|
|
||||||
|
|
||||||
Configuration
|
# Configuration
|
||||||
-------------
|
## Example
|
||||||
Edit the variables at the top of hoarder.sh to your liking.
|
Ignore the keys that start with an underscore, they are comments.
|
||||||
|
```json
|
||||||
|
{
|
||||||
|
"_rtorrent_addr": "The address to the rtorrent XMLRPC endpoint",
|
||||||
|
"rtorrent_addr": "mycoolrtorrentserver.com/XMLRPC",
|
||||||
|
|
||||||
Running
|
"_rtorrent_insecure_cert": "true to ignore the certificate authenticity; false to honor it",
|
||||||
-------
|
"rtorrent_insecure_cert": false,
|
||||||
Run hoarder.sh with bash
|
|
||||||
|
"_torrent_username": "The HTTP Basic auth username to use for rtorrent's XMLRPC",
|
||||||
|
"rtorrent_username": "JohnDoe",
|
||||||
|
|
||||||
|
"_rtorrent_password": "The HTTP Basic auth password to use for rtorrent's XMLRPC",
|
||||||
|
"rtorrent_password": "correct horse battery staple",
|
||||||
|
|
||||||
|
"_ssh_username": "The ssh username to use for getting finished torrents from the remote host",
|
||||||
|
"ssh_username": "JohnDoe"
|
||||||
|
|
||||||
|
"_SSH_AUTH_COMMENT": "You may choose to use an ssh key or ssh password. If both are supplied, the password will not be used.",
|
||||||
|
|
||||||
|
"_ssh_password": "The SSH password to use for SSH authentication",
|
||||||
|
"ssh_password": "correct horse battery staple SSH",
|
||||||
|
|
||||||
|
"_ssh_privkey_path": "The path to the private SSH key for SSH authentication",
|
||||||
|
"ssh_privkey_path": "/home/tblyler/.ssh/id_rsa",
|
||||||
|
|
||||||
|
"_ssh_addr": "The SSH address to connect to",
|
||||||
|
"ssh_addr": "mysshserver.com:22",
|
||||||
|
|
||||||
|
"_ssh_connect_timeout": "The time in nano seconds to wait for an SSH connection attempt",
|
||||||
|
"ssh_connect_timeout": 30000000000,
|
||||||
|
|
||||||
|
"_file_download_filemode": "The base 10 file mode to use for downloaded files",
|
||||||
|
"file_download_filemode": 511,
|
||||||
|
|
||||||
|
"_watch_to_download_paths": "The correlation of .torrent file paths and where their contents should be downloaded to",
|
||||||
|
"watch_to_download_paths": {
|
||||||
|
"/home/tblyler/torrent_files/tv": "/home/tblyler/Downloads/tv",
|
||||||
|
"/home/tblyler/torrent_files/movies": "/home/tblyler/Downloads/movies",
|
||||||
|
"/home/tblyler/torrent_files": "/home/tblyler/Downloads"
|
||||||
|
},
|
||||||
|
|
||||||
|
"_temp_download_path": "The root path to temporarily download to and then move to the folder in the setting above. The destination path is created underneath the temp_download_path",
|
||||||
|
"temp_download_path": "/home/tblyler/tempDownloads",
|
||||||
|
|
||||||
|
"_watch_to_finish_path": "If defined, the finished .torrent files finished are moved to their respected path here. Otherwise, they are deleted.",
|
||||||
|
"watch_to_finish_path": {
|
||||||
|
"/home/tblyler/torrent_files/tv": "/home/tblyler/completed_torrent_files/tv",
|
||||||
|
"/home/tblyler/torrent_files": "/home/tblyler/completed_torrent_files"
|
||||||
|
},
|
||||||
|
|
||||||
|
"_rtorrent_update_interval": "The time in nano seconds to update the list of torrents and their statuses in rTorrent",
|
||||||
|
"rtorrent_update_interval": 300000000000,
|
||||||
|
|
||||||
|
"_download_jobs": "The number of concurrent download streams to have at one time",
|
||||||
|
"download_jobs": 2
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
57
cmd/hoarder/main.go
Normal file
57
cmd/hoarder/main.go
Normal file
|
@ -0,0 +1,57 @@
|
||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"flag"
|
||||||
|
"github.com/tblyler/hoarder/queue"
|
||||||
|
"io/ioutil"
|
||||||
|
"log"
|
||||||
|
"os"
|
||||||
|
"os/signal"
|
||||||
|
)
|
||||||
|
|
||||||
|
func main() {
|
||||||
|
configPath := flag.String("config", "", "path to the config file")
|
||||||
|
flag.Parse()
|
||||||
|
|
||||||
|
logger := log.New(os.Stdout, "hoarder", log.LstdFlags)
|
||||||
|
|
||||||
|
if *configPath == "" {
|
||||||
|
logger.Println("Missing config path")
|
||||||
|
os.Exit(1)
|
||||||
|
}
|
||||||
|
|
||||||
|
configRaw, err := ioutil.ReadFile(*configPath)
|
||||||
|
if err != nil {
|
||||||
|
logger.Printf("Failed to read config file '%s': '%s'", *configPath, err)
|
||||||
|
os.Exit(1)
|
||||||
|
}
|
||||||
|
|
||||||
|
config := &queue.Config{}
|
||||||
|
err = json.Unmarshal(configRaw, config)
|
||||||
|
if err != nil {
|
||||||
|
logger.Printf("Unable to decode config json at '%s': '%s'", *configPath, err)
|
||||||
|
os.Exit(1)
|
||||||
|
}
|
||||||
|
|
||||||
|
q, err := queue.NewQueue(config, logger)
|
||||||
|
if err != nil {
|
||||||
|
logger.Printf("Failed to start hoarder: '%s'", err)
|
||||||
|
os.Exit(1)
|
||||||
|
}
|
||||||
|
|
||||||
|
stop := make(chan bool)
|
||||||
|
done := make(chan bool)
|
||||||
|
go func() {
|
||||||
|
q.Run(stop)
|
||||||
|
done <- true
|
||||||
|
}()
|
||||||
|
|
||||||
|
sigChan := make(chan os.Signal, 1)
|
||||||
|
signal.Notify(sigChan, os.Interrupt)
|
||||||
|
|
||||||
|
sig := <-sigChan
|
||||||
|
logger.Println("Got signal ", sig, " quitting")
|
||||||
|
stop <- true
|
||||||
|
<-done
|
||||||
|
}
|
|
@ -1,239 +0,0 @@
|
||||||
package easysftp
|
|
||||||
|
|
||||||
import (
|
|
||||||
"errors"
|
|
||||||
"github.com/pkg/sftp"
|
|
||||||
"golang.org/x/crypto/ssh"
|
|
||||||
"io"
|
|
||||||
"io/ioutil"
|
|
||||||
"os"
|
|
||||||
"path/filepath"
|
|
||||||
"time"
|
|
||||||
)
|
|
||||||
|
|
||||||
// ClientConfig maintains all of the configuration info to connect to a SSH host
|
|
||||||
type ClientConfig struct {
|
|
||||||
Username string
|
|
||||||
Host string
|
|
||||||
KeyPath string
|
|
||||||
Password string
|
|
||||||
Timeout time.Duration
|
|
||||||
FileMode os.FileMode
|
|
||||||
}
|
|
||||||
|
|
||||||
// Client communicates with the SFTP to download files/pathes
|
|
||||||
type Client struct {
|
|
||||||
sshClient *ssh.Client
|
|
||||||
config *ClientConfig
|
|
||||||
}
|
|
||||||
|
|
||||||
// Connect to a host with this given config
|
|
||||||
func Connect(config *ClientConfig) (*Client, error) {
|
|
||||||
var auth []ssh.AuthMethod
|
|
||||||
if config.KeyPath != "" {
|
|
||||||
privKey, err := ioutil.ReadFile(config.KeyPath)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
signer, err := ssh.ParsePrivateKey(privKey)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
auth = append(auth, ssh.PublicKeys(signer))
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(auth) == 0 {
|
|
||||||
if config.Password == "" {
|
|
||||||
return nil, errors.New("Missing password or key for SSH authentication")
|
|
||||||
}
|
|
||||||
|
|
||||||
auth = append(auth, ssh.Password(config.Password))
|
|
||||||
}
|
|
||||||
|
|
||||||
sshClient, err := ssh.Dial("tcp", config.Host, &ssh.ClientConfig{
|
|
||||||
User: config.Username,
|
|
||||||
Auth: auth,
|
|
||||||
Timeout: config.Timeout,
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
return &Client{
|
|
||||||
sshClient: sshClient,
|
|
||||||
config: config,
|
|
||||||
}, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Close the underlying SSH conection
|
|
||||||
func (c *Client) Close() error {
|
|
||||||
return c.sshClient.Close()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *Client) newSftpClient() (*sftp.Client, error) {
|
|
||||||
return sftp.NewClient(c.sshClient)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Stat gets information for the given path
|
|
||||||
func (c *Client) Stat(path string) (os.FileInfo, error) {
|
|
||||||
sftpClient, err := c.newSftpClient()
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
defer sftpClient.Close()
|
|
||||||
|
|
||||||
return sftpClient.Stat(path)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Lstat gets information for the given path, if it is a symbolic link, it will describe the symbolic link
|
|
||||||
func (c *Client) Lstat(path string) (os.FileInfo, error) {
|
|
||||||
sftpClient, err := c.newSftpClient()
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
defer sftpClient.Close()
|
|
||||||
|
|
||||||
return sftpClient.Lstat(path)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Download a file from the given path to the output writer
|
|
||||||
func (c *Client) Download(path string, output io.Writer) error {
|
|
||||||
sftpClient, err := c.newSftpClient()
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
defer sftpClient.Close()
|
|
||||||
|
|
||||||
info, err := sftpClient.Stat(path)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
if info.IsDir() {
|
|
||||||
return errors.New("Unable to use easysftp.Client.Download for dir: " + path)
|
|
||||||
}
|
|
||||||
|
|
||||||
remote, err := sftpClient.Open(path)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
defer remote.Close()
|
|
||||||
|
|
||||||
_, err = io.Copy(output, remote)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// Mirror downloads an entire folder (recursively) or file underneath the given localParentPath
|
|
||||||
func (c *Client) Mirror(path string, localParentPath string) error {
|
|
||||||
sftpClient, err := c.newSftpClient()
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
defer sftpClient.Close()
|
|
||||||
|
|
||||||
info, err := sftpClient.Stat(path)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// download the file
|
|
||||||
if !info.IsDir() {
|
|
||||||
sftpClient.Close()
|
|
||||||
localPath := filepath.Join(localParentPath, info.Name())
|
|
||||||
localInfo, err := os.Stat(localPath)
|
|
||||||
if os.IsExist(err) && localInfo.IsDir() {
|
|
||||||
err = os.RemoveAll(localPath)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
file, err := os.OpenFile(
|
|
||||||
localPath,
|
|
||||||
os.O_RDWR|os.O_CREATE|os.O_TRUNC,
|
|
||||||
c.config.FileMode,
|
|
||||||
)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
defer file.Close()
|
|
||||||
|
|
||||||
return c.Download(path, file)
|
|
||||||
}
|
|
||||||
|
|
||||||
// download the whole directory recursively
|
|
||||||
walker := sftpClient.Walk(path)
|
|
||||||
remoteParentPath := filepath.Dir(path)
|
|
||||||
for walker.Step() {
|
|
||||||
if err := walker.Err(); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
info := walker.Stat()
|
|
||||||
|
|
||||||
relPath, err := filepath.Rel(remoteParentPath, walker.Path())
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
localPath := filepath.Join(localParentPath, relPath)
|
|
||||||
|
|
||||||
// if we have something at the download path delete it if it is a directory
|
|
||||||
// and the remote is a file and vice a versa
|
|
||||||
localInfo, err := os.Stat(localPath)
|
|
||||||
if os.IsExist(err) {
|
|
||||||
if localInfo.IsDir() {
|
|
||||||
if info.IsDir() {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
err = os.RemoveAll(localPath)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
} else if info.IsDir() {
|
|
||||||
err = os.Remove(localPath)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if info.IsDir() {
|
|
||||||
err = os.MkdirAll(localPath, c.config.FileMode)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
remoteFile, err := sftpClient.Open(walker.Path())
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
localFile, err := os.OpenFile(localPath, os.O_RDWR|os.O_CREATE|os.O_TRUNC, c.config.FileMode)
|
|
||||||
if err != nil {
|
|
||||||
remoteFile.Close()
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
_, err = io.Copy(localFile, remoteFile)
|
|
||||||
remoteFile.Close()
|
|
||||||
localFile.Close()
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
1
metainfo/vendor/github.com/anacrolix/torrent
generated
vendored
1
metainfo/vendor/github.com/anacrolix/torrent
generated
vendored
|
@ -1 +0,0 @@
|
||||||
Subproject commit dcfee93f96d231b5590f991313b5d9f925757f52
|
|
354
queue/queue.go
Normal file
354
queue/queue.go
Normal file
|
@ -0,0 +1,354 @@
|
||||||
|
package queue
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"errors"
|
||||||
|
"github.com/fsnotify/fsnotify"
|
||||||
|
"github.com/tblyler/easysftp"
|
||||||
|
"github.com/tblyler/go-rtorrent/rtorrent"
|
||||||
|
"github.com/tblyler/hoarder/metainfo"
|
||||||
|
"io/ioutil"
|
||||||
|
"log"
|
||||||
|
"os"
|
||||||
|
"path/filepath"
|
||||||
|
"strings"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Config defines the settings for watching, uploading, and downloading
|
||||||
|
type Config struct {
|
||||||
|
RtorrentAddr string `json:"rtorrent_addr"`
|
||||||
|
RtorrentInsecureCert bool `json:"rtorrent_insecure_cert"`
|
||||||
|
RtorrentUsername string `json:"rtorrent_username"`
|
||||||
|
RtorrentPassword string `json:"rtorrent_password"`
|
||||||
|
SSHUsername string `json:"ssh_username"`
|
||||||
|
SSHPassword string `json:"ssh_password"`
|
||||||
|
SSHKeyPath string `json:"ssh_privkey_path"`
|
||||||
|
SSHAddr string `json:"ssh_addr"`
|
||||||
|
SSHTimeout time.Duration `json:"ssh_connect_timeout"`
|
||||||
|
DownloadFileMode os.FileMode `json:"file_download_filemode"`
|
||||||
|
WatchDownloadPaths map[string]string `json:"watch_to_download_paths"`
|
||||||
|
TempDownloadPath string `json:"temp_download_path"`
|
||||||
|
FinishedTorrentFilePath map[string]string `json:"watch_to_finish_path"`
|
||||||
|
TorrentListUpdateInterval time.Duration `json:"rtorrent_update_interval"`
|
||||||
|
ConcurrentDownloads uint `json:"download_jobs"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// Queue watches the given folders for new .torrent files,
|
||||||
|
// uploads them to the given rTorrent server,
|
||||||
|
// and then downloads them over SSH to the given download path.
|
||||||
|
type Queue struct {
|
||||||
|
rtClient *rtorrent.RTorrent
|
||||||
|
sftpClient *easysftp.Client
|
||||||
|
fsWatcher *fsnotify.Watcher
|
||||||
|
config *Config
|
||||||
|
torrentList map[string]rtorrent.Torrent
|
||||||
|
torrentListUpdate time.Time
|
||||||
|
downloadQueue map[string]string
|
||||||
|
logger *log.Logger
|
||||||
|
lock sync.RWMutex
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewQueue establishes all connections and watchers
|
||||||
|
func NewQueue(config *Config, logger *log.Logger) (*Queue, error) {
|
||||||
|
if config.WatchDownloadPaths == nil || len(config.WatchDownloadPaths) == 0 {
|
||||||
|
return nil, errors.New("Must have queue.QueueConfig.WatchDownloadPaths set")
|
||||||
|
}
|
||||||
|
|
||||||
|
for watchPath, downloadPath := range config.WatchDownloadPaths {
|
||||||
|
config.WatchDownloadPaths[filepath.Clean(watchPath)] = filepath.Clean(downloadPath)
|
||||||
|
}
|
||||||
|
|
||||||
|
if config.ConcurrentDownloads == 0 {
|
||||||
|
config.ConcurrentDownloads = 1
|
||||||
|
}
|
||||||
|
|
||||||
|
if logger == nil {
|
||||||
|
logger = log.New(ioutil.Discard, "", 0)
|
||||||
|
}
|
||||||
|
|
||||||
|
fsWatcher, err := fsnotify.NewWatcher()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
for watchPath := range config.WatchDownloadPaths {
|
||||||
|
err = fsWatcher.Add(watchPath)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
rtClient := rtorrent.New(config.RtorrentAddr, config.RtorrentInsecureCert)
|
||||||
|
rtClient.SetAuth(config.RtorrentUsername, config.RtorrentPassword)
|
||||||
|
|
||||||
|
sftpClient, err := easysftp.Connect(&easysftp.ClientConfig{
|
||||||
|
Username: config.SSHUsername,
|
||||||
|
Password: config.SSHPassword,
|
||||||
|
KeyPath: config.SSHKeyPath,
|
||||||
|
Host: config.SSHAddr,
|
||||||
|
Timeout: config.SSHTimeout,
|
||||||
|
FileMode: config.DownloadFileMode,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return &Queue{
|
||||||
|
rtClient: rtClient,
|
||||||
|
sftpClient: sftpClient,
|
||||||
|
fsWatcher: fsWatcher,
|
||||||
|
config: config,
|
||||||
|
downloadQueue: make(map[string]string),
|
||||||
|
logger: logger,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Close all of the connections and watchers
|
||||||
|
func (q *Queue) Close() []error {
|
||||||
|
errs := []error{}
|
||||||
|
err := q.sftpClient.Close()
|
||||||
|
if err != nil {
|
||||||
|
errs = append(errs, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
err = q.fsWatcher.Close()
|
||||||
|
if err != nil {
|
||||||
|
errs = append(errs, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return errs
|
||||||
|
}
|
||||||
|
|
||||||
|
func (q *Queue) updateTorrentList() error {
|
||||||
|
// lock for torrentList
|
||||||
|
q.lock.Lock()
|
||||||
|
defer q.lock.Unlock()
|
||||||
|
|
||||||
|
torrents, err := q.rtClient.GetTorrents(rtorrent.ViewMain)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
torrentList := make(map[string]rtorrent.Torrent)
|
||||||
|
|
||||||
|
for _, torrent := range torrents {
|
||||||
|
torrentList[torrent.Hash] = torrent
|
||||||
|
}
|
||||||
|
|
||||||
|
q.torrentList = torrentList
|
||||||
|
q.torrentListUpdate = time.Now()
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (q *Queue) addTorrentFilePath(path string) error {
|
||||||
|
// lock for downloadQueue
|
||||||
|
q.lock.Lock()
|
||||||
|
defer q.lock.Unlock()
|
||||||
|
|
||||||
|
torrentData, err := ioutil.ReadFile(path)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
torrentHash, err := metainfo.GetTorrentHashHexString(bytes.NewReader(torrentData))
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
q.downloadQueue[torrentHash] = path
|
||||||
|
|
||||||
|
return q.rtClient.AddTorrent(torrentData)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (q *Queue) getFinishedTorrents() []rtorrent.Torrent {
|
||||||
|
// lock for torrentList
|
||||||
|
q.lock.RLock()
|
||||||
|
defer q.lock.RUnlock()
|
||||||
|
|
||||||
|
torrents := []rtorrent.Torrent{}
|
||||||
|
for hash, torrentPath := range q.downloadQueue {
|
||||||
|
torrent, exists := q.torrentList[hash]
|
||||||
|
if !exists {
|
||||||
|
err := q.addTorrentFilePath(torrentPath)
|
||||||
|
if err != nil {
|
||||||
|
q.logger.Printf("Unable to add torrent '%s' error '%s'", torrentPath, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
if !torrent.Completed {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
torrents = append(torrents, torrent)
|
||||||
|
}
|
||||||
|
|
||||||
|
return torrents
|
||||||
|
}
|
||||||
|
|
||||||
|
func (q *Queue) downloadTorrents(torrents []rtorrent.Torrent) {
|
||||||
|
// lock for downloadQueue
|
||||||
|
q.lock.RLock()
|
||||||
|
defer q.lock.RUnlock()
|
||||||
|
|
||||||
|
done := make(chan bool, q.config.ConcurrentDownloads)
|
||||||
|
var running uint
|
||||||
|
for _, torrent := range torrents {
|
||||||
|
if !torrent.Completed {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
torrent.Hash = strings.ToUpper(torrent.Hash)
|
||||||
|
|
||||||
|
torrentFilePath, exists := q.downloadQueue[torrent.Hash]
|
||||||
|
if !exists {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
if running >= q.config.ConcurrentDownloads {
|
||||||
|
<-done
|
||||||
|
running--
|
||||||
|
}
|
||||||
|
|
||||||
|
var downloadPath string
|
||||||
|
destDownloadPath := q.config.WatchDownloadPaths[filepath.Dir(torrentFilePath)]
|
||||||
|
|
||||||
|
if q.config.TempDownloadPath == "" {
|
||||||
|
downloadPath = destDownloadPath
|
||||||
|
} else {
|
||||||
|
downloadPath = filepath.Join(q.config.TempDownloadPath, destDownloadPath)
|
||||||
|
|
||||||
|
if info, err := os.Stat(downloadPath); os.IsExist(err) {
|
||||||
|
if !info.IsDir() {
|
||||||
|
q.logger.Printf("Unable to downlaod to temp path '%s' since it is not a directory", downloadPath)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
err := os.MkdirAll(downloadPath, q.config.DownloadFileMode)
|
||||||
|
if err != nil {
|
||||||
|
q.logger.Printf("Unable to create temp download path '%s' error '%s'", downloadPath, err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
go func(torrentFilePath string, downloadPath string, torrent rtorrent.Torrent) {
|
||||||
|
err := q.sftpClient.Mirror(torrent.Path, downloadPath)
|
||||||
|
if err != nil {
|
||||||
|
q.logger.Printf("Failed to download '%s' to '%s' error '%s'", torrent.Path, downloadPath, err)
|
||||||
|
done <- false
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// we need to move the downlaod from the temp directory to the destination
|
||||||
|
if downloadPath != destDownloadPath {
|
||||||
|
fileName := filepath.Base(torrent.Path)
|
||||||
|
downFullPath := filepath.Join(downloadPath, fileName)
|
||||||
|
destFullPath := filepath.Join(destDownloadPath, fileName)
|
||||||
|
err = os.Rename(downFullPath, destFullPath)
|
||||||
|
if err != nil {
|
||||||
|
q.logger.Printf("Failed to move temp path '%s' to dest path '%s' error '%s'", downFullPath, destFullPath, err)
|
||||||
|
done <- false
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
parentTorrentPath := filepath.Dir(torrentFilePath)
|
||||||
|
if movePath, exists := q.config.FinishedTorrentFilePath[parentTorrentPath]; exists {
|
||||||
|
destFullPath := filepath.Join(movePath, filepath.Base(torrentFilePath))
|
||||||
|
err = os.Rename(torrentFilePath, destFullPath)
|
||||||
|
if err != nil {
|
||||||
|
q.logger.Printf("Failed to move torrent file from '%s' to '%s' error '%s'", torrentFilePath, destFullPath, err)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
err = os.Remove(torrentFilePath)
|
||||||
|
if err != nil {
|
||||||
|
q.logger.Printf("Failed to remove torrent file '%s' error '%s'", torrentFilePath, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
q.lock.RUnlock()
|
||||||
|
q.lock.Lock()
|
||||||
|
delete(q.downloadQueue, torrent.Hash)
|
||||||
|
q.lock.Unlock()
|
||||||
|
q.lock.RLock()
|
||||||
|
done <- true
|
||||||
|
}(torrentFilePath, downloadPath, torrent)
|
||||||
|
running++
|
||||||
|
}
|
||||||
|
|
||||||
|
for running > 0 {
|
||||||
|
<-done
|
||||||
|
running--
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Run executes all steps needed for looking at the queue, catching updates, and processing all work
|
||||||
|
func (q *Queue) Run(stop <-chan bool) {
|
||||||
|
// for the initial run, load all files in the specified directories
|
||||||
|
for localPath := range q.config.WatchDownloadPaths {
|
||||||
|
files, err := ioutil.ReadDir(localPath)
|
||||||
|
if err != nil {
|
||||||
|
q.logger.Printf("Unable to read local path '%s' error '%s'", localPath, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, file := range files {
|
||||||
|
if file.IsDir() {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
fullPath := filepath.Join(localPath, file.Name())
|
||||||
|
|
||||||
|
err = q.addTorrentFilePath(fullPath)
|
||||||
|
if err != nil {
|
||||||
|
q.logger.Printf("Failed to add torrent at '%s' error '%s'", fullPath, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
err := q.updateTorrentList()
|
||||||
|
if err != nil {
|
||||||
|
q.logger.Printf("Failed to update torrent list from rTorrent: '%s'", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if q.config.TorrentListUpdateInterval == 0 {
|
||||||
|
time.Sleep(time.Minute)
|
||||||
|
} else {
|
||||||
|
time.Sleep(q.config.TorrentListUpdateInterval)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
// watch all directories for file changes
|
||||||
|
var err error
|
||||||
|
finished := false
|
||||||
|
for !finished {
|
||||||
|
select {
|
||||||
|
case event := <-q.fsWatcher.Events:
|
||||||
|
if event.Op&fsnotify.Write == fsnotify.Write ||
|
||||||
|
event.Op&fsnotify.Create == fsnotify.Create ||
|
||||||
|
event.Op&fsnotify.Rename == fsnotify.Rename {
|
||||||
|
// skip files that do not end with .torrent
|
||||||
|
if !strings.HasSuffix(event.Name, ".torrent") {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
q.logger.Printf("Adding %s to download queue", event.Name)
|
||||||
|
err = q.addTorrentFilePath(event.Name)
|
||||||
|
if err != nil {
|
||||||
|
q.logger.Printf("Failed to add '%s' error '%s'", event.Name, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
case err = <-q.fsWatcher.Errors:
|
||||||
|
q.logger.Printf("Error while watching folders '%s'", err)
|
||||||
|
|
||||||
|
case <-stop:
|
||||||
|
finished = true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in a new issue