Compare commits

...

19 commits

Author SHA1 Message Date
Tony Blyler 838d0aa3a9 Use Go 1.7 for Travis and builds 2016-08-20 18:26:02 -04:00
Tony Blyler 588f0fb89b
Add a backoff timeout to checking diskspace and logging about it 2016-07-28 22:11:22 -04:00
Tony Blyler 5cb62347e7
Add ability to prevent downloads based off of available disk space 2016-07-23 17:23:36 -04:00
Tony Blyler d248260789
Clean up rpc.go file 2016-07-23 16:26:52 -04:00
Tony Blyler 44e04ef998 Merge pull request #7 from doomrobo/master
Implement download progress checking via RPC
2016-07-20 17:25:43 -04:00
Michael Rosenberg 01da49db25 Change defer to explicit function call before os.Exit() 2016-07-07 21:56:44 -04:00
Michael Rosenberg 4ad1968b2a Don't leave RPC socket on the filesystem if an error happens in NewQueue 2016-07-04 18:17:24 -04:00
Michael Rosenberg fccc859d57 Fix alignment on download progress example 2016-06-28 20:42:51 -04:00
Michael Rosenberg e958dcb451 New command line option -getStatus to view formatted download status; config now requires RPC socket path; queue.Close() now runs after interrupt is sent 2016-06-28 20:36:54 -04:00
Tony Blyler 7b8d4ed528
Update easysftp to its vendored version 2016-06-18 20:53:54 -04:00
Tony Blyler 2c78267568 Merge pull request #6 from doomrobo/patch-1
Update README.md
2016-06-18 20:44:33 -04:00
Michael Rosenberg d884835580 Update README.md
Include protocol scheme in the `rutorrent.addr` attribute. Omission of this gives me a frustratingly unhelpful error `Failed to update torrent list from rTorrent: 'Post mycoolrtorrentserver.com/XMLRPC: unsupported protocol scheme ""'`
2016-06-18 00:18:33 -04:00
Tony Blyler 270e766867
More Travis fixes 2016-06-17 10:12:56 -04:00
Tony Blyler d775b8f80d
Fix poor mistakes with travis 2016-06-17 10:04:47 -04:00
Tony Blyler c485a4d507
Fix travis notfications 2016-06-17 09:26:24 -04:00
Tony Blyler 4133c751eb
Fix Travis builds 2016-06-17 09:03:41 -04:00
Tony Blyler 9729316082
Do not try to run tests since we do not have any 2016-06-17 08:46:36 -04:00
Tony Blyler e269cde865
Convert yaml to use vendoring 2016-06-16 10:08:56 -04:00
Tony Blyler b5c5d2177b
Use vendoring 2016-06-15 20:41:31 -04:00
18 changed files with 375 additions and 32 deletions

36
.gitmodules vendored
View file

@ -0,0 +1,36 @@
[submodule "metainfo/vendor/github.com/anacrolix/missinggo"]
path = metainfo/vendor/github.com/anacrolix/missinggo
url = git://github.com/anacrolix/missinggo
[submodule "metainfo/vendor/github.com/anacrolix/torrent"]
path = metainfo/vendor/github.com/anacrolix/torrent
url = git://github.com/anacrolix/torrent
[submodule "metainfo/vendor/github.com/bradfitz/iter"]
path = metainfo/vendor/github.com/bradfitz/iter
url = git://github.com/bradfitz/iter
[submodule "queue/vendor/github.com/anacrolix/missinggo"]
path = queue/vendor/github.com/anacrolix/missinggo
url = git://github.com/anacrolix/missinggo
[submodule "queue/vendor/github.com/anacrolix/torrent"]
path = queue/vendor/github.com/anacrolix/torrent
url = git://github.com/anacrolix/torrent
[submodule "queue/vendor/github.com/bradfitz/iter"]
path = queue/vendor/github.com/bradfitz/iter
url = git://github.com/bradfitz/iter
[submodule "queue/vendor/github.com/fsnotify/fsnotify"]
path = queue/vendor/github.com/fsnotify/fsnotify
url = git://github.com/fsnotify/fsnotify
[submodule "queue/vendor/github.com/kr/fs"]
path = queue/vendor/github.com/kr/fs
url = git://github.com/kr/fs
[submodule "queue/vendor/github.com/pkg/sftp"]
path = queue/vendor/github.com/pkg/sftp
url = git://github.com/pkg/sftp
[submodule "queue/vendor/github.com/tblyler/easysftp"]
path = queue/vendor/github.com/tblyler/easysftp
url = git://github.com/tblyler/easysftp
[submodule "cmd/hoarder/vendor/github.com/go-yaml/yaml"]
path = cmd/hoarder/vendor/github.com/go-yaml/yaml
url = git://github.com/go-yaml/yaml
[submodule "queue/vendor/github.com/shirou/gopsutil"]
path = queue/vendor/github.com/shirou/gopsutil
url = https://github.com/shirou/gopsutil

