"""View callables to manage warehouses."""
from __future__ import annotations
from os.path import join, exists
from shutil import rmtree
from json import loads
from collections import OrderedDict
from sqlalchemy import desc
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm.exc import FlushError
from pyramid.request import Request
from pyramid.response import Response
from pyramid.view import view_config
from pyramid.httpexceptions import HTTPForbidden, HTTPFound, HTTPNotFound
from pyramid.security import NO_PERMISSION_REQUIRED
from chrysalio.lib.utils import make_id
from chrysalio.lib.form import get_action, Form
from chrysalio.lib.log import log_info
from chrysalio.lib.filter import Filter
from chrysalio.lib.paging import PAGE_SIZES, Paging
from chrysalio.lib.attachment import attachment_url, attachment_update
from chrysalio.lib.tabset import Tabset
from chrysalio.includes.themes import theme_static_prefix
from chrysalio.views import BaseView
from chrysalio.lib.restful import restful_login
from chrysalio.models.dbuser import DBUser
from chrysalio.models.dbgroup import DBGroup
from chrysalio.models.populate import web2db, db2web
from cioservice.models.dbjob import DBJob
from ..lib.i18n import _
from ..lib.ciopath import CioPath
from ..lib.utils import warehouses_favorite, warehouses_in_menu
from ..lib.warehouse import Warehouse
from ..relaxng import RELAXNG_CIOWAREHOUSE2
from ..models.populate import xml2db
from ..models.dbwarehouse import WAREHOUSE_ACCESSES, DBWarehouseCardField
from ..models.dbwarehouse import DBWarehouseListField, DBWarehouseMetaField
from ..models.dbwarehouse import DBWarehouseJob, DBWarehouseSeed
from ..models.dbwarehouse import DBWarehouse, DBWarehouseUser, DBWarehouseGroup
# =============================================================================
[docs]
class WarehouseView(BaseView):
    """Group of view callables used to manage warehouses.

    :type  request: pyramid.request.Request
    :param request:
        Current request.
    """

    # -------------------------------------------------------------------------
    def __init__(self, request: Request):
        """Initialize the view and refuse access when the module is off.

        :type  request: pyramid.request.Request
        :param request:
            Current request.
        :raise pyramid.httpexceptions.HTTPForbidden:
            if the registry has no ``'modules_off'`` entry or if
            ``'ciowarehouse2'`` is listed in it.
        """
        super().__init__(request)
        registry = self._request.registry
        activated = 'modules_off' in registry \
            and 'ciowarehouse2' not in registry['modules_off']
        if not activated:
            raise HTTPForbidden(
                comment=_('The module "CioWarehouse2" is not activated.'))
# -------------------------------------------------------------------------
[docs]
@view_config(
    route_name='warehouse_index',
    renderer='ciowarehouse2:Templates/warehouse_index.pt',
    permission='warehouse-view')
@view_config(route_name='warehouse_index', renderer='json', xhr=True)
def index(self) -> dict | Response:
    """List all authorized warehouses.

    An XHR request only triggers an import (when the user may create or
    configure warehouses) and returns an empty dictionary. Otherwise the
    posted action is executed, then the filter and the paging are built.

    :rtype: :class:`dict` or :class:`pyramid.response.Response`
    :return:
        The dictionary for the template, the export response for an
        ``exp!`` action, or a redirection to the ``browse`` route when the
        listing holds a single warehouse, no filter is active and the user
        can neither edit nor configure warehouses.
    """
    request = self._request
    i_creator = request.has_permission('warehouse-create') \
        or request.has_permission('warehouse-configure')

    # Ajax
    if request.is_xhr:
        if i_creator:
            self._import_warehouses()
        return {}

    # Action
    action, items = self._index_action(i_creator)
    if action[:4] == 'exp!':
        response = self._warehouses2response(items)
        if response:
            return response

    # Filter: the "access" criterion is reserved for creators
    paging_id = 'warehouse_index'
    criteria = [
        ('i18n_label', _('Label'), False, ''),
        ('warehouse_id', _('Identifier'), False, '')]
    if i_creator:
        criteria.append((
            'access', _('Access'), False,
            [('', ' ')] + list(WAREHOUSE_ACCESSES.items())))
    pfilter = Filter(
        request, paging_id, tuple(criteria),
        remove=action[:4] == 'crm!' and action[4:] or None)

    # Favorites and "in_menu"
    if 'ciowarehouse2' not in request.session:
        request.session['ciowarehouse2'] = {}
    if request.POST:
        request.session['ciowarehouse2']['favorite_only'] = bool(
            request.POST.get('favorite'))
    favorites = warehouses_favorite(request)
    in_menu = warehouses_in_menu(request)

    # Paging
    defaults = Paging.params(request, paging_id, '+warehouse_id')
    defaults['favorite'] = favorites and \
        request.session['ciowarehouse2'].get('favorite_only', True)
    dbquery = pfilter.sql(
        request.dbsession.query(DBWarehouse), 'wrh2_warehouses')
    if not i_creator:
        dbquery = dbquery.filter(
            DBWarehouse.warehouse_id.in_(self._my_warehouse_ids()))
    if defaults['favorite']:
        dbquery = dbquery.filter(DBWarehouse.warehouse_id.in_(favorites))
    # The first character of the sort value is the direction
    sort = defaults['sort']
    column = getattr(DBWarehouse, sort[1:])
    dbquery = dbquery.order_by(desc(column) if sort[0] == '-' else column)
    paging = Paging(request, paging_id, dbquery, defaults)
    paging.set_current_ids('warehouse_id')

    # A lone warehouse is opened directly for users who cannot manage it
    unfiltered = not defaults['favorite'] and pfilter.is_empty()
    if unfiltered and not (
            request.has_permission('warehouse-edit')
            or request.has_permission('warehouse-configure')) \
            and len(paging) == 1:
        return HTTPFound(
            request.route_path('browse', ciopath=paging[0].warehouse_id))

    # Form & completed action
    form = Form(request, defaults=defaults)
    form.forget('filter_value')
    if action and action[3] == '!':
        action = ''

    # Breadcrumbs & documentation
    request.breadcrumbs(_('Warehouses'), 1)
    request.documentation = '/warehouse/index'

    has_attachments = bool(request.registry.settings.get('attachments'))
    download_max_size = request.registry['settings']['download-max-size']
    return {
        'action': action,
        'items': items,
        'form': form,
        'pfilter': pfilter,
        'paging': paging,
        'favorites': favorites,
        'favorite_only': defaults['favorite'],
        'in_menu': in_menu,
        'i_creator': i_creator,
        'i_editor': request.has_permission('warehouse-edit'),
        'PAGE_SIZES': PAGE_SIZES,
        'attachment_url': attachment_url,
        'has_attachments': has_attachments,
        'download_max_size': download_max_size,
    }
