from __future__ import annotations
import hashlib
import shutil
import subprocess
from dataclasses import dataclass
from fnmatch import fnmatch
from pathlib import Path, PurePosixPath
from archledger.errors import StorageError
from archledger.model import (
VALID_OUTPUT_FORMATS,
ArchitectureRecord,
default_document_filename_for_output_format,
is_visible_status,
)
from archledger.storage.common import utc_now_iso
from archledger.storage.paths import ProjectPaths
from archledger.storage.paths import is_relative_to as _is_relative_to
from archledger.storage.project_config import ProjectConfig
SOURCE_STATE_SCHEMA = "archledger.source-state.v2"
[docs]
@dataclass(frozen=True, slots=True)
class TrackedFile:
path: str
sha256: str
[docs]
@dataclass(frozen=True, slots=True)
class DirectoryState:
path: str
sha256: str
file_count: int
[docs]
@dataclass(frozen=True, slots=True)
class SourceState:
schema: str
project_uuid: str
project_name: str
created_at: str
updated_at: str
reason: str
scanner: dict[str, object]
files: dict[str, TrackedFile]
directories: dict[str, DirectoryState]
[docs]
@dataclass(frozen=True, slots=True)
class ChangedFile:
path: str
change: str
old_sha256: str | None = None
new_sha256: str | None = None
[docs]
@dataclass(frozen=True, slots=True)
class PossibleRename:
old_path: str
new_path: str
sha256: str
[docs]
@dataclass(frozen=True, slots=True)
class ImpactedRecord:
id: str
type: str
title: str
status: str
section: str
path: str
matched_refs: tuple[str, ...]
[docs]
@dataclass(frozen=True, slots=True)
class ChangeSet:
baseline_exists: bool
baseline_updated_at: str | None
baseline_reason: str | None
current_scanned_at: str
scanner_used: str
file_count: int
changed_files: tuple[ChangedFile, ...]
possible_renames: tuple[PossibleRename, ...]
impacted_records: tuple[ImpactedRecord, ...]
impacted_sections: tuple[str, ...]
unlinked_changed_files: tuple[str, ...]
unbaselined_files: tuple[str, ...] = ()
[docs]
def scan_workspace(
paths: ProjectPaths,
config: ProjectConfig,
*,
reason: str = "manual",
scanned_at: str | None = None,
) -> SourceState:
timestamp = utc_now_iso() if scanned_at is None else scanned_at
scanner_used, candidates = _scan_candidate_paths(paths, config)
files: dict[str, TrackedFile] = {}
for file_path in candidates:
if not file_path.is_file():
continue
if _should_skip_path(file_path, paths, config):
continue
relative_path = _relative_posix_path(paths.workspace_root, file_path)
if not _matches_any_pattern(relative_path, config.tracking_include):
continue
if _matches_any_pattern(relative_path, config.tracking_exclude):
continue
size = file_path.stat().st_size
if size > config.tracking_max_file_bytes:
continue
files[relative_path] = TrackedFile(
path=relative_path,
sha256=_sha256_for_path(file_path),
)
sorted_files = dict(sorted(files.items()))
return SourceState(
schema=SOURCE_STATE_SCHEMA,
project_uuid=config.project_uuid,
project_name=config.project_name,
created_at=timestamp,
updated_at=timestamp,
reason=reason,
scanner={
"mode": config.tracking_scanner,
"used": scanner_used,
"include": list(config.tracking_include),
"exclude": list(config.tracking_exclude),
"max_file_bytes": config.tracking_max_file_bytes,
"hash_algorithm": "sha256",
"hash_content": "utf8-surrogateescape-lf-normalized",
},
files=sorted_files,
directories=_build_directory_tree(sorted_files),
)
[docs]
def diff_source_states(
baseline: SourceState | None,
current: SourceState,
) -> ChangeSet:
if baseline is None:
return ChangeSet(
baseline_exists=False,
baseline_updated_at=None,
baseline_reason=None,
current_scanned_at=current.updated_at,
scanner_used=_scanner_used(current),
file_count=len(current.files),
changed_files=(),
possible_renames=(),
impacted_records=(),
impacted_sections=(),
unlinked_changed_files=(),
unbaselined_files=tuple(sorted(current.files)),
)
old_paths = set(baseline.files)
new_paths = set(current.files)
added_paths = sorted(new_paths - old_paths)
deleted_paths = sorted(old_paths - new_paths)
common_paths = sorted(old_paths & new_paths)
changed_files: list[ChangedFile] = []
for path in added_paths:
tracked = current.files[path]
changed_files.append(
ChangedFile(
path=path,
change="added",
new_sha256=tracked.sha256,
)
)
for path in common_paths:
old_tracked = baseline.files[path]
new_tracked = current.files[path]
if old_tracked.sha256 == new_tracked.sha256:
continue
changed_files.append(
ChangedFile(
path=path,
change="modified",
old_sha256=old_tracked.sha256,
new_sha256=new_tracked.sha256,
)
)
for path in deleted_paths:
tracked = baseline.files[path]
changed_files.append(
ChangedFile(
path=path,
change="deleted",
old_sha256=tracked.sha256,
)
)
deleted_by_sha: dict[str, list[str]] = {}
for path in deleted_paths:
deleted_by_sha.setdefault(baseline.files[path].sha256, []).append(path)
for sha256_paths in deleted_by_sha.values():
sha256_paths.sort()
possible_renames: list[PossibleRename] = []
for path in added_paths:
tracked = current.files[path]
old_paths_for_sha = deleted_by_sha.get(tracked.sha256, [])
if not old_paths_for_sha:
continue
old_path = old_paths_for_sha.pop(0)
possible_renames.append(
PossibleRename(
old_path=old_path,
new_path=path,
sha256=tracked.sha256,
)
)
return ChangeSet(
baseline_exists=True,
baseline_updated_at=baseline.updated_at,
baseline_reason=baseline.reason,
current_scanned_at=current.updated_at,
scanner_used=_scanner_used(current),
file_count=len(current.files),
changed_files=tuple(
sorted(changed_files, key=lambda item: (item.change, item.path))
),
possible_renames=tuple(
sorted(possible_renames, key=lambda item: (item.old_path, item.new_path))
),
impacted_records=(),
impacted_sections=(),
unlinked_changed_files=(),
)
[docs]
def resolve_impacts(
records: list[ArchitectureRecord],
changes: ChangeSet,
*,
include_draft: bool,
include_superseded: bool,
) -> ChangeSet:
changed_paths = {
item.path
for item in changes.changed_files
if item.change in {"added", "modified", "deleted"}
}
impacted_records: list[ImpactedRecord] = []
impacted_sections: set[str] = set(changes.impacted_sections)
linked_changed_paths: set[str] = set()
for record in records:
if record.type != "section" and not is_visible_status(
record.status,
include_draft=include_draft,
include_superseded=include_superseded,
):
continue
matched_refs = sorted(
{
source_ref.path
for source_ref in record.source_refs
for changed_path in changed_paths
if _source_ref_matches_path(source_ref.path, changed_path)
}
)
if not matched_refs:
continue
linked_changed_paths.update(
changed_path
for changed_path in changed_paths
if any(
_source_ref_matches_path(source_ref.path, changed_path)
for source_ref in record.source_refs
)
)
if record.type == "section":
impacted_sections.add(record.section)
continue
impacted_records.append(
ImpactedRecord(
id=record.id,
type=record.type,
title=record.title,
status=record.status,
section=record.section,
path=record.path.as_posix(),
matched_refs=tuple(matched_refs),
)
)
impacted_sections.add(record.section)
return ChangeSet(
baseline_exists=changes.baseline_exists,
baseline_updated_at=changes.baseline_updated_at,
baseline_reason=changes.baseline_reason,
current_scanned_at=changes.current_scanned_at,
scanner_used=changes.scanner_used,
file_count=changes.file_count,
changed_files=changes.changed_files,
possible_renames=changes.possible_renames,
impacted_records=tuple(sorted(impacted_records, key=lambda item: item.id)),
impacted_sections=tuple(sorted(impacted_sections)),
unlinked_changed_files=tuple(sorted(changed_paths - linked_changed_paths)),
unbaselined_files=changes.unbaselined_files,
)
def _scan_candidate_paths(
paths: ProjectPaths,
config: ProjectConfig,
) -> tuple[str, list[Path]]:
scanner_mode = config.tracking_scanner
if scanner_mode == "filesystem":
return ("filesystem", _scan_filesystem_paths(paths.workspace_root))
if scanner_mode == "git":
return ("git", _scan_git_paths(paths.workspace_root))
try:
return ("git", _scan_git_paths(paths.workspace_root))
except StorageError:
return ("filesystem", _scan_filesystem_paths(paths.workspace_root))
def _scan_git_paths(workspace_root: Path) -> list[Path]:
if shutil.which("git") is None:
raise StorageError("git is not available for source tracking.")
result = subprocess.run(
[
"git",
"-C",
str(workspace_root),
"ls-files",
"--cached",
"--others",
"--exclude-standard",
],
capture_output=True,
check=False,
text=True,
)
if result.returncode != 0:
raise StorageError("git scanner could not enumerate workspace files.")
paths: list[Path] = []
for line in result.stdout.splitlines():
relative_path = line.strip()
if not relative_path:
continue
paths.append((workspace_root / Path(relative_path)).resolve())
return sorted(paths)
def _scan_filesystem_paths(workspace_root: Path) -> list[Path]:
return sorted(
path.resolve() for path in workspace_root.rglob("*") if path.is_file()
)
def _should_skip_path(
path: Path,
paths: ProjectPaths,
config: ProjectConfig,
) -> bool:
resolved = path.resolve()
if _is_relative_to(resolved, paths.archledger_dir):
return True
if paths.build_dir != paths.workspace_root and _is_relative_to(
resolved, paths.build_dir
):
return True
if resolved in _generated_output_paths(paths, config):
return True
return False
def _relative_posix_path(workspace_root: Path, path: Path) -> str:
return path.relative_to(workspace_root).as_posix()
def _sha256_for_path(path: Path) -> str:
data = path.read_bytes()
text = data.decode("utf-8", errors="surrogateescape")
normalized = text.replace("\r\n", "\n").replace("\r", "\n")
return hashlib.sha256(
normalized.encode("utf-8", errors="surrogateescape")
).hexdigest()
def _matches_any_pattern(path: str, patterns: tuple[str, ...]) -> bool:
return any(_matches_pattern(path, pattern) for pattern in patterns)
def _matches_pattern(path: str, pattern: str) -> bool:
pure_path = PurePosixPath(path)
if pure_path.match(pattern):
return True
if pattern.startswith("**/") and pure_path.match(pattern[3:]):
return True
if pattern.endswith("/**"):
prefix = pattern[:-3].rstrip("/")
return path == prefix or path.startswith(f"{prefix}/")
return fnmatch(path, pattern)
def _scanner_used(state: SourceState) -> str:
scanner_used = state.scanner.get("used")
if not isinstance(scanner_used, str):
return "filesystem"
return scanner_used
def _source_ref_matches_path(source_ref_path: str, changed_path: str) -> bool:
if source_ref_path.endswith("/"):
return changed_path.startswith(source_ref_path)
return source_ref_path == changed_path
def _generated_output_paths(paths: ProjectPaths, config: ProjectConfig) -> set[Path]:
output_paths = {
(paths.build_dir / config.build_default_output).resolve(),
}
for output_format in VALID_OUTPUT_FORMATS:
output_paths.add(
(
paths.build_dir
/ default_document_filename_for_output_format(output_format)
).resolve()
)
return output_paths
def _build_directory_tree(files: dict[str, TrackedFile]) -> dict[str, DirectoryState]:
children: dict[str, set[tuple[str, str, str]]] = {".": set()}
file_counts: dict[str, int] = {".": 0}
directories: set[str] = {"."}
for file_path, tracked in sorted(files.items()):
parts = PurePosixPath(file_path).parts
if not parts:
continue
parent = "."
ancestors = ["."]
for part in parts[:-1]:
child_path = part if parent == "." else f"{parent}/{part}"
children.setdefault(parent, set()).add(("D", part, child_path))
children.setdefault(child_path, set())
directories.add(child_path)
parent = child_path
ancestors.append(parent)
children.setdefault(parent, set()).add(("F", parts[-1], tracked.sha256))
for directory in ancestors:
file_counts[directory] = file_counts.get(directory, 0) + 1
hashes: dict[str, str] = {}
for directory in sorted(
directories,
key=lambda value: 0 if value == "." else len(PurePosixPath(value).parts),
reverse=True,
):
digest = hashlib.sha256()
for kind, name, value in sorted(children.get(directory, set())):
if kind == "D":
value = hashes[value]
digest.update(f"{kind}\0{name}\0{value}\n".encode())
hashes[directory] = digest.hexdigest()
return {
path: DirectoryState(
path=path,
sha256=hashes[path],
file_count=file_counts.get(path, 0),
)
for path in sorted(directories)
}