aptly-publish/aptly-publish

281 lines
9.3 KiB
Python
Executable file

#!/usr/bin/python3
""""
Entrypoint of a Woodpecker CI docker image plugin that permits publishing Debian
packages to an Aptly repository using its API
"""
import datetime
import os
import re
import sys
from debian_parser import PackagesParser
from requests import Session
from requests.adapters import HTTPAdapter
from urllib3.util import Retry
def from_env(name, default=None):
    """
    Look up a plugin parameter in the process environment.

    ``PLUGIN_<name>`` is checked first, then ``APTLY_<name>``. The value of
    the first variable that is set is returned (even if it is empty);
    ``default`` is returned when neither variable exists.
    """
    candidates = (f"{prefix}_{name}" for prefix in ("PLUGIN", "APTLY"))
    return next((os.environ[key] for key in candidates if key in os.environ), default)
# Read the plugin parameters from the environment
API_URL = from_env("API_URL")
API_USERNAME = from_env("API_USERNAME")
API_PASSWORD = from_env("API_PASSWORD")

# The API endpoint and its credentials are mandatory: report the first one
# that is missing (or empty) and stop.
for _label, _value in (
    ("API URL", API_URL),
    ("API username", API_USERNAME),
    ("API password", API_PASSWORD),
):
    if not _value:
        print(f"{_label} not provided")
        sys.exit(1)

# Optional parameters and their defaults
MAX_RETRY = from_env("MAX_RETRIES")
REPO_NAME = from_env("REPO_NAME")
PREFIX = from_env("PREFIX", ".")
REPO_COMPONENT = from_env("REPO_COMPONENT", "main")
INPUT_PATH = from_env("PATH", "dist")
SOURCE_NAME = from_env("SOURCE_PACKAGE_NAME")
# Any of "1", "true" or "yes" (case-insensitive) enables the overwrite
FORCE_OVERWRITE = from_env("FORCE_OVERWRITE", "false").lower() in ("1", "true", "yes")
# List changes files
# When a source package name is given, only "<name>_*.changes" files are kept.
# The name is matched literally: package names may contain regex
# metacharacters such as "+" or "." (e.g. "gtk+3.0"), so it has to be escaped.
changes_files_regex = re.compile(
    rf"^{re.escape(SOURCE_NAME)}_.*\.changes$" if SOURCE_NAME else r"^.*\.changes$"
)
changes_files = []
try:
    # os.listdir() returns entries in arbitrary order: sort them so that the
    # changes files are always handled in the same order.
    for filename in sorted(os.listdir(INPUT_PATH)):
        filepath = os.path.join(INPUT_PATH, filename)
        # Only regular files (or links to them) are considered
        if os.path.isfile(filepath) and changes_files_regex.match(filename):
            changes_files.append(filepath)
except FileNotFoundError:
    print(f'Specified directory path "{INPUT_PATH}" not found')
    sys.exit(1)
except NotADirectoryError:
    print(f'Specified path "{INPUT_PATH}" is not a directory')
    sys.exit(1)
if not changes_files:
    print(f"No changes file found in {INPUT_PATH}")
    sys.exit(1)
# Initialize Aptly API client
# All requests share one session carrying the HTTP basic auth credentials.
session = Session()
session.auth = (API_USERNAME, API_PASSWORD)
if MAX_RETRY:
    # The value comes from the environment: reject a non-integer cleanly
    # instead of crashing with a traceback.
    try:
        max_retry = int(MAX_RETRY)
    except ValueError:
        print(f'Invalid max retries value "{MAX_RETRY}": an integer is expected')
        sys.exit(1)
    # Retry requests answered with any HTTP 5xx status code.
    # NOTE(review): with urllib3 defaults, status-based retries presumably only
    # apply to idempotent methods, so POST requests would not be retried -
    # confirm this is intended.
    retries = Retry(total=max_retry, status_forcelist=list(range(500, 600)))
    # Only URLs starting with the API URL use the retrying adapter
    session.mount(API_URL, HTTPAdapter(max_retries=retries))
def get_repo_name(dist):
    """
    Return the name of the repository to use for the distribution ``dist``.

    A configured REPO_NAME always takes precedence. Otherwise the name is
    "<dist>_<REPO_COMPONENT>", prefixed with "<PREFIX>_" unless PREFIX is
    the default ".".
    """
    if REPO_NAME:
        return REPO_NAME
    computed = f"{dist}_{REPO_COMPONENT}"
    return computed if PREFIX == "." else f"{PREFIX}_{computed}"
