vault backup: 2025-12-26 02:09:22
All checks were successful
Deploy Quartz site to GitHub Pages / build (push) Successful in 2m29s
All checks were successful
Deploy Quartz site to GitHub Pages / build (push) Successful in 2m29s
This commit is contained in:
527
stroma/quiz/utils/importer.py
Normal file
527
stroma/quiz/utils/importer.py
Normal file
@@ -0,0 +1,527 @@
|
||||
import re
|
||||
from collections import defaultdict
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
from typing import Tuple
|
||||
import django.db.utils
|
||||
|
||||
from django.conf import settings
|
||||
|
||||
from quiz.models import Course, Exam, Question, Option
|
||||
from quiz.utils.question_parser import parse_question_from_content, Node
|
||||
|
||||
|
||||
class ImportStats:
    """Accumulate statistics for one question-import run, per exam folder."""

    def __init__(self):
        # Run-wide counters.
        self.total_files = 0
        self.mcq_questions = 0
        self.non_mcq_skipped = 0
        self.questions_with_answers = 0
        self.questions_with_todo = 0
        self.created = 0
        self.updated = 0
        self.errors = 0
        # Per-folder breakdown; unseen folders start with zeroed counters.
        self.by_folder = defaultdict(
            lambda: {'total': 0, 'mcq': 0, 'answered': 0, 'todo': 0}
        )

    def has_changes(self) -> bool:
        """Return True when the run created, updated, or errored on anything."""
        return any((self.created, self.updated, self.errors))

    def format_output(self, show_if_no_changes: bool = True) -> str:
        """
        Render the collected statistics as a multi-line console report.

        Args:
            show_if_no_changes: If False, returns empty string when no changes
        """
        if not (show_if_no_changes or self.has_changes()):
            return ""

        heavy_rule = "=" * 70
        light_rule = "-" * 70

        report = [
            "\n" + heavy_rule,
            "QUESTION IMPORT STATISTICS",
            heavy_rule,
            f"Total .md files found:      {self.total_files}",
            f"MCQ questions found:        {self.mcq_questions}",
            f"Non-MCQ skipped:            {self.non_mcq_skipped}",
            f"Questions with answers:     {self.questions_with_answers}",
            f"Questions with TODO:        {self.questions_with_todo}",
            f"Created in database:        {self.created}",
            f"Updated in database:        {self.updated}",
        ]
        if self.errors > 0:
            report.append(f"Errors:                     {self.errors}")

        if self.mcq_questions > 0:
            completion_pct = self.questions_with_answers / self.mcq_questions * 100
            report.append(f"Overall completion:         {completion_pct:.1f}%")

        report.extend([
            "\n" + light_rule,
            "COMPLETION BY EXAM FOLDER",
            light_rule,
        ])

        # Folders are listed alphabetically; folders without MCQs are omitted.
        for folder, folder_stats in sorted(self.by_folder.items()):
            if folder_stats['mcq'] > 0:
                pct = folder_stats['answered'] / folder_stats['mcq'] * 100
                report.append(
                    f"{folder:20} {folder_stats['answered']:3}/{folder_stats['mcq']:3} MCQ ({pct:5.1f}%)"
                )

        report.append(heavy_rule + "\n")
        return "\n".join(report)
# Maps a "frågetyp/<x>" tag fragment to the internal question-type code.
# Order matters: for a given tag, the first matching fragment wins (mirrors
# the original if/elif chain).
_QUESTION_TYPE_TAGS = (
    ('frågetyp/mcq', 'mcq'),
    ('frågetyp/scq', 'scq'),
    ('frågetyp/matching', 'matching'),
    ('frågetyp/textalternativ', 'textalternativ'),
    ('frågetyp/textfält', 'textfält'),
)


def _extract_question_text(nodes):
    """Return the first meaningful paragraph's text, or None.

    Strips inline Obsidian-style image embeds (![[...]]) and bold markers,
    and skips image-only paragraphs as well as "Välj X alternativ"
    answer-count instruction lines.
    """
    for node in nodes:
        if node.type != "paragraph":
            continue
        text = node.text.strip()
        if not text:
            continue
        # Remove inline image references before deciding whether the
        # paragraph carries any real text.
        text = re.sub(r'!\[\[.*?\]\]', '', text).strip()
        if not text:
            continue
        # Skip "Välj X alternativ" instructions.
        if 'Välj' in text and 'alternativ' in text:
            continue
        text = text.replace('**', '')
        if text:
            return text
    return None