# -------------------------------------------------------------------------
[docs]
@view_config(
    route_name='warehouse_index_filter', renderer='json', xhr=True)
def index_filter(self) -> dict:
    """Return a dictionary to autocomplete a filter field.

    :rtype: dict
    :return:
        The result of :meth:`Filter.sql_autocomplete` for the warehouse
        table.
    """
    suggestions = Filter.sql_autocomplete(self._request, DBWarehouse)
    return suggestions
# -------------------------------------------------------------------------
[docs]
@view_config(
    route_name='warehouse_view',
    renderer='ciowarehouse2:Templates/warehouse_view.pt',
    permission='warehouse-view')
def view(self) -> dict | Response:
    """Show warehouse configuration.

    Handled actions: ``exp!`` (export), ``bld!`` (full rebuild, creators
    only) and ``rfh!`` (refresh, editors only).

    :rtype: :class:`dict` or :class:`pyramid.response.Response`
    :return:
        The dictionary for the template or the export response.
    :raise pyramid.httpexceptions.HTTPNotFound:
        if the warehouse does not exist (see :meth:`_get_dbwarehouse`).
    :raise pyramid.httpexceptions.HTTPForbidden:
        if the user is not allowed to see it.
    """
    request = self._request

    # Warehouse
    i_creator = request.has_permission('warehouse-create')
    i_editor = request.has_permission('warehouse-edit')
    i_configurator = request.has_permission('warehouse-configure')
    dbwarehouse = self._get_dbwarehouse(i_creator or i_configurator)
    picture = request.registry.settings.get('attachments') and (
        attachment_url(
            request, dbwarehouse.attachments_dir,
            dbwarehouse.attachments_key, dbwarehouse.picture)
        or '{0}/ciowarehouse2/images/warehouse_picture.png'.format(
            theme_static_prefix(request)))

    # Action
    action = get_action(request)[0]
    if action == 'exp!':
        # Keep the export result apart from ``action``: the action is
        # sliced later by _user_paging() and must therefore stay a string
        # whatever the export returns.
        response = self._warehouses2response((dbwarehouse.warehouse_id, ))
        if response:
            return response
        action = ''
    elif action == 'bld!' and i_creator:
        self._force_refresh(dbwarehouse, i_configurator, True)
    elif action == 'rfh!' and i_editor:
        self._force_refresh(dbwarehouse, i_configurator, False)

    # User paging
    user_paging, defaults, user_filter = self._user_paging(
        action, dbwarehouse)

    # Form
    form = Form(request, defaults=defaults)
    form.forget('filter_value')

    # Breadcrumbs & documentation
    label = dbwarehouse.label(request)
    request.breadcrumbs(
        _('Warehouse "${l}"', {'l': label}),
        replace=request.route_path(
            'warehouse_edit', warehouse_id=dbwarehouse.warehouse_id))
    request.documentation = '/admin-warehouse/view'

    return {
        'form': form,
        'label': label,
        'user_filter': user_filter,
        'user_paging': user_paging,
        'tabset': Tabset(
            request, 'tabWarehouse', dbwarehouse.settings_tabs(request)),
        'dbwarehouse': dbwarehouse,
        'picture': picture,
        'navigator': Paging.navigator(
            request, 'warehouse_index', dbwarehouse.warehouse_id,
            request.route_path('warehouse_view', warehouse_id='_ID_')),
        'i_editor': i_editor,
        'i_creator': i_creator,
    }
