from __future__ import annotations
from dataclasses import dataclass
from dataclasses import replace as dataclass_replace
from pathlib import Path
from typing import cast
from jinja2 import Environment, PackageLoader, select_autoescape
from archledger import __version__
from archledger.checks import content_warnings
from archledger.errors import StorageError, ValidationError
from archledger.id_segments import id_segment_for_metadata
from archledger.ids import validate_id_segment
from archledger.ledger_sequence import (
NumberedSourcePath as _NumberedSourcePath,
)
from archledger.ledger_sequence import (
analyze_ledger_sequence as _analyze_ledger_sequence,
)
from archledger.ledger_sequence import (
collect_numbered_source_paths as _collect_numbered_source_paths,
)
from archledger.model import (
CURRENT_SOURCE_SCHEMA_VERSION,
MAJOR_SECTION_SPECS,
RECORD_TYPE_TO_DEFAULT_SECTION,
RECORD_TYPE_TO_DIR,
RECORD_TYPES,
REQUIRED_RECORD_FIELDS,
VALID_BODY_FORMATS,
ArchitectureRecord,
SectionSpec,
is_visible_status,
known_source_extensions,
normalize_kind,
record_sort_key,
record_template_name_for_source_format,
section_body_placeholder_for_source_format,
validate_record,
)
from archledger.record_types import RecordContextInput
from archledger.source_refs import normalize_source_refs
from archledger.storage.common import ensure_dir, utc_now_iso, write_text_atomic
from archledger.storage.frontmatter import (
FrontMatterError,
iter_source_files,
read_front_matter_document,
write_front_matter_document,
)
from archledger.storage.meta import (
default_storage_meta,
next_number_floor,
read_storage_meta,
write_storage_meta,
)
from archledger.storage.paths import ProjectPaths, is_relative_to
from archledger.storage.project_config import ProjectConfig
[docs]
@dataclass(frozen=True, slots=True)
class InitResult:
workspace_root: Path
config_path: Path
archledger_dir: Path
created_paths: tuple[Path, ...]
[docs]
@dataclass(frozen=True, slots=True)
class StatusResult:
workspace_root: Path
config_path: Path
archledger_dir: Path
archive_dir: Path
sections_count: int
record_directories_count: int
storage_meta_path: Path
build_dir: Path
[docs]
@dataclass(frozen=True, slots=True)
class CheckFinding:
level: str
message: str
path: Path | None = None
[docs]
@dataclass(frozen=True, slots=True)
class CheckResult:
errors: tuple[CheckFinding, ...]
warnings: tuple[CheckFinding, ...]
[docs]
def has_failures(self, *, strict: bool) -> bool:
return bool(self.errors) or (strict and bool(self.warnings))
[docs]
@dataclass(frozen=True, slots=True)
class ArchiveResult:
record_id: str
source_path: Path
archive_path: Path
reason: str
already_archived: bool = False
[docs]
@dataclass(frozen=True, slots=True)
class DoctorRepair:
kind: str
message: str
path: Path | None = None
before: int | None = None
after: int | None = None
[docs]
@dataclass(frozen=True, slots=True)
class DoctorResult:
errors: tuple[CheckFinding, ...]
warnings: tuple[CheckFinding, ...]
repairs: tuple[DoctorRepair, ...]
storage_next_number_before: int
storage_next_number_after: int
highest_seen: int
missing_numbers: tuple[int, ...]
duplicate_numbers: tuple[int, ...]
# Re-export for backward compatibility: the class is imported from
# archledger.ledger_sequence under a private alias and exposed again here
# under its public name.
NumberedSourcePath = _NumberedSourcePath
[docs]
class ArchitectureRepository:
[docs]
def __init__(self, paths: ProjectPaths, config: ProjectConfig) -> None:
    """Bind the repository to a project layout and its configuration.

    Also builds the Jinja2 environment that loads templates from the
    ``templates`` directory of the ``archledger`` package.
    """
    self.paths = paths
    self.config = config

    template_loader = PackageLoader("archledger", "templates")
    # Autoescaping is enabled for no file extension and is off for
    # templates loaded from strings.
    escape_policy = select_autoescape(
        enabled_extensions=(),
        default_for_string=False,
    )
    self._template_env = Environment(
        loader=template_loader,
        autoescape=escape_policy,
        keep_trailing_newline=True,
        trim_blocks=False,
        lstrip_blocks=False,
    )
