feat: Automatically recalculate averages after each grade entry
Teachers need up-to-date averages immediately after grades are published or modified, without waiting for a nightly batch. The system recalculates via synchronous Domain Events: evaluation statistics (min/max/mean/median), weighted subject averages (normalized to /20), and each student's overall average. Results are stored in denormalized tables with a Redis cache (5-minute TTL). Three API endpoints expose the data with role-based access control. A console command backfills historical data at deployment time.
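For illustration only, a minimal Python sketch of the calculations described above; the names (Grade, evaluation_stats, weighted_average_out_of_20) are hypothetical and are not the application's actual classes. In the real design these functions would run synchronously from the domain-event handler when grades are published or modified, with results written to the denormalized tables and the Redis cache (5-minute TTL) rather than returned.

# Hypothetical sketch; not the application's actual code.
from dataclasses import dataclass
from statistics import mean, median


@dataclass
class Grade:
    student_id: int
    subject: str
    value: float        # raw score
    scale: float        # maximum possible score, e.g. 10 or 20
    coefficient: float  # subject weighting


def evaluation_stats(scores: list[float]) -> dict:
    """Per-evaluation statistics, recomputed synchronously on publish/update."""
    return {
        "min": min(scores),
        "max": max(scores),
        "mean": round(mean(scores), 2),
        "median": round(median(scores), 2),
    }


def weighted_average_out_of_20(grades: list[Grade]) -> float:
    """Weighted average of a student's grades, each normalized to a /20 scale."""
    total_weight = sum(g.coefficient for g in grades)
    normalized = sum((g.value / g.scale) * 20 * g.coefficient for g in grades)
    return round(normalized / total_weight, 2)


if __name__ == "__main__":
    # One evaluation's scores (statistics) and one student's grades (weighted average).
    print(evaluation_stats([8.0, 12.5, 15.0, 9.5]))
    print(weighted_average_out_of_20([
        Grade(1, "maths", 14.0, 20.0, 3.0),
        Grade(1, "anglais", 7.5, 10.0, 2.0),
    ]))  # 14.4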
@@ -0,0 +1,259 @@
#!/usr/bin/env python3
# /// script
# requires-python = ">=3.9"
# dependencies = []
# ///
"""Remove legacy module directories from _bmad/ after config migration.

After merge-config.py and merge-help-csv.py have migrated config data and
deleted individual legacy files, this script removes the now-redundant
directory trees. These directories contain skill files that are already
installed at .claude/skills/ (or equivalent) — only the config files at
_bmad/ root need to persist.

When --skills-dir is provided, the script verifies that every skill found
in the legacy directories exists at the installed location before removing
anything. Directories without skills (like _config/) are removed directly.

Exit codes: 0=success (including nothing to remove), 1=validation error, 2=runtime error
"""

import argparse
import json
import shutil
import sys
from pathlib import Path


def parse_args():
    parser = argparse.ArgumentParser(
        description="Remove legacy module directories from _bmad/ after config migration."
    )
    parser.add_argument(
        "--bmad-dir",
        required=True,
        help="Path to the _bmad/ directory",
    )
    parser.add_argument(
        "--module-code",
        required=True,
        help="Module code being cleaned up (e.g. 'bmb')",
    )
    parser.add_argument(
        "--also-remove",
        action="append",
        default=[],
        help="Additional directory names under _bmad/ to remove (repeatable)",
    )
    parser.add_argument(
        "--skills-dir",
        help="Path to .claude/skills/ — enables safety verification that skills "
        "are installed before removing legacy copies",
    )
    parser.add_argument(
        "--verbose",
        action="store_true",
        help="Print detailed progress to stderr",
    )
    return parser.parse_args()


def find_skill_dirs(base_path: str) -> list:
    """Find directories that contain a SKILL.md file.

    Walks the directory tree and returns the leaf directory name for each
    directory containing a SKILL.md. These are considered skill directories.

    Returns:
        List of skill directory names (e.g. ['bmad-agent-builder', 'bmad-builder-setup'])
    """
    skills = []
    root = Path(base_path)
    if not root.exists():
        return skills
    for skill_md in root.rglob("SKILL.md"):
        skills.append(skill_md.parent.name)
    return sorted(set(skills))


def verify_skills_installed(
    bmad_dir: str, dirs_to_check: list, skills_dir: str, verbose: bool = False
) -> list:
    """Verify that skills in legacy directories exist at the installed location.

    Scans each directory in dirs_to_check for skill folders (containing SKILL.md),
    then checks that a matching directory exists under skills_dir. Directories
    that contain no skills (like _config/) are silently skipped.

    Returns:
        List of verified skill names.

    Raises SystemExit(1) if any skills are missing from skills_dir.
    """
    all_verified = []
    missing = []

    for dirname in dirs_to_check:
        legacy_path = Path(bmad_dir) / dirname
        if not legacy_path.exists():
            continue

        skill_names = find_skill_dirs(str(legacy_path))
        if not skill_names:
            if verbose:
                print(
                    f"No skills found in {dirname}/ — skipping verification",
                    file=sys.stderr,
                )
            continue

        for skill_name in skill_names:
            installed_path = Path(skills_dir) / skill_name
            if installed_path.is_dir():
                all_verified.append(skill_name)
                if verbose:
                    print(
                        f"Verified: {skill_name} exists at {installed_path}",
                        file=sys.stderr,
                    )
            else:
                missing.append(skill_name)
                if verbose:
                    print(
                        f"MISSING: {skill_name} not found at {installed_path}",
                        file=sys.stderr,
                    )

    if missing:
        error_result = {
            "status": "error",
            "error": "Skills not found at installed location",
            "missing_skills": missing,
            "skills_dir": str(Path(skills_dir).resolve()),
        }
        print(json.dumps(error_result, indent=2))
        sys.exit(1)

    return sorted(set(all_verified))


def count_files(path: Path) -> int:
    """Count all files recursively in a directory."""
    count = 0
    for item in path.rglob("*"):
        if item.is_file():
            count += 1
    return count


def cleanup_directories(
    bmad_dir: str, dirs_to_remove: list, verbose: bool = False
) -> tuple:
    """Remove specified directories under bmad_dir.

    Returns:
        (removed, not_found, total_files_removed) tuple
    """
    removed = []
    not_found = []
    total_files = 0

    for dirname in dirs_to_remove:
        target = Path(bmad_dir) / dirname
        if not target.exists():
            not_found.append(dirname)
            if verbose:
                print(f"Not found (skipping): {target}", file=sys.stderr)
            continue

        if not target.is_dir():
            if verbose:
                print(f"Not a directory (skipping): {target}", file=sys.stderr)
            not_found.append(dirname)
            continue

        file_count = count_files(target)
        if verbose:
            print(
                f"Removing {target} ({file_count} files)",
                file=sys.stderr,
            )

        try:
            shutil.rmtree(target)
        except OSError as e:
            error_result = {
                "status": "error",
                "error": f"Failed to remove {target}: {e}",
                "directories_removed": removed,
                "directories_failed": dirname,
            }
            print(json.dumps(error_result, indent=2))
            sys.exit(2)

        removed.append(dirname)
        total_files += file_count

    return removed, not_found, total_files


def main():
    args = parse_args()

    bmad_dir = args.bmad_dir
    module_code = args.module_code

    # Build the list of directories to remove
    dirs_to_remove = [module_code, "core"] + args.also_remove
    # Deduplicate while preserving order
    seen = set()
    unique_dirs = []
    for d in dirs_to_remove:
        if d not in seen:
            seen.add(d)
            unique_dirs.append(d)
    dirs_to_remove = unique_dirs

    if args.verbose:
        print(f"Directories to remove: {dirs_to_remove}", file=sys.stderr)

    # Safety check: verify skills are installed before removing
    verified_skills = None
    if args.skills_dir:
        if args.verbose:
            print(
                f"Verifying skills installed at {args.skills_dir}",
                file=sys.stderr,
            )
        verified_skills = verify_skills_installed(
            bmad_dir, dirs_to_remove, args.skills_dir, args.verbose
        )

    # Remove directories
    removed, not_found, total_files = cleanup_directories(
        bmad_dir, dirs_to_remove, args.verbose
    )

    # Build result
    result = {
        "status": "success",
        "bmad_dir": str(Path(bmad_dir).resolve()),
        "directories_removed": removed,
        "directories_not_found": not_found,
        "files_removed_count": total_files,
    }

    if args.skills_dir:
        result["safety_checks"] = {
            "skills_verified": True,
            "skills_dir": str(Path(args.skills_dir).resolve()),
            "verified_skills": verified_skills,
        }
    else:
        result["safety_checks"] = None

    print(json.dumps(result, indent=2))


if __name__ == "__main__":
    main()
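A possible invocation of the cleanup script above, sketched in Python via subprocess. The script file name and the paths are placeholders; the flags and the JSON summary keys come from the code above.

import json
import subprocess

# "cleanup-legacy-dirs.py" is a placeholder name; the flags match parse_args() above.
proc = subprocess.run(
    [
        "python3", "cleanup-legacy-dirs.py",
        "--bmad-dir", "_bmad",
        "--module-code", "bmb",
        "--also-remove", "_config",
        "--skills-dir", ".claude/skills",
        "--verbose",
    ],
    capture_output=True,
    text=True,
)
summary = json.loads(proc.stdout)  # JSON summary printed by main()
if summary["status"] == "success":
    print(summary["directories_removed"], summary["files_removed_count"])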
@@ -0,0 +1,408 @@
#!/usr/bin/env python3
# /// script
# requires-python = ">=3.9"
# dependencies = ["pyyaml"]
# ///
"""Merge module configuration into shared _bmad/config.yaml and config.user.yaml.

Reads a module.yaml definition and a JSON answers file, then writes or updates
the shared config.yaml (core values at root + module section) and config.user.yaml
(user_name, communication_language, plus any module variable with user_setting: true).
Uses an anti-zombie pattern for the module section in config.yaml.

Legacy migration: when --legacy-dir is provided, reads old per-module config files
from {legacy-dir}/{module-code}/config.yaml and {legacy-dir}/core/config.yaml.
Matching values serve as fallback defaults (answers override them). After a
successful merge, the legacy config.yaml files are deleted. Only the current
module and core directories are touched — other module directories are left alone.

Exit codes: 0=success, 1=validation error, 2=runtime error
"""

import argparse
import json
import sys
from pathlib import Path

try:
    import yaml
except ImportError:
    print("Error: pyyaml is required (PEP 723 dependency)", file=sys.stderr)
    sys.exit(2)


def parse_args():
    parser = argparse.ArgumentParser(
        description="Merge module config into shared _bmad/config.yaml with anti-zombie pattern."
    )
    parser.add_argument(
        "--config-path",
        required=True,
        help="Path to the target _bmad/config.yaml file",
    )
    parser.add_argument(
        "--module-yaml",
        required=True,
        help="Path to the module.yaml definition file",
    )
    parser.add_argument(
        "--answers",
        required=True,
        help="Path to JSON file with collected answers",
    )
    parser.add_argument(
        "--user-config-path",
        required=True,
        help="Path to the target _bmad/config.user.yaml file",
    )
    parser.add_argument(
        "--legacy-dir",
        help="Path to _bmad/ directory to check for legacy per-module config files. "
        "Matching values are used as fallback defaults, then legacy files are deleted.",
    )
    parser.add_argument(
        "--verbose",
        action="store_true",
        help="Print detailed progress to stderr",
    )
    return parser.parse_args()


def load_yaml_file(path: str) -> dict:
    """Load a YAML file, returning empty dict if file doesn't exist."""
    file_path = Path(path)
    if not file_path.exists():
        return {}
    with open(file_path, "r", encoding="utf-8") as f:
        content = yaml.safe_load(f)
    return content if content else {}


def load_json_file(path: str) -> dict:
    """Load a JSON file."""
    with open(path, "r", encoding="utf-8") as f:
        return json.load(f)


# Keys that live at config root (shared across all modules)
_CORE_KEYS = frozenset(
    {"user_name", "communication_language", "document_output_language", "output_folder"}
)


def load_legacy_values(
    legacy_dir: str, module_code: str, module_yaml: dict, verbose: bool = False
) -> tuple[dict, dict, list]:
    """Read legacy per-module config files and return core/module value dicts.

    Reads {legacy_dir}/core/config.yaml and {legacy_dir}/{module_code}/config.yaml.
    Only returns values whose keys match the current schema (core keys or module.yaml
    variable definitions). Other modules' directories are not touched.

    Returns:
        (legacy_core, legacy_module, files_found) where files_found lists paths read.
    """
    legacy_core: dict = {}
    legacy_module: dict = {}
    files_found: list = []

    # Read core legacy config
    core_path = Path(legacy_dir) / "core" / "config.yaml"
    if core_path.exists():
        core_data = load_yaml_file(str(core_path))
        files_found.append(str(core_path))
        for k, v in core_data.items():
            if k in _CORE_KEYS:
                legacy_core[k] = v
        if verbose:
            print(f"Legacy core config: {list(legacy_core.keys())}", file=sys.stderr)

    # Read module legacy config
    mod_path = Path(legacy_dir) / module_code / "config.yaml"
    if mod_path.exists():
        mod_data = load_yaml_file(str(mod_path))
        files_found.append(str(mod_path))
        for k, v in mod_data.items():
            if k in _CORE_KEYS:
                # Core keys duplicated in module config — only use if not already set
                if k not in legacy_core:
                    legacy_core[k] = v
            elif k in module_yaml and isinstance(module_yaml[k], dict):
                # Module-specific key that matches a current variable definition
                legacy_module[k] = v
        if verbose:
            print(
                f"Legacy module config: {list(legacy_module.keys())}", file=sys.stderr
            )

    return legacy_core, legacy_module, files_found


def apply_legacy_defaults(answers: dict, legacy_core: dict, legacy_module: dict) -> dict:
    """Apply legacy values as fallback defaults under the answers.

    Legacy values fill in any key not already present in answers.
    Explicit answers always win.
    """
    merged = dict(answers)

    if legacy_core:
        core = merged.get("core", {})
        filled_core = dict(legacy_core)  # legacy as base
        filled_core.update(core)  # answers override
        merged["core"] = filled_core

    if legacy_module:
        mod = merged.get("module", {})
        filled_mod = dict(legacy_module)  # legacy as base
        filled_mod.update(mod)  # answers override
        merged["module"] = filled_mod

    return merged


def cleanup_legacy_configs(
    legacy_dir: str, module_code: str, verbose: bool = False
) -> list:
    """Delete legacy config.yaml files for this module and core only.

    Returns list of deleted file paths.
    """
    deleted = []
    for subdir in (module_code, "core"):
        legacy_path = Path(legacy_dir) / subdir / "config.yaml"
        if legacy_path.exists():
            if verbose:
                print(f"Deleting legacy config: {legacy_path}", file=sys.stderr)
            legacy_path.unlink()
            deleted.append(str(legacy_path))
    return deleted


def extract_module_metadata(module_yaml: dict) -> dict:
    """Extract non-variable metadata fields from module.yaml."""
    meta = {}
    for k in ("name", "description"):
        if k in module_yaml:
            meta[k] = module_yaml[k]
    meta["version"] = module_yaml.get("module_version")  # null if absent
    if "default_selected" in module_yaml:
        meta["default_selected"] = module_yaml["default_selected"]
    return meta


def apply_result_templates(
    module_yaml: dict, module_answers: dict, verbose: bool = False
) -> dict:
    """Apply result templates from module.yaml to transform raw answer values.

    For each answer, if the corresponding variable definition in module.yaml has
    a 'result' field, replaces {value} in that template with the answer. Skips
    the template if the answer already contains '{project-root}' to prevent
    double-prefixing.
    """
    transformed = {}
    for key, value in module_answers.items():
        var_def = module_yaml.get(key)
        if (
            isinstance(var_def, dict)
            and "result" in var_def
            and "{project-root}" not in str(value)
        ):
            template = var_def["result"]
            transformed[key] = template.replace("{value}", str(value))
            if verbose:
                print(
                    f"Applied result template for '{key}': {value} → {transformed[key]}",
                    file=sys.stderr,
                )
        else:
            transformed[key] = value
    return transformed


def merge_config(
    existing_config: dict,
    module_yaml: dict,
    answers: dict,
    verbose: bool = False,
) -> dict:
    """Merge answers into config, applying anti-zombie pattern.

    Args:
        existing_config: Current config.yaml contents (may be empty)
        module_yaml: The module definition
        answers: JSON with 'core' and/or 'module' keys
        verbose: Print progress to stderr

    Returns:
        Updated config dict ready to write
    """
    config = dict(existing_config)
    module_code = module_yaml.get("code")

    if not module_code:
        print("Error: module.yaml must have a 'code' field", file=sys.stderr)
        sys.exit(1)

    # Migrate legacy core: section to root
    if "core" in config and isinstance(config["core"], dict):
        if verbose:
            print("Migrating legacy 'core' section to root", file=sys.stderr)
        config.update(config.pop("core"))

    # Strip user-only keys from config — they belong exclusively in config.user.yaml
    for key in _CORE_USER_KEYS:
        if key in config:
            if verbose:
                print(f"Removing user-only key '{key}' from config (belongs in config.user.yaml)", file=sys.stderr)
            del config[key]

    # Write core values at root (global properties, not nested under "core")
    # Exclude user-only keys — those belong exclusively in config.user.yaml
    core_answers = answers.get("core")
    if core_answers:
        shared_core = {k: v for k, v in core_answers.items() if k not in _CORE_USER_KEYS}
        if shared_core:
            if verbose:
                print(f"Writing core config at root: {list(shared_core.keys())}", file=sys.stderr)
            config.update(shared_core)

    # Anti-zombie: remove existing module section
    if module_code in config:
        if verbose:
            print(
                f"Removing existing '{module_code}' section (anti-zombie)",
                file=sys.stderr,
            )
        del config[module_code]

    # Build module section: metadata + variable values
    module_section = extract_module_metadata(module_yaml)
    module_answers = apply_result_templates(
        module_yaml, answers.get("module", {}), verbose
    )
    module_section.update(module_answers)

    if verbose:
        print(
            f"Writing '{module_code}' section with keys: {list(module_section.keys())}",
            file=sys.stderr,
        )

    config[module_code] = module_section

    return config


# Core keys that are always written to config.user.yaml
_CORE_USER_KEYS = ("user_name", "communication_language")


def extract_user_settings(module_yaml: dict, answers: dict) -> dict:
    """Collect settings that belong in config.user.yaml.

    Includes user_name and communication_language from core answers, plus any
    module variable whose definition contains user_setting: true.
    """
    user_settings = {}

    core_answers = answers.get("core", {})
    for key in _CORE_USER_KEYS:
        if key in core_answers:
            user_settings[key] = core_answers[key]

    module_answers = answers.get("module", {})
    for var_name, var_def in module_yaml.items():
        if isinstance(var_def, dict) and var_def.get("user_setting") is True:
            if var_name in module_answers:
                user_settings[var_name] = module_answers[var_name]

    return user_settings


def write_config(config: dict, config_path: str, verbose: bool = False) -> None:
    """Write config dict to YAML file, creating parent dirs as needed."""
    path = Path(config_path)
    path.parent.mkdir(parents=True, exist_ok=True)

    if verbose:
        print(f"Writing config to {path}", file=sys.stderr)

    with open(path, "w", encoding="utf-8") as f:
        yaml.dump(
            config,
            f,
            default_flow_style=False,
            allow_unicode=True,
            sort_keys=False,
        )


def main():
    args = parse_args()

    # Load inputs
    module_yaml = load_yaml_file(args.module_yaml)
    if not module_yaml:
        print(f"Error: Could not load module.yaml from {args.module_yaml}", file=sys.stderr)
        sys.exit(1)

    answers = load_json_file(args.answers)
    existing_config = load_yaml_file(args.config_path)

    if args.verbose:
        exists = Path(args.config_path).exists()
        print(f"Config file exists: {exists}", file=sys.stderr)
        if exists:
            print(f"Existing sections: {list(existing_config.keys())}", file=sys.stderr)

    # Legacy migration: read old per-module configs as fallback defaults
    legacy_files_found = []
    if args.legacy_dir:
        module_code = module_yaml.get("code", "")
        legacy_core, legacy_module, legacy_files_found = load_legacy_values(
            args.legacy_dir, module_code, module_yaml, args.verbose
        )
        if legacy_core or legacy_module:
            answers = apply_legacy_defaults(answers, legacy_core, legacy_module)
            if args.verbose:
                print("Applied legacy values as fallback defaults", file=sys.stderr)

    # Merge and write config.yaml
    updated_config = merge_config(existing_config, module_yaml, answers, args.verbose)
    write_config(updated_config, args.config_path, args.verbose)

    # Merge and write config.user.yaml
    user_settings = extract_user_settings(module_yaml, answers)
    existing_user_config = load_yaml_file(args.user_config_path)
    updated_user_config = dict(existing_user_config)
    updated_user_config.update(user_settings)
    if user_settings:
        write_config(updated_user_config, args.user_config_path, args.verbose)

    # Legacy cleanup: delete old per-module config files
    legacy_deleted = []
    if args.legacy_dir:
        legacy_deleted = cleanup_legacy_configs(
            args.legacy_dir, module_yaml["code"], args.verbose
        )

    # Output result summary as JSON
    module_code = module_yaml["code"]
    result = {
        "status": "success",
        "config_path": str(Path(args.config_path).resolve()),
        "user_config_path": str(Path(args.user_config_path).resolve()),
        "module_code": module_code,
        "core_updated": bool(answers.get("core")),
        "module_keys": list(updated_config.get(module_code, {}).keys()),
        "user_keys": list(user_settings.keys()),
        "legacy_configs_found": legacy_files_found,
        "legacy_configs_deleted": legacy_deleted,
    }
    print(json.dumps(result, indent=2))


if __name__ == "__main__":
    main()
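A usage sketch of the merge above, assuming merge_config and extract_user_settings from this script are in scope; the module.yaml and answer values are made up for illustration, but the expected results in the comments follow the code's logic (user-only keys routed to config.user.yaml, core values written at root, result templates applied, module section rebuilt).

# Hypothetical inputs; only the function names come from the script above.
module_yaml = {
    "code": "bmb",
    "name": "BMAD Builder",
    "output_folder": {"result": "{project-root}/{value}", "user_setting": False},
    "favorite_editor": {"user_setting": True},
}
answers = {
    "core": {"user_name": "Alice", "communication_language": "English",
             "output_folder": "docs"},
    "module": {"output_folder": "docs", "favorite_editor": "vim"},
}

config = merge_config({}, module_yaml, answers)
# config == {
#     "output_folder": "docs",  # core value at root; user-only keys excluded
#     "bmb": {"name": "BMAD Builder", "version": None,
#             "output_folder": "{project-root}/docs",  # result template applied
#             "favorite_editor": "vim"},
# }

user = extract_user_settings(module_yaml, answers)
# user == {"user_name": "Alice", "communication_language": "English",
#          "favorite_editor": "vim"}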
@@ -0,0 +1,220 @@
#!/usr/bin/env python3
# /// script
# requires-python = ">=3.9"
# dependencies = []
# ///
"""Merge module help entries into shared _bmad/module-help.csv.

Reads a source CSV with module help entries and merges them into a target CSV.
Uses an anti-zombie pattern: all existing rows matching the source module code
are removed before appending fresh rows.

Legacy cleanup: when --legacy-dir and --module-code are provided, deletes old
per-module module-help.csv files from {legacy-dir}/{module-code}/ and
{legacy-dir}/core/. Only the current module and core are touched.

Exit codes: 0=success, 1=validation error, 2=runtime error
"""

import argparse
import csv
import json
import sys
from io import StringIO
from pathlib import Path

# CSV header for module-help.csv
HEADER = [
    "module",
    "agent-name",
    "skill-name",
    "display-name",
    "menu-code",
    "capability",
    "args",
    "description",
    "phase",
    "after",
    "before",
    "required",
    "output-location",
    "outputs",
    "",  # trailing empty column from trailing comma
]


def parse_args():
    parser = argparse.ArgumentParser(
        description="Merge module help entries into shared _bmad/module-help.csv with anti-zombie pattern."
    )
    parser.add_argument(
        "--target",
        required=True,
        help="Path to the target _bmad/module-help.csv file",
    )
    parser.add_argument(
        "--source",
        required=True,
        help="Path to the source module-help.csv with entries to merge",
    )
    parser.add_argument(
        "--legacy-dir",
        help="Path to _bmad/ directory to check for legacy per-module CSV files.",
    )
    parser.add_argument(
        "--module-code",
        help="Module code (required with --legacy-dir for scoping cleanup).",
    )
    parser.add_argument(
        "--verbose",
        action="store_true",
        help="Print detailed progress to stderr",
    )
    return parser.parse_args()


def read_csv_rows(path: str) -> tuple[list[str], list[list[str]]]:
    """Read CSV file returning (header, data_rows).

    Returns empty header and rows if file doesn't exist.
    """
    file_path = Path(path)
    if not file_path.exists():
        return [], []

    with open(file_path, "r", encoding="utf-8", newline="") as f:
        content = f.read()

    reader = csv.reader(StringIO(content))
    rows = list(reader)

    if not rows:
        return [], []

    return rows[0], rows[1:]


def extract_module_codes(rows: list[list[str]]) -> set[str]:
    """Extract unique module codes from data rows."""
    codes = set()
    for row in rows:
        if row and row[0].strip():
            codes.add(row[0].strip())
    return codes


def filter_rows(rows: list[list[str]], module_code: str) -> list[list[str]]:
    """Remove all rows matching the given module code."""
    return [row for row in rows if not row or row[0].strip() != module_code]


def write_csv(path: str, header: list[str], rows: list[list[str]], verbose: bool = False) -> None:
    """Write header + rows to CSV file, creating parent dirs as needed."""
    file_path = Path(path)
    file_path.parent.mkdir(parents=True, exist_ok=True)

    if verbose:
        print(f"Writing {len(rows)} data rows to {path}", file=sys.stderr)

    with open(file_path, "w", encoding="utf-8", newline="") as f:
        writer = csv.writer(f)
        writer.writerow(header)
        for row in rows:
            writer.writerow(row)


def cleanup_legacy_csvs(
    legacy_dir: str, module_code: str, verbose: bool = False
) -> list:
    """Delete legacy per-module module-help.csv files for this module and core only.

    Returns list of deleted file paths.
    """
    deleted = []
    for subdir in (module_code, "core"):
        legacy_path = Path(legacy_dir) / subdir / "module-help.csv"
        if legacy_path.exists():
            if verbose:
                print(f"Deleting legacy CSV: {legacy_path}", file=sys.stderr)
            legacy_path.unlink()
            deleted.append(str(legacy_path))
    return deleted


def main():
    args = parse_args()

    # Read source entries
    source_header, source_rows = read_csv_rows(args.source)
    if not source_rows:
        print(f"Error: No data rows found in source {args.source}", file=sys.stderr)
        sys.exit(1)

    # Determine module codes being merged
    source_codes = extract_module_codes(source_rows)
    if not source_codes:
        print("Error: Could not determine module code from source rows", file=sys.stderr)
        sys.exit(1)

    if args.verbose:
        print(f"Source module codes: {source_codes}", file=sys.stderr)
        print(f"Source rows: {len(source_rows)}", file=sys.stderr)

    # Read existing target (may not exist)
    target_header, target_rows = read_csv_rows(args.target)
    target_existed = Path(args.target).exists()

    if args.verbose:
        print(f"Target exists: {target_existed}", file=sys.stderr)
        if target_existed:
            print(f"Existing target rows: {len(target_rows)}", file=sys.stderr)

    # Use source header if target doesn't exist or has no header
    header = target_header if target_header else (source_header if source_header else HEADER)

    # Anti-zombie: remove all rows for each source module code
    filtered_rows = target_rows
    removed_count = 0
    for code in source_codes:
        before_count = len(filtered_rows)
        filtered_rows = filter_rows(filtered_rows, code)
        removed_count += before_count - len(filtered_rows)

    if args.verbose and removed_count > 0:
        print(f"Removed {removed_count} existing rows (anti-zombie)", file=sys.stderr)

    # Append source rows
    merged_rows = filtered_rows + source_rows

    # Write result
    write_csv(args.target, header, merged_rows, args.verbose)

    # Legacy cleanup: delete old per-module CSV files
    legacy_deleted = []
    if args.legacy_dir:
        if not args.module_code:
            print(
                "Error: --module-code is required when --legacy-dir is provided",
                file=sys.stderr,
            )
            sys.exit(1)
        legacy_deleted = cleanup_legacy_csvs(
            args.legacy_dir, args.module_code, args.verbose
        )

    # Output result summary as JSON
    result = {
        "status": "success",
        "target_path": str(Path(args.target).resolve()),
        "target_existed": target_existed,
        "module_codes": sorted(source_codes),
        "rows_removed": removed_count,
        "rows_added": len(source_rows),
        "total_rows": len(merged_rows),
        "legacy_csvs_deleted": legacy_deleted,
    }
    print(json.dumps(result, indent=2))


if __name__ == "__main__":
    main()
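A small sketch of the anti-zombie merge performed above, assuming extract_module_codes and filter_rows from this script are in scope; the rows are invented and truncated to three columns for illustration.

# Invented rows; only the "bmb" module code appears in the diff above.
target_rows = [
    ["bmb", "bmad-builder", "build-agent"],
    ["cis", "creative-suite", "brainstorm"],
]
source_rows = [
    ["bmb", "bmad-builder", "build-agent-v2"],
]

filtered = target_rows
for code in extract_module_codes(source_rows):  # {"bmb"}
    filtered = filter_rows(filtered, code)      # stale "bmb" rows dropped
merged = filtered + source_rows                 # "cis" row kept, fresh "bmb" row appended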