Les enseignants ont besoin de moyennes à jour immédiatement après la publication ou modification des notes, sans attendre un batch nocturne. Le système recalcule via Domain Events synchrones : statistiques d'évaluation (min/max/moyenne/médiane), moyennes matières pondérées (normalisation /20), et moyenne générale par élève. Les résultats sont stockés dans des tables dénormalisées avec cache Redis (TTL 5 min). Trois endpoints API exposent les données avec contrôle d'accès par rôle. Une commande console permet le backfill des données historiques au déploiement.
205 lines
7.5 KiB
Python
205 lines
7.5 KiB
Python
"""Tests for analyze_sources.py"""
|
|
|
|
import json
|
|
import os
|
|
import tempfile
|
|
from pathlib import Path
|
|
from unittest.mock import patch
|
|
|
|
import pytest
|
|
|
|
# Add parent dir to path so we can import the script
|
|
import sys
|
|
sys.path.insert(0, str(Path(__file__).parent.parent))
|
|
|
|
from analyze_sources import (
|
|
resolve_inputs,
|
|
detect_doc_type,
|
|
suggest_groups,
|
|
analyze,
|
|
INCLUDE_EXTENSIONS,
|
|
SKIP_DIRS,
|
|
)
|
|
|
|
|
|
@pytest.fixture
def temp_dir():
    """Yield the path (a ``str``) of a scratch directory seeded with sample inputs.

    Contents written before the path is yielded:

    * top level: four markdown documents, ``random.txt`` and an ``image.png`` stub
    * ``subdir/prd-v2.md``
    * ``node_modules/junk.md`` (placed in a directory the tests expect to be skipped)

    The directory is deleted when the ``TemporaryDirectory`` context exits.
    """
    with tempfile.TemporaryDirectory() as d:
        root = Path(d)

        # Top-level text samples, written in this order.
        top_level_text = {
            "product-brief-foo.md": "# Product Brief\nContent here",
            "product-brief-foo-discovery-notes.md": "# Discovery\nNotes",
            "architecture-doc.md": "# Architecture\nDesign here",
            "research-report.md": "# Research\nFindings",
            "random.txt": "Some text content",
        }
        for name, body in top_level_text.items():
            (root / name).write_text(body)

        # A binary stub (only the first bytes of a PNG signature).
        (root / "image.png").write_bytes(b"\x89PNG")

        # Nested folder holding one more document.
        nested = root / "subdir"
        nested.mkdir()
        (nested / "prd-v2.md").write_text("# PRD\nRequirements")

        # Folder the tests expect to be excluded from results.
        excluded = root / "node_modules"
        excluded.mkdir()
        (excluded / "junk.md").write_text("Should be skipped")

        yield d
|
|
|
|
|
|
class TestResolveInputs:
    """Checks how ``resolve_inputs`` handles files, folders, globs and bad paths."""

    def test_single_file(self, temp_dir):
        """One file path in, exactly that one file out."""
        target = Path(temp_dir) / "product-brief-foo.md"
        resolved = resolve_inputs([str(target)])
        assert len(resolved) == 1
        assert resolved[0].name == "product-brief-foo.md"

    def test_folder_recursion(self, temp_dir):
        """A folder input yields top-level files and the file inside ``subdir``."""
        found = {entry.name for entry in resolve_inputs([temp_dir])}
        assert "product-brief-foo.md" in found
        assert "prd-v2.md" in found
        assert "random.txt" in found

    def test_folder_skips_excluded_dirs(self, temp_dir):
        """The file placed under ``node_modules`` must not be returned."""
        resolved = resolve_inputs([temp_dir])
        assert all(entry.name != "junk.md" for entry in resolved)

    def test_folder_skips_non_text_files(self, temp_dir):
        """The ``.png`` stub must not be returned."""
        resolved = resolve_inputs([temp_dir])
        assert all(entry.name != "image.png" for entry in resolved)

    def test_glob_pattern(self, temp_dir):
        """A ``product-brief-*.md`` glob matches both product-brief files."""
        glob_expr = str(Path(temp_dir) / "product-brief-*.md")
        resolved = resolve_inputs([glob_expr])
        assert len(resolved) == 2
        found = {entry.name for entry in resolved}
        assert "product-brief-foo.md" in found
        assert "product-brief-foo-discovery-notes.md" in found

    def test_deduplication(self, temp_dir):
        """Passing the same path three times yields a single entry."""
        same_path = str(Path(temp_dir) / "product-brief-foo.md")
        resolved = resolve_inputs([same_path] * 3)
        assert len(resolved) == 1

    def test_mixed_inputs(self, temp_dir):
        """A file path and a folder path can be combined in one call."""
        base = Path(temp_dir)
        inputs = [str(base / "architecture-doc.md"), str(base / "subdir")]
        found = {entry.name for entry in resolve_inputs(inputs)}
        assert "architecture-doc.md" in found
        assert "prd-v2.md" in found

    def test_nonexistent_path(self):
        """A path that does not exist resolves to an empty result."""
        resolved = resolve_inputs(["/nonexistent/path/file.md"])
        assert len(resolved) == 0
