2022-06-28 11:05:43 +02:00
|
|
|
""" Test SFTP client """
|
2022-06-30 14:34:25 +02:00
|
|
|
import atexit
|
2023-01-16 12:56:12 +01:00
|
|
|
import getpass
|
2022-06-28 11:05:43 +02:00
|
|
|
import logging
|
|
|
|
import os
|
2022-06-30 14:34:25 +02:00
|
|
|
import random
|
|
|
|
import string
|
2023-01-16 12:56:12 +01:00
|
|
|
import sys
|
|
|
|
import tempfile
|
2022-06-28 11:05:43 +02:00
|
|
|
|
2023-01-16 12:56:12 +01:00
|
|
|
from mylib.scripts.helpers import add_sftp_opts, get_opts_parser, init_logging
|
2022-06-28 11:05:43 +02:00
|
|
|
from mylib.sftp import SFTPClient
|
|
|
|
|
2023-01-16 12:56:12 +01:00
|
|
|
# Module-level logger used by main() to report progress and failures of the SFTP test
log = logging.getLogger("mylib.scripts.sftp_test")
|
2022-06-28 11:05:43 +02:00
|
|
|
|
|
|
|
|
|
|
|
def main(argv=None):  # pylint: disable=too-many-locals,too-many-statements
    """Script main

    Exercise an SFTP server end to end: upload a small temporary file, download
    it back, read it remotely, compare both copies with the uploaded content and
    finally remove the file from the server.

    :param argv: Command-line arguments to parse, without the program name
                 (default: ``sys.argv[1:]``)

    The script exits with status 1 (via ``sys.exit``) when the upload fails.
    Download, remote read, content mismatch and removal failures are only logged.
    """
    if argv is None:
        argv = sys.argv[1:]

    # Options parser
    parser = get_opts_parser(just_try=True)
    add_sftp_opts(parser)

    test_opts = parser.add_argument_group("Test SFTP options")

    test_opts.add_argument(
        "-p",
        "--remote-upload-path",
        action="store",
        type=str,
        dest="upload_path",
        help="Remote upload path (default: on remote initial connection directory)",
    )

    # Parse the arguments received by main(), so that a caller-provided argv is honored
    options = parser.parse_args(argv)

    # Initialize logs
    init_logging(options, "Test SFTP client")

    # Ask for the password interactively when a user is set without one
    if options.sftp_user and not options.sftp_password:
        options.sftp_password = getpass.getpass("Please enter SFTP password: ")

    log.info("Initialize SFTP client")
    sftp = SFTPClient(options=options, just_try=options.just_try)
    sftp.connect()
    # Make sure the connection is closed however the script terminates
    atexit.register(sftp.close)

    log.debug("Create tempory file")
    test_content = b"Juste un test."
    # The local copy is only needed for the upload: the temporary directory (and the
    # file in it) is removed as soon as the with block is left, including on sys.exit()
    with tempfile.TemporaryDirectory() as tmp_dir:
        tmp_file = os.path.join(
            tmp_dir, f'tmp{"".join(random.choice(string.ascii_lowercase) for _ in range(8))}'
        )
        log.debug('Temporary file path: "%s"', tmp_file)
        with open(tmp_file, "wb") as file_desc:
            file_desc.write(test_content)

        log.debug(
            "Upload file %s to SFTP server (in %s)",
            tmp_file,
            options.upload_path if options.upload_path else "remote initial connection directory",
        )
        if not sftp.upload_file(tmp_file, options.upload_path):
            log.error("Fail to upload test file on SFTP server")
            sys.exit(1)

    log.info("Test file uploaded on SFTP server")
    # The remote file keeps the local file name, below the upload path if one was given
    remote_filepath = (
        os.path.join(options.upload_path, os.path.basename(tmp_file))
        if options.upload_path
        else os.path.basename(tmp_file)
    )

    # First check: download the file and compare its content with what was uploaded
    with tempfile.NamedTemporaryFile() as tmp_file2:
        log.info("Retrieve test file to %s", tmp_file2.name)
        if not sftp.get_file(remote_filepath, tmp_file2.name):
            log.error("Fail to retrieve test file")
        else:
            with open(tmp_file2.name, "rb") as file_desc:
                content = file_desc.read()
            log.debug("Read content: %s", content)
            if test_content == content:
                log.info("Content file retrieved match with uploaded one")
            else:
                log.error("Content file retrieved doest not match with uploaded one")

    # Second check: read the file directly on the server
    try:
        log.info("Remotly open test file %s", remote_filepath)
        # NOTE(review): this remote handle is never closed explicitly; presumably it is
        # released when sftp.close() runs at exit - confirm against SFTPClient.open_file()
        file_desc = sftp.open_file(remote_filepath)
        content = file_desc.read()
        log.debug("Read content: %s", content)
        if test_content == content:
            log.info("Content of remote file match with uploaded one")
        else:
            log.error("Content of remote file doest not match with uploaded one")
    except Exception:  # pylint: disable=broad-except
        # Best effort: log the failure and still try to remove the remote file below
        log.exception("An exception occurred remotly opening test file %s", remote_filepath)

    # Cleanup: remove the test file from the server
    if sftp.remove_file(remote_filepath):
        log.info("Test file removed on SFTP server")
    else:
        log.error("Fail to remove test file on SFTP server")
|