"""Git status classification service for editor/IDE file browsers.
Provides per-file git status used to draw Sublime/VSCode-style coloured dots
beside file entries. A background thread polls ``git status --porcelain=v2``
and ``git ls-files --others --exclude-standard`` at a fixed interval; lookups
are O(1) against an in-memory map.
This module is part of step I1 of the file-status feature. The dot drawer (I2)
and dirty-buffer wiring (I3) live elsewhere; an optional callback hook is
already provided here so that I3 can plug in without touching this file.
"""
from __future__ import annotations
import logging
import subprocess
import threading
from collections.abc import Callable
from enum import StrEnum
from pathlib import Path
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
# Public names exported by a star-import of this module.
__all__ = ["Status", "GitStatusProvider", "parse_porcelain_v2"]
[docs]
class Status(StrEnum):
"""Git status classification used to pick a dot colour.
Order matters when multiple states could apply: ``MODIFIED_UNSAVED`` always
wins (the user is actively editing); ``CONFLICTED`` outranks committed
states; ``UNTRACKED``, ``SAVED_UNCOMMITTED`` and ``COMMITTED_UNPUSHED`` are
mutually exclusive in normal git output.
"""
CLEAN = "clean"
MODIFIED_UNSAVED = "modified_unsaved" # yellow — in-editor unsaved buffer
SAVED_UNCOMMITTED = "saved_uncommitted" # orange — modified vs index
COMMITTED_UNPUSHED = "committed_unpushed" # blue — branch ahead of upstream
CONFLICTED = "conflicted" # red — merge conflict
UNTRACKED = "untracked" # green — new file, not tracked
# XY pairs that denote an unmerged (conflicted) path in git's status output.
# See ``git status --help`` "Short Format" / "Porcelain Format Version 2".
# ``_classify_changed`` checks the XY field of ``1``/``2`` entries against this
# set; ``u`` (unmerged) entries are marked conflicted by the parser directly,
# without consulting it.
_CONFLICT_XY = frozenset({"UU", "AA", "DD", "AU", "UA", "DU", "UD"})
[docs]
def parse_porcelain_v2(
    porcelain_output: str,
    untracked_output: str = "",
    repo_root: Path | None = None,
) -> tuple[dict[Path, Status], bool]:
    """Parse ``git status --porcelain=v2 --branch`` plus untracked listing.

    Returns ``(map, branch_ahead)`` where ``map`` is a ``{path: Status}`` dict
    and ``branch_ahead`` is True when the current branch is ahead of its
    upstream (``# branch.ab +N -M`` with N > 0). The caller decides how to
    apply the repo-level ahead state to otherwise-clean files.

    ``porcelain_output`` is the text of the status command; lines that are
    empty, are other ``#`` headers, have an unknown entry type, or have too
    few fields are skipped silently. ``untracked_output`` is one path per
    line; every non-empty line is recorded as ``UNTRACKED`` and overrides any
    earlier entry for the same path.

    ``repo_root``, if given, is joined in front of the relative paths git
    emits. When ``None``, paths are kept relative (useful for unit tests).

    Pathnames that git wrapped in double quotes (C-style quoting of unusual
    characters) are unquoted before being used as keys, so the keys are the
    real file names rather than git's escaped spelling.
    """
    result: dict[Path, Status] = {}
    branch_ahead = False
    base = repo_root if repo_root is not None else Path()

    for raw_line in porcelain_output.splitlines():
        if not raw_line:
            continue

        if raw_line.startswith("# branch.ab "):
            # Format: "# branch.ab +N -M"
            parts = raw_line.split()
            if len(parts) >= 3 and parts[2].startswith("+"):
                try:
                    ahead = int(parts[2][1:])
                except ValueError:
                    ahead = 0
                branch_ahead = ahead > 0
            continue
        if raw_line.startswith("#"):
            # Any other header line carries nothing this parser uses.
            continue

        kind, _, rest = raw_line.partition(" ")
        if kind == "1":
            # "1 XY sub mH mI mW hH hI path" — the path is the last field, so
            # a bounded split keeps spaces inside it intact.
            fields = rest.split(" ", 7)
            if len(fields) < 8:
                continue
            path = _unquote_git_path(fields[7])
            result[base / path] = _classify_changed(fields[0])
        elif kind == "2":
            # Renamed/copied: "2 XY sub mH mI mW hH hI X<score> path<TAB>orig"
            fields = rest.split(" ", 8)
            if len(fields) < 9:
                continue
            # Only the new path gets a status; the original name is dropped.
            new_path = _unquote_git_path(fields[8].split("\t", 1)[0])
            result[base / new_path] = _classify_changed(fields[0])
        elif kind == "u":
            # Unmerged: "u XY sub m1 m2 m3 mW h1 h2 h3 path" — 10 tokens
            # after 'u'.
            fields = rest.split(" ", 9)
            if len(fields) < 10:
                continue
            result[base / _unquote_git_path(fields[9])] = Status.CONFLICTED
        elif kind == "?":
            # Untracked entry (porcelain v2 emits these with --untracked-files)
            result[base / _unquote_git_path(rest)] = Status.UNTRACKED

    for raw_line in untracked_output.splitlines():
        if not raw_line:
            continue
        result[base / _unquote_git_path(raw_line)] = Status.UNTRACKED

    return result, branch_ahead