# -------------------------------------------------------------------------
[docs]
@view_config(
    route_name='warehouse_create',
    renderer='ciowarehouse2:Templates/warehouse_edit.pt',
    permission='warehouse-edit')
@view_config(
    route_name='warehouse_edit',
    renderer='ciowarehouse2:Templates/warehouse_edit.pt',
    permission='warehouse-edit')
@view_config(
    route_name='warehouse_edit',
    renderer='json',
    xhr=True,
    permission='warehouse-edit')
def edit(self) -> dict | Response:
    """Create or edit a warehouse.

    The warehouse is loaded when the route carries a ``warehouse_id``;
    otherwise the view is a creation form. An XHR request only updates
    the picture of an existing warehouse.

    :rtype: :class:`dict` or :class:`pyramid.response.Response`
    :return:
        The dictionary for the template, an empty dictionary for an XHR
        request, or a redirection to the ``warehouse_view`` route after a
        successful save.
    :raise pyramid.httpexceptions.HTTPForbidden:
        if a creation is requested by a user who can neither create nor
        configure warehouses.
    """
    request = self._request

    # Rights
    i_creator = request.has_permission('warehouse-create')
    i_configurator = request.has_permission('warehouse-configure')
    dbwarehouse = self._get_dbwarehouse(i_creator or i_configurator) \
        if 'warehouse_id' in request.matchdict else None
    if dbwarehouse is None and not i_creator and not i_configurator:
        raise HTTPForbidden()

    # Ajax
    if request.is_xhr:
        if dbwarehouse is not None:
            self._picture_update(dbwarehouse)
        return {}

    # User paging
    action = get_action(request)[0]
    user_paging, defaults, user_filter = self._user_paging(action)

    # Form
    jobs = OrderedDict([
        (k.job_id, (k.label(request), k.description(request)))
        for k in request.dbsession.query(DBJob).order_by(desc('priority'))
        if not k.areas
        or 'ciowarehouse2.browse' in [i.area_id for i in k.areas]])
    groups = {
        k.group_id: (k.label(request), k.description(request))
        for k in request.dbsession.query(DBGroup)
    }
    form = Form(
        request,
        *DBWarehouse.settings_schema(
            request, defaults, groups, jobs, dbwarehouse),
        obj=dbwarehouse,
        force_defaults=dbwarehouse is not None)
    form.forget('filter_value')

    # Action
    if action == 'pct!' and dbwarehouse is not None:
        self._picture_update(dbwarehouse)
    elif action == 'sav!' and form.validate():
        # Do not overwrite ``dbwarehouse`` with the result: on a failed
        # save of an existing warehouse the form must still be rendered
        # for that warehouse, not as a creation form.
        saved = self._save(dbwarehouse, groups, jobs, form.values)
        if saved is not None:
            editing = 'warehouse_id' in request.matchdict
            if not editing:
                request.breadcrumbs.pop()
            log_info(
                request,
                'warehouse_edit' if editing else 'warehouse_create',
                saved.warehouse_id)
            return HTTPFound(
                request.route_path(
                    'warehouse_view', warehouse_id=saved.warehouse_id))
    if form.has_error():
        request.session.flash(_('Correct errors.'), 'alert')

    # Picture
    if dbwarehouse and request.registry.settings.get('attachments'):
        picture = attachment_url(
            request, dbwarehouse.attachments_dir,
            dbwarehouse.attachments_key, dbwarehouse.picture) or \
            '{0}/ciowarehouse2/images/warehouse_picture.png'.format(
                theme_static_prefix(request))
    else:
        picture = None

    # Breadcrumbs & documentation
    label = dbwarehouse and dbwarehouse.label(request)
    if not dbwarehouse:
        request.breadcrumbs(_('Warehouse Creation'))
    else:
        request.breadcrumbs(
            _('Warehouse "${l}" Edition', {'l': label}),
            replace=request.route_path(
                'warehouse_view', warehouse_id=dbwarehouse.warehouse_id))
    request.documentation = '/admin-warehouse/edit'

    return {
        'form': form,
        # The template receives the class itself for a creation
        'dbwarehouse': dbwarehouse or DBWarehouse,
        'action': action,
        'picture': picture,
        'label': label,
        'user_filter': user_filter,
        'user_paging': user_paging,
        'groups': groups,
        'jobs': jobs,
        'tabset': Tabset(
            request, 'tabWarehouse', DBWarehouse.settings_tabs(request)),
    }


# -------------------------------------------------------------------------
def _picture_update(self, dbwarehouse: DBWarehouse):
    """Replace the picture of a warehouse with the posted one and log it.

    :type  dbwarehouse: .models.dbwarehouse.DBWarehouse
    :param dbwarehouse:
        SQLAlchemy object of the warehouse whose picture is updated.
    """
    dbwarehouse.attachments_key, dbwarehouse.picture = attachment_update(
        self._request, dbwarehouse.attachments_dir,
        dbwarehouse.attachments_key, self._request.POST['picture'],
        replace=dbwarehouse.picture,
        prefix=str(dbwarehouse.warehouse_id)[:12])
    log_info(
        self._request, 'warehouse_update_picture',
        dbwarehouse.warehouse_id)
