package queue import ( "bytes" "errors" "fmt" "github.com/fsnotify/fsnotify" "github.com/shirou/gopsutil/disk" "github.com/tblyler/easysftp" "github.com/tblyler/go-rtorrent/rtorrent" "github.com/tblyler/hoarder/metainfo" "io/ioutil" "log" "net" "net/rpc" "os" "path/filepath" "sort" "strconv" "strings" "time" ) const ( // DefaultDiskSpaceBackoff denotes how long to wait before checking // that the diskspace threshold is not longer being exceeded DefaultDiskSpaceBackoff = time.Second * 30 ) // Config defines the settings for watching, uploading, and downloading type Config struct { Rtorrent struct { Addr string `json:"addr" yaml:"addr"` InsecureCert bool `json:"insecure_cert" yaml:"insecure_cert"` Username string `json:"username" yaml:"username"` Password string `json:"password" yaml:"password"` } `json:"rtorrent" yaml:"rtorrent,flow"` SSH struct { Username string `json:"username" yaml:"username"` Password string `json:"password" yaml:"password"` KeyPath string `json:"privkey_path" yaml:"privkey_path"` Addr string `json:"addr" yaml:"addr"` Timeout time.Duration `json:"connect_timeout" yaml:"connect_timeout"` } `json:"ssh" yaml:"ssh,flow"` DownloadFileMode os.FileMode `json:"file_download_filemode" yaml:"file_download_filemode"` WatchDownloadPaths map[string]string `json:"watch_to_download_paths" yaml:"watch_to_download_paths,flow"` TempDownloadPath string `json:"temp_download_path" yaml:"temp_download_path"` FinishedTorrentFilePath map[string]string `json:"watch_to_finish_path" yaml:"watch_to_finish_path,flow"` RPCSocketPath string `json:"rpc_socket_path" yaml:"rpc_socket_path"` TorrentListUpdateInterval time.Duration `json:"rtorrent_update_interval" yaml:"rtorrent_update_interval"` ConcurrentDownloads uint `json:"download_jobs" yaml:"download_jobs"` ResumeDownloads bool `json:"resume_downloads" yaml:"resume_downloads"` CheckDiskSpace bool `json:"check_disk_space" yaml:"check_disk_space"` MinDiskSpace uint64 `json:"min_disk_space" yaml:"min_disk_space"` } // Queue watches the given folders for new .torrent files, // uploads them to the given rTorrent server, // and then downloads them over SSH to the given download path. type Queue struct { rtClient *rtorrent.RTorrent sftpClient *easysftp.Client fsWatcher *fsnotify.Watcher config *Config torrentList map[string]rtorrent.Torrent downloadQueue map[string]string logger *log.Logger rpcSocket net.Listener rpcQueue chan RPCReq } type downloadInfo struct { path string size int } var prettyBytesValues = []float64{ 1024, 1024 * 1024, 1024 * 1024 * 1024, 1024 * 1024 * 1024 * 1024, 1024 * 1024 * 1024 * 1024 * 1024, 1024 * 1024 * 1024 * 1024 * 1024 * 1024, } var prettyBytesNames = []string{ "KiB", "MiB", "GiB", "TiB", "PiB", "EiB", } func prettyBytes(bytes float64) string { output := strconv.FormatFloat(bytes, 'f', 2, 64) + "B" for i, divisor := range prettyBytesValues { newBytes := bytes / divisor if newBytes > 1024 { continue } if newBytes < 1 { break } output = strconv.FormatFloat(newBytes, 'f', 2, 64) + prettyBytesNames[i] } return output } func newSftpClient(config *Config) (*easysftp.Client, error) { return easysftp.Connect(&easysftp.ClientConfig{ Username: config.SSH.Username, Password: config.SSH.Password, KeyPath: config.SSH.KeyPath, Host: config.SSH.Addr, Timeout: config.SSH.Timeout, FileMode: config.DownloadFileMode, }) } // NewQueue establishes all connections and watchers func NewQueue(config *Config, logger *log.Logger) (*Queue, error) { if config.WatchDownloadPaths == nil || len(config.WatchDownloadPaths) == 0 { return nil, errors.New("Must have queue.QueueConfig.WatchDownloadPaths set") } if len(config.RPCSocketPath) == 0 { return nil, errors.New("Must have queue.QueueConfig.RPCSocketPath set") } for watchPath, downloadPath := range config.WatchDownloadPaths { config.WatchDownloadPaths[filepath.Clean(watchPath)] = filepath.Clean(downloadPath) } if config.ConcurrentDownloads == 0 { config.ConcurrentDownloads = 1 } if logger == nil { logger = log.New(ioutil.Discard, "", 0) } fsWatcher, err := fsnotify.NewWatcher() if err != nil { return nil, err } for watchPath := range config.WatchDownloadPaths { err = fsWatcher.Add(watchPath) if err != nil { return nil, err } } rtClient := rtorrent.New(config.Rtorrent.Addr, config.Rtorrent.InsecureCert) rtClient.SetAuth(config.Rtorrent.Username, config.Rtorrent.Password) sftpClient, err := newSftpClient(config) if err != nil { return nil, err } // the sftpClient connection was only made to verify settings sftpClient.Close() // Set up RPC rpcQueue := make(chan RPCReq) status := Status{rpcQueue} rpc.Register(&status) rpcSocket, err := net.Listen("unix", config.RPCSocketPath) if err != nil { return nil, err } go rpc.Accept(rpcSocket) q := &Queue{ rtClient: rtClient, sftpClient: nil, fsWatcher: fsWatcher, config: config, downloadQueue: make(map[string]string), logger: logger, rpcSocket: rpcSocket, rpcQueue: rpcQueue, } return q, nil } // Close all of the connections and watchers func (q *Queue) Close() []error { errs := []error{} if q.sftpClient != nil { err := q.sftpClient.Close() if err != nil { errs = append(errs, err) } } err := q.fsWatcher.Close() if err != nil { errs = append(errs, err) } err = q.rpcSocket.Close() if err != nil { errs = append(errs, err) } return errs } func (q *Queue) updateTorrentList() error { torrents, err := q.rtClient.GetTorrents(rtorrent.ViewMain) if err != nil { return err } torrentList := make(map[string]rtorrent.Torrent) for _, torrent := range torrents { torrentList[torrent.Hash] = torrent } q.torrentList = torrentList return nil } func (q *Queue) addTorrentFilePath(path string) error { torrentData, err := ioutil.ReadFile(path) if err != nil { return err } torrentHash, err := metainfo.GetTorrentHashHexString(bytes.NewReader(torrentData)) if err != nil { return err } q.downloadQueue[torrentHash] = path if _, exists := q.torrentList[torrentHash]; exists { // the torrent is already on the server return nil } err = q.rtClient.AddTorrent(torrentData) if err != nil { return err } return q.updateTorrentList() } func dirSize(path string) (int64, error) { var size int64 err := filepath.Walk(path, func(_ string, info os.FileInfo, err error) error { if !info.IsDir() { size += info.Size() } return err }) return size, err } /* Output looks like Totally.Legit.Download.x264-KILLERS |===============> | (50%) ubuntu.13.37.iso |===> | ( 7%) Errored.Download.mkv | | (error: could not stat file) */ func (q *Queue) getDownloadStatus(downloadsRunning map[string]downloadInfo) string { // We use maps so that we can traverse in name order paths := make(map[string]string, len(downloadsRunning)) names := make([]string, 0, len(downloadsRunning)) sizes := make(map[string]int, len(downloadsRunning)) for _, info := range downloadsRunning { name := filepath.Base(info.path) names = append(names, name) paths[name] = info.path sizes[name] = info.size } // Sort the torrent names so that they don't jump around every time this function is called sort.Strings(names) maxNameLen := 0 for _, name := range names { if len(name) > maxNameLen { maxNameLen = len(name) } } output := "" downloadBarLength := 30 for nameIdx, name := range names { size := sizes[name] path := paths[name] // Get the size of the data that we've downloaded so far var bytesDownloaded int64 stat, err := os.Stat(path) if err == nil { if stat.IsDir() { bytesDownloaded, err = dirSize(path) } else { bytesDownloaded = stat.Size() } } // Make the names and pad them with spaces on the right output += name for i := 0; i < maxNameLen-len(name); i++ { output += " " } output += " |" // Make the download bar if err == nil { // Make the bar proportional to the amount downloaded vs the total size, and make the // final character a '>' percentDone := float64(bytesDownloaded) / float64(size) partialBarLength := int(float64(downloadBarLength) * percentDone) for i := 0; i < partialBarLength-1; i++ { output += "=" } if partialBarLength > 0 { output += ">" } for i := 0; i < downloadBarLength-partialBarLength; i++ { output += " " } output += fmt.Sprintf("| (%2v%%)", int(100.0*percentDone)) } else { for i := 0; i < downloadBarLength; i++ { output += " " } output += fmt.Sprintf("| (error: %s)", err) } // Add a newline if there are more downloads to show if nameIdx < len(names)-1 { output += "\n" } } return output } func (q *Queue) getFinishedTorrents() []rtorrent.Torrent { torrents := []rtorrent.Torrent{} for hash, torrentPath := range q.downloadQueue { torrent, exists := q.torrentList[hash] if !exists { err := q.addTorrentFilePath(torrentPath) if err == nil { q.logger.Printf("Added torrent '%s' to rTorrent", torrentPath) } else { q.logger.Printf("Unable to add torrent '%s' error '%s'", torrentPath, err) } continue } if !torrent.Completed { continue } torrent.Hash = strings.ToUpper(torrent.Hash) torrents = append(torrents, torrent) } return torrents } func (q *Queue) downloadTorrent(torrent rtorrent.Torrent, torrentFilePath string) error { if !torrent.Completed { return fmt.Errorf("'%s' is not a completed torrent, not downloading", torrentFilePath) } var downloadPath string destDownloadPath := q.config.WatchDownloadPaths[filepath.Dir(torrentFilePath)] if q.config.TempDownloadPath == "" { downloadPath = destDownloadPath } else { downloadPath = filepath.Join(q.config.TempDownloadPath, destDownloadPath) if info, err := os.Stat(downloadPath); os.IsExist(err) { if !info.IsDir() { return fmt.Errorf("Unable to download to temp path '%s' since it is not a directory", downloadPath) } } err := os.MkdirAll(downloadPath, q.config.DownloadFileMode) if err != nil { return fmt.Errorf("Unable to create temp download path '%s' error '%s'", downloadPath, err) } } q.logger.Printf("Downloading '%s' (%s) to '%s' (%s) %s", torrent.Name, torrentFilePath, downloadPath, destDownloadPath, prettyBytes(float64(torrent.Size))) err := q.sftpClient.Mirror(torrent.Path, downloadPath, q.config.ResumeDownloads) if err != nil { return fmt.Errorf("Failed to download '%s' to '%s' error '%s'", torrent.Path, downloadPath, err) } // we need to move the downlaod from the temp directory to the destination if downloadPath != destDownloadPath { fileName := filepath.Base(torrent.Path) downFullPath := filepath.Join(downloadPath, fileName) destFullPath := filepath.Join(destDownloadPath, fileName) err = os.Rename(downFullPath, destFullPath) if err != nil { return fmt.Errorf("Failed to move temp path '%s' to dest path '%s' error '%s'", downFullPath, destFullPath, err) } } parentTorrentPath := filepath.Dir(torrentFilePath) if movePath, exists := q.config.FinishedTorrentFilePath[parentTorrentPath]; exists { destFullPath := filepath.Join(movePath, filepath.Base(torrentFilePath)) err = os.Rename(torrentFilePath, destFullPath) if err != nil && os.IsExist(err) { return fmt.Errorf("Failed to move torrent file from '%s' to '%s' error '%s'", torrentFilePath, destFullPath, err) } } else { err = os.Remove(torrentFilePath) if err != nil && os.IsExist(err) { return fmt.Errorf("Failed to remove torrent file '%s' error '%s'", torrentFilePath, err) } } q.logger.Printf("Successfully downloaded '%s' (%s)", torrent.Name, torrentFilePath) return nil } // Run executes all steps needed for looking at the queue, catching updates, and processing all work func (q *Queue) Run(stop <-chan bool) { // for the initial run, load all files in the specified directories for localPath := range q.config.WatchDownloadPaths { files, err := ioutil.ReadDir(localPath) if err != nil { q.logger.Printf("Unable to read local path '%s' error '%s'", localPath, err) } for _, file := range files { // skip directories or files that do not end with .torrent if file.IsDir() || !strings.HasSuffix(file.Name(), ".torrent") { continue } fullPath := filepath.Join(localPath, file.Name()) q.logger.Printf("Adding %s to download queue", fullPath) err = q.addTorrentFilePath(fullPath) if err == nil { q.logger.Printf("Added torrent '%s' to rTorrent", fullPath) } else { q.logger.Printf("Failed to add torrent at '%s' error '%s'", fullPath, err) } } } // watch all directories for file changes var err error finished := false lastUpdateTime := time.Time{} diskSpaceBackOff := time.Now() downloadedHashes := make(chan string, q.config.ConcurrentDownloads) downloadsRunning := make(map[string]downloadInfo) for { cont := true for cont { select { case event := <-q.fsWatcher.Events: if event.Op&fsnotify.Write == fsnotify.Write || event.Op&fsnotify.Create == fsnotify.Create || event.Op&fsnotify.Rename == fsnotify.Rename { // skip files that do not end with .torrent if !strings.HasSuffix(event.Name, ".torrent") { break } q.logger.Printf("Adding %s to download queue", event.Name) err = q.addTorrentFilePath(event.Name) if err == nil { q.logger.Printf("Added torrent '%s' to rTorrent", event.Name) } else { q.logger.Printf("Failed to add '%s' error '%s'", event.Name, err) } } else if event.Op&fsnotify.Remove == fsnotify.Remove { for hash, path := range q.downloadQueue { if path == event.Name { q.logger.Printf("Removing torrent '%s' from queue", path) delete(q.downloadQueue, hash) break } } } break case err = <-q.fsWatcher.Errors: q.logger.Printf("Error while watching folders '%s'", err) cont = false break case rpcReq := <-q.rpcQueue: switch rpcReq.method { case "download_status": status := q.getDownloadStatus(downloadsRunning) rpcReq.replyChan <- status } break case <-stop: finished = true cont = false break default: time.Sleep(time.Second) cont = false break } } if finished { break } if time.Now().Sub(lastUpdateTime) >= q.config.TorrentListUpdateInterval { err := q.updateTorrentList() if err == nil { lastUpdateTime = time.Now() } else { q.logger.Printf("Failed to update torrent list from rTorrent: '%s'", err) } } if len(downloadsRunning) == 0 { // close the sftp connection since it is not being used if q.sftpClient != nil { q.sftpClient.Close() q.sftpClient = nil } } else { cont = true for cont { select { case finishedHash := <-downloadedHashes: if finishedHash == "" { break } if finishedHash[0] == '!' { finishedHash = finishedHash[1:] } else { delete(q.downloadQueue, finishedHash) } delete(downloadsRunning, finishedHash) break default: cont = false break } } } if uint(len(downloadsRunning)) < q.config.ConcurrentDownloads && (!q.config.CheckDiskSpace || diskSpaceBackOff.Before(time.Now())) { for _, torrent := range q.getFinishedTorrents() { if _, exists := downloadsRunning[torrent.Hash]; exists { continue } if q.sftpClient == nil { q.sftpClient, err = newSftpClient(q.config) if err != nil { q.logger.Println("Failed to connect to sftp: ", err) q.sftpClient = nil continue } } torrentFilePath := q.downloadQueue[torrent.Hash] skip := false if q.config.CheckDiskSpace { diskSpacePaths := []string{q.config.WatchDownloadPaths[filepath.Dir(torrentFilePath)]} if q.config.TempDownloadPath != "" { diskSpacePaths = append(diskSpacePaths, q.config.TempDownloadPath) } downloadSizes := uint64(torrent.Size) for _, dTorrent := range downloadsRunning { downloadSizes += uint64(dTorrent.size) } for _, path := range diskSpacePaths { fsStat, err := disk.Usage(path) if err != nil { q.logger.Printf("Failed to check disk space on '%s' for '%s' (%s): %s", path, torrent.Name, torrentFilePath, err) continue } if q.config.MinDiskSpace == 0 { if fsStat.Free > downloadSizes { continue } diskSpaceBackOff = time.Now().Add(DefaultDiskSpaceBackoff) q.logger.Printf("Not downloading '%s' (%s) not enough disk space, only %d bytes free on '%s'", torrent.Name, torrentFilePath, fsStat.Free, path) skip = true break } else { if fsStat.Free > downloadSizes && (fsStat.Free-downloadSizes) > q.config.MinDiskSpace { continue } diskSpaceBackOff = time.Now().Add(DefaultDiskSpaceBackoff) q.logger.Printf("Not downloading '%s' (%s) minimum disk space (%d) reached on '%s'", torrent.Name, torrentFilePath, q.config.MinDiskSpace, path) skip = true break } } } if skip { continue } go func(torrent rtorrent.Torrent, torrentPath string, hashChan chan<- string) { err := q.downloadTorrent(torrent, torrentPath) if err != nil { q.logger.Printf("Failed to download '%s' (%s): %s", torrent.Name, torrentPath, err) hashChan <- "!" + torrent.Hash return } hashChan <- torrent.Hash }(torrent, torrentFilePath, downloadedHashes) destDownloadDir := q.config.WatchDownloadPaths[filepath.Dir(torrentFilePath)] downloadDir := filepath.Join(q.config.TempDownloadPath, destDownloadDir) downloadsRunning[torrent.Hash] = downloadInfo{ path: filepath.Join(downloadDir, torrent.Name), size: torrent.Size, } if uint(len(downloadsRunning)) == q.config.ConcurrentDownloads { break } } } } }