def _extract_options(nodes, question_type):
    """Collect (letter, text) option tuples from every list node.

    Recognizes "A: text" and bare "A" items; for text-based question types,
    unlabeled items get synthesized letters (skipping "a)"-style
    sub-question markers).
    """
    options = []
    for node in nodes:
        if node.type != "list":
            continue
        for item in node.children:
            if item.type != "list_item":
                continue
            item_text = item.text.strip()
            match = re.match(r'^([A-Z]):\s*(.*)$', item_text)
            if match:
                options.append((match.group(1), match.group(2).strip()))
            elif re.match(r'^([A-Z])$', item_text):
                options.append((item_text, ''))
            elif question_type in ['textalternativ', 'textfält']:
                if not re.match(r'^[a-z]\)', item_text):
                    options.append((chr(ord('A') + len(options)), item_text))
    return options


def _extract_answer(nodes, question_type):
    """Read the first spoiler code block; return (correct_answer, has_answer).

    For mcq/scq the answer is the sorted, deduplicated capital letters found
    in the block (joined with commas); for text questions it is the raw
    answer text truncated to the 200-char database field. A block containing
    "TODO" marks the question as unanswered but the answer text is still
    extracted. Returns ('', False) when no spoiler block exists.
    """
    for node in nodes:
        if node.type == "block_code" and node.attrs.get("info") == "spoiler-block:":
            answer_text = node.raw.strip()
            has_answer = 'TODO' not in answer_text.upper()
            if question_type in ['mcq', 'scq']:
                letters = re.findall(r'\b([A-Z])\b', answer_text)
                # '' (never None) when no letters found, so the DB field
                # always receives a string.
                correct = ','.join(sorted(set(letters))) if letters else ''
                return correct, has_answer
            return answer_text[:200], has_answer
    return '', False


def parse_markdown_question(file_path: Path, content: str) -> Tuple[bool, dict]:
    """
    Parse a markdown file and extract question data using question_parser.

    Args:
        file_path: Path of the source file (kept for interface compatibility;
            parsing uses only `content`).
        content: Raw markdown text of the question file.

    Returns:
        (is_question, question_data) where question_data contains:
        - text: question text (None when no usable paragraph was found)
        - options: list of (letter, text) tuples
        - correct_answer: correct letter(s) for mcq/scq, else answer text
          ('' when unknown — never None, so DB fields stay valid)
        - has_answer: whether it has an answer (not TODO)
        - tags: list of tag strings
        - question_type: 'mcq', 'scq', 'matching', 'textalternativ',
          'textfält', or None
        Matching questions return the shape produced by
        parse_matching_question_from_nodes instead.
    """
    # Parse from content string (works for both test cases and real files).
    parsed = parse_question_from_content(content)
    tags = parsed.metadata.get('tags', [])

    # A file counts as a question iff any tag contains "frågetyp/"; the
    # question type of a later tag overrides an earlier one.
    question_type = None
    is_question = False
    for tag in tags:
        if 'frågetyp/' not in tag:
            continue
        is_question = True
        for fragment, qtype in _QUESTION_TYPE_TAGS:
            if fragment in tag:
                question_type = qtype
                break

    if not is_question:
        return False, {}

    # Matching questions have a completely different data shape.
    if question_type == 'matching':
        return parse_matching_question_from_nodes(parsed.nodes, tags)

    question_text = _extract_question_text(parsed.nodes)
    if not question_text:
        return True, {
            'text': None,
            'options': [],
            'correct_answer': '',
            'has_answer': False,
            'question_type': question_type,
            'tags': tags,
        }

    options_data = _extract_options(parsed.nodes, question_type)

    if not options_data:
        # Text-based questions may legitimately have no options.
        options_data = [('A', '')]
    elif len(options_data) < 2 and question_type in ['mcq', 'scq']:
        # A choice question with fewer than two options is unanswerable.
        return True, {
            'text': question_text,
            'options': options_data,
            'correct_answer': '',
            'has_answer': False,
            'question_type': question_type,
            'tags': tags,
        }

    correct_answer, has_answer = _extract_answer(parsed.nodes, question_type)

    return True, {
        'text': question_text,
        'options': options_data,
        'correct_answer': correct_answer,
        'has_answer': has_answer,
        'question_type': question_type,
        'tags': tags,
    }
def _match_item_index(part, items):
    """Return the index of the first item fuzzily matching `part`, or None.

    The match is a case-insensitive, bidirectional substring test; callers
    must not pass an empty `part` (an empty string is a substring of
    everything and would match the first item).
    """
    needle = part.lower()
    for idx, item in enumerate(items):
        candidate = item.lower()
        if needle in candidate or candidate in needle:
            return idx
    return None