# -------------------------------------------------------------------------
[docs]
@view_config(
    route_name='warehouse_refresh',
    renderer='json',
    permission=NO_PERMISSION_REQUIRED,
    require_csrf=False)
def refresh(self) -> dict:
    """Launch a refresh action.

    The request parameter ``ciopaths`` is a JSON list of paths; the
    parameters ``now``, ``force`` and ``in_thread`` are enabled by the
    literal value ``'True'``.

    :rtype: dict
    :return:
        ``{'error': message}`` on the first failure, otherwise
        ``{'error': None}``.
    """
    params = self._request.params

    # Find files to refresh by warehouse
    ciopaths_by_warehouse: dict[str | None, list[CioPath]] = {}
    for target in loads(params.get('ciopaths', '[]')):
        ciopath = CioPath.from_str(target)
        ciopaths_by_warehouse.setdefault(ciopath.wid, []).append(ciopath)

    # Refresh each warehouse
    flags = {
        name: params.get(name) == 'True'
        for name in ('now', 'force', 'in_thread')}
    for warehouse_id, ciopaths in ciopaths_by_warehouse.items():
        warehouse, err = self._restful_warehouse(warehouse_id)
        if warehouse is None or err is not None:
            return {'error': err}
        err = warehouse.refresh(self._request, ciopaths=ciopaths, **flags)
        if err is not None:
            return {'error': self._request.localizer.translate(err)}

    return {'error': None}
# -------------------------------------------------------------------------
def _index_action(self, i_creator: bool) -> tuple[str, tuple]:
    """Execute the action posted to the index view.

    Import (``imp!``) and deletion (``del!``) are reserved for creators;
    ``fav!`` and ``mnu!`` toggle a flag for the warehouse named after the
    prefix.

    :param bool i_creator:
        ``True`` if the user can create a warehouse.
    :rtype: tuple
    :return:
        A tuple such as ``(action, items)``.
    """
    action, items = get_action(self._request)
    prefix, target = action[:4], action[4:]
    if i_creator and action == 'imp!':
        self._import_warehouses()
    elif i_creator and prefix == 'del!':
        self._delete_warehouses(items)
    elif prefix == 'fav!':
        self._toggle_favorite(target)
    elif prefix == 'mnu!':
        self._toggle_in_menu(target)
    return action, items
# -------------------------------------------------------------------------
def _my_warehouse_ids(self) -> set:
    """Return the set of IDs of the warehouses the user is authorized for.

    A warehouse is retained when its access is ``free`` or ``readonly``,
    when the user has file rights on it, or when one of the user's groups
    is attached to it.

    :rtype: set
    """
    dbsession = self._request.dbsession
    user = self._request.session['user']

    open_access = dbsession.query(DBWarehouse.warehouse_id).filter(
        DBWarehouse.access.in_(('free', 'readonly')))
    as_user = dbsession.query(DBWarehouseUser.warehouse_id).filter_by(
        user_id=user['user_id']).filter(
            DBWarehouseUser.file_rights.isnot(None))
    as_member = dbsession.query(DBWarehouseGroup.warehouse_id).filter(
        DBWarehouseGroup.group_id.in_(user['groups']))

    return {
        row[0]
        for dbquery in (open_access, as_user, as_member)
        for row in dbquery}
# -------------------------------------------------------------------------
def _get_dbwarehouse(self, i_creator: bool) -> DBWarehouse:
    """Return the SQLAlchemy object of the warehouse named in the route.

    :param bool i_creator:
        ``True`` if the user can create a warehouse; the access check is
        then skipped.
    :rtype: .models.dbwarehouse.DBWarehouse
    :raise pyramid.httpexceptions.HTTPNotFound:
        if the warehouse does not exist.
    :raise pyramid.httpexceptions.HTTPForbidden:
        if the user is not a creator and the warehouse is not one of
        their authorized warehouses.
    """
    warehouse_id = self._request.matchdict['warehouse_id']
    dbwarehouse = self._request.dbsession.query(DBWarehouse).filter_by(
        warehouse_id=warehouse_id).first()
    if dbwarehouse is None:
        raise HTTPNotFound()
    if i_creator:
        return dbwarehouse
    if dbwarehouse.warehouse_id in self._my_warehouse_ids():
        return dbwarehouse
    raise HTTPForbidden()