# Single-character escapes used by git's C-style pathname quoting, mapped to
# the byte value each one stands for.
_GIT_SIMPLE_ESCAPES = {
    "a": 0x07,
    "b": 0x08,
    "f": 0x0C,
    "n": 0x0A,
    "r": 0x0D,
    "t": 0x09,
    "v": 0x0B,
    '"': 0x22,
    "\\": 0x5C,
}


def _unquote_git_path(field: str) -> str:
    """Undo git's C-style quoting of a pathname.

    A field that is not wrapped in double quotes is returned unchanged.
    Inside the quotes, ``\\X`` single-character escapes and three-digit octal
    escapes (``\\NNN``) are turned back into bytes, and the resulting byte
    string is decoded as UTF-8 (undecodable bytes are replaced rather than
    raising). A backslash that starts no recognised escape is kept literally.
    """
    if len(field) < 2 or field[0] != '"' or field[-1] != '"':
        return field

    body = field[1:-1]
    raw = bytearray()
    pos = 0
    end = len(body)
    while pos < end:
        char = body[pos]
        if char != "\\":
            raw += char.encode("utf-8", "replace")
            pos += 1
            continue

        simple = _GIT_SIMPLE_ESCAPES.get(body[pos + 1 : pos + 2])
        if simple is not None:
            raw.append(simple)
            pos += 2
            continue

        octal = body[pos + 1 : pos + 4]
        # The leading digit is limited to 0-3 so the value fits in one byte.
        if (
            len(octal) == 3
            and octal[0] in "0123"
            and all(digit in "01234567" for digit in octal)
        ):
            raw.append(int(octal, 8))
            pos += 4
            continue

        raw += b"\\"
        pos += 1

    return raw.decode("utf-8", "replace")
def _classify_changed(xy: str) -> Status:
    """Map the ``XY`` code of a tracked, changed entry to a ``Status``."""
    # X is the staged side (index vs HEAD) and Y the worktree side (worktree
    # vs index). A known conflict pair yields CONFLICTED; every other code
    # reaching this function is reported as differing from the committed state.
    return Status.CONFLICTED if xy in _CONFLICT_XY else Status.SAVED_UNCOMMITTED
