#!/usr/bin/python3 """" Entrypoint of a Woodpecker CI docker image plugin that permit to publish Debian packages on a Aptly repository using its API """ import datetime import os import re import sys from requests import Session from requests.adapters import HTTPAdapter from urllib3.util import Retry from debian_parser import PackagesParser def from_env(name, default=None): """ Retrieve a parameter from environment """ for var in (f'PLUGIN_{name}', f'APTLY_{name}'): if var in os.environ: return os.environ[var] return default # Handle parameters from environment API_URL = from_env('API_URL', None) if not API_URL: print('API URL not provided') sys.exit(1) API_USERNAME = from_env('API_USERNAME', None) if not API_USERNAME: print('API username not provided') sys.exit(1) API_PASSWORD = from_env('API_PASSWORD', None) if not API_PASSWORD: print('API password not provided') sys.exit(1) MAX_RETRY = from_env('MAX_RETRIES', None) REPO_NAME = from_env('REPO_NAME', None) PREFIX = from_env('PREFIX', '.') REPO_COMPONENT = from_env('REPO_COMPONENT', 'main') INPUT_PATH = from_env('PATH', 'dist') SOURCE_NAME = from_env('SOURCE_PACKAGE_NAME', None) # List changes files changes_files_regex = ( # pylint: disable=consider-using-f-string re.compile(r'^%s_.*\.changes$' % SOURCE_NAME) if SOURCE_NAME else re.compile(r'^.*\.changes$') ) changes_files = [] try: for filename in os.listdir(INPUT_PATH): filepath = os.path.join(INPUT_PATH, filename) if not os.path.isfile(filepath): continue if changes_files_regex.match(filename): changes_files.append(filepath) except FileNotFoundError: print(f'Specified directory path "{INPUT_PATH}" not found') sys.exit(1) except NotADirectoryError: print(f'Specified path "{INPUT_PATH}" is not a directory') sys.exit(1) if not changes_files: print(f'No changes file found in {INPUT_PATH}') sys.exit(1) # Initialize Aptly API client session = Session() session.auth = (API_USERNAME, API_PASSWORD) if MAX_RETRY: retries = Retry( total=int(MAX_RETRY), status_forcelist=list(range(500, 600)) ) session.mount(API_URL, HTTPAdapter(max_retries=retries)) def get_repo_name(dist): """ Compute and retreive repository name """ if REPO_NAME: return REPO_NAME value = f'{dist}_{REPO_COMPONENT}' if PREFIX != ".": value = f'{PREFIX}_{value}' return value def parse_changes_file(filepath): """ Parse changes file to detect distribution and included files """ dirpath = os.path.dirname(filepath) with open(filepath, "r", encoding="utf-8") as file_desc: changes_file = file_desc.read() parser = PackagesParser(changes_file) package_name = None distribution = None files = [] for infos in parser.parse(): for info in infos: if info['tag'].lower() == 'files': for line in info['value'].split(' '): if not line: continue files.append(os.path.join(dirpath, line.split()[-1])) if info['tag'].lower() == 'distribution': if distribution: print( 'More than one distribution found in changes file' f'{os.path.basename(filepath)}.') sys.exit(1) distribution = info['value'] if info['tag'].lower() == 'source': if package_name: print( 'More than one source package name found in changes ' f'file {os.path.basename(filepath)}.') sys.exit(1) package_name = info['value'] if not package_name: print( 'Fail to detect source package name from changes file ' f'{os.path.basename(filepath)}.') sys.exit(1) if not distribution: print( 'Fail to detect distribution from changes file ' f'{os.path.basename(filepath)}.') sys.exit(1) if not files: print( 'No included file found in changes file' f'{os.path.basename(filepath)}.') sys.exit(1) return (package_name, distribution, files) def get_published_distribution_other_components_sources(distribution): """ Retreive current published distribution using Aptly API """ url = f'{API_URL}/publish' result = session.get(url) if result.status_code != 200: print( 'Fail to retreive current published distribution ' f'{distribution} using Aptly API (HTTP code: {result.status_code})' ) sys.exit(1) for data in result.json(): if data['Prefix'] != PREFIX: continue if data['Distribution'] != distribution: continue if data['SourceKind'] != 'snapshot': print( f'The distribution {distribution} currently published on ' f'prefix "{PREFIX}" do not sourcing packages from snapshot(s) ' f'but from {data["SourceKind"]}.' ) sys.exit(1) return [ source for source in data['Sources'] if source['Component'] != REPO_COMPONENT ] print( f'Distribution {distribution} seem not currently published on prefix ' f'"{PREFIX}". Please manually publish it a first time before using ' f'{os.path.basename(sys.argv[0])}.' ) sys.exit(1) return False def upload_file(package_name, filepath): """ Upload a file using Aptly API """ url = f'{API_URL}/files/{package_name}' with open(filepath, 'rb') as file_desc: result = session.post(url, files={'file': file_desc}) return ( result.status_code == 200 and f'{package_name}/{os.path.basename(filepath)}' in result.json() ) def include_file(repo_name, package_name, changes_file): """ Include a changes file using Aptly API """ url = ( f'{API_URL}/repos/{repo_name}/include/{package_name}/' f'{os.path.basename(changes_file)}' ) result = session.post(url) data = result.json() if data.get('FailedFiles'): print() print(f'Some error occurred including {changes_file}:') print('Failed files:') for failed_file in data['FailedFiles']: print(f' - {os.path.basename(failed_file)}') if data.get('Report', {}).get('Warnings'): print('Warnings:') print(' - %s' % '\n - '.join(data['Report']['Warnings'])) print() return False if not ( result.status_code == 200 and data.get('Report', {}).get('Added') ): print( f'Unknown error occurred including {changes_file}' 'See APTLY API logs for details.') return False return True for changes_file in changes_files: print(f'Handle changes file {changes_file}:') package_name, distribution, filepaths = parse_changes_file(changes_file) filepaths += [changes_file] repo_name = get_repo_name(distribution) other_components_sources = \ get_published_distribution_other_components_sources(distribution) print(' - Upload files:') for filepath in filepaths: if not upload_file(package_name, filepath): print( f' - {filepath}: fail to upload file. See APTLY API logs ' 'for details.') sys.exit(1) else: print(f' - {filepath}') print(f' - Include changes file {changes_file}:') if include_file(repo_name, package_name, changes_file): print(' - Changes file included') else: sys.exit(1) # Create a snapshot of the repository snap_name = datetime.datetime.now().strftime(f'%Y%m%d-%H%M%S_{repo_name}') print(f'Create new snapshot "{snap_name}" of repository "{repo_name}"') url = f'{API_URL}/repos/{repo_name}/snapshots' payload = {'Name': snap_name} result = session.post(url, json=payload) try: data = result.json() except Exception: # pylint: disable=broad-except data = {} error = ( result.status_code < 200 or result.status_code > 299 or data.get('Name') != snap_name or not data.get('CreatedAt') ) if error: print( f'Fail to create snapshot "{snap_name}" of repository ' f'"{repo_name}". See APTLY API logs for details.') sys.exit(1) # Update published snapshot of the distribution print( f'Update published snapshot of distribution "{distribution}" to ' f'"{snap_name}" (prefix: {PREFIX}, component: {REPO_COMPONENT})') if other_components_sources: print('Note: keep other currently published components:') for source in other_components_sources: print(f'- {source["Component"]}: {source["Name"]}') url = f'{API_URL}/publish/:{PREFIX}/{distribution}' payload = { 'Snapshots': other_components_sources + [ { 'Component': REPO_COMPONENT, 'Name': snap_name } ] } result = session.put(url, json=payload) if ( result.status_code < 200 or result.status_code > 299 ): print( 'Fail to update published snapshot of distribution ' f'"{distribution}" to "{snap_name}" (prefix: {PREFIX}, ' f'component: {REPO_COMPONENT}). See APTLY API logs for details.') sys.exit(1) print("Done.")