ParallelSSH/ssh2-python

`SocketRecvError` on resuming upload of partially uploaded files

NamamiShanker opened this issue · 2 comments

Bug reports

I am writing a program that will upload files to a CrushFTP server through SFTP and retry on connection loss. Below is my Connection class for wrapping ssh2-python library, and my retry logic to resume upload on connection loss.

Steps to reproduce:

  1. Example code that produces error.
    The Connection class that wraps ssh2-python
class Connection:
	"""SFTP connection wrapper around an ssh2-python ``Session``.

	Creating an instance opens a TCP socket to the host, performs the SSH
	handshake, authenticates with a password and initialises an SFTP channel.
	``put`` uploads a local file and resumes from the current size of a
	partially uploaded remote file.
	"""

	def __init__(self, host: str, username: str = None, password: str = None, port: int = 22, retries: int = 5):
		"""Connect to ``host`` and open an SFTP channel.

		Args:
			host: Host name or address to connect to.
			username: User name for password authentication.
			password: Password for password authentication.
			port: TCP port of the SSH server.
			retries: Maximum number of TCP connection attempts. At least one
				attempt is always made.

		Raises:
			OSError: If the TCP connection cannot be established after all
				attempts.
		"""
		self.host = host
		self.username = username
		self.password = password
		self.port = port
		# Remembered so that reconnect() uses the same retry budget.
		self.retries = retries
		logger.info(f"Attempting to connect to {host}")
		self.connected = False
		self.sock = self._open_socket(host, port, retries)
		self.connected = True
		logger.info(f"Connected to {host}")
		try:
			self.session = Session()
			# With this timeout set, blocking session calls (including SFTP
			# writes) raise ssh2.exceptions.Timeout once it expires.
			self.session.set_timeout(5000)
			self.session.handshake(self.sock)
			self.session.userauth_password(username, password)
			self.sftp = self.session.sftp_init()
		except Exception:
			# Do not leak the socket when handshake/auth/SFTP init fails.
			self.sock.close()
			self.connected = False
			raise
		logger.info("Opened sftp connection")
		# Permission bits: owner read/write, group read, others read.
		self.mode = LIBSSH2_SFTP_S_IRUSR | LIBSSH2_SFTP_S_IWUSR | LIBSSH2_SFTP_S_IRGRP | LIBSSH2_SFTP_S_IROTH

		self.f_flags_append = LIBSSH2_FXF_APPEND
		self.f_flags_write = LIBSSH2_FXF_WRITE
		self.f_flags_read = LIBSSH2_FXF_READ

	@staticmethod
	def _open_socket(host: str, port: int, retries: int):
		"""Return a connected TCP socket, retrying failed attempts every 5 seconds."""
		attempts = max(retries, 1)
		for remaining in range(attempts - 1, -1, -1):
			sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
			try:
				sock.connect((host, port))
			except OSError:
				# A socket whose connect() failed is unusable; release it.
				sock.close()
				if not remaining:
					logger.exception("Unable to establish connection to the host")
					raise
				logger.error(f"SFTP Conncetion failed : Retrying : Retries left {remaining}")
				time.sleep(5)
			else:
				return sock

	def reconnect(self):
		"""Close the current connection, connect again with the stored settings and return ``self``."""
		self.close()
		self.__init__(self.host, self.username, self.password, self.port, self.retries)
		return self

	def put(self, localpath: Path, remotepath: str, update_callback=None, set_callback=None):
		"""Upload ``localpath`` to ``remotepath``, resuming a partial upload.

		Args:
			localpath: Local file to upload.
			remotepath: Destination path on the server.
			update_callback: Optional callable invoked after each chunk as
				``update_callback(chunk_bytes, local_size)``.
			set_callback: Optional callable invoked once as
				``set_callback(remote_size, local_size)`` when the remote
				file already holds data.

		Returns:
			The attributes object returned by ``fstat()`` on the remote file,
			or ``None`` when ``localpath`` is not a file.
		"""
		if not localpath.is_file():
			# Not inside an exception handler, so there is no traceback to log.
			logger.error(f"{localpath} is not a file")
			return None
		logger.info(f"Uploading {localpath} to {remotepath}")
		local_size = localpath.stat().st_size

		try:
			with self.sftp.open(remotepath, self.f_flags_read, self.mode) as remote_fh:
				remote_attrs = remote_fh.fstat()
				if local_size == remote_attrs.filesize:
					logger.info(f"{localpath} already exists on {remotepath}")
					return remote_attrs
		except ssh2.exceptions.SFTPProtocolError:
			# Opening for read failed; open for write so the remote file exists.
			# NOTE(review): f_flags_write is LIBSSH2_FXF_WRITE only - confirm the
			# server creates a missing file without LIBSSH2_FXF_CREAT.
			with self.sftp.open(remotepath, self.f_flags_write, self.mode):
				pass
			logger.info(f"Creating remote file {remotepath}")
		with open(localpath, 'rb', common.BUFSIZE) as local_fh, self.sftp.open(remotepath, self.f_flags_append, self.mode) as remote_fh:
			remote_size = remote_fh.fstat().filesize
			if remote_size > 0:
				logger.info(f"File {localpath} partially exists at {remotepath}: Existing file size {remote_size} bytes : Remaining file size {local_size - remote_size} bytes")
				if set_callback:
					set_callback(remote_size, local_size)
			logger.info(f"Seek pointer: {remote_fh.tell64()}")
			# Continue both files from the end of what the server already has.
			remote_fh.seek64(remote_size)
			local_fh.seek(remote_size)
			while data := local_fh.read(common.BUFSIZE):
				remote_fh.write(data)
				if update_callback:
					# Report the payload length, not the bytes object's memory size.
					update_callback(len(data), local_size)
			# Query the attributes while the remote handle is still open.
			return remote_fh.fstat()

	def close(self):
		"""Disconnect the session and always close the socket.

		A failing ``disconnect()`` (for example on an already dead link) is
		logged and otherwise ignored so that ``reconnect()`` can proceed.
		"""
		logger.info("Closing connection")
		try:
			self.session.disconnect()
		except Exception:
			logger.warning("Session disconnect failed; closing socket anyway", exc_info=True)
		finally:
			self.sock.close()
			self.connected = False
		logger.info("Connection closed")

