Merge pull request #7 from doomrobo/master
Implement download progress checking via RPC
This commit is contained in:
commit
44e04ef998
4 changed files with 245 additions and 28 deletions
|
@ -43,6 +43,9 @@ download_jobs: 2
|
|||
# Whether or not to attempt to resume a previously interrupted download
|
||||
resume_downloads: true
|
||||
|
||||
# Path to the unix socket file that hoarder uses for RPC
|
||||
rpc_socket_path: /tmp/hoarder.sock
|
||||
|
||||
rtorrent:
|
||||
# The address to the rtorrent XMLRPC endpoint
|
||||
addr: https://mycoolrtorrentserver.com/XMLRPC
|
||||
|
|
|
@ -7,6 +7,7 @@ import (
|
|||
"github.com/tblyler/hoarder/queue"
|
||||
"io/ioutil"
|
||||
"log"
|
||||
"net/rpc"
|
||||
"os"
|
||||
"os/signal"
|
||||
"strconv"
|
||||
|
@ -19,6 +20,7 @@ var buildDate = "Unknown"
|
|||
func main() {
|
||||
version := flag.Bool("version", false, "display version info")
|
||||
configPath := flag.String("config", "", "path to the config file")
|
||||
getStatus := flag.Bool("getStatus", false, "get the status of the current downloads")
|
||||
flag.Parse()
|
||||
|
||||
if *version {
|
||||
|
@ -54,6 +56,30 @@ func main() {
|
|||
os.Exit(1)
|
||||
}
|
||||
|
||||
if *getStatus {
|
||||
rpc, err := rpc.Dial("unix", config.RPCSocketPath)
|
||||
if err != nil {
|
||||
logger.Printf("Unable to open RPC socket file '%s': '%s'", config.RPCSocketPath, err)
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
reply := ""
|
||||
err = rpc.Call("Status.Downloads", &queue.RPCArgs{}, &reply)
|
||||
if err != nil {
|
||||
logger.Printf("RPC call for download status failed: '%s'", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
if len(reply) > 0 {
|
||||
fmt.Println(reply)
|
||||
} else {
|
||||
fmt.Println("No Downloads")
|
||||
}
|
||||
|
||||
rpc.Close()
|
||||
os.Exit(0)
|
||||
}
|
||||
|
||||
q, err := queue.NewQueue(config, logger)
|
||||
if err != nil {
|
||||
logger.Printf("Failed to start hoarder: '%s'", err)
|
||||
|
@ -74,4 +100,9 @@ func main() {
|
|||
logger.Println("Got signal ", sig, " quitting")
|
||||
stop <- true
|
||||
<-done
|
||||
|
||||
errs := q.Close()
|
||||
for _, err := range errs {
|
||||
logger.Println(err)
|
||||
}
|
||||
}
|
||||
|
|
197
queue/queue.go
197
queue/queue.go
|
@ -10,8 +10,11 @@ import (
|
|||
"github.com/tblyler/hoarder/metainfo"
|
||||
"io/ioutil"
|
||||
"log"
|
||||
"net"
|
||||
"net/rpc"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sort"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
@ -38,6 +41,7 @@ type Config struct {
|
|||
WatchDownloadPaths map[string]string `json:"watch_to_download_paths" yaml:"watch_to_download_paths,flow"`
|
||||
TempDownloadPath string `json:"temp_download_path" yaml:"temp_download_path"`
|
||||
FinishedTorrentFilePath map[string]string `json:"watch_to_finish_path" yaml:"watch_to_finish_path,flow"`
|
||||
RPCSocketPath string `json:"rpc_socket_path" yaml:"rpc_socket_path"`
|
||||
TorrentListUpdateInterval time.Duration `json:"rtorrent_update_interval" yaml:"rtorrent_update_interval"`
|
||||
ConcurrentDownloads uint `json:"download_jobs" yaml:"download_jobs"`
|
||||
ResumeDownloads bool `json:"resume_downloads" yaml:"resume_downloads"`
|
||||
|
@ -54,6 +58,13 @@ type Queue struct {
|
|||
torrentList map[string]rtorrent.Torrent
|
||||
downloadQueue map[string]string
|
||||
logger *log.Logger
|
||||
rpcSocket net.Listener
|
||||
rpcQueue chan RPCReq
|
||||
}
|
||||
|
||||
type downloadInfo struct {
|
||||
path string
|
||||
size int
|
||||
}
|
||||
|
||||
var prettyBytesValues = []float64{
|
||||
|
@ -92,12 +103,27 @@ func prettyBytes(bytes float64) string {
|
|||
return output
|
||||
}
|
||||
|
||||
func newSftpClient(config *Config) (*easysftp.Client, error) {
|
||||
return easysftp.Connect(&easysftp.ClientConfig{
|
||||
Username: config.SSH.Username,
|
||||
Password: config.SSH.Password,
|
||||
KeyPath: config.SSH.KeyPath,
|
||||
Host: config.SSH.Addr,
|
||||
Timeout: config.SSH.Timeout,
|
||||
FileMode: config.DownloadFileMode,
|
||||
})
|
||||
}
|
||||
|
||||
// NewQueue establishes all connections and watchers
|
||||
func NewQueue(config *Config, logger *log.Logger) (*Queue, error) {
|
||||
if config.WatchDownloadPaths == nil || len(config.WatchDownloadPaths) == 0 {
|
||||
return nil, errors.New("Must have queue.QueueConfig.WatchDownloadPaths set")
|
||||
}
|
||||
|
||||
if len(config.RPCSocketPath) == 0 {
|
||||
return nil, errors.New("Must have queue.QueueConfig.RPCSocketPath set")
|
||||
}
|
||||
|
||||
for watchPath, downloadPath := range config.WatchDownloadPaths {
|
||||
config.WatchDownloadPaths[filepath.Clean(watchPath)] = filepath.Clean(downloadPath)
|
||||
}
|
||||
|
@ -125,16 +151,7 @@ func NewQueue(config *Config, logger *log.Logger) (*Queue, error) {
|
|||
rtClient := rtorrent.New(config.Rtorrent.Addr, config.Rtorrent.InsecureCert)
|
||||
rtClient.SetAuth(config.Rtorrent.Username, config.Rtorrent.Password)
|
||||
|
||||
q := &Queue{
|
||||
rtClient: rtClient,
|
||||
sftpClient: nil,
|
||||
fsWatcher: fsWatcher,
|
||||
config: config,
|
||||
downloadQueue: make(map[string]string),
|
||||
logger: logger,
|
||||
}
|
||||
|
||||
sftpClient, err := q.newSftpClient()
|
||||
sftpClient, err := newSftpClient(config)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -142,29 +159,47 @@ func NewQueue(config *Config, logger *log.Logger) (*Queue, error) {
|
|||
// the sftpClient connection was only made to verify settings
|
||||
sftpClient.Close()
|
||||
|
||||
return q, nil
|
||||
// Set up RPC
|
||||
rpcQueue := make(chan RPCReq)
|
||||
status := Status{rpcQueue}
|
||||
rpc.Register(&status)
|
||||
rpcSocket, err := net.Listen("unix", config.RPCSocketPath)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
go rpc.Accept(rpcSocket)
|
||||
|
||||
q := &Queue{
|
||||
rtClient: rtClient,
|
||||
sftpClient: nil,
|
||||
fsWatcher: fsWatcher,
|
||||
config: config,
|
||||
downloadQueue: make(map[string]string),
|
||||
logger: logger,
|
||||
rpcSocket: rpcSocket,
|
||||
rpcQueue: rpcQueue,
|
||||
}
|
||||
|
||||
func (q *Queue) newSftpClient() (*easysftp.Client, error) {
|
||||
return easysftp.Connect(&easysftp.ClientConfig{
|
||||
Username: q.config.SSH.Username,
|
||||
Password: q.config.SSH.Password,
|
||||
KeyPath: q.config.SSH.KeyPath,
|
||||
Host: q.config.SSH.Addr,
|
||||
Timeout: q.config.SSH.Timeout,
|
||||
FileMode: q.config.DownloadFileMode,
|
||||
})
|
||||
return q, nil
|
||||
}
|
||||
|
||||
// Close all of the connections and watchers
|
||||
func (q *Queue) Close() []error {
|
||||
errs := []error{}
|
||||
|
||||
if q.sftpClient != nil {
|
||||
err := q.sftpClient.Close()
|
||||
if err != nil {
|
||||
errs = append(errs, err)
|
||||
}
|
||||
}
|
||||
|
||||
err = q.fsWatcher.Close()
|
||||
err := q.fsWatcher.Close()
|
||||
if err != nil {
|
||||
errs = append(errs, err)
|
||||
}
|
||||
|
||||
err = q.rpcSocket.Close()
|
||||
if err != nil {
|
||||
errs = append(errs, err)
|
||||
}
|
||||
|
@ -215,6 +250,101 @@ func (q *Queue) addTorrentFilePath(path string) error {
|
|||
return q.updateTorrentList()
|
||||
}
|
||||
|
||||
func dirSize(path string) (int64, error) {
|
||||
var size int64
|
||||
err := filepath.Walk(path, func(_ string, info os.FileInfo, err error) error {
|
||||
if !info.IsDir() {
|
||||
size += info.Size()
|
||||
}
|
||||
return err
|
||||
})
|
||||
return size, err
|
||||
}
|
||||
|
||||
/* Output looks like
|
||||
Totally.Legit.Download.x264-KILLERS |===============> | (50%)
|
||||
ubuntu.13.37.iso |===> | ( 7%)
|
||||
Errored.Download.mkv | | (error: could not stat file)
|
||||
*/
|
||||
func (q *Queue) getDownloadStatus(downloadsRunning map[string]downloadInfo) string {
|
||||
// We use maps so that we can traverse in name order
|
||||
paths := make(map[string]string, len(downloadsRunning))
|
||||
names := make([]string, 0, len(downloadsRunning))
|
||||
sizes := make(map[string]int, len(downloadsRunning))
|
||||
for _, info := range downloadsRunning {
|
||||
name := filepath.Base(info.path)
|
||||
names = append(names, name)
|
||||
paths[name] = info.path
|
||||
sizes[name] = info.size
|
||||
}
|
||||
// Sort the torrent names so that they don't jump around every time this function is called
|
||||
sort.Strings(names)
|
||||
|
||||
maxNameLen := 0
|
||||
for _, name := range names {
|
||||
if len(name) > maxNameLen {
|
||||
maxNameLen = len(name)
|
||||
}
|
||||
}
|
||||
|
||||
output := ""
|
||||
downloadBarLength := 30
|
||||
|
||||
for nameIdx, name := range names {
|
||||
size := sizes[name]
|
||||
path := paths[name]
|
||||
|
||||
// Get the size of the data that we've downloaded so far
|
||||
var bytesDownloaded int64
|
||||
stat, err := os.Stat(path)
|
||||
if err == nil {
|
||||
if stat.IsDir() {
|
||||
bytesDownloaded, err = dirSize(path)
|
||||
} else {
|
||||
bytesDownloaded = stat.Size()
|
||||
}
|
||||
}
|
||||
|
||||
// Make the names and pad them with spaces on the right
|
||||
output += name
|
||||
for i := 0; i < maxNameLen-len(name); i++ {
|
||||
output += " "
|
||||
}
|
||||
output += " |"
|
||||
|
||||
// Make the download bar
|
||||
if err == nil {
|
||||
// Make the bar proportional to the amount downloaded vs the total size, and make the
|
||||
// final character a '>'
|
||||
percentDone := float64(bytesDownloaded) / float64(size)
|
||||
partialBarLength := int(float64(downloadBarLength) * percentDone)
|
||||
for i := 0; i < partialBarLength-1; i++ {
|
||||
output += "="
|
||||
}
|
||||
|
||||
if partialBarLength > 0 {
|
||||
output += ">"
|
||||
}
|
||||
|
||||
for i := 0; i < downloadBarLength-partialBarLength; i++ {
|
||||
output += " "
|
||||
}
|
||||
output += fmt.Sprintf("| (%2v%%)", int(100.0*percentDone))
|
||||
} else {
|
||||
for i := 0; i < downloadBarLength; i++ {
|
||||
output += " "
|
||||
}
|
||||
output += fmt.Sprintf("| (error: %s)", err)
|
||||
}
|
||||
|
||||
// Add a newline if there are more downloads to show
|
||||
if nameIdx < len(names)-1 {
|
||||
output += "\n"
|
||||
}
|
||||
}
|
||||
return output
|
||||
}
|
||||
|
||||
func (q *Queue) getFinishedTorrents() []rtorrent.Torrent {
|
||||
torrents := []rtorrent.Torrent{}
|
||||
for hash, torrentPath := range q.downloadQueue {
|
||||
|
@ -257,7 +387,7 @@ func (q *Queue) downloadTorrent(torrent rtorrent.Torrent, torrentFilePath string
|
|||
|
||||
if info, err := os.Stat(downloadPath); os.IsExist(err) {
|
||||
if !info.IsDir() {
|
||||
return fmt.Errorf("Unable to downlaod to temp path '%s' since it is not a directory", downloadPath)
|
||||
return fmt.Errorf("Unable to download to temp path '%s' since it is not a directory", downloadPath)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -335,7 +465,7 @@ func (q *Queue) Run(stop <-chan bool) {
|
|||
finished := false
|
||||
lastUpdateTime := time.Time{}
|
||||
downloadedHashes := make(chan string, q.config.ConcurrentDownloads)
|
||||
downloadsRunning := make(map[string]bool)
|
||||
downloadsRunning := make(map[string]downloadInfo)
|
||||
for {
|
||||
cont := true
|
||||
for cont {
|
||||
|
@ -372,6 +502,14 @@ func (q *Queue) Run(stop <-chan bool) {
|
|||
cont = false
|
||||
break
|
||||
|
||||
case rpcReq := <-q.rpcQueue:
|
||||
switch rpcReq.method {
|
||||
case "download_status":
|
||||
status := q.getDownloadStatus(downloadsRunning)
|
||||
rpcReq.replyChan <- status
|
||||
}
|
||||
break
|
||||
|
||||
case <-stop:
|
||||
finished = true
|
||||
cont = false
|
||||
|
@ -435,7 +573,7 @@ func (q *Queue) Run(stop <-chan bool) {
|
|||
}
|
||||
|
||||
if q.sftpClient == nil {
|
||||
q.sftpClient, err = q.newSftpClient()
|
||||
q.sftpClient, err = newSftpClient(q.config)
|
||||
if err != nil {
|
||||
q.logger.Println("Failed to connect to sftp: ", err)
|
||||
q.sftpClient = nil
|
||||
|
@ -443,6 +581,8 @@ func (q *Queue) Run(stop <-chan bool) {
|
|||
}
|
||||
}
|
||||
|
||||
torrentFilePath := q.downloadQueue[torrent.Hash]
|
||||
|
||||
go func(torrent rtorrent.Torrent, torrentPath string, hashChan chan<- string) {
|
||||
err := q.downloadTorrent(torrent, torrentPath)
|
||||
if err != nil {
|
||||
|
@ -452,9 +592,14 @@ func (q *Queue) Run(stop <-chan bool) {
|
|||
}
|
||||
|
||||
hashChan <- torrent.Hash
|
||||
}(torrent, q.downloadQueue[torrent.Hash], downloadedHashes)
|
||||
}(torrent, torrentFilePath, downloadedHashes)
|
||||
|
||||
downloadsRunning[torrent.Hash] = true
|
||||
destDownloadDir := q.config.WatchDownloadPaths[filepath.Dir(torrentFilePath)]
|
||||
downloadDir := filepath.Join(q.config.TempDownloadPath, destDownloadDir)
|
||||
downloadsRunning[torrent.Hash] = downloadInfo{
|
||||
path: filepath.Join(downloadDir, torrent.Name),
|
||||
size: torrent.Size,
|
||||
}
|
||||
|
||||
if uint(len(downloadsRunning)) == q.config.ConcurrentDownloads {
|
||||
break
|
||||
|
|
38
queue/rpc.go
Normal file
38
queue/rpc.go
Normal file
|
@ -0,0 +1,38 @@
|
|||
package queue
|
||||
|
||||
import "errors"
|
||||
|
||||
type RPCReq struct {
|
||||
method string
|
||||
replyChan chan interface{}
|
||||
}
|
||||
|
||||
type RPCResponse string
|
||||
|
||||
type Status struct {
|
||||
queueChan chan RPCReq
|
||||
}
|
||||
|
||||
type RPCArgs struct{}
|
||||
|
||||
func (s *Status) Downloads(_ RPCArgs, reply *RPCResponse) error {
|
||||
replyChan := make(chan interface{})
|
||||
req := RPCReq{
|
||||
method: "download_status",
|
||||
replyChan: replyChan,
|
||||
}
|
||||
|
||||
s.queueChan <- req
|
||||
qReply := <-replyChan
|
||||
|
||||
switch qReply.(type) {
|
||||
case string:
|
||||
*reply = RPCResponse(qReply.(string))
|
||||
break
|
||||
default:
|
||||
return errors.New("error: unexpected return value")
|
||||
break
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
Loading…
Reference in a new issue