Source code for simvx.core.git_status

"""Git status classification service for editor/IDE file browsers.

Provides per-file git status used to draw Sublime/VSCode-style coloured dots
beside file entries. A background thread polls ``git status --porcelain=v2``
and ``git ls-files --others --exclude-standard`` at a fixed interval; lookups
are O(1) against an in-memory map.

This module is part of step I1 of the file-status feature. The dot drawer (I2)
and dirty-buffer wiring (I3) live elsewhere; an optional callback hook is
already provided here so that I3 can plug in without touching this file.
"""

from __future__ import annotations

import logging
import subprocess
import threading
from collections.abc import Callable
from enum import StrEnum
from pathlib import Path

# Module-level logger, named after this module's import path.
log = logging.getLogger(__name__)

# Names exported by ``from <module> import *``; underscore helpers stay private.
__all__ = ["Status", "GitStatusProvider", "parse_porcelain_v2"]


[docs] class Status(StrEnum): """Git status classification used to pick a dot colour. Order matters when multiple states could apply: ``MODIFIED_UNSAVED`` always wins (the user is actively editing); ``CONFLICTED`` outranks committed states; ``UNTRACKED``, ``SAVED_UNCOMMITTED`` and ``COMMITTED_UNPUSHED`` are mutually exclusive in normal git output. """ CLEAN = "clean" MODIFIED_UNSAVED = "modified_unsaved" # yellow — in-editor unsaved buffer SAVED_UNCOMMITTED = "saved_uncommitted" # orange — modified vs index COMMITTED_UNPUSHED = "committed_unpushed" # blue — branch ahead of upstream CONFLICTED = "conflicted" # red — merge conflict UNTRACKED = "untracked" # green — new file, not tracked
# Two-letter ``XY`` codes that mark a path as unmerged (merge conflict).
# Porcelain v2 normally reports such paths on ``u`` lines; this set lets a
# changed-entry ``XY`` field be checked for the same codes as well.
# See ``git status --help``, "Short Format" / "Porcelain Format Version 2".
_CONFLICT_XY = frozenset("UU AA DD AU UA DU UD".split())
# Single-character escapes used by git's C-style pathname quoting, mapped to
# the byte value each one stands for.
_GIT_C_ESCAPES = {
    "a": 0x07,
    "b": 0x08,
    "f": 0x0C,
    "n": 0x0A,
    "r": 0x0D,
    "t": 0x09,
    "v": 0x0B,
    '"': 0x22,
    "\\": 0x5C,
}


def _unquote_git_path(field: str) -> str:
    """Undo git's C-style quoting of a pathname field.

    Without ``-z``, git wraps pathnames containing "unusual" characters in
    double quotes and escapes them (``\\"``, ``\\\\``, ``\\t``, three-digit
    octal ``\\NNN`` for raw bytes, ...). A field that is not wrapped in double
    quotes is returned unchanged.
    """
    if len(field) < 2 or not (field.startswith('"') and field.endswith('"')):
        return field

    inner = field[1:-1]
    pieces: list[str] = []
    # Escaped bytes are buffered so that a multi-byte UTF-8 character written
    # as several consecutive octal escapes is decoded as one unit.
    pending = bytearray()
    i = 0
    while i < len(inner):
        ch = inner[i]
        if ch == "\\" and i + 1 < len(inner):
            code = inner[i + 1]
            if code in _GIT_C_ESCAPES:
                pending.append(_GIT_C_ESCAPES[code])
                i += 2
                continue
            octal = inner[i + 1 : i + 4]
            if len(octal) == 3 and all(d in "01234567" for d in octal):
                pending.append(int(octal, 8) & 0xFF)
                i += 4
                continue
        # Ordinary character (or an unrecognised escape, kept literally).
        if pending:
            pieces.append(pending.decode("utf-8", errors="replace"))
            pending.clear()
        pieces.append(ch)
        i += 1
    if pending:
        pieces.append(pending.decode("utf-8", errors="replace"))
    return "".join(pieces)