The retry logic, where the functions _show_updates and _set_update are just tqdm progress-update callbacks:

def _transfer_file(self, file: Path, update_callback=_show_updates, set_callback=_set_update) -> int:
	"""Upload a single file, reconnecting and retrying on connection errors.

	Args:
		file: Local file to upload into ``self.remote_folder_path``.
		update_callback: Progress callback forwarded to ``self.client.put``.
		set_callback: Initial-progress callback forwarded to ``self.client.put``.

	Returns:
		The ``filesize`` attribute of the object returned by ``self.client.put``.

	Raises:
		FileNotFoundError: If ``self.client.put`` returns ``None``, which it
			does when ``file`` is not a regular file.
		Exception: Any error other than ``Timeout``, ``SocketSendError`` or
			``SocketRecvError`` is logged and re-raised unchanged.
	"""
	self.current_file = file
	started = datetime.now()
	remote_path = self.remote_folder_path + file.name
	# Retries indefinitely: every connection-level error triggers a reconnect
	# followed by another put(), which resumes from the remote file size.
	while True:
		try:
			transferred_file = self.client.put(file, remote_path, update_callback=update_callback, set_callback=set_callback)
		except exceptions.Timeout:
			logger.error("Connection lost : Retrying")
			self.client = self.client.reconnect()
		except exceptions.SocketSendError:
			logger.error(f"Connection lost : Retrying in {self.retry_time_secs} seconds")
			time.sleep(self.retry_time_secs)
			self.client = self.client.reconnect()
		except exceptions.SocketRecvError:
			logger.error(f"SocketRecvError: Connection lost : Retrying in {self.retry_time_secs} seconds")
			time.sleep(self.retry_time_secs)
			self.client = self.client.reconnect()
		except Exception:
			logger.error(f"Error transferring file {file.name}")
			# Bare raise keeps the original traceback intact.
			raise
		else:
			if transferred_file is None:
				# put() declined the upload; fail with a clear error instead of
				# an AttributeError on None below.
				raise FileNotFoundError(f"{file} is not a file")
			# Use the module logger like the rest of this function.
			logger.info(f"File {file.resolve()} : Size {transferred_file.filesize} bytes : Started {started} : Finished {datetime.now()}")

			return transferred_file.filesize
  2. Stack trace or error messages.
INFO 2022-09-05 16:11:33,009 - Attempting to connect to 192.168.1.79
INFO 2022-09-05 16:11:33,009 - Attempting to connect to 192.168.1.79
INFO 2022-09-05 16:11:33,010 - Connected to 192.168.1.79
INFO 2022-09-05 16:11:33,099 - Opened sftp connection
INFO 2022-09-05 16:11:33,099 - Connected to 192.168.1.79
INFO 2022-09-05 16:11:33,112 - Uploading testfiles/archive.part01.rar to /archive.part01.rar                                                                                                                                                                                                                       
INFO 2022-09-05 16:11:33,122 - File testfiles/archive.part01.rar partially exists at /archive.part01.rar: Existing file size 9175040 bytes : Remaining file size 95682560 bytes                                                                                                                                    
INFO 2022-09-05 16:11:33,123 - Seek pointer: 0                                                                                                                                                                                                                                                                     
 24%|██████████████████████████████████████████████████████████████▏                                                                                                                                                                                                       | 24.9M/105M [00:09<00:49, 1.63Mbytes/s]
 24%|██████████████████████████████████████████████████████████████▏                                                                                                                                                                                                       | 24.9M/105M [00:09<00:31, 2.56Mbytes/s]
Traceback (most recent call last):
  File "/home/namami/dev/ftp/ft_manager/sftp_manager/ssh2_sftp.py", line 81, in put
    remote_fh.write(data)
  File "ssh2/sftp_handle.pyx", line 292, in ssh2.sftp_handle.SFTPHandle.write
  File "ssh2/utils.pyx", line 148, in ssh2.utils.handle_error_codes
ssh2.exceptions.Timeout

Expected behaviour: The script should have resumed upload of the partially uploaded file.

Actual behaviour: I get a SocketRecvError.

Additional info: libssh2 v1.0.0:

When I run the script again, the uploading resumes without any errors.
How should I handle the different errors so that the uploading of files can be resumed?

Hi there,

Thanks for the interest.

Not seeing an issue with library here. Timeout is set at self.session.set_timeout(5000). This causes file transfers to timeout after 5sec. The library correctly raises ssh2.exceptions.Timeout after 5sec of transferring.

If timeouts are not wanted, set_timeout should not be called.

To be closed unless an issue with library is shown.

Not seeing an issue. Timeout raised correctly after set_timeout is used.