View file

@ -5,7 +5,7 @@ branches:
language: go
go:
- 1.6
- 1.7
env:
- DATE=$(date '+%s') TAG="$(echo ${TRAVIS_BRANCH} | sed 's/-prerelease//g')-prerelease"
@ -14,6 +14,18 @@ before_install:
- go get github.com/mitchellh/gox
- go get github.com/tcnksm/ghr
install:
- go get ./cmd/hoarder
# There are no tests at the moment :(
script:
- "true"
after_success:
- "[ \"${TRAVIS_PULL_REQUEST}\" = \"false\" ] && gox -ldflags \"-X 'main.buildVersion=${BUILD_VERSION} (${TRAVIS_COMMIT})' -X 'main.buildDate=${DATE}'\" -output \"dist/hoarder_{{.OS}}_{{.Arch}}\" ./cmd/hoarder || false"
- '[ "${TRAVIS_PULL_REQUEST}" = "false" ] && ghr --username tblyler --token "${GITHUB_TOKEN}" --replace --prerelease --debug "${TAG}" dist/ || false'
notifications:
email:
on_success: change
on_failure: change

View file

@ -43,9 +43,18 @@ download_jobs: 2
# Whether or not to attempt to resume a previously interrupted download
resume_downloads: true
# Path to the unix socket file that hoarder uses for RPC
rpc_socket_path: /tmp/hoarder.sock
# Whether or not to see if there is enough disk space before starting a download
check_disk_space: true
# (must have check_disk_space set to true) Minimum disk space to have after completed downloads (measured in bytes, 0 to disable check)
min_disk_space: 5368709120
rtorrent:
# The address to the rtorrent XMLRPC endpoint
addr: mycoolrtorrentserver.com/XMLRPC
addr: https://mycoolrtorrentserver.com/XMLRPC
# true to ignore the certificate authenticity; false to honor it
insecure_cert: false

View file

