Source code for ciowarehouse2.views.warehouse

"""View callables to manage warehouses."""

from __future__ import annotations
from os.path import join, exists
from shutil import rmtree
from json import loads
from collections import OrderedDict

from sqlalchemy import desc
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm.exc import FlushError

from pyramid.request import Request
from pyramid.response import Response
from pyramid.view import view_config
from pyramid.httpexceptions import HTTPForbidden, HTTPFound, HTTPNotFound
from pyramid.security import NO_PERMISSION_REQUIRED

from chrysalio.lib.utils import make_id
from chrysalio.lib.form import get_action, Form
from chrysalio.lib.log import log_info
from chrysalio.lib.filter import Filter
from chrysalio.lib.paging import PAGE_SIZES, Paging
from chrysalio.lib.attachment import attachment_url, attachment_update
from chrysalio.lib.tabset import Tabset
from chrysalio.includes.themes import theme_static_prefix
from chrysalio.views import BaseView
from chrysalio.lib.restful import restful_login
from chrysalio.models.dbuser import DBUser
from chrysalio.models.dbgroup import DBGroup
from chrysalio.models.populate import web2db, db2web
from cioservice.models.dbjob import DBJob
from ..lib.i18n import _
from ..lib.ciopath import CioPath
from ..lib.utils import warehouses_favorite, warehouses_in_menu
from ..lib.warehouse import Warehouse
from ..relaxng import RELAXNG_CIOWAREHOUSE2
from ..models.populate import xml2db
from ..models.dbwarehouse import WAREHOUSE_ACCESSES, DBWarehouseCardField
from ..models.dbwarehouse import DBWarehouseListField, DBWarehouseMetaField
from ..models.dbwarehouse import DBWarehouseJob, DBWarehouseSeed
from ..models.dbwarehouse import DBWarehouse, DBWarehouseUser, DBWarehouseGroup