# -------------------------------------------------------------------------
def _delete_warehouses(self, warehouse_ids: list):
    """Delete warehouses: files, attachments and database records.

    :param list warehouse_ids:
        List of warehouse IDs to delete.
    """
    request = self._request
    removed_ids = []
    attachments_root = request.registry.settings.get('attachments')
    module = request.registry['modules']['ciowarehouse2']
    dbquery = request.dbsession.query(DBWarehouse).filter(
        DBWarehouse.warehouse_id.in_(warehouse_ids))

    for dbwarehouse in dbquery:
        warehouse_id = dbwarehouse.warehouse_id

        # Remove files
        warehouse = module.warehouse(request, warehouse_id)
        if warehouse is not None:
            module.backend.delete(warehouse)
            module.warehouse_forget(request, warehouse_id)
            if exists(warehouse.root):
                rmtree(warehouse.root)

        # Remove attachments
        if attachments_root and dbwarehouse.attachments_key:
            directory = join(
                attachments_root, dbwarehouse.attachments_dir,
                dbwarehouse.attachments_key)
            if exists(directory):
                rmtree(directory)

        # Remove from database
        removed_ids.append(warehouse_id)
        request.dbsession.delete(dbwarehouse)

    if removed_ids:
        log_info(request, 'warehouse_delete', ' '.join(removed_ids))
# -------------------------------------------------------------------------
def _import_warehouses(self):
    """Import warehouses from an uploaded XML file and clone the new ones.

    The warehouses present after the import but not before are logged,
    cloned through their VCS and, when the clone succeeds, indexed by the
    back end. A clone error is flashed as an alert.
    """
    request = self._request

    # Get current IDs
    known_ids = {
        k[0] for k in request.dbsession.query(DBWarehouse.warehouse_id)}

    # Update database
    web2db(request, xml2db, 'warehouse', relaxngs={
        '{{{0}}}{1}'.format(
            RELAXNG_CIOWAREHOUSE2['namespace'],
            RELAXNG_CIOWAREHOUSE2['root']):
        RELAXNG_CIOWAREHOUSE2['file']})

    # Get new IDs, sorted so that the log entry and the clone order do not
    # depend on the iteration order of a set
    new_ids = sorted({
        k[0] for k in request.dbsession.query(DBWarehouse.warehouse_id)
    } - known_ids)
    if not new_ids:
        return
    log_info(request, 'warehouse_import', ' '.join(new_ids))

    # Clone warehouses
    ciowarehouse2 = request.registry['modules']['ciowarehouse2']
    for warehouse_id in new_ids:
        dbwarehouse = request.dbsession.query(DBWarehouse).filter_by(
            warehouse_id=warehouse_id).first()
        if dbwarehouse is None:
            continue
        warehouse = Warehouse(
            request.registry, dbwarehouse, ciowarehouse2.locations)
        error = warehouse.vcs.clone()
        if error:
            request.session.flash(error, 'alert')
        else:
            ciowarehouse2.backend.index(warehouse)
# -------------------------------------------------------------------------
def _warehouses2response(self, warehouse_ids: tuple) -> Response | str:
    """Export warehouses as an XML file embedded in a Pyramid response.

    The file is named after the warehouse ID when a single warehouse is
    exported, otherwise after the site title.

    :param tuple warehouse_ids:
        List of warehouse IDs to export.
    :rtype: :class:`pyramid.response.Response` or ``''``
    :return:
        The result of :func:`db2web`, or ``''`` when none of the IDs
        matches a warehouse.
    """
    dbitems = tuple(
        self._request.dbsession.query(DBWarehouse).filter(
            DBWarehouse.warehouse_id.in_(warehouse_ids)).order_by(
                'warehouse_id'))
    if not dbitems:
        return ''

    stem = dbitems[0].warehouse_id if len(dbitems) == 1 else None
    if not stem:
        stem = make_id(self._request.registry['settings']['title'], 'token')
    filename = '{0}.{1}.xml'.format(stem, DBWarehouse.suffix)

    log_info(
        self._request, 'warehouse_export',
        ' '.join(k.warehouse_id for k in dbitems))
    return db2web(self._request, dbitems, filename, RELAXNG_CIOWAREHOUSE2)
# -------------------------------------------------------------------------
def _restful_warehouse(
        self,
        warehouse_id: str | None = None
) -> tuple[Warehouse | None, str | None]:
    """Return a warehouse called by a restful request.

    The request is first authenticated with :func:`restful_login`.

    :param str warehouse_id: (optional)
        Possible warehouse ID. When ``None``, the ID is read from the
        route.
    :rtype: tuple
    :return:
        A tuple like ``(warehouse, error)``; exactly one of the two is
        ``None``.
    """
    request = self._request
    module = request.registry['modules']['ciowarehouse2']

    error = restful_login(
        request, module.restful.get('key', '-'),
        module.restful.get('token_ttl'))
    if error is not None:
        return None, error

    if warehouse_id is None:
        warehouse_id = request.matchdict['warehouse_id']
    warehouse = module.warehouse(request, warehouse_id)
    if warehouse is not None:
        return warehouse, None
    return None, request.localizer.translate(
        _('Unknown warehouse "${w}"', {'w': warehouse_id}))