[docs]
def init(self, *, overwrite: bool = False) -> InitResult:
    """Create the storage layout, the section files and the storage metadata.

    Directories are always ensured. Section files and the storage metadata
    file are written when they are absent, or unconditionally when
    ``overwrite`` is true.

    Returns:
        An ``InitResult`` listing every path recorded as created or written.
    """
    touched: list[Path] = []
    timestamp = utc_now_iso()

    def ensure_tracked(directory: Path) -> None:
        # Record the directory only if it did not exist before this call.
        if not directory.exists():
            touched.append(directory)
        ensure_dir(directory)

    for base_dir in (
        self.paths.archledger_dir,
        self.paths.sections_dir,
        self.paths.records_dir,
        self.paths.archive_dir,
        self.paths.build_dir,
    ):
        ensure_tracked(base_dir)

    # One sub-directory per distinct record-type directory name.
    for type_dir_name in sorted(set(RECORD_TYPE_TO_DIR.values())):
        ensure_tracked(self.paths.records_dir / type_dir_name)

    for spec in MAJOR_SECTION_SPECS:
        section_id = self.config.id_format.format(
            spec.number,
            segment=self._id_segment_for_section(spec),
        )
        destination = (
            self.paths.sections_dir / f"{section_id}{self.config.section_extension}"
        )
        if destination.exists() and not overwrite:
            continue
        document = _section_document(
            spec,
            self.config.source_format,
            record_id=section_id,
            source_schema_version=self.config.source_schema_version,
            created_at=timestamp,
        )
        write_text_atomic(destination, document)
        touched.append(destination)

    meta_path = self.paths.storage_meta_path
    if overwrite or not meta_path.exists():
        defaults = default_storage_meta(self.config.project_uuid, __version__)
        # Start the counter no lower than what the files on disk require.
        floor = next_number_floor(
            self.paths.archledger_dir,
            defaults.next_number,
            source_extensions=(
                self.config.section_extension,
                self.config.record_extension,
            ),
            id_format=self.config.id_format,
        )
        write_storage_meta(meta_path, dataclass_replace(defaults, next_number=floor))
        touched.append(meta_path)

    return InitResult(
        workspace_root=self.paths.workspace_root,
        config_path=self.paths.config_path,
        archledger_dir=self.paths.archledger_dir,
        created_paths=tuple(touched),
    )
[docs]
def status(self) -> StatusResult:
    """Summarise the on-disk storage layout.

    Returns:
        A ``StatusResult`` with the configured locations, the number of
        section source files and the number of directories directly below
        the records directory.

    Raises:
        StorageError: if the records directory does not exist or is not a
            directory.
    """
    # The parsed metadata is not used; the call is kept so that whatever
    # error ``read_storage_meta`` raises for a missing or unreadable file
    # still surfaces here.
    read_storage_meta(self.paths.storage_meta_path)
    section_count = len(
        iter_source_files(
            self.paths.sections_dir,
            (self.config.section_extension,),
        )
    )
    records_dir = self.paths.records_dir
    # ``iterdir`` on a missing directory would raise a bare
    # FileNotFoundError; report it like the other entry points do.
    if not records_dir.is_dir():
        raise StorageError(
            "archledger storage layout is incomplete. Run: archledger init"
        )
    record_directories_count = sum(
        1 for entry in records_dir.iterdir() if entry.is_dir()
    )
    return StatusResult(
        workspace_root=self.paths.workspace_root,
        config_path=self.paths.config_path,
        archledger_dir=self.paths.archledger_dir,
        archive_dir=self.paths.archive_dir,
        sections_count=section_count,
        record_directories_count=record_directories_count,
        storage_meta_path=self.paths.storage_meta_path,
        build_dir=self.paths.build_dir,
    )
[docs]
def create_record(
    self,
    kind: str,
    title: str,
    **kwargs: object,
) -> ArchitectureRecord:
    """Create a new record file from its template and return the loaded record.

    The record receives the next free ledger number. The stored counter is
    advanced after the file has been written.

    Raises:
        ValidationError: if the ledger has missing or duplicate numbers, or
            if the sequence analysis reports errors.
    """
    record_kind = normalize_kind(kind)
    self._ensure_storage_ready()

    integrity_errors, _warnings, gaps, duplicates, _highest = (
        self._ledger_sequence_findings()
    )
    if gaps or duplicates:
        raise ValidationError(
            "Ledger numbering is inconsistent. Run: archledger doctor --repair"
        )
    if integrity_errors:
        raise ValidationError(
            "Storage integrity checks failed. Run: archledger doctor --repair"
        )

    stored_meta = read_storage_meta(self.paths.storage_meta_path)
    number = next_number_floor(
        self.paths.archledger_dir,
        stored_meta.next_number,
        source_extensions=(
            self.config.section_extension,
            self.config.record_extension,
        ),
        id_format=self.config.id_format,
    )
    segment = self._id_segment_for_kind(record_kind)

    def file_name_for(candidate: int) -> str:
        candidate_id = self.config.id_format.format(candidate, segment=segment)
        return f"{candidate_id}{self.config.record_extension}"

    first_name = file_name_for(number)
    target_dir = self.paths.records_dir / RECORD_TYPE_TO_DIR[record_kind]
    target_path = target_dir / first_name
    # Skip forward past any number whose file already exists in this directory.
    while target_path.exists():
        number += 1
        target_path = target_dir / file_name_for(number)

    order = self._next_order(record_kind)
    created_at = utc_now_iso()
    template_name = record_template_name_for_source_format(
        record_kind,
        self.config.source_format,
    )
    context = self._template_context(
        record_kind,
        title=title,
        order=order,
        created_at=created_at,
        target_path=target_path,
        **kwargs,
    )
    template = self._template_env.get_template(f"records/{template_name}")
    write_text_atomic(target_path, template.render(**context))
    self._write_counter(number + 1)
    return self._load_record_from_path(target_path)
