diff --git a/.gitignore b/.gitignore index d0be0af..1377554 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1 @@ -hoarder.conf *.swp diff --git a/README.md b/README.md index c6ba610..d442b22 100644 --- a/README.md +++ b/README.md @@ -4,17 +4,15 @@ Uploads .torrent files from a local "blackhole" to a remote (SSH) rtorrent watch Requirements ------------ -Go - -Install -------- -1. `go get github.com/tblyler/hoarder` -2. `go install github.com/tblyler/hoarder` +* bash +* curl +* rsync +* scp Configuration ------------- -Make sure you make a copy of the conf file in the repo to suit your needs. +Edit the variables at the top of hoarder.sh to your liking. Running ------- -After installation, just run the hoarder executable with the --config flag to specify where the config file is. +Run hoarder.sh with bash diff --git a/hoarder.conf.ex b/hoarder.conf.ex deleted file mode 100644 index 1098f1a..0000000 --- a/hoarder.conf.ex +++ /dev/null @@ -1,31 +0,0 @@ -// Configuration for Hoarder -// All fields and values are necessary - -// Should Hoarder automatically restart when one of its processes errors? (default true if not set) -restart_on_error: "true" -// Username for XMLRPC -xml_user: "testuser" -// Password for XMLRPC -xml_pass: "supersecure" -// Address to XMLRPC -xml_address: "https://mysweetaddress:443/xmlrpc" -// Amount of download threads per file -threads: "4" -// Username for logging into the ssh server -ssh_user: "testsshuser" -// Password for the logging into the ssh server -ssh_pass: "bestpasswordever" -// Server address for the ssh server -ssh_server: "sshserveraddress" -// Port for the ssh server -ssh_port: "22" -// Location to temporarily download files -temp_download_dir: "/home/user/tmp_download/" -// Location to move downloaded files from temp_download_dir to -local_download_dir: "/home/user/Downloads/" -// Locaiton to watch for .torrent files -local_torrent_dir: "/home/user/torrent_files/" -// Remote location on the SSH server to download torrent data from -remote_download_dir: "/home/user/files/" -// Remote location on the SSH server to upload .torrent files to -remote_torrent_dir: "/home/user/watch/" diff --git a/hoarder.go b/hoarder.go deleted file mode 100644 index ef1934e..0000000 --- a/hoarder.go +++ /dev/null @@ -1,331 +0,0 @@ -// Watch a torrent directory, poll rtorrent, and download completed torrents over SFTP. -package main - -import ( - "errors" - "flag" - "github.com/adampresley/sigint" - "io/ioutil" - "log" - "os" - "path/filepath" - "strings" - "time" -) - -// Load information from a given config file config_path -func loadConfig(configPath string) (map[string]string, error) { - file, err := os.Open(configPath) - if err != nil { - log.Println("Failed to open configuration file " + configPath) - return nil, err - } - - data, err := ioutil.ReadAll(file) - if err != nil { - log.Println("Failed to read configuration file " + configPath) - return nil, err - } - - config := make(map[string]string) - - lines := strings.Split(string(data), "\n") - for _, line := range lines { - line = strings.TrimSpace(line) - // Ignore comments - if len(line) <= 2 || line[:2] == "//" { - continue - } - - // Ignore malformed lines - sepPosition := strings.Index(line, ": \"") - if sepPosition == -1 { - continue - } - - config[line[:sepPosition]] = line[sepPosition+3 : len(line)-1] - } - - return config, nil -} - -// Checker routine to see if torrents are completed -func checker(config map[string]string, checkerChan <-chan map[string]string, com chan<- error) error { - for { - torrentInfo := <-checkerChan - - log.Println("Started checking " + torrentInfo["torrent_path"]) - - torrent, err := NewTorrent(config["xml_user"], config["xml_pass"], config["xml_address"], torrentInfo["torrent_path"]) - if err != nil { - if !os.IsNotExist(err) { - log.Println("Failed to initialize torrent for " + torrentInfo["torrent_path"] + ": " + err.Error()) - } - - continue - } - - syncer, err := NewSync(config["threads"], config["ssh_user"], config["ssh_pass"], config["ssh_server"], config["ssh_port"]) - defer syncer.Close() - if err != nil { - log.Println("Failed to create a new sync: " + err.Error()) - com <- err - return err - } - - completed, err := torrent.GetTorrentComplete() - if err != nil { - log.Println("Failed to see if " + torrent.path + " is completed: " + err.Error()) - com <- err - return err - } - - name, err := torrent.GetTorrentName() - if err != nil { - com <- err - return err - } - - if completed { - log.Println(name + " is completed, starting download now") - - remoteDownloadPath := filepath.Join(config["remote_download_dir"], name) - exists, err := syncer.Exists(remoteDownloadPath) - if err != nil { - log.Println("Failed to see if " + remoteDownloadPath + " exists: " + err.Error()) - com <- err - return err - } - - // file/dir to downlaod does not exist! - if !exists { - err = errors.New(remoteDownloadPath + " does not exist on remote server") - com <- err - return err - } - - completedDestination := filepath.Join(torrentInfo["local_download_dir"], name) - - _, err = os.Stat(completedDestination) - if err == nil { - err = errors.New(completedDestination + " already exists, not downloading") - continue - } else if !os.IsNotExist(err) { - log.Println("Failed to stat: " + completedDestination + ": " + err.Error()) - com <- err - return err - } - - err = syncer.GetPath(remoteDownloadPath, config["temp_download_dir"]) - if err != nil { - log.Println("Failed to download " + remoteDownloadPath + ": " + err.Error()) - com <- err - return err - } - - log.Println("Successfully downloaded " + name) - - tempDestination := filepath.Join(config["temp_download_dir"], name) - - err = os.Rename(tempDestination, completedDestination) - if err != nil { - log.Println("Failed to move " + tempDestination + " to " + completedDestination + ": " + err.Error()) - com <- err - return err - } - - err = os.Remove(torrent.path) - if err != nil && !os.IsNotExist(err) { - log.Println("Failed to remove " + torrent.path + ": " + err.Error()) - com <- err - return err - } - } else { - log.Println(name + " is not completed, waiting for it to finish") - } - - syncer.Close() - } - - com <- nil - return nil -} - -// Scanner routine to see if there are new torrent_files -func scanner(config map[string]string, checkerChan chan<- map[string]string, com chan<- error) error { - watchDirs := map[string]string{config["local_torrent_dir"]: config["local_download_dir"]} - dirContents, err := ioutil.ReadDir(config["local_torrent_dir"]) - - if err != nil { - com <- err - return err - } - - for _, file := range dirContents { - if file.IsDir() { - watchDirs[filepath.Join(config["local_torrent_dir"], file.Name())] = filepath.Join(config["local_download_dir"], file.Name()) - } - } - - uploaded := make(map[string]bool) - downloadingTorrentPath := "" - for { - for watchDir, downloadDir := range watchDirs { - torrentFiles, err := ioutil.ReadDir(watchDir) - if err != nil { - com <- err - return err - } - - for _, torrentFile := range torrentFiles { - if torrentFile.IsDir() { - // skip because we don't do more than one level of watching - continue - } - - torrentPath := filepath.Join(watchDir, torrentFile.Name()) - - if !uploaded[torrentPath] { - syncer, err := NewSync("1", config["ssh_user"], config["ssh_pass"], config["ssh_server"], config["ssh_port"]) - if err != nil { - log.Println("Failed to create a new sync: " + err.Error()) - syncer.Close() - continue - } - - destinationTorrent := filepath.Join(config["remote_torrent_dir"], filepath.Base(torrentPath)) - exists, err := syncer.Exists(destinationTorrent) - if err != nil { - log.Println("Failed to see if " + torrentPath + " already exists on the server: " + err.Error()) - syncer.Close() - continue - } - - if exists { - uploaded[torrentPath] = true - } else { - err = syncer.SendFiles(map[string]string{torrentPath: destinationTorrent}) - if err == nil { - log.Println("Successfully uploaded " + torrentPath + " to " + destinationTorrent) - uploaded[torrentPath] = true - } else { - log.Println("Failed to upload " + torrentPath + " to " + destinationTorrent + ": " + err.Error()) - } - - syncer.Close() - continue - } - - syncer.Close() - } - - downloadInfo := map[string]string{ - "torrent_path": torrentPath, - "local_download_dir": downloadDir, - } - - // try to send the info to the checker goroutine (nonblocking) - select { - case checkerChan <- downloadInfo: - // don't keep track of completed downloads in the uploaded map - if downloadingTorrentPath != "" { - delete(uploaded, downloadingTorrentPath) - } - - downloadingTorrentPath = torrentPath - break - default: - break - } - } - } - - time.Sleep(time.Second * 30) - } - - com <- nil - return nil -} - -func die(exitCode int) { - log.Println("Quiting") - os.Exit(exitCode) -} - -func main() { - sigint.ListenForSIGINT(func() { - die(1) - }) - - var configPath string - flag.StringVar(&configPath, "config", "", "Location of the config file") - flag.Parse() - - if configPath == "" { - log.Println("Missing argument for configuration file path") - flag.PrintDefaults() - die(1) - } - - log.Println("Reading configuration file") - config, err := loadConfig(configPath) - if err != nil { - log.Println(err) - die(1) - } - - log.Println("Successfully read configuration file") - - checkerChan := make(chan map[string]string, 50) - - if err != nil { - log.Println(err) - die(1) - } - - log.Println("Starting the scanner routine") - scannerCom := make(chan error) - go scanner(config, checkerChan, scannerCom) - - log.Println("Starting the checker routine") - checkerCom := make(chan error) - go checker(config, checkerChan, checkerCom) - - restartOnError := true - if config["restart_on_error"] != "" { - restartOnError = config["restart_on_error"] == "true" - } - - for { - select { - case err := <-scannerCom: - if err != nil { - log.Println("Scanner failed: " + err.Error()) - - if restartOnError { - log.Println("Restarting scanner") - go scanner(config, checkerChan, scannerCom) - } else { - log.Println("Quiting due to scanner error") - die(1) - } - } - case err := <-checkerCom: - if err != nil { - log.Println("Checker failed: " + err.Error()) - - if restartOnError { - log.Println("Restarting checker") - go checker(config, checkerChan, checkerCom) - } else { - log.Println("Quiting due to checker error") - die(1) - } - } - default: - break - } - - time.Sleep(time.Second * 5) - } -} diff --git a/hoarder.sh b/hoarder.sh new file mode 100755 index 0000000..0983a0d --- /dev/null +++ b/hoarder.sh @@ -0,0 +1,385 @@ +#!/bin/sh + +## START CONFIGURATION +# Location where the .torrent files are stored locally +TORRENT_FILE_PATH='/home/tblyler/torrent_files' +# Location to initially download torrent data to from the remote SSH server +TORRENT_TMP_DOWNLOAD='/home/tblyler/torrents_tmp' +# Location to move the completed torrent data to from TORRENT_TMP_DOWNLOAD +TORRENT_DOWNLOAD='/home/tblyler/torrents' +# Amount of rsync processes to have running at one time +RSYNC_PROCESSES=2 +# Location on the remote SSH server to copy the .torrent files to +SSH_SERVER_TORRENT_FILE_PATH='watch' +# Location on the remote SSH server where the torrent data is stored +SSH_SERVER_DOWNLOAD_PATH='files' +# Address of the remote SSH server where the torrents are downloaded +SSH_SERVER='remote.rtorrent.com' +# The username to use to login to the SSH server +SSH_USER='sshUserName' +# The XMLRPC basic HTTP authentication username +XML_USER='XMLRPCUserName' +# The XMLRPC basic HTTP authentication password +XML_PASS='XMLRPCPassword' +# The XMLRPC url +XML_URL='https://XMLRPCURL.com/XMLRPC' +## END CONFIGURATION + +if ! which curl > /dev/null; then + echo 'curl must be installed' + exit 1 +fi + +if ! which scp > /dev/null; then + echo 'scp must be installed' + exit 1 +fi + +if ! which rsync > /dev/null; then + echo 'rsync must be installed' + exit 1 +fi + +if ! which python > /dev/null; then + if ! which python2 > /dev/null; then + echo 'python must be install' + exit 1 + fi +fi + +# Hacky method to create the XML for an XMLRPC request to rtorrent +xml() { + local method=$1 + local args=$2 + echo " + +${method} + + +${args} + + +" +} + +# Returns the current entity and its content in an XML response +read_dom() { + local IFS=\> + read -d \< ENTITY CONTENT +} + +# Sends an XMLRPC request to rtorrent via curl and returns its data +xml_curl() { + local method=$1 + local args=$2 + local xml_post=`xml "${method}" "${args}"` + local curl_command='curl -s' + if [[ "${XML_USER}" != '' ]]; then + local curl_command="${curl_command} --basic -u '${XML_USER}" + if [[ "${XML_USER}" != '' ]]; then + local curl_command="${curl_command}:${XML_PASS}" + fi + + local curl_command="${curl_command}'" + fi + + local curl_command="${curl_command} -d \"${xml_post}\" '${XML_URL}'" + + local xml_response=$(eval "${curl_command}") + local curl_return=$? + + echo "${xml_response}" + return $curl_return +} + +# Gets .torrent's name from the remote rtorrent XMLRPC +get_torrent_name() { + local torrent_hash=$1 + local xml_response=`xml_curl d.get_name "${torrent_hash}"` + local curl_return=$? + + if [[ "${curl_return}" -ne 0 ]]; then + echo "Curl failed to get torrent name with error code ${curl_return}" + return $curl_return + fi + + local torrent_name=`echo "${xml_response}" | while read_dom; do + if [[ "${ENTITY}" = "name" ]] && [[ "${CONTENT}" = "faultCode" ]]; then + local error=true + fi + + if [[ ! "${error}" ]] && [[ "${ENTITY}" = "string" ]]; then + echo "${CONTENT}" + fi + done` + + if [[ "${torrent_name}" = '' ]]; then + echo "${xml_response}" + return 1 + else + echo "${torrent_name}" + return 0 + fi +} + +# Get .torrent's completion status from the remote rtorrent +get_torrent_complete() { + local torrent_hash=$1 + local xml_response=`xml_curl d.get_complete "${torrent_hash}"` + local curl_return=$? + + if [[ "${curl_return}" -ne 0 ]]; then + echo "Curl failed to get torrent name with error code ${curl_return}" + return ${curl_return} + fi + + local torrent_completed=`echo "${xml_response}" | while read_dom; do + if [[ "${ENTITY}" = "name" ]] && [[ "${CONTENT}" = "faultCode" ]]; then + local error=true + fi + + if [[ ! "${error}" ]] && [[ "${ENTITY}" = "i8" ]]; then + echo "${CONTENT}" + fi + done` + + if [[ "${torrent_completed}" = '' ]]; then + echo "${xml_response}" + return 1 + else + echo "${torrent_completed}" + return 0 + fi +} + +# Check if a .torrent is loaded on the remote rtorrent +get_torrent_added() { + local torrent_hash=$1 + local xml_response=`xml_curl d.get_complete "${torrent_hash}"` + local curl_return=$? + + if [[ "${curl_return}" -ne 0 ]]; then + echo "Curl failed to get torrent name with error code ${curl_return}" + return ${curl_return} + fi + + local torrent_added=`echo "${xml_response}" | while read_dom; do + if [[ "${CONTENT}" = 'Could not find info-hash.' ]]; then + echo "${CONTENT}" + fi + done` + + if [[ "${torrent_added}" = '' ]]; then + echo 1 + else + echo 0 + fi +} + +# Get the info hash for a given .torrent file +get_torrent_hash() { + local torrent_file=$1 + if [[ ! -f "${torrent_file}" ]]; then + return 1 + fi + + local python_bin='python2' + if ! which "${python_bin}" 2>&1 > /dev/null; then + local python_bin='python' + if ! which "${python_bin}" 2>&1 > /dev/null; then + return 1 + fi + fi + + local torrent_hash=`"${python_bin}" - << END +import hashlib + +def compute_hash(file_path): + try: + data = open(file_path, 'rb').read() + except: + return False + data_len = len(data) + start = data.find("infod") + if start == -1: + return False + + start += 4 + current = start + 1 + dir_depth = 1 + while current < data_len and dir_depth > 0: + if data[current] == 'e': + dir_depth -= 1 + current += 1 + elif data[current] == 'l' or data[current] == 'd': + dir_depth += 1 + current += 1 + elif data[current] == 'i': + current += 1 + while data[current] != 'e': + current += 1 + current += 1 + elif data[current].isdigit(): + num = data[current] + current += 1 + while data[current] != ':': + num += data[current] + current += 1 + current += 1 + int(num) + else: + return False + + return hashlib.sha1(data[start:current]).hexdigest().upper() + +print(compute_hash("${torrent_file}")) +END + ` + + if [[ ! $? ]] || [[ "${torrent_hash}" = 'False' ]]; then + return 1 + fi + + echo $torrent_hash +} + +# keep track of the .torrent files to be downloaded +declare -A TORRENT_QUEUE +# keep track of the rsyncs to download torrent data +declare -A RUNNING_RSYNCS +# run indefinitely +while true; do + # check to make sure the path of the local .torrent files exists + if [[ ! -d "${TORRENT_FILE_PATH}" ]]; then + echo "${TORRENT_FILE_PATH} Does not exist" + exit 1 + fi + # enumerate the .torrent file directory + for file in `ls "${TORRENT_FILE_PATH}"`; do + # store the file/directory's full path + file="${TORRENT_FILE_PATH}/${file}" + # check if the path is a directory + if [[ -d "${file}" ]]; then + # enumerate the directory + for sub_file in `ls "${file}"`; do + # store the file/directory's full path + sub_file="${file}/${sub_file}" + # this is the furthest we will descend + if [[ -f "${sub_file}" ]]; then + # get the torrent hash for the .torrent file + torrent_hash=`get_torrent_hash "${sub_file}"` + if [[ ! $? ]]; then + echo "Failed to get the torrent hash of ${sub_file}" + continue + fi + + # add the torrent to the queue if it is not already in the queue + if [[ ! ${TORRENT_QUEUE[${torrent_hash}]+_} ]]; then + $TORRENT_QUEUE[$torrent_hash]="${sub_file}" + fi + fi + done + # check that the path is a file + elif [[ -f "${file}" ]]; then + # get the torrent hash for the .torrent file + torrent_hash=`get_torrent_hash "${file}"` + if [[ ! $? ]]; then + echo "Failed to get the torrent hash of ${file}" + continue + fi + + # add the torrent to the queue if it is not already in the queue + if [[ ! ${TORRENT_QUEUE[${torrent_hash}]+_} ]]; then + $TORRENT_QUEUE[$torrent_hash]="${file}" + fi + fi + done + + # go through the torrent queue + for torrent_hash in "${!TORRENT_QUEUE[@]}"; do + # continue if the torrent is already being downloaded + if [[ ${RUNNING_RSYNCS[$torrent_hash]+_} ]]; then + continue + fi + + # check to see if the torrent is on the rtorrent server + torrent_added=`get_torrent_added "${torrent_hash}"` + if [[ ! $? ]]; then + echo "Failed to see if ${TORRENT_QUEUE[$torrent_hash]} exists on the rtorrent server" + continue + fi + + # if the torrent is not on the rtorrent server, upload it + if [[ $torrent_added -eq 0 ]]; then + scp "${TORRENT_QUEUE[$torrent_hash]}" "${SSH_USER}@${SSH_SERVER}:${SSH_SERVER_TORRENT_FILE_PATH}" + if [[ ! $? ]]; then + echo "Failed to upload ${TORRENT_QUEUE[$torrent_hash]}" + fi + fi + done + + # if the amount of running rsyncs is belwo the desire amount, run items from the queue + if [[ ${#RUNNING_RSYNCS[@]} -lt ${RSYNC_PROCESSES} ]]; then + for torrent_hash in "${!TORRENT_QUEUE[@]}"; do + # make sure this torrent is not already being downloaded + if [[ ${RUNNING_RSYNCS[${torrent_hash}]+_} ]]; then + continue + fi + + # see if the torrent is finished downloading remotely + torrent_completed=`get_torrent_complete "${torrent_hash}"` + if [[ ! $? ]]; then + echo "Failed to check if ${TORRENT_QUEUE[$torrent_hash]} is completed" + continue + fi + + # the torrent is finished downloading remotely + if [[ "${torrent_completed}" -eq 1 ]]; then + torrent_name=`get_torrent_name "${torrent_hash}"` + if [[ ! $? ]]; then + echo "Failed to get torrent name for ${TORRENT_QUEUE[$torrent_hash]}" + continue + fi + + # start the download and record the PID + rsync -hrvP --inplace "${SSH_USER}@${SSH_SERVER}:${SSH_SERVER_DOWNLOAD_PATH}/${torrent_name}" "${TORRENT_TMP_DOWNLOAD}/" & + ${RUNNING_RSYNCS[${torrent_hash}]}=$! + fi + done + fi + + # checkup on the running rsyncs + for torrent_hash in "${!RUNNING_RSYNCS[@]}"; do + pid=${RUNNING_RSYNCS[$torrent_hash]} + # check to see if the given PID is still running + if [[ `jobs | grep -c $pid` -eq 0 ]]; then + # get the return code of the PID + wait $pid + return=$? + if [[ $return ]]; then + echo "Successfully downloaded ${TORRENT_QUEUE[$torrent_hash]}" + torrent_name=`get_torrent_name "${torrent_hash}"` + if [[ $? ]]; then + final_location_dir="${TORRENT_DOWNLOAD}" + if [[ `dirname "${TORRENT_QUEUE[$torrent_hash]}"` != "${TORRENT_FILE_PATH}" ]]; then + final_location_dir="${final_location_dir}/$(basename `dirname "${TORRENT_FILE_PATH}"`)" + fi + + if [[ ! -d "${final_location_dir}" ]]; then + mkdir -p "${final_location_dir}" + fi + + mv "${TORRENT_TMP_DOWNLOAD}/${torrent_name}" "${final_location_dir}/" + unset TORRENT_QUEUE[$torrent_hash] + else + echo "Failed to get torrent name for ${TORRENT_QUEUE[$torrent_hash]}" + fi + else + echo "Failed to download ${TORRENT_QUEUE[$torrent_hash]} with rsync return code $return" + fi + + unset RUNNING_RSYNCS[$torrent_hash] + fi + done + + sleep 5s +done diff --git a/rtorrent.go b/rtorrent.go deleted file mode 100644 index 4ec34b0..0000000 --- a/rtorrent.go +++ /dev/null @@ -1,134 +0,0 @@ -// Communicates with rtorrent's XMLRPC interface, and can gather info regarding a .torrent file. -package main - -import ( - "bytes" - "crypto/sha1" - "encoding/hex" - "errors" - "io" - "io/ioutil" - "net/http" - "os" - "regexp" - "strings" - bencode "github.com/jackpal/bencode-go" -) - -// Torrent Keeps track of a torrent file's and rtorrent's XMLRPC information. -type Torrent struct { - path string - hash string - xmlUser string - xmlPass string - xmlAddress string -} - -// NewTorrent Create a new Torrent instance while computing its hash. -func NewTorrent(xmlUser string, xmlPass string, xmlAddress string, filePath string) (*Torrent, error) { - hash, err := getTorrentHash(filePath) - if err != nil { - return nil, err - } - - return &Torrent{filePath, hash, xmlUser, xmlPass, xmlAddress}, nil -} - -// Compute the torrent hash for a given torrent file path returning an all caps sha1 hash as a string. -func getTorrentHash(filePath string) (string, error) { - file, err := os.Open(filePath) - if err != nil { - return "", err - } - - defer file.Close() - - data, err := bencode.Decode(file) - if err != nil { - return "", err - } - - decoded, ok := data.(map[string]interface{}) - if !ok { - return "", errors.New("unable to convert data to map") - } - - var encoded bytes.Buffer - bencode.Marshal(&encoded, decoded["info"]) - - encodedString := encoded.String() - - hash := sha1.New() - io.WriteString(hash, encodedString) - - hashString := strings.ToUpper(hex.EncodeToString(hash.Sum(nil))) - - return hashString, nil -} - -// Send a command and its argument to the rtorrent XMLRPC and get the response. -func (t Torrent) xmlRPCSend (command string, arg string) (string, error) { - // This is hacky XML to send to the server - buf := []byte("\n" + - "\n" + - "" + command + "\n" + - "\n" + - "\n" + - "" + arg + "\n" + - "\n" + - "\n" + - "\n") - - buffer := bytes.NewBuffer(buf) - - request, err := http.NewRequest("POST", t.xmlAddress, buffer) - if err != nil { - return "", err - } - - // Set the basic HTTP auth if we have a user or password - if t.xmlUser != "" || t.xmlPass != "" { - request.SetBasicAuth(t.xmlUser, t.xmlPass) - } - - client := &http.Client{} - resp, err := client.Do(request) - if err != nil { - return "", err - } - - defer resp.Body.Close() - - body, err := ioutil.ReadAll(resp.Body) - if err != nil { - return "", err - } - - re, err := regexp.Compile("<.*>(.*)") - if err != nil { - return "", err - } - - values := re.FindAllStringSubmatch(string(body), -1) - - if len(values) != 1 { - return "", nil - } - - return values[0][1], nil -} - -// GetTorrentName Get the torrent's name from rtorrent. -func (t Torrent) GetTorrentName() (string, error) { - return t.xmlRPCSend("d.get_name", t.hash) -} - -// GetTorrentComplete Get the completion status of the torrent from rtorrent. -func (t Torrent) GetTorrentComplete() (bool, error) { - complete, err := t.xmlRPCSend("d.get_complete", t.hash) - if err != nil { - return false, err - } - - return complete == "1", nil -} diff --git a/sync.go b/sync.go deleted file mode 100644 index 6be8208..0000000 --- a/sync.go +++ /dev/null @@ -1,554 +0,0 @@ -// Send and receive files via SFTP using multiple download streams concurrently (for downloads). -package main - -import ( - "bytes" - "encoding/binary" - "errors" - "github.com/pkg/sftp" - "golang.org/x/crypto/ssh" - "io" - "log" - "os" - "path/filepath" - "strconv" - "time" -) - -const ( - // NoProgress no progress for current file - NoProgress int64 = -1 -) - -// Sync Keeps track of the connection information. -type Sync struct { - sshClient *ssh.Client - sftpClients []*sftp.Client - sftpClientCount int -} - -// NewSync Create a new Sync object, connect to the SSH server, and create sftp clients -func NewSync(threads string, user string, pass string, server string, port string) (*Sync, error) { - // convert the threads input to an int - clientCount, err := strconv.Atoi(threads) - if err != nil { - return nil, err - } - - if clientCount < 1 { - return nil, errors.New("Must have a thread count >= 1") - } - - sshClient, err := newSSHClient(user, pass, server, port) - if err != nil { - return nil, err - } - - // initialize a total of client_count sftp clients - sftpClients := make([]*sftp.Client, clientCount) - for i := 0; i < clientCount; i++ { - sftpClient, err := sftp.NewClient(sshClient) - if err != nil { - return nil, err - } - - sftpClients[i] = sftpClient - } - - return &Sync{sshClient, sftpClients, clientCount}, nil -} - -// Close Closes all of the ssh and sftp connections to the SSH server. -func (s Sync) Close() error { - var returnError error - for i := 0; i < s.sftpClientCount; i++ { - err := s.sftpClients[i].Close() - if err != nil { - returnError = err - } - } - - err := s.sshClient.Close() - if err != nil { - return err - } - - return returnError -} - -// Create a new SSH client instance and confirm that we can make sessions -func newSSHClient(user string, pass string, server string, port string) (*ssh.Client, error) { - sshConfig := &ssh.ClientConfig{ - User: user, - Auth: []ssh.AuthMethod{ - ssh.Password(pass), - }, - } - - sshClient, err := ssh.Dial("tcp", server+":"+port, sshConfig) - - if err != nil { - return sshClient, err - } - - session, err := sshClient.NewSession() - if err != nil { - return sshClient, err - } - - defer session.Close() - - return sshClient, err -} - -// SendFiles Send a list of files in the format of {"source_path": "destination"} to the SSH server. This does not handle directories. -func (s Sync) SendFiles(files map[string]string) error { - return SendFiles(s.sftpClients[0], files) -} - -// SendFiles Send a list of files in the format of {"source_path": "destination"} to the SSH server. This does not handle directories. -func SendFiles(sftpClient *sftp.Client, files map[string]string) error { - for sourceFile, destinationFile := range files { - // 512KB buffer for reading/sending data - data := make([]byte, 524288) - - // Open file that we will be sending - sourceData, err := os.Open(sourceFile) - if err != nil { - log.Println("SendFiles: Failed to open source file " + sourceFile) - return err - } - - // Get the info of the file that we will be sending - sourceStat, err := sourceData.Stat() - if err != nil { - log.Println("SendFiles: Failed to stat source file " + sourceFile) - return err - } - - // Extract the size of the file that we will be sending - sourceSize := sourceStat.Size() - // Create the destination file for the source file we're sending - newFile, err := sftpClient.Create(destinationFile) - if err != nil { - log.Println("SendFiles: Failed to create destination file " + destinationFile) - return err - } - - // Track our position in reading/writing the file - var currentPosition int64 - for currentPosition < sourceSize { - // If the next iteration will be greater than the file size, reduce to the data size - if currentPosition+int64(len(data)) > sourceSize { - data = make([]byte, sourceSize-currentPosition) - } - - // Read data from the source file - read, err := sourceData.Read(data) - if err != nil { - // If it's the end of the file and we didn't read anything, break - if err == io.EOF { - if read == 0 { - break - } - } else { - return err - } - } - - // Write the data from the source file to the destination file - _, err = newFile.Write(data) - if err != nil { - return err - } - - // Update the current position in the file - currentPosition += int64(read) - } - - // close the source file - err = sourceData.Close() - if err != nil { - return err - } - - // close the destination file - err = newFile.Close() - if err != nil { - return err - } - } - - return nil -} - -// GetFile Get a file from the source_file path to be stored in destination_file path -func (s Sync) GetFile(sourceFile string, destinationFile string) error { - // Store channels for all the concurrent download parts - channels := make([]chan error, s.sftpClientCount) - - // Make channels for all the concurrent downloads - for i := 0; i < s.sftpClientCount; i++ { - channels[i] = make(chan error) - } - - // Start the concurrent downloads - for i := 0; i < s.sftpClientCount; i++ { - go GetFile(s.sftpClients[i], sourceFile, destinationFile, i+1, s.sftpClientCount, channels[i]) - } - - // Block until all downloads are completed or one errors - allDone := false - for !allDone { - allDone = true - for i, channel := range channels { - if channel == nil { - continue - } - - select { - case err := <-channel: - if err != nil { - return err - } - - channels[i] = nil - break - default: - // still running - if allDone { - allDone = false - } - break - } - } - - time.Sleep(time.Second) - } - - err := destroyProgress(destinationFile) - if err != nil { - return err - } - - return nil -} - -// GetFile Get a file from the source_file path to be stored in destination_file path. -// worker_number and work_total are not zero indexed, but 1 indexed -func GetFile(sftpClient *sftp.Client, sourceFile string, destinationFile string, workerNumber int, workerTotal int, com chan<- error) error { - // Open source_data for reading - sourceData, err := sftpClient.OpenFile(sourceFile, os.O_RDONLY) - if err != nil { - com <- err - return err - } - - // Get info for source_data - stat, err := sourceData.Stat() - if err != nil { - com <- err - return err - } - - // Extract the size of source_data - statSize := stat.Size() - - // Calculate which byte to start reading data from - start, err := getProgress(destinationFile, workerNumber) - if err != nil { - com <- err - return err - } - - if start == NoProgress { - if workerNumber == 1 { - start = 0 - } else { - start = (statSize * int64(workerNumber-1)) / int64(workerTotal) - } - } - - // Calculate which byte to stop reading data from - var stop int64 - if workerNumber == workerTotal { - stop = statSize - } else { - stop = (statSize * int64(workerNumber)) / int64(workerTotal) - } - - // Create the new file for writing - newFile, err := os.OpenFile(destinationFile, os.O_WRONLY|os.O_CREATE, 0777) - if err != nil { - com <- err - return err - } - - // Seek to the computed start point - offset, err := sourceData.Seek(start, 0) - if err != nil { - com <- err - return err - } - - // Seeking messed up real bad - if offset != start { - err = errors.New("Returned incorrect offset for source " + sourceFile) - com <- err - return err - } - - // Seek to the computed start point - offset, err = newFile.Seek(start, 0) - if err != nil { - com <- err - return err - } - - // Seeking messed up real bad - if offset != start { - err = errors.New("Return incorrect offset for destination " + destinationFile) - com <- err - return err - } - - // 512KB chunks - var dataSize int64 = 524288 - // Change the size if the chunk is larger than the file - chunkDifference := stop - start - if chunkDifference < dataSize { - dataSize = chunkDifference - } - - // Initialize the buffer for reading/writing - data := make([]byte, dataSize) - var currentSize int64 - for currentSize = start; currentSize < stop; currentSize += dataSize { - err = updateProgress(destinationFile, currentSize, workerNumber) - if err != nil { - com <- err - return err - } - - // Adjust the size of the buffer if the next iteration will be greater than what has yet to be read - if currentSize+dataSize > stop { - dataSize = stop - currentSize - data = make([]byte, dataSize) - } - - // Read the chunk - read, err := sourceData.Read(data) - if err != nil { - // Exit the loop if we're at the end of the file and no data was read - if err == io.EOF { - if read == 0 { - break - } - } else { - com <- err - return err - } - } - - // Write the chunk - _, err = newFile.Write(data) - if err != nil { - com <- err - return err - } - } - - err = updateProgress(destinationFile, currentSize, workerNumber) - if err != nil { - com <- err - return err - } - - // Close out the files - err = sourceData.Close() - if err != nil { - com <- err - return err - } - - err = newFile.Close() - if err != nil { - com <- err - return err - } - - com <- nil - return nil -} - -// GetPath Get a given directory or file defined by source_path and save it to destination_path -func (s Sync) GetPath(sourcePath string, destinationPath string) error { - // Get all the dirs and files underneath source_path - dirs, files, err := s.getChildren(sourcePath) - - // Remove the trailing slash if it exists - if sourcePath[len(sourcePath)-1] == '/' { - sourcePath = sourcePath[:len(sourcePath)-1] - } - - // Get the parent path of source_path - sourceBase := filepath.Dir(sourcePath) - sourceBaseLen := len(sourceBase) - - // Make all the directories in destination_path - for _, dir := range dirs { - dir = filepath.Join(destinationPath, filepath.FromSlash(dir[sourceBaseLen:])) - err = os.MkdirAll(dir, 0777) - if err != nil { - return err - } - } - - // Get all the files and place them in destination_path - for _, file := range files { - newFile := filepath.Join(destinationPath, filepath.FromSlash(file[sourceBaseLen:])) - err = s.GetFile(file, newFile) - if err != nil { - return err - } - } - - return nil -} - -// Get the directories and files underneath a given sftp root path -func (s Sync) getChildren(root string) ([]string, []string, error) { - // Used to walk through the path - walker := s.sftpClients[0].Walk(root) - - // Keep track of the directories - var dirs []string - // Keep track of the files - var files []string - - // Walk through the files and directories - for walker.Step() { - err := walker.Err() - if err != nil { - return nil, nil, err - } - - stat := walker.Stat() - if stat.IsDir() { - dirs = append(dirs, walker.Path()) - } else { - files = append(files, walker.Path()) - } - } - - err := walker.Err() - if err != nil { - return nil, nil, err - } - - return dirs, files, nil -} - -// Exists Determine if a directory, file, or link exists -func (s Sync) Exists(path string) (bool, error) { - _, err := s.sftpClients[0].Lstat(path) - - if err != nil { - if err.Error() == "sftp: \"No such file\" (SSH_FX_NO_SUCH_FILE)" { - return false, nil - } - - return false, err - } - - return true, nil -} - -func getProgress(filePath string, workerNumber int) (int64, error) { - file, err := os.Open(getProgressPath(filePath)) - if err != nil { - if os.IsNotExist(err) { - return NoProgress, nil - } - - return 0, err - } - - fileStat, err := file.Stat() - if err != nil { - return 0, err - } - - if fileStat.Size() == 0 { - return NoProgress, nil - } - - var progress int64 - progressSize := int64(binary.Size(progress)) - offset := progressSize * int64(workerNumber-1) - - realOffset, err := file.Seek(offset, os.SEEK_SET) - if err != nil { - return 0, err - } - - if realOffset != offset { - return 0, errors.New("getProgress: Tried to seek to " + string(offset) + " but got " + string(realOffset) + " instead") - } - - progressData := make([]byte, progressSize) - - read, err := file.Read(progressData) - if err != nil { - if err == io.EOF { - return NoProgress, nil - } - - return 0, err - } - - if int64(read) != progressSize { - return NoProgress, nil - } - - err = binary.Read(bytes.NewReader(progressData), binary.BigEndian, &progress) - if err != nil { - return 0, err - } - - return progress, nil -} - -func updateProgress(filePath string, written int64, workerNumber int) error { - file, err := os.OpenFile(getProgressPath(filePath), os.O_WRONLY|os.O_CREATE, 0777) - if err != nil { - return err - } - - writtenSize := int64(binary.Size(written)) - offset := writtenSize * int64(workerNumber-1) - - realOffset, err := file.Seek(offset, os.SEEK_SET) - if err != nil { - return err - } - - if realOffset != offset { - return errors.New("updateProgress: Tried to seek to " + string(offset) + " but got " + string(realOffset) + " instead") - } - - return binary.Write(file, binary.BigEndian, written) -} - -func destroyProgress(filePath string) error { - err := os.Remove(getProgressPath(filePath)) - if err != nil && !os.IsNotExist(err) { - return err - } - - return nil -} - -func getProgressPath(filePath string) string { - return filepath.Join(filepath.Dir(filePath), "."+filepath.Base(filePath)+".progress") -}