def parse_porcelain_v2(
    porcelain_output: str,
    untracked_output: str = "",
    repo_root: Path | None = None,
) -> tuple[dict[Path, Status], bool]:
    """Parse ``git status --porcelain=v2 --branch`` plus an untracked listing.

    Args:
        porcelain_output: Text emitted by ``git status --porcelain=v2 --branch``.
        untracked_output: Text emitted by
            ``git ls-files --others --exclude-standard``, one path per line.
        repo_root: If given, joined in front of every path git emits. When
            ``None``, paths are kept relative (useful for unit tests).

    Returns:
        ``(map, branch_ahead)`` where ``map`` is a ``{path: Status}`` dict and
        ``branch_ahead`` is True when the ``# branch.ab +N -M`` header reports
        ``N > 0``. The caller decides how to apply the repo-level ahead state
        to files that have no entry of their own.

    Pathnames that git C-quoted (because they contain non-ASCII bytes, quotes,
    backslashes or control characters) are unquoted before being stored, so
    the keys are the real file names rather than the quoted literals.
    Lines that are too short for their record type are skipped.
    """
    result: dict[Path, Status] = {}
    branch_ahead = False
    base = repo_root if repo_root is not None else Path()

    for raw_line in porcelain_output.splitlines():
        if not raw_line:
            continue

        if raw_line.startswith("# branch.ab "):
            # Format: "# branch.ab +N -M"
            parts = raw_line.split()
            if len(parts) >= 3 and parts[2].startswith("+"):
                try:
                    ahead = int(parts[2][1:])
                except ValueError:
                    ahead = 0
                branch_ahead = ahead > 0
            continue
        if raw_line.startswith("#"):
            # Other headers (branch.oid, branch.head, ...) carry no file state.
            continue

        kind, _, rest = raw_line.partition(" ")
        if kind == "1":
            # Ordinary change: "1 XY sub mH mI mW hH hI path"
            fields = rest.split(" ", 7)
            if len(fields) < 8:
                continue
            path = _unquote_git_path(fields[7])
            result[base / path] = _classify_changed(fields[0])
        elif kind == "2":
            # Renamed/copied: "2 XY sub mH mI mW hH hI X<score> path<TAB>orig"
            fields = rest.split(" ", 8)
            if len(fields) < 9:
                continue
            # A TAB inside a pathname is escaped by git's quoting, so the
            # first literal TAB is always the path/orig separator.
            new_path = _unquote_git_path(fields[8].split("\t", 1)[0])
            result[base / new_path] = _classify_changed(fields[0])
        elif kind == "u":
            # Unmerged: "u XY sub m1 m2 m3 mW h1 h2 h3 path" (10 tokens after 'u')
            fields = rest.split(" ", 9)
            if len(fields) < 10:
                continue
            result[base / _unquote_git_path(fields[9])] = Status.CONFLICTED
        elif kind == "?":
            # Untracked entry (porcelain v2 emits these with --untracked-files)
            result[base / _unquote_git_path(rest)] = Status.UNTRACKED

    for raw_line in untracked_output.splitlines():
        if not raw_line:
            continue
        result[base / _unquote_git_path(raw_line)] = Status.UNTRACKED

    return result, branch_ahead
def _classify_changed(xy: str) -> Status:
    """Turn the ``XY`` code of a tracked, changed entry into a ``Status``.

    ``X`` describes the staged side (index vs HEAD) and ``Y`` the worktree
    side (worktree vs index). A code listed in ``_CONFLICT_XY`` is reported as
    ``CONFLICTED``; every other code is reported as ``SAVED_UNCOMMITTED``,
    since the entry differs from the committed state either way.
    """
    in_conflict = xy in _CONFLICT_XY
    return Status.CONFLICTED if in_conflict else Status.SAVED_UNCOMMITTED