def parse_matching_question_from_nodes(nodes: "list[Node]", tags: list) -> Tuple[bool, dict]:
    """
    Parse matching question from parsed nodes.

    Expected format:
    - Two consecutive bullet lists
    - First list = left column items (rows)
    - Second list = top row items (columns)
    - Answer format: "LeftItem: TopItem" pairs, one per line, in a
      spoiler code block

    Args:
        nodes: Parsed markdown nodes of the question body.
        tags: Tag strings copied verbatim into the returned data.

    Returns:
        (is_matching, question_data) — always (True, ...); question_data
        contains text, left_items, top_items, correct_pairs (list of
        [left_idx, top_idx]), has_answer, question_type ('matching'), tags.
    """
    # Question text: first paragraph that is non-empty after stripping
    # inline image embeds; bold markers removed.
    question_text = None
    for node in nodes:
        if node.type == "paragraph":
            text = node.text.strip()
            text = re.sub(r'!\[\[.*?\]\]', '', text).strip()
            if not text:
                continue
            question_text = text.replace('**', '')
            break

    if not question_text:
        return True, {
            'text': None,
            'left_items': [],
            'top_items': [],
            'correct_pairs': [],
            'has_answer': False,
            'question_type': 'matching',
            'tags': tags
        }

    # First list = rows (left column), second list = columns (top row).
    left_items = []
    top_items = []
    list_nodes = [node for node in nodes if node.type == "list"]
    if len(list_nodes) >= 2:
        left_items = [item.text.strip() for item in list_nodes[0].children
                      if item.type == "list_item"]
        top_items = [item.text.strip() for item in list_nodes[1].children
                     if item.type == "list_item"]

    # Answers come from the first spoiler block as "Left: Top" lines.
    correct_pairs = []
    has_answer = False
    for node in nodes:
        if node.type != "block_code" or node.attrs.get("info") != "spoiler-block:":
            continue
        answer_text = node.raw.strip()

        # A TODO marker means the question is not answered yet.
        if 'TODO' in answer_text.upper():
            has_answer = False
            break
        has_answer = True

        for line in answer_text.split('\n'):
            line = line.strip()
            if ':' not in line:
                continue
            left_part, top_part = (part.strip() for part in line.split(':', 1))
            # BUGFIX: an empty half ("" is a substring of every string)
            # previously matched the first item and fabricated a pair.
            if not left_part or not top_part:
                continue
            left_idx = _match_item_index(left_part, left_items)
            top_idx = _match_item_index(top_part, top_items)
            if left_idx is not None and top_idx is not None:
                correct_pairs.append([left_idx, top_idx])
        break

    return True, {
        'text': question_text,
        'left_items': left_items,
        'top_items': top_items,
        'correct_pairs': correct_pairs,
        'has_answer': has_answer,
        'question_type': 'matching',
        'tags': tags
    }
def _resolve_exam(path_parts):
    """Derive (or create) the Exam for a question from its folder path.

    Expected path: content/Anatomi & Histologi 2/Gamla tentor/2022-01-15/question.md
    — the question's parent folder is the exam date. Returns None when the
    path has no parent folder or the folder name is not a parseable date.
    """
    if len(path_parts) < 2:
        return None
    exam_folder = path_parts[-2]
    if not exam_folder or '-' not in exam_folder:
        return None
    try:
        exam_date = datetime.strptime(exam_folder, '%Y-%m-%d').date()

        # Default course, overridden by a matching path component when present.
        course_name = "Anatomi & Histologi 2"
        if len(path_parts) >= 3 and 'Anatomi' in ' '.join(path_parts):
            for part in path_parts:
                if 'Anatomi' in part or 'Histologi' in part:
                    course_name = part
                    break

        course, _ = Course.objects.get_or_create(
            name=course_name,
            defaults={'code': 'AH2'}
        )
        exam, _ = Exam.objects.get_or_create(
            course=course,
            date=exam_date,
            defaults={
                'name': exam_folder,
                'folder_path': '/'.join(path_parts[:-1])
            }
        )
        return exam
    except (ValueError, ImportError):
        # Unparseable date: the question is imported without an exam.
        return None


def _sync_tags(question, tag_names):
    """Replace the question's tags with Tag rows matching `tag_names`."""
    from django.utils.text import slugify
    from quiz.models import Tag

    question.tags.clear()
    for tag_name in tag_names:
        tag, _ = Tag.objects.get_or_create(
            slug=slugify(tag_name),
            defaults={'name': tag_name}
        )
        question.tags.add(tag)


def _replace_options(question, options):
    """Recreate the question's options, deduplicating by letter (first wins)."""
    question.options.all().delete()
    seen_letters = set()
    for letter, text in options:
        if letter not in seen_letters:
            Option.objects.create(question=question, letter=letter, text=text)
            seen_letters.add(letter)


