aptly-publish/aptly-publish

434 lines
15 KiB
Python
Executable file

#!/usr/bin/python3
"""
Entrypoint of a Woodpecker CI Docker image plugin that publishes Debian
packages to an Aptly repository using its API
"""
import argparse
import datetime
import logging
import os
import re
import sys
from debian.deb822 import Changes
from debian.debfile import DebFile
from requests import Session
from requests.adapters import HTTPAdapter
from urllib3.util import Retry
# Map each file uploaded by handle_changes_file() to the changes file it was handled with.
# The main loop reads it to skip Debian packages already handled through a changes file.
included_in_changes_files = {}
def get_repo_name(args, dist):
    """Return the name of the Aptly repository to use for a distribution

    An explicitly configured ``args.repo_name`` always wins. Otherwise the name is built as
    ``<dist>_<component>``, prefixed by ``<prefix>_`` unless the prefix is ``"."``.
    """
    explicit_name = args.repo_name
    if explicit_name:
        return explicit_name
    computed_name = f"{dist}_{args.repo_component}"
    return computed_name if args.prefix == "." else f"{args.prefix}_{computed_name}"
def parse_changes_file(filepath):
    """Parse changes file to detect source package name, distribution and included files

    :param filepath: Path of the changes file to parse.
    :return: A ``(package_name, distribution, files)`` tuple, where ``files`` is the list of the
        paths of the files listed in the changes file, joined to the changes file's directory.

    Log an error and exit with status 1 if the source package name, the distribution or the
    list of included files is missing or empty.
    """
    dirpath = os.path.dirname(filepath)
    with open(filepath, encoding="utf-8") as file_desc:
        changes = Changes(file_desc)
        # Use .get() so that a missing field reaches the explicit error reporting below
        # instead of raising an uncaught KeyError.
        package_name = changes.get("Source")
        distribution = changes.get("Distribution")
        files = [os.path.join(dirpath, f["name"]) for f in changes.get("files") or []]

    if not package_name:
        logging.error(
            "Failed to detect source package name from changes file %s.", os.path.basename(filepath)
        )
        sys.exit(1)

    if not distribution:
        logging.error(
            "Failed to detect distribution from changes file %s.", os.path.basename(filepath)
        )
        sys.exit(1)

    if not files:
        logging.error("No included file found in changes file %s.", os.path.basename(filepath))
        sys.exit(1)

    return package_name, distribution, files
def parse_deb_package(filepath):
    """Parse Debian package to detect package name, version and distribution

    :param filepath: Path of the Debian package to parse.
    :return: A ``(package_name, version, distributions)`` tuple: the name and version are read
        from the package control data, the distributions from the package changelog.

    Log an error and exit with status 1 if no changelog is found in the package, since the
    distribution cannot be detected without it.
    """
    with open(filepath, "rb") as f:
        deb_package = DebFile(fileobj=f)
        # Read the metadata
        control = deb_package.debcontrol()
        changelog = deb_package.changelog()

    # DebFile.changelog() returns None when the package does not provide a changelog
    if changelog is None:
        logging.error(
            "Failed to detect distribution from Debian package %s: no changelog found.",
            os.path.basename(filepath),
        )
        sys.exit(1)

    return control["Package"], control["Version"], changelog.distributions
def get_published_distribution_other_components_sources(args, distribution):
    """Retrieve the sources of the other components of a published distribution

    Query the publish list of the Aptly API (using the module-level ``session``), select the
    first entry matching ``args.prefix`` and ``distribution`` and return its sources whose
    component differs from ``args.repo_component``.

    Log an error and exit with status 1 if the API call fails, if the distribution is not
    published on the prefix or if it is not published from snapshot(s).
    """
    response = session.get(f"{args.api_url}/publish")
    http_code = response.status_code
    if http_code != 200:
        logging.error(
            "Failed to retrieve current published distribution %s using Aptly API (HTTP code: %s)",
            distribution,
            http_code,
        )
        sys.exit(1)

    published = response.json()
    logging.debug(
        "get_published_distribution_other_components_sources(%s): API return (%d): %s",
        distribution,
        http_code,
        published,
    )

    # Only the first entry published on this prefix for this distribution is considered
    current = next(
        (
            entry
            for entry in published
            if entry["Prefix"] == args.prefix and entry["Distribution"] == distribution
        ),
        None,
    )

    if current is None:
        logging.error(
            "Distribution %s seem not currently published on prefix '%s'. Please manually publish it "
            "a first time before using %s.",
            distribution,
            args.prefix,
            os.path.basename(sys.argv[0]),
        )
        sys.exit(1)

    source_kind = current["SourceKind"]
    if source_kind != "snapshot":
        logging.error(
            "The distribution %s currently published on prefix '%s' do not sourcing packages "
            "from snapshot(s) but from %s.",
            distribution,
            args.prefix,
            source_kind,
        )
        sys.exit(1)

    return [src for src in current["Sources"] if src["Component"] != args.repo_component]
