Allow to handle standalone Debian package file and replace dependency on lib debian-parser by more common debian lib

This commit is contained in:
Benjamin Renard 2025-02-09 19:11:28 +01:00
parent a27af14040
commit 449f5f51c7
4 changed files with 272 additions and 172 deletions

2
.gitignore vendored
View file

@ -1 +1,3 @@
*~ *~
/venv
/dist

View file

@ -2,3 +2,4 @@
disable=locally-disabled, disable=locally-disabled,
redefined-outer-name, redefined-outer-name,
invalid-name, invalid-name,
too-many-locals,

View file

@ -1,6 +1,5 @@
FROM node:16-alpine FROM node:16-alpine
ADD aptly-publish /bin/ ADD aptly-publish /bin/
RUN chmod +x /bin/aptly-publish RUN chmod +x /bin/aptly-publish
RUN apk -Uuv add python3 py3-requests py3-urllib3 py3-pip bash RUN apk -Uuv add python3 py3-requests py3-urllib3 py3-pip py3-debian py3-chardet bash
RUN pip install debian-parser
ENTRYPOINT /bin/aptly-publish ENTRYPOINT /bin/aptly-publish

View file

@ -1,183 +1,113 @@
#!/usr/bin/python3 #!/usr/bin/python3
"""" """
Entrypoint of a Woodpecker CI docker image plugin that permit to publish Debian Entrypoint of a Woodpecker CI docker image plugin that permit to publish Debian
packages on a Aptly repository using its API packages on a Aptly repository using its API
""" """
import argparse
import datetime import datetime
import logging
import os import os
import re import re
import sys import sys
from debian_parser import PackagesParser from debian.deb822 import Changes
from debian.debfile import DebFile
from requests import Session from requests import Session
from requests.adapters import HTTPAdapter from requests.adapters import HTTPAdapter
from urllib3.util import Retry from urllib3.util import Retry
included_in_changes_files = {}
def from_env(name, default=None):
"""Retrieve a parameter from environment"""
for var in (f"PLUGIN_{name}", f"APTLY_{name}"):
if var in os.environ:
return os.environ[var]
return default
# Handle parameters from environment def get_repo_name(args, dist):
API_URL = from_env("API_URL", None)
if not API_URL:
print("API URL not provided")
sys.exit(1)
API_USERNAME = from_env("API_USERNAME", None)
if not API_USERNAME:
print("API username not provided")
sys.exit(1)
API_PASSWORD = from_env("API_PASSWORD", None)
if not API_PASSWORD:
print("API password not provided")
sys.exit(1)
MAX_RETRY = from_env("MAX_RETRIES", None)
REPO_NAME = from_env("REPO_NAME", None)
PREFIX = from_env("PREFIX", ".")
REPO_COMPONENT = from_env("REPO_COMPONENT", "main")
INPUT_PATH = from_env("PATH", "dist")
SOURCE_NAME = from_env("SOURCE_PACKAGE_NAME", None)
FORCE_OVERWRITE = from_env("FORCE_OVERWRITE", "false").lower() in ["1", "true", "yes"]
# List changes files
changes_files_regex = (
# pylint: disable=consider-using-f-string
re.compile(r"^%s_.*\.changes$" % SOURCE_NAME)
if SOURCE_NAME
else re.compile(r"^.*\.changes$")
)
changes_files = []
try:
for filename in os.listdir(INPUT_PATH):
filepath = os.path.join(INPUT_PATH, filename)
if not os.path.isfile(filepath):
continue
if changes_files_regex.match(filename):
changes_files.append(filepath)
except FileNotFoundError:
print(f'Specified directory path "{INPUT_PATH}" not found')
sys.exit(1)
except NotADirectoryError:
print(f'Specified path "{INPUT_PATH}" is not a directory')
sys.exit(1)
if not changes_files:
print(f"No changes file found in {INPUT_PATH}")
sys.exit(1)
# Initialize Aptly API client
session = Session()
session.auth = (API_USERNAME, API_PASSWORD)
if MAX_RETRY:
retries = Retry(total=int(MAX_RETRY), status_forcelist=list(range(500, 600)))
session.mount(API_URL, HTTPAdapter(max_retries=retries))
def get_repo_name(dist):
"""Compute and retrieve repository name""" """Compute and retrieve repository name"""
if REPO_NAME: if args.repo_name:
return REPO_NAME return args.repo_name
value = f"{dist}_{REPO_COMPONENT}" value = f"{dist}_{args.repo_component}"
if PREFIX != ".": if args.prefix != ".":
value = f"{PREFIX}_{value}" value = f"{args.prefix}_{value}"
return value return value
def parse_changes_file(filepath): def parse_changes_file(filepath):
"""Parse changes file to detect distribution and included files""" """Parse changes file to detect source package name, distribution and included files"""
dirpath = os.path.dirname(filepath) dirpath = os.path.dirname(filepath)
with open(filepath, encoding="utf-8") as file_desc: with open(filepath, encoding="utf-8") as file_desc:
changes_file = file_desc.read() changes = Changes(file_desc)
parser = PackagesParser(changes_file) package_name = changes["Source"]
package_name = None distribution = changes["Distribution"]
distribution = None files = [os.path.join(dirpath, f["name"]) for f in changes["files"]]
files = []
for infos in parser.parse():
for info in infos:
if info["tag"].lower() == "files":
for line in info["value"].split(" "):
if not line:
continue
files.append(os.path.join(dirpath, line.split()[-1]))
if info["tag"].lower() == "distribution":
if distribution:
print(
"More than one distribution found in changes file"
f"{os.path.basename(filepath)}."
)
sys.exit(1)
distribution = info["value"]
if info["tag"].lower() == "source":
if package_name:
print(
"More than one source package name found in changes "
f"file {os.path.basename(filepath)}."
)
sys.exit(1)
package_name = info["value"]
if not package_name: if not package_name:
print( logging.error(
"Fail to detect source package name from changes file " f"{os.path.basename(filepath)}." "Failed to detect source package name from changes file %s.", os.path.basename(filepath)
) )
sys.exit(1) sys.exit(1)
if not distribution: if not distribution:
print("Fail to detect distribution from changes file " f"{os.path.basename(filepath)}.") logging.error(
"Failed to detect distribution from changes file %s.", os.path.basename(filepath)
)
sys.exit(1) sys.exit(1)
if not files: if not files:
print("No included file found in changes file" f"{os.path.basename(filepath)}.") logging.error("No included file found in changes file %s.", os.path.basename(filepath))
sys.exit(1) sys.exit(1)
return (package_name, distribution, files) return package_name, distribution, files
def get_published_distribution_other_components_sources(distribution): def parse_deb_package(filepath):
"""Parse Debian package to detect package name and distribution"""
with open(filepath, "rb") as f:
deb_package = DebFile(fileobj=f)
# Lire les métadonnées
control = deb_package.debcontrol()
changelog = deb_package.changelog()
return control["Package"], control["Version"], changelog.distributions
def get_published_distribution_other_components_sources(args, distribution):
"""Retrieve current published distribution using Aptly API""" """Retrieve current published distribution using Aptly API"""
url = f"{API_URL}/publish" url = f"{args.api_url}/publish"
result = session.get(url) result = session.get(url)
if result.status_code != 200: if result.status_code != 200:
print( logging.error(
"Fail to retrieve current published distribution " "Failed to retrieve current published distribution %s using Aptly API (HTTP code: %s)",
f"{distribution} using Aptly API (HTTP code: {result.status_code})" distribution,
result.status_code,
) )
sys.exit(1) sys.exit(1)
for data in result.json(): for data in result.json():
if data["Prefix"] != PREFIX: if data["Prefix"] != args.prefix:
continue continue
if data["Distribution"] != distribution: if data["Distribution"] != distribution:
continue continue
if data["SourceKind"] != "snapshot": if data["SourceKind"] != "snapshot":
print( logging.error(
f"The distribution {distribution} currently published on " "The distribution %s currently published on prefix '%s' do not sourcing packages "
f'prefix "{PREFIX}" do not sourcing packages from snapshot(s) ' "from snapshot(s) but from %s.",
f'but from {data["SourceKind"]}.' distribution,
args.prefix,
data["SourceKind"],
) )
sys.exit(1) sys.exit(1)
return [source for source in data["Sources"] if source["Component"] != REPO_COMPONENT] return [source for source in data["Sources"] if source["Component"] != args.repo_component]
print( logging.error(
f"Distribution {distribution} seem not currently published on prefix " "Distribution %s seem not currently published on prefix '%s'. Please manually publish it "
f'"{PREFIX}". Please manually publish it a first time before using ' "a first time before using %s.",
f"{os.path.basename(sys.argv[0])}." distribution,
args.prefix,
os.path.basename(sys.argv[0]),
) )
sys.exit(1) sys.exit(1)
def upload_file(package_name, filepath): def upload_file(session, package_name, filepath):
"""Upload a file using Aptly API""" """Upload a file using Aptly API"""
url = f"{API_URL}/files/{package_name}" url = f"{args.api_url}/files/{package_name}"
with open(filepath, "rb") as file_desc: with open(filepath, "rb") as file_desc:
result = session.post(url, files={"file": file_desc}) result = session.post(url, files={"file": file_desc})
return ( return (
@ -186,60 +116,142 @@ def upload_file(package_name, filepath):
) )
def include_file(repo_name, package_name, changes_file): def include_changes_file(repo_name, package_name, changes_file):
"""Include a changes file using Aptly API""" """Include a changes file using Aptly API"""
url = f"{API_URL}/repos/{repo_name}/include/{package_name}/" f"{os.path.basename(changes_file)}" url = (
f"{args.api_url}/repos/{repo_name}/include/{package_name}/"
f"{os.path.basename(changes_file)}"
)
result = session.post(url) result = session.post(url)
data = result.json() data = result.json()
if data.get("FailedFiles"): if data.get("FailedFiles"):
print() logging.error(
print(f"Some error occurred including {changes_file}:") "Some error occurred including %s:\nFailed files:\n - %s\nWarnings: - %s",
print("Failed files:") changes_file,
for failed_file in data["FailedFiles"]: "\n - ".join(data["FailedFiles"]),
print(f" - {os.path.basename(failed_file)}") "\n - ".join(data.get("Report", {}).get("Warnings"))
if data.get("Report", {}).get("Warnings"): if data.get("Report", {}).get("Warnings")
print("Warnings:") else "No warning",
print(" - %s" % "\n - ".join(data["Report"]["Warnings"])) )
print()
return False return False
if not (result.status_code == 200 and data.get("Report", {}).get("Added")): if not (result.status_code == 200 and data.get("Report", {}).get("Added")):
print(f"Unknown error occurred including {changes_file}" "See APTLY API logs for details.") logging.error(
"Unknown error occurred including %s. See APTLY API logs for details.", changes_file
)
return False return False
return True return True
for changes_file in changes_files: def include_deb_package(repo_name, path):
print(f"Handle changes file {changes_file}:") """Include a changes file using Aptly API"""
url = f"{args.api_url}/repos/{repo_name}/file/{os.path.basename(path)}"
result = session.post(url)
if result.status_code < 200 or result.status_code > 299:
data = result.json()
logging.error("Failed to include Debian package %s: %s", path, data.get("error"))
return False
return True
def list_changes_and_deb_packages(args):
"""List changes files according to specified script arguments"""
changes_files_regex = (
# pylint: disable=consider-using-f-string
re.compile(r"^%s_.*\.changes$" % args.source_package_name, re.I)
if args.source_package_name
else re.compile(r"^.*\.changes$", re.I)
)
deb_packages_regex = re.compile(r"^.*\.deb$", re.I)
changes_files = []
deb_packages = []
try:
for filename in os.listdir(args.path):
filepath = os.path.join(args.path, filename)
if not os.path.isfile(filepath):
continue
if changes_files_regex.match(filename):
changes_files.append(filepath)
elif deb_packages_regex.match(filename):
deb_packages.append(filepath)
except FileNotFoundError:
logging.error('Specified directory path "%s" not found', args.path)
sys.exit(1)
except NotADirectoryError:
logging.error('Specified path "%s" is not a directory', args.path)
sys.exit(1)
return changes_files, deb_packages
def handle_changes_file(args, session, changes_file):
"""Handle one changes file"""
logging.info("Handle changes file %s:", changes_file)
package_name, distribution, filepaths = parse_changes_file(changes_file) package_name, distribution, filepaths = parse_changes_file(changes_file)
filepaths += [changes_file] logging.info(" Detected package: %s (distribution=%s)", package_name, distribution)
repo_name = get_repo_name(distribution) logging.debug(" Included files:\n %s", "\n ".join(filepaths))
filepaths.append(changes_file)
repo_name = get_repo_name(args, distribution)
other_components_sources = get_published_distribution_other_components_sources(distribution) other_components_sources = get_published_distribution_other_components_sources(
args, distribution
)
print(" - Upload files:") logging.info(" Uploading files:")
for filepath in filepaths: for filepath in filepaths:
if not upload_file(package_name, filepath): if not upload_file(session, package_name, filepath):
print(f" - {filepath}: fail to upload file. See APTLY API logs " "for details.") logging.error("Failed to upload %s. See APTLY API logs for details.", filepath)
sys.exit(1) sys.exit(1)
else: logging.info(" %s", filepath)
print(f" - {filepath}") included_in_changes_files[filepath] = changes_file
print(f" - Include changes file {changes_file}:") logging.info(" Including changes file %s...", changes_file)
if include_file(repo_name, package_name, changes_file): if include_changes_file(repo_name, package_name, changes_file):
print(" - Changes file included") logging.info(" Changes file %s included", changes_file)
else: else:
sys.exit(1) sys.exit(1)
return distribution, repo_name, other_components_sources
def handle_deb_package(args, session, path):
"""Handle one debian package"""
logging.info("Handle Debian package %s:", path)
package_name, version, distribution = parse_deb_package(path)
logging.info(
" Detected package: %s (version=%s, distribution=%s)", package_name, version, distribution
)
repo_name = get_repo_name(args, distribution)
other_components_sources = get_published_distribution_other_components_sources(
args, distribution
)
logging.info(" Uploading Debian package %s...", path)
if not upload_file(session, package_name, path):
logging.error("Failed to upload file. See APTLY API logs for details.")
sys.exit(1)
logging.info(" Debian package %s uploaded.", path)
logging.info(" Adding Debian package %s...", path)
if not include_deb_package(repo_name, path):
logging.error("Failed to add file. See APTLY API logs for details.")
sys.exit(1)
logging.info(" Debian package %s added.", path)
return distribution, repo_name, other_components_sources
def publish_repo(args, session, distribution, repo_name, other_components_sources):
"""Publish repository of specified distributions"""
# Create a snapshot of the repository # Create a snapshot of the repository
snap_name = datetime.datetime.now().strftime(f"%Y%m%d-%H%M%S_{repo_name}") snap_name = datetime.datetime.now().strftime(f"%Y%m%d-%H%M%S_{repo_name}")
print(f'Create new snapshot "{snap_name}" of repository "{repo_name}"') logging.info('Create new snapshot "%s" of repository "%s"', snap_name, repo_name)
url = f"{API_URL}/repos/{repo_name}/snapshots" url = f"{args.api_url}/repos/{repo_name}/snapshots"
payload = {"Name": snap_name} payload = {"Name": snap_name}
result = session.post(url, json=payload) result = session.post(url, json=payload)
try: try:
data = result.json() data = result.json()
except Exception: # pylint: disable=broad-except except ValueError:
data = {} data = {}
error = ( error = (
result.status_code < 200 result.status_code < 200
@ -248,33 +260,119 @@ for changes_file in changes_files:
or not data.get("CreatedAt") or not data.get("CreatedAt")
) )
if error: if error:
print( logging.error(
f'Fail to create snapshot "{snap_name}" of repository ' 'Failed to create snapshot "%s" of repository "%s". See APTLY API logs for details.',
f'"{repo_name}". See APTLY API logs for details.' snap_name,
repo_name,
) )
sys.exit(1) sys.exit(1)
# Update published snapshot of the distribution # Update published snapshot of the distribution
print( logging.info(
f'Update published snapshot of distribution "{distribution}" to ' "Update published snapshot of distribution '%s' to '%s' (prefix: '%s', component: '%s')",
f'"{snap_name}" (prefix: {PREFIX}, component: {REPO_COMPONENT})' distribution,
snap_name,
args.prefix,
args.repo_component,
) )
if other_components_sources: if other_components_sources:
print("Note: keep other currently published components:") logging.info("Note: keep other currently published components:")
for source in other_components_sources: for source in other_components_sources:
print(f'- {source["Component"]}: {source["Name"]}') logging.info(" %s: %s", source["Component"], source["Name"])
url = f"{API_URL}/publish/:{PREFIX}/{distribution}" url = f"{args.api_url}/publish/:{args.prefix}/{distribution}"
payload = { payload = {
"Snapshots": other_components_sources + [{"Component": REPO_COMPONENT, "Name": snap_name}], "Snapshots": other_components_sources
"ForceOverwrite": FORCE_OVERWRITE, + [{"Component": args.repo_component, "Name": snap_name}],
"ForceOverwrite": args.force_overwrite,
} }
result = session.put(url, json=payload) result = session.put(url, json=payload)
if result.status_code < 200 or result.status_code > 299: if result.status_code < 200 or result.status_code > 299:
print( logging.error(
"Fail to update published snapshot of distribution " "Failed to update published snapshot of distribution '%s' to '%s' "
f'"{distribution}" to "{snap_name}" (prefix: {PREFIX}, ' "(prefix: '%s', component: '%s'). See APTLY API logs for details.",
f"component: {REPO_COMPONENT}). See APTLY API logs for details." distribution,
snap_name,
args.prefix,
args.repo_component,
) )
sys.exit(1) sys.exit(1)
return True
print("Done.")
def from_env(name, default=None):
"""Retrieve a parameter from environment"""
for var in (f"PLUGIN_{name}", f"APTLY_{name}", f"APT_{name}"):
if var in os.environ:
return os.environ[var]
return default
parser = argparse.ArgumentParser()
parser.add_argument("--api-url", help="APTLY API URL", default=from_env("API_URL"))
parser.add_argument("--api-username", help="APTLY API username", default=from_env("API_USERNAME"))
parser.add_argument("--api-password", help="APTLY API password", default=from_env("API_PASSWORD"))
parser.add_argument("--max-retries", default=from_env("MAX_RETRIES"))
parser.add_argument("--repo-name", default=from_env("REPO_NAME"))
parser.add_argument("--prefix", default=from_env("PREFIX", "."))
parser.add_argument("--repo-component", default=from_env("REPO_COMPONENT", "main"))
parser.add_argument("--path", default=from_env("PATH", "dist"))
parser.add_argument("--source-package-name", default=from_env("SOURCE_PACKAGE_NAME"))
parser.add_argument(
"--force-overwrite",
action="store_true",
default=from_env("FORCE_OVERWRITE", "false").lower() in ["1", "true", "yes", "on"],
)
parser.add_argument("-d", "--debug", action="store_true", help="Enable debug mode")
args = parser.parse_args()
# Handle parameters from environment
if not args.api_url:
parser.error("API URL not provided")
if not args.api_username:
parser.error("API username not provided")
if not args.api_password:
parser.error("API password not provided")
logging.basicConfig(
level=logging.DEBUG if args.debug else logging.INFO, format="%(levelname)s - %(message)s"
)
# Initialize Aptly API client
session = Session()
session.auth = (args.api_username, args.api_password)
if args.max_retries:
retries = Retry(total=int(args.max_retries), status_forcelist=list(range(500, 600)))
session.mount(args.api_url, HTTPAdapter(max_retries=retries))
to_publish = []
changes_files, deb_packages = list_changes_and_deb_packages(args)
if not changes_files and not deb_packages:
logging.error("No changes file or Debian package found in %s", args.path)
sys.exit(1)
for changes_file in changes_files:
pub = handle_changes_file(args, session, changes_file)
if pub not in to_publish:
to_publish.append(pub)
for deb_package in deb_packages:
if deb_package in included_in_changes_files:
logging.debug(
"Debian file %s already handled by %s",
deb_package,
included_in_changes_files[deb_package],
)
continue
pub = handle_deb_package(args, session, deb_package)
if pub not in to_publish:
to_publish.append(pub)
for pub_args in to_publish:
publish_repo(args, session, *pub_args)
logging.info("Done.")