# -------------------------------------------------------------------------
def _toggle_favorite(self, warehouse_id: str):
    """Toggle the favorite state of the warehouse for the current user.

    The user/warehouse record is created when missing and deleted when
    the favorite flag was the only information it carried. The
    ``'favorites'`` session entry is discarded afterwards.

    :param str warehouse_id:
        ID of the warehouse.
    """
    session = self._request.session
    dbsession = self._request.dbsession
    user_id = session['user']['user_id']
    link = dbsession.query(DBWarehouseUser).filter_by(
        warehouse_id=warehouse_id, user_id=user_id).first()

    if link is None:
        dbsession.add(
            DBWarehouseUser(
                warehouse_id=warehouse_id, user_id=user_id, favorite=True))
    else:
        favorite_only = link.favorite \
            and link.file_rights is None \
            and link.meta_rights is None \
            and link.in_menu is None
        if favorite_only:
            dbsession.delete(link)
        else:
            link.favorite = not link.favorite

    if 'favorites' in session:
        del session['favorites']
# -------------------------------------------------------------------------
def _toggle_in_menu(self, warehouse_id: str):
    """Toggle the ``in_menu`` state of the warehouse for the current user.

    The user/warehouse record is created when missing and deleted when
    the ``in_menu`` flag was the only information it carried. The
    ``'in_menu'`` session entry is discarded afterwards.

    :param str warehouse_id:
        ID of the warehouse.
    """
    session = self._request.session
    dbsession = self._request.dbsession
    user_id = session['user']['user_id']
    link = dbsession.query(DBWarehouseUser).filter_by(
        warehouse_id=warehouse_id, user_id=user_id).first()

    if link is None:
        dbsession.add(
            DBWarehouseUser(
                warehouse_id=warehouse_id, user_id=user_id, in_menu=True))
    else:
        in_menu_only = link.in_menu \
            and link.file_rights is None \
            and link.meta_rights is None \
            and link.favorite is None
        if in_menu_only:
            dbsession.delete(link)
        else:
            link.in_menu = not link.in_menu

    if 'in_menu' in session:
        del session['in_menu']
# -------------------------------------------------------------------------
def _force_refresh(
self, dbwarehouse: DBWarehouse, i_configurator: bool, force: bool):
"""Call the back end to index and create thumbnails.
:type dbwarehouse: .models.dbwarehouse.DBWarehouse
:param dbwarehouse:
SQLAlchemy object for the warehouse.
:param bool i_configurator:
``True`` if the user can configure the warehouse.
:param bool force:
If ``True``, force refreshing even if the source is older than the
index.
"""
ciowarehouse2 = self._request.registry['modules']['ciowarehouse2']
warehouse = ciowarehouse2.warehouse(
self._request, dbwarehouse.warehouse_id)
if warehouse is None and (force or not i_configurator):
self._request.session.flash(_('Unable to refresh!'), 'alert')
return
if warehouse is None:
warehouse = Warehouse(
self._request.registry, dbwarehouse, ciowarehouse2.locations)
err = warehouse.refresh(
self._request, now=True, force=force, in_thread=force)
if err is not None:
self._request.session.flash(err, 'alert')
else:
self._request.session.flash(
_('Full rebuild fired…') if force else _('Refreshment done.'))
ciowarehouse2.cache_clear(self._request, warehouse.uid)
# -------------------------------------------------------------------------
def _save(
        self, dbwarehouse: DBWarehouse | None, groups: dict, jobs: dict,
        values: dict) -> DBWarehouse | None:
    """Save a warehouse configuration.

    :type  dbwarehouse: .models.dbwarehouse.DBWarehouse
    :param dbwarehouse:
        Warehouse to save or ``None`` to create a new one.
    :param dict groups:
        A dictionary such as ``{group_id: (label, description),...}``.
    :param dict jobs:
        A dictionary such as ``{job_id: (label, description),...}``.
    :param dict values:
        Form values.
    :rtype: :class:`.models.dbwarehouse.DBWarehouse` instance or ``None``
    :return:
        The saved warehouse, or ``None`` when the record is invalid or
        when the new warehouse already exists; an alert is then flashed.
    """
    request = self._request
    session = request.session
    creation = dbwarehouse is None
    if creation:
        dbwarehouse = DBWarehouse()

    # Update warehouse: values with these prefixes belong to sub-tables
    sub_prefixes = (
        'fmta', 'fcrd', 'flst', 'fsnp', 'sed:', 'job:', 'usr:',
        'ufil', 'umta', 'fav:', 'mnu:', 'grp:', 'gfil', 'gmta')
    record = {
        name: value
        for name, value in values.items() if name[:4] not in sub_prefixes}
    if not creation:
        # ID and location of an existing warehouse are never modified
        record['warehouse_id'] = dbwarehouse.warehouse_id
        record['location'] = dbwarehouse.location
    err = dbwarehouse.record_format(record)
    if err:
        session.flash(err, 'alert')
        return None
    record.update({
        name: None
        for name in values
        if not values[name] and hasattr(DBWarehouse, name)})
    if not record.get('download_max_size'):
        record['download_max_size'] = 0
    for field, value in record.items():
        if getattr(dbwarehouse, field) != value:
            setattr(dbwarehouse, field, value)

    # Save
    if creation:
        try:
            request.dbsession.add(dbwarehouse)
            request.dbsession.flush()
        except (IntegrityError, FlushError):
            session.flash(_('This warehouse already exists.'), 'alert')
            return None
        warehouse = Warehouse(
            request.registry, dbwarehouse,
            request.registry['modules']['ciowarehouse2'].locations)
        err = warehouse.vcs.clone()
        if err:
            session.flash(err, 'alert')

    # Update fields, seeds, jobs, users and groups
    self._fields_update(dbwarehouse, values)
    self._seeds_update(dbwarehouse, values)
    self._jobs_update(dbwarehouse, jobs, values)
    self._users_update(dbwarehouse)
    self._groups_update(dbwarehouse, groups, values)

    # Clean up session and favorites
    request.registry['modules']['ciowarehouse2'].warehouse_forget(
        request, dbwarehouse.warehouse_id)
    if 'favorites' in session:
        del session['favorites']

    return dbwarehouse