def upload_file(session, package_name, filepath):
    """Upload a file using Aptly API

    The API URL is read from the module-level ``args``.

    :param session: HTTP session used to call the Aptly API.
    :param package_name: Name of the upload directory on the Aptly side.
    :param filepath: Path of the local file to upload.
    :return: True if the API answered with HTTP code 200 and lists the uploaded file in its
        result, False otherwise (including when the result is not valid JSON).
    """
    url = f"{args.api_url}/files/{package_name}"
    with open(filepath, "rb") as file_desc:
        result = session.post(url, files={"file": file_desc})

    try:
        data = result.json()
    except ValueError:
        # Error responses are not guaranteed to be JSON: report a failure to the caller
        # instead of crashing on the decoding error.
        logging.debug(
            "upload_file(%s, %s): API return (%d), fail to decode JSON result: '%s'",
            package_name,
            filepath,
            result.status_code,
            result.text,
        )
        return False

    logging.debug(
        "upload_file(%s, %s): API return (%d): %s", package_name, filepath, result.status_code, data
    )
    return result.status_code == 200 and f"{package_name}/{os.path.basename(filepath)}" in data
def include_changes_file(repo_name, package_name, changes_file):
    """Include a changes file using Aptly API

    The API URL and the HTTP session are read from the module-level ``args`` and ``session``.

    :param repo_name: Name of the Aptly repository to include the changes file in.
    :param package_name: Name of the upload directory the files were uploaded to.
    :param changes_file: Path of the changes file (only its base name is sent to the API).
    :return: True if the API answered with HTTP code 200 and reported added packages without
        any failed file, False otherwise.
    """
    url = (
        f"{args.api_url}/repos/{repo_name}/include/{package_name}/"
        f"{os.path.basename(changes_file)}"
    )
    result = session.post(url)

    try:
        data = result.json()
    except ValueError:
        # Error responses are not guaranteed to be JSON: handle it as an unknown error below
        # instead of crashing on the decoding error.
        logging.debug(
            "include_changes_file(%s, %s, %s): API return (%d), fail to decode JSON result: '%s'",
            repo_name,
            package_name,
            changes_file,
            result.status_code,
            result.text,
        )
        data = {}
    else:
        logging.debug(
            "include_changes_file(%s, %s, %s): API return (%d): %s",
            repo_name,
            package_name,
            changes_file,
            result.status_code,
            data,
        )

    # Tolerate unexpected payload shapes (non-object result, null report)
    if not isinstance(data, dict):
        data = {}
    report = data.get("Report") or {}

    if data.get("FailedFiles"):
        warnings = report.get("Warnings")
        logging.error(
            "Some error occurred including %s:\nFailed files:\n - %s\nWarnings: - %s",
            changes_file,
            "\n - ".join(data["FailedFiles"]),
            "\n - ".join(warnings) if warnings else "No warning",
        )
        return False

    if not (result.status_code == 200 and report.get("Added")):
        logging.error(
            "Unknown error occurred including %s. See APTLY API logs for details.", changes_file
        )
        return False

    return True
