Source code for ciowarehouse2.models.dbsharing

"""SQLAlchemy-powered model definitions for shared files."""

from __future__ import annotations
from datetime import datetime, date
from time import time
from bcrypt import hashpw, gensalt

from lxml import etree
from sqlalchemy import Column, ForeignKey, String, Date, DateTime
from sqlalchemy.orm import relationship
from sqlalchemy.orm.session import Session
from sqlalchemy.schema import Index

from pyramid.request import Request

from chrysalio.lib.utils import make_id
from chrysalio.models import DBDeclarativeClass
from chrysalio.models.dbbase import DBBaseClass
from ..relaxng import RELAXNG_CIOWAREHOUSE2
from ..lib.ciopath import CioPath
from ..lib.file_info import info_get_root, info_save, info_remove_sharing
from ..lib.i18n import _

SHARING_CHECK_PERIOD = 86400


# =============================================================================
[docs] class DBSharing(DBDeclarativeClass, DBBaseClass): """SQLAlchemy-powered sharing class.""" suffix = 'cioshr2' __tablename__ = 'wrh2_sharings' __table_args__ = {'mysql_engine': 'InnoDB'} __mapper_args__ = {'confirm_deleted_rows': False} sharing_id = Column(String(48), primary_key=True) message = Column(String(255)) password: str | Column = Column(String(64)) expiration = Column(Date) creation = Column(DateTime, default=datetime.now) files = relationship('DBSharingFile', cascade='all, delete') # -------------------------------------------------------------------------
[docs] def set_password(self, password: str | None): """Set the password, possibly hashing it. :param str password: Password to set. If it does not begin with ``$``, we use bcrypt algorithm before setting. """ if not password: return if not password.startswith('$'): self.password = hashpw( # yapf: disable password.encode('utf8'), gensalt()).decode('utf8') else: self.password = password
# -------------------------------------------------------------------------
[docs] def check_password(self, password: str) -> bool: """Check the validy of the given password. :param str password: Clear password to check. :rtype: bool """ if password and self.password is not None: expected = self.password.encode('utf8') return expected == hashpw(password.encode('utf8'), expected) return not bool(self.password)
# -------------------------------------------------------------------------
[docs] @classmethod def xml2db( cls, dbsession: Session, sharing_elt: etree.Element, error_if_exists: bool = True, kwargs=None) -> str | None: """Load a shared file from a XML element. :type dbsession: sqlalchemy.orm.session.Session :param dbsession: SQLAlchemy session. :type sharing_elt: lxml.etree.Element :param sharing_elt: Sharing XML element. :param bool error_if_exists: (default=True) It returns an error if sharing already exists. :param dict kwargs: (optional) Dictionary of keyword arguments. :rtype: :class:`pyramid.i18n.TranslationString` or ``None`` :return: Error message or ``None``. """ # Check if already exists sharing_id = make_id(sharing_elt.get('id'), 'token', 48) dbsharing = dbsession.query(cls).filter_by( sharing_id=sharing_id).first() if dbsharing is not None: if error_if_exists: return _('Sharing "${i}" already exists.', {'i': sharing_id}) return None # Create sharing record = cls.record_from_xml(sharing_id, sharing_elt) cls.record_format(record) dbsharing = cls(**record) # Load files found = False namespaces = {'ns': RELAXNG_CIOWAREHOUSE2['namespace']} warehouse_ids = kwargs.get('warehouse_ids', ()) if kwargs else () done = set() for elt in sharing_elt.xpath('ns:ciopath', namespaces=namespaces): ciopath = CioPath.from_str(elt.text.strip()) if ciopath.wid not in warehouse_ids: continue if ciopath not in done: done.add(ciopath) dbsharing.files.append(DBSharingFile(ciopath=str(ciopath))) found = True if found: dbsession.add(dbsharing) return None
# -------------------------------------------------------------------------
[docs] @classmethod def record_from_xml(cls, sharing_id: str, sharing_elt: etree.Element): """Convert a sharing XML element into a dictionary. :param str sharing_id: Sharing ID. :type sharing_elt: lxml.etree.Element :param sharing_elt: Sharing XML element. :rtype: dict """ namespace = RELAXNG_CIOWAREHOUSE2['namespace'] return { 'sharing_id': sharing_id, 'creation': sharing_elt.get('created'), 'message': sharing_elt.findtext(f'{{{namespace}}}message'), 'password': sharing_elt.findtext(f'{{{namespace}}}password'), 'expiration': sharing_elt.findtext(f'{{{namespace}}}expiration') }
# -------------------------------------------------------------------------
[docs] @classmethod def record_format(cls, record: dict) -> str | None: """Check and possibly correct a record before inserting it in the database. :param dict record: Dictionary of values to check. :rtype: ``None`` or :class:`pyramid.i18n.TranslationString` :return: ``None`` or error message. """ for k in [i for i in record if record[i] is None]: del record[k] # Sharing ID if not record.get('sharing_id'): return _('Sharing without ID.') # Password password = record.get('password') if password and not password.startswith('$'): record['password'] = hashpw( # yapf: disable password.encode('utf8'), gensalt()).decode('utf8') # Creation if 'creation' in record and \ not isinstance(record['creation'], datetime): record['creation'] = datetime.strptime( record['creation'], '%Y-%m-%dT%H:%M:%S') # Expiration if 'expiration' in record and \ not isinstance(record['expiration'], date): record['expiration'] = datetime.strptime( record['expiration'], '%Y-%m-%d').date() return None
# -------------------------------------------------------------------------
[docs] def db2xml(self, dbsession: Session | None = None): """Serialize a sharing to a XML representation. :type dbsession: sqlalchemy.orm.session.Session :param dbsession: (optional) SQLAlchemy session. :rtype: lxml.etree.Element """ # pylint: disable = unused-argument if not self.files: return None sharing_elt = etree.Element('sharing') sharing_elt.set('id', self.sharing_id) sharing_elt.set('created', self.creation.isoformat().partition('.')[0]) if self.message: etree.SubElement(sharing_elt, 'message').text = self.message if self.password: etree.SubElement(sharing_elt, 'password').text = self.password if self.expiration: etree.SubElement(sharing_elt, 'expiration').text = \ self.expiration.isoformat() for dbfile in self.files: etree.SubElement(sharing_elt, 'ciopath').text = dbfile.ciopath return sharing_elt
# -------------------------------------------------------------------------
[docs] @classmethod def delete(cls, request: Request, sharing_id: str) -> bool: """Delete a sharing. :type request: pyramid.request.Request :param request: Current request. :param str sharing_id: ID of the sharing to delete. :rtype: bool """ dbsharing = request.dbsession.query(cls).filter_by( sharing_id=sharing_id).first() if dbsharing is None: return False # Find files ciopaths_by_warehouse: dict[str, list[CioPath]] = {} for dbfile in dbsharing.files: ciopath = CioPath.from_str(dbfile.ciopath) if ciopath.wid: if ciopath.wid not in ciopaths_by_warehouse: ciopaths_by_warehouse[ciopath.wid] = [] ciopaths_by_warehouse[ciopath.wid].append(ciopath) request.dbsession.delete(dbsharing) # Update information files, commit and index ciowarehouse2 = request.registry['modules']['ciowarehouse2'] for warehouse_id, ciopaths in ciopaths_by_warehouse.items(): warehouse = ciowarehouse2.warehouse(request, warehouse_id) if warehouse is not None: for ciopath in ciopaths: root_elt = info_get_root(warehouse.root, ciopath) info_remove_sharing(root_elt, sharing_id) info_save(root_elt, warehouse.root, ciopath, request) warehouse.vcs.add() warehouse.commit_and_refresh( request, ciopaths, _('Sharing removal'), force=True) return True
# -------------------------------------------------------------------------
[docs] @classmethod def purge_expired(cls, request: Request, dbsession: Session): """Purge expired sharings. :type request: pyramid.request.Request :param request: Current request. :type dbsession: sqlalchemy.orm.session.Session :param dbsession: SQLAlchemy session. """ if 'sharings' in request.registry and request.registry[ 'sharings'] > time(): return request.registry['sharings'] = time() + SHARING_CHECK_PERIOD today = date.today() for sharing_id in dbsession.query( DBSharing.sharing_id).filter(cls.expiration < today): cls.delete(request, sharing_id[0])
# =============================================================================
[docs] class DBSharingFile(DBDeclarativeClass): """Class to link sharing token with their files (one-to-many).""" # pylint: disable = too-few-public-methods __tablename__ = 'wrh2_sharings_files' __table_args__ = ( Index('warehouse_id', 'ciopath'), { 'mysql_engine': 'InnoDB' }) sharing_id = Column( String(48), ForeignKey('wrh2_sharings.sharing_id', ondelete='CASCADE'), primary_key=True) ciopath = Column(String(255), primary_key=True)