python-mylib/mylib/scripts/sftp_test.py

106 lines
3.6 KiB
Python

""" Test SFTP client """
import atexit
import getpass
import logging
import os
import random
import string
import sys
import tempfile
from mylib.scripts.helpers import add_sftp_opts, get_opts_parser, init_logging
from mylib.sftp import SFTPClient
# Module-level logger used by main() to report each step of the SFTP test.
log = logging.getLogger("mylib.scripts.sftp_test")
def main(argv=None):  # pylint: disable=too-many-locals,too-many-statements
    """Run an upload / download / remote-read / remove round trip on an SFTP server.

    A small temporary file is created locally and uploaded, then removed from
    the server at the end. Unless the client is in just-try mode, the file is
    also downloaded back and opened remotely, and both contents are compared
    with what was uploaded. Every outcome is reported through the module
    logger.

    Args:
        argv: Command-line arguments to parse, without the program name.
            Defaults to ``sys.argv[1:]``.

    Raises:
        SystemExit: With status 1 when the test file cannot be uploaded.
    """
    if argv is None:
        argv = sys.argv[1:]

    # Options parser
    parser = get_opts_parser(just_try=True)
    add_sftp_opts(parser)

    test_opts = parser.add_argument_group("Test SFTP options")

    test_opts.add_argument(
        "-p",
        "--remote-upload-path",
        action="store",
        type=str,
        dest="upload_path",
        help="Remote upload path (default: on remote initial connection directory)",
    )

    # Parse the arguments we were given rather than implicitly re-reading
    # sys.argv, so that callers passing an explicit argv are honoured.
    options = parser.parse_args(argv)

    # Initialize logs
    init_logging(options, "Test SFTP client")

    # Prompt interactively only when a user is set without a password
    if options.sftp_user and not options.sftp_password:
        options.sftp_password = getpass.getpass("Please enter SFTP password: ")

    log.info("Initialize SFTP client")
    sftp = SFTPClient(options=options)
    sftp.connect()
    # Make sure the connection is closed however the script terminates
    atexit.register(sftp.close)

    log.debug("Create tempory file")
    test_content = b"Juste un test."
    # The local file is only needed for the upload: scope the temporary
    # directory to that step so it is removed deterministically, including
    # when sys.exit() is raised below.
    with tempfile.TemporaryDirectory() as tmp_dir:
        tmp_file = os.path.join(
            tmp_dir,
            f'tmp{"".join(random.choice(string.ascii_lowercase) for _ in range(8))}',  # nosec
        )
        log.debug('Temporary file path: "%s"', tmp_file)
        with open(tmp_file, "wb") as file_desc:
            file_desc.write(test_content)

        log.debug(
            "Upload file %s to SFTP server (in %s)",
            tmp_file,
            options.upload_path if options.upload_path else "remote initial connection directory",
        )
        if not sftp.upload_file(tmp_file, options.upload_path):
            log.error("Fail to upload test file on SFTP server")
            sys.exit(1)

    log.info("Test file uploaded on SFTP server")
    # Only the base name of the (now removed) local file is needed from here on
    remote_filepath = (
        os.path.join(options.upload_path, os.path.basename(tmp_file))
        if options.upload_path
        else os.path.basename(tmp_file)
    )

    # In just-try mode, the download and remote-read checks are skipped
    if not sftp._just_try:  # pylint: disable=protected-access
        with tempfile.NamedTemporaryFile() as tmp_file2:
            log.info("Retrieve test file to %s", tmp_file2.name)
            if not sftp.get_file(remote_filepath, tmp_file2.name):
                log.error("Fail to retrieve test file")
            else:
                with open(tmp_file2.name, "rb") as file_desc:
                    content = file_desc.read()
                log.debug("Read content: %s", content)
                if test_content == content:
                    log.info("Content file retrieved match with uploaded one")
                else:
                    log.error("Content file retrieved doest not match with uploaded one")

        try:
            log.info("Remotly open test file %s", remote_filepath)
            file_desc = sftp.open_file(remote_filepath)
            try:
                content = file_desc.read()
            finally:
                # Release the remote handle before the file is removed below.
                # NOTE(review): assumes the object returned by open_file()
                # exposes close() like a regular file object - confirm.
                file_desc.close()
            log.debug("Read content: %s", content)
            if test_content == content:
                log.info("Content of remote file match with uploaded one")
            else:
                log.error("Content of remote file doest not match with uploaded one")
        except Exception:  # pylint: disable=broad-except
            log.exception("An exception occurred remotly opening test file %s", remote_filepath)

    if sftp.remove_file(remote_filepath):
        log.info("Test file removed on SFTP server")
    else:
        log.error("Fail to remove test file on SFTP server")