# -*- coding: utf-8 -*-

""" Test SFTP client """

import atexit
import tempfile
import logging
import sys
import os
import random
import string
import getpass

from mylib.sftp import SFTPClient
from mylib.scripts.helpers import get_opts_parser, add_sftp_opts
from mylib.scripts.helpers import init_logging


log = logging.getLogger('mylib.scripts.sftp_test')


def main(argv=None):  # pylint: disable=too-many-locals,too-many-statements
    """
    Script main

    Upload a small temporary file to the SFTP server, retrieve it into a
    local temporary file, read it again through a remotely opened handle,
    compare both contents with the uploaded one and finally remove the
    remote file. The outcome of each step is reported through the logger.

    :param argv: Command-line arguments to parse (default: sys.argv[1:])

    Exits with status 1 (via sys.exit) when the upload fails.
    """
    if argv is None:
        argv = sys.argv[1:]

    # Options parser
    parser = get_opts_parser(just_try=True)
    add_sftp_opts(parser)

    test_opts = parser.add_argument_group('Test SFTP options')

    test_opts.add_argument(
        '-p', '--remote-upload-path',
        action="store",
        type=str,
        dest="upload_path",
        help="Remote upload path (default: on remote initial connection directory)",
    )

    # Parse the arguments handed to main() instead of silently falling back
    # on sys.argv, so callers can really inject their own argument list.
    options = parser.parse_args(argv)

    # Initialize logs
    init_logging(options, 'Test SFTP client')

    # Only prompt for a password when a user is set without one
    if options.sftp_user and not options.sftp_password:
        options.sftp_password = getpass.getpass('Please enter SFTP password: ')

    log.info('Initialize SFTP client')
    sftp = SFTPClient(options=options, just_try=options.just_try)
    sftp.connect()
    # Make sure the connection is closed whatever the way the script ends
    atexit.register(sftp.close)

    log.debug('Create tempory file')
    test_content = b'Juste un test.'
    # The local file is only needed until the upload is done: the context
    # manager removes the directory (and the file) right after, including
    # when sys.exit() is raised inside the block.
    with tempfile.TemporaryDirectory() as tmp_dir:
        random_suffix = "".join(random.choice(string.ascii_lowercase) for _ in range(8))
        tmp_file = os.path.join(tmp_dir, f'tmp{random_suffix}')
        log.debug('Temporary file path: "%s"', tmp_file)
        with open(tmp_file, 'wb') as file_desc:
            file_desc.write(test_content)

        log.debug(
            'Upload file %s to SFTP server (in %s)', tmp_file,
            options.upload_path if options.upload_path else "remote initial connection directory")
        if not sftp.upload_file(tmp_file, options.upload_path):
            log.error('Fail to upload test file on SFTP server')
            sys.exit(1)

    log.info('Test file uploaded on SFTP server')
    # The remote file keeps the local file name, optionally below the upload path
    remote_filepath = (
        os.path.join(options.upload_path, os.path.basename(tmp_file))
        if options.upload_path else os.path.basename(tmp_file)
    )

    # First check: download the file and compare its content
    with tempfile.NamedTemporaryFile() as tmp_file2:
        log.info('Retrieve test file to %s', tmp_file2.name)
        if not sftp.get_file(remote_filepath, tmp_file2.name):
            log.error('Fail to retrieve test file')
        else:
            with open(tmp_file2.name, 'rb') as file_desc:
                content = file_desc.read()
            log.debug('Read content: %s', content)
            if test_content == content:
                log.info('Content file retrieved match with uploaded one')
            else:
                log.error('Content file retrieved doest not match with uploaded one')

    # Second check: read the file through a remotely opened handle
    try:
        log.info('Remotly open test file %s', remote_filepath)
        remote_file = sftp.open_file(remote_filepath)
        try:
            content = remote_file.read()
        finally:
            # Release the remote handle before the file is removed below.
            # NOTE(review): open_file() return type is not visible here, so
            # close() is only called when the object exposes it - confirm.
            close_remote_file = getattr(remote_file, 'close', None)
            if callable(close_remote_file):
                close_remote_file()
        log.debug('Read content: %s', content)
        if test_content == content:
            log.info('Content of remote file match with uploaded one')
        else:
            log.error('Content of remote file doest not match with uploaded one')
    except Exception:  # pylint: disable=broad-except
        log.exception('An exception occurred remotly opening test file %s', remote_filepath)

    # Cleanup: always try to remove the test file from the server
    if sftp.remove_file(remote_filepath):
        log.info('Test file removed on SFTP server')
    else:
        log.error('Fail to remove test file on SFTP server')