"""SQLAlchemy-powered model definitions for shared files."""
from __future__ import annotations
from datetime import datetime, date
from time import time
from bcrypt import hashpw, gensalt
from lxml import etree
from sqlalchemy import Column, ForeignKey, String, Date, DateTime
from sqlalchemy.orm import relationship
from sqlalchemy.orm.session import Session
from sqlalchemy.schema import Index
from pyramid.request import Request
from chrysalio.lib.utils import make_id
from chrysalio.models import DBDeclarativeClass
from chrysalio.models.dbbase import DBBaseClass
from ..relaxng import RELAXNG_CIOWAREHOUSE2
from ..lib.ciopath import CioPath
from ..lib.file_info import info_get_root, info_save, info_remove_sharing
from ..lib.i18n import _
SHARING_CHECK_PERIOD = 86400
# =============================================================================
[docs]
class DBSharing(DBDeclarativeClass, DBBaseClass):
"""SQLAlchemy-powered sharing class."""
suffix = 'cioshr2'
__tablename__ = 'wrh2_sharings'
__table_args__ = {'mysql_engine': 'InnoDB'}
__mapper_args__ = {'confirm_deleted_rows': False}
sharing_id = Column(String(48), primary_key=True)
message = Column(String(255))
password: str | Column = Column(String(64))
expiration = Column(Date)
creation = Column(DateTime, default=datetime.now)
files = relationship('DBSharingFile', cascade='all, delete')
# -------------------------------------------------------------------------
[docs]
def set_password(self, password: str | None):
"""Set the password, possibly hashing it.
:param str password:
Password to set. If it does not begin with ``$``, we use bcrypt
algorithm before setting.
"""
if not password:
return
if not password.startswith('$'):
self.password = hashpw( # yapf: disable
password.encode('utf8'), gensalt()).decode('utf8')
else:
self.password = password
# -------------------------------------------------------------------------
[docs]
def check_password(self, password: str) -> bool:
"""Check the validy of the given password.
:param str password:
Clear password to check.
:rtype: bool
"""
if password and self.password is not None:
expected = self.password.encode('utf8')
return expected == hashpw(password.encode('utf8'), expected)
return not bool(self.password)
# -------------------------------------------------------------------------
[docs]
@classmethod
def xml2db(
cls,
dbsession: Session,
sharing_elt: etree.Element,
error_if_exists: bool = True,
kwargs=None) -> str | None:
"""Load a shared file from a XML element.
:type dbsession: sqlalchemy.orm.session.Session
:param dbsession:
SQLAlchemy session.
:type sharing_elt: lxml.etree.Element
:param sharing_elt:
Sharing XML element.
:param bool error_if_exists: (default=True)
It returns an error if sharing already exists.
:param dict kwargs: (optional)
Dictionary of keyword arguments.
:rtype: :class:`pyramid.i18n.TranslationString` or ``None``
:return:
Error message or ``None``.
"""
# Check if already exists
sharing_id = make_id(sharing_elt.get('id'), 'token', 48)
dbsharing = dbsession.query(cls).filter_by(
sharing_id=sharing_id).first()
if dbsharing is not None:
if error_if_exists:
return _('Sharing "${i}" already exists.', {'i': sharing_id})
return None
# Create sharing
record = cls.record_from_xml(sharing_id, sharing_elt)
cls.record_format(record)
dbsharing = cls(**record)
# Load files
found = False
namespaces = {'ns': RELAXNG_CIOWAREHOUSE2['namespace']}
warehouse_ids = kwargs.get('warehouse_ids', ()) if kwargs else ()
done = set()
for elt in sharing_elt.xpath('ns:ciopath', namespaces=namespaces):
ciopath = CioPath.from_str(elt.text.strip())
if ciopath.wid not in warehouse_ids:
continue
if ciopath not in done:
done.add(ciopath)
dbsharing.files.append(DBSharingFile(ciopath=str(ciopath)))
found = True
if found:
dbsession.add(dbsharing)
return None
# -------------------------------------------------------------------------
[docs]
@classmethod
def record_from_xml(cls, sharing_id: str, sharing_elt: etree.Element):
"""Convert a sharing XML element into a dictionary.
:param str sharing_id:
Sharing ID.
:type sharing_elt: lxml.etree.Element
:param sharing_elt:
Sharing XML element.
:rtype: dict
"""
namespace = RELAXNG_CIOWAREHOUSE2['namespace']
return {
'sharing_id': sharing_id,
'creation': sharing_elt.get('created'),
'message': sharing_elt.findtext(f'{{{namespace}}}message'),
'password': sharing_elt.findtext(f'{{{namespace}}}password'),
'expiration': sharing_elt.findtext(f'{{{namespace}}}expiration')
}
# -------------------------------------------------------------------------
# -------------------------------------------------------------------------
[docs]
def db2xml(self, dbsession: Session | None = None):
"""Serialize a sharing to a XML representation.
:type dbsession: sqlalchemy.orm.session.Session
:param dbsession: (optional)
SQLAlchemy session.
:rtype: lxml.etree.Element
"""
# pylint: disable = unused-argument
if not self.files:
return None
sharing_elt = etree.Element('sharing')
sharing_elt.set('id', self.sharing_id)
sharing_elt.set('created', self.creation.isoformat().partition('.')[0])
if self.message:
etree.SubElement(sharing_elt, 'message').text = self.message
if self.password:
etree.SubElement(sharing_elt, 'password').text = self.password
if self.expiration:
etree.SubElement(sharing_elt, 'expiration').text = \
self.expiration.isoformat()
for dbfile in self.files:
etree.SubElement(sharing_elt, 'ciopath').text = dbfile.ciopath
return sharing_elt
# -------------------------------------------------------------------------
[docs]
@classmethod
def delete(cls, request: Request, sharing_id: str) -> bool:
"""Delete a sharing.
:type request: pyramid.request.Request
:param request:
Current request.
:param str sharing_id:
ID of the sharing to delete.
:rtype: bool
"""
dbsharing = request.dbsession.query(cls).filter_by(
sharing_id=sharing_id).first()
if dbsharing is None:
return False
# Find files
ciopaths_by_warehouse: dict[str, list[CioPath]] = {}
for dbfile in dbsharing.files:
ciopath = CioPath.from_str(dbfile.ciopath)
if ciopath.wid:
if ciopath.wid not in ciopaths_by_warehouse:
ciopaths_by_warehouse[ciopath.wid] = []
ciopaths_by_warehouse[ciopath.wid].append(ciopath)
request.dbsession.delete(dbsharing)
# Update information files, commit and index
ciowarehouse2 = request.registry['modules']['ciowarehouse2']
for warehouse_id, ciopaths in ciopaths_by_warehouse.items():
warehouse = ciowarehouse2.warehouse(request, warehouse_id)
if warehouse is not None:
for ciopath in ciopaths:
root_elt = info_get_root(warehouse.root, ciopath)
info_remove_sharing(root_elt, sharing_id)
info_save(root_elt, warehouse.root, ciopath, request)
warehouse.vcs.add()
warehouse.commit_and_refresh(
request, ciopaths, _('Sharing removal'), force=True)
return True
# -------------------------------------------------------------------------
[docs]
@classmethod
def purge_expired(cls, request: Request, dbsession: Session):
"""Purge expired sharings.
:type request: pyramid.request.Request
:param request:
Current request.
:type dbsession: sqlalchemy.orm.session.Session
:param dbsession:
SQLAlchemy session.
"""
if 'sharings' in request.registry and request.registry[
'sharings'] > time():
return
request.registry['sharings'] = time() + SHARING_CHECK_PERIOD
today = date.today()
for sharing_id in dbsession.query(
DBSharing.sharing_id).filter(cls.expiration < today):
cls.delete(request, sharing_id[0])
# =============================================================================
[docs]
class DBSharingFile(DBDeclarativeClass):
"""Class to link sharing token with their files (one-to-many)."""
# pylint: disable = too-few-public-methods
__tablename__ = 'wrh2_sharings_files'
__table_args__ = (
Index('warehouse_id', 'ciopath'), {
'mysql_engine': 'InnoDB'
})
sharing_id = Column(
String(48),
ForeignKey('wrh2_sharings.sharing_id', ondelete='CASCADE'),
primary_key=True)
ciopath = Column(String(255), primary_key=True)