Source code for ciowarehouse2.lib.backend

"""Backend management."""

from __future__ import annotations
from logging import getLogger
from os.path import dirname, join
from collections import namedtuple

from grpc import insecure_channel, RpcError

from chrysalio.lib.config import settings_get_namespace
from .i18n import _
from .warehouse import Warehouse
from ..proto.ciotantivy_pb2_grpc import CioTantivyStub
# pylint: disable = no-name-in-module
from ..proto.ciotantivy_pb2 import PingRequest, PingOptions
from ..proto.ciotantivy_pb2 import ResetRequest, ResetOptions
from ..proto.ciotantivy_pb2 import InspectRequest, InspectOptions, InspectReply
from ..proto.ciotantivy_pb2 import IndexRequest, IndexOptions
from ..proto.ciotantivy_pb2 import DeleteRequest, DeleteOptions
from ..proto.ciotantivy_pb2 import SearchRequest, SearchOptions, SearchReply
from ..proto.ciotantivy_pb2 import SuggestRequest, SuggestOptions
# pylint: enable = no-name-in-module

# Module-level logger, named after this module.
LOG = getLogger(__name__)
# Settings namespace handed to ``settings_get_namespace`` by
# ``CioBackend.__init__`` to read the backend configuration.
BACKEND_NAMESPACE = 'backend'
# Host used when the backend settings provide no ``host`` value.
DEFAULT_BACKEND_HOST = 'localhost'
# Port used when the backend settings provide no ``port`` value. It is a
# string: ``CioBackend.__init__`` interpolates it into the ``host:port`` target
# and compares the configured port with the string "0".
DEFAULT_BACKEND_PORT = '6540'
# Default value of the ``limit`` parameter of ``CioBackend.search``.
DEFAULT_SEARCH_LIMIT = 10
# Default value of the ``limit`` parameter of ``CioBackend.suggest``.
DEFAULT_SUGGEST_LIMIT = 12


