"""View callables to browse into one or several warehouses."""
from __future__ import annotations
from colander import SchemaNode, Mapping
from pyramid.request import Request
from pyramid.response import FileResponse
from pyramid.view import view_config
from pyramid.httpexceptions import HTTPForbidden
from chrysalio.lib.utils import size_label, age
from chrysalio.lib.form import Form, get_action
from chrysalio.lib.i18n import translate_field
from chrysalio.views import BaseView
from chrysalio.models.dbgroup import DBGroup
from cioservice.models.dbjob import DBJob
from ..routes import route_thumbnail, route_icon
from ..lib.utils import ONLY4GROUPS_ALL, isodt2str, isodt2age
from ..lib.utils import scope_query, files2response, ciopaths2absolute_paths
from ..lib.utils import query_in
from ..lib.ciopath import CioPath
from ..lib.field import display_fields
from ..lib.warehouse import Warehouse
from ..lib.file_paging import PAGE_SIZES, FilePaging
from ..lib.file_filter import OPERATORS, FileFilter
from ..lib.clipboard import clipboard_copy, clipboard_paste
from ..lib.wfile import WFile
from ..lib.wjob import WJob
from ..models.dbsharing import DBSharing
from ..models.dbseed import DBSeed
from ..lib.i18n import _
# =============================================================================
class BrowseView(BaseView):
    """Class to manage warehouse browsing.

    :type  request: pyramid.request.Request
    :param request:
        Current request.
    """

    # -------------------------------------------------------------------------
    def __init__(self, request: Request):
        """Constructor method.

        Keep a reference to the file panel registered under
        ``registry['panels']['filepanel']`` and create the file and job
        helpers (`WFile`, `WJob`) bound to the current request.
        """
        super().__init__(request)
        self._file_panel = request.registry['panels']['filepanel']
        self._wfile = WFile(request)
        self._wjob = WJob(request)

    # -------------------------------------------------------------------------
    @view_config(
        route_name='browse', renderer='ciowarehouse2:Templates/browse.pt')
    @view_config(
        route_name='glance', renderer='ciowarehouse2:Templates/browse.pt')
    @view_config(route_name='browse', renderer='json', xhr=True)
    def browse(self) -> dict | FileResponse:
        """Browse a warehouse.

        Registered for the ``browse`` and ``glance`` routes (template
        rendering) and for XHR requests on the ``browse`` route (JSON
        rendering).

        :rtype: dict or pyramid.response.FileResponse
        :return:
            Either the response produced by an action executed before the
            paging (download, Ajax upload) or the dictionary of values for
            the renderer.
        :raise pyramid.httpexceptions.HTTPForbidden:
            If the module returns no warehouse for the requested ID.
        """
        # Warehouse
        directory_ciopath = CioPath.from_request(self._request, True)
        ciowarehouse2 = self._request.registry['modules']['ciowarehouse2']
        warehouse = ciowarehouse2.warehouse(
            self._request, directory_ciopath.wid)
        if warehouse is None:
            raise HTTPForbidden(comment=_('This warehouse is not accessible!'))

        # Warehouse refreshing and sharings purge
        warehouse.refresh(self._request, now=False, in_thread=True)
        DBSharing.purge_expired(self._request, self._request.dbsession)

        # Form
        action, targets = get_action(self._request)
        form, defaults, groups = self._form(  # yapf: disable
            warehouse, directory_ciopath, action)

        # Action before `FilePaging`
        i_writer = ciowarehouse2.warehouse_file_writer(
            self._request, warehouse)
        action, response = self._action_before_paging(
            warehouse, directory_ciopath, form, action, targets, i_writer)
        if response is not None:
            return response

        # File filter and file paging
        # A ``crm!<something>`` action asks the filter to remove <something>;
        # a bare ``crm!`` removes nothing.
        filter_to_remove = None
        if action[:4] == 'crm!' and action[4:]:
            filter_to_remove = action[4:]
        file_filter = FileFilter(
            self._request,
            str(directory_ciopath) if directory_ciopath else 'warehouses',
            remove=filter_to_remove,
            suggest_only=True)
        paging = self._paging(
            warehouse, directory_ciopath, defaults, file_filter)

        # Action after `FilePaging`
        action = self._action_after_paging(form, action, targets, paging)
        action = self._file_panel.action_after_paging(
            self._request, warehouse, form, action, paging)

        # Breadcrumbs & documentation
        self._request.breadcrumbs(
            warehouse.label(self._request), root_chunks=3)
        self._request.documentation = '/warehouse/browse'

        return {
            'PAGE_SIZES': PAGE_SIZES,
            'OPERATORS': OPERATORS,
            #
            'translate_field': translate_field,
            'age': age,
            'isodt2str': isodt2str,
            'isodt2age': isodt2age,
            'size_label': size_label,
            'route_thumbnail': route_thumbnail,
            'route_icon': route_icon,
            #
            'action': action,
            'targets': targets,
            'form': form,
            'warehouse': warehouse.uid,
            'i_writer': i_writer,
            'seeders': self._request.registry['seeders'],
            'seeds': warehouse.seeds(self._request),
            'jobs': self._wjob.available(warehouse),
            'download_max_size': warehouse.download_max_size,
            'trail': warehouse.file_trail(self._request, directory_ciopath),
            'file_panel': self._file_panel,
            'reopen_panel': self._file_panel.reopen_panel(self._request),
            'file_filter': file_filter,
            'groups': groups,
            'fields': self._request.registry['fields'],
            'cardfields': display_fields(self._request, 'cards', warehouse),
            'listfields': display_fields(self._request, 'list', warehouse),
            'paging': paging
        }

    # -------------------------------------------------------------------------
    def _form(
        self,
        warehouse: Warehouse,
        directory_ciopath: CioPath,
        action: str,
    ) -> tuple[Form, dict, list[tuple[str, str]] | None]:
        """Return a form with its default values and a list of groups.

        The schema of the form depends on the three first characters of the
        action (``ren``, ``grp``, ``mta``, ``shr``, ``sed`` or ``job``); any
        other action produces a form without schema.

        :type  warehouse: .lib.warehouse.Warehouse
        :param warehouse:
            Current warehouse object.
        :type  directory_ciopath: .lib.ciopath.CioPath
        :param directory_ciopath:
            CioPath of the current directory, possibly an empty one.
        :param str action:
            Current action.
        :rtype: tuple
        :return:
            A tuple such as ``(form, defaults, groups)`` where ``groups`` is
            ``None`` unless the action starts with ``grp``.
        """
        # Groups: only loaded for the "only for groups" actions
        groups = None if action[:3] != 'grp' else [
            (str(k.group_id), str(k.label(self._request)))
            for k in self._request.dbsession.query(DBGroup)
        ]

        # Default
        defaults = FilePaging.params(
            self._request, directory_ciopath, default_sort='+file_name')
        defaults['scope'] = 'directory'

        # Schema
        schema: SchemaNode | None = None
        force_defaults = False
        if action[:3] == 'ren':
            schema, force_defaults = self._file_panel.schema_rename_file(
                self._request, defaults, action)
        elif action[:3] == 'grp':
            schema, force_defaults = self._file_panel.schema_only4groups(
                self._request, defaults, action, groups)
        elif action[:3] == 'mta':
            schema, force_defaults = self._file_panel.schema_update_metadata(
                self._request, defaults, action)
        elif action[:3] == 'shr':
            schema, force_defaults = self._file_panel.schema_sharing(
                self._request, defaults, action)
        elif action[:3] == 'sed':
            schema, force_defaults = self._schema_seed(
                warehouse, directory_ciopath, defaults, action)
        elif action[:3] == 'job':
            schema, force_defaults = self._schema_job(
                warehouse, defaults, action)

        # Form
        form = Form(
            self._request,
            schema=schema,
            defaults=defaults,
            force_defaults=force_defaults)
        form.forget('filter_value')
        # A slice is used instead of ``action[3]`` so that an empty action or
        # one shorter than 4 characters compares unequal instead of raising
        # an IndexError.
        if action[3:4] == '!' and form.validate():
            form.forget('#')

        return form, defaults, groups

    # -------------------------------------------------------------------------
    def _schema_seed(
            self, warehouse: Warehouse, directory_ciopath: CioPath,
            defaults: dict, action: str) -> tuple[SchemaNode | None, bool]:
        """Colander schema for a seed.

        The part of the action after its four first characters is read as
        ``<seed_id>ǁ<seeder_id>``.

        :type  warehouse: .lib.warehouse.Warehouse
        :param warehouse:
            Current warehouse object.
        :type  directory_ciopath: .lib.ciopath.CioPath
        :param directory_ciopath:
            CioPath of the current directory.
        :param dict defaults:
            Default values for the form, handed to the seeder.
        :param str action:
            Current action.
        :rtype: tuple
        :return:
            ``(schema, True)`` if the seeder is registered, ``(None, False)``
            otherwise.
        """
        seed_id, _sep, seeder_id = action[4:].partition('ǁ')
        seeder = self._request.registry['seeders'].get(seeder_id)
        if seeder is None:
            return None, False

        schema = SchemaNode(Mapping())
        dbseed = self._request.dbsession.query(DBSeed).filter_by(
            seed_id=seed_id).first()
        path = directory_ciopath.absolute_path(warehouse.root)
        seeder.values_schema(schema, defaults, dbseed, True, path)
        return schema, True

    # -------------------------------------------------------------------------
    def _schema_job(self, warehouse: Warehouse, defaults: dict,
                    action: str) -> tuple[SchemaNode | None, bool]:
        """Colander schema for a job.

        The part of the action after its four first characters is used as
        the job ID.

        :type  warehouse: .lib.warehouse.Warehouse
        :param warehouse:
            Current warehouse object.
        :param dict defaults:
            Default values for the form, handed to the service.
        :param str action:
            Current action.
        :rtype: tuple
        :return:
            ``(schema, True)`` if the warehouse knows the job,
            ``(None, False)`` otherwise.
        """
        job_id = action[4:]
        job = warehouse.job(self._request, job_id)
        if job is None:
            return None, False

        service = self._request.registry['services'][job['service_id']]
        dbjob = self._request.dbsession.query(DBJob).filter_by(
            job_id=job_id).first()
        schema = SchemaNode(Mapping())
        service.values_schema(schema, defaults, dbjob, False)
        return schema, True

    # -------------------------------------------------------------------------
    def _action_before_paging(
            self, warehouse: Warehouse, directory_ciopath: CioPath, form: Form,
            action: str, targets: tuple[str, ...],
            i_writer: bool) -> tuple[str, FileResponse | dict | None]:
        """Execute an action before loading the `FilePaging`.

        :type  warehouse: .lib.warehouse.Warehouse
        :param warehouse:
            Current warehouse object if exists.
        :type  directory_ciopath: .lib.ciopath.CioPath
        :param directory_ciopath:
            `CioPath` of the current directory, possibly an empty one.
        :type  form: chrysalio.lib.form.Form
        :param form:
            Current form.
        :param str action:
            Current action.
        :param tuple targets:
            Targets for the action.
        :param bool i_writer:
            ``True`` if the user is authorized to write into the warehouse.
        :rtype: tuple
        :return:
            A tuple such as ``(action, response)``. ``response`` is not
            ``None`` only for an Ajax upload (empty dictionary) and for a
            download.
        """
        # pylint: disable = too-many-arguments, too-many-positional-arguments
        # pylint: disable = too-many-return-statements
        # Upload files via Ajax: every XHR request ends here
        if self._request.is_xhr:
            if i_writer:
                self._wfile.upload(warehouse, directory_ciopath)
            return '', {}

        # Upload files
        if action == 'imp!':
            if i_writer:
                self._wfile.upload(warehouse, directory_ciopath)
            return '', None

        # Download
        if action[:4] == 'dnl!':
            common_path = directory_ciopath.absolute_path(warehouse.root)
            # Only the part after 'ǁ' of each target is passed on
            response = files2response(
                self._request,
                ciopaths2absolute_paths(
                    self._request, [k.partition('ǁ')[2] for k in targets]),
                common_path)
            return '', response

        # Remove
        # NOTE(review): unlike upload, paste and seed, this branch does not
        # test `i_writer`; presumably `WFile.remove` checks the rights for
        # each target -- to be confirmed.
        if action[:4] == 'rmv!':
            self._wfile.remove(targets)
            return '', None

        # Paste
        if action == 'pst!':
            if i_writer:
                clipboard_paste(self._request, warehouse, directory_ciopath)
            return '', None

        # Create a new file or a new directory
        if action[:4] == 'sed!' and warehouse is not None and i_writer:
            if not form.validate():
                # Back to the question form of the same action
                return action.replace('!', '?'), None
            self._wfile.new_file(
                warehouse, directory_ciopath, action, form.values)
            return '', None

        # Job
        if action[:3] == 'job':
            action, build_id = self._wjob.prepare(warehouse, form, action)
            # A slice is used instead of ``action[3]`` so that a non-empty
            # action shorter than 4 characters does not raise an IndexError.
            if build_id is not None and action and action[3:4] == '!':
                self._wjob.run(build_id)

        return action, None

    # -------------------------------------------------------------------------
    def _action_after_paging(
            self, form: Form, action: str, targets: tuple[str, ...],
            paging: FilePaging) -> str | None:
        """Execute an action after loading of the `FilePaging`.

        :type  form: chrysalio.lib.form.Form
        :param form:
            Current form.
        :param str action:
            Current action.
        :param tuple targets:
            Tuple of `CioType`ǁ`CioPath`, targets for the action.
        :type  paging: .lib.file_paging.FilePaging
        :param paging:
            Current `FilePaging`.
        :rtype: str
        :return:
            An empty string if the action has been processed, ``'shr?#'`` if
            the sharing form is not valid, otherwise the unchanged action.
        """
        verb = action[:4]

        # Copy/cut
        if verb in ('cpy!', 'cut!'):
            clipboard_copy(self._request, paging, targets, verb == 'cut!')
            return ''

        # Sharing
        if verb == 'shr!':
            if not form.validate():
                return 'shr?#'
            self._wfile.sharing_create(targets, form.values, paging)
            return ''

        return action

    # -------------------------------------------------------------------------
    def _paging(
            self, warehouse: Warehouse, directory_ciopath: CioPath,
            defaults: dict, file_filter: FileFilter | None) -> FilePaging:
        """Return the current paging.

        :type  warehouse: .lib.warehouse.Warehouse
        :param warehouse:
            Warehouse if exists.
        :type  directory_ciopath: .lib.ciopath.CioPath
        :param directory_ciopath:
            CioPath of the current directory, possibly an empty one.
        :param dict defaults:
            Default values for the paging.
        :type  file_filter: .lib.file_filter.FileFilter
        :param file_filter:
            Current file filter.
        :rtype: .lib.file_paging.FilePaging
        :return:
            A paging built with an empty query if no scope query is
            available, otherwise a paging on the scope query restricted by
            the access rights and the file filter.
        """
        # Create scope query
        has_filter = file_filter is not None and not file_filter.is_empty()
        query = scope_query(
            self._request, defaults['scope'], directory_ciopath, has_filter)
        if query is None:
            return FilePaging(self._request, directory_ciopath, '', defaults)

        # Create query on access rights: the restriction on groups is skipped
        # for warehouse administrators and for users with the
        # 'warehouse-create' permission
        ciowarehouse2 = self._request.registry['modules']['ciowarehouse2']
        if not ciowarehouse2.warehouse_admin(self._request, warehouse) \
           and not self._request.has_permission('warehouse-create'):
            # `tuple()` keeps the concatenation valid whatever the kind of
            # collection stored in the session
            user_groups = (ONLY4GROUPS_ALL, ) + \
                tuple(self._request.session['user']['groups'])
            query = f"{query} AND {query_in('only4groups', user_groups)}"

        # Add filter
        if file_filter is not None and has_filter:
            query = f'{query} AND {file_filter.query()}'

        # Create paging
        return FilePaging(
            self._request, directory_ciopath, query, defaults, True)