Source code for ciowarehouse2.views.browse

"""View callables to browse into one or several warehouses."""

from __future__ import annotations

from colander import SchemaNode, Mapping

from pyramid.request import Request
from pyramid.response import FileResponse
from pyramid.view import view_config
from pyramid.httpexceptions import HTTPForbidden

from chrysalio.lib.utils import size_label, age
from chrysalio.lib.form import Form, get_action
from chrysalio.lib.i18n import translate_field
from chrysalio.views import BaseView
from chrysalio.models.dbgroup import DBGroup
from cioservice.models.dbjob import DBJob
from ..routes import route_thumbnail, route_icon
from ..lib.utils import ONLY4GROUPS_ALL, isodt2str, isodt2age
from ..lib.utils import scope_query, files2response, ciopaths2absolute_paths
from ..lib.utils import query_in
from ..lib.ciopath import CioPath
from ..lib.field import display_fields
from ..lib.warehouse import Warehouse
from ..lib.file_paging import PAGE_SIZES, FilePaging
from ..lib.file_filter import OPERATORS, FileFilter
from ..lib.clipboard import clipboard_copy, clipboard_paste
from ..lib.wfile import WFile
from ..lib.wjob import WJob
from ..models.dbsharing import DBSharing
from ..models.dbseed import DBSeed
from ..lib.i18n import _


