airflowdoc
Last updated by: BINIL TOM JOSE, Last updated on: 29/06/2025
Apache Airflow Documentation for Face Detection and Feature Extraction Pipelines
Table of Contents
- 1. Introduction
- 2. Purpose
- 3. System Requirements
- 4. Access Credentials
- 5. Project Structure
- 6. Environment Setup
- 7. How the DAGs Work
- 8. Detailed DAG Documentation
- 9. Kafka Integration
- 10. Logging and Monitoring
- 11. Common Issues & Troubleshooting
- 12. Glossary
- 13. Testing Procedures
- 14. Error Handling Strategies
- 15. Performance Considerations
- 16. API Integration
1. Introduction
This repository contains a collection of Apache Airflow DAGs used in a real-time face detection and crowd monitoring system. The pipeline orchestrates various tasks like frame ingestion from Kafka, running deep learning inference (YOLO, emotion, mask detection), and publishing the results.
Airflow plays a key role in:
- Orchestrating complex workflows
- Scheduling and triggering detection jobs
- Running image processing in isolated environments
- Managing dependencies between tasks
The system includes a FastAPI service (api.py
) that provides endpoints for uploading images, triggering DAGs, and retrieving results, creating a complete end-to-end pipeline from image upload to result delivery.
2. Purpose
The goal of this documentation is to:
- Explain how each DAG contributes to the system
- Clarify dependencies and input/output
- Enable easy onboarding for new developers and contributors
- Provide setup, monitoring, and troubleshooting steps
- Ensure code consistency and reliability
3. System Requirements
- Python: 3.8 or later
- Apache Airflow: 2.5 or later
- Kafka Cluster (already hosted on Redback)
- Libraries: Pillow, OpenCV, SAHI, Ultralytics, kafka-python, numpy, dlib
- Storage: Sufficient disk space for model weights (~300MB)
- Memory: Minimum 4GB RAM recommended for YOLO inference
4. Access Credentials
You can access the Airflow UI using:
- URL: http://redback.it.deakin.edu.au:8888
- username = input("Enter your username: ")
- password = input("Enter your password: ") The FastAPI service has these credentials configured as environment variables and handles authentication with the Airflow API automatically.
5. Project Structure
project/
├── airflow/
│ └── dags/
│ ├── face_detection.py
│ ├── detect_faces_and_features.py
│ ├── face_dag.py
│ ├── heatmaps_kafka.py
│ ├── example_dag.py
│ └── sahi.py
├── api/
│ └── api.py # FastAPI service for image upload and DAG triggering
6. Environment Setup
Airflow operators use Python virtual environments via PythonVirtualenvOperator. Required model weights and config files are downloaded dynamically at runtime.
Required ENV_VARS for Airflow DAGs
KAFKA_BOOTSTRAP_SERVER=redback.it.deakin.edu.au:9092
YOLO_CFG=/tmp/yolov4-face.cfg
YOLO_WEIGHTS=/tmp/yolov4-face.weights
MASK_MODEL=/tmp/mask_detector.model
AGE_PROTO=/tmp/age_deploy.prototxt
AGE_MODEL=/tmp/age_net.caffemodel
EMOTION_MODEL=/tmp/emotion-ferplus-8.onnx
For the heatmaps and SAHI-based object detection:
HEATMAP_TOPIC=heatmap
HEATMAP_DETECTED_TOPIC=heatmap_detected
RESULTS_TOPIC=results_topic
YOLO_WEIGHTS_PATH=/tmp/yolov8n.pt
Required ENV_VARS for FastAPI Service
KAFKA_SERVER=redback.it.deakin.edu.au:9092
AIRFLOW_BASE_URL=http://redback.it.deakin.edu.au:8888
USERNAME=project_4
PASSWORD=TYojTPXO14gtRoFbkNVYUQ9y2cBagSwsCWyvbqs_REA
Model URLs are embedded in the DAGs and downloaded using urllib if not already present.
7. How the DAGs Work
Each DAG represents a specific pipeline, some DAGs are standalone and others work together in a sequence. They operate on API-triggered runs through the FastAPI service. Common steps include:
- Consuming images from Kafka topics
- Running detection/inference (YOLO, age, emotion, mask)
- Annotating images
- Publishing JSON + image results to Kafka
The DAGs utilize task dependencies to ensure operations are performed in the correct sequence. For model-based operations, the workflow typically includes:
- Download model weights (if not present)
- Consume data from Kafka
- Process data using models
- Publish results back to Kafka
8. Detailed DAG Documentation
8.1 face_detection.py
-
DAG ID: face_feature_extraction
-
Trigger: API-triggered via FastAPI service
-
Operator: PythonVirtualenvOperator
API Trigger Details:
- Method: POST to FastAPI endpoint
/upload/
- Parameters: Image file upload with
dag_id
parameter set to "face_feature_extraction" - Process:
- Image is compressed and sent to face_images Kafka topic
- DAG is triggered via Airflow API with session authentication
- Results are retrieved from face_results Kafka topic
What it does:
- Downloads missing models
- Subscribes to face_images topic
- Runs detection for face, age, mask, emotion
- Publishes result JSON to face_results
- Requirements: opencv-python, numpy, kafka-python, Pillow
Error Handling:
- Downloads missing models only if needed
- Sets consumer timeout to handle Kafka connection issues
- Uses confidence thresholds to filter reliable predictions
8.2 detect_faces_and_features.py
-
DAG ID: face_feature_extraction
-
Purpose: Main pipeline for face detection and feature extraction
-
Trigger: API-triggered via FastAPI service (same as face_detection.py)
-
Input Topic: face_images
-
Output Topic: face_results
Flow:
- Downloads required models if missing
- Initializes neural networks for face detection, age estimation, mask detection, and emotion analysis
- Consumes images from Kafka
- Processes each image through multiple models
- Publishes detailed JSON results with bounding boxes, age, mask status, and emotion data
- Uses environment variables for configuration
Requirements:
- opencv-python
- numpy
- kafka-python
- Pillow
8.3 face_dag.py
-
DAG ID: tutorial
-
Purpose: Basic template DAG showing Airflow configuration
-
Trigger: Manual
-
Structure: Minimal DAG showing standard configuration options
Configuration:
- Standard Airflow options (owner, retries, email settings)
- Catchup enabled
- No actual tasks defined in this template
- Good for understanding basic Airflow DAG structure
- Use as a starting point for new DAGs
8.4 heatmaps_kafka.py
-
DAG ID: process_heatmap_images
-
Purpose: Slice-based object detection using YOLOv8 and SAHI
-
Trigger: API-triggered via FastAPI service
-
Input Topic: heatmap
-
Output Topics: results_topic, heatmap_detected
API Trigger Details:
- Method: POST to FastAPI endpoint
/upload/
- Parameters: Image file upload with
dag_id
parameter set to "process_heatmap_images" - Process:
- Image is compressed and sent to heatmap Kafka topic
- DAG is triggered via Airflow API with session authentication
- Results are retrieved from results_topic Kafka topic
Flow:
- Pulls image from Kafka
- Runs get_sliced_prediction with SAHI
- Publishes both JSON results and annotated image
- Uses slicing technique for better detection of small objects
- Sets confidence threshold at 0.3
Technical Details:
- Processes images as 256x256 slices with 20% overlap
- Converts SAHI prediction format to standardized JSON
- Visualizes predictions on original image
- Publishes both data types to separate topics
8.5 example_dag.py
-
DAG ID: example_dag
-
Purpose: A simple test DAG to demonstrate sequential task execution
-
Trigger: Manual
Tasks:
- print_hello - Logs a greeting
- wait_for_a_while - Sleeps for 5 seconds
- Good for: Testing DAG scheduling, dependencies, and logs
A separate DAG with ID yolo_face_detection_venv
is also present in this file, which demonstrates:
- Loading a test image
- Preprocessing for YOLO
- Running YOLOv3 for face detection
- Postprocessing + drawing boxes
- Uses pickle to pass data between tasks
- Good for understanding YOLO output formats
8.6 sahi.py
-
Purpose: Integration helper for SAHI (Slicing Aided Hyper Inference)
-
Functionality:
- Provides utilities for sliced object detection
- Integrates with YOLOv8 models
- Helps with better detection of small objects in images
-
Usage:
- Imported by heatmaps_kafka.py
- Handles slicing logic and prediction visualization
- Formats predictions for consistent output
9. Kafka Integration
Airflow DAGs interface directly with Kafka topics using the kafka-python package:
- KafkaConsumer: Reads image blobs from topics
- KafkaProducer: Sends JSON metadata and image bytes to output topics
Example Topics:
- face_images
- face_results
- heatmap
- results_topic
- heatmap_detected
Connection Settings:
- Default bootstrap server: redback.it.deakin.edu.au:9092
- Consumer timeout: 10000ms (10 seconds)
- Auto offset reset: 'earliest'
Data Formats:
- Images: Raw binary (JPEG bytes)
- Results: JSON format with standardized fields
- Annotated images: Raw binary (JPEG bytes with visual annotations)
10. Logging and Monitoring
- All Airflow logs are available in the UI > DAG > Task Logs
- Important checkpoints (e.g., image received, model loaded, JSON result sent) are logged using print()
- You can add more structured logs using Python's logging module if needed
- When debugging, check:
- Airflow task instance logs
- Kafka consumer group offsets
- Model download success/failure messages
11. Common Issues & Troubleshooting
Problem | Possible Cause | Solution |
---|---|---|
DAG not appearing in UI | File not in dags/ or syntax error | Check file name and Airflow scheduler logs |
Kafka timeout | No messages in topic | Use Kafka UI to confirm topic state |
Operator failure due to deps | Missing Python packages in requirements | Add them explicitly in the DAG operator |
Model path not found | Weight files not downloaded | Ensure URL is correct and os.path.exists() logic works |
Tasks stuck in queued state | Resource constraints or scheduler issues | Check Airflow worker logs and executor status |
YOLO inference errors | Incompatible model format | Verify model version matches code expectations |
API upload fails | File too large or incorrect format | Check file size and format before upload |
Session authentication fails | Incorrect credentials or CSRF token | Verify environment variables for API service |
Missing response from Kafka | Consumer timeout before processing completes | Increase consumer timeout in FastAPI or implement polling |
12. Glossary
Term | Description |
---|---|
DAG | Directed Acyclic Graph, represents a pipeline |
Task | A single step in the DAG, defined by an operator |
Kafka | Distributed event streaming platform |
SAHI | Slicing Aided Hyper Inference library for object detection |
YOLO | You Only Look Once, object detection model |
Virtualenv | Isolated Python environment per operator task |
NMS | Non-Maximum Suppression, technique to reduce duplicate detections |
Confidence | Probability score of detection accuracy |
FastAPI | Python web framework for building APIs with automatic input validation |
CSRF Token | Cross-Site Request Forgery token, used for secure authentication |
Session Authentication | Authentication method using cookies and server-side sessions |
API Trigger | Method to start DAG runs via REST API calls |
13. Testing Procedures
Local Testing:
- Verify DAG Syntax: Run
python -c "from airflow.models import DagBag; d = DagBag()"
- Test Individual Tasks: Use Airflow's test command:
airflow tasks test [dag_id] [task_id] [execution_date]
- Test Kafka Connectivity: Use simple consumer/producer scripts before running the full DAG
- Test API Triggers: Use curl or Postman to verify API trigger functionality
Test Data:
- Store sample images in a designated test directory
- Create test Kafka topics with
-test
suffix
Validation:
- Check that detections match expected objects
- Verify bounding box coordinates are reasonable
- Ensure emotion and age classifications are consistent
- Confirm API-triggered DAGs execute as expected
13. Testing Procedures
Local Testing:
- Verify DAG Syntax: Run
python -c "from airflow.models import DagBag; d = DagBag()"
- Test Individual Tasks: Use Airflow's test command:
airflow tasks test [dag_id] [task_id] [execution_date]
- Test Kafka Connectivity: Use simple consumer/producer scripts before running the full DAG
- Test FastAPI Service: Use the
/trigger-test-kafka-dag/
endpoint to verify end-to-end connectivity
FastAPI Testing:
- Unit Testing: Test image compression function and Kafka producer separately
- Integration Testing: Use test images to verify the complete upload-process-result flow
- Error Handling: Confirm proper error responses for invalid inputs or connection issues
Test Data:
- Store sample images in a designated test directory
- Create test Kafka topics with
-test
suffix
Validation:
- Check that detections match expected objects
- Verify bounding box coordinates are reasonable
- Ensure emotion and age classifications are consistent
- Confirm API responses contain all expected fields
14. Error Handling Strategies
Retry Mechanism:
- Most DAGs use
retries: 1
with a 5-minute delay - Consider increasing for production environments
Error Catching:
- Kafka consumer timeouts prevent indefinite blocking
- Model download errors are handled with existence checks
- Confidence thresholds filter low-quality detections
- API trigger failures provide detailed error responses
Recovery:
- Failed tasks can be manually rerun from the Airflow UI
- Consider implementing dead-letter queues for failed messages
- Add explicit error handling in Python callable functions
- Use API status checks to confirm successful DAG initiation
15. Performance Considerations
Scaling:
- YOLO inference is CPU-intensive; consider GPU support for production
- SAHI slicing increases accuracy but adds processing time
- Use multiple workers for parallel task execution
Resource Allocation:
- Increase memory for large image processing
- Monitor CPU utilization during inference
- Consider batching for higher throughput
Optimization:
- Reduce slice overlap in SAHI for faster processing
- Adjust confidence thresholds based on application needs
- Pre-download models during deployment rather than runtime
- Implement rate limiting for API triggers to prevent system overload
16. API Integration
16.1 Triggering DAGs via FastAPI Service
The system provides a FastAPI web service (api.py
) that serves as the primary interface for uploading images and triggering Airflow DAGs. This service handles:
- Image upload and compression
- Kafka message publication
- Airflow DAG triggering via session authentication
- Result retrieval from Kafka output topics
Available DAGs for Triggering
The following DAGs can be triggered via the API:
object_detection_single_task
(default)face_feature_extraction
process_heatmap_images
test_kafka_in_virtualenv_dag
(test endpoint only)
API Endpoints
-
Image Upload and Processing
- Endpoint:
/upload/
- Method: POST
- Parameters:
file
: Image file upload (required)dag_id
: DAG identifier to trigger (optional, defaults toobject_detection_single_task
)
- Actions:
- Compresses the uploaded image
- Sends image to appropriate Kafka topic based on DAG ID
- Triggers the specified Airflow DAG
- Waits for and returns processing results from result topic
- Endpoint:
-
Test Kafka DAG Trigger
- Endpoint:
/trigger-test-kafka-dag/
- Method: POST
- Actions:
- Triggers the test_kafka_in_virtualenv_dag
- Waits for response in kafka_test topic
- Returns status and kafka message
- Endpoint:
Authentication Flow
The service uses Airflow's session-based authentication:
- Sends GET request to Airflow's login page to retrieve CSRF token
- Authenticates with username/password and CSRF token
- Uses the authenticated session for DAG triggering API calls
Airflow DAG Trigger Process
# Example payload for triggering a DAG
payload = {
"conf": {},
"dag_run_id": f"run_{requests.utils.default_headers()['User-Agent'][-4:]}",
"logical_date": dt.strftime('%Y-%m-%dT%H:%M:%S.') + f'{int(dt.microsecond / 1000):03d}Z',
"note": "triggered via API"
}
16.2 End-to-End Workflow
- Client uploads an image via the
/upload/
endpoint with optionaldag_id
parameter - The FastAPI service:
- Compresses the image using PIL
- Sends the compressed image to the appropriate Kafka topic:
image_blob_topic
for object detection DAGheatmap
for other DAGs
- Triggers the specified Airflow DAG via session authentication
- Waits for results from the
results_topic
Kafka topic
- The triggered Airflow DAG:
- Consumes the image from the specified Kafka topic
- Processes the image (face detection, feature extraction, etc.)
- Publishes results back to the designated output topic
- The API service returns the complete result to the client
Example Response
{
"status": "success",
"kafka": {
"topic": "heatmap",
"partition": 0,
"offset": 42
},
"airflow": {
"conf": {},
"dag_id": "process_heatmap_images",
"dag_run_id": "run_abcd",
"end_date": null,
"execution_date": "2025-05-19T12:00:00+00:00",
"external_trigger": true,
"start_date": "2025-05-19T12:00:00+00:00",
"state": "running"
},
"result": "{\"detections\": [{\"type\": \"person\", \"confidence\": 0.92, \"bbox\": [23, 45, 67, 89]}]}"
}
16.3 Configuration and Environment Variables
The API service relies on several environment variables:
KAFKA_SERVER
: Kafka bootstrap server addressAIRFLOW_BASE_URL
: Base URL for the Airflow web serverUSERNAME
: Airflow username for authenticationPASSWORD
: Airflow password for authentication
16.4 Implementation Details
Image Processing
- Uploaded images are compressed to JPEG format with quality=50
- RGB conversion is applied if needed
- Compressed images are serialized to bytes before Kafka transmission
Authentication Method
The API uses session-based authentication with CSRF token instead of Basic Auth:
- Retrieves CSRF token from Airflow login page
- Submits login credentials along with CSRF token
- Uses the authenticated session cookie for subsequent API calls
Airflow DAG Trigger URL
{AIRFLOW_BASE_URL}/api/v1/dags/{dag_id}/dagRuns
Error Handling
The API implements comprehensive error handling:
- HTTP exceptions with appropriate status codes (500 for internal errors)
- Detailed error messages in response
- Timeout handling for Kafka operations (10 seconds)
16.5 Using the API Service
Example API Call Using curl
curl -X POST http://your-fastapi-service-url/upload/ \
-F "file=@your_image.jpg" \
-F "dag_id=face_feature_extraction"
Example API Call Using Python Requests
import requests
url = "http://your-fastapi-service-url/upload/"
files = {"file": open("your_image.jpg", "rb")}
data = {"dag_id": "face_feature_extraction"}
response = requests.post(url, files=files, data=data)
result = response.json()
print(result)
Testing the Kafka DAG Integration
curl -X POST http://your-fastapi-service-url/trigger-test-kafka-dag/
16.6 Best Practices for Using the API Service
- Ensure images are in common formats (JPEG, PNG)
- Keep image sizes reasonable to avoid performance issues
- Use the appropriate
dag_id
for your processing needs:- Use
object_detection_single_task
for basic object detection - Use
process_heatmap_images
for SAHI-based detection - Use
face_feature_extraction
for face analysis
- Use
- Implement timeouts and retries in client applications
- Handle potential delays between DAG triggering and result availability
- Monitor the API service logs for debugging issues# Apache Airflow Documentation for Face Detection and Feature Extraction Pipelines