|
|
|
|
|
|
class TestDetectDocType:
    """Table-driven check of ``detect_doc_type`` against sample filenames."""

    # (filename, expected doc type) pairs fed to ``test_detection``.
    _CASES = [
        ("product-brief-foo.md", "product-brief"),
        ("product_brief_bar.md", "product-brief"),
        ("foo-discovery-notes.md", "discovery-notes"),
        ("foo-discovery_notes.md", "discovery-notes"),
        ("architecture-overview.md", "architecture-doc"),
        ("my-prd.md", "prd"),
        ("research-report-q4.md", "research-report"),
        ("foo-distillate.md", "distillate"),
        ("changelog.md", "changelog"),
        ("readme.md", "readme"),
        ("api-spec.md", "specification"),
        ("design-doc-v2.md", "design-doc"),
        ("meeting-notes-2026.md", "meeting-notes"),
        ("brainstorm-session.md", "brainstorming"),
        ("user-interview-notes.md", "interview-notes"),
        ("random-file.md", "unknown"),
    ]

    @pytest.mark.parametrize(("filename", "expected"), _CASES)
    def test_detection(self, filename, expected):
        """The type detected for ``filename`` equals ``expected``."""
        detected = detect_doc_type(filename)
        assert detected == expected
|
|
|
|
|
|
class TestSuggestGroups:
    """Checks how ``suggest_groups`` pairs related files and isolates the rest."""

    def test_groups_brief_with_discovery_notes(self, temp_dir):
        """A brief and its discovery notes end up together in one group."""
        base = Path(temp_dir)
        candidates = [
            base / name
            for name in (
                "product-brief-foo.md",
                "product-brief-foo-discovery-notes.md",
            )
        ]
        # Keep only the groups that hold more than one file.
        multi_file = [
            group
            for group in suggest_groups(candidates)
            if len(group["files"]) > 1
        ]
        assert len(multi_file) == 1
        members = {entry["filename"] for entry in multi_file[0]["files"]}
        assert "product-brief-foo.md" in members
        assert "product-brief-foo-discovery-notes.md" in members

    def test_standalone_files(self, temp_dir):
        """Two unrelated documents produce two single-file groups."""
        base = Path(temp_dir)
        candidates = [base / "architecture-doc.md", base / "research-report.md"]
        produced = suggest_groups(candidates)
        assert len(produced) == 2
        assert all(len(group["files"]) == 1 for group in produced)

    def test_mixed_grouped_and_standalone(self, temp_dir):
        """A related pair plus one unrelated file gives one pair and one single."""
        base = Path(temp_dir)
        candidates = [
            base / "product-brief-foo.md",
            base / "product-brief-foo-discovery-notes.md",
            base / "architecture-doc.md",
        ]
        produced = suggest_groups(candidates)
        multi_file = [group for group in produced if len(group["files"]) > 1]
        single_file = [group for group in produced if len(group["files"]) == 1]
        assert len(multi_file) == 1
        assert len(single_file) == 1
|
|
|
|
|
|
class TestAnalyze:
    """End-to-end checks of ``analyze``: JSON report contents and routing."""

    @staticmethod
    def _run_to_file(inputs, output_file):
        """Run ``analyze`` on ``inputs`` and return the parsed JSON it wrote.

        Args:
            inputs: list of path strings handed to ``analyze``.
            output_file: path string where ``analyze`` writes its report.

        Returns:
            The report decoded with ``json.loads``.
        """
        analyze(inputs, output_file)
        return json.loads(Path(output_file).read_text())

    def test_basic_analysis(self, temp_dir):
        """A single product brief is reported with status, count, type and tokens."""
        f = str(Path(temp_dir) / "product-brief-foo.md")
        output_file = str(Path(temp_dir) / "output.json")
        result = self._run_to_file([f], output_file)
        assert result["status"] == "ok"
        assert result["summary"]["total_files"] == 1
        assert result["files"][0]["doc_type"] == "product-brief"
        assert result["files"][0]["estimated_tokens"] > 0

    def test_routing_single_small_input(self, temp_dir):
        """One small file is routed as ``single``."""
        f = str(Path(temp_dir) / "product-brief-foo.md")
        output_file = str(Path(temp_dir) / "output.json")
        result = self._run_to_file([f], output_file)
        assert result["routing"]["recommendation"] == "single"

    def test_routing_fanout_many_files(self, temp_dir):
        """A folder with several extra documents is routed as ``fan-out``."""
        # Create enough files to trigger fan-out (> 3 files)
        for i in range(5):
            (Path(temp_dir) / f"doc-{i}.md").write_text("x" * 1000)
        output_file = str(Path(temp_dir) / "output.json")
        result = self._run_to_file([temp_dir], output_file)
        assert result["routing"]["recommendation"] == "fan-out"

    def test_folder_analysis(self, temp_dir):
        """Analyzing the whole sample folder succeeds and produces groups."""
        output_file = str(Path(temp_dir) / "output.json")
        result = self._run_to_file([temp_dir], output_file)
        assert result["status"] == "ok"
        assert result["summary"]["total_files"] >= 4  # at least the base files
        assert len(result["groups"]) > 0

    def test_no_files_found(self, tmp_path):
        """A nonexistent input still writes a report, with status ``error``."""
        # Write into pytest's per-test tmp_path instead of a fixed /tmp file:
        # nothing is left behind when an assertion fails, the path is portable,
        # and concurrent test runs cannot collide on the same file name.
        output_file = str(tmp_path / "test_analyze_empty.json")
        result = self._run_to_file(["/nonexistent/path"], output_file)
        assert result["status"] == "error"

    def test_stdout_output(self, temp_dir, capsys):
        """Without an output path the JSON report is printed to stdout."""
        f = str(Path(temp_dir) / "product-brief-foo.md")
        analyze([f])
        captured = capsys.readouterr()
        result = json.loads(captured.out)
        assert result["status"] == "ok"
|