class GitStatusProvider:
    """Background-polling git status classifier.

    Spawns a daemon thread that runs ``git status --porcelain=v2 --branch``
    and ``git ls-files --others --exclude-standard`` every ``poll_interval``
    seconds and rebuilds an internal ``{path: Status}`` map. ``status_for``
    performs a dictionary lookup against that map.

    ``git status`` is invoked with ``--no-optional-locks`` so that the
    background poll does not take ``index.lock`` to write back a refreshed
    index, which could otherwise collide with git commands the user runs at
    the same time.

    Thread-safety: the map is replaced via a single attribute assignment
    under ``self._lock``. Readers grab a local reference to the current map
    under the lock and then index into it without holding the lock; the map
    object itself is never mutated after it has been published.

    A missing ``.git`` directory is **not** an error; everything reports
    ``CLEAN``. If a git command exits non-zero or the ``git`` binary cannot be
    found, the provider logs a warning, marks itself ``_disabled``, and
    reports ``CLEAN`` for every lookup. Callers can check ``is_active`` to
    decide whether to render git dots at all.
    """

    def __init__(
        self,
        repo_root: Path,
        dirty_paths_callback: Callable[[], set[Path]] | None = None,
        poll_interval: float = 2.0,
    ) -> None:
        """Create the provider and, for a git checkout, start polling.

        Args:
            repo_root: Directory expected to contain ``.git``; stored resolved.
            dirty_paths_callback: Optional callable returning the set of paths
                with unsaved editor buffers; consulted on every ``status_for``.
            poll_interval: Seconds between background refreshes.
        """
        self._disabled: bool = False
        self.repo_root = Path(repo_root).resolve()
        self.dirty_paths_callback = dirty_paths_callback
        self.poll_interval = poll_interval
        self._lock = threading.Lock()
        self._stop_event = threading.Event()
        self._map: dict[Path, Status] = {}
        self._branch_ahead: bool = False
        self._thread: threading.Thread | None = None

        if (self.repo_root / ".git").exists():
            self.refresh()  # prime the cache before starting the poller
            if not self._disabled:
                self._thread = threading.Thread(
                    target=self._run, name="GitStatusProvider", daemon=True
                )
                self._thread.start()

    # ------------------------------------------------------------------ public

    def status_for(self, path: Path | str) -> Status:
        """Return the status for ``path`` (absolute or relative to repo root).

        Precedence: a path reported by ``dirty_paths_callback`` is
        ``MODIFIED_UNSAVED``; otherwise an explicit entry in the status map
        wins; otherwise ``COMMITTED_UNPUSHED`` if the branch is ahead of its
        upstream, else ``CLEAN``.
        """
        p = Path(path)
        if not p.is_absolute():
            p = self.repo_root / p
        try:
            p = p.resolve()
        except OSError:
            # Best effort: fall back to the unresolved path for the lookup.
            pass

        if self.dirty_paths_callback is not None:
            try:
                dirty = self.dirty_paths_callback()
            except Exception:
                # A broken editor hook must not break status rendering.
                log.exception("git_status: dirty_paths_callback raised")
                dirty = set()
            if p in dirty:
                return Status.MODIFIED_UNSAVED

        # Take a consistent snapshot of both fields, then read without the lock.
        with self._lock:
            current = self._map
            ahead = self._branch_ahead

        explicit = current.get(p)
        if explicit is not None:
            return explicit
        if ahead:
            # Repo-level "ahead of upstream": treat all otherwise-clean tracked
            # files as committed-unpushed, matching Sublime/VSCode behaviour.
            return Status.COMMITTED_UNPUSHED
        return Status.CLEAN

    @property
    def is_active(self) -> bool:
        """Whether the provider is functional (git binary present, repo readable)."""
        return not self._disabled

    def refresh(self) -> None:
        """Synchronously rebuild the status map.

        Called from the polling thread and at construction time; safe to call
        from tests. If ``.git`` is missing the map is simply cleared. If a git
        command exits non-zero or the ``git`` binary is not found, the
        provider is marked ``_disabled`` and subsequent ``status_for`` calls
        return ``CLEAN``.
        """
        if not (self.repo_root / ".git").exists():
            self._clear_state()
            return

        try:
            porcelain = subprocess.run(
                # --no-optional-locks (Git >= 2.15): do not take index.lock to
                # write back a refreshed index, as git recommends for status
                # calls made in the background.
                ["git", "--no-optional-locks", "status", "--porcelain=v2", "--branch"],
                cwd=self.repo_root,
                check=False,
                capture_output=True,
            )
            if porcelain.returncode != 0:
                self._disable_with_warning(porcelain.stderr)
                return
            untracked = subprocess.run(
                ["git", "ls-files", "--others", "--exclude-standard"],
                cwd=self.repo_root,
                check=False,
                capture_output=True,
            )
            if untracked.returncode != 0:
                self._disable_with_warning(untracked.stderr)
                return
        except FileNotFoundError as exc:
            log.warning(
                "git_status: 'git' binary not found for %s (%s); disabling provider",
                self.repo_root,
                exc,
            )
            self._disabled = True
            self._clear_state()
            return

        new_map, ahead = parse_porcelain_v2(
            porcelain.stdout.decode(errors="replace"),
            untracked.stdout.decode(errors="replace"),
            repo_root=self.repo_root,
        )

        # Resolve symlinks so lookups by realpath also hit.
        resolved: dict[Path, Status] = {}
        for path, status in new_map.items():
            try:
                resolved[path.resolve()] = status
            except OSError:
                resolved[path] = status

        with self._lock:
            self._map = resolved
            self._branch_ahead = ahead

    def _clear_state(self) -> None:
        """Publish an empty status map and reset the branch-ahead flag."""
        with self._lock:
            self._map = {}
            self._branch_ahead = False

    def _disable_with_warning(self, stderr: bytes) -> None:
        """Log a warning, clear state, and mark the provider disabled."""
        # Only the first 200 characters of stderr are logged.
        tail = stderr.decode(errors="replace")[:200] if stderr else ""
        log.warning(
            "git_status: git command failed for %s (%s); disabling provider",
            self.repo_root,
            tail.strip(),
        )
        self._disabled = True
        self._clear_state()

    def stop(self, timeout: float | None = 1.0) -> None:
        """Signal the polling thread to exit and wait up to ``timeout`` for it."""
        self._stop_event.set()
        if self._thread is not None and self._thread.is_alive():
            self._thread.join(timeout=timeout)

    # --------------------------------------------------------------- internals

    def _run(self) -> None:
        """Polling loop: refresh every ``poll_interval`` seconds until stopped."""
        # Event.wait returns True once stop() has set the event, ending the loop.
        while not self._stop_event.wait(self.poll_interval):
            if self._disabled:
                # Failure is permanent within this process lifetime; stop polling.
                return
            try:
                self.refresh()
            except Exception:
                # Keep the poller alive across unexpected errors.
                log.exception("git_status: unexpected error during refresh")