Source code for ciowarehouse2.lib.vcs_git

"""Class for warehouses with Git as VCS."""

from __future__ import annotations
from os import remove
from os.path import abspath, join, exists, splitext, relpath, isdir
from os.path import getmtime
from collections import namedtuple
from time import time
from re import compile as re_compile

from git.repo import Repo
from git.exc import GitCommandError
from git import Actor

from chrysalio.lib.utils import full_url  # , copy_content
from ..models.dbwarehouse import LOCK_TTL, DBWarehouse
from .utils import IGNORED_PREFIX, IGNORED_SUFFIX
from .ciopath import LOCAL_DIR, CioPath
from .vcs_none import VcsNone
from .i18n import _

GIT_LOCK_FILE = 'index.lock'
GIT_IGNORED_PATTERN = \
    f'\\{IGNORED_PREFIX}[^\\{IGNORED_SUFFIX}]+\\{IGNORED_SUFFIX}'


# =============================================================================
class VcsGit:
    """Class to manage warehouses with Git as VCS.

    :param str uid:
        ID of this VCS i.e. ID of the corresponding warehouse.
    :param str root:
        Absolute path to the warehouse directory.
    :type  dbwarehouse: .models.dbwarehouse.DBWarehouse
    :param dbwarehouse:
        DBWarehouse defining the warehouse managed by this VCS. Its
        ``vcs_url``, ``vcs_user`` and ``lock_ttl`` attributes are read.
    :param str password: (optional)
        Clear password for clone/pull access.
    """

    engine = 'git'

    # -------------------------------------------------------------------------
    def __init__(
            self,
            uid: str,
            root: str,
            dbwarehouse: DBWarehouse,
            password: str | None = None):
        """Constructor method."""
        self.uid = uid
        self.root = root
        self._url = str(dbwarehouse.vcs_url) if dbwarehouse.vcs_url else None
        self._user = str(dbwarehouse.vcs_user) \
            if dbwarehouse.vcs_user else None
        self._password = password
        self._lock_ttl = int(dbwarehouse.lock_ttl) \
            if dbwarehouse.lock_ttl else LOCK_TTL
        # Any URL which does not start with 'http' is treated as a local
        # path and made absolute.
        # NOTE(review): this also rewrites ssh-style URLs -- confirm that
        # only HTTP(S) and local remotes are expected here.
        if self._url is not None and not self._url.startswith('http'):
            self._url = abspath(self._url)
        self._ignore = re_compile(GIT_IGNORED_PATTERN)

    # -------------------------------------------------------------------------
    @staticmethod
    def _git_error(error: GitCommandError) -> str:
        """Flatten the ``stderr`` of a Git command error into one line.

        :type  error: git.exc.GitCommandError
        :param error:
            Error whose ``stderr`` attribute holds the text to flatten.
        :rtype: str
        :return:
            ``stderr`` with newlines replaced by spaces and stripped.
        """
        return ' '.join(error.stderr.split('\n')).strip()

    # -------------------------------------------------------------------------
    def init(self) -> str | None:
        """Initialize a local Git repository.

        :rtype: :class:`str` or ``None``
        :return:
            Error message or ``None`` if it succeeds.
        """
        try:
            Repo.init(self.root)
        except (IOError, OSError) as error:
            return str(error)  # pragma: nocover
        except GitCommandError as error:
            return self._git_error(error)
        self._exclude_files()
        return None

    # -------------------------------------------------------------------------
    def is_dirty(self) -> bool:
        """Return ``True`` if the repository data has been modified.

        Untracked files count as a modification. Any I/O or Git error is
        reported as "not dirty".

        :rtype: bool
        """
        try:
            repo = Repo.init(self.root)
            return repo.is_dirty() or bool(repo.untracked_files)
        except (IOError, OSError, GitCommandError):
            return False

    # -------------------------------------------------------------------------
    def clone(self) -> str | None:
        """Create a copy of an existing repository.

        Without a remote URL, a fresh local repository is initialized
        instead.

        :rtype: :class:`str` or ``None``
        :return:
            Error message or ``None`` if it succeeds.
        """
        if self._url is None:
            return self.init()

        fullurl = full_url(self._url, self._user, self._password)
        try:
            repo = Repo.clone_from(fullurl, self.root)
        except (IOError, OSError) as error:
            # Never leak the URL with credentials in the error message
            return str(error).replace(  # pragma: nocover
                fullurl, self._url)
        except GitCommandError as error:
            return self._git_error(error).replace(fullurl, self._url)

        # Store the URL without credentials in the repository configuration
        repo.remotes.origin.set_url(self._url)
        self._exclude_files()
        return None

    # -------------------------------------------------------------------------
    def pull(self) -> str | None:
        """Pull from a remote repository or do nothing.

        :rtype: :class:`str` or ``None``
        :return:
            Error message or ``None`` if it succeeds.
        """
        self._fix_lock()
        if self._url is None:
            return None

        fullurl = full_url(self._url, self._user, self._password)
        repo = None
        try:
            repo = Repo.init(self.root)
            repo.remotes.origin.set_url(fullurl)
            repo.remotes.origin.pull()
        except (IOError, OSError) as error:
            return str(error).replace(  # pragma: nocover
                fullurl, self._url)
        except GitCommandError as error:
            return self._git_error(error).replace(fullurl, self._url)
        finally:
            # Restore the URL without credentials. ``repo`` is still ``None``
            # when ``Repo.init`` itself failed: there is nothing to restore
            # and touching it would hide the real error.
            if repo is not None:
                repo.remotes.origin.set_url(self._url)
        return None

    # -------------------------------------------------------------------------
    def add(self, ciopath: CioPath | None = None) -> str | None:
        """Add the file or directory ``ciopath`` to the index.

        :type  ciopath: .lib.ciopath.CioPath
        :param ciopath: (optional)
            `CioPath` of the file to add. If it is empty, the root of the
            warehouse is used.
        :rtype: :class:`pyramid.i18n.TranslationString`, :class:`str` or
            ``None``
        :return:
            Error message or ``None`` if it succeeds or if there is nothing
            to add.
        """
        # pylint: disable = too-many-return-statements, too-many-branches
        if not ciopath:
            ciopath = CioPath(self.uid)
        abs_path = ciopath.absolute_path(self.root)
        if ciopath.wid != self.uid or ciopath.path == '.git' or not abs_path:
            return _('git add: incorrect path')
        if self._ignore.search(ciopath.path) is not None \
                or not exists(abs_path):
            return None

        # Add file
        repo = Repo.init(self.root)
        try:
            if ciopath.is_root():
                repo.git.add('--all')
            else:
                repo.index.add((ciopath.path, ))
        except (IOError, OSError) as error:  # pragma: nocover
            return str(error)
        except GitCommandError as error:  # pragma: nocover
            return self._git_error(error)
        if ciopath.is_root():
            return None

        # Add information file
        abs_info = ciopath.absolute_info(self.root)
        if abs_info and exists(abs_info):
            try:
                repo.index.add((relpath(abs_info, self.root), ))
            except (IOError, OSError) as error:  # pragma: nocover
                return str(error)
            except GitCommandError as error:  # pragma: nocover
                return self._git_error(error)

        # Add information directory (information path without extension)
        if abs_info and ciopath.is_directory():
            abs_info = splitext(abs_info)[0]
            if abs_info and exists(abs_info):
                try:
                    repo.index.add((relpath(abs_info, self.root), ))
                except (IOError, OSError) as error:  # pragma: nocover
                    return str(error)
                except GitCommandError as error:  # pragma: nocover
                    return self._git_error(error)

        return None

    # -------------------------------------------------------------------------
    def remove(self, ciopath: CioPath) -> str | None:
        """Remove a file or a directory.

        :type  ciopath: .lib.ciopath.CioPath
        :param ciopath:
            `CioPath` of the file to remove.
        :rtype: :class:`pyramid.i18n.TranslationString`, :class:`str` or
            ``None``
        :return:
            Error message or ``None`` if it succeeds or if the file does not
            exist.
        """
        # pylint: disable = too-many-return-statements
        abs_path = ciopath.absolute_path(self.root)
        if ciopath.wid != self.uid or not abs_path:
            return _('git remove: incorrect path')
        if not exists(abs_path):
            return None

        # Remove information file
        abs_info = ciopath.absolute_info(self.root)
        repo = Repo.init(self.root)
        if abs_info and exists(abs_info):
            try:
                repo.index.remove(
                    (relpath(abs_info, self.root), ), working_tree=True)
            except (IOError, OSError) as error:  # pragma: nocover
                return str(error)
            except GitCommandError:  # pragma: nocover
                # Git refused the removal: fall back on a plain removal
                return VcsNone(self.uid, self.root).remove(ciopath)

        # Remove information directory (information path without extension)
        if abs_info and ciopath.is_directory():
            abs_info = splitext(abs_info)[0]
            if abs_info and exists(abs_info):
                try:
                    repo.index.remove(
                        (relpath(abs_info, self.root), ),
                        working_tree=True,
                        r=True)
                except (IOError, OSError) as error:  # pragma: nocover
                    return str(error)
                except GitCommandError as error:  # pragma: nocover
                    return self._git_error(error)

        # Remove file
        if exists(abs_path):
            try:
                repo.index.remove(
                    (relpath(abs_path, self.root), ),
                    working_tree=True,
                    r=True)
            except (IOError, OSError) as error:  # pragma: nocover
                return str(error)
            except GitCommandError:  # pragma: nocover
                # Git refused the removal: fall back on a plain removal
                return VcsNone(self.uid, self.root).remove(ciopath)

        return None

    # -------------------------------------------------------------------------
    def move(
            self,
            ciopath1: CioPath,
            ciopath2: CioPath,
            overwrite_ok: bool = False,
            mode: str = 'move') -> tuple[CioPath | None, str | None]:
        """Move or copy a file inside the warehouse.

        :type  ciopath1: .lib.ciopath.CioPath
        :param ciopath1:
            `CioPath` of the first file.
        :type  ciopath2: .lib.ciopath.CioPath
        :param ciopath2:
            `CioPath` of the second file.
        :param bool overwrite_ok: (default=False)
            If ``True``, do nothing if files are the same.
        :param str mode: (``'move'``, ``'copy'``, ``'rename'``)
            The way the move must operate.
        :rtype: tuple
        :return:
            A tuple such as ``(final_ciopath2, error)``.
        """
        # pylint: disable = too-many-return-statements, too-many-branches
        # Memorize paths to remove
        paths = []
        if mode != 'copy':
            paths = [ciopath1.path]
            abs_info = ciopath1.absolute_info(self.root)
            if abs_info and exists(abs_info):
                paths.append(relpath(abs_info, self.root))
            if abs_info and ciopath1.is_directory():
                abs_info = splitext(abs_info)[0]
                if isdir(abs_info):
                    paths.append(relpath(abs_info, self.root))

        # Use VcsNone to move files
        ciopath, err = VcsNone(self.uid, self.root).move_(
            ciopath1, ciopath2, overwrite_ok, mode)
        if ciopath is None:
            return ciopath, err
        if self._ignore.search(ciopath.path) is not None:
            return ciopath, None

        # Remove source
        repo = Repo.init(self.root)
        if paths and ciopath1 != ciopath:
            try:
                repo.index.remove(paths, working_tree=True, r=True)
            except (IOError, OSError) as error:  # pragma: nocover
                return None, str(error)
            except GitCommandError as error:  # pragma: nocover
                return None, self._git_error(error)

        # Add destination
        paths = [ciopath.path]
        abs_info = ciopath.absolute_info(self.root)
        if abs_info and exists(abs_info):
            paths.append(relpath(abs_info, self.root))
        # NOTE(review): the directory test is made on the source
        # (``ciopath1``), not on the destination -- confirm it is intended.
        if abs_info and ciopath1.is_directory():
            abs_info = splitext(abs_info)[0]
            if isdir(abs_info):
                paths.append(relpath(abs_info, self.root))
        try:
            repo.index.add(paths)
        except (IOError, OSError) as error:  # pragma: nocover
            return None, str(error)
        except GitCommandError as error:  # pragma: nocover
            return None, self._git_error(error)

        return ciopath, None

    # -------------------------------------------------------------------------
    def copy(
            self,
            ciopath1: CioPath,
            ciopath2: CioPath,
            overwrite_ok: bool = False) -> tuple[CioPath | None, str | None]:
        """Copy a file.

        It is a shortcut for :meth:`move` with ``mode='copy'``.

        :type  ciopath1: .lib.ciopath.CioPath
        :param ciopath1:
            `CioPath` of the first file.
        :type  ciopath2: .lib.ciopath.CioPath
        :param ciopath2:
            `CioPath` of the second file.
        :param bool overwrite_ok: (default=False)
            If ``True``, do nothing if files are the same.
        :rtype: tuple
        :return:
            A tuple such as ``(final_ciopath2, error)``.
        """
        return self.move(
            ciopath1, ciopath2, overwrite_ok=overwrite_ok, mode='copy')

    # -------------------------------------------------------------------------
    def commit(self, message: str, name: str, email: str = '') -> str | None:
        """Commit changes.

        :param str message:
            Message for commit.
        :param str name:
            Name of the author.
        :param str email: (optional)
            Email of the author.
        :rtype: :class:`pyramid.i18n.TranslationString`, :class:`str` or
            ``None``
        :return:
            Error message or ``None`` if it succeeds.
        """
        if not message:
            return _('git commit: empty message')

        # Lightweight stand-in for ``git.Actor``: only exposes the ``name``
        # and ``email`` attributes
        author: Actor = namedtuple('Actor', 'name email')(  # type: ignore
            name=name, email=email)  # yapf: disable
        try:
            repo = Repo.init(self.root)
            repo.index.commit(message, author=author)
        except (IOError, OSError,
                UnicodeEncodeError) as error:  # pragma: nocover
            return str(error)
        except GitCommandError as error:  # pragma: nocover
            return self._git_error(error)

        return None

    # -------------------------------------------------------------------------
    def log(self, ciopath: CioPath, limit: int = 20) -> tuple:
        """Show revision history.

        :type  ciopath: .lib.ciopath.CioPath
        :param ciopath:
            `CioPath` of the file.
        :param int limit: (default=20)
            Maximum number of commits.
        :rtype: tuple
        :return:
            A tuple of tuples such as ``(commit_id, datetime, author_name,
            message)``. It is empty if the path is incorrect or does not
            exist.
        """
        abs_path = ciopath.absolute_path(self.root)
        if ciopath.wid != self.uid or not abs_path or not exists(abs_path):
            return ()

        repo = Repo.init(self.root)
        return tuple(
            (
                commit.hexsha,
                # Time zone information is dropped: the datetime is naive
                commit.committed_datetime.replace(tzinfo=None),
                commit.author.name,
                commit.message)
            for commit in repo.iter_commits(
                '--all', max_count=limit, paths=ciopath.path))

    # -------------------------------------------------------------------------
    def _fix_lock(self) -> bool:
        """Remove long locks.

        The Git index lock is removed when it is older than the lock TTL of
        the warehouse.

        :rtype: bool
        :return:
            ``True`` if a too old lock has been found.
        """
        lock_file = join(self.root, '.git', GIT_LOCK_FILE)
        if exists(lock_file) and getmtime(lock_file) + self._lock_ttl < time():
            try:
                remove(lock_file)
            except OSError:  # pragma: nocover
                # NOTE(review): best effort, ``True`` is returned even if
                # the removal failed.
                pass
            return True
        return False

    # -------------------------------------------------------------------------
    def _exclude_files(self) -> None:
        """Locally exclude ignored files and the local directory.

        The patterns are appended to ``.git/info/exclude`` only if the name
        of the local directory is not already in this file.
        """
        exclude_file = join(self.root, '.git', 'info', 'exclude')
        with open(exclude_file, 'r', encoding='utf8') as hdl:
            content = hdl.read()
        if LOCAL_DIR in content:
            # Already configured: do not rewrite the file for nothing
            return
        content += f'\n{IGNORED_PREFIX}*{IGNORED_SUFFIX}\n'\
            f'{LOCAL_DIR}'
        with open(exclude_file, 'w', encoding='utf8') as hdl:
            hdl.write(content)