From e958dcb451b1023a9c9dba04a0cf9a459ae1136f Mon Sep 17 00:00:00 2001 From: Michael Rosenberg <42micro@gmail.com> Date: Tue, 28 Jun 2016 20:36:54 -0400 Subject: [PATCH 1/4] New command line option -getStatus to view formatted download status; config now requires RPC socket path; queue.Close() now runs after interrupt is sent --- README.md | 3 + cmd/hoarder/main.go | 30 +++++++++ queue/queue.go | 157 ++++++++++++++++++++++++++++++++++++++++++-- queue/rpc.go | 38 +++++++++++ 4 files changed, 222 insertions(+), 6 deletions(-) create mode 100644 queue/rpc.go diff --git a/README.md b/README.md index d4a4718..15ce559 100644 --- a/README.md +++ b/README.md @@ -43,6 +43,9 @@ download_jobs: 2 # Whether or not to attempt to resume a previously interrupted download resume_downloads: true +# Path to the unix socket file that hoarder uses for RPC +rpc_socket_path: /tmp/hoarder.sock + rtorrent: # The address to the rtorrent XMLRPC endpoint addr: https://mycoolrtorrentserver.com/XMLRPC diff --git a/cmd/hoarder/main.go b/cmd/hoarder/main.go index e4b1f56..348a181 100644 --- a/cmd/hoarder/main.go +++ b/cmd/hoarder/main.go @@ -7,6 +7,7 @@ import ( "github.com/tblyler/hoarder/queue" "io/ioutil" "log" + "net/rpc" "os" "os/signal" "strconv" @@ -19,6 +20,7 @@ var buildDate = "Unknown" func main() { version := flag.Bool("version", false, "display version info") configPath := flag.String("config", "", "path to the config file") + getStatus := flag.Bool("getStatus", false, "get the status of the current downloads") flag.Parse() if *version { @@ -54,6 +56,29 @@ func main() { os.Exit(1) } + if *getStatus { + rpc, err := rpc.Dial("unix", config.RPCSocketPath) + if err != nil { + logger.Printf("Unable to open RPC socket file '%s': '%s'", config.RPCSocketPath, err) + os.Exit(1) + } + defer rpc.Close() + + reply := "" + err = rpc.Call("Status.Downloads", &queue.RPCArgs{}, &reply) + if err != nil { + logger.Printf("RPC call for download status failed: '%s'", err) + os.Exit(1) + } + + if len(reply) > 0 { + fmt.Println(reply) + } else { + fmt.Println("No Downloads") + } + os.Exit(0) + } + q, err := queue.NewQueue(config, logger) if err != nil { logger.Printf("Failed to start hoarder: '%s'", err) @@ -74,4 +99,9 @@ func main() { logger.Println("Got signal ", sig, " quitting") stop <- true <-done + + errs := q.Close() + for _, err := range errs { + logger.Println(err) + } } diff --git a/queue/queue.go b/queue/queue.go index 1ea037d..24b4174 100644 --- a/queue/queue.go +++ b/queue/queue.go @@ -10,8 +10,11 @@ import ( "github.com/tblyler/hoarder/metainfo" "io/ioutil" "log" + "net" + "net/rpc" "os" "path/filepath" + "sort" "strconv" "strings" "time" @@ -38,6 +41,7 @@ type Config struct { WatchDownloadPaths map[string]string `json:"watch_to_download_paths" yaml:"watch_to_download_paths,flow"` TempDownloadPath string `json:"temp_download_path" yaml:"temp_download_path"` FinishedTorrentFilePath map[string]string `json:"watch_to_finish_path" yaml:"watch_to_finish_path,flow"` + RPCSocketPath string `json:"rpc_socket_path" yaml:"rpc_socket_path"` TorrentListUpdateInterval time.Duration `json:"rtorrent_update_interval" yaml:"rtorrent_update_interval"` ConcurrentDownloads uint `json:"download_jobs" yaml:"download_jobs"` ResumeDownloads bool `json:"resume_downloads" yaml:"resume_downloads"` @@ -54,6 +58,13 @@ type Queue struct { torrentList map[string]rtorrent.Torrent downloadQueue map[string]string logger *log.Logger + rpcSocket net.Listener + rpcQueue chan RPCReq +} + +type downloadInfo struct { + path string + size int } var prettyBytesValues = []float64{ @@ -98,6 +109,20 @@ func NewQueue(config *Config, logger *log.Logger) (*Queue, error) { return nil, errors.New("Must have queue.QueueConfig.WatchDownloadPaths set") } + if len(config.RPCSocketPath) == 0 { + return nil, errors.New("Must have queue.QueueConfig.RPCSocketPath set") + } + + // Set up RPC + rpcQueue := make(chan RPCReq) + status := Status{rpcQueue} + rpc.Register(&status) + rpcSocket, err := net.Listen("unix", config.RPCSocketPath) + if err != nil { + return nil, err + } + go rpc.Accept(rpcSocket) + for watchPath, downloadPath := range config.WatchDownloadPaths { config.WatchDownloadPaths[filepath.Clean(watchPath)] = filepath.Clean(downloadPath) } @@ -132,6 +157,8 @@ func NewQueue(config *Config, logger *log.Logger) (*Queue, error) { config: config, downloadQueue: make(map[string]string), logger: logger, + rpcSocket: rpcSocket, + rpcQueue: rpcQueue, } sftpClient, err := q.newSftpClient() @@ -159,12 +186,20 @@ func (q *Queue) newSftpClient() (*easysftp.Client, error) { // Close all of the connections and watchers func (q *Queue) Close() []error { errs := []error{} - err := q.sftpClient.Close() + + if q.sftpClient != nil { + err := q.sftpClient.Close() + if err != nil { + errs = append(errs, err) + } + } + + err := q.fsWatcher.Close() if err != nil { errs = append(errs, err) } - err = q.fsWatcher.Close() + err = q.rpcSocket.Close() if err != nil { errs = append(errs, err) } @@ -215,6 +250,101 @@ func (q *Queue) addTorrentFilePath(path string) error { return q.updateTorrentList() } +func dirSize(path string) (int64, error) { + var size int64 + err := filepath.Walk(path, func(_ string, info os.FileInfo, err error) error { + if !info.IsDir() { + size += info.Size() + } + return err + }) + return size, err +} + +/* Output looks like +Errored.Download.mkv | | (error: could not stat file) +Totally.Legit.Download.x264-KILLERS |===============> | (50%) +ubuntu.13.37.iso |===> | ( 7%) +*/ +func (q *Queue) getDownloadStatus(downloadsRunning map[string]downloadInfo) string { + // We use maps so that we can traverse in name order + paths := make(map[string]string, len(downloadsRunning)) + names := make([]string, 0, len(downloadsRunning)) + sizes := make(map[string]int, len(downloadsRunning)) + for _, info := range downloadsRunning { + name := filepath.Base(info.path) + names = append(names, name) + paths[name] = info.path + sizes[name] = info.size + } + // Sort the torrent names so that they don't jump around every time this function is called + sort.Strings(names) + + maxNameLen := 0 + for _, name := range names { + if len(name) > maxNameLen { + maxNameLen = len(name) + } + } + + output := "" + downloadBarLength := 30 + + for nameIdx, name := range names { + size := sizes[name] + path := paths[name] + + // Get the size of the data that we've downloaded so far + var bytesDownloaded int64 + stat, err := os.Stat(path) + if err == nil { + if stat.IsDir() { + bytesDownloaded, err = dirSize(path) + } else { + bytesDownloaded = stat.Size() + } + } + + // Make the names and pad them with spaces on the right + output += name + for i := 0; i < maxNameLen-len(name); i++ { + output += " " + } + output += " |" + + // Make the download bar + if err == nil { + // Make the bar proportional to the amount downloaded vs the total size, and make the + // final character a '>' + percentDone := float64(bytesDownloaded) / float64(size) + partialBarLength := int(float64(downloadBarLength) * percentDone) + for i := 0; i < partialBarLength-1; i++ { + output += "=" + } + + if partialBarLength > 0 { + output += ">" + } + + for i := 0; i < downloadBarLength-partialBarLength; i++ { + output += " " + } + output += fmt.Sprintf("| (%2v%%)", int(100.0*percentDone)) + } else { + for i := 0; i < downloadBarLength; i++ { + output += " " + } + output += fmt.Sprintf("| (error: %s)", err) + } + + // Add a newline if there are more downloads to show + if nameIdx < len(names)-1 { + output += "\n" + } + } + return output +} + func (q *Queue) getFinishedTorrents() []rtorrent.Torrent { torrents := []rtorrent.Torrent{} for hash, torrentPath := range q.downloadQueue { @@ -257,7 +387,7 @@ func (q *Queue) downloadTorrent(torrent rtorrent.Torrent, torrentFilePath string if info, err := os.Stat(downloadPath); os.IsExist(err) { if !info.IsDir() { - return fmt.Errorf("Unable to downlaod to temp path '%s' since it is not a directory", downloadPath) + return fmt.Errorf("Unable to download to temp path '%s' since it is not a directory", downloadPath) } } @@ -335,7 +465,7 @@ func (q *Queue) Run(stop <-chan bool) { finished := false lastUpdateTime := time.Time{} downloadedHashes := make(chan string, q.config.ConcurrentDownloads) - downloadsRunning := make(map[string]bool) + downloadsRunning := make(map[string]downloadInfo) for { cont := true for cont { @@ -372,6 +502,14 @@ func (q *Queue) Run(stop <-chan bool) { cont = false break + case rpcReq := <-q.rpcQueue: + switch rpcReq.method { + case "download_status": + status := q.getDownloadStatus(downloadsRunning) + rpcReq.replyChan <- status + } + break + case <-stop: finished = true cont = false @@ -443,6 +581,8 @@ func (q *Queue) Run(stop <-chan bool) { } } + torrentFilePath := q.downloadQueue[torrent.Hash] + go func(torrent rtorrent.Torrent, torrentPath string, hashChan chan<- string) { err := q.downloadTorrent(torrent, torrentPath) if err != nil { @@ -452,9 +592,14 @@ func (q *Queue) Run(stop <-chan bool) { } hashChan <- torrent.Hash - }(torrent, q.downloadQueue[torrent.Hash], downloadedHashes) + }(torrent, torrentFilePath, downloadedHashes) - downloadsRunning[torrent.Hash] = true + destDownloadDir := q.config.WatchDownloadPaths[filepath.Dir(torrentFilePath)] + downloadDir := filepath.Join(q.config.TempDownloadPath, destDownloadDir) + downloadsRunning[torrent.Hash] = downloadInfo{ + path: filepath.Join(downloadDir, torrent.Name), + size: torrent.Size, + } if uint(len(downloadsRunning)) == q.config.ConcurrentDownloads { break diff --git a/queue/rpc.go b/queue/rpc.go new file mode 100644 index 0000000..321d45d --- /dev/null +++ b/queue/rpc.go @@ -0,0 +1,38 @@ +package queue + +import "errors" + +type RPCReq struct { + method string + replyChan chan interface{} +} + +type RPCResponse string + +type Status struct { + queueChan chan RPCReq +} + +type RPCArgs struct{} + +func (s *Status) Downloads(_ RPCArgs, reply *RPCResponse) error { + replyChan := make(chan interface{}) + req := RPCReq{ + method: "download_status", + replyChan: replyChan, + } + + s.queueChan <- req + qReply := <-replyChan + + switch qReply.(type) { + case string: + *reply = RPCResponse(qReply.(string)) + break + default: + return errors.New("error: unexpected return value") + break + } + + return nil +} From fccc859d5754b7b6220f1b260c591dc7711b8a21 Mon Sep 17 00:00:00 2001 From: Michael Rosenberg <42micro@gmail.com> Date: Tue, 28 Jun 2016 20:42:51 -0400 Subject: [PATCH 2/4] Fix alignment on download progress example --- queue/queue.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/queue/queue.go b/queue/queue.go index 24b4174..a2570b8 100644 --- a/queue/queue.go +++ b/queue/queue.go @@ -262,9 +262,9 @@ func dirSize(path string) (int64, error) { } /* Output looks like -Errored.Download.mkv | | (error: could not stat file) Totally.Legit.Download.x264-KILLERS |===============> | (50%) -ubuntu.13.37.iso |===> | ( 7%) +ubuntu.13.37.iso |===> | ( 7%) +Errored.Download.mkv | | (error: could not stat file) */ func (q *Queue) getDownloadStatus(downloadsRunning map[string]downloadInfo) string { // We use maps so that we can traverse in name order From 4ad1968b2aeeb734f1035b1e5625739dd91bb42f Mon Sep 17 00:00:00 2001 From: Michael Rosenberg <42micro@gmail.com> Date: Mon, 4 Jul 2016 18:17:24 -0400 Subject: [PATCH 3/4] Don't leave RPC socket on the filesystem if an error happens in NewQueue --- queue/queue.go | 60 +++++++++++++++++++++++++------------------------- 1 file changed, 30 insertions(+), 30 deletions(-) diff --git a/queue/queue.go b/queue/queue.go index a2570b8..ad71b07 100644 --- a/queue/queue.go +++ b/queue/queue.go @@ -103,6 +103,17 @@ func prettyBytes(bytes float64) string { return output } +func newSftpClient(config *Config) (*easysftp.Client, error) { + return easysftp.Connect(&easysftp.ClientConfig{ + Username: config.SSH.Username, + Password: config.SSH.Password, + KeyPath: config.SSH.KeyPath, + Host: config.SSH.Addr, + Timeout: config.SSH.Timeout, + FileMode: config.DownloadFileMode, + }) +} + // NewQueue establishes all connections and watchers func NewQueue(config *Config, logger *log.Logger) (*Queue, error) { if config.WatchDownloadPaths == nil || len(config.WatchDownloadPaths) == 0 { @@ -113,16 +124,6 @@ func NewQueue(config *Config, logger *log.Logger) (*Queue, error) { return nil, errors.New("Must have queue.QueueConfig.RPCSocketPath set") } - // Set up RPC - rpcQueue := make(chan RPCReq) - status := Status{rpcQueue} - rpc.Register(&status) - rpcSocket, err := net.Listen("unix", config.RPCSocketPath) - if err != nil { - return nil, err - } - go rpc.Accept(rpcSocket) - for watchPath, downloadPath := range config.WatchDownloadPaths { config.WatchDownloadPaths[filepath.Clean(watchPath)] = filepath.Clean(downloadPath) } @@ -150,6 +151,24 @@ func NewQueue(config *Config, logger *log.Logger) (*Queue, error) { rtClient := rtorrent.New(config.Rtorrent.Addr, config.Rtorrent.InsecureCert) rtClient.SetAuth(config.Rtorrent.Username, config.Rtorrent.Password) + sftpClient, err := newSftpClient(config) + if err != nil { + return nil, err + } + + // the sftpClient connection was only made to verify settings + sftpClient.Close() + + // Set up RPC + rpcQueue := make(chan RPCReq) + status := Status{rpcQueue} + rpc.Register(&status) + rpcSocket, err := net.Listen("unix", config.RPCSocketPath) + if err != nil { + return nil, err + } + go rpc.Accept(rpcSocket) + q := &Queue{ rtClient: rtClient, sftpClient: nil, @@ -161,28 +180,9 @@ func NewQueue(config *Config, logger *log.Logger) (*Queue, error) { rpcQueue: rpcQueue, } - sftpClient, err := q.newSftpClient() - if err != nil { - return nil, err - } - - // the sftpClient connection was only made to verify settings - sftpClient.Close() - return q, nil } -func (q *Queue) newSftpClient() (*easysftp.Client, error) { - return easysftp.Connect(&easysftp.ClientConfig{ - Username: q.config.SSH.Username, - Password: q.config.SSH.Password, - KeyPath: q.config.SSH.KeyPath, - Host: q.config.SSH.Addr, - Timeout: q.config.SSH.Timeout, - FileMode: q.config.DownloadFileMode, - }) -} - // Close all of the connections and watchers func (q *Queue) Close() []error { errs := []error{} @@ -573,7 +573,7 @@ func (q *Queue) Run(stop <-chan bool) { } if q.sftpClient == nil { - q.sftpClient, err = q.newSftpClient() + q.sftpClient, err = newSftpClient(q.config) if err != nil { q.logger.Println("Failed to connect to sftp: ", err) q.sftpClient = nil From 01da49db25c2cb6c215caae94dec5e40bbcdb710 Mon Sep 17 00:00:00 2001 From: Michael Rosenberg <42micro@gmail.com> Date: Thu, 7 Jul 2016 21:56:44 -0400 Subject: [PATCH 4/4] Change defer to explicit function call before os.Exit() --- cmd/hoarder/main.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/cmd/hoarder/main.go b/cmd/hoarder/main.go index 348a181..eebb5d3 100644 --- a/cmd/hoarder/main.go +++ b/cmd/hoarder/main.go @@ -62,7 +62,6 @@ func main() { logger.Printf("Unable to open RPC socket file '%s': '%s'", config.RPCSocketPath, err) os.Exit(1) } - defer rpc.Close() reply := "" err = rpc.Call("Status.Downloads", &queue.RPCArgs{}, &reply) @@ -76,6 +75,8 @@ func main() { } else { fmt.Println("No Downloads") } + + rpc.Close() os.Exit(0) }