[docs]
def list_records(
    self,
    *,
    include_draft: bool = False,
    include_superseded: bool = False,
    kind: str | None = None,
) -> list[ArchitectureRecord]:
    """Return the visible non-section records, sorted with ``record_sort_key``.

    When ``kind`` is given, only records of that (normalised) type are kept.
    Visibility is decided by ``is_visible_status`` using the two flags.
    """
    self._ensure_storage_ready()
    candidates = self._load_records(include_sections=False)
    if kind is not None:
        wanted_type = normalize_kind(kind)
        candidates = [item for item in candidates if item.type == wanted_type]

    def is_shown(item: ArchitectureRecord) -> bool:
        return is_visible_status(
            item.status,
            include_draft=include_draft,
            include_superseded=include_superseded,
        )

    return sorted(filter(is_shown, candidates), key=record_sort_key)
[docs]
def load_all_records(
    self,
    *,
    include_sections: bool = True,
) -> list[ArchitectureRecord]:
    """Load every active source file and return the records sorted.

    Records are loaded regardless of status. Section records are dropped
    when ``include_sections`` is false.
    """
    self._ensure_storage_ready()
    loaded = [
        self._load_record_from_path(source) for source in self._all_record_paths()
    ]
    if not include_sections:
        loaded = [item for item in loaded if item.type != "section"]
    return sorted(loaded, key=record_sort_key)
[docs]
def get_record(self, record_id: str) -> ArchitectureRecord:
    """Load the first active source file whose stem equals ``record_id``.

    Raises:
        ValidationError: if no section or record file has that stem.
    """
    self._ensure_storage_ready()
    match = next(
        (
            candidate
            for candidate in self._all_record_paths()
            if candidate.stem == record_id
        ),
        None,
    )
    if match is None:
        raise ValidationError(f"Record not found: {record_id}")
    return self._load_record_from_path(match)
