aptly-publish/aptly-publish
Benjamin Renard 65a6445ce7 Use underscore instead of dash to compute repository/snapshot name
Dash are commonly used in distribution, prefix and component name.
Prefer using underscore to easily split distribution/prefix/component
from names.
2022-12-14 12:49:04 +01:00

304 lines
9.5 KiB
Python
Executable file

#!/usr/bin/python3
""""
Entrypoint of a Woodpecker CI docker image plugin that permit to publish Debian
packages on a Aptly repository using its API
"""
import datetime
import os
import re
import sys
from requests import Session
from requests.adapters import HTTPAdapter
from urllib3.util import Retry
from debian_parser import PackagesParser
def from_env(name, default=None):
""" Retrieve a parameter from environment """
for var in (f'PLUGIN_{name}', f'APTLY_{name}'):
if var in os.environ:
return os.environ[var]
return default
# Handle parameters from environment
API_URL = from_env('API_URL', None)
if not API_URL:
print('API URL not provided')
sys.exit(1)
API_USERNAME = from_env('API_USERNAME', None)
if not API_USERNAME:
print('API username not provided')
sys.exit(1)
API_PASSWORD = from_env('API_PASSWORD', None)
if not API_PASSWORD:
print('API password not provided')
sys.exit(1)
MAX_RETRY = from_env('MAX_RETRIES', None)
REPO_NAME = from_env('REPO_NAME', None)
PREFIX = from_env('PREFIX', '.')
REPO_COMPONENT = from_env('REPO_COMPONENT', 'main')
INPUT_PATH = from_env('PATH', 'dist')
SOURCE_NAME = from_env('SOURCE_PACKAGE_NAME', None)
# List changes files
changes_files_regex = (
# pylint: disable=consider-using-f-string
re.compile(r'^%s_.*\.changes$' % SOURCE_NAME)
if SOURCE_NAME else
re.compile(r'^.*\.changes$')
)
changes_files = []
try:
for filename in os.listdir(INPUT_PATH):
filepath = os.path.join(INPUT_PATH, filename)
if not os.path.isfile(filepath):
continue
if changes_files_regex.match(filename):
changes_files.append(filepath)
except FileNotFoundError:
print(f'Specified directory path "{INPUT_PATH}" not found')
sys.exit(1)
except NotADirectoryError:
print(f'Specified path "{INPUT_PATH}" is not a directory')
sys.exit(1)
if not changes_files:
print(f'No changes file found in {INPUT_PATH}')
sys.exit(1)
# Initialize Aptly API client
session = Session()
session.auth = (API_USERNAME, API_PASSWORD)
if MAX_RETRY:
retries = Retry(
total=int(MAX_RETRY),
status_forcelist=list(range(500, 600))
)
session.mount(API_URL, HTTPAdapter(max_retries=retries))
def get_repo_name(dist):
""" Compute and retreive repository name """
if REPO_NAME:
return REPO_NAME
value = f'{dist}_{REPO_COMPONENT}'
if PREFIX != ".":
value = f'{PREFIX}_{value}'
return value
def parse_changes_file(filepath):
""" Parse changes file to detect distribution and included files """
dirpath = os.path.dirname(filepath)
with open(filepath, "r", encoding="utf-8") as file_desc:
changes_file = file_desc.read()
parser = PackagesParser(changes_file)
package_name = None
distribution = None
files = []
for infos in parser.parse():
for info in infos:
if info['tag'].lower() == 'files':
for line in info['value'].split(' '):
if not line:
continue
files.append(os.path.join(dirpath, line.split()[-1]))
if info['tag'].lower() == 'distribution':
if distribution:
print(
'More than one distribution found in changes file'
f'{os.path.basename(filepath)}.')
sys.exit(1)
distribution = info['value']
if info['tag'].lower() == 'source':
if package_name:
print(
'More than one source package name found in changes '
f'file {os.path.basename(filepath)}.')
sys.exit(1)
package_name = info['value']
if not package_name:
print(
'Fail to detect source package name from changes file '
f'{os.path.basename(filepath)}.')
sys.exit(1)
if not distribution:
print(
'Fail to detect distribution from changes file '
f'{os.path.basename(filepath)}.')
sys.exit(1)
if not files:
print(
'No included file found in changes file'
f'{os.path.basename(filepath)}.')
sys.exit(1)
return (package_name, distribution, files)
def get_published_distribution_other_components_sources(distribution):
""" Retreive current published distribution using Aptly API """
url = f'{API_URL}/publish'
result = session.get(url)
if result.status_code != 200:
print(
'Fail to retreive current published distribution '
f'{distribution} using Aptly API (HTTP code: {result.status_code})'
)
sys.exit(1)
for data in result.json():
if data['Prefix'] != PREFIX:
continue
if data['Distribution'] != distribution:
continue
if data['SourceKind'] != 'snapshot':
print(
f'The distribution {distribution} currently published on '
f'prefix "{PREFIX}" do not sourcing packages from snapshot(s) '
f'but from {data["SourceKind"]}.'
)
sys.exit(1)
return [
source for source in data['Sources']
if source['Component'] != REPO_COMPONENT
]
print(
f'Distribution {distribution} seem not currently published on prefix '
f'"{PREFIX}". Please manually publish it a first time before using '
f'{os.path.basename(sys.argv[0])}.'
)
sys.exit(1)
return False
def upload_file(package_name, filepath):
""" Upload a file using Aptly API """
url = f'{API_URL}/files/{package_name}'
with open(filepath, 'rb') as file_desc:
result = session.post(url, files={'file': file_desc})
return (
result.status_code == 200 and
f'{package_name}/{os.path.basename(filepath)}' in result.json()
)
def include_file(repo_name, package_name, changes_file):
""" Include a changes file using Aptly API """
url = (
f'{API_URL}/repos/{repo_name}/include/{package_name}/'
f'{os.path.basename(changes_file)}'
)
result = session.post(url)
data = result.json()
if data.get('FailedFiles'):
print()
print(f'Some error occurred including {changes_file}:')
print('Failed files:')
for failed_file in data['FailedFiles']:
print(f' - {os.path.basename(failed_file)}')
if data.get('Report', {}).get('Warnings'):
print('Warnings:')
print(' - %s' % '\n - '.join(data['Report']['Warnings']))
print()
return False
if not (
result.status_code == 200 and
data.get('Report', {}).get('Added')
):
print(
f'Unknown error occurred including {changes_file}'
'See APTLY API logs for details.')
return False
return True
for changes_file in changes_files:
print(f'Handle changes file {changes_file}:')
package_name, distribution, filepaths = parse_changes_file(changes_file)
filepaths += [changes_file]
repo_name = get_repo_name(distribution)
other_components_sources = \
get_published_distribution_other_components_sources(distribution)
print(' - Upload files:')
for filepath in filepaths:
if not upload_file(package_name, filepath):
print(
f' - {filepath}: fail to upload file. See APTLY API logs '
'for details.')
sys.exit(1)
else:
print(f' - {filepath}')
print(f' - Include changes file {changes_file}:')
if include_file(repo_name, package_name, changes_file):
print(' - Changes file included')
else:
sys.exit(1)
# Create a snapshot of the repository
snap_name = datetime.datetime.now().strftime(f'%Y%m%d-%H%M%S_{repo_name}')
print(f'Create new snapshot "{snap_name}" of repository "{repo_name}"')
url = f'{API_URL}/repos/{repo_name}/snapshots'
payload = {'Name': snap_name}
result = session.post(url, json=payload)
try:
data = result.json()
except Exception: # pylint: disable=broad-except
data = {}
error = (
result.status_code < 200 or
result.status_code > 299 or
data.get('Name') != snap_name or
not data.get('CreatedAt')
)
if error:
print(
f'Fail to create snapshot "{snap_name}" of repository '
f'"{repo_name}". See APTLY API logs for details.')
sys.exit(1)
# Update published snapshot of the distribution
print(
f'Update published snapshot of distribution "{distribution}" to '
f'"{snap_name}" (prefix: {PREFIX}, component: {REPO_COMPONENT})')
if other_components_sources:
print('Note: keep other currently published components:')
for source in other_components_sources:
print(f'- {source["Component"]}: {source["Name"]}')
url = f'{API_URL}/publish/:{PREFIX}/{distribution}'
payload = {
'Snapshots': other_components_sources + [
{
'Component': REPO_COMPONENT,
'Name': snap_name
}
]
}
result = session.put(url, json=payload)
if (
result.status_code < 200 or
result.status_code > 299
):
print(
'Fail to update published snapshot of distribution '
f'"{distribution}" to "{snap_name}" (prefix: {PREFIX}, '
f'component: {REPO_COMPONENT}). See APTLY API logs for details.')
sys.exit(1)
print("Done.")