aptly-publish/entrypoint.py

212 lines
6 KiB
Python

#!/usr/bin/python3
""""
Entrypoint of a Woodpecker CI docker image plugin that permit to publish Debian
packages on a Aptly repository using its API
"""
import datetime
import os
import re
import sys
from requests import Session
from requests.adapters import HTTPAdapter
from urllib3.util import Retry
from debian_parser import PackagesParser
# Handle parameters from environment
API_URL = os.environ.get('PLUGIN_API_URL', None)
if not API_URL:
print('API URL not provided')
sys.exit(1)
API_USERNAME = os.environ.get('PLUGIN_API_USERNAME', None)
if not API_USERNAME:
print('API username not provided')
sys.exit(1)
API_PASSWORD = os.environ.get('PLUGIN_API_PASSWORD', None)
if not API_PASSWORD:
print('API password not provided')
sys.exit(1)
MAX_RETRY = os.environ.get('PLUGIN_MAX_RETRIES', None)
REPO_NAME = os.environ.get('PLUGIN_REPO_NAME', 'stable')
REPO_COMPONENT = os.environ.get('PLUGIN_REPO_COMPONENT', 'main')
DIST = os.environ.get('PLUGIN_PATH', 'dist')
SOURCE_NAME = os.environ.get('PLUGIN_SOURCE_PACKAGE_NAME', None)
# List changes files
changes_files_regex = (
# pylint: disable=consider-using-f-string
re.compile(r'^%s_.*\.changes$' % SOURCE_NAME)
if SOURCE_NAME else
re.compile(r'^.*\.changes$')
)
changes_files = []
try:
for filename in os.listdir(DIST):
filepath = os.path.join(DIST, filename)
if not os.path.isfile(filepath):
continue
if changes_files_regex.match(filename):
changes_files.append(filepath)
except FileNotFoundError:
print(f'Specified directory path "{DIST}" not found')
sys.exit(1)
except NotADirectoryError:
print(f'Specified path "{DIST}" is not a directory')
sys.exit(1)
if not changes_files:
print(f'No changes file found in {DIST}')
sys.exit(1)
# Initialize Aptly API client
session = Session()
session.auth = (API_USERNAME, API_PASSWORD)
if MAX_RETRY:
retries = Retry(
total=int(MAX_RETRY),
status_forcelist=list(range(500, 600))
)
session.mount(API_URL, HTTPAdapter(max_retries=retries))
# List and upload files from changes files
def list_files_in_changes_file(filepath):
""" List files included by a changes file """
dirpath = os.path.dirname(filepath)
with open(filepath, "r", encoding="utf-8") as file_desc:
changes_file = file_desc.read()
parser = PackagesParser(changes_file)
files = []
for infos in parser.parse():
for info in infos:
if info['tag'].lower() != 'files':
continue
for line in info['value'].split(' '):
if not line:
continue
files.append(os.path.join(dirpath, line.split()[-1]))
return files
def changes_file2package_name(changes_file):
""" Retrieve package name from changes file name """
return os.path.basename(changes_file).split('_')[0]
def upload_file(package_name, filepath):
""" Upload a file using Aptly API """
url = f'{API_URL}/files/{package_name}'
with open(filepath, 'rb') as file_desc:
result = session.post(url, files={'file': file_desc})
return (
result.status_code == 200 and
f'{package_name}/{os.path.basename(filepath)}' in result.json()
)
def include_file(package_name, changes_file):
""" Include a changes file using Aptly API """
url = (
f'{API_URL}/repos/{REPO_NAME}/include/{package_name}/'
f'{os.path.basename(changes_file)}'
)
result = session.post(url)
data = result.json()
if data.get('FailedFiles'):
print()
print(f'Some error occurred including {changes_file}:')
print('Failed files:')
for failed_file in data['FailedFiles']:
print(f' - {os.path.basename(failed_file)}')
if data.get('Report', {}).get('Warnings'):
print('Warnings:')
print(' - %s' % '\n - '.join(data['Report']['Warnings']))
print()
return False
if not (
result.status_code == 200 and
data.get('Report', {}).get('Added')
):
print(f'Unknown error occurred including {changes_file}')
return False
return True
for changes_file in changes_files:
package_name = changes_file2package_name(changes_file)
print(f'Handle changes file {changes_file}:')
filepaths = [changes_file] + list_files_in_changes_file(changes_file)
print(' - Upload files:')
for filepath in filepaths:
if not upload_file(package_name, filepath):
print(
f' - {filepath}: fail to upload file, pass this changes '
'file'
)
sys.exit(1)
else:
print(f' - {filepath}')
print(f' - Include changes file {changes_file}:')
if include_file(package_name, changes_file):
print(' - Changes file included')
else:
sys.exit(1)
# Create a snapshot of the repository
snap_name = datetime.datetime.now().strftime(f'%Y%m%d-%H%M%S_{REPO_NAME}')
print(f'Create new snapshot "{snap_name}" of repository "{REPO_NAME}"')
url = f'{API_URL}/repos/{REPO_NAME}/snapshots'
payload = {'Name': snap_name}
result = session.post(url, json=payload)
try:
data = result.json()
except Exception: # pylint: disable=broad-except
data = {}
error = (
result.status_code < 200 or
result.status_code > 299 or
data.get('Name') != snap_name or
not data.get('CreatedAt')
)
if error:
print(f'Fail to create snapshot "{snap_name}" of repository "{REPO_NAME}"')
sys.exit(1)
# Update published snapshot of repository
print(
f'Update published snapshot of repository "{REPO_NAME}" to "{snap_name}"')
url = f'{API_URL}/publish/:./{REPO_NAME}'
payload = {
'Snapshots': [
{
'Component': REPO_COMPONENT,
'Name': snap_name
}
]
}
result = session.put(url, json=payload)
if (
result.status_code < 200 or
result.status_code > 299
):
print(
f'Fail to update published snapshot of repository "{REPO_NAME}" to '
f'"{snap_name}"')
sys.exit(1)
print("Done.")