[docs]
def check(
    self,
    *,
    strict: bool = False,
) -> CheckResult:
    """Validate every source file, including the archive, and the ledger.

    Four passes are made: per-file loading and validation, duplicate-ID and
    parent-reference checks across the loaded records, required-section
    checks, and the ledger sequence analysis.

    Args:
        strict: Accepted but not used here; callers can apply strictness
            through ``CheckResult.has_failures(strict=...)``.

    Returns:
        A ``CheckResult`` holding all collected errors and warnings.
    """
    del strict
    self._ensure_storage_ready()
    findings_errors: list[CheckFinding] = []
    findings_warnings: list[CheckFinding] = []
    loaded_records: list[ArchitectureRecord] = []
    # Pass 1: load and validate each file on its own. A file that cannot be
    # loaded yields one error and is left out of the later passes.
    for path in self._all_record_paths(include_archive=True):
        try:
            record = self._load_record_from_path(path)
        except FrontMatterError as exc:
            findings_errors.append(CheckFinding("error", exc.message, path))
            continue
        except ValidationError as exc:
            findings_errors.append(CheckFinding("error", exc.message, path))
            continue
        # No segment is expected when ID segments are switched off.
        expected_segment = (
            self._id_segment_for_metadata(record.metadata)
            if self.config.id_segment_mode != "none"
            else None
        )
        issues = validate_record(
            record,
            id_format=self.config.id_format,
            expected_segment=expected_segment,
        )
        for issue in issues:
            findings_errors.append(CheckFinding("error", issue, path))
        source_errors, source_warnings = self._source_contract_findings(record)
        findings_errors.extend(
            CheckFinding("error", message, path) for message in source_errors
        )
        findings_warnings.extend(
            CheckFinding("warning", message, path) for message in source_warnings
        )
        loaded_records.append(record)
        for warning_message in content_warnings(record):
            findings_warnings.append(CheckFinding("warning", warning_message, path))
        # Status and location must agree in both directions: archived
        # records live in the archive, and archive files are archived.
        if record.status == "archived" and not path.is_relative_to(
            self.paths.archive_dir
        ):
            findings_errors.append(
                CheckFinding(
                    "error",
                    f"Archived record {record.id} is outside archive storage.",
                    path,
                )
            )
        if (
            path.is_relative_to(self.paths.archive_dir)
            and record.status != "archived"
        ):
            findings_errors.append(
                CheckFinding(
                    "error",
                    f"Archived file {record.id} must use status archived.",
                    path,
                )
            )
    # Pass 2a: report every record whose ID was already seen earlier.
    seen_ids: dict[str, Path] = {}
    for record in loaded_records:
        if record.id in seen_ids:
            findings_errors.append(
                CheckFinding(
                    "error",
                    f"Duplicate record ID: {record.id}",
                    record.path,
                )
            )
        else:
            seen_ids[record.id] = record.path
    loaded_ids = set(seen_ids)
    # Pass 2b: cross-record checks that need the full ID set.
    for record in loaded_records:
        parent = record.metadata.get("parent")
        # None, "" and the literal "null" all mean "no parent".
        if parent not in (None, "", "null") and str(parent) not in loaded_ids:
            findings_errors.append(
                CheckFinding(
                    "error",
                    f"Parent reference points to a missing record: {parent}",
                    record.path,
                )
            )
        if record.status == "draft":
            findings_warnings.append(
                CheckFinding(
                    "warning",
                    f"Draft record {record.id} is excluded from the default build.",
                    record.path,
                )
            )
        if record.type == "section" and path_in_archive(
            record.path, self.paths.archive_dir
        ):
            findings_errors.append(
                CheckFinding(
                    "error",
                    f"Required section {record.id} must not be archived.",
                    record.path,
                )
            )
    # Pass 3: every major section must exist, must not be archived, and
    # should have at least one accepted or proposed record.
    for section_spec in MAJOR_SECTION_SPECS:
        section_id = self.config.id_format.format(
            section_spec.number,
            segment=self._id_segment_for_section(section_spec),
        )
        # A section file may use any of the known source extensions.
        section_paths = [
            self.paths.sections_dir / f"{section_id}{extension}"
            for extension in self._known_source_extensions()
        ]
        archived_section_paths = [
            self.paths.archive_dir / "sections" / f"{section_id}{extension}"
            for extension in self._known_source_extensions()
        ]
        if not any(path.is_file() for path in section_paths):
            findings_errors.append(
                CheckFinding(
                    "error",
                    f"Required section file is missing: {section_spec.key}",
                    section_paths[0],
                )
            )
        archived_section = next(
            (path for path in archived_section_paths if path.is_file()),
            None,
        )
        if archived_section is not None:
            findings_errors.append(
                CheckFinding(
                    "error",
                    f"Required section file is archived: {section_spec.key}",
                    archived_section,
                )
            )
        # Skip the "empty section" warning when a non-archived, non-section
        # record with status accepted or proposed belongs to this section.
        if any(
            record.type != "section"
            and record.section == section_spec.key
            and record.status in {"accepted", "proposed"}
            and not path_in_archive(record.path, self.paths.archive_dir)
            for record in loaded_records
        ):
            continue
        findings_warnings.append(
            CheckFinding(
                "warning",
                f"Section {section_spec.key} has no accepted/proposed records.",
                section_paths[0],
            )
        )
    # Pass 4: add the findings of the ledger numbering analysis.
    sequence_errors, sequence_warnings, _, _, _ = self._ledger_sequence_findings()
    findings_errors.extend(sequence_errors)
    findings_warnings.extend(sequence_warnings)
    return CheckResult(
        errors=tuple(findings_errors),
        warnings=tuple(findings_warnings),
    )
[docs]
def archive_record(self, record_id: str, *, reason: str = "") -> ArchiveResult:
    """Move an active record into the archive and mark it as archived.

    The archived copy keeps the record's path relative to the archledger
    directory and gains archive metadata. A record that is already in the
    archive is reported with ``already_archived=True`` and left untouched.

    Raises:
        ValidationError: if ``record_id`` is not a valid ledger ID, names a
            required section, cannot be found, or the archive target
            already exists.
    """
    self._ensure_storage_ready()
    if not self.config.id_format.is_id(record_id):
        raise ValidationError(f"Invalid ledger ID: {record_id}")

    live_files = iter_source_files(
        self.paths.records_dir,
        self._known_source_extensions(),
    )
    active_path: Path | None = next(
        (candidate for candidate in live_files if candidate.stem == record_id),
        None,
    )

    if active_path is None:
        previously_archived = self._find_archived_path(record_id)
        if previously_archived is not None:
            return ArchiveResult(
                record_id=record_id,
                source_path=previously_archived,
                archive_path=previously_archived,
                reason=reason,
                already_archived=True,
            )
        section_file = (
            self.paths.sections_dir / f"{record_id}{self.config.section_extension}"
        )
        if section_file.is_file():
            raise ValidationError(
                f"Cannot archive required section {record_id}. "
                "Sections are part of the arc42 skeleton."
            )
        raise ValidationError(f"Record not found: {record_id}")

    metadata, body = read_front_matter_document(active_path)
    now = utc_now_iso()
    relative_active = active_path.relative_to(self.paths.archledger_dir)
    archive_path = self.paths.archive_dir / relative_active
    if archive_path.exists():
        raise ValidationError(f"Archive target already exists: {archive_path}")

    archived_metadata = dict(metadata)
    archived_metadata.update(
        {
            "status": "archived",
            "archived_at": now,
            "archived_reason": reason,
            "archived_from": str(relative_active),
            "updated_at": now,
        }
    )
    ensure_dir(archive_path.parent)
    # Write the archived copy first, then remove the active file.
    write_front_matter_document(archive_path, archived_metadata, body)
    active_path.unlink()
    return ArchiveResult(
        record_id=record_id,
        source_path=active_path,
        archive_path=archive_path,
        reason=reason,
    )
