"""Warehouse job class."""
from __future__ import annotations
from os.path import dirname
from collections import OrderedDict
from pyramid.httpexceptions import HTTPFound
from chrysalio.lib.i18n import translate_field
from chrysalio.lib.config import settings_get_list
from chrysalio.lib.utils import common_directory, make_digest, convert_value
from chrysalio.lib.utils import rst2html
from chrysalio.lib.form import Form
from chrysalio.helpers.literal import Literal
from cioservice.models.dbjob import DBJob
from .i18n import _, translate
from .utils import ciopaths2absolute_paths, build_callback
from .warehouse import Warehouse
# =============================================================================
[docs]
class WJob(object):
"""Class to manage jobs manually called in a warehouse.
:type request: pyramid.request.Request
:param request:
Current request.
"""
# -------------------------------------------------------------------------
def __init__(self, request):
"""Constructor method."""
self._request = request
# -------------------------------------------------------------------------
[docs]
def available(
self,
warehouse: Warehouse,
caller: str = 'browse_view') -> OrderedDict | None:
"""Return a list of available jobs for the current warehouse.
:type warehouse: .lib.warehouse.Warehouse
:param warehouse:
Current warehouse object.
:param str caller: (default='browse_view')
Name of the entity which calls the warehouse.
:rtype: dict
:return:
A dictionary of tuples such as
``(icon, translated_label, service, context)``.
"""
jobs = warehouse.jobs(self._request) if warehouse else None
if not jobs:
return None
ciowarehouse2 = self._request.registry['modules']['ciowarehouse2']
access = ciowarehouse2.warehouse_access(self._request, warehouse)
file_writer = ciowarehouse2.warehouse_file_writer(
self._request, warehouse, access)
meta_writer = ciowarehouse2.warehouse_meta_writer(
self._request, warehouse, access)
groups = set(self._request.session['user']['groups'])
services = self._request.registry['services']
available = []
for job_id in sorted( # yapf: disable
jobs,
key=lambda k: jobs[k]['priority'], # type: ignore
reverse=True):
job = jobs[job_id]
if services[job['service_id']].authorized(
caller=caller, warehouse=warehouse, job=job,
file_writer=file_writer, meta_writer=meta_writer,
user_id=self._request.session['user']['user_id'],
groups=groups):
available.append((job_id, ( # yapf: disable
job['icon'],
translate_field(self._request, job['i18n_label']),
services[job['service_id']], job['context'])))
return OrderedDict(available)
# -------------------------------------------------------------------------
[docs]
def prepare(self, warehouse: Warehouse | None, form: Form,
action: str) -> tuple[str, str | None]:
"""Prepare a job and a build.
:type warehouse: .lib.warehouse.Warehouse
:param warehouse:
Current warehouse object.
:param form:
Current form.
:param str action:
Current action.
:rtype: tuple
:return:
A tuple such as ``(action, build_id)``.
"""
# Get job
job = self.get(warehouse, action[4:])
if warehouse is None or job is None:
return '', None
# Display dialog
if action[:4] == 'job?' and job['threaded'] != 'monitored':
return action, None
# Validate form
if action[:4] == 'job!' and job['threaded'] != 'monitored' \
and not form.validate():
return action.replace('!', '?'), None
# Get build parameters
abs_paths = ciopaths2absolute_paths(
self._request, [
k[1:].partition('ǁ')[2]
for k in self._request.POST if k[0] == '#'
])
params = self._build_parameters(warehouse, job, abs_paths)
if params is None:
return '', None
build_id = '{0}-{1}'.format(
job['job_id'], make_digest(','.join(sorted(params['files']))))
# Register into the build manager
error = self._request.registry['modules'][
'cioservice'].build_manager.register(build_id, job, params)
if error is not None:
self._request.session.flash(error, 'alert')
return '', None
# Possibly move to build page
if job['threaded'] == 'monitored':
raise HTTPFound(
self._request.route_path('build_view', build_id=build_id))
return action, build_id
# -------------------------------------------------------------------------
[docs]
def run(self, build_id: str):
"""Execute a job on a build.
:param str build_id:
ID of a registered build.
"""
# Get the build environment
build_manager = self._request.registry['modules'][
'cioservice'].build_manager
build_env = build_manager.build_env(build_id)
if build_env is None:
self._request.session.flash(
_('This build does not exist.'), 'alert')
return
# Execute the job in a thread
if build_env['job']['threaded'] == 'yes':
build_manager.clean_logs(self._request.dbsession)
error = build_manager.run(build_id, build_env)
if error:
self._request.session.flash(error, 'alert')
else:
self._request.session.flash(translate(_( # yapf: disable
'The action "${a}" has been launched.', {
'a': translate_field(
self._request, build_env['job']['i18n_label'])
}), request=self._request), 'refresh')
return
# Execute the job in the request
if build_env['status'] == 'running': # pragma: nocover
self._request.session.flash(_('Job already in progress.'), 'alert')
return
build_manager.clean_logs(self._request.dbsession)
build_manager.set_launch_time(build_id, build_env)
build_env['params']['dbsession'] = self._request.dbsession
service = self._request.registry['services'][ # yapf: disable
build_env['job']['service_id']]
result = service.run(service.make_build(build_id, build_env['params']))
# Commit and refresh
build_callback(
self._request.registry, build_env, result, self._request)
# Push result into a flash message
self._result2flash(result)
# -------------------------------------------------------------------------
[docs]
def get(
self,
warehouse: Warehouse | None,
job_id: str,
caller: str = 'browse_view') -> dict | None:
"""Retrieve a warehouse job checking if it is authorized in this
context.
:type warehouse: .lib.warehouse.Warehouse
:param warehouse:
Current warehouse object if exists.
:param str job_id:
Job ID.
:param str caller: (default='browse_view')
Name of the entity which calls the warehouse.
:rtype: dict
See: :meth:`.lib.warehouse.Warehouse.job`
"""
if warehouse is None:
return None
job = warehouse.job(self._request, job_id)
if job is None:
self._request.session.flash(
_('This action does not exist for this warehouse.'), 'alert')
return None
ciowarehouse2 = self._request.registry['modules']['ciowarehouse2']
access = ciowarehouse2.warehouse_access(self._request, warehouse)
if not self._request.registry['services'][
job['service_id']].authorized(
caller=caller, job=job, warehouse=warehouse,
file_writer=ciowarehouse2.warehouse_file_writer(
self._request, warehouse, access),
meta_writer=ciowarehouse2.warehouse_meta_writer(
self._request, warehouse, access)):
self._request.session.flash(
_('This action is not authorized in this context.'), 'alert')
return None
return job
# -------------------------------------------------------------------------
def _build_parameters(
self, warehouse: Warehouse, job: dict,
abs_paths: list[str]) -> dict | None:
"""Make up parameters for the build.
:type warehouse: .lib.warehouse.Warehouse
:param warehouse:
Current warehouse object if exists.
:param dict job:
Job dictionary.
:param list abs_paths:
Absolute paths to the targets for the action.
:rtype: dict
"""
params = {
'job_id': job['job_id'],
'context': job['context'],
'lang': self._request.locale_name,
'ttl': job['ttl'],
'settings': dict(job['settings']),
'files': abs_paths,
'caller': dict(self._request.session['user']),
'callback': 'ciowarehouse2'
}
# Files
service = self._request.registry['services'][job['service_id']]
service.select_files(params)
if service.need_files(job['context']) and not params['files']:
self._request.session.flash(
service.select_files_message(params), 'alert')
return None
# Values of variables
dbjob = self._request.dbsession.query(DBJob).filter_by(
job_id=job['job_id']).first()
if dbjob is None:
self._request.session.flash(
_('This action no longer exists.'), 'alert')
return None
variables = service.variables(dbjob.context)
params['values'] = {
k: variables[k]['default']
for k in variables if 'default' in variables[k]
}
for dbvalue in dbjob.values:
if dbvalue.variable in variables:
params['values'][dbvalue.variable] = convert_value(
variables[dbvalue.variable]['type'], dbvalue.value)
for variable in self._request.params:
if variable[:4] == 'val:' and variable[4:] in variables:
params['values'][variable[4:]] = convert_value(
variables[variable[4:]]['type'],
self._request.params[variable])
# Resources
params['resources'] = [
k.strip() for k in params['values'].get(
'__resources__', '').split(',') if k.strip()] \
+ list({dirname(k) for k in params['files']}) \
+ settings_get_list(job['settings'], 'resources')
# Settings
params['settings'] = {
k: job['settings'][k]
for k in job['settings'] if k not in ('files', 'resources')
}
# Input
if warehouse:
params['settings']['input.home.path'] = warehouse.root
params['settings']['input.home.id'] = warehouse.uid
params['settings']['input.home.url'] = self._request.route_path(
'glance', ciopath=[warehouse.uid, ''])
if not service.need_write_permission(job['context']):
return params
# Output
params['output'] = job['settings'].get('output')
if not params['output']:
params['output'] = common_directory(list(params['files']))
if warehouse:
params['settings']['output.home.path'] = warehouse.root
params['settings']['output.home.id'] = warehouse.uid
params['settings']['output.home.url'] = \
self._request.route_path(
'glance', ciopath=[warehouse.uid, ''])
return params
# -------------------------------------------------------------------------
def _result2flash(self, result: dict):
"""Add flash messages according to the result.
:param dict result:
Result dictionary.
"""
if 'errors' in result:
for k in result['errors']:
self._request.session.flash(
Literal(rst2html(self._request.localizer.translate(k))),
'alert')
elif 'warnings' in result:
for k in result['warnings']:
self._request.session.flash(
Literal(rst2html(self._request.localizer.translate(k))))
if 'infos' in result and 'errors' not in result:
for k in result['infos']:
self._request.session.flash(
Literal(rst2html(self._request.localizer.translate(k))))