# This module is part of GitPython and is released under the
# 3-Clause BSD License: https://opensource.org/license/bsd-3-clause/
"""Standalone functions to accompany the index implementation and make it more
versatile."""
__all__ = [
"write_cache",
"read_cache",
"write_tree_from_cache",
"entry_key",
"stat_mode_to_index_mode",
"S_IFGITLINK",
"run_commit_hook",
"hook_path",
]
from io import BytesIO
import os
import os.path as osp
from pathlib import Path
from stat import S_IFDIR, S_IFLNK, S_IFMT, S_IFREG, S_ISDIR, S_ISLNK, S_IXUSR
import subprocess
import sys
from gitdb.base import IStream
from gitdb.typ import str_tree_type
from git.cmd import handle_process_output, safer_popen
from git.compat import defenc, force_bytes, force_text, safe_decode
from git.exc import HookExecutionError, UnmergedEntriesError
from git.objects.fun import (
traverse_tree_recursive,
traverse_trees_recursive,
tree_to_stream,
)
from git.util import IndexFileSHA1Writer, finalize_process
from .typ import CE_EXTENDED, BaseIndexEntry, IndexEntry, CE_NAMEMASK, CE_STAGESHIFT
from .util import pack, unpack
# typing -----------------------------------------------------------------------------
from typing import Dict, IO, List, Sequence, TYPE_CHECKING, Tuple, Type, Union, cast
from git.types import PathLike
if TYPE_CHECKING:
from git.db import GitCmdObjectDB
from git.objects.tree import TreeCacheTup
from .base import IndexFile
# ------------------------------------------------------------------------------------
S_IFGITLINK = S_IFLNK | S_IFDIR
"""Flags for a submodule."""
CE_NAMEMASK_INV = ~CE_NAMEMASK
def hook_path(name: str, git_dir: PathLike) -> str:
""":return: path to the given named hook in the given git repository directory"""
return osp.join(git_dir, "hooks", name)
def _has_file_extension(path: str) -> str:
return osp.splitext(path)[1]
def run_commit_hook(name: str, index: "IndexFile", *args: str) -> None:
"""Run the commit hook of the given name. Silently ignore hooks that do not exist.
:param name:
Name of hook, like ``pre-commit``.
:param index:
:class:`~git.index.base.IndexFile` instance.
:param args:
Arguments passed to hook file.
:raise git.exc.HookExecutionError:
"""
hp = hook_path(name, index.repo.git_dir)
if not os.access(hp, os.X_OK):
return
env = os.environ.copy()
env["GIT_INDEX_FILE"] = safe_decode(os.fspath(index.path))
env["GIT_EDITOR"] = ":"
cmd = [hp]
try:
if sys.platform == "win32" and not _has_file_extension(hp):
# Windows only uses extensions to determine how to open files
# (doesn't understand shebangs). Try using bash to run the hook.
relative_hp = Path(hp).relative_to(index.repo.working_dir).as_posix()
cmd = ["bash.exe", relative_hp]
process = safer_popen(
cmd + list(args),
env=env,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
cwd=index.repo.working_dir,
)
except Exception as ex:
raise HookExecutionError(hp, ex) from ex
else:
stdout_list: List[str] = []
stderr_list: List[str] = []
handle_process_output(process, stdout_list.append, stderr_list.append, finalize_process)
stdout = "".join(stdout_list)
stderr = "".join(stderr_list)
if process.returncode != 0:
stdout = force_text(stdout, defenc)
stderr = force_text(stderr, defenc)
raise HookExecutionError(hp, process.returncode, stderr, stdout)
# END handle return code
def stat_mode_to_index_mode(mode: int) -> int:
"""Convert the given mode from a stat call to the corresponding index mode and
return it."""
if S_ISLNK(mode): # symlinks
return S_IFLNK
if S_ISDIR(mode) or S_IFMT(mode) == S_IFGITLINK: # submodules
return S_IFGITLINK
return S_IFREG | (mode & S_IXUSR and 0o755 or 0o644) # blobs with or without executable bit
def write_cache(
entries: Sequence[Union[BaseIndexEntry, "IndexEntry"]],
stream: IO[bytes],
extension_data: Union[None, bytes] = None,
ShaStreamCls: Type[IndexFileSHA1Writer] = IndexFileSHA1Writer,
) -> None:
"""Write the cache represented by entries to a stream.
:param entries:
**Sorted** list of entries.
:param stream:
Stream to wrap into the AdapterStreamCls - it is used for final output.
:param ShaStreamCls:
Type to use when writing to the stream. It produces a sha while writing to it,
before the data is passed on to the wrapped stream.
:param extension_data:
Any kind of data to write as a trailer, it must begin a 4 byte identifier,
followed by its size (4 bytes).
"""
# Wrap the stream into a compatible writer.
stream_sha = ShaStreamCls(stream)
tell = stream_sha.tell
write = stream_sha.write
# Header
version = 3 if any(entry.extended_flags for entry in entries) else 2
write(b"DIRC")
write(pack(">LL", version, len(entries)))
# Body
for entry in entries:
beginoffset = tell()
write(entry.ctime_bytes) # ctime
write(entry.mtime_bytes) # mtime
path_str = str(entry.path)
path: bytes = force_bytes(path_str, encoding=defenc)
plen = len(path) & CE_NAMEMASK # Path length
assert plen == len(path), "Path %s too long to fit into index" % entry.path
flags = plen | (entry.flags & CE_NAMEMASK_INV) # Clear possible previous values.
if entry.extended_flags:
flags |= CE_EXTENDED
write(
pack(
">LLLLLL20sH",
entry.dev,
entry.inode,
entry.mode,
entry.uid,
entry.gid,
entry.size,
entry.binsha,
flags,
)
)
if entry.extended_flags:
write(pack(">H", entry.extended_flags))
write(path)
real_size = (tell() - beginoffset + 8) & ~7
write(b"\0" * ((beginoffset + real_size) - tell()))
# END for each entry
# Write previously cached extensions data.
if extension_data is not None:
stream_sha.write(extension_data)
# Write the sha over the content.
stream_sha.write_sha()
def read_header(stream: IO[bytes]) -> Tuple[int, int]:
"""Return tuple(version_long, num_entries) from the given stream."""
type_id = stream.read(4)
if type_id != b"DIRC":
raise AssertionError("Invalid index file header: %r" % type_id)
unpacked = cast(Tuple[int, int], unpack(">LL", stream.read(4 * 2)))
version, num_entries = unpacked
assert version in (1, 2, 3), "Unsupported git index version %i, only 1, 2, and 3 are supported" % version
return version, num_entries
def entry_key(*entry: Union[BaseIndexEntry, PathLike, int]) -> Tuple[PathLike, int]:
"""
:return:
Key suitable to be used for the
:attr:`index.entries