[docs]
def doctor(self, *, repair: bool = False) -> DoctorResult:
    """Analyse ledger numbering and, when ``repair`` is true, fix what it can.

    With ``repair`` the method may create the archive directory, recreate
    missing required sections, write archive tombstones for other missing
    numbers, and recompute the stored ``next_number`` counter. Missing
    numbers are not repaired while duplicate numbers exist.

    Returns:
        A ``DoctorResult`` with the sequence findings, the repairs carried
        out and the counter value before and after the run.
    """
    self._ensure_storage_ready()
    meta_before = read_storage_meta(self.paths.storage_meta_path)
    repairs: list[DoctorRepair] = []
    if repair and not self.paths.archive_dir.is_dir():
        ensure_dir(self.paths.archive_dir)
        repairs.append(
            DoctorRepair(
                kind="created_archive_dir",
                message=f"Created archive directory: {self.paths.archive_dir}",
                path=self.paths.archive_dir,
            )
        )
    (
        sequence_errors,
        sequence_warnings,
        missing_numbers,
        duplicate_numbers,
        highest_seen,
    ) = self._ledger_sequence_findings()
    if repair and not duplicate_numbers:
        for number in missing_numbers:
            # A missing number that belongs to a major section is repaired
            # by recreating the section file, never by a tombstone.
            section_spec = next(
                (spec for spec in MAJOR_SECTION_SPECS if spec.number == number),
                None,
            )
            if section_spec is not None:
                section_id = self.config.id_format.format(
                    section_spec.number,
                    segment=self._id_segment_for_section(section_spec),
                )
                section_path = (
                    self.paths.sections_dir
                    / f"{section_id}{self.config.section_extension}"
                )
                if not section_path.is_file():
                    write_text_atomic(
                        section_path,
                        _section_document(
                            section_spec,
                            self.config.source_format,
                            record_id=section_id,
                            source_schema_version=self.config.source_schema_version,
                        ),
                    )
                    repairs.append(
                        DoctorRepair(
                            kind="recreated_section",
                            message=(f"Recreated required section {section_id}"),
                            path=section_path,
                        )
                    )
                continue
            # Any other missing number gets an archived placeholder file.
            tombstone_path = self._write_archive_tombstone(number)
            repairs.append(
                DoctorRepair(
                    kind="created_tombstone",
                    message=(
                        f"Created archive tombstone"
                        f" {self._display_missing_id(number)}"
                    ),
                    path=tombstone_path,
                )
            )
        # Re-run the analysis so the result reflects the repaired state.
        (
            sequence_errors,
            sequence_warnings,
            missing_numbers,
            duplicate_numbers,
            highest_seen,
        ) = self._ledger_sequence_findings()
    # Computed even without ``repair``; it is only written back below.
    next_number_after = next_number_floor(
        self.paths.archledger_dir,
        meta_before.next_number,
        source_extensions=(
            self.config.section_extension,
            self.config.record_extension,
        ),
        id_format=self.config.id_format,
    )
    if repair and next_number_after != meta_before.next_number:
        self._write_counter(next_number_after)
        repairs.append(
            DoctorRepair(
                kind="recomputed_counter",
                message=(
                    f"Recomputed storage.yaml next_number to {next_number_after}"
                ),
                path=self.paths.storage_meta_path,
                before=meta_before.next_number,
                after=next_number_after,
            )
        )
    # Read the metadata again so the reported value is what is on disk now.
    meta_after = read_storage_meta(self.paths.storage_meta_path)
    return DoctorResult(
        errors=tuple(sequence_errors),
        warnings=tuple(sequence_warnings),
        repairs=tuple(repairs),
        storage_next_number_before=meta_before.next_number,
        storage_next_number_after=meta_after.next_number,
        highest_seen=highest_seen,
        missing_numbers=missing_numbers,
        duplicate_numbers=duplicate_numbers,
    )
def _find_archived_path(self, record_id: str) -> Path | None:
    """Return the first archive file whose stem equals ``record_id``, or None."""
    archived_files = iter_source_files(
        self.paths.archive_dir,
        self._known_source_extensions(),
    )
    return next(
        (candidate for candidate in archived_files if candidate.stem == record_id),
        None,
    )
def _numbered_source_paths(
    self, *, include_archive: bool
) -> list[NumberedSourcePath]:
    """Delegate to ``collect_numbered_source_paths`` for this repository."""
    extensions = self._known_source_extensions()
    return _collect_numbered_source_paths(
        self.paths,
        self.config,
        extensions,
        include_archive=include_archive,
    )