def parse_changes_file(filepath):
    """
    Parse a changes file to detect its source package, distribution and files.

    Returns a ``(package_name, distribution, files)`` tuple: the value of the
    ``Source`` field, the value of the ``Distribution`` field and the list of
    paths taken from the ``Files`` field, each joined to the directory of the
    changes file.

    The script exits with status 1 when ``Source`` or ``Distribution`` is
    found more than once, or when the package name, the distribution or the
    file list is missing.
    """
    dirpath = os.path.dirname(filepath)
    basename = os.path.basename(filepath)
    with open(filepath, encoding="utf-8") as file_desc:
        changes_file = file_desc.read()

    package_name = None
    distribution = None
    files = []
    for infos in PackagesParser(changes_file).parse():
        for info in infos:
            # Field names are compared case-insensitively
            tag = info["tag"].lower()
            if tag == "files":
                # Keep the last whitespace-separated token of each non-empty
                # chunk as the file name.
                # NOTE(review): the split on " " depends on how PackagesParser
                # exposes multi-line values - confirm against the parser.
                for line in info["value"].split(" "):
                    if line:
                        files.append(os.path.join(dirpath, line.split()[-1]))
            elif tag == "distribution":
                if distribution:
                    print(f"More than one distribution found in changes file {basename}.")
                    sys.exit(1)
                distribution = info["value"]
            elif tag == "source":
                if package_name:
                    print(f"More than one source package name found in changes file {basename}.")
                    sys.exit(1)
                package_name = info["value"]

    if not package_name:
        print(f"Fail to detect source package name from changes file {basename}.")
        sys.exit(1)
    if not distribution:
        print(f"Fail to detect distribution from changes file {basename}.")
        sys.exit(1)
    if not files:
        print(f"No included file found in changes file {basename}.")
        sys.exit(1)
    return (package_name, distribution, files)
def get_published_distribution_other_components_sources(distribution):
    """
    Return the published sources of ``distribution`` for the other components.

    The publication list is fetched from the Aptly API and the first entry
    matching both PREFIX and ``distribution`` is selected. Its sources whose
    component differs from REPO_COMPONENT are returned.

    The script exits with status 1 when the API does not answer 200, when the
    selected publication is not sourced from snapshots, or when no
    publication matches.
    """
    response = session.get(f"{API_URL}/publish")
    if response.status_code != 200:
        print(
            "Fail to retrieve current published distribution "
            f"{distribution} using Aptly API (HTTP code: {response.status_code})"
        )
        sys.exit(1)

    published = next(
        (
            entry
            for entry in response.json()
            if entry["Prefix"] == PREFIX and entry["Distribution"] == distribution
        ),
        None,
    )
    if published is None:
        print(
            f"Distribution {distribution} seem not currently published on prefix "
            f'"{PREFIX}". Please manually publish it a first time before using '
            f"{os.path.basename(sys.argv[0])}."
        )
        sys.exit(1)

    if published["SourceKind"] != "snapshot":
        print(
            f"The distribution {distribution} currently published on "
            f'prefix "{PREFIX}" do not sourcing packages from snapshot(s) '
            f'but from {published["SourceKind"]}.'
        )
        sys.exit(1)

    # Keep everything except the component this run is about to replace
    return [src for src in published["Sources"] if src["Component"] != REPO_COMPONENT]
def upload_file(package_name, filepath):
    """
    Upload one file to the ``/files/<package_name>`` endpoint of the Aptly API.

    Returns True only when the API answers 200 and its JSON body contains
    "<package_name>/<file name>".
    """
    with open(filepath, "rb") as stream:
        response = session.post(f"{API_URL}/files/{package_name}", files={"file": stream})
    if response.status_code != 200:
        return False
    expected_entry = f"{package_name}/{os.path.basename(filepath)}"
    return expected_entry in response.json()