# -------------------------------------------------------------------------
def _fields_update(self, dbwarehouse: DBWarehouse, values: dict):
    """Update warehouse fields: cards, list and metadata.

    For each kind, the registry fields are visited by increasing
    ``position``; a field whose usage flag is negative is skipped. A
    selected field missing from the warehouse is added, an unselected
    one present in the warehouse is deleted.

    :type  dbwarehouse: .models.dbwarehouse.DBWarehouse
    :param dbwarehouse:
        Warehouse to save.
    :param dict values:
        Form values; ``values['<prefix>:<field_id>']`` is read for every
        field that is not skipped.
    """
    dbsession = self._request.dbsession
    fields = self._request.registry['fields']
    # The order does not depend on the kind: compute it once
    ordered_ids = sorted(fields, key=lambda k: fields[k]['position'])

    # (relationship, usage flag in the registry, form prefix, DB class)
    kinds = (
        ('cardfields', 'in_cards', 'fcrd', DBWarehouseCardField),
        ('listfields', 'in_list', 'flst', DBWarehouseListField),
        ('metafields', 'in_meta', 'fmta', DBWarehouseMetaField))

    for relation, usage, prefix, dbclass in kinds:
        current = {k.field_id: k for k in getattr(dbwarehouse, relation)}
        position = 0
        for field_id in ordered_ids:
            if fields[field_id][usage] < 0:
                continue
            selected = values[f'{prefix}:{field_id}']
            if field_id in current:
                if selected:
                    position += 1
                else:
                    # Delete the object already loaded through the
                    # relationship instead of querying it again
                    dbsession.delete(current[field_id])
            elif selected:
                position += 1
                # Meta fields are created without ``classes``
                extra = {} if relation == 'metafields' \
                    else {'classes': fields[field_id]['classes']}
                dbsession.add(
                    dbclass(
                        warehouse_id=dbwarehouse.warehouse_id,
                        field_id=field_id,
                        position=position,
                        **extra))
# -------------------------------------------------------------------------
def _seeds_update(self, dbwarehouse: DBWarehouse, values: dict):
"""Update warehouse seeds.
:type dbwarehouse: .models.dbwarehouse.DBWarehouse
:param dbwarehouse:
Warehouse to save.
:param dict values:
Form values.
"""
warehouse_seeds = {k.seed_id: k for k in dbwarehouse.seeds}
for seed_id in self._request.registry['seeds']:
value = values[f'sed:{seed_id}']
if value and seed_id not in warehouse_seeds:
self._request.dbsession.add(
DBWarehouseSeed(
warehouse_id=dbwarehouse.warehouse_id,
seed_id=seed_id))
elif not value and seed_id in warehouse_seeds:
self._request.dbsession.delete(
self._request.dbsession.query(DBWarehouseSeed).filter_by(
warehouse_id=dbwarehouse.warehouse_id,
seed_id=seed_id).first())
# -------------------------------------------------------------------------
def _jobs_update(self, dbwarehouse: DBWarehouse, jobs: dict, values: dict):
"""Update warehouse jobs.
:type dbwarehouse: .models.dbwarehouse.DBWarehouse
:param dbwarehouse:
Warehouse to save.
:param dict jobs:
A dictionary such as ``{job_id: (label, description),...}``.
:param dict values:
Form values.
"""
warehouse_jobs = {k.job_id: k for k in dbwarehouse.jobs}
for job_id in jobs:
value = values[f'job:{job_id}']
if value and job_id not in warehouse_jobs:
self._request.dbsession.add(
DBWarehouseJob(
warehouse_id=dbwarehouse.warehouse_id, job_id=job_id))
elif not value and job_id in warehouse_jobs:
self._request.dbsession.delete(
self._request.dbsession.query(DBWarehouseJob).filter_by(
warehouse_id=dbwarehouse.warehouse_id,
job_id=job_id).first())
# -------------------------------------------------------------------------
def _user_paging(
        self,
        action: str,
        dbwarehouse: DBWarehouse | None = None) -> tuple:
    """Return a paging object for users.

    :param str action:
        Current action; a ``crm!<criterion>`` action removes that
        criterion from the filter.
    :type  dbwarehouse: .models.dbwarehouse.DBWarehouse
    :param dbwarehouse: (optional)
        If not ``None``, users are only users of the warehouse.
    :rtype: tuple
    :return:
        A tuple such as ``(user_paging, defaults, user_filter)``.
    """
    request = self._request
    paging_id = 'warehouse_users'

    # Filter, initialized on active users
    status_choices = [('', ' ')] + list(DBUser.status_labels.items())
    ufilter = Filter(
        request,
        paging_id, (
            ('login', _('Login'), False, None),
            ('last_name', _('Last name'), False, None),
            ('email', _('Email'), False, None),
            ('status', _('Status'), False, status_choices)),
        (('status', '=', 'active'), ),
        remove=action[:4] == 'crm!' and action[4:] or None)

    # Database query
    defaults = Paging.params(
        request, paging_id, '+last_name', default_display='list')
    columns = (
        DBUser.user_id, DBUser.login, DBUser.first_name,
        DBUser.last_name, DBUser.honorific, DBUser.email,
        DBUser.email_hidden, DBUser.status, DBUser.last_login,
        DBUser.attachments_key, DBUser.picture)
    dbquery = ufilter.sql(request.dbsession.query(*columns), 'users')
    if dbwarehouse is not None:
        member_ids = [k.user_id for k in dbwarehouse.users]
        dbquery = dbquery.filter(DBUser.user_id.in_(member_ids))
    # The first character of the sort value is the direction
    sort = defaults['sort']
    column = getattr(DBUser, sort[1:])
    dbquery = dbquery.order_by(desc(column) if sort[0] == '-' else column)

    user_paging = Paging(request, paging_id, dbquery, defaults)
    return user_paging, dict(defaults), ufilter
