""" Test SFTP client """ import atexit import getpass import logging import os import random import string import sys import tempfile from mylib.scripts.helpers import add_sftp_opts, get_opts_parser, init_logging from mylib.sftp import SFTPClient log = logging.getLogger("mylib.scripts.sftp_test") def main(argv=None): # pylint: disable=too-many-locals,too-many-statements """Script main""" if argv is None: argv = sys.argv[1:] # Options parser parser = get_opts_parser(just_try=True) add_sftp_opts(parser) test_opts = parser.add_argument_group("Test SFTP options") test_opts.add_argument( "-p", "--remote-upload-path", action="store", type=str, dest="upload_path", help="Remote upload path (default: on remote initial connection directory)", ) options = parser.parse_args() # Initialize logs init_logging(options, "Test SFTP client") if options.sftp_user and not options.sftp_password: options.sftp_password = getpass.getpass("Please enter SFTP password: ") log.info("Initialize Email client") sftp = SFTPClient(options=options) sftp.connect() atexit.register(sftp.close) log.debug("Create temporary file") test_content = b"Juste un test." tmp_dir = tempfile.TemporaryDirectory() # pylint: disable=consider-using-with tmp_file = os.path.join( tmp_dir.name, f'tmp{"".join(random.choice(string.ascii_lowercase) for i in range(8))}', # nosec ) log.debug('Temporary file path: "%s"', tmp_file) with open(tmp_file, "wb") as file_desc: file_desc.write(test_content) log.debug( "Upload file %s to SFTP server (in %s)", tmp_file, options.upload_path if options.upload_path else "remote initial connection directory", ) if not sftp.upload_file(tmp_file, options.upload_path): log.error("Fail to upload test file on SFTP server") sys.exit(1) log.info("Test file uploaded on SFTP server") remote_filepath = ( os.path.join(options.upload_path, os.path.basename(tmp_file)) if options.upload_path else os.path.basename(tmp_file) ) if not sftp._just_try: # pylint: disable=protected-access with tempfile.NamedTemporaryFile() as tmp_file2: log.info("Retrieve test file to %s", tmp_file2.name) if not sftp.get_file(remote_filepath, tmp_file2.name): log.error("Fail to retrieve test file") else: with open(tmp_file2.name, "rb") as file_desc: content = file_desc.read() log.debug("Read content: %s", content) if test_content == content: log.info("Content file retrieved match with uploaded one") else: log.error("Content file retrieved doest not match with uploaded one") try: log.info("Remotly open test file %s", remote_filepath) file_desc = sftp.open_file(remote_filepath) content = file_desc.read() log.debug("Read content: %s", content) if test_content == content: log.info("Content of remote file match with uploaded one") else: log.error("Content of remote file doest not match with uploaded one") except Exception: # pylint: disable=broad-except log.exception("An exception occurred remotly opening test file %s", remote_filepath) if sftp.remove_file(remote_filepath): log.info("Test file removed on SFTP server") else: log.error("Fail to remove test file on SFTP server")