def _ledger_sequence_findings(
    self,
) -> tuple[
    list[CheckFinding],
    list[CheckFinding],
    tuple[int, ...],
    tuple[int, ...],
    int,
]:
    """Run the ledger sequence analysis and convert its rows to findings.

    Returns:
        ``(errors, warnings, missing_numbers, duplicate_numbers,
        highest_seen)``; the first two are lists of ``CheckFinding``, the
        rest are passed through from the analysis result.
    """
    analysis = _analyze_ledger_sequence(
        self.paths,
        self.config,
        self._known_source_extensions(),
        display_missing_id=self._display_missing_id,
    )

    def as_findings(rows) -> list[CheckFinding]:
        # Each row is a (level, message, path) triple.
        converted: list[CheckFinding] = []
        for level, msg, path in rows:
            converted.append(CheckFinding(level, msg, path))
        return converted

    errors = as_findings(analysis.errors)
    warnings = as_findings(analysis.warnings)
    return (
        errors,
        warnings,
        analysis.missing_numbers,
        analysis.duplicate_numbers,
        analysis.highest_seen,
    )
def _write_archive_tombstone(self, number: int) -> Path:
    """Write an archived placeholder record for ``number`` and return its path.

    The file goes into the ``tombstones`` directory of the archive. An
    existing file at that path is left untouched.
    """
    segment = self.config.id_segment_map.get(
        "archive_tombstone",
        self.config.id_default_segment,
    )
    record_id = self.config.id_format.format(number, segment=segment)
    tombstone_dir = self.paths.archive_dir / "tombstones"
    path = tombstone_dir / f"{record_id}{self.config.record_extension}"
    if not path.exists():
        now = utc_now_iso()
        metadata = {
            "schema_version": self.config.source_schema_version,
            "id": record_id,
            "type": "archive_tombstone",
            "title": f"Archived placeholder for missing ledger ID {record_id}",
            "status": "archived",
            "section": "risks_and_technical_debt",
            "order": number,
            # First ten characters of the timestamp.
            "date": now[:10],
            "body_format": self.config.source_format,
            "created_at": now,
            "updated_at": now,
            "archived_at": now,
            "archived_reason": (
                "Created by archledger doctor --repair for a missing ledger number."
            ),
        }
        body = (
            "This tombstone preserves a ledger number whose original source fragment "
            "is no longer present. It was created automatically by "
            "`archledger doctor --repair`.\n"
        )
        ensure_dir(path.parent)
        write_front_matter_document(path, metadata, body)
    return path
def _id_segment_for_kind(self, kind: str) -> str:
    """Return the validated ID segment configured for ``kind``.

    Falls back to the configured default segment when ``kind`` has no entry
    in the segment map.
    """
    configured = self.config.id_segment_map.get(
        kind,
        self.config.id_default_segment,
    )
    return validate_id_segment(configured)
def _id_segment_for_section(self, section_spec: SectionSpec) -> str:
    """Return the validated ID segment used for section IDs.

    The segment is looked up under the fixed key ``"section"``, so the
    given spec does not influence the result.
    """
    del section_spec
    configured = self.config.id_segment_map.get(
        "section",
        self.config.id_default_segment,
    )
    return validate_id_segment(configured)
def _id_segment_for_metadata(self, metadata: dict[str, object]) -> str:
    """Resolve the ID segment for ``metadata`` using the configured mapping."""
    cfg = self.config
    return id_segment_for_metadata(
        metadata,
        default_segment=cfg.id_default_segment,
        segment_map=cfg.id_segment_map,
    )
def _display_missing_id(self, number: int) -> str:
    """Return a display form of the ledger ID for a missing ``number``.

    With segment mode ``"none"`` the configured ID format is used. In any
    other mode the segment is unknown for a missing file, so a
    ``<segment>`` placeholder stands in for it.
    """
    cfg = self.config
    if cfg.id_segment_mode != "none":
        padded = format(number, f"0{cfg.id_width}d")
        return f"{cfg.id_prefix}_<segment>_{padded}"
    return cfg.id_format.format(number)
