Source code for ciowarehouse.views.sharing

"""View callables to manage shared files."""

from os.path import join, exists, basename, dirname, normpath, isdir

from colander import Mapping, SchemaNode, String

from pyramid.view import view_config
from pyramid.httpexceptions import HTTPNotFound
from pyramid.security import NO_PERMISSION_REQUIRED

from chrysalio.lib.utils import mimetype_get
from chrysalio.lib.form import get_action, Form
from chrysalio.includes.themes import theme_static_prefix
from ..lib.i18n import _
from ..lib.utils import HERE, THUMBNAILS_DIR, THUMBNAIL_LARGE, mimetype_url
from ..lib.utils import file2response, files2response, make_file_id
from ..models.dbsharing import DBSharing, DBSharingFile


# =============================================================================
class SharingView(object):
    """Class to manage shared files.

    :type  request: pyramid.request.Request
    :param request:
        Current request.
    """

    # -------------------------------------------------------------------------
    def __init__(self, request):
        """Constructor method."""
        self._request = request

    # -------------------------------------------------------------------------
    @view_config(
        route_name='sharing_download',
        renderer='ciowarehouse:Templates/sharing_download.pt',
        permission=NO_PERMISSION_REQUIRED)
    @view_config(
        route_name='sharing_download_one',
        renderer='ciowarehouse:Templates/sharing_download.pt',
        permission=NO_PERMISSION_REQUIRED)
    def download(self):
        """Download shared files.

        The sharing is looked up from the ``sharing_id`` route parameter. If
        it is protected by a password, the password form is returned until
        the right password has been given; the sharing ID is then remembered
        in the session.

        :rtype: dict or response object
        :return:
            The response produced by ``files2response`` when a download is
            requested and succeeds, otherwise the dictionary of values for
            the template.
        :raise pyramid.httpexceptions.HTTPNotFound:
            If the sharing does not exist or has no remaining file.
        """
        request = self._request
        request.registry['panels']['fileinfo'].close(request)

        DBSharing.purge_expired(request, request.dbsession)
        dbsharing = request.dbsession.query(DBSharing).filter_by(
            sharing_id=request.matchdict['sharing_id']).first()
        if dbsharing is None:
            raise HTTPNotFound(comment=_('This sharing does not exist!'))

        # Password
        if dbsharing.password:
            session = request.session
            if 'sharings' not in session:
                session['sharings'] = []
            if dbsharing.sharing_id not in session['sharings']:
                schema = SchemaNode(Mapping())
                schema.add(SchemaNode(String(), name='password'))
                form = Form(request, schema)
                if not form.validate() or \
                   not dbsharing.check_password(form.values['password']):
                    if form.values.get('password'):
                        form.set_error('password', _('Incorrect password'))
                    return {'form': form, 'authorized': False}
                # Re-assign the list instead of appending in place: an
                # in-place mutation of a value already stored in the session
                # may not be detected (and thus not saved) by the backend.
                session['sharings'] = \
                    session['sharings'] + [dbsharing.sharing_id]

        # Find files
        files, previews = self._retrieve_files(dbsharing)
        count = len(files)
        if not count:
            DBSharing.delete(request, dbsharing.sharing_id)
            raise HTTPNotFound(comment=_('No more files in this sharing.'))

        # Action
        action = get_action(request)[0]
        file_id = request.matchdict.get('file_id')
        if file_id:
            # NOTE(review): an unknown ``file_id`` yields ``(None,)`` here;
            # confirm that ``files2response`` copes with it.
            response = files2response(request, (files.get(file_id),))
            if response:
                return response
        elif action[:4] == 'dnl!':
            # Download of every file of the sharing
            response = files2response(request, files.values())
            if response:
                return response

        button_title = _('Download ${n} files', {'n': count}) \
            if count > 1 else _('Download')

        request.breadcrumbs(_('File Download'))
        return {
            'form': Form(request),
            'authorized': True,
            'message': dbsharing.message,
            'previews': previews,
            'button_title': button_title,
            'sharing_id': dbsharing.sharing_id}

    # -------------------------------------------------------------------------
    @view_config(
        route_name='sharing_preview', permission=NO_PERMISSION_REQUIRED)
    def preview(self):
        """Send a file preview.

        The large thumbnail of the shared file is sent, JPEG first then PNG.
        The "not found" image is sent when the file does not belong to the
        sharing, when the warehouse root is unknown or when no thumbnail
        exists.

        :return:
            The response produced by ``files2response`` or ``file2response``.
        """
        request = self._request
        warehouse_id = request.matchdict.get('warehouse_id')
        dbfile = request.dbsession.query(DBSharingFile).filter_by(
            sharing_id=request.matchdict['sharing_id'],
            file_id=make_file_id(join(
                warehouse_id, *request.matchdict['path']))).first()
        if dbfile is None:
            return self._notfound_response()

        root = request.registry['modules']['ciowarehouse'].warehouse_root(
            request, warehouse_id)
        if root is None:
            return self._notfound_response()

        abs_file = join(root, dbfile.directory, dbfile.file_name)
        # The same thumbnail directory is used for both extensions so that
        # the lookup matches the one made by ``_retrieve_files`` (which keeps
        # the ``HERE`` sub-directory for directories whatever the extension).
        thumb_dir = join(
            root, THUMBNAILS_DIR, dbfile.directory, dbfile.file_name,
            HERE if isdir(abs_file) else '')
        for ext in ('jpg', 'png'):
            abs_thumb = join(
                thumb_dir, '{0}.{1}'.format(THUMBNAIL_LARGE, ext))
            if exists(abs_thumb):
                return files2response(request, (abs_thumb,))

        return self._notfound_response()

    # -------------------------------------------------------------------------
    def _notfound_response(self):
        """Return the response sending the "not found" static image."""
        return file2response(
            self._request,
            join(dirname(__file__), '..', 'Static', 'Images', 'notfound.jpg'),
            'notfound.jpg')

    # -------------------------------------------------------------------------
    def _retrieve_files(self, dbsharing):
        """Retrieve the list of shared files.

        Files whose warehouse root is unknown or which no longer exist on
        disk are skipped.

        :type  dbsharing: .models.dbsharing.DBSharing
        :param dbsharing:
            SqlAlchemy object for this sharing.
        :rtype: tuple
        :return:
            a tuple such as ``(file_dict, preview_list)`` where ``file_dict``
            maps a file ID to an absolute path and ``preview_list`` is a list
            of tuples such as ``(sharing_preview, filename, file_id)``.
        """
        files = {}
        previews = []
        theme = theme_static_prefix(self._request)
        ciowarehouse = self._request.registry['modules']['ciowarehouse']

        for dbsharing_file in dbsharing.files:
            root = ciowarehouse.warehouse_root(
                self._request, dbsharing_file.warehouse_id)
            if root is None:
                continue
            path = normpath(join(
                dbsharing_file.directory, dbsharing_file.file_name))
            abs_path = join(root, path)
            if not exists(abs_path):
                continue
            file_id = make_file_id(join(dbsharing_file.warehouse_id, path))
            files[file_id] = abs_path

            # Thumbnail (JPEG or PNG) if any, otherwise the MIME type icon
            thumb_dir = join(
                root, THUMBNAILS_DIR, path, HERE if isdir(abs_path) else '')
            has_thumbnail = any(
                exists(join(
                    thumb_dir, '{0}.{1}'.format(THUMBNAIL_LARGE, ext)))
                for ext in ('jpg', 'png'))
            if has_thumbnail:
                preview_url = self._request.route_path(
                    'sharing_preview', sharing_id=dbsharing.sharing_id,
                    warehouse_id=dbsharing_file.warehouse_id, path=path)
            else:
                preview_url = mimetype_url(
                    theme, 'normal', mimetype_get(abs_path)[1])
            previews.append((preview_url, basename(path), file_id))

        return files, previews