aptly-publish/aptly-publish
Benjamin Renard ce4d6a65e3 Improvements to correctly handle prefix, component name & distribution to compute repository and snapshot name
The repository name could now be computed from prefix, distribution and
component names. Format:
  {prefix}-{distribution}-{component}

Note: if the default prefix is specified ("."), it will not be used to
compute the repository name.

So, all the work is now done by iterating over the detected changes files.
For each of them:
 - the changes file is parsed to detect the source package name, the
   distribution and included files
 - the repository name is computed (if not specified in environment)
 - the current published distribution is retrieved using APTLY publish
   API to:
   - check it was already manually published a first time
   - check it used a snapshot kind of sources
   - retrieve other components source snapshot
 - Upload the changes file and all its included files using APTLY File
   Upload API in a directory named as the source package
 - Include the changes file using APTLY Local Repos API
 - Compute a snapshot name for the repository based on the current date
   and the repository name. Format: YYYYMMDD-HHMMSS-{repository name}
 - Create a snapshot of the repository using APTLY Local Repos API
 - Update the published distribution with this new snapshot as source of
   the specified component (default: main) and keeping other components
   source snapshot.
2022-12-13 20:14:01 +01:00

306 lines
9.5 KiB
Python
Executable file

#!/usr/bin/python3
"""
Entrypoint of a Woodpecker CI docker image plugin that allows publishing Debian
packages to an Aptly repository using its API
"""
import datetime
import os
import re
import sys
from requests import Session
from requests.adapters import HTTPAdapter
from urllib3.util import Retry
from debian_parser import PackagesParser
def from_env(name, default=None):
    """
    Look up a plugin parameter in the process environment

    The variable ``PLUGIN_<name>`` is checked first, then ``APTLY_<name>``.
    The value of the first one that is set is returned (even if it is an
    empty string); ``default`` is returned when neither is set.
    """
    candidates = (f'PLUGIN_{name}', f'APTLY_{name}')
    return next(
        (os.environ[key] for key in candidates if key in os.environ),
        default,
    )
# Read the plugin configuration from the environment
API_URL = from_env('API_URL')
API_USERNAME = from_env('API_USERNAME')
API_PASSWORD = from_env('API_PASSWORD')

# The three API settings are mandatory: report the first missing (or empty)
# one and abort, checking them in URL / username / password order.
for _setting, _error in (
    (API_URL, 'API URL not provided'),
    (API_USERNAME, 'API username not provided'),
    (API_PASSWORD, 'API password not provided'),
):
    if not _setting:
        print(_error)
        sys.exit(1)
del _setting, _error

# Optional settings
MAX_RETRY = from_env('MAX_RETRIES')
REPO_NAME = from_env('REPO_NAME')
PREFIX = from_env('PREFIX', default='.')
REPO_COMPONENT = from_env('REPO_COMPONENT', default='main')
INPUT_PATH = from_env('PATH', default='dist')
SOURCE_NAME = from_env('SOURCE_PACKAGE_NAME')

DISTRIBUTIONS = []
# List changes files
#
# When a source package name is given, only "<name>_*.changes" files are
# kept. The name is passed through re.escape() because Debian package names
# may contain regex metacharacters ("+" and "."): without escaping, a name
# such as "libstdc++" makes re.compile() fail, and a "." would match any
# character.
changes_files_regex = re.compile(
    rf'^{re.escape(SOURCE_NAME)}_.*\.changes$'
    if SOURCE_NAME else
    r'^.*\.changes$'
)

changes_files = []
try:
    for filename in os.listdir(INPUT_PATH):
        filepath = os.path.join(INPUT_PATH, filename)
        # Only regular files whose name matches the pattern are candidates
        if os.path.isfile(filepath) and changes_files_regex.match(filename):
            changes_files.append(filepath)
except FileNotFoundError:
    print(f'Specified directory path "{INPUT_PATH}" not found')
    sys.exit(1)
except NotADirectoryError:
    print(f'Specified path "{INPUT_PATH}" is not a directory')
    sys.exit(1)

# Nothing to publish is treated as an error
if not changes_files:
    print(f'No changes file found in {INPUT_PATH}')
    sys.exit(1)
# Build the HTTP session used for every Aptly API call; it carries the
# username/password pair as its ``auth`` attribute.
session = Session()
session.auth = (API_USERNAME, API_PASSWORD)

# Retries are opt-in: an adapter with a retry policy is mounted on the API
# URL only when a maximum number of retries was configured. Every 5xx status
# code (500 to 599) is listed as a status to retry on.
if MAX_RETRY:
    server_error_codes = list(range(500, 600))
    retries = Retry(total=int(MAX_RETRY), status_forcelist=server_error_codes)
    session.mount(API_URL, HTTPAdapter(max_retries=retries))