def include_deb_package(repo_name, package_name, path):
    """Include an uploaded Debian package using Aptly API

    The API URL and the HTTP session are read from the module-level ``args`` and ``session``.

    :param repo_name: Name of the Aptly repository to add the package to.
    :param package_name: Name of the upload directory the package was uploaded to.
    :param path: Path of the Debian package (only its base name is sent to the API).
    :return: True if the API answered with HTTP code 200 and reported added packages without
        any failed file, False otherwise.
    """
    url = f"{args.api_url}/repos/{repo_name}/file/{package_name}/{os.path.basename(path)}"
    result = session.post(url)

    try:
        data = result.json()
    except ValueError:
        # Error responses are not guaranteed to be JSON: handle it as an unknown error below
        # instead of crashing on the decoding error.
        logging.debug(
            "include_deb_package(%s, %s, %s): API return (%d), fail to decode JSON result: '%s'",
            repo_name,
            package_name,
            path,
            result.status_code,
            result.text,
        )
        data = {}
    else:
        logging.debug(
            "include_deb_package(%s, %s, %s): API return (%d): %s",
            repo_name,
            package_name,
            path,
            result.status_code,
            data,
        )

    # Tolerate unexpected payload shapes (non-object result, null report)
    if not isinstance(data, dict):
        data = {}
    report = data.get("Report") or {}

    if data.get("FailedFiles"):
        warnings = report.get("Warnings")
        logging.error(
            "Some error occurred including %s:\nFailed files:\n - %s\nWarnings: - %s",
            path,
            "\n - ".join(data["FailedFiles"]),
            "\n - ".join(warnings) if warnings else "No warning",
        )
        return False

    if not (result.status_code == 200 and report.get("Added")):
        logging.error("Unknown error occurred including %s. See APTLY API logs for details.", path)
        return False

    return True
def list_changes_and_deb_packages(args):
    """List changes files and Debian packages according to specified script arguments

    Only regular files located directly in the directory ``args.path`` are considered. When
    ``args.source_package_name`` is set, only changes files whose name starts with
    ``<source package name>_`` are kept (case-insensitive).

    :return: A ``(changes_files, deb_packages)`` tuple of lists of file paths, each one in
        file name order.

    Log an error and exit with status 1 if ``args.path`` does not exist or is not a directory.
    """
    if args.source_package_name:
        # Escape the package name: Debian package names may contain "+" and ".", which are
        # regex metacharacters (e.g. "g++" would not even compile unescaped).
        changes_files_regex = re.compile(
            r"^" + re.escape(args.source_package_name) + r"_.*\.changes$", re.I
        )
    else:
        changes_files_regex = re.compile(r"^.*\.changes$", re.I)
    deb_packages_regex = re.compile(r"^.*\.deb$", re.I)

    try:
        # Sorted to make the processing order deterministic
        filenames = sorted(os.listdir(args.path))
    except FileNotFoundError:
        logging.error('Specified directory path "%s" not found', args.path)
        sys.exit(1)
    except NotADirectoryError:
        logging.error('Specified path "%s" is not a directory', args.path)
        sys.exit(1)

    changes_files = []
    deb_packages = []
    for filename in filenames:
        filepath = os.path.join(args.path, filename)
        if not os.path.isfile(filepath):
            continue
        if changes_files_regex.match(filename):
            changes_files.append(filepath)
        elif deb_packages_regex.match(filename):
            deb_packages.append(filepath)
    return changes_files, deb_packages
def handle_changes_file(args, session, changes_file):
    """Upload one changes file with the files it lists, then include it in its repository

    Every uploaded file (the changes file included) is recorded in
    ``included_in_changes_files``. Exit with status 1 as soon as an upload or the inclusion
    fails.

    :return: A ``(distribution, repo_name, other_components_sources)`` tuple.
    """
    logging.info("Handle changes file %s:", changes_file)
    package_name, distribution, listed_files = parse_changes_file(changes_file)
    logging.info(" Detected package: %s (distribution=%s)", package_name, distribution)
    logging.debug(" Included files:\n %s", "\n ".join(listed_files))

    repo_name = get_repo_name(args, distribution)
    other_components_sources = get_published_distribution_other_components_sources(
        args, distribution
    )

    logging.info(" Uploading files:")
    # The changes file itself is uploaded last, after the files it references
    for path in [*listed_files, changes_file]:
        uploaded = upload_file(session, package_name, path)
        if not uploaded:
            logging.error("Failed to upload %s. See APTLY API logs for details.", path)
            sys.exit(1)
        logging.info(" %s", path)
        included_in_changes_files[path] = changes_file

    logging.info(" Including changes file %s...", changes_file)
    if not include_changes_file(repo_name, package_name, changes_file):
        sys.exit(1)
    logging.info(" Changes file %s included", changes_file)

    return distribution, repo_name, other_components_sources
