Rewrote hoarder in Bash.

This commit is contained in:
Tony Blyler 2015-02-06 22:20:06 -05:00
parent 758ce5cb8e
commit b1b0c7b4c3
7 changed files with 391 additions and 1059 deletions

.gitignore vendored
View file

@ -1,2 +1 @@

View file

@ -4,17 +4,15 @@ Uploads .torrent files from a local "blackhole" to a remote (SSH) rtorrent watch
1. `go get`
2. `go install`
* bash
* curl
* rsync
* scp
Make sure you make a copy of the conf file in the repo to suit your needs.
Edit the variables at the top of to your liking.
After installation, just run the hoarder executable with the --config flag to specify where the config file is.
Run with bash

View file

@ -1,31 +0,0 @@
// Configuration for Hoarder
// All fields and values are necessary
// Should Hoarder automatically restart when one of its processes errors? (default true if not set)
restart_on_error: "true"
// Username for XMLRPC
xml_user: "testuser"
// Password for XMLRPC
xml_pass: "supersecure"
// Address to XMLRPC
xml_address: "https://mysweetaddress:443/xmlrpc"
// Amount of download threads per file
threads: "4"
// Username for logging into the ssh server
ssh_user: "testsshuser"
// Password for the logging into the ssh server
ssh_pass: "bestpasswordever"
// Server address for the ssh server
ssh_server: "sshserveraddress"
// Port for the ssh server
ssh_port: "22"
// Location to temporarily download files
temp_download_dir: "/home/user/tmp_download/"
// Location to move downloaded files from temp_download_dir to
local_download_dir: "/home/user/Downloads/"
// Locaiton to watch for .torrent files
local_torrent_dir: "/home/user/torrent_files/"
// Remote location on the SSH server to download torrent data from
remote_download_dir: "/home/user/files/"
// Remote location on the SSH server to upload .torrent files to
remote_torrent_dir: "/home/user/watch/"

View file

