diff --git a/.gitignore b/.gitignore index b25c15b..3d0d534 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,3 @@ *~ +/venv +/dist diff --git a/.pylintrc b/.pylintrc index d9cf072..fa0c9d4 100644 --- a/.pylintrc +++ b/.pylintrc @@ -2,3 +2,4 @@ disable=locally-disabled, redefined-outer-name, invalid-name, + too-many-locals, diff --git a/Dockerfile b/Dockerfile index 07f21ff..b9660f2 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,6 +1,5 @@ FROM node:16-alpine ADD aptly-publish /bin/ RUN chmod +x /bin/aptly-publish -RUN apk -Uuv add python3 py3-requests py3-urllib3 py3-pip bash -RUN pip install debian-parser +RUN apk -Uuv add python3 py3-requests py3-urllib3 py3-pip py3-debian py3-chardet bash ENTRYPOINT /bin/aptly-publish diff --git a/aptly-publish b/aptly-publish index c9fb012..0390a79 100755 --- a/aptly-publish +++ b/aptly-publish @@ -1,183 +1,113 @@ #!/usr/bin/python3 -"""" +""" Entrypoint of a Woodpecker CI docker image plugin that permit to publish Debian packages on a Aptly repository using its API """ +import argparse import datetime +import logging import os import re import sys -from debian_parser import PackagesParser +from debian.deb822 import Changes +from debian.debfile import DebFile from requests import Session from requests.adapters import HTTPAdapter from urllib3.util import Retry - -def from_env(name, default=None): - """Retrieve a parameter from environment""" - for var in (f"PLUGIN_{name}", f"APTLY_{name}"): - if var in os.environ: - return os.environ[var] - return default +included_in_changes_files = {} -# Handle parameters from environment -API_URL = from_env("API_URL", None) -if not API_URL: - print("API URL not provided") - sys.exit(1) - -API_USERNAME = from_env("API_USERNAME", None) -if not API_USERNAME: - print("API username not provided") - sys.exit(1) - -API_PASSWORD = from_env("API_PASSWORD", None) -if not API_PASSWORD: - print("API password not provided") - sys.exit(1) - -MAX_RETRY = from_env("MAX_RETRIES", None) - -REPO_NAME = from_env("REPO_NAME", None) -PREFIX = from_env("PREFIX", ".") -REPO_COMPONENT = from_env("REPO_COMPONENT", "main") -INPUT_PATH = from_env("PATH", "dist") -SOURCE_NAME = from_env("SOURCE_PACKAGE_NAME", None) -FORCE_OVERWRITE = from_env("FORCE_OVERWRITE", "false").lower() in ["1", "true", "yes"] - -# List changes files -changes_files_regex = ( - # pylint: disable=consider-using-f-string - re.compile(r"^%s_.*\.changes$" % SOURCE_NAME) - if SOURCE_NAME - else re.compile(r"^.*\.changes$") -) -changes_files = [] -try: - for filename in os.listdir(INPUT_PATH): - filepath = os.path.join(INPUT_PATH, filename) - if not os.path.isfile(filepath): - continue - if changes_files_regex.match(filename): - changes_files.append(filepath) -except FileNotFoundError: - print(f'Specified directory path "{INPUT_PATH}" not found') - sys.exit(1) -except NotADirectoryError: - print(f'Specified path "{INPUT_PATH}" is not a directory') - sys.exit(1) - -if not changes_files: - print(f"No changes file found in {INPUT_PATH}") - sys.exit(1) - - -# Initialize Aptly API client -session = Session() -session.auth = (API_USERNAME, API_PASSWORD) -if MAX_RETRY: - retries = Retry(total=int(MAX_RETRY), status_forcelist=list(range(500, 600))) - session.mount(API_URL, HTTPAdapter(max_retries=retries)) - - -def get_repo_name(dist): +def get_repo_name(args, dist): """Compute and retrieve repository name""" - if REPO_NAME: - return REPO_NAME - value = f"{dist}_{REPO_COMPONENT}" - if PREFIX != ".": - value = f"{PREFIX}_{value}" + if args.repo_name: + return args.repo_name + value = f"{dist}_{args.repo_component}" + if args.prefix != ".": + value = f"{args.prefix}_{value}" return value def parse_changes_file(filepath): - """Parse changes file to detect distribution and included files""" + """Parse changes file to detect source package name, distribution and included files""" dirpath = os.path.dirname(filepath) with open(filepath, encoding="utf-8") as file_desc: - changes_file = file_desc.read() + changes = Changes(file_desc) - parser = PackagesParser(changes_file) - package_name = None - distribution = None - files = [] - for infos in parser.parse(): - for info in infos: - if info["tag"].lower() == "files": - for line in info["value"].split(" "): - if not line: - continue - files.append(os.path.join(dirpath, line.split()[-1])) - if info["tag"].lower() == "distribution": - if distribution: - print( - "More than one distribution found in changes file" - f"{os.path.basename(filepath)}." - ) - sys.exit(1) - distribution = info["value"] - if info["tag"].lower() == "source": - if package_name: - print( - "More than one source package name found in changes " - f"file {os.path.basename(filepath)}." - ) - sys.exit(1) - package_name = info["value"] + package_name = changes["Source"] + distribution = changes["Distribution"] + files = [os.path.join(dirpath, f["name"]) for f in changes["files"]] if not package_name: - print( - "Fail to detect source package name from changes file " f"{os.path.basename(filepath)}." + logging.error( + "Failed to detect source package name from changes file %s.", os.path.basename(filepath) ) sys.exit(1) if not distribution: - print("Fail to detect distribution from changes file " f"{os.path.basename(filepath)}.") + logging.error( + "Failed to detect distribution from changes file %s.", os.path.basename(filepath) + ) sys.exit(1) if not files: - print("No included file found in changes file" f"{os.path.basename(filepath)}.") + logging.error("No included file found in changes file %s.", os.path.basename(filepath)) sys.exit(1) - return (package_name, distribution, files) + return package_name, distribution, files -def get_published_distribution_other_components_sources(distribution): +def parse_deb_package(filepath): + """Parse Debian package to detect package name and distribution""" + with open(filepath, "rb") as f: + deb_package = DebFile(fileobj=f) + # Lire les métadonnées + control = deb_package.debcontrol() + changelog = deb_package.changelog() + return control["Package"], control["Version"], changelog.distributions + + +def get_published_distribution_other_components_sources(args, distribution): """Retrieve current published distribution using Aptly API""" - url = f"{API_URL}/publish" + url = f"{args.api_url}/publish" result = session.get(url) if result.status_code != 200: - print( - "Fail to retrieve current published distribution " - f"{distribution} using Aptly API (HTTP code: {result.status_code})" + logging.error( + "Failed to retrieve current published distribution %s using Aptly API (HTTP code: %s)", + distribution, + result.status_code, ) sys.exit(1) for data in result.json(): - if data["Prefix"] != PREFIX: + if data["Prefix"] != args.prefix: continue if data["Distribution"] != distribution: continue if data["SourceKind"] != "snapshot": - print( - f"The distribution {distribution} currently published on " - f'prefix "{PREFIX}" do not sourcing packages from snapshot(s) ' - f'but from {data["SourceKind"]}.' + logging.error( + "The distribution %s currently published on prefix '%s' do not sourcing packages " + "from snapshot(s) but from %s.", + distribution, + args.prefix, + data["SourceKind"], ) sys.exit(1) - return [source for source in data["Sources"] if source["Component"] != REPO_COMPONENT] - print( - f"Distribution {distribution} seem not currently published on prefix " - f'"{PREFIX}". Please manually publish it a first time before using ' - f"{os.path.basename(sys.argv[0])}." + return [source for source in data["Sources"] if source["Component"] != args.repo_component] + logging.error( + "Distribution %s seem not currently published on prefix '%s'. Please manually publish it " + "a first time before using %s.", + distribution, + args.prefix, + os.path.basename(sys.argv[0]), ) sys.exit(1) -def upload_file(package_name, filepath): +def upload_file(session, package_name, filepath): """Upload a file using Aptly API""" - url = f"{API_URL}/files/{package_name}" + url = f"{args.api_url}/files/{package_name}" with open(filepath, "rb") as file_desc: result = session.post(url, files={"file": file_desc}) return ( @@ -186,60 +116,142 @@ def upload_file(package_name, filepath): ) -def include_file(repo_name, package_name, changes_file): +def include_changes_file(repo_name, package_name, changes_file): """Include a changes file using Aptly API""" - url = f"{API_URL}/repos/{repo_name}/include/{package_name}/" f"{os.path.basename(changes_file)}" + url = ( + f"{args.api_url}/repos/{repo_name}/include/{package_name}/" + f"{os.path.basename(changes_file)}" + ) result = session.post(url) data = result.json() if data.get("FailedFiles"): - print() - print(f"Some error occurred including {changes_file}:") - print("Failed files:") - for failed_file in data["FailedFiles"]: - print(f" - {os.path.basename(failed_file)}") - if data.get("Report", {}).get("Warnings"): - print("Warnings:") - print(" - %s" % "\n - ".join(data["Report"]["Warnings"])) - print() + logging.error( + "Some error occurred including %s:\nFailed files:\n - %s\nWarnings: - %s", + changes_file, + "\n - ".join(data["FailedFiles"]), + "\n - ".join(data.get("Report", {}).get("Warnings")) + if data.get("Report", {}).get("Warnings") + else "No warning", + ) return False if not (result.status_code == 200 and data.get("Report", {}).get("Added")): - print(f"Unknown error occurred including {changes_file}" "See APTLY API logs for details.") + logging.error( + "Unknown error occurred including %s. See APTLY API logs for details.", changes_file + ) return False return True -for changes_file in changes_files: - print(f"Handle changes file {changes_file}:") +def include_deb_package(repo_name, path): + """Include a changes file using Aptly API""" + url = f"{args.api_url}/repos/{repo_name}/file/{os.path.basename(path)}" + result = session.post(url) + if result.status_code < 200 or result.status_code > 299: + data = result.json() + logging.error("Failed to include Debian package %s: %s", path, data.get("error")) + return False + return True + + +def list_changes_and_deb_packages(args): + """List changes files according to specified script arguments""" + changes_files_regex = ( + # pylint: disable=consider-using-f-string + re.compile(r"^%s_.*\.changes$" % args.source_package_name, re.I) + if args.source_package_name + else re.compile(r"^.*\.changes$", re.I) + ) + deb_packages_regex = re.compile(r"^.*\.deb$", re.I) + changes_files = [] + deb_packages = [] + try: + for filename in os.listdir(args.path): + filepath = os.path.join(args.path, filename) + if not os.path.isfile(filepath): + continue + if changes_files_regex.match(filename): + changes_files.append(filepath) + elif deb_packages_regex.match(filename): + deb_packages.append(filepath) + except FileNotFoundError: + logging.error('Specified directory path "%s" not found', args.path) + sys.exit(1) + except NotADirectoryError: + logging.error('Specified path "%s" is not a directory', args.path) + sys.exit(1) + return changes_files, deb_packages + + +def handle_changes_file(args, session, changes_file): + """Handle one changes file""" + logging.info("Handle changes file %s:", changes_file) package_name, distribution, filepaths = parse_changes_file(changes_file) - filepaths += [changes_file] - repo_name = get_repo_name(distribution) + logging.info(" Detected package: %s (distribution=%s)", package_name, distribution) + logging.debug(" Included files:\n %s", "\n ".join(filepaths)) + filepaths.append(changes_file) + repo_name = get_repo_name(args, distribution) - other_components_sources = get_published_distribution_other_components_sources(distribution) + other_components_sources = get_published_distribution_other_components_sources( + args, distribution + ) - print(" - Upload files:") + logging.info(" Uploading files:") for filepath in filepaths: - if not upload_file(package_name, filepath): - print(f" - {filepath}: fail to upload file. See APTLY API logs " "for details.") + if not upload_file(session, package_name, filepath): + logging.error("Failed to upload %s. See APTLY API logs for details.", filepath) sys.exit(1) - else: - print(f" - {filepath}") + logging.info(" %s", filepath) + included_in_changes_files[filepath] = changes_file - print(f" - Include changes file {changes_file}:") - if include_file(repo_name, package_name, changes_file): - print(" - Changes file included") + logging.info(" Including changes file %s...", changes_file) + if include_changes_file(repo_name, package_name, changes_file): + logging.info(" Changes file %s included", changes_file) else: sys.exit(1) + return distribution, repo_name, other_components_sources + + +def handle_deb_package(args, session, path): + """Handle one debian package""" + logging.info("Handle Debian package %s:", path) + package_name, version, distribution = parse_deb_package(path) + logging.info( + " Detected package: %s (version=%s, distribution=%s)", package_name, version, distribution + ) + repo_name = get_repo_name(args, distribution) + + other_components_sources = get_published_distribution_other_components_sources( + args, distribution + ) + + logging.info(" Uploading Debian package %s...", path) + if not upload_file(session, package_name, path): + logging.error("Failed to upload file. See APTLY API logs for details.") + sys.exit(1) + logging.info(" Debian package %s uploaded.", path) + + logging.info(" Adding Debian package %s...", path) + if not include_deb_package(repo_name, path): + logging.error("Failed to add file. See APTLY API logs for details.") + sys.exit(1) + logging.info(" Debian package %s added.", path) + + return distribution, repo_name, other_components_sources + + +def publish_repo(args, session, distribution, repo_name, other_components_sources): + """Publish repository of specified distributions""" # Create a snapshot of the repository snap_name = datetime.datetime.now().strftime(f"%Y%m%d-%H%M%S_{repo_name}") - print(f'Create new snapshot "{snap_name}" of repository "{repo_name}"') + logging.info('Create new snapshot "%s" of repository "%s"', snap_name, repo_name) - url = f"{API_URL}/repos/{repo_name}/snapshots" + url = f"{args.api_url}/repos/{repo_name}/snapshots" payload = {"Name": snap_name} result = session.post(url, json=payload) try: data = result.json() - except Exception: # pylint: disable=broad-except + except ValueError: data = {} error = ( result.status_code < 200 @@ -248,33 +260,119 @@ for changes_file in changes_files: or not data.get("CreatedAt") ) if error: - print( - f'Fail to create snapshot "{snap_name}" of repository ' - f'"{repo_name}". See APTLY API logs for details.' + logging.error( + 'Failed to create snapshot "%s" of repository "%s". See APTLY API logs for details.', + snap_name, + repo_name, ) sys.exit(1) # Update published snapshot of the distribution - print( - f'Update published snapshot of distribution "{distribution}" to ' - f'"{snap_name}" (prefix: {PREFIX}, component: {REPO_COMPONENT})' + logging.info( + "Update published snapshot of distribution '%s' to '%s' (prefix: '%s', component: '%s')", + distribution, + snap_name, + args.prefix, + args.repo_component, ) if other_components_sources: - print("Note: keep other currently published components:") + logging.info("Note: keep other currently published components:") for source in other_components_sources: - print(f'- {source["Component"]}: {source["Name"]}') - url = f"{API_URL}/publish/:{PREFIX}/{distribution}" + logging.info(" %s: %s", source["Component"], source["Name"]) + url = f"{args.api_url}/publish/:{args.prefix}/{distribution}" payload = { - "Snapshots": other_components_sources + [{"Component": REPO_COMPONENT, "Name": snap_name}], - "ForceOverwrite": FORCE_OVERWRITE, + "Snapshots": other_components_sources + + [{"Component": args.repo_component, "Name": snap_name}], + "ForceOverwrite": args.force_overwrite, } result = session.put(url, json=payload) if result.status_code < 200 or result.status_code > 299: - print( - "Fail to update published snapshot of distribution " - f'"{distribution}" to "{snap_name}" (prefix: {PREFIX}, ' - f"component: {REPO_COMPONENT}). See APTLY API logs for details." + logging.error( + "Failed to update published snapshot of distribution '%s' to '%s' " + "(prefix: '%s', component: '%s'). See APTLY API logs for details.", + distribution, + snap_name, + args.prefix, + args.repo_component, ) sys.exit(1) + return True -print("Done.") + +def from_env(name, default=None): + """Retrieve a parameter from environment""" + for var in (f"PLUGIN_{name}", f"APTLY_{name}", f"APT_{name}"): + if var in os.environ: + return os.environ[var] + return default + + +parser = argparse.ArgumentParser() + +parser.add_argument("--api-url", help="APTLY API URL", default=from_env("API_URL")) +parser.add_argument("--api-username", help="APTLY API username", default=from_env("API_USERNAME")) +parser.add_argument("--api-password", help="APTLY API password", default=from_env("API_PASSWORD")) +parser.add_argument("--max-retries", default=from_env("MAX_RETRIES")) +parser.add_argument("--repo-name", default=from_env("REPO_NAME")) +parser.add_argument("--prefix", default=from_env("PREFIX", ".")) +parser.add_argument("--repo-component", default=from_env("REPO_COMPONENT", "main")) +parser.add_argument("--path", default=from_env("PATH", "dist")) +parser.add_argument("--source-package-name", default=from_env("SOURCE_PACKAGE_NAME")) +parser.add_argument( + "--force-overwrite", + action="store_true", + default=from_env("FORCE_OVERWRITE", "false").lower() in ["1", "true", "yes", "on"], +) +parser.add_argument("-d", "--debug", action="store_true", help="Enable debug mode") + +args = parser.parse_args() + +# Handle parameters from environment +if not args.api_url: + parser.error("API URL not provided") + +if not args.api_username: + parser.error("API username not provided") + +if not args.api_password: + parser.error("API password not provided") + +logging.basicConfig( + level=logging.DEBUG if args.debug else logging.INFO, format="%(levelname)s - %(message)s" +) + +# Initialize Aptly API client +session = Session() +session.auth = (args.api_username, args.api_password) +if args.max_retries: + retries = Retry(total=int(args.max_retries), status_forcelist=list(range(500, 600))) + session.mount(args.api_url, HTTPAdapter(max_retries=retries)) + +to_publish = [] +changes_files, deb_packages = list_changes_and_deb_packages(args) + +if not changes_files and not deb_packages: + logging.error("No changes file or Debian package found in %s", args.path) + sys.exit(1) + +for changes_file in changes_files: + pub = handle_changes_file(args, session, changes_file) + if pub not in to_publish: + to_publish.append(pub) + +for deb_package in deb_packages: + if deb_package in included_in_changes_files: + logging.debug( + "Debian file %s already handled by %s", + deb_package, + included_in_changes_files[deb_package], + ) + continue + pub = handle_deb_package(args, session, deb_package) + if pub not in to_publish: + to_publish.append(pub) + +for pub_args in to_publish: + publish_repo(args, session, *pub_args) + +logging.info("Done.")