"""Warehouse file class."""
from __future__ import annotations

from os import makedirs
from os.path import join, dirname
from datetime import datetime
from secrets import token_hex
from shutil import copyfileobj

from webob.compat import cgi_FieldStorage as FieldStorage
from pyramid.request import Request
from chrysalio.lib.utils import make_digest

from ..models.dbsharing import DBSharing, DBSharingFile
from .ciopath import CioPath
from .warehouse import Warehouse
from .file_paging import FilePaging
from .file_info import info_get_root, info_save, info_add_sharing
from .i18n import _
# =============================================================================
class WFile:
    """Class to operate on files in a warehouse or in a result of search.

    :type  request: pyramid.request.Request
    :param request:
        Current request.
    """

    # -------------------------------------------------------------------------
    def __init__(self, request: Request):
        """Constructor method."""
        self._request = request

    # -------------------------------------------------------------------------
    def upload(self, warehouse: Warehouse, directory_ciopath: CioPath) -> None:
        """Import several files in the current directory.

        The directory is locked while the files are written and added to the
        VCS; the lock is released before the commit, and also on any error.

        :type  warehouse: .lib.warehouse.Warehouse
        :param warehouse:
            Current warehouse object.
        :type  directory_ciopath: .lib.ciopath.CioPath
        :param directory_ciopath:
            `CioPath` of the current directory.
        """
        session = self._request.session

        # Locked warehouse
        locked, locker = warehouse.lock(
            session['user']['name'], directory_ciopath)
        if not locked:
            session.flash(
                _('The directory is locked by "${n}"!', {'n': locker}),
                'alert')
            return

        # From here on we own the lock: release it whatever happens.
        try:
            # Unauthorized warehouse
            ciowarehouse2 = self._request.registry['modules']['ciowarehouse2']
            if not ciowarehouse2.warehouse_file_writer(
                    self._request, warehouse):
                session.flash(_('You cannot import files here.'), 'alert')
                return

            # Retrieve files
            ciopaths = set()
            for field_storage in self._request.POST.getall('file'):
                # Only real uploads are processed; other values of the
                # 'file' field are ignored.
                if not isinstance(field_storage, FieldStorage):
                    continue
                ciopath = self._upload(
                    warehouse, directory_ciopath, field_storage)
                if ciopath:
                    ciopaths.add(ciopath)
            if not ciopaths:
                return

            # Add to the VCS
            warehouse.vcs.pull()
            for ciopath in ciopaths:
                warehouse.vcs.add(ciopath)
        finally:
            warehouse.unlock(directory_ciopath)

        # Commit and refresh
        warehouse.commit_and_refresh(self._request, ciopaths, _('Upload'))
        if not directory_ciopath.is_root():
            directory_ciopath.touch(warehouse.root)
            warehouse.refresh(self._request, [directory_ciopath], now=True)
        if not session.peek_flash('alert'):
            session.flash(_('Upload successfully complete!'), 'refresh')

    # -------------------------------------------------------------------------
    @classmethod
    def _upload(
            cls, warehouse: Warehouse, directory_ciopath: CioPath,
            field_storage: FieldStorage) -> CioPath | None:
        """Import one file in the current directory.

        The uploaded stream is always closed, even if writing fails.

        :type  warehouse: .lib.warehouse.Warehouse
        :param warehouse:
            Current warehouse object.
        :type  directory_ciopath: .lib.ciopath.CioPath
        :param directory_ciopath:
            `CioPath` of the current directory.
        :param field_storage:
            Object containing the file coming from the Web.
        :rtype: .lib.ciopath.CioPath
        :return:
            `CioPath` of the imported file or ``None`` if nothing has been
            written (file name rejected by the warehouse or no absolute path
            for the target).
        """
        try:
            filename = warehouse.file_normalize(field_storage.filename)
            if filename is None:
                return None

            ciopath = CioPath(
                directory_ciopath.wid, join(directory_ciopath.path, filename))
            abs_path = ciopath.absolute_path(warehouse.root)
            if abs_path is None:
                # Nothing can be written, so nothing must be added to the VCS.
                return None

            makedirs(dirname(abs_path), exist_ok=True)
            with open(abs_path, 'wb') as hdl:
                # Stream by chunks instead of loading the whole upload.
                copyfileobj(field_storage.file, hdl)
            return ciopath
        finally:
            field_storage.file.close()

    # -------------------------------------------------------------------------
    def remove(self, targets: tuple[str, ...]) -> None:
        """Remove files.

        Targets are grouped by warehouse; each warehouse gets one commit.
        Processing stops at the first warehouse the user cannot write in.

        :param tuple targets:
            List of `CioType`ǁ`CioPath`.
        """
        # Find files to remove
        ciopaths_by_warehouse: dict[str | None, list[CioPath]] = {}
        for target in targets:
            ciopath = CioPath.from_str(target.partition('ǁ')[2])
            ciopaths_by_warehouse.setdefault(ciopath.wid, []).append(ciopath)

        # Browse warehouses and remove files
        user = self._request.session['user']
        ciowarehouse2 = self._request.registry['modules']['ciowarehouse2']
        for warehouse_id, ciopaths in ciopaths_by_warehouse.items():
            warehouse = ciowarehouse2.warehouse(self._request, warehouse_id)
            if warehouse is None:
                continue
            if not ciowarehouse2.warehouse_file_writer(
                    self._request, warehouse):
                self._request.session.flash(_(
                    'You cannot remove a file from "${l}".',
                    {'l': warehouse.label(self._request)}), 'alert')
                return

            warehouse.vcs.pull()
            need_refresh = set()
            for ciopath in ciopaths:
                warehouse.vcs.remove(ciopath)
                warehouse.unlock(ciopath)
                # The value returned by touch() on the parent directory is
                # what gets refreshed afterwards.
                need_refresh.add(ciopath.parent().touch(warehouse.root))

            err = warehouse.vcs.commit(
                self._request.localizer.translate(_('Deletion of the file')),
                user['name'], user['email'])
            if err:  # pragma: nocover
                self._request.session.flash(err, 'alert')

            ciowarehouse2.backend.delete(
                warehouse, [k.path for k in ciopaths])
            warehouse.refresh(self._request, need_refresh, now=True)

    # -------------------------------------------------------------------------
    def new_file(
            self, warehouse: Warehouse | None, directory_ciopath: CioPath,
            action: str, values: dict) -> CioPath | None:
        """Create a new file or a new directory.

        :type  warehouse: .lib.warehouse.Warehouse
        :param warehouse:
            Current warehouse object or ``None``.
        :type  directory_ciopath: .lib.ciopath.CioPath
        :param directory_ciopath:
            `CioPath` of the current directory, possibly an empty one.
        :param str action:
            Action string: a 4-character prefix followed by
            `seed_id`ǁ`seeder_id`.
        :param dict values:
            Values needed by the seeder.
        :rtype: .lib.ciopath.CioPath
        :return:
            Result of the seeder or ``None`` if the warehouse does not match
            the directory or if the seed or the seeder is unknown.
        """
        # Get seeder and check seed
        if warehouse is None or directory_ciopath.wid != warehouse.uid:
            return None
        # Skip the 4-character prefix, then split around the separator.
        seed_id, seeder_id = action[4:].partition('ǁ')[0::2]
        seeder = self._request.registry['seeders'].get(seeder_id)
        if seed_id not in self._request.registry['seeds'] or seeder is None:
            return None

        # Create the file
        return seeder.new_file(
            self._request, warehouse, directory_ciopath, values, seed_id)

    # -------------------------------------------------------------------------
    def sharing_create(
            self, targets: tuple[str, ...], values: dict,
            paging: FilePaging) -> None:
        """Save a sharing.

        The sharing is recorded in the database, then the information file,
        the file paging and the file panel of each shared file are updated.
        Finally, the sharing link is flashed to the user.

        :param tuple targets:
            Tuple of `CioType`ǁ`CioPath`.
        :param dict values:
            Values such as ``message``, ``password1`` and ``expiration``.
        :type  paging: .lib.file_paging.FilePaging
        :param paging:
            Current `FilePaging`.
        """
        ciowarehouse2 = self._request.registry['modules']['ciowarehouse2']
        panel = self._request.registry['panels']['filepanel']
        sharing_id, ciopaths_by_warehouse = self._sharing_updatedb(
            targets, values)

        for warehouse_id, ciopaths in ciopaths_by_warehouse.items():
            warehouse = ciowarehouse2.warehouse(self._request, warehouse_id)
            if warehouse is None:
                continue

            panel_ciopath = panel.ciopath(self._request)
            for ciopath in ciopaths:
                # Update info file
                root_elt = info_get_root(warehouse.root, ciopath)
                info_add_sharing(root_elt, sharing_id)
                info_save(root_elt, warehouse.root, ciopath, self._request)

                # Update file paging
                pfile = paging.get_pfile(ciopath)
                if pfile is not None:
                    pfile['shared'] = True

                # Update file panel
                if ciopath == panel_ciopath:
                    panel_values = panel.values(self._request)
                    panel_values['sharings'].append((
                        sharing_id, values.get('message'),
                        bool(values.get('password1')),
                        values.get('expiration')))
                    panel.set_values(self._request, panel_values)

            warehouse.vcs.add()
            warehouse.commit_and_refresh(
                self._request, ciopaths, _('Sharing creation'), force=True)

        sharing_url = self._request.route_url(
            'sharing_download', sharing_id=sharing_id)
        self._request.session.flash(
            _('The sharing link is: ${l}', {'l': sharing_url}), 'persistent')

    # -------------------------------------------------------------------------
    def _sharing_updatedb(
            self, targets: tuple[str, ...],
            values: dict) -> tuple[str, dict[str, list[CioPath]]]:
        """Update the sharing SQL table.

        :param tuple targets:
            Tuple of `CioType`ǁ`CioPath`.
        :param dict values:
            Values such as ``message``, ``password1`` and ``expiration``.
        :rtype: tuple
        :return:
            A tuple such as ``(sharing_id, ciopaths_by_warehouse)``.
        """
        # Create the sharing record. The ID is part of a public URL: random
        # bytes are mixed in so that it cannot be guessed from the creation
        # time alone.
        sharing_id = make_digest(
            f'{datetime.now().isoformat()}{token_hex(16)}')
        dbsharing = DBSharing(
            sharing_id=sharing_id,
            message=values.get('message'),
            expiration=values.get('expiration'))
        dbsharing.set_password(values.get('password1'))

        # Add files
        ciopaths_by_warehouse: dict[str, list[CioPath]] = {}
        for target in targets:
            ciopath = CioPath.from_str(target.partition('ǁ')[2])
            # Targets without warehouse ID are ignored.
            if ciopath.wid is not None:
                ciopaths_by_warehouse.setdefault(
                    ciopath.wid, []).append(ciopath)
                dbsharing.files.append(DBSharingFile(ciopath=str(ciopath)))

        # Update database
        self._request.dbsession.add(dbsharing)
        return sharing_id, ciopaths_by_warehouse