# =============================================================================
[docs] class BrowseView(BaseView): """Class to manage warehouse browsing. :type request: pyramid.request.Request :param request: Current request. """ # ------------------------------------------------------------------------- def __init__(self, request: Request): """Constructor method.""" super().__init__(request) self._file_panel = request.registry['panels']['filepanel'] self._wfile = WFile(request) self._wjob = WJob(request) # -------------------------------------------------------------------------
[docs] @view_config( route_name='browse', renderer='ciowarehouse2:Templates/browse.pt') @view_config( route_name='glance', renderer='ciowarehouse2:Templates/browse.pt') @view_config(route_name='browse', renderer='json', xhr=True) def browse(self) -> dict | FileResponse: """Browse a warehouse.""" # Warehouse directory_ciopath = CioPath.from_request(self._request, True) ciowarehouse2 = self._request.registry['modules']['ciowarehouse2'] warehouse = ciowarehouse2.warehouse( self._request, directory_ciopath.wid) if warehouse is None: raise HTTPForbidden(comment=_('This warehouse is not accessible!')) # Warehouse refreshing and sharings purge warehouse.refresh(self._request, now=False, in_thread=True) DBSharing.purge_expired(self._request, self._request.dbsession) # Form action, targets = get_action(self._request) form, defaults, groups = self._form( # yapf: disable warehouse, directory_ciopath, action) # Action before `FilePaging` i_writer = ciowarehouse2.warehouse_file_writer( self._request, warehouse) action, response = self._action_before_paging( warehouse, directory_ciopath, form, action, targets, i_writer) if response is not None: return response # File filter and file paging file_filter = FileFilter( self._request, str(directory_ciopath) if directory_ciopath else 'warehouses', remove=action[4:] if action[:4] == 'crm!' and action[4:] else None, suggest_only=True) paging = self._paging( warehouse, directory_ciopath, defaults, file_filter) # Action after `FilePaging` action = self._action_after_paging(form, action, targets, paging) action = self._file_panel.action_after_paging( self._request, warehouse, form, action, paging) # Breadcrumbs & documentation self._request.breadcrumbs( warehouse.label(self._request), root_chunks=3) self._request.documentation = '/warehouse/browse' return { 'PAGE_SIZES': PAGE_SIZES, 'OPERATORS': OPERATORS, # 'translate_field': translate_field, 'age': age, 'isodt2str': isodt2str, 'isodt2age': isodt2age, 'size_label': size_label, 'route_thumbnail': route_thumbnail, 'route_icon': route_icon, # 'action': action, 'targets': targets, 'form': form, 'warehouse': warehouse.uid, 'i_writer': i_writer, 'seeders': self._request.registry['seeders'], 'seeds': warehouse.seeds(self._request), 'jobs': self._wjob.available(warehouse), 'download_max_size': warehouse.download_max_size, 'trail': warehouse.file_trail(self._request, directory_ciopath), 'file_panel': self._file_panel, 'reopen_panel': self._file_panel.reopen_panel(self._request), 'file_filter': file_filter, 'groups': groups, 'fields': self._request.registry['fields'], 'cardfields': display_fields(self._request, 'cards', warehouse), 'listfields': display_fields(self._request, 'list', warehouse), 'paging': paging }
# ------------------------------------------------------------------------- def _form( self, warehouse: Warehouse, directory_ciopath: CioPath, action: str, ) -> tuple[Form, dict, list[tuple[str, str]] | None]: """Return a form with its default values and a list of groups. :type warehouse: .lib.warehouse.Warehouse :param warehouse: Current warehouse object. :type directory_ciopath: .lib.ciopath.CioPath :param directory_ciopath: CioPath of the current directory, possibly and empty one. :param str action: Current action. :rtype: tuple """ # Groups groups = None if action[:3] != 'grp' else [ (str(k.group_id), str(k.label(self._request))) for k in self._request.dbsession.query(DBGroup) ] # Default defaults = FilePaging.params( self._request, directory_ciopath, default_sort='+file_name') defaults['scope'] = 'directory' # Schema schema: SchemaNode | None = None force_defaults = False if action[:3] == 'ren': schema, force_defaults = self._file_panel.schema_rename_file( self._request, defaults, action) elif action[:3] == 'grp': schema, force_defaults = self._file_panel.schema_only4groups( self._request, defaults, action, groups) elif action[:3] == 'mta': schema, force_defaults = self._file_panel.schema_update_metadata( self._request, defaults, action) elif action[:3] == 'shr': schema, force_defaults = self._file_panel.schema_sharing( self._request, defaults, action) elif action[:3] == 'sed': schema, force_defaults = self._schema_seed( warehouse, directory_ciopath, defaults, action) elif action[:3] == 'job': schema, force_defaults = self._schema_job( warehouse, defaults, action) # Form form = Form( self._request, schema=schema, defaults=defaults, force_defaults=force_defaults) form.forget('filter_value') if action and action[3] == '!' and form.validate(): form.forget('#') return form, defaults, groups # ------------------------------------------------------------------------- def _schema_seed( self, warehouse: Warehouse, directory_ciopath: CioPath, defaults: dict, action: str) -> tuple[SchemaNode | None, bool]: """Colander schema for a seed.""" seeder = self._request.registry['seeders'].get( action[4:].partition('ǁ')[2]) if seeder is not None: schema = SchemaNode(Mapping()) dbseed = self._request.dbsession.query(DBSeed).filter_by( seed_id=action[4:].partition('ǁ')[0]).first() path = directory_ciopath.absolute_path(warehouse.root) seeder.values_schema(schema, defaults, dbseed, True, path) return schema, True return None, False # ------------------------------------------------------------------------- def _schema_job(self, warehouse: Warehouse, defaults: dict, action: str) -> tuple[SchemaNode | None, bool]: """Colander schema for a job.""" job = warehouse.job(self._request, action[4:]) if job is not None: service = self._request.registry['services'][job['service_id']] dbjob = self._request.dbsession.query(DBJob).filter_by( job_id=action[4:]).first() schema = SchemaNode(Mapping()) service.values_schema(schema, defaults, dbjob, False) return schema, True return None, False # ------------------------------------------------------------------------- def _action_before_paging( self, warehouse: Warehouse, directory_ciopath: CioPath, form: Form, action: str, targets: tuple[str], i_writer: bool) -> tuple[str, FileResponse | dict | None]: """Execute an action before loading the `FilePaging`. :type warehouse: .lib.warehouse.Warehouse :param warehouse: Current warehouse object if exists. :type directory_ciopath: .lib.ciopath.CioPath :param directory_ciopath: `CioPath` of the current directory, possibly an empty one. :type form: chrysalio.lib.form.Form :param form: Current form. :param str action: Current action. :param tuple targets: Targets for the action. :param bool i_writer: ``True`` if the user is authorized to write into the warehouse. :rtype: tuple :return: A tuple such as ``(action, response)``. """ # pylint: disable = too-many-arguments, too-many-positional-arguments # pylint: disable = too-many-return-statements # Upload files via Ajax if self._request.is_xhr: if i_writer: self._wfile.upload(warehouse, directory_ciopath) return '', {} # Upload files if action == 'imp!': if i_writer: self._wfile.upload(warehouse, directory_ciopath) return '', None # Download if action[:4] == 'dnl!': common_path = directory_ciopath.absolute_path(warehouse.root) response = files2response( self._request, ciopaths2absolute_paths( self._request, [k.partition('ǁ')[2] for k in targets]), common_path) return '', response # Remove if action[:4] == 'rmv!': self._wfile.remove(targets) return '', None # Paste if action == 'pst!': if i_writer: clipboard_paste(self._request, warehouse, directory_ciopath) return '', None # Create a new file or a new directory if action[:4] == 'sed!' and warehouse is not None and i_writer: if not form.validate(): return action.replace('!', '?'), None self._wfile.new_file( warehouse, directory_ciopath, action, form.values) return '', None # Job if action[:3] == 'job': action, build_id = self._wjob.prepare(warehouse, form, action) if build_id is not None and action and action[3] == '!': self._wjob.run(build_id) return action, None return action, None # ------------------------------------------------------------------------- def _action_after_paging( self, form: Form, action: str, targets: tuple[str], paging: FilePaging) -> str | None: """Execute an action after loading of the `FilePaging`. :type form: chrysalio.lib.form.Form :param form: Current form. :param str action: Current action. :param tuple targets: Tuple of `CioType`ǁ`CioPath`, targets for the action. :type paging: .lib.file_paging.FilePaging :param paging: Current `FilePaging`. :rtype: str """ # Copy/cut if action[:4] in ('cpy!', 'cut!'): clipboard_copy( self._request, paging, targets, action[:4] == 'cut!') return '' # Sharing if action[:4] == 'shr!': if form.validate(): self._wfile.sharing_create(targets, form.values, paging) return '' return 'shr?#' return action # ------------------------------------------------------------------------- def _paging( self, warehouse: Warehouse, directory_ciopath: CioPath, defaults: dict, file_filter: FileFilter | None) -> FilePaging: """Return the current paging. :type warehouse: .lib.warehouse.Warehouse :param warehouse: Warehouse if exists. :type directory_ciopath: .lib.ciopath.CioPath :param directory_ciopath: CioPath of the current directory, possibly an empty one. :param dict defaults: Default values for the paging. :type file_filter: .lib.file_filter.FileFilter :param file_filter: Current file filter. :rtype: tuple """ # Create scope query has_filter = file_filter is not None and not file_filter.is_empty() query = scope_query( self._request, defaults['scope'], directory_ciopath, has_filter) if query is None: return FilePaging(self._request, directory_ciopath, '', defaults) # Create query on access rights ciowarehouse2 = self._request.registry['modules']['ciowarehouse2'] if not ciowarehouse2.warehouse_admin(self._request, warehouse) \ and not self._request.has_permission('warehouse-create'): user_groups = (ONLY4GROUPS_ALL, ) + \ self._request.session['user']['groups'] query = f"{query} AND {query_in('only4groups', user_groups)}" # Add filter if file_filter is not None and has_filter: query = f'{query} AND {file_filter.query()}' # Create paging paging = FilePaging( self._request, directory_ciopath, query, defaults, True) return paging