@ -3,10 +3,11 @@ package main
import (
"flag"
"fmt"
"github.com/go-yaml/yaml"
"github.com/tblyler/hoarder/queue"
"gopkg.in/yaml.v2"
"io/ioutil"
"log"
"net/rpc"
"os"
"os/signal"
"strconv"
@ -19,6 +20,7 @@ var buildDate = "Unknown"
func main() {
version := flag.Bool("version", false, "display version info")
configPath := flag.String("config", "", "path to the config file")
getStatus := flag.Bool("getStatus", false, "get the status of the current downloads")
flag.Parse()
if *version {
@ -54,6 +56,30 @@ func main() {
os.Exit(1)
}
if *getStatus {
rpc, err := rpc.Dial("unix", config.RPCSocketPath)
if err != nil {
logger.Printf("Unable to open RPC socket file '%s': '%s'", config.RPCSocketPath, err)
os.Exit(1)
}
reply := ""
err = rpc.Call("Status.Downloads", &queue.RPCArgs{}, &reply)
if err != nil {
logger.Printf("RPC call for download status failed: '%s'", err)
os.Exit(1)
}
if len(reply) > 0 {
fmt.Println(reply)
} else {
fmt.Println("No Downloads")
}
rpc.Close()
os.Exit(0)
}
q, err := queue.NewQueue(config, logger)
if err != nil {
logger.Printf("Failed to start hoarder: '%s'", err)
@ -74,4 +100,9 @@ func main() {
logger.Println("Got signal ", sig, " quitting")
stop <- true
<-done
errs := q.Close()
for _, err := range errs {
logger.Println(err)
}
}

1
cmd/hoarder/vendor/github.com/go-yaml/yaml generated vendored Submodule

@ -0,0 +1 @@
Subproject commit a83829b6f1293c91addabc89d0571c246397bbf4

1
metainfo/vendor/github.com/anacrolix/missinggo generated vendored Submodule

@ -0,0 +1 @@
Subproject commit e40875155efce3d98562ca9e265e152c364ada3e

1
metainfo/vendor/github.com/anacrolix/torrent generated vendored Submodule

@ -0,0 +1 @@
Subproject commit dcfee93f96d231b5590f991313b5d9f925757f52

1
metainfo/vendor/github.com/bradfitz/iter generated vendored Submodule

@ -0,0 +1 @@
Subproject commit 454541ec3da2a73fc34fd049b19ee5777bf19345

View file

@ -5,18 +5,28 @@ import (
"errors"
"fmt"
"github.com/fsnotify/fsnotify"
"github.com/shirou/gopsutil/disk"
"github.com/tblyler/easysftp"
"github.com/tblyler/go-rtorrent/rtorrent"
"github.com/tblyler/hoarder/metainfo"
"io/ioutil"
"log"
"net"
"net/rpc"
"os"
"path/filepath"
"sort"
"strconv"
"strings"
"time"
)
const (
// DefaultDiskSpaceBackoff denotes how long to wait before checking
// that the diskspace threshold is not longer being exceeded
DefaultDiskSpaceBackoff = time.Second * 30
)
// Config defines the settings for watching, uploading, and downloading
type Config struct {
Rtorrent struct {
@ -38,9 +48,12 @@ type Config struct {
WatchDownloadPaths map[string]string `json:"watch_to_download_paths" yaml:"watch_to_download_paths,flow"`
TempDownloadPath string `json:"temp_download_path" yaml:"temp_download_path"`
FinishedTorrentFilePath map[string]string `json:"watch_to_finish_path" yaml:"watch_to_finish_path,flow"`
RPCSocketPath string `json:"rpc_socket_path" yaml:"rpc_socket_path"`
TorrentListUpdateInterval time.Duration `json:"rtorrent_update_interval" yaml:"rtorrent_update_interval"`
ConcurrentDownloads uint `json:"download_jobs" yaml:"download_jobs"`
ResumeDownloads bool `json:"resume_downloads" yaml:"resume_downloads"`
CheckDiskSpace bool `json:"check_disk_space" yaml:"check_disk_space"`
MinDiskSpace uint64 `json:"min_disk_space" yaml:"min_disk_space"`
}
// Queue watches the given folders for new .torrent files,
@ -54,6 +67,13 @@ type Queue struct {
torrentList map[string]rtorrent.Torrent
downloadQueue map[string]string
logger *log.Logger
rpcSocket net.Listener
rpcQueue chan RPCReq
}
type downloadInfo struct {
path string
size int
}
var prettyBytesValues = []float64{
@ -92,12 +112,27 @@ func prettyBytes(bytes float64) string {
return output
}
func newSftpClient(config *Config) (*easysftp.Client, error) {
return easysftp.Connect(&easysftp.ClientConfig{
Username: config.SSH.Username,
Password: config.SSH.Password,
KeyPath: config.SSH.KeyPath,
Host: config.SSH.Addr,
Timeout: config.SSH.Timeout,
FileMode: config.DownloadFileMode,
})
}
// NewQueue establishes all connections and watchers
func NewQueue(config *Config, logger *log.Logger) (*Queue, error) {
if config.WatchDownloadPaths == nil || len(config.WatchDownloadPaths) == 0 {
return nil, errors.New("Must have queue.QueueConfig.WatchDownloadPaths set")
}
if len(config.RPCSocketPath) == 0 {
return nil, errors.New("Must have queue.QueueConfig.RPCSocketPath set")
}
for watchPath, downloadPath := range config.WatchDownloadPaths {
config.WatchDownloadPaths[filepath.Clean(watchPath)] = filepath.Clean(downloadPath)
}
@ -125,16 +160,7 @@ func NewQueue(config *Config, logger *log.Logger) (*Queue, error) {
rtClient := rtorrent.New(config.Rtorrent.Addr, config.Rtorrent.InsecureCert)
rtClient.SetAuth(config.Rtorrent.Username, config.Rtorrent.Password)
q := &Queue{
rtClient: rtClient,
sftpClient: nil,
fsWatcher: fsWatcher,
config: config,
downloadQueue: make(map[string]string),
logger: logger,
}
sftpClient, err := q.newSftpClient()
sftpClient, err := newSftpClient(config)
if err != nil {
return nil, err
}
@ -142,29 +168,47 @@ func NewQueue(config *Config, logger *log.Logger) (*Queue, error) {
// the sftpClient connection was only made to verify settings
sftpClient.Close()
return q, nil
}
// Set up RPC
rpcQueue := make(chan RPCReq)
status := Status{rpcQueue}
rpc.Register(&status)
rpcSocket, err := net.Listen("unix", config.RPCSocketPath)
if err != nil {
return nil, err
}
go rpc.Accept(rpcSocket)
func (q *Queue) newSftpClient() (*easysftp.Client, error) {
return easysftp.Connect(&easysftp.ClientConfig{
Username: q.config.SSH.Username,
Password: q.config.SSH.Password,
KeyPath: q.config.SSH.KeyPath,
Host: q.config.SSH.Addr,
Timeout: q.config.SSH.Timeout,
FileMode: q.config.DownloadFileMode,
})
q := &Queue{
rtClient: rtClient,
sftpClient: nil,
fsWatcher: fsWatcher,
config: config,
downloadQueue: make(map[string]string),
logger: logger,
rpcSocket: rpcSocket,
rpcQueue: rpcQueue,
}
return q, nil
}
// Close all of the connections and watchers
func (q *Queue) Close() []error {
errs := []error{}
err := q.sftpClient.Close()
if q.sftpClient != nil {
err := q.sftpClient.Close()
if err != nil {
errs = append(errs, err)
}
}
err := q.fsWatcher.Close()
if err != nil {
errs = append(errs, err)
}
err = q.fsWatcher.Close()
err = q.rpcSocket.Close()
if err != nil {
errs = append(errs, err)
}
@ -215,6 +259,101 @@ func (q *Queue) addTorrentFilePath(path string) error {
return q.updateTorrentList()
}
func dirSize(path string) (int64, error) {
var size int64
err := filepath.Walk(path, func(_ string, info os.FileInfo, err error) error {
if !info.IsDir() {
size += info.Size()
}
return err
})
return size, err
}
/* Output looks like
Totally.Legit.Download.x264-KILLERS |===============> | (50%)
ubuntu.13.37.iso |===> | ( 7%)
Errored.Download.mkv | | (error: could not stat file)
*/
func (q *Queue) getDownloadStatus(downloadsRunning map[string]downloadInfo) string {
// We use maps so that we can traverse in name order
paths := make(map[string]string, len(downloadsRunning))
names := make([]string, 0, len(downloadsRunning))
sizes := make(map[string]int, len(downloadsRunning))
for _, info := range downloadsRunning {
name := filepath.Base(info.path)
names = append(names, name)
paths[name] = info.path
sizes[name] = info.size
}
// Sort the torrent names so that they don't jump around every time this function is called
sort.Strings(names)
maxNameLen := 0
for _, name := range names {
if len(name) > maxNameLen {
maxNameLen = len(name)
}
}
output := ""
downloadBarLength := 30
for nameIdx, name := range names {
size := sizes[name]
path := paths[name]
// Get the size of the data that we've downloaded so far
var bytesDownloaded int64
stat, err := os.Stat(path)
if err == nil {
if stat.IsDir() {
bytesDownloaded, err = dirSize(path)
} else {
bytesDownloaded = stat.Size()
}
}
// Make the names and pad them with spaces on the right
output += name
for i := 0; i < maxNameLen-len(name); i++ {
output += " "
}
output += " |"
// Make the download bar
if err == nil {
// Make the bar proportional to the amount downloaded vs the total size, and make the
// final character a '>'
percentDone := float64(bytesDownloaded) / float64(size)
partialBarLength := int(float64(downloadBarLength) * percentDone)
for i := 0; i < partialBarLength-1; i++ {
output += "="
}
if partialBarLength > 0 {
output += ">"
}
for i := 0; i < downloadBarLength-partialBarLength; i++ {
output += " "
}
output += fmt.Sprintf("| (%2v%%)", int(100.0*percentDone))
} else {
for i := 0; i < downloadBarLength; i++ {
output += " "
}
output += fmt.Sprintf("| (error: %s)", err)
}
// Add a newline if there are more downloads to show
if nameIdx < len(names)-1 {
output += "\n"
}
}
return output
}
func (q *Queue) getFinishedTorrents() []rtorrent.Torrent {
torrents := []rtorrent.Torrent{}
for hash, torrentPath := range q.downloadQueue {
@ -257,7 +396,7 @@ func (q *Queue) downloadTorrent(torrent rtorrent.Torrent, torrentFilePath string
if info, err := os.Stat(downloadPath); os.IsExist(err) {
if !info.IsDir() {
return fmt.Errorf("Unable to downlaod to temp path '%s' since it is not a directory", downloadPath)
return fmt.Errorf("Unable to download to temp path '%s' since it is not a directory", downloadPath)
}
}
@ -334,8 +473,9 @@ func (q *Queue) Run(stop <-chan bool) {
var err error
finished := false
lastUpdateTime := time.Time{}
diskSpaceBackOff := time.Now()
downloadedHashes := make(chan string, q.config.ConcurrentDownloads)
downloadsRunning := make(map[string]bool)
downloadsRunning := make(map[string]downloadInfo)
for {
cont := true
for cont {
@ -372,6 +512,14 @@ func (q *Queue) Run(stop <-chan bool) {
cont = false
break
case rpcReq := <-q.rpcQueue:
switch rpcReq.method {
case "download_status":
status := q.getDownloadStatus(downloadsRunning)
rpcReq.replyChan <- status
}
break
case <-stop:
finished = true
cont = false
@ -428,14 +576,14 @@ func (q *Queue) Run(stop <-chan bool) {
}
}
if uint(len(downloadsRunning)) < q.config.ConcurrentDownloads {
if uint(len(downloadsRunning)) < q.config.ConcurrentDownloads && (!q.config.CheckDiskSpace || diskSpaceBackOff.Before(time.Now())) {
for _, torrent := range q.getFinishedTorrents() {
if _, exists := downloadsRunning[torrent.Hash]; exists {
continue
}
if q.sftpClient == nil {
q.sftpClient, err = q.newSftpClient()
q.sftpClient, err = newSftpClient(q.config)
if err != nil {
q.logger.Println("Failed to connect to sftp: ", err)
q.sftpClient = nil
@ -443,6 +591,54 @@ func (q *Queue) Run(stop <-chan bool) {
}
}
torrentFilePath := q.downloadQueue[torrent.Hash]
skip := false
if q.config.CheckDiskSpace {
diskSpacePaths := []string{q.config.WatchDownloadPaths[filepath.Dir(torrentFilePath)]}
if q.config.TempDownloadPath != "" {
diskSpacePaths = append(diskSpacePaths, q.config.TempDownloadPath)
}
downloadSizes := uint64(torrent.Size)
for _, dTorrent := range downloadsRunning {
downloadSizes += uint64(dTorrent.size)
}
for _, path := range diskSpacePaths {
fsStat, err := disk.Usage(path)
if err != nil {
q.logger.Printf("Failed to check disk space on '%s' for '%s' (%s): %s", path, torrent.Name, torrentFilePath, err)
continue
}
if q.config.MinDiskSpace == 0 {
if fsStat.Free > downloadSizes {
continue
}
diskSpaceBackOff = time.Now().Add(DefaultDiskSpaceBackoff)
q.logger.Printf("Not downloading '%s' (%s) not enough disk space, only %d bytes free on '%s'", torrent.Name, torrentFilePath, fsStat.Free, path)
skip = true
break
} else {
if fsStat.Free > downloadSizes && (fsStat.Free-downloadSizes) > q.config.MinDiskSpace {
continue
}
diskSpaceBackOff = time.Now().Add(DefaultDiskSpaceBackoff)
q.logger.Printf("Not downloading '%s' (%s) minimum disk space (%d) reached on '%s'", torrent.Name, torrentFilePath, q.config.MinDiskSpace, path)
skip = true
break
}
}
}
if skip {
continue
}
go func(torrent rtorrent.Torrent, torrentPath string, hashChan chan<- string) {
err := q.downloadTorrent(torrent, torrentPath)
if err != nil {
@ -452,9 +648,14 @@ func (q *Queue) Run(stop <-chan bool) {
}
hashChan <- torrent.Hash
}(torrent, q.downloadQueue[torrent.Hash], downloadedHashes)
}(torrent, torrentFilePath, downloadedHashes)
downloadsRunning[torrent.Hash] = true
destDownloadDir := q.config.WatchDownloadPaths[filepath.Dir(torrentFilePath)]
downloadDir := filepath.Join(q.config.TempDownloadPath, destDownloadDir)
downloadsRunning[torrent.Hash] = downloadInfo{
path: filepath.Join(downloadDir, torrent.Name),
size: torrent.Size,
}
if uint(len(downloadsRunning)) == q.config.ConcurrentDownloads {
break

42
queue/rpc.go Normal file
View file

@ -0,0 +1,42 @@
package queue
import "errors"
// RPCReq contains information for communicating with the RPC server
type RPCReq struct {
method string
replyChan chan interface{}
}
// RPCResponse holds the response
type RPCResponse string
// Status contains RPC request information
type Status struct {
queueChan chan RPCReq
}
// RPCArgs arg information for the RPC server
type RPCArgs struct{}
// Downloads gets download statuses
func (s *Status) Downloads(_ RPCArgs, reply *RPCResponse) error {
replyChan := make(chan interface{})
req := RPCReq{
method: "download_status",
replyChan: replyChan,
}
s.queueChan <- req
qReply := <-replyChan
switch qReply.(type) {
case string:
*reply = RPCResponse(qReply.(string))
break
default:
return errors.New("error: unexpected return value")
}
return nil
}

1
queue/vendor/github.com/anacrolix/missinggo generated vendored Submodule

@ -0,0 +1 @@
Subproject commit e40875155efce3d98562ca9e265e152c364ada3e

1
queue/vendor/github.com/anacrolix/torrent generated vendored Submodule

@ -0,0 +1 @@
Subproject commit ad5d5bc35038058d218d85a391deb50cbb8d5758

1
queue/vendor/github.com/bradfitz/iter generated vendored Submodule

@ -0,0 +1 @@
Subproject commit 454541ec3da2a73fc34fd049b19ee5777bf19345

1
queue/vendor/github.com/fsnotify/fsnotify generated vendored Submodule

@ -0,0 +1 @@
Subproject commit 30411dbcefb7a1da7e84f75530ad3abe4011b4f8

1
queue/vendor/github.com/kr/fs generated vendored Submodule

@ -0,0 +1 @@
Subproject commit 2788f0dbd16903de03cb8186e5c7d97b69ad387b

1
queue/vendor/github.com/pkg/sftp generated vendored Submodule

@ -0,0 +1 @@
Subproject commit d4c18e7ffdc496a38de67dde6e29b2f364afc472

1
queue/vendor/github.com/shirou/gopsutil generated vendored Submodule

@ -0,0 +1 @@
Subproject commit ee66bc560c366dd33b9a4046ba0b644caba46bed

1
queue/vendor/github.com/tblyler/easysftp generated vendored Submodule

@ -0,0 +1 @@
Subproject commit 19dd408ec9f250cbb648868965a0dff4bd82edfb