Source code for ciowarehouse2.lib.wfile

"""Warehouse file class."""

from __future__ import annotations
from os import makedirs
from os.path import join, dirname
from datetime import datetime
from secrets import token_hex
from shutil import copyfileobj

from webob.compat import cgi_FieldStorage as FieldStorage
from pyramid.request import Request

from chrysalio.lib.utils import make_digest
from ..models.dbsharing import DBSharing, DBSharingFile
from .ciopath import CioPath
from .warehouse import Warehouse
from .file_paging import FilePaging
from .file_info import info_get_root, info_save, info_add_sharing
from .i18n import _


# =============================================================================
class WFile:
    """Class to operate on files in a warehouse or in a result of search.

    :type  request: pyramid.request.Request
    :param request:
        Current request.
    """

    # -------------------------------------------------------------------------
    def __init__(self, request: Request):
        """Constructor method."""
        self._request = request

    # -------------------------------------------------------------------------
    def upload(self, warehouse: Warehouse, directory_ciopath: CioPath):
        """Import several files in the current directory.

        The directory is locked for the current user while the files are
        written and added to the VCS. The lock is always released, even if
        writing a file or talking to the VCS raises an exception.

        :type  warehouse: .lib.warehouse.Warehouse
        :param warehouse:
            Current warehouse object if exists.
        :type  directory_ciopath: .lib.ciopath.CioPath
        :param directory_ciopath:
            `CioPath` of the current directory.
        """
        # Locked warehouse
        locked, locker = warehouse.lock(
            self._request.session['user']['name'], directory_ciopath)
        if not locked:
            self._request.session.flash(
                _('The directory is locked by "${n}"!', {'n': locker}),
                'alert')
            return

        # From here on we own the lock: the ``finally`` clause guarantees it
        # is released on every exit path, including exceptions.
        try:
            # Unauthorized warehouse
            ciowarehouse2 = self._request.registry['modules']['ciowarehouse2']
            if not ciowarehouse2.warehouse_file_writer(
                    self._request, warehouse):
                self._request.session.flash(
                    _('You cannot import files here.'), 'alert')
                return

            # Retrieve files
            ciopaths = set()
            for field_storage in self._request.POST.getall('file'):
                if isinstance(field_storage, FieldStorage):
                    ciopath = self._upload(
                        warehouse, directory_ciopath, field_storage)
                    if ciopath:
                        ciopaths.add(ciopath)
            if not ciopaths:
                return

            # Add to the VCS
            warehouse.vcs.pull()
            for ciopath in ciopaths:
                warehouse.vcs.add(ciopath)
        finally:
            warehouse.unlock(directory_ciopath)

        # Commit and refresh (done once the directory is unlocked)
        warehouse.commit_and_refresh(self._request, ciopaths, _('Upload'))
        if not directory_ciopath.is_root():
            directory_ciopath.touch(warehouse.root)
            warehouse.refresh(self._request, [directory_ciopath], now=True)
        if not self._request.session.peek_flash('alert'):
            self._request.session.flash(
                _('Upload successfully complete!'), 'refresh')

    # -------------------------------------------------------------------------
    @classmethod
    def _upload(
            cls, warehouse: Warehouse, directory_ciopath: CioPath,
            field_storage: FieldStorage) -> CioPath | None:
        """Import one file in the current directory.

        The uploaded stream is always closed before returning, whether the
        file has been written, rejected or the write failed.

        :type  warehouse: .lib.warehouse.Warehouse
        :param warehouse:
            Current warehouse object.
        :type  directory_ciopath: .lib.ciopath.CioPath
        :param directory_ciopath:
            `CioPath` of the current directory.
        :param field_storage:
            Object containing the file coming from the Web.
        :rtype: .lib.ciopath.CioPath
        :return:
            `CioPath` of the imported file or ``None`` if nothing has been
            written (rejected file name or no absolute path).
        """
        try:
            filename = warehouse.file_normalize(field_storage.filename)
            if filename is None:
                return None

            ciopath = CioPath(
                directory_ciopath.wid, join(directory_ciopath.path, filename))
            abs_path = ciopath.absolute_path(warehouse.root)
            if abs_path is None:
                # Nothing can be written, so there is nothing to add to the
                # VCS: do not report this file as imported.
                return None

            makedirs(dirname(abs_path), exist_ok=True)
            with open(abs_path, 'wb') as hdl:
                # Copy by chunks instead of loading the whole upload in memory
                copyfileobj(field_storage.file, hdl)
            return ciopath
        finally:
            field_storage.file.close()

    # -------------------------------------------------------------------------
    def remove(self, targets: tuple[str, ...]):
        """Remove files.

        Targets are grouped by warehouse; each warehouse gets one commit.

        :param tuple targets:
            List of `CioType`ǁ`CioPath`.
        """
        # Find files to remove
        ciopaths_by_warehouse: dict[str | None, list[CioPath]] = {}
        for target in targets:
            ciopath = CioPath.from_str(target.partition('ǁ')[2])
            ciopaths_by_warehouse.setdefault(ciopath.wid, []).append(ciopath)

        # Browse warehouses and remove files
        ciowarehouse2 = self._request.registry['modules']['ciowarehouse2']
        for warehouse_id, ciopaths in ciopaths_by_warehouse.items():
            warehouse = ciowarehouse2.warehouse(self._request, warehouse_id)
            if warehouse is None:
                continue
            if not ciowarehouse2.warehouse_file_writer(  # yapf: disable
                    self._request, warehouse):
                self._request.session.flash(_(  # yapf: disable
                    'You cannot remove a file from "${l}".',
                    {'l': warehouse.label(self._request)}), 'alert')
                # The whole operation stops here: remaining warehouses are
                # not processed.
                return

            warehouse.vcs.pull()
            need_refresh = set()
            for ciopath in ciopaths:
                warehouse.vcs.remove(ciopath)
                warehouse.unlock(ciopath)
                need_refresh.add(ciopath.parent().touch(warehouse.root))
            err = warehouse.vcs.commit(
                self._request.localizer.translate(_('Deletion of the file')),
                self._request.session['user']['name'],
                self._request.session['user']['email'])
            if err:  # pragma: nocover
                self._request.session.flash(err, 'alert')
            ciowarehouse2.backend.delete(warehouse, [k.path for k in ciopaths])
            warehouse.refresh(self._request, need_refresh, now=True)

    # -------------------------------------------------------------------------
    def new_file(
            self, warehouse: Warehouse | None, directory_ciopath: CioPath,
            action: str, values: dict) -> CioPath | None:
        """Create a new file or a new directory.

        :type  warehouse: .lib.warehouse.Warehouse
        :param warehouse:
            Current warehouse object.
        :type  directory_ciopath: .lib.ciopath.CioPath
        :param directory_ciopath:
            `CioPath` of the current directory, possibly an empty one.
        :param str action:
            Action string: a 4 character prefix followed by
            `seed_id`ǁ`seeder_id`.
        :param dict values:
            Values needed by the seeder.
        :rtype: .lib.ciopath.CioPath
        :return:
            Result of the seeder or ``None`` if the warehouse, the seed or
            the seeder is not usable.
        """
        # Get seeder and check seed
        if warehouse is None or directory_ciopath.wid != warehouse.uid:
            return None
        # Skip the 4 character prefix, then split around the separator
        seed_id, seeder_id = action[4:].partition('ǁ')[0::2]
        seeder = self._request.registry['seeders'].get(seeder_id)
        if seed_id not in self._request.registry['seeds'] or seeder is None:
            return None

        # Create the file
        return seeder.new_file(
            self._request, warehouse, directory_ciopath, values, seed_id)

    # -------------------------------------------------------------------------
    def sharing_create(
            self, targets: tuple[str, ...], values: dict,
            paging: FilePaging):
        """Save a sharing.

        :param tuple targets:
            Tuple of `CioType`ǁ`CioPath`.
        :param dict values:
            Values such as ``message``, ``password1`` and ``expiration``.
        :type  paging: .lib.file_paging.FilePaging
        :param paging:
            Current `FilePaging`.
        """
        ciowarehouse2 = self._request.registry['modules']['ciowarehouse2']
        panel = self._request.registry['panels']['filepanel']
        sharing_id, ciopaths_by_warehouse = self._sharing_updatedb(
            targets, values)

        for warehouse_id, ciopaths in ciopaths_by_warehouse.items():
            warehouse = ciowarehouse2.warehouse(self._request, warehouse_id)
            if warehouse is None:
                continue
            panel_ciopath = panel.ciopath(self._request)
            for ciopath in ciopaths:
                # Update info file
                root_elt = info_get_root(warehouse.root, ciopath)
                info_add_sharing(root_elt, sharing_id)
                info_save(root_elt, warehouse.root, ciopath, self._request)

                # Update file paging
                pfile = paging.get_pfile(ciopath)
                if pfile is not None:
                    pfile['shared'] = True

                # Update file panel
                if ciopath == panel_ciopath:
                    panel_values = panel.values(self._request)
                    panel_values['sharings'].append((  # yapf: disable
                        sharing_id, values.get('message'),
                        bool(values.get('password1')),
                        values.get('expiration')))
                    panel.set_values(self._request, panel_values)

            warehouse.vcs.add()
            warehouse.commit_and_refresh(
                self._request, ciopaths, _('Sharing creation'), force=True)

        self._request.session.flash(
            _(
                'The sharing link is: ${l}', {
                    'l':
                        self._request.route_url(
                            'sharing_download', sharing_id=sharing_id)
                }), 'persistent')

    # -------------------------------------------------------------------------
    def _sharing_updatedb(
            self, targets: tuple[str, ...],
            values: dict) -> tuple[str, dict[str, list[CioPath]]]:
        """Update the sharing SQL table.

        :param tuple targets:
            Tuple of `CioType`ǁ`CioPath`.
        :param dict values:
            Values such as ``message``, ``password1`` and ``expiration``.
        :rtype: tuple
        :return:
            A tuple such as ``(sharing_id, ciopaths_by_warehouse)``.
        """
        # Create the sharing record
        # The ID ends up in a public download URL: random material is mixed
        # with the timestamp so it can neither be guessed from the creation
        # time nor collide with a sharing created at the same instant.
        sharing_id = make_digest(datetime.now().isoformat() + token_hex(16))
        dbsharing = DBSharing(
            sharing_id=sharing_id,
            message=values.get('message'),
            expiration=values.get('expiration'))
        dbsharing.set_password(values.get('password1'))

        # Add files
        ciopaths_by_warehouse: dict[str, list[CioPath]] = {}
        for target in targets:
            ciopath = CioPath.from_str(target.partition('ǁ')[2])
            if ciopath.wid is not None:
                ciopaths_by_warehouse.setdefault(
                    ciopath.wid, []).append(ciopath)
                dbsharing.files.append(DBSharingFile(ciopath=str(ciopath)))

        # Update database
        self._request.dbsession.add(dbsharing)

        return sharing_id, ciopaths_by_warehouse