diff --git a/.gitmodules b/.gitmodules index 352de19..e69de29 100644 --- a/.gitmodules +++ b/.gitmodules @@ -1,3 +0,0 @@ -[submodule "metainfo/vendor/github.com/anacrolix/torrent"] - path = metainfo/vendor/github.com/anacrolix/torrent - url = ssh://git@github.com/anacrolix/torrent.git diff --git a/README.md b/README.md index eb3200d..e0981dd 100644 --- a/README.md +++ b/README.md @@ -1,19 +1,69 @@ -Hoarder -======== +# Hoarder Uploads .torrent files from a local "blackhole" to a remote (SSH) rtorrent watch folder. From there, rtorrent is polled over XMLRPC as to whether the torrent is completed. Finally, the files are downloaded over a multithreaded SSH connection and saved to the local machine. The blackhole is used as a queue and will have its .torrent files deleted. -Requirements ------------- -* bash >= 4.0 (support for associative arrays) -* python2 -* curl -* rsync -* scp +# Installation +## Manual +1. Install [Go](https://golang.org) for your Operating System +2. Run `$ go get -u github.com/tblyler/hoarder/cmd` +3. If your `GOPATH` is in your `PATH`, run `$ hoarder -config $PATH_TO_HOARDER_CONF` -Configuration -------------- -Edit the variables at the top of hoarder.sh to your liking. +# Configuration +## Example +Ignore the keys that start with an underscore, they are comments. +```json +{ + "_rtorrent_addr": "The address to the rtorrent XMLRPC endpoint", + "rtorrent_addr": "mycoolrtorrentserver.com/XMLRPC", -Running -------- -Run hoarder.sh with bash + "_rtorrent_insecure_cert": "true to ignore the certificate authenticity; false to honor it", + "rtorrent_insecure_cert": false, + + "_torrent_username": "The HTTP Basic auth username to use for rtorrent's XMLRPC", + "rtorrent_username": "JohnDoe", + + "_rtorrent_password": "The HTTP Basic auth password to use for rtorrent's XMLRPC", + "rtorrent_password": "correct horse battery staple", + + "_ssh_username": "The ssh username to use for getting finished torrents from the remote host", + "ssh_username": "JohnDoe" + + "_SSH_AUTH_COMMENT": "You may choose to use an ssh key or ssh password. If both are supplied, the password will not be used.", + + "_ssh_password": "The SSH password to use for SSH authentication", + "ssh_password": "correct horse battery staple SSH", + + "_ssh_privkey_path": "The path to the private SSH key for SSH authentication", + "ssh_privkey_path": "/home/tblyler/.ssh/id_rsa", + + "_ssh_addr": "The SSH address to connect to", + "ssh_addr": "mysshserver.com:22", + + "_ssh_connect_timeout": "The time in nano seconds to wait for an SSH connection attempt", + "ssh_connect_timeout": 30000000000, + + "_file_download_filemode": "The base 10 file mode to use for downloaded files", + "file_download_filemode": 511, + + "_watch_to_download_paths": "The correlation of .torrent file paths and where their contents should be downloaded to", + "watch_to_download_paths": { + "/home/tblyler/torrent_files/tv": "/home/tblyler/Downloads/tv", + "/home/tblyler/torrent_files/movies": "/home/tblyler/Downloads/movies", + "/home/tblyler/torrent_files": "/home/tblyler/Downloads" + }, + + "_temp_download_path": "The root path to temporarily download to and then move to the folder in the setting above. The destination path is created underneath the temp_download_path", + "temp_download_path": "/home/tblyler/tempDownloads", + + "_watch_to_finish_path": "If defined, the finished .torrent files finished are moved to their respected path here. Otherwise, they are deleted.", + "watch_to_finish_path": { + "/home/tblyler/torrent_files/tv": "/home/tblyler/completed_torrent_files/tv", + "/home/tblyler/torrent_files": "/home/tblyler/completed_torrent_files" + }, + + "_rtorrent_update_interval": "The time in nano seconds to update the list of torrents and their statuses in rTorrent", + "rtorrent_update_interval": 300000000000, + + "_download_jobs": "The number of concurrent download streams to have at one time", + "download_jobs": 2 +} +``` diff --git a/cmd/hoarder/main.go b/cmd/hoarder/main.go new file mode 100644 index 0000000..f2ce312 --- /dev/null +++ b/cmd/hoarder/main.go @@ -0,0 +1,57 @@ +package main + +import ( + "encoding/json" + "flag" + "github.com/tblyler/hoarder/queue" + "io/ioutil" + "log" + "os" + "os/signal" +) + +func main() { + configPath := flag.String("config", "", "path to the config file") + flag.Parse() + + logger := log.New(os.Stdout, "hoarder", log.LstdFlags) + + if *configPath == "" { + logger.Println("Missing config path") + os.Exit(1) + } + + configRaw, err := ioutil.ReadFile(*configPath) + if err != nil { + logger.Printf("Failed to read config file '%s': '%s'", *configPath, err) + os.Exit(1) + } + + config := &queue.Config{} + err = json.Unmarshal(configRaw, config) + if err != nil { + logger.Printf("Unable to decode config json at '%s': '%s'", *configPath, err) + os.Exit(1) + } + + q, err := queue.NewQueue(config, logger) + if err != nil { + logger.Printf("Failed to start hoarder: '%s'", err) + os.Exit(1) + } + + stop := make(chan bool) + done := make(chan bool) + go func() { + q.Run(stop) + done <- true + }() + + sigChan := make(chan os.Signal, 1) + signal.Notify(sigChan, os.Interrupt) + + sig := <-sigChan + logger.Println("Got signal ", sig, " quitting") + stop <- true + <-done +} diff --git a/lib/easysftp/easysftp.go b/lib/easysftp/easysftp.go deleted file mode 100644 index 19f7045..0000000 --- a/lib/easysftp/easysftp.go +++ /dev/null @@ -1,239 +0,0 @@ -package easysftp - -import ( - "errors" - "github.com/pkg/sftp" - "golang.org/x/crypto/ssh" - "io" - "io/ioutil" - "os" - "path/filepath" - "time" -) - -// ClientConfig maintains all of the configuration info to connect to a SSH host -type ClientConfig struct { - Username string - Host string - KeyPath string - Password string - Timeout time.Duration - FileMode os.FileMode -} - -// Client communicates with the SFTP to download files/pathes -type Client struct { - sshClient *ssh.Client - config *ClientConfig -} - -// Connect to a host with this given config -func Connect(config *ClientConfig) (*Client, error) { - var auth []ssh.AuthMethod - if config.KeyPath != "" { - privKey, err := ioutil.ReadFile(config.KeyPath) - if err != nil { - return nil, err - } - signer, err := ssh.ParsePrivateKey(privKey) - if err != nil { - return nil, err - } - - auth = append(auth, ssh.PublicKeys(signer)) - } - - if len(auth) == 0 { - if config.Password == "" { - return nil, errors.New("Missing password or key for SSH authentication") - } - - auth = append(auth, ssh.Password(config.Password)) - } - - sshClient, err := ssh.Dial("tcp", config.Host, &ssh.ClientConfig{ - User: config.Username, - Auth: auth, - Timeout: config.Timeout, - }) - if err != nil { - return nil, err - } - - return &Client{ - sshClient: sshClient, - config: config, - }, nil -} - -// Close the underlying SSH conection -func (c *Client) Close() error { - return c.sshClient.Close() -} - -func (c *Client) newSftpClient() (*sftp.Client, error) { - return sftp.NewClient(c.sshClient) -} - -// Stat gets information for the given path -func (c *Client) Stat(path string) (os.FileInfo, error) { - sftpClient, err := c.newSftpClient() - if err != nil { - return nil, err - } - - defer sftpClient.Close() - - return sftpClient.Stat(path) -} - -// Lstat gets information for the given path, if it is a symbolic link, it will describe the symbolic link -func (c *Client) Lstat(path string) (os.FileInfo, error) { - sftpClient, err := c.newSftpClient() - if err != nil { - return nil, err - } - - defer sftpClient.Close() - - return sftpClient.Lstat(path) -} - -// Download a file from the given path to the output writer -func (c *Client) Download(path string, output io.Writer) error { - sftpClient, err := c.newSftpClient() - if err != nil { - return err - } - - defer sftpClient.Close() - - info, err := sftpClient.Stat(path) - if err != nil { - return err - } - - if info.IsDir() { - return errors.New("Unable to use easysftp.Client.Download for dir: " + path) - } - - remote, err := sftpClient.Open(path) - if err != nil { - return err - } - - defer remote.Close() - - _, err = io.Copy(output, remote) - return err -} - -// Mirror downloads an entire folder (recursively) or file underneath the given localParentPath -func (c *Client) Mirror(path string, localParentPath string) error { - sftpClient, err := c.newSftpClient() - if err != nil { - return err - } - - defer sftpClient.Close() - - info, err := sftpClient.Stat(path) - if err != nil { - return err - } - - // download the file - if !info.IsDir() { - sftpClient.Close() - localPath := filepath.Join(localParentPath, info.Name()) - localInfo, err := os.Stat(localPath) - if os.IsExist(err) && localInfo.IsDir() { - err = os.RemoveAll(localPath) - if err != nil { - return err - } - } - - file, err := os.OpenFile( - localPath, - os.O_RDWR|os.O_CREATE|os.O_TRUNC, - c.config.FileMode, - ) - if err != nil { - return err - } - - defer file.Close() - - return c.Download(path, file) - } - - // download the whole directory recursively - walker := sftpClient.Walk(path) - remoteParentPath := filepath.Dir(path) - for walker.Step() { - if err := walker.Err(); err != nil { - return err - } - - info := walker.Stat() - - relPath, err := filepath.Rel(remoteParentPath, walker.Path()) - if err != nil { - return err - } - - localPath := filepath.Join(localParentPath, relPath) - - // if we have something at the download path delete it if it is a directory - // and the remote is a file and vice a versa - localInfo, err := os.Stat(localPath) - if os.IsExist(err) { - if localInfo.IsDir() { - if info.IsDir() { - continue - } - - err = os.RemoveAll(localPath) - if err != nil { - return err - } - } else if info.IsDir() { - err = os.Remove(localPath) - if err != nil { - return err - } - } - } - - if info.IsDir() { - err = os.MkdirAll(localPath, c.config.FileMode) - if err != nil { - return err - } - - continue - } - - remoteFile, err := sftpClient.Open(walker.Path()) - if err != nil { - return err - } - - localFile, err := os.OpenFile(localPath, os.O_RDWR|os.O_CREATE|os.O_TRUNC, c.config.FileMode) - if err != nil { - remoteFile.Close() - return err - } - - _, err = io.Copy(localFile, remoteFile) - remoteFile.Close() - localFile.Close() - - if err != nil { - return err - } - } - - return nil -} diff --git a/metainfo/vendor/github.com/anacrolix/torrent b/metainfo/vendor/github.com/anacrolix/torrent deleted file mode 160000 index dcfee93..0000000 --- a/metainfo/vendor/github.com/anacrolix/torrent +++ /dev/null @@ -1 +0,0 @@ -Subproject commit dcfee93f96d231b5590f991313b5d9f925757f52 diff --git a/queue/queue.go b/queue/queue.go new file mode 100644 index 0000000..08f5b1e --- /dev/null +++ b/queue/queue.go @@ -0,0 +1,354 @@ +package queue + +import ( + "bytes" + "errors" + "github.com/fsnotify/fsnotify" + "github.com/tblyler/easysftp" + "github.com/tblyler/go-rtorrent/rtorrent" + "github.com/tblyler/hoarder/metainfo" + "io/ioutil" + "log" + "os" + "path/filepath" + "strings" + "sync" + "time" +) + +// Config defines the settings for watching, uploading, and downloading +type Config struct { + RtorrentAddr string `json:"rtorrent_addr"` + RtorrentInsecureCert bool `json:"rtorrent_insecure_cert"` + RtorrentUsername string `json:"rtorrent_username"` + RtorrentPassword string `json:"rtorrent_password"` + SSHUsername string `json:"ssh_username"` + SSHPassword string `json:"ssh_password"` + SSHKeyPath string `json:"ssh_privkey_path"` + SSHAddr string `json:"ssh_addr"` + SSHTimeout time.Duration `json:"ssh_connect_timeout"` + DownloadFileMode os.FileMode `json:"file_download_filemode"` + WatchDownloadPaths map[string]string `json:"watch_to_download_paths"` + TempDownloadPath string `json:"temp_download_path"` + FinishedTorrentFilePath map[string]string `json:"watch_to_finish_path"` + TorrentListUpdateInterval time.Duration `json:"rtorrent_update_interval"` + ConcurrentDownloads uint `json:"download_jobs"` +} + +// Queue watches the given folders for new .torrent files, +// uploads them to the given rTorrent server, +// and then downloads them over SSH to the given download path. +type Queue struct { + rtClient *rtorrent.RTorrent + sftpClient *easysftp.Client + fsWatcher *fsnotify.Watcher + config *Config + torrentList map[string]rtorrent.Torrent + torrentListUpdate time.Time + downloadQueue map[string]string + logger *log.Logger + lock sync.RWMutex +} + +// NewQueue establishes all connections and watchers +func NewQueue(config *Config, logger *log.Logger) (*Queue, error) { + if config.WatchDownloadPaths == nil || len(config.WatchDownloadPaths) == 0 { + return nil, errors.New("Must have queue.QueueConfig.WatchDownloadPaths set") + } + + for watchPath, downloadPath := range config.WatchDownloadPaths { + config.WatchDownloadPaths[filepath.Clean(watchPath)] = filepath.Clean(downloadPath) + } + + if config.ConcurrentDownloads == 0 { + config.ConcurrentDownloads = 1 + } + + if logger == nil { + logger = log.New(ioutil.Discard, "", 0) + } + + fsWatcher, err := fsnotify.NewWatcher() + if err != nil { + return nil, err + } + + for watchPath := range config.WatchDownloadPaths { + err = fsWatcher.Add(watchPath) + if err != nil { + return nil, err + } + } + + rtClient := rtorrent.New(config.RtorrentAddr, config.RtorrentInsecureCert) + rtClient.SetAuth(config.RtorrentUsername, config.RtorrentPassword) + + sftpClient, err := easysftp.Connect(&easysftp.ClientConfig{ + Username: config.SSHUsername, + Password: config.SSHPassword, + KeyPath: config.SSHKeyPath, + Host: config.SSHAddr, + Timeout: config.SSHTimeout, + FileMode: config.DownloadFileMode, + }) + if err != nil { + return nil, err + } + + return &Queue{ + rtClient: rtClient, + sftpClient: sftpClient, + fsWatcher: fsWatcher, + config: config, + downloadQueue: make(map[string]string), + logger: logger, + }, nil +} + +// Close all of the connections and watchers +func (q *Queue) Close() []error { + errs := []error{} + err := q.sftpClient.Close() + if err != nil { + errs = append(errs, err) + } + + err = q.fsWatcher.Close() + if err != nil { + errs = append(errs, err) + } + + return errs +} + +func (q *Queue) updateTorrentList() error { + // lock for torrentList + q.lock.Lock() + defer q.lock.Unlock() + + torrents, err := q.rtClient.GetTorrents(rtorrent.ViewMain) + if err != nil { + return err + } + + torrentList := make(map[string]rtorrent.Torrent) + + for _, torrent := range torrents { + torrentList[torrent.Hash] = torrent + } + + q.torrentList = torrentList + q.torrentListUpdate = time.Now() + + return nil +} + +func (q *Queue) addTorrentFilePath(path string) error { + // lock for downloadQueue + q.lock.Lock() + defer q.lock.Unlock() + + torrentData, err := ioutil.ReadFile(path) + if err != nil { + return err + } + + torrentHash, err := metainfo.GetTorrentHashHexString(bytes.NewReader(torrentData)) + if err != nil { + return err + } + + q.downloadQueue[torrentHash] = path + + return q.rtClient.AddTorrent(torrentData) +} + +func (q *Queue) getFinishedTorrents() []rtorrent.Torrent { + // lock for torrentList + q.lock.RLock() + defer q.lock.RUnlock() + + torrents := []rtorrent.Torrent{} + for hash, torrentPath := range q.downloadQueue { + torrent, exists := q.torrentList[hash] + if !exists { + err := q.addTorrentFilePath(torrentPath) + if err != nil { + q.logger.Printf("Unable to add torrent '%s' error '%s'", torrentPath, err) + } + + continue + } + + if !torrent.Completed { + continue + } + + torrents = append(torrents, torrent) + } + + return torrents +} + +func (q *Queue) downloadTorrents(torrents []rtorrent.Torrent) { + // lock for downloadQueue + q.lock.RLock() + defer q.lock.RUnlock() + + done := make(chan bool, q.config.ConcurrentDownloads) + var running uint + for _, torrent := range torrents { + if !torrent.Completed { + continue + } + + torrent.Hash = strings.ToUpper(torrent.Hash) + + torrentFilePath, exists := q.downloadQueue[torrent.Hash] + if !exists { + continue + } + + if running >= q.config.ConcurrentDownloads { + <-done + running-- + } + + var downloadPath string + destDownloadPath := q.config.WatchDownloadPaths[filepath.Dir(torrentFilePath)] + + if q.config.TempDownloadPath == "" { + downloadPath = destDownloadPath + } else { + downloadPath = filepath.Join(q.config.TempDownloadPath, destDownloadPath) + + if info, err := os.Stat(downloadPath); os.IsExist(err) { + if !info.IsDir() { + q.logger.Printf("Unable to downlaod to temp path '%s' since it is not a directory", downloadPath) + continue + } + } + + err := os.MkdirAll(downloadPath, q.config.DownloadFileMode) + if err != nil { + q.logger.Printf("Unable to create temp download path '%s' error '%s'", downloadPath, err) + continue + } + } + + go func(torrentFilePath string, downloadPath string, torrent rtorrent.Torrent) { + err := q.sftpClient.Mirror(torrent.Path, downloadPath) + if err != nil { + q.logger.Printf("Failed to download '%s' to '%s' error '%s'", torrent.Path, downloadPath, err) + done <- false + return + } + + // we need to move the downlaod from the temp directory to the destination + if downloadPath != destDownloadPath { + fileName := filepath.Base(torrent.Path) + downFullPath := filepath.Join(downloadPath, fileName) + destFullPath := filepath.Join(destDownloadPath, fileName) + err = os.Rename(downFullPath, destFullPath) + if err != nil { + q.logger.Printf("Failed to move temp path '%s' to dest path '%s' error '%s'", downFullPath, destFullPath, err) + done <- false + return + } + } + + parentTorrentPath := filepath.Dir(torrentFilePath) + if movePath, exists := q.config.FinishedTorrentFilePath[parentTorrentPath]; exists { + destFullPath := filepath.Join(movePath, filepath.Base(torrentFilePath)) + err = os.Rename(torrentFilePath, destFullPath) + if err != nil { + q.logger.Printf("Failed to move torrent file from '%s' to '%s' error '%s'", torrentFilePath, destFullPath, err) + } + } else { + err = os.Remove(torrentFilePath) + if err != nil { + q.logger.Printf("Failed to remove torrent file '%s' error '%s'", torrentFilePath, err) + } + } + + q.lock.RUnlock() + q.lock.Lock() + delete(q.downloadQueue, torrent.Hash) + q.lock.Unlock() + q.lock.RLock() + done <- true + }(torrentFilePath, downloadPath, torrent) + running++ + } + + for running > 0 { + <-done + running-- + } +} + +// Run executes all steps needed for looking at the queue, catching updates, and processing all work +func (q *Queue) Run(stop <-chan bool) { + // for the initial run, load all files in the specified directories + for localPath := range q.config.WatchDownloadPaths { + files, err := ioutil.ReadDir(localPath) + if err != nil { + q.logger.Printf("Unable to read local path '%s' error '%s'", localPath, err) + } + + for _, file := range files { + if file.IsDir() { + continue + } + + fullPath := filepath.Join(localPath, file.Name()) + + err = q.addTorrentFilePath(fullPath) + if err != nil { + q.logger.Printf("Failed to add torrent at '%s' error '%s'", fullPath, err) + } + } + } + + go func() { + err := q.updateTorrentList() + if err != nil { + q.logger.Printf("Failed to update torrent list from rTorrent: '%s'", err) + } + + if q.config.TorrentListUpdateInterval == 0 { + time.Sleep(time.Minute) + } else { + time.Sleep(q.config.TorrentListUpdateInterval) + } + }() + + // watch all directories for file changes + var err error + finished := false + for !finished { + select { + case event := <-q.fsWatcher.Events: + if event.Op&fsnotify.Write == fsnotify.Write || + event.Op&fsnotify.Create == fsnotify.Create || + event.Op&fsnotify.Rename == fsnotify.Rename { + // skip files that do not end with .torrent + if !strings.HasSuffix(event.Name, ".torrent") { + break + } + + q.logger.Printf("Adding %s to download queue", event.Name) + err = q.addTorrentFilePath(event.Name) + if err != nil { + q.logger.Printf("Failed to add '%s' error '%s'", event.Name, err) + } + } + + case err = <-q.fsWatcher.Errors: + q.logger.Printf("Error while watching folders '%s'", err) + + case <-stop: + finished = true + } + } +}