def handle_deb_package(args, session, path):
    """Upload one Debian package, then add it to its repository

    Exit with status 1 if the upload or the addition fails.

    :return: A ``(distribution, repo_name, other_components_sources)`` tuple.
    """
    logging.info("Handle Debian package %s:", path)
    metadata = parse_deb_package(path)
    package_name, version, distribution = metadata
    logging.info(
        " Detected package: %s (version=%s, distribution=%s)", package_name, version, distribution
    )

    repo_name = get_repo_name(args, distribution)
    other_components_sources = get_published_distribution_other_components_sources(
        args, distribution
    )
    outcome = (distribution, repo_name, other_components_sources)

    logging.info(" Uploading Debian package %s...", path)
    uploaded = upload_file(session, package_name, path)
    if not uploaded:
        logging.error("Failed to upload file. See APTLY API logs for details.")
        sys.exit(1)
    logging.info(" Debian package %s uploaded.", path)

    logging.info(" Adding Debian package %s...", path)
    added = include_deb_package(repo_name, package_name, path)
    if not added:
        logging.error("Failed to add file. See APTLY API logs for details.")
        sys.exit(1)
    logging.info(" Debian package %s added.", path)

    return outcome
def publish_repo(args, session, distribution, repo_name, other_components_sources):
    """Snapshot a repository and publish this snapshot for the specified distribution

    :param args: Parsed script arguments (``api_url``, ``prefix``, ``repo_component`` and
        ``force_overwrite`` are used).
    :param session: HTTP session used to call the Aptly API.
    :param distribution: Name of the published distribution to update.
    :param repo_name: Name of the Aptly repository to snapshot.
    :param other_components_sources: Sources of the other published components, sent back
        unchanged together with the new snapshot.
    :return: True on success.

    Log an error and exit with status 1 if the snapshot creation or the publish update fails.
    """
    # Create a snapshot of the repository.
    # The repository name is appended after formatting the timestamp, so that a "%" in the
    # name is never interpreted as a strftime() directive.
    timestamp = datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
    snap_name = f"{timestamp}_{repo_name}"
    logging.info('Create new snapshot "%s" of repository "%s"', snap_name, repo_name)
    url = f"{args.api_url}/repos/{repo_name}/snapshots"
    payload = {"Name": snap_name}
    result = session.post(url, json=payload)
    try:
        data = result.json()
        logging.debug(
            "publish_repo(%s, %s): API return (%d): %s",
            distribution,
            repo_name,
            result.status_code,
            data,
        )
    except ValueError:
        logging.debug(
            "publish_repo(%s, %s): API return (%d), fail to decode JSON result: '%s'",
            distribution,
            repo_name,
            result.status_code,
            result.text,
        )
        data = {}

    # The creation is a success only on a 2xx answer describing the requested snapshot
    error = (
        result.status_code < 200
        or result.status_code > 299
        or data.get("Name") != snap_name
        or not data.get("CreatedAt")
    )
    if error:
        logging.error(
            'Failed to create snapshot "%s" of repository "%s". See APTLY API logs for details.',
            snap_name,
            repo_name,
        )
        sys.exit(1)

    # Update published snapshot of the distribution
    logging.info(
        "Update published snapshot of distribution '%s' to '%s' (prefix: '%s', component: '%s')",
        distribution,
        snap_name,
        args.prefix,
        args.repo_component,
    )
    if other_components_sources:
        logging.info("Note: keep other currently published components:")
        for source in other_components_sources:
            logging.info(" %s: %s", source["Component"], source["Name"])

    url = f"{args.api_url}/publish/:{args.prefix}/{distribution}"
    payload = {
        # Snapshots of the other components are sent along with the new one
        "Snapshots": other_components_sources
        + [{"Component": args.repo_component, "Name": snap_name}],
        "ForceOverwrite": args.force_overwrite,
    }
    result = session.put(url, json=payload)
    if result.status_code < 200 or result.status_code > 299:
        logging.error(
            "Failed to update published snapshot of distribution '%s' to '%s' "
            "(prefix: '%s', component: '%s'). See APTLY API logs for details.",
            distribution,
            snap_name,
            args.prefix,
            args.repo_component,
        )
        sys.exit(1)
    return True