[docs]
class GitStatusProvider:
    """Background-polling git status classifier.

    Spawns a daemon thread that runs ``git status --porcelain=v2 --branch``
    and ``git ls-files --others --exclude-standard`` every ``poll_interval``
    seconds and rebuilds an internal ``{path: Status}`` map. ``status_for``
    performs an O(1) lookup against that map.

    Thread-safety: the map is replaced atomically via a single attribute
    assignment under ``self._lock``. Readers grab a local reference to the
    current map under the lock and then index into it without holding the
    lock — dict reads on a stable reference are safe in CPython.

    A missing ``.git`` directory is **not** an error; everything reports
    ``CLEAN``. Partial/corrupt repos (worktree pointer with stale ``gitdir:``,
    ``.git`` directory with no ``HEAD``, missing or unrunnable ``git``
    binary) are also tolerated: the provider logs a warning, marks itself
    ``_disabled``, and reports ``CLEAN`` for every lookup. Callers can check
    ``is_active`` to decide whether to render git dots at all.

    Git is invoked with ``GIT_OPTIONAL_LOCKS=0`` so that the periodic status
    call does not take the index lock and get in the way of git commands the
    user runs at the same time.
    """

    def __init__(
        self,
        repo_root: Path,
        dirty_paths_callback: Callable[[], set[Path]] | None = None,
        poll_interval: float = 2.0,
    ) -> None:
        """Prime the status map and start the poller.

        ``repo_root`` is resolved to an absolute path. ``dirty_paths_callback``,
        if given, is called on every ``status_for`` lookup and must return the
        set of paths with unsaved editor changes. ``poll_interval`` is the
        delay in seconds between refreshes.
        """
        self._disabled: bool = False
        self.repo_root = Path(repo_root).resolve()
        self.dirty_paths_callback = dirty_paths_callback
        self.poll_interval = poll_interval
        self._lock = threading.Lock()
        self._stop_event = threading.Event()
        self._map: dict[Path, Status] = {}
        self._branch_ahead: bool = False
        self._thread: threading.Thread | None = None
        if (self.repo_root / ".git").exists():
            self.refresh()  # prime the cache before starting the poller
            # A failed priming run disables the provider; do not poll then.
            if not self._disabled:
                self._thread = threading.Thread(
                    target=self._run, name="GitStatusProvider", daemon=True
                )
                self._thread.start()

    # ------------------------------------------------------------------ public
    def status_for(self, path: Path | str) -> Status:
        """Return the status for ``path`` (absolute or relative to repo root).

        Paths reported by ``dirty_paths_callback`` win over everything else.
        Otherwise the cached map decides; a path missing from the map is
        ``COMMITTED_UNPUSHED`` when the branch is ahead of its upstream and
        ``CLEAN`` when it is not.
        """
        p = Path(path)
        if not p.is_absolute():
            p = self.repo_root / p
        try:
            p = p.resolve()
        except OSError:
            # Best effort: fall back to the unresolved path for the lookup.
            pass
        if self.dirty_paths_callback is not None:
            try:
                dirty = self.dirty_paths_callback()
            except Exception:
                # A broken callback must not break status lookups.
                log.exception("git_status: dirty_paths_callback raised")
                dirty = set()
            if p in dirty:
                return Status.MODIFIED_UNSAVED
        with self._lock:
            current = self._map
            ahead = self._branch_ahead
        explicit = current.get(p)
        if explicit is not None:
            return explicit
        if ahead:
            # Repo-level "ahead of upstream" — treat all otherwise-clean tracked
            # files as committed-unpushed, matching Sublime/VSCode behaviour.
            return Status.COMMITTED_UNPUSHED
        return Status.CLEAN

    @property
    def is_active(self) -> bool:
        """Whether the provider is functional (git binary present, repo readable)."""
        return not self._disabled

    def refresh(self) -> None:
        """Synchronously rebuild the status map.

        Called from the polling thread and at construction time; safe to call
        from tests.

        Tolerates partial/corrupt repos and a missing or unrunnable ``git``
        binary: on the first failure the provider is marked ``_disabled`` and
        subsequent ``status_for`` calls return ``CLEAN``. A missing ``.git``
        only clears the cached state and does not disable the provider.
        """
        if not (self.repo_root / ".git").exists():
            self._reset_state()
            return
        try:
            porcelain = self._run_git("status", "--porcelain=v2", "--branch")
            if porcelain.returncode != 0:
                self._disable_with_warning(porcelain.stderr)
                return
            untracked = self._run_git("ls-files", "--others", "--exclude-standard")
            if untracked.returncode != 0:
                self._disable_with_warning(untracked.stderr)
                return
        except FileNotFoundError as exc:
            log.warning(
                "git_status: 'git' binary not found for %s (%s); disabling provider",
                self.repo_root,
                exc,
            )
            self._disabled = True
            self._reset_state()
            return
        except OSError as exc:
            # Any other launch failure (e.g. a git binary that is not
            # executable) is treated like a missing binary instead of being
            # allowed to escape from the constructor or the poller.
            log.warning(
                "git_status: could not run git for %s (%s); disabling provider",
                self.repo_root,
                exc,
            )
            self._disabled = True
            self._reset_state()
            return
        new_map, ahead = parse_porcelain_v2(
            porcelain.stdout.decode(errors="replace"),
            untracked.stdout.decode(errors="replace"),
            repo_root=self.repo_root,
        )
        # Resolve symlinks so lookups by realpath also hit.
        resolved: dict[Path, Status] = {}
        for path, status in new_map.items():
            try:
                resolved[path.resolve()] = status
            except OSError:
                resolved[path] = status
        with self._lock:
            self._map = resolved
            self._branch_ahead = ahead

    def stop(self, timeout: float | None = 1.0) -> None:
        """Signal the polling thread to exit and wait briefly for it."""
        self._stop_event.set()
        if self._thread is not None and self._thread.is_alive():
            self._thread.join(timeout=timeout)

    # ------------------------------------------------------------------ internals
    def _run_git(self, *args: str) -> subprocess.CompletedProcess[bytes]:
        """Run ``git <args>`` in the repo root and return the completed process.

        Output is captured as bytes and a non-zero exit status does not raise.
        """
        import os  # local import: nothing else in this module needs ``os``

        # Disable optional locks so a background status run does not hold the
        # index lock while the user runs their own git commands.
        env = {**os.environ, "GIT_OPTIONAL_LOCKS": "0"}
        return subprocess.run(
            ["git", *args],
            cwd=self.repo_root,
            check=False,
            capture_output=True,
            env=env,
        )

    def _reset_state(self) -> None:
        """Drop the cached status map and the branch-ahead flag."""
        with self._lock:
            self._map = {}
            self._branch_ahead = False

    def _disable_with_warning(self, stderr: bytes) -> None:
        """Log a warning, clear state, and mark the provider disabled."""
        # Only the first 200 characters of git's stderr go into the log line.
        tail = stderr.decode(errors="replace")[:200] if stderr else ""
        log.warning(
            "git_status: git command failed for %s (%s); disabling provider",
            self.repo_root,
            tail.strip(),
        )
        self._disabled = True
        self._reset_state()

    def _run(self) -> None:
        """Polling loop: refresh every ``poll_interval`` seconds until stopped."""
        # ``wait`` returns True once ``stop`` sets the event, ending the loop.
        while not self._stop_event.wait(self.poll_interval):
            if self._disabled:
                # Failure is permanent within this process lifetime; stop polling.
                return
            try:
                self.refresh()
            except Exception:
                log.exception("git_status: unexpected error during refresh")