def _template_context(
    self,
    kind: str,
    *,
    title: str,
    order: int,
    created_at: str,
    target_path: Path,
    **kwargs: object,
) -> dict[str, object]:
    """Build the variable mapping used to render a new record's template.

    Args:
        kind: Normalised record type; must be a key of ``RECORD_TYPES``.
        title: Record title.
        order: Sort order assigned to the record.
        created_at: Creation timestamp; also used for ``updated_at``, and
            its first ten characters for ``date``.
        target_path: Destination file; its stem becomes the record ID.
        **kwargs: Optional overrides (``status``, ``section``, ``parent``,
            ``level``, ``diagram_type``) plus anything else the record
            type's context factory reads.

    Returns:
        The common fields, updated with whatever the record type's context
        factory returns.
    """
    spec = RECORD_TYPES[kind]
    # An explicit ``status=None`` means "not specified". Fall back to the
    # type default, as ``section`` and ``parent`` already do for None,
    # instead of passing the text "None" on to the context factory.
    status = kwargs.get("status")
    if status is None:
        status = spec.default_status
    section = kwargs.get("section") or RECORD_TYPE_TO_DEFAULT_SECTION[kind]
    parent = kwargs.get("parent")
    if kind == "diagram" and not kwargs.get("diagram_type"):
        # Diagrams without an explicit type use the configured default.
        kwargs = {**kwargs, "diagram_type": self.config.diagram_default_type}
    context: dict[str, object] = {
        "schema_version": self.config.source_schema_version,
        "id": target_path.stem,
        "type": kind,
        "title": title,
        "status": status,
        "section": section,
        "order": order,
        "created_at": created_at,
        "updated_at": created_at,
        "date": created_at[:10],
        "body_format": self.config.source_format,
        # Templates receive the literal text "null" when there is no parent.
        "parent": "null" if parent in (None, "") else parent,
        "level": kwargs.get("level", spec.default_level),
    }
    context.update(
        spec.context_factory(
            RecordContextInput(
                title=title,
                status=str(status),
                section=str(section),
                parent=None if parent in (None, "") else str(parent),
                kwargs=kwargs,
            )
        )
    )
    return context
def _next_order(self, kind: str) -> int:
    """Return the order for a new record of ``kind``.

    This is ten more than the highest order among existing records of that
    type, or 10 when there are none.
    """
    highest = max(
        (
            item.order
            for item in self._load_records(include_sections=False)
            if item.type == kind
        ),
        default=None,
    )
    return 10 if highest is None else highest + 10
def _load_records(self, *, include_sections: bool) -> list[ArchitectureRecord]:
    """Load every active source file, optionally dropping section records."""
    loaded: list[ArchitectureRecord] = []
    for source in self._all_record_paths():
        loaded.append(self._load_record_from_path(source))
    if not include_sections:
        loaded = [item for item in loaded if item.type != "section"]
    return loaded
def _all_record_paths(self, *, include_archive: bool = False) -> list[Path]:
    """List source files in the sections and records directories.

    Files from the archive directory are appended when ``include_archive``
    is true. Sections come first, then records, then the archive.
    """
    roots = [self.paths.sections_dir, self.paths.records_dir]
    if include_archive:
        roots.append(self.paths.archive_dir)
    collected: list[Path] = []
    for root in roots:
        collected.extend(iter_source_files(root, self._known_source_extensions()))
    return collected
def _load_record_from_path(self, path: Path) -> ArchitectureRecord:
    """Read a front-matter document and turn it into an ``ArchitectureRecord``.

    Raises:
        ValidationError: if a required key is missing, a required string
            field is not a string, or ``order`` is not an integer (booleans
            are rejected as well).
    """
    metadata, body = read_front_matter_document(path)

    absent = [name for name in REQUIRED_RECORD_FIELDS if name not in metadata]
    if absent:
        raise ValidationError("Missing required key(s): " + ", ".join(absent))

    title, status, section, order, record_type, record_id = (
        metadata[key] for key in ("title", "status", "section", "order", "type", "id")
    )
    for text_value in (title, status, section, record_type, record_id):
        if not isinstance(text_value, str):
            raise ValidationError("Required string fields must be strings.")
    # bool is a subclass of int, so it has to be excluded explicitly.
    if isinstance(order, bool) or not isinstance(order, int):
        raise ValidationError("Required key order must be an integer.")

    # Only the first element of the normalisation result is stored here.
    source_refs = normalize_source_refs(
        cast(str, record_id),
        metadata.get("source_refs"),
        workspace_root=self.paths.workspace_root,
    )[0]
    return ArchitectureRecord(
        id=cast(str, record_id),
        type=cast(str, record_type),
        title=cast(str, title),
        status=cast(str, status),
        section=cast(str, section),
        order=order,
        path=path,
        metadata=metadata,
        body=body,
        source_refs=source_refs,
    )
def _ensure_storage_ready(self) -> None:
    """Fail early when the storage layout has not been initialised.

    Raises:
        StorageError: if the storage metadata file is missing, or if the
            sections or records directory does not exist.
    """
    meta_path = self.paths.storage_meta_path
    if not meta_path.is_file():
        raise StorageError(
            "Missing storage metadata file: "
            f"{meta_path}. Run: archledger init"
        )
    layout_complete = (
        self.paths.sections_dir.is_dir() and self.paths.records_dir.is_dir()
    )
    if not layout_complete:
        raise StorageError(
            "archledger storage layout is incomplete. Run: archledger init"
        )
