Build a Reliable Directory Watcher in Python (Step‑by‑Step)

Monitoring a filesystem directory for changes—new files arriving, files being modified or removed—is a common task for automation, ETL pipelines, deployment systems, and many other applications. A reliable directory watcher must handle real-world issues: missed events, race conditions, partial writes, platform differences, high file churn, and fault tolerance. This guide shows how to build a robust, production-ready directory watcher in Python, step by step, with concrete examples and best practices.
Why a robust directory watcher matters
Many simple examples use a naive loop that polls a directory or rely on a basic filesystem event library without accounting for edge cases. That can lead to:
- Missed or duplicate events when the watcher restarts or the underlying OS batches notifications.
- Race conditions where a file is detected but still being written.
- Platform-specific differences (Linux inotify vs macOS FSEvents vs Windows ReadDirectoryChangesW).
- Resource exhaustion under heavy load.
A reliable watcher treats filesystem events as hints rather than guarantees, uses atomic processing patterns, and offers resilience to failures.
Key design principles
- Treat events as notifications, not the single source of truth: validate file state before processing.
- Use durable bookkeeping (on-disk state) to survive restarts.
- Implement “stability” checks to avoid processing partially written files.
- Debounce/coalesce frequent events to reduce redundant work.
- Design for cross-platform compatibility or clearly pick platform-specific tools.
- Make processing idempotent so retries are safe.
Step 1 — Choose how you’ll observe the filesystem
Option A — Use a native event library (recommended for real-time):
- Linux: inotify (via watchdog, pyinotify)
- macOS: FSEvents (via watchdog)
- Windows: ReadDirectoryChangesW (via watchdog)

The watchdog package (https://pypi.org/project/watchdog/) is cross-platform and commonly used. It provides a high-level API and uses the best backend available on each OS.
Option B — Polling
- Simpler and more portable; use when inotify-like facilities are unavailable or events are unreliable.
- Polling interval is a tradeoff: shorter = more CPU; longer = higher latency.
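A polling observer needs nothing beyond the standard library. The sketch below is illustrative (the `poll_directory` function and its parameters are not from any library): each cycle it snapshots every file's size and mtime, and yields paths that are new or changed since the previous snapshot.

```python
import time
from pathlib import Path

def poll_directory(directory, interval=1.0, cycles=None):
    """Yield batches of new or changed paths by comparing snapshots.

    A minimal polling sketch: each cycle, stat every file and report
    paths that are new or whose (size, mtime) signature changed since
    the previous snapshot. `cycles=None` means poll forever.
    """
    previous = {}
    count = 0
    while cycles is None or count < cycles:
        current = {}
        for p in Path(directory).iterdir():
            if p.is_file():
                st = p.stat()
                current[str(p)] = (st.st_size, st.st_mtime)
        changed = {path for path, sig in current.items()
                   if previous.get(path) != sig}
        if changed:
            yield changed
        previous = current
        count += 1
        time.sleep(interval)
```

The (size, mtime) signature is a pragmatic compromise: cheap to compute on every cycle, but it can miss same-size rewrites on filesystems with coarse mtime resolution; hash the content if that matters for your workload.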
Example: install watchdog
pip install watchdog
Step 2 — Define your processing semantics
Decide what “process a file” means and how to ensure correctness.
- Atomicity: ensure each file is fully processed exactly once or at least once with idempotent processing.
- Ordering: do you need to process files in the order they arrived?
- Retries: how many times to retry on transient failures?
- Visibility: move or rename processed files to an archive or delete them after success.
- Metadata: store processing metadata (hash, timestamp, status) in a persistent store (SQLite, Redis, disk manifests).
Best practice: write a small manifest (SQLite or JSON) that records filename, checksum, last-modified, processing status, attempts, and timestamps.
Step 3 — Handle partial writes and “file stability”
Problem: when a creator writes a large file, your watcher may see the file before writing completes.
Solutions:
- Use atomic handoff by convention: have producers write to a temp name and rename into the watched directory once complete (e.g., write myfile.tmp → mv myfile.tmp myfile.csv). Rename is atomic on the same filesystem.
- If you can’t control producers, implement stability checks: wait until the file’s size and mtime are unchanged for a short duration before processing.
- Use file locking where possible (POSIX advisory locks) to check if another process is writing.
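The atomic-handoff convention from the first point can be sketched on the producer side like this (a minimal illustration, assuming the watcher ignores `*.tmp` names; `atomic_publish` is a hypothetical helper, not a library function):

```python
import os
import tempfile
from pathlib import Path

def atomic_publish(data: bytes, dest: Path) -> None:
    """Write data to a temp name in the destination directory, fsync it,
    then atomically rename it into place, so a watcher only ever sees
    the complete file. os.replace is atomic on the same filesystem."""
    dest = Path(dest)
    fd, tmp = tempfile.mkstemp(dir=dest.parent, suffix=".tmp")
    try:
        with os.fdopen(fd, "wb") as f:
            f.write(data)
            f.flush()
            os.fsync(f.fileno())  # ensure bytes hit disk before the rename
        os.replace(tmp, dest)
    except BaseException:
        os.unlink(tmp)  # clean up the temp file on any failure
        raise
```

The temp file must live on the same filesystem as the destination (here, the same directory), otherwise the rename degrades to a non-atomic copy.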
Example stability check (assumes `path` is a `pathlib.Path` and `time` is imported):

```python
def is_stable(path, wait_seconds=2):
    size1 = path.stat().st_size
    time.sleep(wait_seconds)
    size2 = path.stat().st_size
    return size1 == size2
```
Step 4 — Implement a resilient architecture
Core components:
- Watcher: receives event notifications.
- Scheduler/queue: debounces events, deduplicates, and enqueues tasks for processing.
- Worker(s): pull tasks and process files with retries and error handling.
- Persistence layer: track processed files and retry state.
- Optional: backoff & alerting when failures exceed thresholds.
Use an in-process queue (queue.Queue) for simple setups or a distributed queue (RabbitMQ, Redis via Celery or RQ, AWS SQS) for scale.
Example architecture flow:
- Watcher triggers on file creation/modification.
- Watcher writes/updates an entry in the persistent manifest and pushes a task key to the queue.
- Worker pops the task, checks manifest and file stability, processes file, updates manifest success or failure.
- On failure, worker increments attempts and schedules retry.
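The debounce/coalesce step in the flow above can be sketched as a small helper that releases a path only after a quiet period with no new events (a minimal illustration; the `Debouncer` class and its parameters are assumptions, not a library API):

```python
import time

class Debouncer:
    """Coalesce bursts of events per path: a path is released only
    after no new event has arrived for `quiet` seconds."""

    def __init__(self, quiet=1.0):
        self.quiet = quiet
        self._last_seen = {}  # path -> monotonic time of most recent event

    def notify(self, path):
        """Record an event for path, resetting its quiet-period timer."""
        self._last_seen[path] = time.monotonic()

    def drain(self):
        """Return paths whose quiet period has elapsed, removing them."""
        now = time.monotonic()
        ready = [p for p, t in self._last_seen.items()
                 if now - t >= self.quiet]
        for p in ready:
            del self._last_seen[p]
        return ready
```

In the architecture above, the watcher thread would call `notify()` for every raw event, and a scheduler loop would periodically call `drain()` and push the released paths onto the work queue, so a file written in many small chunks produces one task instead of dozens.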
Step 5 — Minimal working example (watchdog + SQLite + worker)
Below is a concise, production-minded example. It uses watchdog for events, SQLite for simple persistence, a thread pool for workers, and a stability check before processing.
```python
# watcher.py
import hashlib
import sqlite3
import time
from pathlib import Path
from queue import Queue
from threading import Lock, Thread

from watchdog.events import FileSystemEventHandler
from watchdog.observers import Observer

WATCH_DIR = Path("watched")
DB_PATH = Path("watcher.db")
WORKERS = 4
STABLE_SECONDS = 2
RETRY_LIMIT = 3

# init DB; one connection is shared across threads, so guard it with a lock
conn = sqlite3.connect(DB_PATH, check_same_thread=False)
db_lock = Lock()
conn.execute("""
CREATE TABLE IF NOT EXISTS files (
    path TEXT PRIMARY KEY,
    mtime REAL,
    size INTEGER,
    status TEXT,
    attempts INTEGER DEFAULT 0,
    last_error TEXT
)
""")
conn.commit()

q = Queue()

def record_file(path):
    st = path.stat()
    with db_lock:
        # preserve the attempt count across re-inserts of the same path
        conn.execute("""
            INSERT OR REPLACE INTO files(path, mtime, size, status, attempts)
            VALUES (?, ?, ?, 'pending',
                    COALESCE((SELECT attempts FROM files WHERE path=?), 0))
        """, (str(path), st.st_mtime, st.st_size, str(path)))
        conn.commit()

def mark_success(path):
    with db_lock:
        conn.execute("UPDATE files SET status='done' WHERE path=?",
                     (str(path),))
        conn.commit()

def mark_failure(path, err):
    with db_lock:
        conn.execute(
            "UPDATE files SET status='error', attempts=attempts+1, "
            "last_error=? WHERE path=?",
            (str(err), str(path)))
        conn.commit()

def is_stable(path, wait=STABLE_SECONDS):
    try:
        s1 = path.stat().st_size
    except FileNotFoundError:
        return False
    time.sleep(wait)
    try:
        s2 = path.stat().st_size
    except FileNotFoundError:
        return False
    return s1 == s2

def process(path):
    p = Path(path)
    with db_lock:
        row = conn.execute(
            "SELECT status, attempts FROM files WHERE path=?",
            (str(p),)).fetchone()
    attempts = row[1] if row else 0
    if attempts >= RETRY_LIMIT:
        return
    if not is_stable(p):
        # file still being written (or gone); re-enqueue after a delay
        time.sleep(1)
        q.put(str(p))
        return
    try:
        # Example processing: compute sha256
        h = hashlib.sha256()
        with p.open('rb') as f:
            for chunk in iter(lambda: f.read(8192), b''):
                h.update(chunk)
        print(f"Processed {p}: {h.hexdigest()}")
        mark_success(p)
    except Exception as e:
        mark_failure(p, repr(e))
        # requeue if attempts left, with exponential backoff
        with db_lock:
            row = conn.execute(
                "SELECT attempts FROM files WHERE path=?",
                (str(p),)).fetchone()
        if row and row[0] < RETRY_LIMIT:
            time.sleep(2 ** row[0])
            q.put(str(p))

class Handler(FileSystemEventHandler):
    def on_created(self, event):
        if event.is_directory:
            return
        p = Path(event.src_path)
        record_file(p)
        q.put(str(p))

    def on_modified(self, event):
        if event.is_directory:
            return
        p = Path(event.src_path)
        record_file(p)
        q.put(str(p))

def worker():
    while True:
        path = q.get()
        try:
            process(path)
        finally:
            q.task_done()

if __name__ == "__main__":
    WATCH_DIR.mkdir(exist_ok=True)
    for _ in range(WORKERS):
        Thread(target=worker, daemon=True).start()
    observer = Observer()
    observer.schedule(Handler(), str(WATCH_DIR), recursive=False)
    observer.start()
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        observer.stop()
    observer.join()
```
Step 6 — Tests and validation
- Unit test processing logic with small files and simulated partial writes.
- Integration test: start a watcher and create/rename/write files, including large files.
- Fault injection: kill workers, restart watcher, ensure manifest prevents duplicate processing.
- Load test: generate many files quickly to measure latency and memory/CPU usage.
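A partial-write test from the list above can be simulated with a slow writer thread. The sketch below inlines a shortened copy of the Step 3 stability helper so it is self-contained; the function names and timings are illustrative:

```python
import threading
import time
from pathlib import Path

def is_stable(path: Path, wait_seconds=0.2) -> bool:
    """Same idea as the Step 3 helper: size unchanged across a short wait."""
    size1 = path.stat().st_size
    time.sleep(wait_seconds)
    return path.stat().st_size == size1

def test_partial_write_detected(tmp_path: Path) -> None:
    target = tmp_path / "big.dat"
    target.write_bytes(b"")

    def slow_writer():
        # simulate a producer writing a large file in chunks
        with target.open("ab") as f:
            for _ in range(5):
                f.write(b"x" * 1024)
                f.flush()
                time.sleep(0.1)

    t = threading.Thread(target=slow_writer)
    t.start()
    assert not is_stable(target)   # file still growing: must not process yet
    t.join()
    assert is_stable(target)       # writer finished: size has settled
```

Written this way it runs under pytest (which supplies the `tmp_path` fixture); the wait times here are shortened for test speed and should be generous on slow or networked storage.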
Step 7 — Deployment & operational considerations
- Run the watcher as a supervised service (systemd, Docker + restart policies).
- Rotate or prune the manifest (or use TTL) to bound DB size.
- Add metrics (processed/sec, errors, queue length) and alerts for high error rates.
- Secure file locations and permissions; run workers with least privilege.
- For distributed processing, use a shared durable queue and ensure workers coordinate using the persistent manifest or distributed locks.
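Pruning the manifest, as suggested above, can be a periodic job that deletes old successfully processed rows. A sketch against the `files` table from Step 5, keying the retention window off the recorded mtime (the `prune_manifest` helper and its parameter are assumptions for illustration):

```python
import sqlite3
import time

def prune_manifest(conn: sqlite3.Connection, max_age_seconds: float) -> int:
    """Delete rows for successfully processed files whose recorded mtime
    is older than the retention window, keeping the manifest bounded.
    Rows in 'error' or 'pending' states are kept for inspection/retry."""
    cutoff = time.time() - max_age_seconds
    cur = conn.execute(
        "DELETE FROM files WHERE status='done' AND mtime < ?", (cutoff,))
    conn.commit()
    return cur.rowcount  # number of rows pruned
```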
Alternatives & when to use them
- High-scale systems: use cloud storage events (S3 Event Notifications) or a message queue to avoid filesystem watcher complexity.
- Simple, single-user scripts: a polling loop with checks may be sufficient.
- Language-specific ecosystems: use platform-native libraries when extreme performance is required.
Common pitfalls and troubleshooting
- Ignoring rename/atomic-write patterns leads to partial reads.
- Assuming event order—batching or reordering may occur.
- Not persisting state—restarts will cause duplicate work.
- File permission errors—ensure the watcher has read access.
- Too short stability wait on network or slow storage—tune wait times.
Summary
Build production-grade directory watchers by combining event notification with stability checks, durable bookkeeping, idempotent processing, and proper error handling. Use watchdog for cross-platform event watching, add a manifest for restart resilience, and process files only after verifying they’re stable. This approach turns noisy filesystem events into a reliable pipeline component suitable for automation, ETL, and file-driven workflows.