def get_repo_name(dist):
    """
    Return the name of the repository to use for the given distribution

    An explicitly configured REPO_NAME always wins. Otherwise the name is
    "{distribution}-{component}", prepended with "{prefix}-" unless the
    prefix is the default one (".").
    """
    if REPO_NAME:
        return REPO_NAME
    base_name = f'{dist}-{REPO_COMPONENT}'
    return base_name if PREFIX == "." else f'{PREFIX}-{base_name}'
def parse_changes_file(filepath):
    """
    Parse a changes file to detect its source package name, its distribution
    and the files it includes

    The file is read as UTF-8 and handed to PackagesParser. The "Source",
    "Distribution" and "Files" tags (matched case-insensitively) are
    collected from every parsed entry.

    Returns a tuple (package_name, distribution, files) where files is the
    list of included file paths, each joined to the directory of the changes
    file.

    The process exits with status 1 when the source package name or the
    distribution appears more than once or is missing, or when no included
    file is found.
    """
    dirpath = os.path.dirname(filepath)
    changes_name = os.path.basename(filepath)

    def fail(message):
        """ Print an error message and abort """
        print(message)
        sys.exit(1)

    with open(filepath, "r", encoding="utf-8") as file_desc:
        changes_file = file_desc.read()

    package_name = None
    distribution = None
    files = []
    for infos in PackagesParser(changes_file).parse():
        for info in infos:
            tag = info['tag'].lower()
            if tag == 'files':
                # The value is cut on single spaces and, for each non-empty
                # piece, its last whitespace-separated token is kept.
                # NOTE(review): whether this yields only file names depends
                # on how PackagesParser joins the lines of a multi-line
                # field - confirm against the parser.
                for line in info['value'].split(' '):
                    if not line:
                        continue
                    files.append(os.path.join(dirpath, line.split()[-1]))
            elif tag == 'distribution':
                if distribution:
                    fail(
                        'More than one distribution found in changes file '
                        f'{changes_name}.')
                distribution = info['value']
            elif tag == 'source':
                if package_name:
                    fail(
                        'More than one source package name found in changes '
                        f'file {changes_name}.')
                package_name = info['value']

    if not package_name:
        fail(
            'Fail to detect source package name from changes file '
            f'{changes_name}.')
    if not distribution:
        fail(
            'Fail to detect distribution from changes file '
            f'{changes_name}.')
    if not files:
        fail(
            'No included file found in changes file '
            f'{changes_name}.')
    return (package_name, distribution, files)
def get_published_distribution_other_components_sources(distribution):
    """
    Retrieve the sources of the other components of the currently published
    distribution using Aptly API

    The list of publications is fetched from the "publish" endpoint and the
    first one matching both PREFIX and the given distribution is used.

    Returns the list of its sources whose component differs from
    REPO_COMPONENT (possibly empty).

    The process exits with status 1 when the API does not answer with HTTP
    200, when the matching publication is not sourced from snapshots, or
    when no matching publication exists (it has to be published manually a
    first time).
    """
    result = session.get(f'{API_URL}/publish')
    if result.status_code != 200:
        print(
            'Fail to retreive current published distribution '
            f'{distribution} using Aptly API (HTTP code: {result.status_code})'
        )
        sys.exit(1)

    published = next(
        (
            data for data in result.json()
            if data['Prefix'] == PREFIX
            and data['Distribution'] == distribution
        ),
        None
    )

    if published is None:
        print(
            f'Distribution {distribution} seem not currently published on '
            f'prefix "{PREFIX}". Please manually publish it a first time '
            f'before using {os.path.basename(sys.argv[0])}.'
        )
        sys.exit(1)

    if published['SourceKind'] != 'snapshot':
        print(
            f'The distribution {distribution} currently published on '
            f'prefix "{PREFIX}" do not sourcing packages from snapshot(s) '
            f'but from {published["SourceKind"]}.'
        )
        sys.exit(1)

    # Keep every source except the one of the component being updated
    return [
        source for source in published['Sources']
        if source['Component'] != REPO_COMPONENT
    ]
def upload_file(package_name, filepath):
    """
    Upload one file using the Aptly files API

    The file is posted to the upload directory named after the source
    package. Returns True only when the API answers with HTTP 200 and its
    JSON answer contains "<package_name>/<file name>", False otherwise.
    """
    expected_entry = f'{package_name}/{os.path.basename(filepath)}'
    with open(filepath, 'rb') as file_desc:
        result = session.post(
            f'{API_URL}/files/{package_name}',
            files={'file': file_desc}
        )
    if result.status_code != 200:
        return False
    return expected_entry in result.json()