# -------------------------------------------------------------------------
def _users_update(self, dbwarehouse: DBWarehouse):
    """Update the list of warehouse users from the posted values.

    The POST keys ``set:<id>``, ``usr:<id>``, ``fav:<id>`` and
    ``mnu:<id>`` are gathered per user ID. A user with neither ``usr``
    nor ``fav`` loses their record; otherwise the record is created if
    needed and its rights and flags are set.

    :type  dbwarehouse: .models.dbwarehouse.DBWarehouse
    :param dbwarehouse:
        SQLAlchemy object for the current warehouse.
    """
    post = self._request.POST
    dbsession = self._request.dbsession

    # Gather the flags posted for each user
    flags_by_user: dict[int, list] = {}
    for name in post:
        if name[:4] not in ('set:', 'usr:', 'fav:', 'mnu:'):
            continue
        flags = flags_by_user.setdefault(int(name[4:]), [])
        if name[:3] != 'set':
            flags.append(name[:3])

    for user_id, flags in flags_by_user.items():
        link = dbsession.query(DBWarehouseUser).filter_by(
            warehouse_id=dbwarehouse.warehouse_id, user_id=user_id).first()
        is_user = 'usr' in flags

        # NOTE(review): a user with only ``mnu`` set is removed here,
        # which looks inconsistent with _toggle_in_menu() -- confirm.
        if not is_user and 'fav' not in flags:
            if link is not None:
                dbsession.delete(link)
            continue

        if link is None:
            link = DBWarehouseUser(
                warehouse_id=dbwarehouse.warehouse_id, user_id=user_id)
            dbsession.add(link)

        # Rights only exist for a warehouse user and default to 'reader'
        if is_user:
            file_rights = post.get(f'ufil:{user_id}') or 'reader'
            meta_rights = 'writer' if file_rights == 'writer-admin' \
                else post.get(f'umta:{user_id}') or 'reader'
        else:
            file_rights = meta_rights = None

        link.file_rights = file_rights or None
        link.meta_rights = meta_rights or None
        link.favorite = 'fav' in flags
        link.in_menu = 'mnu' in flags
# -------------------------------------------------------------------------
def _groups_update(
self, dbwarehouse: DBWarehouse, groups: dict, values: dict):
"""Update the list of groups.
:type dbwarehouse: .models.dbwarhouse.DBWarehouse
:param dbwarehouse:
SQLAlchemy object for the current warehouse.
:param dict groups:
A dictionary such as ``{group_id: (label, description),...}``.
:param dict values:
Form values.
"""
warehouse_groups = {k.group_id: k for k in dbwarehouse.groups}
for group_id in sorted(groups):
value = values[f'grp:{group_id}']
if not value:
if group_id in warehouse_groups:
self._request.dbsession.delete(
self._request.dbsession.query(
DBWarehouseGroup).filter_by(
warehouse_id=dbwarehouse.warehouse_id,
group_id=group_id).first())
continue
dbwarehouse_group = warehouse_groups.get(group_id)
if dbwarehouse_group is None:
dbwarehouse_group = DBWarehouseGroup(
warehouse_id=dbwarehouse.warehouse_id, group_id=group_id)
self._request.dbsession.add(dbwarehouse_group)
file_rights = self._request.POST.get(
f'gfil:{group_id}') or 'reader'
dbwarehouse_group.file_rights = file_rights
dbwarehouse_group.meta_rights = \
'writer' if file_rights == 'writer-admin' else \
self._request.POST.get(f'gmta:{group_id}') or 'reader'