# =============================================================================
[docs] class WarehouseView(BaseView): """Class to manage warehouse views. :type request: pyramid.request.Request :param request: Current request. """ # ------------------------------------------------------------------------- def __init__(self, request: Request): """Constructor method.""" super().__init__(request) if 'modules_off' not in self._request.registry or \ 'ciowarehouse2' in self._request.registry['modules_off']: raise HTTPForbidden( comment=_('The module "CioWarehouse2" is not activated.')) # -------------------------------------------------------------------------
[docs] @view_config( route_name='warehouse_index', renderer='ciowarehouse2:Templates/warehouse_index.pt', permission='warehouse-view') @view_config(route_name='warehouse_index', renderer='json', xhr=True) def index(self) -> dict | Response: """List all authorized warehouses.""" # Ajax i_creator = self._request.has_permission('warehouse-create') \ or self._request.has_permission('warehouse-configure') if self._request.is_xhr: if i_creator: self._import_warehouses() return {} # Action action, items = self._index_action(i_creator) if action[:4] == 'exp!': response = self._warehouses2response(items) if response: return response # Filter paging_id = 'warehouse_index' if i_creator: pfilter = Filter( self._request, paging_id, ( ('i18n_label', _('Label'), False, ''), ('warehouse_id', _('Identifier'), False, ''), ( 'access', _('Access'), False, [('', ' ')] + list(WAREHOUSE_ACCESSES.items()))), remove=action[:4] == 'crm!' and action[4:] or None) else: pfilter = Filter( self._request, paging_id, ( ('i18n_label', _('Label'), False, ''), ('warehouse_id', _('Identifier'), False, '')), remove=action[:4] == 'crm!' and action[4:] or None) # Favorites and "in_menu" if 'ciowarehouse2' not in self._request.session: self._request.session['ciowarehouse2'] = {} if self._request.POST: self._request.session['ciowarehouse2']['favorite_only'] = bool( self._request.POST.get('favorite')) favorites = warehouses_favorite(self._request) in_menu = warehouses_in_menu(self._request) # Paging defaults = Paging.params(self._request, paging_id, '+warehouse_id') defaults['favorite'] = favorites and \ self._request.session['ciowarehouse2'].get('favorite_only', True) dbquery = pfilter.sql( self._request.dbsession.query(DBWarehouse), 'wrh2_warehouses') if not i_creator: dbquery = dbquery.filter( DBWarehouse.warehouse_id.in_(self._my_warehouse_ids())) if defaults['favorite']: dbquery = dbquery.filter(DBWarehouse.warehouse_id.in_(favorites)) oby = getattr(DBWarehouse, defaults['sort'][1:]) dbquery = dbquery.order_by( desc(oby) if defaults['sort'][0] == '-' else oby) paging = Paging(self._request, paging_id, dbquery, defaults) paging.set_current_ids('warehouse_id') if not defaults['favorite'] and pfilter.is_empty() \ and not self._request.has_permission('warehouse-edit') \ and not self._request.has_permission('warehouse-configure') \ and len(paging) == 1: return HTTPFound( self._request.route_path( 'browse', ciopath=paging[0].warehouse_id)) # Form & completed action form = Form(self._request, defaults=defaults) form.forget('filter_value') if action and action[3] == '!': action = '' # Breadcrumbs & documentation self._request.breadcrumbs(_('Warehouses'), 1) self._request.documentation = '/warehouse/index' return { # yapf: disable 'action': action, 'items': items, 'form': form, 'pfilter': pfilter, 'paging': paging, 'favorites': favorites, 'favorite_only': defaults['favorite'], 'in_menu': in_menu, 'i_creator': i_creator, 'i_editor': self._request.has_permission('warehouse-edit'), 'PAGE_SIZES': PAGE_SIZES, 'attachment_url': attachment_url, 'has_attachments': bool( self._request.registry.settings.get('attachments')), 'download_max_size': self._request.registry[ 'settings']['download-max-size']}
# -------------------------------------------------------------------------
[docs] @view_config( route_name='warehouse_index_filter', renderer='json', xhr=True) def index_filter(self) -> dict: """Return a dictionary to autocomplete a filter field.""" return Filter.sql_autocomplete(self._request, DBWarehouse)
# -------------------------------------------------------------------------
[docs] @view_config( route_name='warehouse_view', renderer='ciowarehouse2:Templates/warehouse_view.pt', permission='warehouse-view') def view(self) -> dict: """Show warehouse configuration.""" # Warehouse i_creator = self._request.has_permission('warehouse-create') i_editor = self._request.has_permission('warehouse-edit') i_configurator = self._request.has_permission('warehouse-configure') dbwarehouse = self._get_dbwarehouse(i_creator or i_configurator) picture = self._request.registry.settings.get('attachments') and ( attachment_url( self._request, dbwarehouse.attachments_dir, dbwarehouse.attachments_key, dbwarehouse.picture) or '{0}/ciowarehouse2/images/warehouse_picture.png'.format( theme_static_prefix(self._request))) # Action action = get_action(self._request)[0] if action == 'exp!': action = self._warehouses2response((dbwarehouse.warehouse_id, )) if action: return action elif action == 'bld!' and i_creator: self._force_refresh(dbwarehouse, i_configurator, True) elif action == 'rfh!' and i_editor: self._force_refresh(dbwarehouse, i_configurator, False) # User paging user_paging, defaults, user_filter = self._user_paging( action, dbwarehouse) # Form form = Form(self._request, defaults=defaults) form.forget('filter_value') # Breadcrumbs & documentation label = dbwarehouse.label(self._request) self._request.breadcrumbs( _('Warehouse "${l}"', {'l': label}), replace=self._request.route_path( 'warehouse_edit', warehouse_id=dbwarehouse.warehouse_id)) self._request.documentation = '/admin-warehouse/view' return { # yapf: disable 'form': form, 'label': label, 'user_filter': user_filter, 'user_paging': user_paging, 'tabset': Tabset( self._request, 'tabWarehouse', dbwarehouse.settings_tabs(self._request)), 'dbwarehouse': dbwarehouse, 'picture': picture, 'navigator': Paging.navigator( self._request, 'warehouse_index', dbwarehouse.warehouse_id, self._request.route_path( 'warehouse_view', warehouse_id='_ID_')), 'i_editor': i_editor, 'i_creator': i_creator }
# -------------------------------------------------------------------------
[docs] @view_config( route_name='warehouse_create', renderer='ciowarehouse2:Templates/warehouse_edit.pt', permission='warehouse-edit') @view_config( route_name='warehouse_edit', renderer='ciowarehouse2:Templates/warehouse_edit.pt', permission='warehouse-edit') @view_config( route_name='warehouse_edit', renderer='json', xhr=True, permission='warehouse-edit') def edit(self) -> dict: """Create or edit a warehouse.""" # Rights i_creator = self._request.has_permission('warehouse-create') i_configurator = self._request.has_permission('warehouse-configure') dbwarehouse = self._get_dbwarehouse(i_creator or i_configurator) \ if 'warehouse_id' in self._request.matchdict else None if dbwarehouse is None and not i_creator and not i_configurator: raise HTTPForbidden() # Ajax if self._request.is_xhr: if dbwarehouse is not None: dbwarehouse.attachments_key, dbwarehouse.picture = \ attachment_update( self._request, dbwarehouse.attachments_dir, dbwarehouse.attachments_key, self._request.POST['picture'], replace=dbwarehouse.picture, prefix=str(dbwarehouse.warehouse_id)[:12]) log_info( self._request, 'warehouse_update_picture', dbwarehouse.warehouse_id) return {} # User paging action = get_action(self._request)[0] user_paging, defaults, user_filter = self._user_paging(action) # Form jobs = OrderedDict([ # yapf: disable (k.job_id, (k.label(self._request), k.description(self._request))) for k in self._request.dbsession.query( DBJob).order_by(desc('priority')) if not k.areas or 'ciowarehouse2.browse' in [i.area_id for i in k.areas]]) groups = { k.group_id: (k.label(self._request), k.description(self._request)) for k in self._request.dbsession.query(DBGroup) } form = Form( self._request, *DBWarehouse.settings_schema( self._request, defaults, groups, jobs, dbwarehouse), obj=dbwarehouse, force_defaults=dbwarehouse is not None) form.forget('filter_value') # Action if action == 'pct!' and dbwarehouse is not None: dbwarehouse.attachments_key, dbwarehouse.picture = \ attachment_update( self._request, dbwarehouse.attachments_dir, dbwarehouse.attachments_key, self._request.POST['picture'], replace=dbwarehouse.picture, prefix=str(dbwarehouse.warehouse_id)[:12]) log_info( self._request, 'warehouse_update_picture', dbwarehouse.warehouse_id) elif action == 'sav!' and form.validate(): dbwarehouse = self._save(dbwarehouse, groups, jobs, form.values) if dbwarehouse is not None: if 'warehouse_id' not in self._request.matchdict: self._request.breadcrumbs.pop() log_info( self._request, 'warehouse_id' in self._request.matchdict and 'warehouse_edit' or 'warehouse_create', dbwarehouse.warehouse_id) return HTTPFound( self._request.route_path( 'warehouse_view', warehouse_id=dbwarehouse.warehouse_id)) if form.has_error(): self._request.session.flash(_('Correct errors.'), 'alert') # Picture if dbwarehouse and self._request.registry.settings.get('attachments'): picture = attachment_url( self._request, dbwarehouse.attachments_dir, dbwarehouse.attachments_key, dbwarehouse.picture) or \ '{0}/ciowarehouse2/images/warehouse_picture.png'.format( theme_static_prefix(self._request)) else: picture = None # Breadcrumbs & documentation label = dbwarehouse and dbwarehouse.label(self._request) if not dbwarehouse: self._request.breadcrumbs(_('Warehouse Creation')) else: self._request.breadcrumbs( _('Warehouse "${l}" Edition', {'l': label}), replace=self._request.route_path( 'warehouse_view', warehouse_id=dbwarehouse.warehouse_id)) self._request.documentation = '/admin-warehouse/edit' return { # yapf:disable 'form': form, 'dbwarehouse': dbwarehouse or DBWarehouse, 'action': action, 'picture': picture, 'label': label, 'user_filter': user_filter, 'user_paging': user_paging, 'groups': groups, 'jobs': jobs, 'tabset': Tabset( self._request, 'tabWarehouse', DBWarehouse.settings_tabs(self._request)) }
# -------------------------------------------------------------------------
[docs] @view_config( route_name='warehouse_refresh', renderer='json', permission=NO_PERMISSION_REQUIRED, require_csrf=False) def refresh(self) -> dict: """Launch a refresh action.""" # Find files to refresh by warehouse ciopaths_by_warehouse: dict[str | None, list[CioPath]] = {} for target in loads(self._request.params.get('ciopaths', '[]')): ciopath = CioPath.from_str(target) if ciopath.wid not in ciopaths_by_warehouse: ciopaths_by_warehouse[ciopath.wid] = [ciopath] else: ciopaths_by_warehouse[ciopath.wid].append(ciopath) # Refresh each warehouse for warehouse_id, ciopaths in ciopaths_by_warehouse.items(): warehouse, err = self._restful_warehouse(warehouse_id) if warehouse is None or err is not None: return {'error': err} err = warehouse.refresh( self._request, ciopaths=ciopaths, now=self._request.params.get('now') == 'True', force=self._request.params.get('force') == 'True', in_thread=self._request.params.get('in_thread') == 'True') if err is not None: return {'error': self._request.localizer.translate(err)} return {'error': None}
# ------------------------------------------------------------------------- def _index_action(self, i_creator: bool) -> tuple[str, tuple]: """Execute actions for index view. :param bool i_creator: ``True`` if the user can create a warehouse. :rtype: tuple :return: A tuple such as ``(action, items)``. """ action, items = get_action(self._request) if action == 'imp!' and i_creator: self._import_warehouses() elif action[:4] == 'del!' and i_creator: self._delete_warehouses(items) elif action[:4] == 'fav!': self._toggle_favorite(action[4:]) elif action[:4] == 'mnu!': self._toggle_in_menu(action[4:]) return action, items # ------------------------------------------------------------------------- def _my_warehouse_ids(self) -> set: """Return a list of authorized warehouse IDs. :rtype: set """ user_id = self._request.session['user']['user_id'] warehouse_ids = { k[0] for k in self._request.dbsession.query(DBWarehouse.warehouse_id). filter(DBWarehouse.access.in_(('free', 'readonly'))) } warehouse_ids |= { k[0] for k in self._request.dbsession.query( DBWarehouseUser.warehouse_id).filter_by( user_id=user_id).filter( DBWarehouseUser.file_rights.isnot(None)) } warehouse_ids |= { k[0] for k in self._request.dbsession.query( DBWarehouseGroup.warehouse_id).filter( DBWarehouseGroup.group_id.in_( self._request.session['user']['groups'])) } return warehouse_ids # ------------------------------------------------------------------------- def _get_dbwarehouse(self, i_creator: bool) -> DBWarehouse: """Return the SqlAlchemy object of the selected warehouse or raise an HTTPNotFound exception. :param bool i_creator: ``True`` if the user can create a warehouse. :rtype: .models.dbwarehouse.DBWarehouse """ dbwarehouse = self._request.dbsession.query(DBWarehouse).filter_by( warehouse_id=self._request.matchdict['warehouse_id']).first() if dbwarehouse is None: raise HTTPNotFound() if not i_creator and \ dbwarehouse.warehouse_id not in self._my_warehouse_ids(): raise HTTPForbidden() return dbwarehouse # ------------------------------------------------------------------------- def _delete_warehouses(self, warehouse_ids: list): """Delete warehouses. :param list warehouse_ids: List of warehouse IDs to delete. """ deleted = [] attachments = self._request.registry.settings.get('attachments') ciowarehouse2 = self._request.registry['modules']['ciowarehouse2'] for dbwarehouse in self._request.dbsession.query(DBWarehouse).filter( DBWarehouse.warehouse_id.in_(warehouse_ids)): # Remove files warehouse = ciowarehouse2.warehouse( self._request, dbwarehouse.warehouse_id) if warehouse is not None: ciowarehouse2.backend.delete(warehouse) ciowarehouse2.warehouse_forget( self._request, dbwarehouse.warehouse_id) if exists(warehouse.root): rmtree(warehouse.root) # Remove attachments if attachments and dbwarehouse.attachments_key: attachment = join( attachments, dbwarehouse.attachments_dir, dbwarehouse.attachments_key) if exists(attachment): rmtree(attachment) # Remove from database deleted.append(dbwarehouse.warehouse_id) self._request.dbsession.delete(dbwarehouse) if deleted: log_info(self._request, 'warehouse_delete', ' '.join(deleted)) # ------------------------------------------------------------------------- def _import_warehouses(self): """Import warehouses.""" # Get current IDs warehouse_ids = { k[0] for k in self._request.dbsession.query(DBWarehouse.warehouse_id) } # Update database web2db(self._request, xml2db, 'warehouse', relaxngs={ # yapf: disable '{{{0}}}{1}'.format( RELAXNG_CIOWAREHOUSE2['namespace'], RELAXNG_CIOWAREHOUSE2['root']): RELAXNG_CIOWAREHOUSE2['file']}) # Get new IDs warehouse_ids = { k[0] for k in self._request.dbsession.query(DBWarehouse.warehouse_id) } - warehouse_ids if not warehouse_ids: return log_info(self._request, 'warehouse_import', ' '.join(warehouse_ids)) # Clone warehouses ciowarehouse2 = self._request.registry['modules']['ciowarehouse2'] for warehouse_id in warehouse_ids: dbwarehouse = self._request.dbsession.query(DBWarehouse).filter_by( warehouse_id=warehouse_id).first() if dbwarehouse is None: continue warehouse = Warehouse( self._request.registry, dbwarehouse, ciowarehouse2.locations) error = warehouse.vcs.clone() if error: self._request.session.flash(error, 'alert') else: ciowarehouse2.backend.index(warehouse) # ------------------------------------------------------------------------- def _warehouses2response(self, warehouse_ids: tuple) -> Response | None: """Export warehouses as an XML file embedded in a Pyramid response. :param tuple warehouse_ids: List of warehouse IDs to export. :rtype: :class:`pyramid.response.Response` or ``''`` """ dbitems = tuple( self._request.dbsession.query(DBWarehouse).filter( DBWarehouse.warehouse_id.in_(warehouse_ids)).order_by( 'warehouse_id')) if not dbitems: return '' filename = '{0}.{1}.xml'.format( len(dbitems) == 1 and dbitems[0].warehouse_id or make_id(self._request.registry['settings']['title'], 'token'), DBWarehouse.suffix) log_info( self._request, 'warehouse_export', ' '.join([k.warehouse_id for k in dbitems])) return db2web(self._request, dbitems, filename, RELAXNG_CIOWAREHOUSE2) # ------------------------------------------------------------------------- def _restful_warehouse( self, warehouse_id: str | None = None ) -> tuple[Warehouse | None, str | None]: """Return a warehouse called by a restful request. :param str warehouse_id: Possible warehouse ID. :rtype: tuple :return: A tuple like ``(warehouse, error)``. """ ciowarehouse2 = self._request.registry['modules']['ciowarehouse2'] err = restful_login( self._request, ciowarehouse2.restful.get('key', '-'), ciowarehouse2.restful.get('token_ttl')) if err is not None: return None, err if warehouse_id is None: warehouse_id = self._request.matchdict['warehouse_id'] warehouse = ciowarehouse2.warehouse(self._request, warehouse_id) if warehouse is None: return None, self._request.localizer.translate( _('Unknown warehouse "${w}"', {'w': warehouse_id})) return warehouse, None # ------------------------------------------------------------------------- def _toggle_favorite(self, warehouse_id: str): """Toggle the favorite state of the warehouse. :param str warehouse_id: ID of the warehouse. """ dbsession = self._request.dbsession user_id = self._request.session['user']['user_id'] dbwarehouse_user = dbsession.query(DBWarehouseUser).filter_by( warehouse_id=warehouse_id, user_id=user_id).first() if dbwarehouse_user is None: dbsession.add( DBWarehouseUser( warehouse_id=warehouse_id, user_id=user_id, favorite=True)) elif dbwarehouse_user.favorite \ and dbwarehouse_user.file_rights is None \ and dbwarehouse_user.meta_rights is None \ and dbwarehouse_user.in_menu is None: dbsession.delete(dbwarehouse_user) else: dbwarehouse_user.favorite = not dbwarehouse_user.favorite if 'favorites' in self._request.session: del self._request.session['favorites'] # ------------------------------------------------------------------------- def _toggle_in_menu(self, warehouse_id: str): """Toggle the `in_menu`` state of the warehouse. :param str warehouse_id: ID of the warehouse. """ dbsession = self._request.dbsession user_id = self._request.session['user']['user_id'] dbwarehouse_user = dbsession.query(DBWarehouseUser).filter_by( warehouse_id=warehouse_id, user_id=user_id).first() if dbwarehouse_user is None: dbsession.add( DBWarehouseUser( warehouse_id=warehouse_id, user_id=user_id, in_menu=True)) elif dbwarehouse_user.in_menu \ and dbwarehouse_user.file_rights is None \ and dbwarehouse_user.meta_rights is None \ and dbwarehouse_user.favorite is None: dbsession.delete(dbwarehouse_user) else: dbwarehouse_user.in_menu = not dbwarehouse_user.in_menu if 'in_menu' in self._request.session: del self._request.session['in_menu'] # ------------------------------------------------------------------------- def _force_refresh( self, dbwarehouse: DBWarehouse, i_configurator: bool, force: bool): """Call the back end to index and create thumbnails. :type dbwarehouse: .models.dbwarehouse.DBWarehouse :param dbwarehouse: SQLAlchemy object for the warehouse. :param bool i_configurator: ``True`` if the user can configure the warehouse. :param bool force: If ``True``, force refreshing even if the source is older than the index. """ ciowarehouse2 = self._request.registry['modules']['ciowarehouse2'] warehouse = ciowarehouse2.warehouse( self._request, dbwarehouse.warehouse_id) if warehouse is None and (force or not i_configurator): self._request.session.flash(_('Unable to refresh!'), 'alert') return if warehouse is None: warehouse = Warehouse( self._request.registry, dbwarehouse, ciowarehouse2.locations) err = warehouse.refresh( self._request, now=True, force=force, in_thread=force) if err is not None: self._request.session.flash(err, 'alert') else: self._request.session.flash( _('Full rebuild fired…') if force else _('Refreshment done.')) ciowarehouse2.cache_clear(self._request, warehouse.uid) # ------------------------------------------------------------------------- def _save( self, dbwarehouse: DBWarehouse | None, groups: dict, jobs: dict, values: dict) -> DBWarehouse | None: """Save a warehouse configuration. :type dbwarehouse: .models.dbwarehouse.DBWarehouse :param dbwarehouse: Warehouse to save. :param dict groups: A dictionary such as ``{group_id: (label, description),...}``. :param dict jobs: A dictionary such as ``{job_id: (label, description),...}``. :param dict values: Form values. :rtype: :class:`.models.dbwarehouse.DBWarehouse` instance or ``None`` """ creation = dbwarehouse is None dbwarehouse = dbwarehouse or DBWarehouse() # Update warehouse record = { k: values[k] for k in values if k[:4] not in ( # yapf: disable 'fmta', 'fcrd', 'flst', 'fsnp', 'sed:', 'job:', 'usr:', 'ufil', 'umta', 'fav:', 'mnu:', 'grp:', 'gfil', 'gmta') } if not creation: record['warehouse_id'] = dbwarehouse.warehouse_id record['location'] = dbwarehouse.location err = dbwarehouse.record_format(record) if err: self._request.session.flash(err, 'alert') return None record.update({ # yapf: disable k: None for k in values if not values[k] and hasattr(DBWarehouse, k)}) if not record.get('download_max_size'): record['download_max_size'] = 0 for field in record: if getattr(dbwarehouse, field) != record[field]: setattr(dbwarehouse, field, record[field]) # Save if creation: try: self._request.dbsession.add(dbwarehouse) self._request.dbsession.flush() except (IntegrityError, FlushError): self._request.session.flash( _('This warehouse already exists.'), 'alert') return None warehouse = Warehouse( self._request.registry, dbwarehouse, self._request.registry['modules']['ciowarehouse2'].locations) err = warehouse.vcs.clone() if err: self._request.session.flash(err, 'alert') # Update fields self._fields_update(dbwarehouse, values) # Update seeds self._seeds_update(dbwarehouse, values) # Update jobs self._jobs_update(dbwarehouse, jobs, values) # Update users self._users_update(dbwarehouse) # Update groups self._groups_update(dbwarehouse, groups, values) # Clean up session and favorites self._request.registry['modules']['ciowarehouse2'].warehouse_forget( self._request, dbwarehouse.warehouse_id) if 'favorites' in self._request.session: del self._request.session['favorites'] return dbwarehouse # ------------------------------------------------------------------------- def _fields_update(self, dbwarehouse: DBWarehouse, values: dict): """Update warehouse fields: cards, list and metadata. :type dbwarehouse: .models.dbwarehouse.DBWarehouse :param dbwarehouse: Warehouse to save. :param dict values: Form values. """ fields = self._request.registry['fields'] for env in (('cardfields', 'in_cards', 'fcrd', DBWarehouseCardField), ('listfields', 'in_list', 'flst', DBWarehouseListField), ('metafields', 'in_meta', 'fmta', DBWarehouseMetaField)): warehouse_fields = { k.field_id: k for k in getattr(dbwarehouse, env[0]) } position = 0 for field_id in sorted(fields, key=lambda k: fields[k]['position']): if fields[field_id][env[1]] < 0: continue value = values[f'{env[2]}:{field_id}'] if value and field_id not in warehouse_fields: position += 1 if env[0] == 'metafields': dbitem = env[3]( warehouse_id=dbwarehouse.warehouse_id, field_id=field_id, position=position) else: dbitem = env[3]( warehouse_id=dbwarehouse.warehouse_id, field_id=field_id, position=position, classes=fields[field_id]['classes']) self._request.dbsession.add(dbitem) elif not value and field_id in warehouse_fields: self._request.dbsession.delete( self._request.dbsession.query(env[3]).filter_by( warehouse_id=dbwarehouse.warehouse_id, field_id=field_id).first()) elif field_id in warehouse_fields: position += 1 # ------------------------------------------------------------------------- def _seeds_update(self, dbwarehouse: DBWarehouse, values: dict): """Update warehouse seeds. :type dbwarehouse: .models.dbwarehouse.DBWarehouse :param dbwarehouse: Warehouse to save. :param dict values: Form values. """ warehouse_seeds = {k.seed_id: k for k in dbwarehouse.seeds} for seed_id in self._request.registry['seeds']: value = values[f'sed:{seed_id}'] if value and seed_id not in warehouse_seeds: self._request.dbsession.add( DBWarehouseSeed( warehouse_id=dbwarehouse.warehouse_id, seed_id=seed_id)) elif not value and seed_id in warehouse_seeds: self._request.dbsession.delete( self._request.dbsession.query(DBWarehouseSeed).filter_by( warehouse_id=dbwarehouse.warehouse_id, seed_id=seed_id).first()) # ------------------------------------------------------------------------- def _jobs_update(self, dbwarehouse: DBWarehouse, jobs: dict, values: dict): """Update warehouse jobs. :type dbwarehouse: .models.dbwarehouse.DBWarehouse :param dbwarehouse: Warehouse to save. :param dict jobs: A dictionary such as ``{job_id: (label, description),...}``. :param dict values: Form values. """ warehouse_jobs = {k.job_id: k for k in dbwarehouse.jobs} for job_id in jobs: value = values[f'job:{job_id}'] if value and job_id not in warehouse_jobs: self._request.dbsession.add( DBWarehouseJob( warehouse_id=dbwarehouse.warehouse_id, job_id=job_id)) elif not value and job_id in warehouse_jobs: self._request.dbsession.delete( self._request.dbsession.query(DBWarehouseJob).filter_by( warehouse_id=dbwarehouse.warehouse_id, job_id=job_id).first()) # ------------------------------------------------------------------------- def _user_paging( self, action: str, dbwarehouse: DBWarehouse | None = None) -> tuple: """Return a paging object for users. :param str action: Current action. :type dbwarehouse: .models.dbwarhouse.DBWarehouse :param dbwarehouse: (optional) If not ``None``, users are only users of the warehouse. :rtype: tuple :return: A tuple such as ``(user_paging, defaults, user_filter)``. """ # Filter paging_id = 'warehouse_users' ufilter = Filter( self._request, paging_id, ( ('login', _('Login'), False, None), ('last_name', _('Last name'), False, None), ('email', _('Email'), False, None), ( 'status', _('Status'), False, [('', ' ')] + list(DBUser.status_labels.items()))), (('status', '=', 'active'), ), remove=action[:4] == 'crm!' and action[4:] or None) # Database query defaults = Paging.params( self._request, paging_id, '+last_name', default_display='list') dbquery = ufilter.sql( self._request.dbsession.query( DBUser.user_id, DBUser.login, DBUser.first_name, DBUser.last_name, DBUser.honorific, DBUser.email, DBUser.email_hidden, DBUser.status, DBUser.last_login, DBUser.attachments_key, DBUser.picture), 'users') if dbwarehouse is not None: dbquery = dbquery.filter( DBUser.user_id.in_([k.user_id for k in dbwarehouse.users])) oby = getattr(DBUser, defaults['sort'][1:]) dbquery = dbquery.order_by( desc(oby) if defaults['sort'][0] == '-' else oby) return Paging(self._request, paging_id, dbquery, defaults), \ dict(defaults), ufilter # ------------------------------------------------------------------------- def _users_update(self, dbwarehouse: DBWarehouse): """Update the list of warehouse users. :type dbwarehouse: .models.dbwarhouse.DBWarehouse :param dbwarehouse: SQLAlchemy object for the current warehouse. """ values: dict[int, list] = {} for value in self._request.POST: if value[:4] in ('set:', 'usr:', 'fav:', 'mnu:'): user_id = int(value[4:]) value = value[:3] else: continue if user_id not in values: values[user_id] = [] if value != 'set': values[user_id].append(value) for user_id, value_list in values.items(): dbwarehouse_user = self._request.dbsession.query( DBWarehouseUser).filter_by( warehouse_id=dbwarehouse.warehouse_id, user_id=user_id).first() if 'usr' not in value_list and 'fav' not in value_list: if dbwarehouse_user is not None: self._request.dbsession.delete(dbwarehouse_user) continue if dbwarehouse_user is None: dbwarehouse_user = DBWarehouseUser( warehouse_id=dbwarehouse.warehouse_id, user_id=user_id) self._request.dbsession.add(dbwarehouse_user) file_rights = self._request.POST.get(f'ufil:{user_id}') \ if 'usr' in value_list else None file_rights = 'reader' \ if not file_rights and 'usr' in value_list \ else file_rights meta_rights = 'writer' if file_rights == 'writer-admin' else ( self._request.POST.get(f'umta:{user_id}') if file_rights else None) meta_rights = 'reader' if file_rights and not meta_rights \ else meta_rights dbwarehouse_user.file_rights = file_rights or None dbwarehouse_user.meta_rights = meta_rights or None dbwarehouse_user.favorite = 'fav' in value_list dbwarehouse_user.in_menu = 'mnu' in value_list # ------------------------------------------------------------------------- def _groups_update( self, dbwarehouse: DBWarehouse, groups: dict, values: dict): """Update the list of groups. :type dbwarehouse: .models.dbwarhouse.DBWarehouse :param dbwarehouse: SQLAlchemy object for the current warehouse. :param dict groups: A dictionary such as ``{group_id: (label, description),...}``. :param dict values: Form values. """ warehouse_groups = {k.group_id: k for k in dbwarehouse.groups} for group_id in sorted(groups): value = values[f'grp:{group_id}'] if not value: if group_id in warehouse_groups: self._request.dbsession.delete( self._request.dbsession.query( DBWarehouseGroup).filter_by( warehouse_id=dbwarehouse.warehouse_id, group_id=group_id).first()) continue dbwarehouse_group = warehouse_groups.get(group_id) if dbwarehouse_group is None: dbwarehouse_group = DBWarehouseGroup( warehouse_id=dbwarehouse.warehouse_id, group_id=group_id) self._request.dbsession.add(dbwarehouse_group) file_rights = self._request.POST.get( f'gfil:{group_id}') or 'reader' dbwarehouse_group.file_rights = file_rights dbwarehouse_group.meta_rights = \ 'writer' if file_rights == 'writer-admin' else \ self._request.POST.get(f'gmta:{group_id}') or 'reader'