def import_question_file(file_path: Path, base_path: Path, stats: ImportStats, force: bool = False) -> str:
    """
    Import a single question file, checking modification time to avoid unnecessary updates.

    Args:
        file_path: Path to the question file
        base_path: Base path for relative calculations
        stats: ImportStats object to track statistics
        force: If True, import regardless of mtime (for initial import)

    Returns:
        Status string: 'imported', 'updated', 'error', or one of the
        'skipped_*' markers ('skipped_unchanged', 'skipped_not_mcq',
        'skipped_invalid', 'skipped_todo').
    """
    try:
        file_mtime = file_path.stat().st_mtime

        # Store paths relative to the project root so they are stable across
        # checkouts; fall back to base_path when outside the project tree.
        project_root = settings.BASE_DIR.parent
        try:
            file_path_str = str(file_path.relative_to(project_root))
        except ValueError:
            file_path_str = str(file_path.relative_to(base_path))

        # Skip files whose stored mtime shows no change since the last import.
        if not force:
            try:
                existing_question = Question.objects.get(file_path=file_path_str)
                if existing_question.file_mtime and existing_question.file_mtime >= file_mtime:
                    return 'skipped_unchanged'
            except Question.DoesNotExist:
                pass  # New file, will import

        content = file_path.read_text(encoding='utf-8')
        is_question, question_data = parse_markdown_question(file_path, content)

        # Per-folder bookkeeping keyed on the top-level folder under base_path.
        # Computed once here and reused below for exam resolution.
        relative_path = file_path.relative_to(base_path)
        folder_name = relative_path.parts[0] if len(relative_path.parts) > 1 else 'root'
        stats.by_folder[folder_name]['total'] += 1

        if not is_question:
            stats.non_mcq_skipped += 1
            return 'skipped_not_mcq'

        stats.mcq_questions += 1
        stats.by_folder[folder_name]['mcq'] += 1

        if not question_data or not question_data.get('text'):
            stats.non_mcq_skipped += 1
            return 'skipped_invalid'

        if not question_data['has_answer']:
            stats.questions_with_todo += 1
            stats.by_folder[folder_name]['todo'] += 1
            return 'skipped_todo'

        stats.questions_with_answers += 1
        stats.by_folder[folder_name]['answered'] += 1

        # Resolve (or create) the exam from the folder structure.
        exam = _resolve_exam(relative_path.parts)

        # Import to database with mtime tracking.
        defaults = {
            'exam': exam,
            'text': question_data['text'],
            'correct_answer': question_data.get('correct_answer', ''),
            'file_mtime': file_mtime,
            'question_type': question_data.get('question_type', 'mcq'),
        }

        # Matching questions carry their grid data in a JSON field.
        if question_data.get('question_type') == 'matching':
            defaults['matching_data'] = {
                'left_items': question_data.get('left_items', []),
                'top_items': question_data.get('top_items', []),
                'correct_pairs': question_data.get('correct_pairs', [])
            }

        question, created = Question.objects.update_or_create(
            file_path=file_path_str,
            defaults=defaults
        )

        if created:
            stats.created += 1
        else:
            stats.updated += 1

        _sync_tags(question, question_data.get('tags', []))

        # Options only apply to non-matching questions.
        if question_data.get('question_type') not in ['matching']:
            _replace_options(question, question_data.get('options', []))

        return 'imported' if created else 'updated'

    except (OSError, ValueError, django.db.utils.Error) as e:
        stats.errors += 1
        print(f"Error importing {file_path}: {e}")
        return 'error'
def import_questions(folder_path: Path, base_path: "Path | None" = None, force: bool = False) -> ImportStats:
    """
    Recursively import every .md question file under `folder_path`.

    Args:
        folder_path: Directory scanned (recursively) for question files.
        base_path: Root used for relative-path calculations and per-folder
            statistics; defaults to `folder_path` when None.
        force: If True, re-import files even when their mtime is unchanged.

    Returns:
        An ImportStats object summarizing the run.
    """
    if base_path is None:
        base_path = folder_path

    stats = ImportStats()

    for md_file in folder_path.rglob('*.md'):
        stats.total_files += 1
        import_question_file(md_file, base_path, stats, force=force)

    return stats
def delete_question_by_path(file_path: Path):
    """Remove the Question row backing a deleted markdown file.

    Returns True when a matching question was deleted, False otherwise
    (including on errors, which are logged rather than raised).
    """
    try:
        rel_path = str(file_path.relative_to(settings.BASE_DIR.parent))
        deleted_count, _ = Question.objects.filter(file_path=rel_path).delete()
        if deleted_count > 0:
            print(f"[Auto-delete] ✓ Deleted question: {file_path.name}")
        return deleted_count > 0
    except (OSError, django.db.utils.Error) as e:
        print(f"[Auto-delete] ✗ Error deleting question {file_path}: {e}")
        return False
Reference in New Issue
Block a user