def include_file(repo_name, package_name, changes_file):
    """
    Include an uploaded changes file in a repository using the Aptly API.

    ``changes_file`` is referenced by its base name inside the
    ``package_name`` upload directory. Returns True when the API answers 200
    and reports added content. Otherwise the problem is printed and False is
    returned.
    """
    url = f"{API_URL}/repos/{repo_name}/include/{package_name}/{os.path.basename(changes_file)}"
    result = session.post(url)
    # An error answer may be neither valid JSON nor a JSON object: treat it
    # as a failure instead of crashing.
    try:
        data = result.json()
    except ValueError:
        data = None
    if not isinstance(data, dict):
        print(f"Unknown error occurred including {changes_file}. See APTLY API logs for details.")
        return False

    # "Report" may be absent or null
    report = data.get("Report") or {}
    failed_files = data.get("FailedFiles")
    if failed_files:
        print()
        print(f"Some error occurred including {changes_file}:")
        print("Failed files:")
        for failed_file in failed_files:
            print(f" - {os.path.basename(failed_file)}")
        warnings = report.get("Warnings")
        if warnings:
            print("Warnings:")
            for warning in warnings:
                print(f" - {warning}")
        print()
        return False

    if not (result.status_code == 200 and report.get("Added")):
        print(f"Unknown error occurred including {changes_file}. See APTLY API logs for details.")
        return False
    return True
# Escape the publishing prefix for use in publish URLs: in that path segment
# the Aptly API reads "_" as "/" and "__" as "_". The default "." is unchanged.
publish_url_prefix = PREFIX.replace("_", "__").replace("/", "_")

# Handle each changes file: upload its files, include it in the repository,
# snapshot the repository and switch the published distribution to that snapshot.
for changes_file in changes_files:
    print(f"Handle changes file {changes_file}:")
    package_name, distribution, filepaths = parse_changes_file(changes_file)
    # The changes file itself is uploaded along with the files it lists
    filepaths.append(changes_file)
    repo_name = get_repo_name(distribution)
    # Retrieved before any change is made: the script stops here when the
    # distribution is not published yet.
    other_components_sources = get_published_distribution_other_components_sources(distribution)

    print(" - Upload files:")
    for filepath in filepaths:
        if not upload_file(package_name, filepath):
            print(f" - {filepath}: fail to upload file. See APTLY API logs for details.")
            sys.exit(1)
        print(f" - {filepath}")

    print(f" - Include changes file {changes_file}:")
    if not include_file(repo_name, package_name, changes_file):
        sys.exit(1)
    print(" - Changes file included")

    # Create a snapshot of the repository
    # The timestamp is formatted on its own so that a "%" in the repository
    # name is never interpreted as a strftime directive.
    snap_name = f"{datetime.datetime.now():%Y%m%d-%H%M%S}_{repo_name}"
    print(f'Create new snapshot "{snap_name}" of repository "{repo_name}"')
    url = f"{API_URL}/repos/{repo_name}/snapshots"
    payload = {"Name": snap_name}
    result = session.post(url, json=payload)
    try:
        data = result.json()
    except ValueError:
        # Body is not valid JSON: the checks below then report the failure
        data = {}
    # The status code is tested first, so "data" is only inspected for a
    # successful answer.
    error = (
        result.status_code < 200
        or result.status_code > 299
        or data.get("Name") != snap_name
        or not data.get("CreatedAt")
    )
    if error:
        print(
            f'Fail to create snapshot "{snap_name}" of repository '
            f'"{repo_name}". See APTLY API logs for details.'
        )
        sys.exit(1)

    # Update published snapshot of the distribution
    print(
        f'Update published snapshot of distribution "{distribution}" to '
        f'"{snap_name}" (prefix: {PREFIX}, component: {REPO_COMPONENT})'
    )
    if other_components_sources:
        print("Note: keep other currently published components:")
        for source in other_components_sources:
            print(f'- {source["Component"]}: {source["Name"]}')
    url = f"{API_URL}/publish/:{publish_url_prefix}/{distribution}"
    # The other components are sent back unchanged so that only this
    # component's snapshot is switched.
    payload = {
        "Snapshots": other_components_sources + [{"Component": REPO_COMPONENT, "Name": snap_name}],
        "ForceOverwrite": FORCE_OVERWRITE,
    }
    result = session.put(url, json=payload)
    if result.status_code < 200 or result.status_code > 299:
        print(
            "Fail to update published snapshot of distribution "
            f'"{distribution}" to "{snap_name}" (prefix: {PREFIX}, '
            f"component: {REPO_COMPONENT}). See APTLY API logs for details."
        )
        sys.exit(1)
    # NOTE(review): assumed to be printed once per changes file - confirm
    print("Done.")