# -*- coding: utf-8 -*-
# test_index.py
# Copyright (C) 2008, 2009 Michael Trier ([email protected]) and contributors
#
# This module is part of GitPython and is released under
# the BSD License: http://www.opensource.org/licenses/bsd-license.php
from io import BytesIO
import os
from stat import (
S_ISLNK,
ST_MODE
)
import tempfile
from unittest import skipIf
import shutil
from git import (
IndexFile,
Repo,
BlobFilter,
UnmergedEntriesError,
Tree,
Object,
Diff,
GitCommandError,
CheckoutError,
)
from git.compat import is_win
from git.exc import (
HookExecutionError,
InvalidGitRepositoryError
)
from git.index.fun import hook_path
from git.index.typ import (
BaseIndexEntry,
IndexEntry
)
from git.objects import Blob
from test.lib import (
TestBase,
fixture_path,
fixture,
with_rw_repo
)
from test.lib import with_rw_directory
from git.util import Actor, rmtree
from git.util import HIDE_WINDOWS_KNOWN_ERRORS, hex_to_bin
from gitdb.base import IStream
import os.path as osp
from git.cmd import Git
HOOKS_SHEBANG = "#!/usr/bin/env sh\n"
is_win_without_bash = is_win and not shutil.which('bash.exe')
def _make_hook(git_dir, name, content, make_exec=True):
"""A helper to create a hook"""
hp = hook_path(name, git_dir)
hpd = osp.dirname(hp)
if not osp.isdir(hpd):
os.mkdir(hpd)
with open(hp, "wt") as fp:
fp.write(HOOKS_SHEBANG + content)
if make_exec:
os.chmod(hp, 0o744)
return hp
class TestIndex(TestBase):
def __init__(self, *args):
super(TestIndex, self).__init__(*args)
self._reset_progress()
def _assert_fprogress(self, entries):
self.assertEqual(len(entries), len(self._fprogress_map))
for _path, call_count in self._fprogress_map.items():
self.assertEqual(call_count, 2)
# END for each item in progress map
self._reset_progress()
def _fprogress(self, path, done, item):
self._fprogress_map.setdefault(path, 0)
curval = self._fprogress_map[path]
if curval == 0:
assert not done
if curval == 1:
assert done
self._fprogress_map[path] = curval + 1
def _fprogress_add(self, path, done, item):
"""Called as progress func - we keep track of the proper
call order"""
assert item is not None
self._fprogress(path, done, item)
def _reset_progress(self):
# maps paths to the count of calls
self._fprogress_map = {}
def _assert_entries(self, entries):
for entry in entries:
assert isinstance(entry, BaseIndexEntry)
assert not osp.isabs(entry.path)
assert "\\" not in entry.path
# END for each entry
def test_index_file_base(self):
# read from file
index = IndexFile(self.rorepo, fixture_path("index"))
assert index.entries
assert index.version > 0
# test entry
entry = next(iter(index.entries.values()))
for attr in ("path", "ctime", "mtime", "dev", "inode", "mode", "uid",
"gid", "size", "binsha", "hexsha", "stage"):
getattr(entry, attr)
# END for each method
# test update
entries = index.entries
assert isinstance(index.update(), IndexFile)
assert entries is not index.entries
# test stage
index_merge = IndexFile(self.rorepo, fixture_path("index_merge"))
self.assertEqual(len(index_merge.entries), 106)
assert len([e for e in index_merge.entries.values() if e.stage != 0])
# write the data - it must match the original
tmpfile = tempfile.mktemp()
index_merge.write(tmpfile)
with open(tmpfile, 'rb') as fp:
self.assertEqual(fp.read(), fixture("index_merge"))
os.remove(tmpfile)
def _cmp_tree_index(self, tree, index):
# fail unless both objects contain the same paths and blobs
if isinstance(tree, str):
tree = self.rorepo.commit(tree).tree
blist = []
for blob in tree.traverse(predicate=lambda e, d: e.type == "blob", branch_first=False):
assert (blob.path, 0) in index.entries
blist.append(blob)
# END for each blob in tree
if len(blist) != len(index.entries):
iset = {k[0] for k in index.entries.keys()}
bset = {b.path for b in blist}
raise AssertionError("CMP Failed: Missing entries in index: %s, missing in tree: %s" %
(bset - iset, iset - bset))
# END assertion message
@with_rw_repo('0.1.6')
def test_index_lock_handling(self, rw_repo):
def add_bad_blob():
rw_repo.index.add([Blob(rw_repo, b'f' * 20, 'bad-permissions', 'foo')])
try:
## 1st fail on purpose adding into index.
add_bad_blob()
except Exception as ex:
msg_py3 = "required argument is not an integer"
msg_py2 = "cannot convert argument to integer"
assert msg_py2 in str(ex) or msg_py3 in str(ex)
## 2nd time should not fail due to stray lock file
try:
add_bad_blob()
except Exception as ex:
assert "index.lock' could not be obtained" not in str(ex)
@with_rw_repo('0.1.6')
def test_index_file_from_tree(self, rw_repo):
common_ancestor_sha = "5117c9c8a4d3af19a9958677e45cda9269de1541"
cur_sha = "4b43ca7ff72d5f535134241e7c797ddc9c7a3573"
other_sha = "39f85c4358b7346fee22169da9cad93901ea9eb9"
# simple index from tree
base_index = IndexFile.from_tree(rw_repo, common_ancestor_sha)
assert base_index.entries
self._cmp_tree_index(common_ancestor_sha, base_index)
# merge two trees - its like a fast-forward
two_way_index = IndexFile.from_tree(rw_repo, common_ancestor_sha, cur_sha)
assert two_way_index.entries
self._cmp_tree_index(cur_sha, two_way_index)
# merge three trees - here we have a merge conflict
three_way_index = IndexFile.from_tree(rw_repo, common_ancestor_sha, cur_sha, other_sha)
assert len([e for e in three_way_index.entries.values() if e.stage != 0])
# ITERATE BLOBS
merge_required = lambda t: t[0] != 0
merge_blobs = list(three_way_index.iter_blobs(merge_required))
assert merge_blobs
assert merge_blobs[0][0] in (1, 2, 3)
assert isinstance(merge_blobs[0][1], Blob)
# test BlobFilter
prefix = 'lib/git'
for _stage, blob in base_index.iter_blobs(BlobFilter([prefix])):
assert blob.path.startswith(prefix)
# writing a tree should fail with an unmerged index
self.assertRaises(UnmergedEntriesError, three_way_index.write_tree)
# removed unmerged entries
unmerged_blob_map = three_way_index.unmerged_blobs()
assert unmerged_blob_map
# pick the first blob at the first stage we find and use it as resolved version
three_way_index.resolve_blobs(line[0][1] for line in unmerged_blob_map.values())
tree = three_way_index.write_tree()
assert isinstance(tree, Tree)
num_blobs = 0
for blob in tree.traverse(predicate=lambda item, d: item.type == "blob"):
assert (blob.path, 0) in three_way_index.entries
num_blobs += 1
# END for each blob
self.assertEqual(num_blobs, len(three_way_index.entries))
@with_rw_repo('0.1.6')
def test_index_merge_tree(self, rw_repo):
# A bit out of place, but we need a different repo for this:
self.assertNotEqual(self.rorepo, rw_repo)
self.assertEqual(len({self.rorepo, self.rorepo, rw_repo, rw_repo}), 2)
# SINGLE TREE MERGE
# current index is at the (virtual) cur_commit
next_commit = "4c39f9da792792d4e73fc3a5effde66576ae128c"
parent_commit = rw_repo.head.commit.parents[0]
manifest_key = IndexFile.entry_key('MANIFEST.in', 0)
manifest_entry = rw_repo.index.entries[manifest_key]
rw_repo.index.merge_tree(next_commit)
# only one change should be recorded
assert manifest_entry.binsha != rw_repo.index.entries[manifest_key].binsha
rw_repo.index.reset(rw_repo.head)
self.assertEqual(rw_repo.index.entries[manifest_key].binsha, manifest_entry.binsha)
# FAKE MERGE
#############
# Add a change with a NULL sha that should conflict with next_commit. We
# pretend there was a change, but we do not even bother adding a proper
# sha for it ( which makes things faster of course )
manifest_fake_entry = BaseIndexEntry((manifest_entry[0], b"\0" * 20, 0, manifest_entry[3]))
# try write flag
self._assert_entries(rw_repo.index.add([manifest_fake_entry], write=False))
# add actually resolves the null-hex-sha for us as a feature, but we can
# edit the index manually
assert rw_repo.index.entries[manifest_key].binsha != Object.NULL_BIN_SHA
# must operate on the same index for this ! Its a bit problematic as
# it might confuse people
index = rw_repo.index
index.entries[manifest_key] = IndexEntry.from_base(manifest_fake_entry)
index.write()
self.assertEqual(rw_repo.index.entries[manifest_key].hexsha, Diff.NULL_HEX_SHA)
# write an unchanged index ( just for the fun of it )
rw_repo.index.write()
# a three way merge would result in a conflict and fails as the command will
# not overwrite any entries in our index and hence leave them unmerged. This is
# mainly a protection feature as the current index is not yet in a tree
self.assertRaises(GitCommandError, index.merge_tree, next_commit, base=parent_commit)
# the only way to get the merged entries is to safe the current index away into a tree,
# which is like a temporary commit for us. This fails as well as the NULL sha deos not
# have a corresponding object
# NOTE: missing_ok is not a kwarg anymore, missing_ok is always true
# self.assertRaises(GitCommandError, index.write_tree)
# if missing objects are okay, this would work though ( they are always okay now )
# As we can't read back the tree with NULL_SHA, we rather set it to something else
index.entries[manifest_key] = IndexEntry(manifest_entry[:1] + (hex_to_bin('f' * 40),) + manifest_entry[2:])
tree = index.write_tree()
# now make a proper three way merge with unmerged entries
unmerged_tree = IndexFile.from_tree(rw_repo, parent_commit, tree, next_commit)
unmerged_blobs = unmerged_tree.unmerged_blobs()
self.assertEqual(len(unmerged_blobs), 1)
self.assertEqual(list(unmerged_blobs.keys())[0], manifest_key[0])
@with_rw_repo('0.1.6')
def test_index_file_diffing(self, rw_repo):
# default Index instance points to our index
index = IndexFile(rw_repo)
assert index.path is not None
assert len(index.entries)
# write the file back
index.write()
# could sha it, or check stats
# test diff
# resetting the head will leave the index in a different state, and the
# diff will yield a few changes
cur_head_commit = rw_repo.head.reference.commit
rw_repo.head.reset('HEAD~6', index=True, working_tree=False)
# diff against same index is 0
diff = index.diff()
self.assertEqual(len(diff), 0)
# against HEAD as string, must be the same as it matches index
diff = index.diff('HEAD')
self.assertEqual(len(diff), 0)
# against previous head, there must be a difference
diff = index.diff(cur_head_commit)
assert len(diff)
# we reverse the result
adiff = index.diff(str(cur_head_commit), R=True)
odiff = index.diff(cur_head_commit, R=False) # now its not reversed anymore
assert adiff != odiff
self.assertEqual(odiff, diff) # both unreversed diffs against HEAD
# against working copy - its still at cur_commit
wdiff = index.diff(None)
assert wdiff != adiff
assert wdiff != odiff
# against something unusual
self.assertRaises(ValueError, index.diff, int)
# adjust the index to match an old revision
cur_branch = rw_repo.active_branch
cur_commit = cur_branch.commit
rev_head_parent = 'HEAD~1'
assert index.reset(rev_head_parent) is index
self.assertEqual(cur_branch, rw_repo.active_branch)
self.assertEqual(cur_commit, rw_repo.head.commit)
# there must be differences towards the working tree which is in the 'future'
assert index.diff(None)
# reset the working copy as well to current head,to pull 'back' as well
new_data = b"will be reverted"
file_path = osp.join(rw_repo.working_tree_dir, "CHANGES")
with open(file_path, "wb") as fp:
fp.write(new_data)
index.reset(rev_head_parent, working_tree=True)
assert not index.diff(None)
self.assertEqual(cur_branch, rw_repo.active_branch)
self.assertEqual(cur_commit, rw_repo.head.commit)
with open(file_path, 'rb') as fp:
assert fp.read() != new_data
# test full checkout
test_file = osp.join(rw_repo.working_tree_dir, "CHANGES")
with open(test_file, 'ab') as fd:
fd.write(b"some data")
rval = index.checkout(None, force=True, fprogress=self._fprogress)
assert 'CHANGES' in list(rval)
self._assert_fprogress([None])
assert osp.isfile(test_file)
os.remove(test_file)
rval = index.checkout(None, force=False, fprogress=self._fprogress)
assert 'CHANGES' in list(rval)
self._assert_fprogress([None])
assert osp.isfile(test_file)
# individual file
os.remove(test_file)
rval = index.checkout(test_file, fprogress=self._fprogress)
self.assertEqual(list(rval)[0], 'CHANGES')
self._assert_fprogress([test_file])
assert osp.exists(test_file)
# checking out non-existing file throws
self.assertRaises(CheckoutError, index.checkout, "doesnt_exist_ever.txt.that")
self.assertRaises(CheckoutError, index.checkout, paths=["doesnt/exist"])
# checkout file with modifications
append_data = b"hello"
with open(test_file, "ab") as fp:
fp.write(append_data)
try:
index.checkout(test_file)
except CheckoutError as e:
# detailed exceptions are only possible in older git versions
if rw_repo.git._version_info < (2, 29):
self.assertEqual(len(e.failed_files), 1)
self.assertEqual(e.failed_files[0], osp.basename(test_file))
self.assertEqual(len(e.failed_files), len(e.failed_reasons))
self.assertIsInstance(e.failed_reasons[0], str)
self.assertEqual(len(e.valid_files), 0)
with open(test_file, 'rb') as fd:
s = fd.read()
self.assertTrue(s.endswith(append_data), s)
else:
raise AssertionError("Exception CheckoutError not thrown")
# if we force it it should work
index.checkout(test_file, force=True)
assert not open(test_file, 'rb').read().endswith(append_data)
# checkout directory
rmtree(osp.join(rw_repo.working_tree_dir, "lib"))
rval = index.checkout('lib')
assert len(list(rval)) > 1
def _count_existing(self, repo, files):
"""
Returns count of files that actually exist in the repository directory.
"""
existing = 0
basedir = repo.working_tree_dir
for f in files:
existing += osp.isfile(osp.join(basedir, f))
# END for each deleted file
return existing
# END num existing helper
@skipIf(HIDE_WINDOWS_KNOWN_ERRORS and Git.is_cygwin(),
"""FIXME: File "C:\\projects\\gitpython\\git\\test\\test_index.py", line 642, in test_index_mutation
self.assertEqual(fd.read(), link_target)
AssertionError: '!