def include_file(repo_name, package_name, changes_file):
    """
    Include a previously uploaded changes file in a repository using Aptly
    API

    The changes file is looked up by its base name in the upload directory
    named after the source package.

    Returns True when the API answers with HTTP 200 and reports at least one
    added item, False otherwise (details are printed in that case).
    """
    url = (
        f'{API_URL}/repos/{repo_name}/include/{package_name}/'
        f'{os.path.basename(changes_file)}'
    )
    result = session.post(url)

    # An error answer may not carry a JSON body: treat it as an empty report
    # so that it is handled as an unknown error instead of raising.
    try:
        data = result.json()
    except ValueError:
        data = {}

    if data.get('FailedFiles'):
        print()
        print(f'Some error occurred including {changes_file}:')
        print('Failed files:')
        for failed_file in data['FailedFiles']:
            print(f' - {os.path.basename(failed_file)}')
        warnings = data.get('Report', {}).get('Warnings')
        if warnings:
            print('Warnings:')
            for warning in warnings:
                print(f' - {warning}')
        print()
        return False

    if result.status_code == 200 and data.get('Report', {}).get('Added'):
        return True

    print(
        f'Unknown error occurred including {changes_file}. '
        'See APTLY API logs for details.')
    return False
# In publish API URLs, Aptly expects "_" to be doubled and "/" to be replaced
# by "_" in the prefix. The leading ":" added when building the URL keeps the
# storage part empty, which also makes ":." designate the root prefix.
escaped_prefix = PREFIX.replace('_', '__').replace('/', '_')

# Handle each detected changes file: upload, include, snapshot, publish.
# Any failure aborts the whole run with exit status 1.
for changes_file in changes_files:
    print(f'Handle changes file {changes_file}:')
    package_name, distribution, filepaths = parse_changes_file(changes_file)
    # The changes file itself has to be uploaded with the files it lists
    filepaths.append(changes_file)
    repo_name = get_repo_name(distribution)
    other_components_sources = \
        get_published_distribution_other_components_sources(distribution)

    print(' - Upload files:')
    for filepath in filepaths:
        if not upload_file(package_name, filepath):
            print(
                f' - {filepath}: fail to upload file. See APTLY API logs '
                'for details.')
            sys.exit(1)
        print(f' - {filepath}')

    print(f' - Include changes file {changes_file}:')
    if not include_file(repo_name, package_name, changes_file):
        sys.exit(1)
    print(' - Changes file included')

    # Create a snapshot of the repository
    # The repository name is appended after formatting the date, so that a
    # "%" in the name is never interpreted as a strftime directive.
    snap_name = f'{datetime.datetime.now():%Y%m%d-%H%M%S}-{repo_name}'
    print(f'Create new snapshot "{snap_name}" of repository "{repo_name}"')
    url = f'{API_URL}/repos/{repo_name}/snapshots'
    payload = {'Name': snap_name}
    result = session.post(url, json=payload)
    try:
        data = result.json()
    except ValueError:
        # No usable JSON body: the checks below will flag the failure
        data = {}
    error = (
        not 200 <= result.status_code <= 299 or
        data.get('Name') != snap_name or
        not data.get('CreatedAt')
    )
    if error:
        print(
            f'Fail to create snapshot "{snap_name}" of repository '
            f'"{repo_name}". See APTLY API logs for details.')
        sys.exit(1)

    # Update published snapshot of the distribution
    print(
        f'Update published snapshot of distribution "{distribution}" to '
        f'"{snap_name}" (prefix: {PREFIX}, component: {REPO_COMPONENT})')
    if other_components_sources:
        print('Note: keep other currently published components:')
        for source in other_components_sources:
            print(f'- {source["Component"]}: {source["Name"]}')

    url = f'{API_URL}/publish/:{escaped_prefix}/{distribution}'
    # The other components keep their current snapshot; only the configured
    # component is switched to the new one.
    payload = {
        'Snapshots': other_components_sources + [
            {
                'Component': REPO_COMPONENT,
                'Name': snap_name
            }
        ]
    }
    result = session.put(url, json=payload)
    if not 200 <= result.status_code <= 299:
        print(
            'Fail to update published snapshot of distribution '
            f'"{distribution}" to "{snap_name}" (prefix: {PREFIX}, '
            f'component: {REPO_COMPONENT}). See APTLY API logs for details.')
        sys.exit(1)

print("Done.")