@ -1,331 +0,0 @@
// Watch a torrent directory, poll rtorrent, and download completed torrents over SFTP.
package main
import (
// Load information from a given config file config_path
func loadConfig(configPath string) (map[string]string, error) {
file, err := os.Open(configPath)
if err != nil {
log.Println("Failed to open configuration file " + configPath)
return nil, err
data, err := ioutil.ReadAll(file)
if err != nil {
log.Println("Failed to read configuration file " + configPath)
return nil, err
config := make(map[string]string)
lines := strings.Split(string(data), "\n")
for _, line := range lines {
line = strings.TrimSpace(line)
// Ignore comments
if len(line) <= 2 || line[:2] == "//" {
// Ignore malformed lines
sepPosition := strings.Index(line, ": \"")
if sepPosition == -1 {
config[line[:sepPosition]] = line[sepPosition+3 : len(line)-1]
return config, nil
// Checker routine to see if torrents are completed
func checker(config map[string]string, checkerChan <-chan map[string]string, com chan<- error) error {
for {
torrentInfo := <-checkerChan
log.Println("Started checking " + torrentInfo["torrent_path"])
torrent, err := NewTorrent(config["xml_user"], config["xml_pass"], config["xml_address"], torrentInfo["torrent_path"])
if err != nil {
if !os.IsNotExist(err) {
log.Println("Failed to initialize torrent for " + torrentInfo["torrent_path"] + ": " + err.Error())
syncer, err := NewSync(config["threads"], config["ssh_user"], config["ssh_pass"], config["ssh_server"], config["ssh_port"])
defer syncer.Close()
if err != nil {
log.Println("Failed to create a new sync: " + err.Error())
com <- err
return err
completed, err := torrent.GetTorrentComplete()
if err != nil {
log.Println("Failed to see if " + torrent.path + " is completed: " + err.Error())
com <- err
return err
name, err := torrent.GetTorrentName()
if err != nil {
com <- err
return err
if completed {
log.Println(name + " is completed, starting download now")
remoteDownloadPath := filepath.Join(config["remote_download_dir"], name)
exists, err := syncer.Exists(remoteDownloadPath)
if err != nil {
log.Println("Failed to see if " + remoteDownloadPath + " exists: " + err.Error())
com <- err
return err
// file/dir to downlaod does not exist!
if !exists {
err = errors.New(remoteDownloadPath + " does not exist on remote server")
com <- err
return err
completedDestination := filepath.Join(torrentInfo["local_download_dir"], name)
_, err = os.Stat(completedDestination)
if err == nil {
err = errors.New(completedDestination + " already exists, not downloading")
} else if !os.IsNotExist(err) {
log.Println("Failed to stat: " + completedDestination + ": " + err.Error())
com <- err
return err
err = syncer.GetPath(remoteDownloadPath, config["temp_download_dir"])
if err != nil {
log.Println("Failed to download " + remoteDownloadPath + ": " + err.Error())
com <- err
return err
log.Println("Successfully downloaded " + name)
tempDestination := filepath.Join(config["temp_download_dir"], name)
err = os.Rename(tempDestination, completedDestination)
if err != nil {
log.Println("Failed to move " + tempDestination + " to " + completedDestination + ": " + err.Error())
com <- err
return err
err = os.Remove(torrent.path)
if err != nil && !os.IsNotExist(err) {
log.Println("Failed to remove " + torrent.path + ": " + err.Error())
com <- err
return err
} else {
log.Println(name + " is not completed, waiting for it to finish")
com <- nil
return nil
// Scanner routine to see if there are new torrent_files
func scanner(config map[string]string, checkerChan chan<- map[string]string, com chan<- error) error {
watchDirs := map[string]string{config["local_torrent_dir"]: config["local_download_dir"]}
dirContents, err := ioutil.ReadDir(config["local_torrent_dir"])
if err != nil {
com <- err
return err
for _, file := range dirContents {
if file.IsDir() {
watchDirs[filepath.Join(config["local_torrent_dir"], file.Name())] = filepath.Join(config["local_download_dir"], file.Name())
uploaded := make(map[string]bool)
downloadingTorrentPath := ""
for {
for watchDir, downloadDir := range watchDirs {
torrentFiles, err := ioutil.ReadDir(watchDir)
if err != nil {
com <- err
return err
for _, torrentFile := range torrentFiles {
if torrentFile.IsDir() {
// skip because we don't do more than one level of watching
torrentPath := filepath.Join(watchDir, torrentFile.Name())
if !uploaded[torrentPath] {
syncer, err := NewSync("1", config["ssh_user"], config["ssh_pass"], config["ssh_server"], config["ssh_port"])
if err != nil {
log.Println("Failed to create a new sync: " + err.Error())
destinationTorrent := filepath.Join(config["remote_torrent_dir"], filepath.Base(torrentPath))
exists, err := syncer.Exists(destinationTorrent)
if err != nil {
log.Println("Failed to see if " + torrentPath + " already exists on the server: " + err.Error())
if exists {
uploaded[torrentPath] = true
} else {
err = syncer.SendFiles(map[string]string{torrentPath: destinationTorrent})
if err == nil {
log.Println("Successfully uploaded " + torrentPath + " to " + destinationTorrent)
uploaded[torrentPath] = true
} else {
log.Println("Failed to upload " + torrentPath + " to " + destinationTorrent + ": " + err.Error())
downloadInfo := map[string]string{
"torrent_path": torrentPath,
"local_download_dir": downloadDir,
// try to send the info to the checker goroutine (nonblocking)
select {
case checkerChan <- downloadInfo:
// don't keep track of completed downloads in the uploaded map
if downloadingTorrentPath != "" {
delete(uploaded, downloadingTorrentPath)
downloadingTorrentPath = torrentPath
time.Sleep(time.Second * 30)
com <- nil
return nil
func die(exitCode int) {
func main() {
sigint.ListenForSIGINT(func() {
var configPath string
flag.StringVar(&configPath, "config", "", "Location of the config file")
if configPath == "" {
log.Println("Missing argument for configuration file path")
log.Println("Reading configuration file")
config, err := loadConfig(configPath)
if err != nil {
log.Println("Successfully read configuration file")
checkerChan := make(chan map[string]string, 50)
if err != nil {
log.Println("Starting the scanner routine")
scannerCom := make(chan error)
go scanner(config, checkerChan, scannerCom)
log.Println("Starting the checker routine")
checkerCom := make(chan error)
go checker(config, checkerChan, checkerCom)
restartOnError := true
if config["restart_on_error"] != "" {
restartOnError = config["restart_on_error"] == "true"
for {
select {
case err := <-scannerCom:
if err != nil {
log.Println("Scanner failed: " + err.Error())
if restartOnError {
log.Println("Restarting scanner")
go scanner(config, checkerChan, scannerCom)
} else {
log.Println("Quiting due to scanner error")
case err := <-checkerCom:
if err != nil {
log.Println("Checker failed: " + err.Error())
if restartOnError {
log.Println("Restarting checker")
go checker(config, checkerChan, checkerCom)
} else {
log.Println("Quiting due to checker error")
time.Sleep(time.Second * 5)

385 Executable file
View file

@ -0,0 +1,385 @@
# Location where the .torrent files are stored locally
# Location to initially download torrent data to from the remote SSH server
# Location to move the completed torrent data to from TORRENT_TMP_DOWNLOAD
# Amount of rsync processes to have running at one time
# Location on the remote SSH server to copy the .torrent files to
# Location on the remote SSH server where the torrent data is stored
# Address of the remote SSH server where the torrents are downloaded
# The username to use to login to the SSH server
# The XMLRPC basic HTTP authentication username
# The XMLRPC basic HTTP authentication password
# The XMLRPC url
if ! which curl > /dev/null; then
echo 'curl must be installed'
exit 1
if ! which scp > /dev/null; then
echo 'scp must be installed'
exit 1
if ! which rsync > /dev/null; then
echo 'rsync must be installed'
exit 1
if ! which python > /dev/null; then
if ! which python2 > /dev/null; then
echo 'python must be install'
exit 1
# Hacky method to create the XML for an XMLRPC request to rtorrent
xml() {
local method=$1
local args=$2
echo "<?xml version='1.0'?>
# Returns the current entity and its content in an XML response
read_dom() {
local IFS=\>
# Sends an XMLRPC request to rtorrent via curl and returns its data
xml_curl() {
local method=$1
local args=$2
local xml_post=`xml "${method}" "${args}"`
local curl_command='curl -s'
if [[ "${XML_USER}" != '' ]]; then
local curl_command="${curl_command} --basic -u '${XML_USER}"
if [[ "${XML_USER}" != '' ]]; then
local curl_command="${curl_command}:${XML_PASS}"
local curl_command="${curl_command}'"
local curl_command="${curl_command} -d \"${xml_post}\" '${XML_URL}'"
local xml_response=$(eval "${curl_command}")
local curl_return=$?
echo "${xml_response}"
return $curl_return
# Gets .torrent's name from the remote rtorrent XMLRPC
get_torrent_name() {
local torrent_hash=$1
local xml_response=`xml_curl d.get_name "${torrent_hash}"`
local curl_return=$?
if [[ "${curl_return}" -ne 0 ]]; then
echo "Curl failed to get torrent name with error code ${curl_return}"
return $curl_return
local torrent_name=`echo "${xml_response}" | while read_dom; do
if [[ "${ENTITY}" = "name" ]] && [[ "${CONTENT}" = "faultCode" ]]; then
local error=true
if [[ ! "${error}" ]] && [[ "${ENTITY}" = "string" ]]; then
echo "${CONTENT}"
if [[ "${torrent_name}" = '' ]]; then
echo "${xml_response}"
return 1
echo "${torrent_name}"
return 0
# Get .torrent's completion status from the remote rtorrent
get_torrent_complete() {
local torrent_hash=$1
local xml_response=`xml_curl d.get_complete "${torrent_hash}"`
local curl_return=$?
if [[ "${curl_return}" -ne 0 ]]; then
echo "Curl failed to get torrent name with error code ${curl_return}"
return ${curl_return}
local torrent_completed=`echo "${xml_response}" | while read_dom; do
if [[ "${ENTITY}" = "name" ]] && [[ "${CONTENT}" = "faultCode" ]]; then
local error=true
if [[ ! "${error}" ]] && [[ "${ENTITY}" = "i8" ]]; then
echo "${CONTENT}"
if [[ "${torrent_completed}" = '' ]]; then
echo "${xml_response}"
return 1
echo "${torrent_completed}"
return 0
# Check if a .torrent is loaded on the remote rtorrent
get_torrent_added() {
local torrent_hash=$1
local xml_response=`xml_curl d.get_complete "${torrent_hash}"`
local curl_return=$?
if [[ "${curl_return}" -ne 0 ]]; then
echo "Curl failed to get torrent name with error code ${curl_return}"
return ${curl_return}
local torrent_added=`echo "${xml_response}" | while read_dom; do
if [[ "${CONTENT}" = 'Could not find info-hash.' ]]; then
echo "${CONTENT}"
if [[ "${torrent_added}" = '' ]]; then
echo 1
echo 0
# Get the info hash for a given .torrent file
get_torrent_hash() {
local torrent_file=$1
if [[ ! -f "${torrent_file}" ]]; then
return 1
local python_bin='python2'
if ! which "${python_bin}" 2>&1 > /dev/null; then
local python_bin='python'
if ! which "${python_bin}" 2>&1 > /dev/null; then
return 1
local torrent_hash=`"${python_bin}" - << END
import hashlib
def compute_hash(file_path):
data = open(file_path, 'rb').read()
return False
data_len = len(data)
start = data.find("infod")
if start == -1:
return False
start += 4
current = start + 1
dir_depth = 1
while current < data_len and dir_depth > 0:
if data[current] == 'e':
dir_depth -= 1
current += 1
elif data[current] == 'l' or data[current] == 'd':
dir_depth += 1
current += 1
elif data[current] == 'i':
current += 1
while data[current] != 'e':
current += 1
current += 1
elif data[current].isdigit():
num = data[current]
current += 1
while data[current] != ':':
num += data[current]
current += 1
current += 1 + int(num)
return False
return hashlib.sha1(data[start:current]).hexdigest().upper()
if [[ ! $? ]] || [[ "${torrent_hash}" = 'False' ]]; then
return 1
echo $torrent_hash
# keep track of the .torrent files to be downloaded
# keep track of the rsyncs to download torrent data
# run indefinitely
while true; do
# check to make sure the path of the local .torrent files exists
if [[ ! -d "${TORRENT_FILE_PATH}" ]]; then
echo "${TORRENT_FILE_PATH} Does not exist"
exit 1
# enumerate the .torrent file directory
for file in `ls "${TORRENT_FILE_PATH}"`; do
# store the file/directory's full path
# check if the path is a directory
if [[ -d "${file}" ]]; then
# enumerate the directory
for sub_file in `ls "${file}"`; do
# store the file/directory's full path
# this is the furthest we will descend
if [[ -f "${sub_file}" ]]; then
# get the torrent hash for the .torrent file
torrent_hash=`get_torrent_hash "${sub_file}"`
if [[ ! $? ]]; then
echo "Failed to get the torrent hash of ${sub_file}"
# add the torrent to the queue if it is not already in the queue
if [[ ! ${TORRENT_QUEUE[${torrent_hash}]+_} ]]; then
# check that the path is a file
elif [[ -f "${file}" ]]; then
# get the torrent hash for the .torrent file
torrent_hash=`get_torrent_hash "${file}"`
if [[ ! $? ]]; then
echo "Failed to get the torrent hash of ${file}"
# add the torrent to the queue if it is not already in the queue
if [[ ! ${TORRENT_QUEUE[${torrent_hash}]+_} ]]; then
# go through the torrent queue
for torrent_hash in "${!TORRENT_QUEUE[@]}"; do
# continue if the torrent is already being downloaded
if [[ ${RUNNING_RSYNCS[$torrent_hash]+_} ]]; then
# check to see if the torrent is on the rtorrent server
torrent_added=`get_torrent_added "${torrent_hash}"`
if [[ ! $? ]]; then
echo "Failed to see if ${TORRENT_QUEUE[$torrent_hash]} exists on the rtorrent server"
# if the torrent is not on the rtorrent server, upload it
if [[ $torrent_added -eq 0 ]]; then
if [[ ! $? ]]; then
echo "Failed to upload ${TORRENT_QUEUE[$torrent_hash]}"
# if the amount of running rsyncs is belwo the desire amount, run items from the queue
if [[ ${#RUNNING_RSYNCS[@]} -lt ${RSYNC_PROCESSES} ]]; then
for torrent_hash in "${!TORRENT_QUEUE[@]}"; do
# make sure this torrent is not already being downloaded
if [[ ${RUNNING_RSYNCS[${torrent_hash}]+_} ]]; then
# see if the torrent is finished downloading remotely
torrent_completed=`get_torrent_complete "${torrent_hash}"`
if [[ ! $? ]]; then
echo "Failed to check if ${TORRENT_QUEUE[$torrent_hash]} is completed"
# the torrent is finished downloading remotely
if [[ "${torrent_completed}" -eq 1 ]]; then
torrent_name=`get_torrent_name "${torrent_hash}"`
if [[ ! $? ]]; then
echo "Failed to get torrent name for ${TORRENT_QUEUE[$torrent_hash]}"
# start the download and record the PID
rsync -hrvP --inplace "${SSH_USER}@${SSH_SERVER}:${SSH_SERVER_DOWNLOAD_PATH}/${torrent_name}" "${TORRENT_TMP_DOWNLOAD}/" &
# checkup on the running rsyncs
for torrent_hash in "${!RUNNING_RSYNCS[@]}"; do
# check to see if the given PID is still running
if [[ `jobs | grep -c $pid` -eq 0 ]]; then
# get the return code of the PID
wait $pid
if [[ $return ]]; then
echo "Successfully downloaded ${TORRENT_QUEUE[$torrent_hash]}"
torrent_name=`get_torrent_name "${torrent_hash}"`
if [[ $? ]]; then
if [[ `dirname "${TORRENT_QUEUE[$torrent_hash]}"` != "${TORRENT_FILE_PATH}" ]]; then
final_location_dir="${final_location_dir}/$(basename `dirname "${TORRENT_FILE_PATH}"`)"
if [[ ! -d "${final_location_dir}" ]]; then
mkdir -p "${final_location_dir}"
mv "${TORRENT_TMP_DOWNLOAD}/${torrent_name}" "${final_location_dir}/"
unset TORRENT_QUEUE[$torrent_hash]
echo "Failed to get torrent name for ${TORRENT_QUEUE[$torrent_hash]}"
echo "Failed to download ${TORRENT_QUEUE[$torrent_hash]} with rsync return code $return"
unset RUNNING_RSYNCS[$torrent_hash]
sleep 5s

View file

@ -1,134 +0,0 @@
// Communicates with rtorrent's XMLRPC interface, and can gather info regarding a .torrent file.
package main
import (
bencode ""
// Torrent Keeps track of a torrent file's and rtorrent's XMLRPC information.
type Torrent struct {
path string
hash string
xmlUser string
xmlPass string
xmlAddress string
// NewTorrent Create a new Torrent instance while computing its hash.
func NewTorrent(xmlUser string, xmlPass string, xmlAddress string, filePath string) (*Torrent, error) {
hash, err := getTorrentHash(filePath)
if err != nil {
return nil, err
return &Torrent{filePath, hash, xmlUser, xmlPass, xmlAddress}, nil
// Compute the torrent hash for a given torrent file path returning an all caps sha1 hash as a string.
func getTorrentHash(filePath string) (string, error) {
file, err := os.Open(filePath)
if err != nil {
return "", err
defer file.Close()
data, err := bencode.Decode(file)
if err != nil {
return "", err
decoded, ok := data.(map[string]interface{})
if !ok {
return "", errors.New("unable to convert data to map")
var encoded bytes.Buffer
bencode.Marshal(&encoded, decoded["info"])
encodedString := encoded.String()
hash := sha1.New()
io.WriteString(hash, encodedString)
hashString := strings.ToUpper(hex.EncodeToString(hash.Sum(nil)))
return hashString, nil
// Send a command and its argument to the rtorrent XMLRPC and get the response.
func (t Torrent) xmlRPCSend (command string, arg string) (string, error) {
// This is hacky XML to send to the server
buf := []byte("<?xml version='1.0'?>\n" +
"<methodCall>\n" +
"<methodName>" + command + "</methodName>\n" +
"<params>\n" +
"<param>\n" +
"<value><string>" + arg + "</string></value>\n" +
"</param>\n" +
"</params>\n" +
buffer := bytes.NewBuffer(buf)
request, err := http.NewRequest("POST", t.xmlAddress, buffer)
if err != nil {
return "", err
// Set the basic HTTP auth if we have a user or password
if t.xmlUser != "" || t.xmlPass != "" {
request.SetBasicAuth(t.xmlUser, t.xmlPass)
client := &http.Client{}
resp, err := client.Do(request)
if err != nil {
return "", err
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return "", err
re, err := regexp.Compile("<value><.*>(.*)</.*></value>")
if err != nil {
return "", err
values := re.FindAllStringSubmatch(string(body), -1)
if len(values) != 1 {
return "", nil
return values[0][1], nil
// GetTorrentName Get the torrent's name from rtorrent.
func (t Torrent) GetTorrentName() (string, error) {
return t.xmlRPCSend("d.get_name", t.hash)
// GetTorrentComplete Get the completion status of the torrent from rtorrent.
func (t Torrent) GetTorrentComplete() (bool, error) {
complete, err := t.xmlRPCSend("d.get_complete", t.hash)
if err != nil {
return false, err
return complete == "1", nil

View file

@ -1,554 +0,0 @@
// Send and receive files via SFTP using multiple download streams concurrently (for downloads).
package main
import (
const (
// NoProgress no progress for current file
NoProgress int64 = -1
// Sync Keeps track of the connection information.
type Sync struct {
sshClient *ssh.Client
sftpClients []*sftp.Client
sftpClientCount int
// NewSync Create a new Sync object, connect to the SSH server, and create sftp clients
func NewSync(threads string, user string, pass string, server string, port string) (*Sync, error) {
// convert the threads input to an int
clientCount, err := strconv.Atoi(threads)
if err != nil {
return nil, err
if clientCount < 1 {
return nil, errors.New("Must have a thread count >= 1")
sshClient, err := newSSHClient(user, pass, server, port)
if err != nil {
return nil, err
// initialize a total of client_count sftp clients
sftpClients := make([]*sftp.Client, clientCount)
for i := 0; i < clientCount; i++ {
sftpClient, err := sftp.NewClient(sshClient)
if err != nil {
return nil, err
sftpClients[i] = sftpClient
return &Sync{sshClient, sftpClients, clientCount}, nil
// Close Closes all of the ssh and sftp connections to the SSH server.
func (s Sync) Close() error {
var returnError error
for i := 0; i < s.sftpClientCount; i++ {
err := s.sftpClients[i].Close()
if err != nil {
returnError = err
err := s.sshClient.Close()
if err != nil {
return err
return returnError
// Create a new SSH client instance and confirm that we can make sessions
func newSSHClient(user string, pass string, server string, port string) (*ssh.Client, error) {
sshConfig := &ssh.ClientConfig{
User: user,
Auth: []ssh.AuthMethod{
sshClient, err := ssh.Dial("tcp", server+":"+port, sshConfig)
if err != nil {
return sshClient, err
session, err := sshClient.NewSession()
if err != nil {
return sshClient, err
defer session.Close()
return sshClient, err
// SendFiles Send a list of files in the format of {"source_path": "destination"} to the SSH server. This does not handle directories.
func (s Sync) SendFiles(files map[string]string) error {
return SendFiles(s.sftpClients[0], files)
// SendFiles Send a list of files in the format of {"source_path": "destination"} to the SSH server. This does not handle directories.
func SendFiles(sftpClient *sftp.Client, files map[string]string) error {
for sourceFile, destinationFile := range files {
// 512KB buffer for reading/sending data
data := make([]byte, 524288)
// Open file that we will be sending
sourceData, err := os.Open(sourceFile)
if err != nil {
log.Println("SendFiles: Failed to open source file " + sourceFile)
return err
// Get the info of the file that we will be sending
sourceStat, err := sourceData.Stat()
if err != nil {
log.Println("SendFiles: Failed to stat source file " + sourceFile)
return err
// Extract the size of the file that we will be sending
sourceSize := sourceStat.Size()
// Create the destination file for the source file we're sending
newFile, err := sftpClient.Create(destinationFile)
if err != nil {
log.Println("SendFiles: Failed to create destination file " + destinationFile)
return err
// Track our position in reading/writing the file
var currentPosition int64
for currentPosition < sourceSize {
// If the next iteration will be greater than the file size, reduce to the data size
if currentPosition+int64(len(data)) > sourceSize {
data = make([]byte, sourceSize-currentPosition)
// Read data from the source file
read, err := sourceData.Read(data)
if err != nil {
// If it's the end of the file and we didn't read anything, break
if err == io.EOF {
if read == 0 {
} else {
return err
// Write the data from the source file to the destination file
_, err = newFile.Write(data)
if err != nil {
return err
// Update the current position in the file
currentPosition += int64(read)
// close the source file
err = sourceData.Close()
if err != nil {
return err
// close the destination file
err = newFile.Close()
if err != nil {
return err
return nil
// GetFile Get a file from the source_file path to be stored in destination_file path
func (s Sync) GetFile(sourceFile string, destinationFile string) error {
// Store channels for all the concurrent download parts
channels := make([]chan error, s.sftpClientCount)
// Make channels for all the concurrent downloads
for i := 0; i < s.sftpClientCount; i++ {
channels[i] = make(chan error)
// Start the concurrent downloads
for i := 0; i < s.sftpClientCount; i++ {
go GetFile(s.sftpClients[i], sourceFile, destinationFile, i+1, s.sftpClientCount, channels[i])
// Block until all downloads are completed or one errors
allDone := false
for !allDone {
allDone = true
for i, channel := range channels {
if channel == nil {
select {
case err := <-channel:
if err != nil {
return err
channels[i] = nil
// still running
if allDone {
allDone = false
err := destroyProgress(destinationFile)
if err != nil {
return err
return nil
// GetFile Get a file from the source_file path to be stored in destination_file path.
// worker_number and work_total are not zero indexed, but 1 indexed
func GetFile(sftpClient *sftp.Client, sourceFile string, destinationFile string, workerNumber int, workerTotal int, com chan<- error) error {
// Open source_data for reading
sourceData, err := sftpClient.OpenFile(sourceFile, os.O_RDONLY)
if err != nil {
com <- err
return err
// Get info for source_data
stat, err := sourceData.Stat()
if err != nil {
com <- err
return err
// Extract the size of source_data
statSize := stat.Size()
// Calculate which byte to start reading data from
start, err := getProgress(destinationFile, workerNumber)
if err != nil {
com <- err
return err
if start == NoProgress {
if workerNumber == 1 {
start = 0
} else {
start = (statSize * int64(workerNumber-1)) / int64(workerTotal)
// Calculate which byte to stop reading data from
var stop int64
if workerNumber == workerTotal {
stop = statSize
} else {
stop = (statSize * int64(workerNumber)) / int64(workerTotal)
// Create the new file for writing
newFile, err := os.OpenFile(destinationFile, os.O_WRONLY|os.O_CREATE, 0777)
if err != nil {
com <- err
return err
// Seek to the computed start point
offset, err := sourceData.Seek(start, 0)
if err != nil {
com <- err
return err
// Seeking messed up real bad
if offset != start {
err = errors.New("Returned incorrect offset for source " + sourceFile)
com <- err
return err
// Seek to the computed start point
offset, err = newFile.Seek(start, 0)
if err != nil {
com <- err
return err
// Seeking messed up real bad
if offset != start {
err = errors.New("Return incorrect offset for destination " + destinationFile)
com <- err
return err
// 512KB chunks
var dataSize int64 = 524288
// Change the size if the chunk is larger than the file
chunkDifference := stop - start
if chunkDifference < dataSize {
dataSize = chunkDifference
// Initialize the buffer for reading/writing
data := make([]byte, dataSize)
var currentSize int64
for currentSize = start; currentSize < stop; currentSize += dataSize {
err = updateProgress(destinationFile, currentSize, workerNumber)
if err != nil {
com <- err
return err
// Adjust the size of the buffer if the next iteration will be greater than what has yet to be read
if currentSize+dataSize > stop {
dataSize = stop - currentSize
data = make([]byte, dataSize)
// Read the chunk
read, err := sourceData.Read(data)
if err != nil {
// Exit the loop if we're at the end of the file and no data was read
if err == io.EOF {
if read == 0 {
} else {
com <- err
return err
// Write the chunk
_, err = newFile.Write(data)
if err != nil {
com <- err
return err
err = updateProgress(destinationFile, currentSize, workerNumber)
if err != nil {
com <- err
return err
// Close out the files
err = sourceData.Close()
if err != nil {
com <- err
return err
err = newFile.Close()
if err != nil {
com <- err
return err
com <- nil
return nil
// GetPath Get a given directory or file defined by source_path and save it to destination_path
func (s Sync) GetPath(sourcePath string, destinationPath string) error {
// Get all the dirs and files underneath source_path
dirs, files, err := s.getChildren(sourcePath)
// Remove the trailing slash if it exists
if sourcePath[len(sourcePath)-1] == '/' {
sourcePath = sourcePath[:len(sourcePath)-1]
// Get the parent path of source_path
sourceBase := filepath.Dir(sourcePath)
sourceBaseLen := len(sourceBase)
// Make all the directories in destination_path
for _, dir := range dirs {
dir = filepath.Join(destinationPath, filepath.FromSlash(dir[sourceBaseLen:]))
err = os.MkdirAll(dir, 0777)
if err != nil {
return err
// Get all the files and place them in destination_path
for _, file := range files {
newFile := filepath.Join(destinationPath, filepath.FromSlash(file[sourceBaseLen:]))
err = s.GetFile(file, newFile)
if err != nil {
return err
return nil
// Get the directories and files underneath a given sftp root path
func (s Sync) getChildren(root string) ([]string, []string, error) {
// Used to walk through the path
walker := s.sftpClients[0].Walk(root)
// Keep track of the directories
var dirs []string
// Keep track of the files
var files []string
// Walk through the files and directories
for walker.Step() {
err := walker.Err()
if err != nil {
return nil, nil, err
stat := walker.Stat()
if stat.IsDir() {
dirs = append(dirs, walker.Path())
} else {
files = append(files, walker.Path())
err := walker.Err()
if err != nil {
return nil, nil, err
return dirs, files, nil
// Exists Determine if a directory, file, or link exists
func (s Sync) Exists(path string) (bool, error) {
_, err := s.sftpClients[0].Lstat(path)
if err != nil {
if err.Error() == "sftp: \"No such file\" (SSH_FX_NO_SUCH_FILE)" {
return false, nil
return false, err
return true, nil
func getProgress(filePath string, workerNumber int) (int64, error) {
file, err := os.Open(getProgressPath(filePath))
if err != nil {
if os.IsNotExist(err) {
return NoProgress, nil
return 0, err
fileStat, err := file.Stat()
if err != nil {
return 0, err
if fileStat.Size() == 0 {
return NoProgress, nil
var progress int64
progressSize := int64(binary.Size(progress))
offset := progressSize * int64(workerNumber-1)
realOffset, err := file.Seek(offset, os.SEEK_SET)
if err != nil {
return 0, err
if realOffset != offset {
return 0, errors.New("getProgress: Tried to seek to " + string(offset) + " but got " + string(realOffset) + " instead")
progressData := make([]byte, progressSize)
read, err := file.Read(progressData)
if err != nil {
if err == io.EOF {
return NoProgress, nil
return 0, err
if int64(read) != progressSize {
return NoProgress, nil
err = binary.Read(bytes.NewReader(progressData), binary.BigEndian, &progress)
if err != nil {
return 0, err
return progress, nil
func updateProgress(filePath string, written int64, workerNumber int) error {
file, err := os.OpenFile(getProgressPath(filePath), os.O_WRONLY|os.O_CREATE, 0777)
if err != nil {
return err
writtenSize := int64(binary.Size(written))
offset := writtenSize * int64(workerNumber-1)
realOffset, err := file.Seek(offset, os.SEEK_SET)
if err != nil {
return err
if realOffset != offset {
return errors.New("updateProgress: Tried to seek to " + string(offset) + " but got " + string(realOffset) + " instead")
return binary.Write(file, binary.BigEndian, written)
func destroyProgress(filePath string) error {
err := os.Remove(getProgressPath(filePath))
if err != nil && !os.IsNotExist(err) {
return err
return nil
func getProgressPath(filePath string) string {
return filepath.Join(filepath.Dir(filePath), "."+filepath.Base(filePath)+".progress")