def _write_counter(self, next_number: int) -> None:
    """Persist the ``next_number`` counter in the storage metadata.

    The stored value is the larger of ``next_number`` and the floor
    computed by ``next_number_floor``.
    """
    stored = read_storage_meta(self.paths.storage_meta_path)
    floor = next_number_floor(
        self.paths.archledger_dir,
        stored.next_number,
        source_extensions=(
            self.config.section_extension,
            self.config.record_extension,
        ),
        id_format=self.config.id_format,
    )
    updated = dataclass_replace(stored, next_number=max(floor, next_number))
    write_storage_meta(self.paths.storage_meta_path, updated)
def _source_contract_findings(
    self,
    record: ArchitectureRecord,
) -> tuple[list[str], list[str]]:
    """Check ``schema_version``, ``date``, ``body_format`` and source refs.

    A missing field is reported as an error only when the configuration
    version is 4 or higher. A mismatch with the configured value is an
    error from version 4 on and a warning below that. A value of the wrong
    type is always an error.

    Returns:
        ``(errors, warnings)`` as lists of message strings; warnings also
        include those produced while normalising ``source_refs``.
    """
    errors: list[str] = []
    warnings: list[str] = []
    expected_format = self.config.source_format
    config_version = self.config.config_version
    meta = record.metadata
    schema_version = meta.get("schema_version")
    body_format = meta.get("body_format")
    date = meta.get("date")

    def enforced() -> bool:
        # Evaluated lazily, only on the branches that depend on it.
        return config_version >= 4

    def report_mismatch(message: str) -> None:
        if enforced():
            errors.append(message)
        else:
            warnings.append(message)

    # --- schema_version ---
    if schema_version is None:
        if enforced():
            errors.append(f"Record {record.id} is missing schema_version.")
    elif isinstance(schema_version, bool) or not isinstance(schema_version, int):
        errors.append(f"Record {record.id} schema_version must be an integer.")
    elif schema_version != self.config.source_schema_version:
        report_mismatch(
            f"Record {record.id} schema_version {schema_version} does not match "
            f"source.schema_version {self.config.source_schema_version}."
        )

    # --- date ---
    if date is None:
        if enforced():
            errors.append(f"Record {record.id} is missing date.")
    elif not isinstance(date, str) or not date.strip():
        errors.append(f"Record {record.id} date must be a non-empty string.")

    # --- body_format ---
    if body_format is None:
        if enforced():
            errors.append(f"Record {record.id} is missing body_format.")
    elif not isinstance(body_format, str):
        errors.append(f"Record {record.id} body_format must be a string.")
    else:
        # Comparison ignores surrounding whitespace and letter case.
        normalized_body_format = body_format.strip().lower()
        if normalized_body_format not in VALID_BODY_FORMATS:
            allowed = ", ".join(sorted(VALID_BODY_FORMATS))
            errors.append(
                f"Record {record.id} body_format must be one of: " + allowed + "."
            )
        elif normalized_body_format != expected_format:
            report_mismatch(
                f"Record {record.id} body_format {normalized_body_format!r} does "
                f"not match source format {expected_format!r}."
            )

    # --- source_refs ---
    _, source_ref_warnings = normalize_source_refs(
        record.id,
        meta.get("source_refs"),
        workspace_root=self.paths.workspace_root,
    )
    warnings.extend(source_ref_warnings)
    return errors, warnings
def _known_source_extensions(self) -> tuple[str, ...]:
    """Return the source file extensions known for this repository's config."""
    extensions = known_source_extensions(self.config)
    return extensions
def _section_document(
    section_spec: SectionSpec,
    source_format: str,
    *,
    record_id: str,
    source_schema_version: int = CURRENT_SOURCE_SCHEMA_VERSION,
    created_at: str | None = None,
) -> str:
    """Render the full text of a section source file.

    The document is a front-matter block between ``---`` lines, a blank
    line, the placeholder body for ``source_format`` and a trailing
    newline. When ``created_at`` is omitted the current UTC timestamp is
    used.
    """
    timestamp = created_at if created_at is not None else utc_now_iso()
    front_matter = (
        ("schema_version", f"{source_schema_version}"),
        ("id", f"{record_id}"),
        ("type", "section"),
        ("section", f"{section_spec.key}"),
        ("title", f"{section_spec.title}"),
        ("order", f"{section_spec.order}"),
        ("status", "accepted"),
        # Date and timestamps are written as double-quoted scalars.
        ("date", f'"{timestamp[:10]}"'),
        ("body_format", f"{source_format}"),
        ("created_at", f'"{timestamp}"'),
        ("updated_at", f'"{timestamp}"'),
    )
    header = [f"{key}: {value}" for key, value in front_matter]
    placeholder = section_body_placeholder_for_source_format(source_format)
    # The final empty item makes the joined text end with a newline.
    return "\n".join(["---", *header, "---", "", placeholder, ""])
[docs]
def path_in_archive(path: Path, archive_dir: Path) -> bool:
    """Tell whether ``path`` is located under ``archive_dir``.

    The decision is delegated to ``archledger.storage.paths.is_relative_to``.
    """
    inside_archive = is_relative_to(path, archive_dir)
    return inside_archive