# =============================================================================
class CioBackend():
    """Class to manage backend calls.

    Each public method opens its own insecure gRPC channel to the configured
    ``host:port`` target, sends one request for the tenant ``uid`` and turns
    any failure into an error message instead of raising.

    When the configured port is the string ``"0"``, the instance is in test
    mode: no channel is opened and every method returns a canned "success"
    value.

    :type  settings: pyramid.registry.Registry.settings
    :param settings:
        Settings object.
    :param str uid:
        UID of the current site (ex. `ciodemosite`).
    """

    # -------------------------------------------------------------------------
    def __init__(self, settings: dict, uid: str):
        """Constructor method."""
        self.uid = uid
        backend = settings_get_namespace(settings, BACKEND_NAMESPACE)
        if not backend.get('host'):
            backend['host'] = DEFAULT_BACKEND_HOST
        if not backend.get('port'):
            backend['port'] = DEFAULT_BACKEND_PORT
        self._target = f"{backend['host']}:{backend['port']}"
        # A port of "0" switches the instance to test mode (no RPC is sent).
        self._test = backend.get('port') == "0"

    # -------------------------------------------------------------------------
    @staticmethod
    def _rpc_error_text(err: RpcError) -> str:
        """Return a non-empty message describing a failed RPC.

        The ``RpcError`` base class does not itself define ``details()``, and
        the details string may be empty. Returning it verbatim would make a
        failed call look like a success to callers testing the error for
        truthiness, so fall back on ``str(err)`` and then on the class name.

        :type  err: grpc.RpcError
        :param err:
            Exception raised by the stub call.
        :rtype: str
        """
        details = getattr(err, 'details', None)
        text = details() if callable(details) else None
        return text or str(err) or type(err).__name__

    # -------------------------------------------------------------------------
    @staticmethod
    def _reply_error_text(reply) -> str:
        """Format the error carried by a reply as ``'<info> [<error>]'``.

        :param reply:
            Reply message whose ``error`` field is set.
        :rtype: str
        """
        return f'{reply.info} [{reply.error}]'

    # -------------------------------------------------------------------------
    def ping(self, pending: bool = False, running: bool = False) -> str | None:
        """Ping the backend server.

        :param bool pending: (optional)
            Return pending state.
        :param bool running: (optional)
            Return running state sorted by warehouse.
        :rtype: pyramid.i18n.TranslationString
        :return:
            ``None`` if the backend answered without error (or in test mode),
            otherwise a translatable error message.
        """
        if self._test:
            return None

        with insecure_channel(self._target) as channel:
            options = PingOptions(pending=pending, running=running)
            stub = CioTantivyStub(channel)
            try:
                reply = stub.Ping(
                    PingRequest(tenant=self.uid, options=options))
            except RpcError:
                return _('Backend ${target} is not active for "${tid}"', {
                    'target': self._target, 'tid': self.uid})  # yapf: disable
            if reply.error:
                return _('Backend error: ${e}', {'e': reply.info})
        return None

    # -------------------------------------------------------------------------
    def inspect(
            self,
            segments: bool = False) -> tuple[InspectReply | None, str | None]:
        """Inspect the index.

        :param bool segments: (optional)
            Forwarded as-is to ``InspectOptions.segments``.
        :rtype: tuple
        :return:
            A tuple such as ``(reply, error)``. Exactly one of the two items
            is ``None``.
        """
        if self._test:
            # Test mode: stand-in reply whose homes point to the ``__Tests``
            # directory located two levels above this module.
            test_dir = join(dirname(__file__), '..', '..', '__Tests')
            homes = namedtuple('Homes', 'thumbnails icons')(  # type: ignore
                thumbnails=join(test_dir, 'Thumbnails'),
                icons=join(test_dir, 'Icons'))
            return namedtuple('Reply', 'homes fields')(  # type: ignore
                homes=homes, fields=()), None  # yapf: disable

        with insecure_channel(self._target) as channel:
            stub = CioTantivyStub(channel)
            options = InspectOptions(segments=segments)
            try:
                reply = stub.Inspect(
                    InspectRequest(tenant=self.uid, options=options))
            except RpcError as err:
                return None, self._rpc_error_text(err)
            if reply.error:
                return None, self._reply_error_text(reply)
        return reply, None

    # -------------------------------------------------------------------------
    def reset(self, not_thumbs: bool = False) -> str | None:
        """Reset the index and remove thumbnails.

        :param bool not_thumbs: (optional)
            Do not remove thumbnails.
        :rtype: str
        :return:
            ``None`` on success (or in test mode), otherwise an error message.
        """
        if self._test:
            return None

        with insecure_channel(self._target) as channel:
            options = ResetOptions(not_thumbs=not_thumbs)
            stub = CioTantivyStub(channel)
            try:
                reply = stub.Reset(
                    ResetRequest(tenant=self.uid, options=options))
            except RpcError as err:
                return self._rpc_error_text(err)
            if reply.error:
                return self._reply_error_text(reply)
        return None

    # -------------------------------------------------------------------------
    def index(
            self,
            warehouse: Warehouse,
            paths: list | tuple | None = None,
            force: bool = False,
            skip_thumbs: bool = False,
            no_thumbs: bool = False) -> str | None:
        """Index paths inside a given warehouse.

        :type  warehouse: .lib.warehouse.Warehouse
        :param warehouse:
            Current warehouse.
        :param list paths: (optional)
            Relative paths inside the warehouse to index.
        :param bool force: (optional)
            Force indexing.
        :param bool skip_thumbs: (optional)
            Forwarded as-is to ``IndexOptions.skip_thumbs``.
        :param bool no_thumbs: (optional)
            Skip thumbnails creation.
        :rtype: str
        :return:
            ``None`` on success (or in test mode), otherwise an error message.
        """
        # pylint: disable = too-many-arguments, too-many-positional-arguments
        if self._test:
            return None

        with insecure_channel(self._target) as channel:
            stub = CioTantivyStub(channel)
            options = IndexOptions(
                skip_thumbs=skip_thumbs, no_thumbs=no_thumbs, force=force)
            try:
                reply = stub.Index(
                    IndexRequest(
                        tenant=self.uid,
                        root=warehouse.root,
                        paths=paths or (),
                        options=options))
            except RpcError as err:
                return self._rpc_error_text(err)
            except TypeError as err:
                # Raised while building the request, e.g. when ``paths``
                # holds values the message cannot accept.
                return str(err)
            if reply.error:
                return self._reply_error_text(reply)
        return None

    # -------------------------------------------------------------------------
    def delete(
            self,
            warehouse: Warehouse,
            paths: list | tuple | None = None,
            not_thumbs: bool = False) -> str | None:
        """Delete index entries of a given warehouse.

        :type  warehouse: .lib.warehouse.Warehouse
        :param warehouse:
            Current warehouse.
        :param list paths: (optional)
            Relative paths inside the warehouse.
        :param bool not_thumbs: (optional)
            Do not remove thumbnails.
        :rtype: str
        :return:
            ``None`` on success (or in test mode), otherwise an error message.
        """
        if self._test:
            return None

        with insecure_channel(self._target) as channel:
            options = DeleteOptions(not_thumbs=not_thumbs)
            stub = CioTantivyStub(channel)
            try:
                reply = stub.Delete(
                    DeleteRequest(
                        tenant=self.uid,
                        warehouse=warehouse.uid,
                        paths=paths or (),
                        options=options))
            except RpcError as err:
                return self._rpc_error_text(err)
            except TypeError as err:
                # Raised while building the request, e.g. when ``paths``
                # holds values the message cannot accept.
                return str(err)
            if reply.error:
                return self._reply_error_text(reply)
        return None

    # -------------------------------------------------------------------------
    def search(
            self,
            query: str,
            limit: int = DEFAULT_SEARCH_LIMIT,
            offset: int = 0,
            sort_by: str | None = None,
            reverse: bool = False,
            snippets: int = 0) -> tuple[SearchReply | None, str | None]:
        """Search files.

        :param str query:
            Query. An empty query returns an empty result without calling
            the backend.
        :param int limit: (default = 10)
            Maximum of returned hits.
        :param int offset: (default = 0)
            Offset of the range of returned hits.
        :param str sort_by: (default = None)
            Name of the field for sorting the result.
        :param bool reverse: (default = False)
            Reverse the sort.
        :param int snippets: (default = 0)
            Length of the context to show.
        :rtype: tuple
        :return:
            A tuple such as ``(reply, error)``
        """
        # pylint: disable = too-many-arguments, too-many-positional-arguments
        if self._test or not query:
            # Nothing to ask the backend: return an empty stand-in reply.
            return namedtuple(  # type: ignore
                'Reply', 'offset total hits snippets')(
                    offset=0, total=0, hits=(), snippets=()), None

        with insecure_channel(self._target) as channel:
            stub = CioTantivyStub(channel)
            options = SearchOptions(
                limit=limit,
                offset=offset,
                sort_by=sort_by,
                reverse=reverse,
                snippets=snippets)
            try:
                reply = stub.Search(
                    SearchRequest(
                        tenant=self.uid, query=query, options=options))
            except RpcError as err:
                return None, self._rpc_error_text(err)
            if reply.error:
                return None, self._reply_error_text(reply)
        return reply, None

    # -------------------------------------------------------------------------
    def suggest(
        self,
        bootstrap: str,
        scope_query: str | None = None,
        no_file_names: bool = False,
        limit: int = DEFAULT_SUGGEST_LIMIT
    ) -> tuple[list | None, str | None]:
        """Suggest queries.

        :param str bootstrap:
            Terms to initiate the suggestion. An empty value returns no
            suggestion without calling the backend.
        :param str scope_query:
            Query to limit the scope of suggestions.
        :param bool no_file_names: (default = False)
            Exclude file names from suggestions.
        :param int limit: (default = 12)
            Maximum of returned suggestions.
        :rtype: tuple
        :return:
            A tuple such as ``(suggestions, error)``
        """
        if self._test or not bootstrap:
            return [], None

        with insecure_channel(self._target) as channel:
            stub = CioTantivyStub(channel)
            options = SuggestOptions(
                scope=scope_query, no_file_names=no_file_names, limit=limit)
            try:
                reply = stub.Suggest(
                    SuggestRequest(
                        tenant=self.uid, bootstrap=bootstrap, options=options))
            except RpcError as err:
                return None, self._rpc_error_text(err)
            if reply.error:
                return None, self._reply_error_text(reply)
        return reply.suggestions, None