All checks were successful
Deploy Quartz site to GitHub Pages / build (push) Successful in 2m29s
151 lines
5.5 KiB
Python
151 lines
5.5 KiB
Python
import time
|
|
import threading
|
|
from pathlib import Path
|
|
from watchdog.observers import Observer
|
|
from watchdog.events import FileSystemEventHandler, FileSystemEvent
|
|
from django.conf import settings
|
|
from quiz.utils.importer import import_question_file, delete_question_by_path, ImportStats
|
|
|
|
|
|
class QuestionFileHandler(FileSystemEventHandler):
|
|
"""Handle file system events for question markdown files with mtime checking"""
|
|
|
|
def __init__(self, base_path: Path, watch_path: Path):
|
|
super().__init__()
|
|
self.base_path = base_path
|
|
self.watch_path = watch_path
|
|
self.pending_events = {}
|
|
self.debounce_seconds = 2
|
|
self.lock = threading.Lock()
|
|
|
|
def _debounced_import(self, file_path: Path, event_type: str):
|
|
"""Import file after debounce delay, checking mtime for actual changes"""
|
|
time.sleep(self.debounce_seconds)
|
|
|
|
with self.lock:
|
|
if file_path in self.pending_events:
|
|
del self.pending_events[file_path]
|
|
|
|
if not file_path.exists():
|
|
return
|
|
|
|
# Import with mtime checking (force=False means only import if changed)
|
|
stats = ImportStats()
|
|
result = import_question_file(file_path, self.watch_path, stats, force=False)
|
|
|
|
# Provide feedback based on result
|
|
if result == 'imported':
|
|
print(f"\n[Auto-import] ✓ Created: {file_path.name}")
|
|
elif result == 'updated':
|
|
print(f"\n[Auto-import] ✓ Updated: {file_path.name}")
|
|
elif result == 'skipped_unchanged':
|
|
# File hasn't actually changed (same mtime), no output
|
|
pass
|
|
elif result == 'skipped_todo':
|
|
print(f"\n[Auto-import] ⊘ Skipped: {file_path.name} (TODO answer)")
|
|
elif result == 'skipped_not_mcq':
|
|
# Silently skip non-MCQ files
|
|
pass
|
|
elif result == 'error':
|
|
print(f"\n[Auto-import] ✗ Error: {file_path.name}")
|
|
|
|
def _handle_file_change(self, file_path: Path, event_type: str = 'modified'):
|
|
"""Handle file creation or modification with debouncing"""
|
|
if not file_path.suffix == '.md':
|
|
return
|
|
|
|
with self.lock:
|
|
# Cancel pending import if exists
|
|
if file_path in self.pending_events:
|
|
self.pending_events[file_path].cancel()
|
|
|
|
# Schedule new import
|
|
timer = threading.Timer(
|
|
self.debounce_seconds,
|
|
self._debounced_import,
|
|
args=[file_path, event_type]
|
|
)
|
|
self.pending_events[file_path] = timer
|
|
timer.start()
|
|
|
|
def on_created(self, event: FileSystemEvent):
|
|
"""Handle file creation"""
|
|
if not event.is_directory:
|
|
self._handle_file_change(Path(event.src_path), 'created')
|
|
|
|
def on_modified(self, event: FileSystemEvent):
|
|
"""Handle file modification"""
|
|
if not event.is_directory:
|
|
self._handle_file_change(Path(event.src_path), 'modified')
|
|
|
|
def on_deleted(self, event: FileSystemEvent):
|
|
"""Handle file deletion"""
|
|
if not event.is_directory and event.src_path.endswith('.md'):
|
|
file_path = Path(event.src_path)
|
|
delete_question_by_path(file_path)
|
|
|
|
|
|
class QuestionWatcher:
|
|
"""Watch for changes in question markdown files and auto-import"""
|
|
|
|
def __init__(self, watch_path: Path, base_path: Path = None):
|
|
self.watch_path = watch_path
|
|
self.base_path = base_path or watch_path
|
|
self.observer = None
|
|
self.running = False
|
|
|
|
def start(self):
|
|
"""Start watching for file changes"""
|
|
if self.running:
|
|
return
|
|
|
|
self.observer = Observer()
|
|
event_handler = QuestionFileHandler(self.base_path, self.watch_path)
|
|
self.observer.schedule(event_handler, str(self.watch_path), recursive=True)
|
|
self.observer.start()
|
|
self.running = True
|
|
print(f"[QuestionWatcher] Started watching: {self.watch_path}")
|
|
|
|
def stop(self):
|
|
"""Stop watching for file changes"""
|
|
if self.observer and self.running:
|
|
self.observer.stop()
|
|
self.observer.join()
|
|
self.running = False
|
|
print("[QuestionWatcher] Stopped")
|
|
|
|
|
|
def start_watcher_thread():
|
|
"""Start the question watcher in a background thread"""
|
|
from quiz.utils.importer import import_questions
|
|
|
|
def run_watcher():
|
|
# Get watch path from settings
|
|
watch_path_str = getattr(settings, 'QUESTION_WATCH_PATH', 'content/Anatomi & Histologi 2/Gamla tentor')
|
|
watch_path = settings.BASE_DIR.parent / watch_path_str
|
|
|
|
if not watch_path.exists():
|
|
print(f"[QuestionWatcher] Warning: Watch path does not exist: {watch_path}")
|
|
return
|
|
|
|
# Initial import with mtime checking (force=False to only import changed files)
|
|
print("\n[QuestionWatcher] Checking for changes...")
|
|
stats = import_questions(watch_path, watch_path, force=False)
|
|
|
|
# Only show stats if there were changes
|
|
output = stats.format_output(show_if_no_changes=False)
|
|
if output:
|
|
print(output)
|
|
else:
|
|
print(f"[QuestionWatcher] ✓ All files up to date")
|
|
|
|
# Start watching for changes
|
|
watcher = QuestionWatcher(watch_path, watch_path)
|
|
watcher.start()
|
|
|
|
# Start in daemon thread so it doesn't block shutdown
|
|
thread = threading.Thread(target=run_watcher, name="QuestionWatcher", daemon=True)
|
|
thread.start()
|
|
print("[QuestionWatcher] Background thread started")
|
|
|