#!/usr/bin/python3 """" Entrypoint of a Woodpecker CI docker image plugin that permit to publish Debian packages on a Aptly repository using its API """ import datetime import os import re import sys from debian_parser import PackagesParser from requests import Session from requests.adapters import HTTPAdapter from urllib3.util import Retry def from_env(name, default=None): """Retrieve a parameter from environment""" for var in (f"PLUGIN_{name}", f"APTLY_{name}"): if var in os.environ: return os.environ[var] return default # Handle parameters from environment API_URL = from_env("API_URL", None) if not API_URL: print("API URL not provided") sys.exit(1) API_USERNAME = from_env("API_USERNAME", None) if not API_USERNAME: print("API username not provided") sys.exit(1) API_PASSWORD = from_env("API_PASSWORD", None) if not API_PASSWORD: print("API password not provided") sys.exit(1) MAX_RETRY = from_env("MAX_RETRIES", None) REPO_NAME = from_env("REPO_NAME", None) PREFIX = from_env("PREFIX", ".") REPO_COMPONENT = from_env("REPO_COMPONENT", "main") INPUT_PATH = from_env("PATH", "dist") SOURCE_NAME = from_env("SOURCE_PACKAGE_NAME", None) FORCE_OVERWRITE = from_env("FORCE_OVERWRITE", "false").lower() in ["1", "true", "yes"] # List changes files changes_files_regex = ( # pylint: disable=consider-using-f-string re.compile(r"^%s_.*\.changes$" % SOURCE_NAME) if SOURCE_NAME else re.compile(r"^.*\.changes$") ) changes_files = [] try: for filename in os.listdir(INPUT_PATH): filepath = os.path.join(INPUT_PATH, filename) if not os.path.isfile(filepath): continue if changes_files_regex.match(filename): changes_files.append(filepath) except FileNotFoundError: print(f'Specified directory path "{INPUT_PATH}" not found') sys.exit(1) except NotADirectoryError: print(f'Specified path "{INPUT_PATH}" is not a directory') sys.exit(1) if not changes_files: print(f"No changes file found in {INPUT_PATH}") sys.exit(1) # Initialize Aptly API client session = Session() session.auth = (API_USERNAME, API_PASSWORD) if MAX_RETRY: retries = Retry(total=int(MAX_RETRY), status_forcelist=list(range(500, 600))) session.mount(API_URL, HTTPAdapter(max_retries=retries)) def get_repo_name(dist): """Compute and retrieve repository name""" if REPO_NAME: return REPO_NAME value = f"{dist}_{REPO_COMPONENT}" if PREFIX != ".": value = f"{PREFIX}_{value}" return value def parse_changes_file(filepath): """Parse changes file to detect distribution and included files""" dirpath = os.path.dirname(filepath) with open(filepath, encoding="utf-8") as file_desc: changes_file = file_desc.read() parser = PackagesParser(changes_file) package_name = None distribution = None files = [] for infos in parser.parse(): for info in infos: if info["tag"].lower() == "files": for line in info["value"].split(" "): if not line: continue files.append(os.path.join(dirpath, line.split()[-1])) if info["tag"].lower() == "distribution": if distribution: print( "More than one distribution found in changes file" f"{os.path.basename(filepath)}." ) sys.exit(1) distribution = info["value"] if info["tag"].lower() == "source": if package_name: print( "More than one source package name found in changes " f"file {os.path.basename(filepath)}." ) sys.exit(1) package_name = info["value"] if not package_name: print( "Fail to detect source package name from changes file " f"{os.path.basename(filepath)}." ) sys.exit(1) if not distribution: print("Fail to detect distribution from changes file " f"{os.path.basename(filepath)}.") sys.exit(1) if not files: print("No included file found in changes file" f"{os.path.basename(filepath)}.") sys.exit(1) return (package_name, distribution, files) def get_published_distribution_other_components_sources(distribution): """Retrieve current published distribution using Aptly API""" url = f"{API_URL}/publish" result = session.get(url) if result.status_code != 200: print( "Fail to retrieve current published distribution " f"{distribution} using Aptly API (HTTP code: {result.status_code})" ) sys.exit(1) for data in result.json(): if data["Prefix"] != PREFIX: continue if data["Distribution"] != distribution: continue if data["SourceKind"] != "snapshot": print( f"The distribution {distribution} currently published on " f'prefix "{PREFIX}" do not sourcing packages from snapshot(s) ' f'but from {data["SourceKind"]}.' ) sys.exit(1) return [source for source in data["Sources"] if source["Component"] != REPO_COMPONENT] print( f"Distribution {distribution} seem not currently published on prefix " f'"{PREFIX}". Please manually publish it a first time before using ' f"{os.path.basename(sys.argv[0])}." ) sys.exit(1) def upload_file(package_name, filepath): """Upload a file using Aptly API""" url = f"{API_URL}/files/{package_name}" with open(filepath, "rb") as file_desc: result = session.post(url, files={"file": file_desc}) return ( result.status_code == 200 and f"{package_name}/{os.path.basename(filepath)}" in result.json() ) def include_file(repo_name, package_name, changes_file): """Include a changes file using Aptly API""" url = f"{API_URL}/repos/{repo_name}/include/{package_name}/" f"{os.path.basename(changes_file)}" result = session.post(url) data = result.json() if data.get("FailedFiles"): print() print(f"Some error occurred including {changes_file}:") print("Failed files:") for failed_file in data["FailedFiles"]: print(f" - {os.path.basename(failed_file)}") if data.get("Report", {}).get("Warnings"): print("Warnings:") print(" - %s" % "\n - ".join(data["Report"]["Warnings"])) print() return False if not (result.status_code == 200 and data.get("Report", {}).get("Added")): print(f"Unknown error occurred including {changes_file}" "See APTLY API logs for details.") return False return True for changes_file in changes_files: print(f"Handle changes file {changes_file}:") package_name, distribution, filepaths = parse_changes_file(changes_file) filepaths += [changes_file] repo_name = get_repo_name(distribution) other_components_sources = get_published_distribution_other_components_sources(distribution) print(" - Upload files:") for filepath in filepaths: if not upload_file(package_name, filepath): print(f" - {filepath}: fail to upload file. See APTLY API logs " "for details.") sys.exit(1) else: print(f" - {filepath}") print(f" - Include changes file {changes_file}:") if include_file(repo_name, package_name, changes_file): print(" - Changes file included") else: sys.exit(1) # Create a snapshot of the repository snap_name = datetime.datetime.now().strftime(f"%Y%m%d-%H%M%S_{repo_name}") print(f'Create new snapshot "{snap_name}" of repository "{repo_name}"') url = f"{API_URL}/repos/{repo_name}/snapshots" payload = {"Name": snap_name} result = session.post(url, json=payload) try: data = result.json() except Exception: # pylint: disable=broad-except data = {} error = ( result.status_code < 200 or result.status_code > 299 or data.get("Name") != snap_name or not data.get("CreatedAt") ) if error: print( f'Fail to create snapshot "{snap_name}" of repository ' f'"{repo_name}". See APTLY API logs for details.' ) sys.exit(1) # Update published snapshot of the distribution print( f'Update published snapshot of distribution "{distribution}" to ' f'"{snap_name}" (prefix: {PREFIX}, component: {REPO_COMPONENT})' ) if other_components_sources: print("Note: keep other currently published components:") for source in other_components_sources: print(f'- {source["Component"]}: {source["Name"]}') url = f"{API_URL}/publish/:{PREFIX}/{distribution}" payload = { "Snapshots": other_components_sources + [{"Component": REPO_COMPONENT, "Name": snap_name}], "ForceOverwrite": FORCE_OVERWRITE, } result = session.put(url, json=payload) if result.status_code < 200 or result.status_code > 299: print( "Fail to update published snapshot of distribution " f'"{distribution}" to "{snap_name}" (prefix: {PREFIX}, ' f"component: {REPO_COMPONENT}). See APTLY API logs for details." ) sys.exit(1) print("Done.")