def from_env(name, default=None):
    """Look up a parameter in the environment

    The variables ``PLUGIN_<name>``, ``APTLY_<name>`` and ``APT_<name>`` are tried in this
    order and the value of the first one that is set is returned (even if empty). If none of
    them is set, ``default`` is returned.
    """
    candidates = (f"{prefix}_{name}" for prefix in ("PLUGIN", "APTLY", "APT"))
    return next((os.environ[var] for var in candidates if var in os.environ), default)
def _flag_from_env(name):
    """Tell whether a boolean parameter is enabled in the environment"""
    return from_env(name, "false").lower() in ["1", "true", "yes", "on"]


# Command line options: each one defaults to its value from the environment (see from_env())
parser = argparse.ArgumentParser()
parser.add_argument("--api-url", help="APTLY API URL", default=from_env("API_URL"))
parser.add_argument("--api-username", help="APTLY API username", default=from_env("API_USERNAME"))
parser.add_argument("--api-password", help="APTLY API password", default=from_env("API_PASSWORD"))
parser.add_argument("--max-retries", default=from_env("MAX_RETRIES"))
parser.add_argument("--repo-name", default=from_env("REPO_NAME"))
parser.add_argument("--prefix", default=from_env("PREFIX", "."))
parser.add_argument("--repo-component", default=from_env("REPO_COMPONENT", "main"))
parser.add_argument("--path", default=from_env("PATH", "dist"))
parser.add_argument("--source-package-name", default=from_env("SOURCE_PACKAGE_NAME"))
parser.add_argument(
    "--force-overwrite",
    action="store_true",
    default=_flag_from_env("FORCE_OVERWRITE"),
)
parser.add_argument(
    "-d",
    "--debug",
    action="store_true",
    help="Enable debug mode",
    default=_flag_from_env("DEBUG"),
)

args = parser.parse_args()

# Check the required parameters (provided on the command line or by the environment)
for required_value, error_message in (
    (args.api_url, "API URL not provided"),
    (args.api_username, "API username not provided"),
    (args.api_password, "API password not provided"),
):
    if not required_value:
        parser.error(error_message)

logging.basicConfig(
    level=logging.DEBUG if args.debug else logging.INFO, format="%(levelname)s - %(message)s"
)

# Initialize Aptly API client
session = Session()
session.auth = (args.api_username, args.api_password)
if args.max_retries:
    # Retry the requests sent to the API URL on any HTTP 5xx answer
    retries = Retry(total=int(args.max_retries), status_forcelist=list(range(500, 600)))
    session.mount(args.api_url, HTTPAdapter(max_retries=retries))

# Distinct (distribution, repo_name, other_components_sources) tuples to publish at the end
to_publish = []


def _queue_publication(publication):
    """Register a publication to do, unless an identical one is already registered"""
    if publication not in to_publish:
        to_publish.append(publication)


changes_files, deb_packages = list_changes_and_deb_packages(args)
if not (changes_files or deb_packages):
    logging.error("No changes file or Debian package found in %s", args.path)
    sys.exit(1)

# Changes files first: they record the files they include in included_in_changes_files
for changes_file in changes_files:
    _queue_publication(handle_changes_file(args, session, changes_file))

for deb_package in deb_packages:
    if deb_package not in included_in_changes_files:
        _queue_publication(handle_deb_package(args, session, deb_package))
    else:
        logging.debug(
            "Debian file %s already handled by %s",
            deb_package,
            included_in_changes_files[deb_package],
        )

for pub_args in to_publish:
    publish_repo(args, session, *pub_args)

logging.info("Done.")