#!/usr/bin/python3 import datetime import os import re import sys from requests import Session from requests.adapters import HTTPAdapter from urllib3.util import Retry from debian_parser import PackagesParser # Handle parameters from environment API_URL = os.environ.get('PLUGIN_API_URL', None) if not API_URL: print('API URL not provided') sys.exit(1) API_USERNAME = os.environ.get('PLUGIN_API_USERNAME', None) if not API_USERNAME: print('API username not provided') sys.exit(1) API_PASSWORD = os.environ.get('PLUGIN_API_PASSWORD', None) if not API_PASSWORD: print('API password not provided') sys.exit(1) MAX_RETRY = os.environ.get('PLUGIN_MAX_RETRIES', None) REPO_NAME = os.environ.get('PLUGIN_REPO_NAME', 'stable') REPO_COMPONENT = os.environ.get('PLUGIN_REPO_COMPONENT', 'main') DIST = os.environ.get('PLUGIN_PATH', 'dist') SOURCE_NAME = os.environ.get('PLUGIN_SOURCE_PACKAGE_NAME', None) # List changes files changes_files_regex = ( # pylint: disable=consider-using-f-string re.compile(r'^%s_.*\.changes$' % SOURCE_NAME) if SOURCE_NAME else re.compile(r'^.*\.changes$') ) changes_files = [] try: for filename in os.listdir(DIST): filepath = os.path.join(DIST, filename) if not os.path.isfile(filepath): continue if changes_files_regex.match(filename): changes_files.append(filepath) except FileNotFoundError: print(f'Specified directory path "{DIST}" not found') sys.exit(1) except NotADirectoryError: print(f'Specified path "{DIST}" is not a directory') sys.exit(1) if not changes_files: print(f'No changes file found in {DIST}') sys.exit(1) # Initialize Aptly API client session = Session() session.auth = (API_USERNAME, API_PASSWORD) if MAX_RETRY: retries = Retry( total=int(MAX_RETRY), status_forcelist=list(range(500, 600)) ) session.mount(API_URL, HTTPAdapter(max_retries=retries)) # List and upload files from changes files def list_files_in_changes_file(filepath): """ List files included by a changes file """ dirpath = os.path.dirname(filepath) with open(filepath, "r", encoding="utf-8") as fd: changes_file = fd.read() parser = PackagesParser(changes_file) files = [] for infos in parser.parse(): for info in infos: if info['tag'].lower() != 'files': continue for line in info['value'].split(' '): if not line: continue files.append(os.path.join(dirpath, line.split()[-1])) return files def changes_file2package_name(changes_file): """ Retrieve package name from changes file name """ return os.path.basename(changes_file).split('_')[0] def upload_file(package_name, filepath): """ Upload a file using Aptly API """ url = f'{API_URL}/files/{package_name}' with open(filepath, 'rb') as fd: result = session.post(url, files={'file': fd}) return ( result.status_code == 200 and f'{package_name}/{os.path.basename(filepath)}' in result.json() ) def include_file(package_name, changes_file): """ Include a changes file using Aptly API """ url = ( f'{API_URL}/repos/{REPO_NAME}/include/{package_name}/' f'{os.path.basename(changes_file)}' ) result = session.post(url) data = result.json() if data.get('FailedFiles'): print() print(f'Some error occurred including {changes_file}:') print('Failed files:') for failed_file in data['FailedFiles']: print(f' - {os.path.basename(failed_file)}') if data.get('Report', {}).get('Warnings'): print('Warnings:') print(' - %s' % '\n - '.join(data['Report']['Warnings'])) print() return False if not ( result.status_code == 200 and data.get('Report', {}).get('Added') ): print(f'Unknown error occurred including {changes_file}') return False return True for changes_file in changes_files: package_name = changes_file2package_name(changes_file) print(f'Handle changes file {changes_file}:') filepaths = [changes_file] + list_files_in_changes_file(changes_file) print(' - Upload files:') for filepath in filepaths: if not upload_file(package_name, filepath): print( f' - {filepath}: fail to upload file, pass this changes ' 'file' ) sys.exit(1) else: print(f' - {filepath}') print(f' - Include changes file {changes_file}:') if include_file(package_name, changes_file): print(' - Changes file included') else: sys.exit(1) # Create a snapshot of the repository snap_name = datetime.datetime.now().strftime('%Y%m%d-%H%M%S') + f'{REPO_NAME}' print(f'Create new snapshot "{snap_name}" of repository "{REPO_NAME}"') url = f'{API_URL}/repos/{REPO_NAME}/snapshots' payload = {'Name': snap_name} result = session.post(url, json=payload) try: data = result.json() except Exception: data = {} error = ( result.status_code < 200 or result.status_code > 299 or data.get('Name') != snap_name or not data.get('CreatedAt') ) if error: print(f'Fail to create snapshot "{snap_name}" of repository "{REPO_NAME}"') sys.exit(1) # Update published snapshot of repository print( f'Update published snapshot of repository "{REPO_NAME}" to "{snap_name}"') url = f'{API_URL}/publish/:./{REPO_NAME}' payload = { 'Snapshots': [ { 'Component': REPO_COMPONENT, 'Name': snap_name } ] } result = session.put(url, json=payload) if ( result.status_code < 200 or result.status_code > 299 ): print( f'Fail to update published snapshot of repository "{REPO_NAME}" to ' f'"{snap_name}"') sys.exit(1) print("Done.")