Building a Real-Time Fraud Detection System with FastAPI, ML, and Azure#
Financial fraud costs businesses billions of dollars annually. In this tutorial, I'll show you how to build an enterprise-grade fraud detection system that processes transactions in real-time, combines multiple detection strategies, and provides AI-powered explanations for flagged transactions.
This is more than a toy project. The system is built with production deployment in mind and demonstrates:
- 🔥 Real-time scoring with <100ms latency target
- 🧠 Multi-model ensemble combining rules, ML, and Azure AI
- 📊 Live dashboards with WebSocket updates
- 🤖 AI-powered explanations using Claude Haiku
- 🐳 Production-ready with Docker, Kubernetes, and CI/CD
Live Demo#
Here's what the final system looks like:

The analytics dashboard shows real-time metrics, fraud trends, merchant analysis, and score distribution.

High-risk transactions are automatically blocked and flagged for investigation.
Why Multi-Model Fraud Detection?#
The Single-Model Problem#
Relying on a single detection method has critical weaknesses:
| Approach | Weakness |
|---|---|
| Rules Only | Can't adapt to new fraud patterns |
| ML Only | Black box, hard to explain decisions |
| Threshold Only | Too many false positives/negatives |
Our Ensemble Solution#
We combine three detection strategies with weighted scoring:
Final Score = (Rule Score × 85%) + (ML Score × 10%) + (Azure Score × 5%)
This gives us:
- ✅ Interpretability from business rules
- ✅ Pattern detection from ML
- ✅ Managed service benefits from Azure
- ✅ Explainable decisions for analysts
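As a rough illustration of the weighting above, the sketch below combines three component scores on a 0-100 scale. The function name `ensemble_score` and the clamping of each input to 0-100 are assumptions made for this example, not confirmed details of the system.

```python
# Weights taken from the formula above; everything else is illustrative.
WEIGHTS = {"rule": 0.85, "ml": 0.10, "azure": 0.05}


def clamp(value: float, low: float = 0.0, high: float = 100.0) -> float:
    """Keep a component score inside the 0-100 range (assumed convention)."""
    return max(low, min(high, value))


def ensemble_score(rule: float, ml: float, azure: float) -> float:
    """Weighted sum of the three component scores."""
    return (
        clamp(rule) * WEIGHTS["rule"]
        + clamp(ml) * WEIGHTS["ml"]
        + clamp(azure) * WEIGHTS["azure"]
    )


if __name__ == "__main__":
    # Hypothetical component scores for a single transaction
    print(round(ensemble_score(rule=55, ml=60, azure=40), 2))  # 54.75
```

One consequence of these weights: even if the ML and Azure components both return 100, they contribute at most 15 points on their own, so the business rules dominate the final decision.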
System Architecture#
High-Level Overview#
```
┌─────────────────────────────────────────────────────────────┐
│                     Frontend (React 19)                     │
│  ┌───────────┐  ┌────────────┐   ┌────────┐  ┌───────────┐  │
│  │ Dashboard │  │Transactions│   │ Alerts │  │ Analytics │  │
│  └─────┬─────┘  └──────┬─────┘   └────┬───┘  └─────┬─────┘  │
│        └───────────────┴─────┬────────┴────────────┘        │
│                              │                              │
│                    WebSocket + REST API                     │
└──────────────────────────────┬──────────────────────────────┘
                               │
┌──────────────────────────────┴──────────────────────────────┐
│                  FastAPI Backend (Python)                   │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │                   Risk Scoring Engine                   │ │
│ │   ┌─────────────┐   ┌─────────────┐   ┌─────────────┐   │ │
│ │   │ Rule Engine │   │  Isolation  │   │Azure Anomaly│   │ │
│ │   │(85% weight) │   │Forest (10%) │   │Detector (5%)│   │ │
│ │   └─────────────┘   └─────────────┘   └─────────────┘   │ │
│ │                            │                            │ │
│ │                 ┌──────────┴──────────┐                 │ │
│ │                 │   Claude Haiku AI   │                 │ │
│ │                 │   (Explanations)    │                 │ │
│ │                 └─────────────────────┘                 │ │
│ └─────────────────────────────────────────────────────────┘ │
└──────────────────────────────┬──────────────────────────────┘
                               │
        ┌──────────────────────┼──────────────────────┐
        │                      │                      │
┌───────┴───────┐      ┌───────┴───────┐      ┌───────┴───────┐
│  PostgreSQL   │      │     Redis     │      │  Azure APIs   │
│ (Primary DB)  │      │    (Cache)    │      │  (Optional)   │
└───────────────┘      └───────────────┘      └───────────────┘
```
The Rule Engine (85% of Detection)#
Business rules provide interpretable, explainable fraud detection. Our engine implements 6 configurable rules:
1. Velocity Check (25 points)#
Detects rapid transaction patterns that indicate card theft or reuse.
```python
class VelocityRule:
    """Detect rapid transaction patterns."""

    def evaluate(self, transaction, user_history):
        # Transactions by this user within the last 10 minutes
        recent_txns = self.get_transactions_in_window(
            user_history, window_minutes=10
        )

        if len(recent_txns) > self.max_transactions:
            return RuleResult(
                triggered=True,
                score=25.0,
                reason=f"Velocity exceeded: {len(recent_txns)} transactions in 10 minutes",
            )
        return RuleResult(triggered=False, score=0.0)
```
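The rule relies on a `get_transactions_in_window` helper that is not shown. A minimal sketch of such a helper follows, assuming each history entry carries a `timestamp` attribute. The `Txn` dataclass and the explicit `now` parameter are hypothetical and exist only to keep the example self-contained and testable.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import List


@dataclass
class Txn:
    """Hypothetical stand-in for a stored transaction."""
    amount: float
    timestamp: datetime


def get_transactions_in_window(
    history: List[Txn], now: datetime, window_minutes: int = 10
) -> List[Txn]:
    """Return the transactions made within `window_minutes` before `now`."""
    cutoff = now - timedelta(minutes=window_minutes)
    return [t for t in history if cutoff <= t.timestamp <= now]


if __name__ == "__main__":
    now = datetime(2024, 1, 1, 12, 0)
    history = [
        Txn(20.0, now - timedelta(minutes=2)),
        Txn(35.0, now - timedelta(minutes=9)),
        Txn(50.0, now - timedelta(minutes=30)),  # outside the window
    ]
    print(len(get_transactions_in_window(history, now)))  # 2
```

Passing `now` explicitly, rather than calling `datetime.now()` inside the helper, keeps the window logic deterministic in tests.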
2. High Amount Detection (20 points)#
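Flags transactions significantly higher than the user's typical spending.
```python
def evaluate(self, transaction, user_history):
    avg_amount = self.calculate_average(user_history)

    # Flag amounts more than 3x the user's historical average
    if transaction.amount > avg_amount * 3:
        return RuleResult(
            triggered=True,
            score=20.0,
            reason=f"Amount ${transaction.amount} is more than 3x the average ${avg_amount:.2f}",
        )
    # Explicit fall-through so callers never receive None
    return RuleResult(triggered=False, score=0.0)
```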
3. Impossible Travel (30 points)#
Uses the Haversine formula to detect physically impossible location changes.
```python
from math import radians, sin, cos, sqrt, atan2

def haversine_distance(lat1, lon1, lat2, lon2):
    """Calculate the great-circle distance between two points in km."""
    R = 6371  # Earth's mean radius in km

    dlat = radians(lat2 - lat1)
    dlon = radians(lon2 - lon1)

    a = sin(dlat / 2) ** 2 + cos(radians(lat1)) * cos(radians(lat2)) * sin(dlon / 2) ** 2
    c = 2 * atan2(sqrt(a), sqrt(1 - a))

    return R * c

def evaluate(self, transaction, last_transaction):
    distance = haversine_distance(
        transaction.lat, transaction.lon,
        last_transaction.lat, last_transaction.lon,
    )
    time_diff_hours = (transaction.time - last_transaction.time).total_seconds() / 3600

    # More than 500 km in under an hour is treated as implausible for a cardholder
    if distance > 500 and time_diff_hours < 1:
        return RuleResult(
            triggered=True,
            score=30.0,
            reason=f"Impossible travel: {distance:.0f}km in {time_diff_hours:.1f} hours",
        )
    # Explicit fall-through so callers never receive None
    return RuleResult(triggered=False, score=0.0)
```
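To make the threshold concrete, the sketch below applies the same Haversine formula to a New York to Los Angeles pair 45 minutes apart. The Los Angeles coordinates are approximate, and `implied_speed_kmh` is a hypothetical helper added for illustration.

```python
from math import radians, sin, cos, sqrt, atan2


def haversine_distance(lat1, lon1, lat2, lon2):
    """Great-circle distance in km (same formula as the rule above)."""
    R = 6371  # Earth's mean radius in km
    dlat = radians(lat2 - lat1)
    dlon = radians(lon2 - lon1)
    a = sin(dlat / 2) ** 2 + cos(radians(lat1)) * cos(radians(lat2)) * sin(dlon / 2) ** 2
    return R * 2 * atan2(sqrt(a), sqrt(1 - a))


def implied_speed_kmh(distance_km: float, minutes: float) -> float:
    """Speed needed to cover the distance in the given time (hypothetical helper)."""
    return distance_km / (minutes / 60)


if __name__ == "__main__":
    new_york = (40.7128, -74.0060)
    los_angeles = (34.0522, -118.2437)  # approximate city-centre coordinates
    d = haversine_distance(*new_york, *los_angeles)
    print(f"{d:.0f} km, {implied_speed_kmh(d, 45):.0f} km/h")
```

The distance comes out a little over 3,900 km, which would require a speed above 5,000 km/h, so this pair clears the 500 km / 1 hour threshold by a wide margin.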
4. Unusual Hours (10 points)#
Transactions made between 12:00 a.m. and 5:00 a.m. are flagged.
5. New Device (15 points)#
A transaction over $500 from a device the user has not used before triggers this rule.
6. Country Blacklist (85 points)#
Transactions from sanctioned countries (KP, IR, SY) are immediately flagged as high risk.
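Rules 4 to 6 are described only in prose, so here is a minimal sketch of how they might look. The `RuleResult` dataclass mirrors the one used by the earlier rules but is redefined here as a hypothetical stand-in. The function signatures, the exclusive 5:00 a.m. boundary, and the use of the transaction's own timestamp (with no time zone handling) are all assumptions.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Optional, Set


@dataclass
class RuleResult:
    """Hypothetical result type mirroring the one used by the rules above."""
    triggered: bool
    score: float
    reason: Optional[str] = None


SANCTIONED_COUNTRIES = {"KP", "IR", "SY"}


def unusual_hours(timestamp: datetime) -> RuleResult:
    """Flag transactions made from 00:00 up to, but not including, 05:00."""
    if 0 <= timestamp.hour < 5:
        return RuleResult(True, 10.0, f"Transaction at unusual hour {timestamp:%H:%M}")
    return RuleResult(False, 0.0)


def new_device(device_id: str, known_devices: Set[str], amount: float) -> RuleResult:
    """Flag a first-time device when the amount exceeds $500."""
    if device_id not in known_devices and amount > 500:
        return RuleResult(True, 15.0, f"New device {device_id} with amount ${amount:.2f}")
    return RuleResult(False, 0.0)


def country_blacklist(country: str) -> RuleResult:
    """Flag transactions from sanctioned countries (two-letter country codes)."""
    if country.upper() in SANCTIONED_COUNTRIES:
        return RuleResult(True, 85.0, f"Sanctioned country: {country.upper()}")
    return RuleResult(False, 0.0)


if __name__ == "__main__":
    print(unusual_hours(datetime(2024, 1, 1, 2, 30)).triggered)   # True
    print(new_device("device_456", {"device_123"}, 750.0).score)  # 15.0
    print(country_blacklist("US").triggered)                      # False
```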
Isolation Forest ML Model (10% of Detection)#
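The ML component uses Isolation Forest, an unsupervised anomaly detection algorithm that needs no labeled fraud data. The model still has to be fitted on historical transactions; until it is, the detector falls back to a heuristic score.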
Why Isolation Forest?#
| Feature | Benefit |
|---|---|
| Unsupervised | No labeled fraud data needed |
| Fast inference | <10ms per transaction |
| Interpretable | Anomaly score is intuitive |
| Heuristic fallback | Scores transactions before the model is fitted |
Feature Engineering#
We extract 9 features from each transaction:
```python
def extract_features(self, transaction, user_history):
    features = {
        'amount': transaction.amount,
        'hour_of_day': transaction.timestamp.hour,
        'day_of_week': transaction.timestamp.weekday(),
        'days_since_last_txn': self.days_since_last(user_history),
        'avg_amount_last_10': self.rolling_average(user_history, 10),
        'std_amount_last_10': self.rolling_std(user_history, 10),
        'txn_velocity_hour': self.count_last_hour(user_history),
        'txn_velocity_day': self.count_last_day(user_history),
        'geo_distance': self.distance_from_last(transaction, user_history),
    }
    return self.normalize(features)  # Z-score normalization
```
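The `normalize` step is not shown. Z-score normalization rescales each feature to `(value - mean) / std`, so a possible sketch looks like the following. The helper names `fit_stats` and `normalize`, and the choice to compute statistics from a list of historical feature rows, are assumptions for illustration.

```python
from statistics import mean, pstdev
from typing import Dict, List, Tuple


def fit_stats(rows: List[Dict[str, float]]) -> Dict[str, Tuple[float, float]]:
    """Compute (mean, population std) per feature from historical feature rows."""
    stats = {}
    for name in rows[0]:
        values = [row[name] for row in rows]
        stats[name] = (mean(values), pstdev(values))
    return stats


def normalize(
    features: Dict[str, float], stats: Dict[str, Tuple[float, float]]
) -> Dict[str, float]:
    """Z-score each feature; constant features (std == 0) map to 0.0."""
    out = {}
    for name, value in features.items():
        mu, sigma = stats[name]
        out[name] = (value - mu) / sigma if sigma > 0 else 0.0
    return out


if __name__ == "__main__":
    history = [
        {"amount": 20.0, "hour_of_day": 9},
        {"amount": 40.0, "hour_of_day": 13},
        {"amount": 60.0, "hour_of_day": 17},
    ]
    stats = fit_stats(history)
    print(normalize({"amount": 200.0, "hour_of_day": 3}, stats))
```

A $200 purchase against this history lands almost ten standard deviations above the mean amount, which is the kind of signal the anomaly model can pick up without any hand-written threshold.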
Model Implementation#
```python
from sklearn.ensemble import IsolationForest

class AnomalyDetector:
    def __init__(self):
        self.model = IsolationForest(
            n_estimators=100,
            contamination=0.01,  # Expected fraud rate
            random_state=42,
        )
        self.is_fitted = False

    def score(self, features):
        """Return anomaly score (0-100)."""
        if not self.is_fitted:
            # Model not fitted yet: fall back to a heuristic score
            return self.heuristic_score(features)

        # decision_function is negative for outliers and positive for inliers
        # (lower = more anomalous), so lower raw values map to higher risk;
        # the result is clamped to 0-100
        raw_score = self.model.decision_function([features])[0]
        return max(0, min(100, (1 - raw_score) * 50))
```
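To see how the mapping behaves, the sketch below isolates the conversion into a standalone function, hypothetically named `to_risk_score`, and applies it to a few made-up raw values. In scikit-learn, `decision_function` returns negative values for outliers and positive values for inliers.

```python
def to_risk_score(raw_score: float) -> float:
    """Map an Isolation Forest decision value to 0-100 (higher = riskier)."""
    return max(0.0, min(100.0, (1 - raw_score) * 50))


if __name__ == "__main__":
    # Illustrative raw values: a clear inlier, the decision boundary, an outlier
    for raw in (0.15, 0.0, -0.2):
        print(raw, round(to_risk_score(raw), 1))
```

A transaction sitting exactly on the decision boundary (raw value 0.0) maps to 50, and raw values often stay well inside the -1 to 1 range, so mapped scores may cluster around the middle of the scale. Any threshold applied to the ML score should probably be chosen with that in mind.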
Real-Time WebSocket Architecture#
The system uses three independent WebSocket channels for live updates:
Channel Design#
```python
# backend/src/backend/api/v1/websocket.py
import asyncio
from typing import Dict, List

from fastapi import APIRouter, WebSocket, WebSocketDisconnect

router = APIRouter()

class ConnectionManager:
    def __init__(self):
        # One list of open sockets per channel
        self.active_connections: Dict[str, List[WebSocket]] = {
            "transactions": [],
            "alerts": [],
            "stats": [],
        }

    # connect() and disconnect() register and remove sockets per channel (not shown)

    async def broadcast(self, channel: str, message: dict):
        """Broadcast to all connections on a channel."""
        for connection in self.active_connections[channel]:
            await connection.send_json(message)

manager = ConnectionManager()

@router.websocket("/ws/transactions")
async def transactions_websocket(websocket: WebSocket):
    await manager.connect("transactions", websocket)
    try:
        while True:
            # Heartbeat every 30 seconds
            await asyncio.sleep(30)
            await websocket.send_json({"type": "heartbeat"})
    except WebSocketDisconnect:
        manager.disconnect("transactions", websocket)
```
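One thing a broadcast loop has to cope with is a client that has gone away without a clean disconnect. The sketch below shows one possible approach: drop any connection whose send fails. `ChannelManager` and `FakeSocket` are hypothetical names, and the fake socket only imitates the `send_json` coroutine so the example runs without a server.

```python
import asyncio
from typing import Any, Dict, List


class ChannelManager:
    """Sketch of per-channel broadcast that drops connections that fail to send."""

    def __init__(self, channels: List[str]):
        self.active: Dict[str, List[Any]] = {name: [] for name in channels}

    def register(self, channel: str, conn: Any) -> None:
        self.active[channel].append(conn)

    def unregister(self, channel: str, conn: Any) -> None:
        if conn in self.active[channel]:
            self.active[channel].remove(conn)

    async def broadcast(self, channel: str, message: dict) -> int:
        """Send to every connection; return how many sends succeeded."""
        delivered = 0
        # Iterate over a copy so failed connections can be removed safely
        for conn in list(self.active[channel]):
            try:
                await conn.send_json(message)
                delivered += 1
            except Exception:
                self.unregister(channel, conn)
        return delivered


class FakeSocket:
    """Test double exposing a send_json coroutine like a WebSocket."""

    def __init__(self, fail: bool = False):
        self.fail = fail
        self.sent: List[dict] = []

    async def send_json(self, message: dict) -> None:
        if self.fail:
            raise ConnectionError("client went away")
        self.sent.append(message)


async def demo() -> int:
    manager = ChannelManager(["transactions", "alerts", "stats"])
    ok, broken = FakeSocket(), FakeSocket(fail=True)
    manager.register("alerts", ok)
    manager.register("alerts", broken)
    return await manager.broadcast("alerts", {"type": "new_alert"})


if __name__ == "__main__":
    print(asyncio.run(demo()))  # 1
```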
Frontend Integration#
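```tsx
// frontend/src/contexts/WebSocketContext.tsx
import React, { useEffect, useState } from 'react';

// WebSocketContext and the Transaction type are assumed to be defined alongside this provider.
// React 18+ no longer adds `children` to React.FC implicitly, so it is typed explicitly.
export const WebSocketProvider: React.FC<{ children: React.ReactNode }> = ({ children }) => {
  const [transactions, setTransactions] = useState<Transaction[]>([]);

  useEffect(() => {
    const ws = new WebSocket('ws://localhost:8000/api/v1/ws/transactions');

    ws.onmessage = (event) => {
      const data = JSON.parse(event.data);
      if (data.type === 'new_transaction') {
        // Prepend the newest transaction and keep only the latest 100
        setTransactions(prev => [data.transaction, ...prev].slice(0, 100));
      }
    };

    // Close the socket when the provider unmounts
    return () => ws.close();
  }, []);

  return (
    <WebSocketContext.Provider value={{ transactions }}>
      {children}
    </WebSocketContext.Provider>
  );
};
```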
AI-Powered Explanations with Claude#
For flagged transactions, we generate human-readable explanations using Claude Haiku:
```python
# backend/src/backend/services/claude_explainer.py
import anthropic

class ClaudeExplainer:
    def __init__(self):
        # The async client is needed because messages.create is awaited below
        self.client = anthropic.AsyncAnthropic()
        self.model = "claude-3-haiku-20240307"  # Small, fast, low-cost model

    async def explain(self, transaction, triggered_rules, score):
        prompt = f"""
        Analyze this transaction and explain the fraud risk in 2-3 sentences:

        Transaction: ${transaction.amount} at {transaction.merchant}
        Location: {transaction.city}, {transaction.country}
        Time: {transaction.timestamp}

        Triggered Rules: {triggered_rules}
        Risk Score: {score}/100

        Provide a clear, analyst-friendly explanation.
        """

        response = await self.client.messages.create(
            model=self.model,
            max_tokens=300,
            messages=[{"role": "user", "content": prompt}],
        )
        return response.content[0].text
```
Example output:
"This transaction was flagged due to impossible travel detection. The cardholder made a purchase in New York at 2:15 PM, but the previous transaction was in Los Angeles just 45 minutes earlier—a physical impossibility. Combined with the unusual $2,500 amount (3x the user's average), this strongly indicates potential card cloning. Recommend immediate card freeze and customer verification."
API Design#
The system exposes a comprehensive REST API:

Core Endpoints#
| Endpoint | Method | Description |
|---|---|---|
| /api/v1/transactions/score | POST | Score a transaction in real-time |
| /api/v1/transactions | GET | List transactions with filters |
| /api/v1/alerts | GET | Get fraud alerts |
| /api/v1/analytics/dashboard | GET | Dashboard metrics |
| /api/v1/analytics/trends | GET | 7-day fraud trends |
Scoring Request/Response#
```bash
curl -X POST http://localhost:8000/api/v1/transactions/score \
  -H "Content-Type: application/json" \
  -d '{
    "user_id": "user_123",
    "amount": 1500.00,
    "currency": "USD",
    "merchant_name": "Electronics Store",
    "merchant_category": "electronics",
    "location": {
      "latitude": 40.7128,
      "longitude": -74.0060,
      "country": "US",
      "city": "New York"
    },
    "device_id": "device_456"
  }'
```
Response:
```json
{
  "transaction_id": "txn_abc123",
  "fraud_score": 72.3,
  "risk_level": "HIGH",
  "is_blocked": true,
  "triggered_rules": ["velocity_check", "high_amount"],
  "explanation": "Transaction blocked due to rapid transaction velocity...",
  "processing_time_ms": 45
}
```
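The response carries both a numeric `fraud_score` and a categorical `risk_level`. The thresholds that map one to the other are not stated, so the sketch below uses hypothetical cut-offs of 40 and 70 and assumes that HIGH transactions are the ones that get blocked, which is at least consistent with the sample response above.

```python
def risk_level(score: float, medium_at: float = 40.0, high_at: float = 70.0) -> str:
    """Bucket a 0-100 fraud score; the cut-offs are illustrative assumptions."""
    if score >= high_at:
        return "HIGH"
    if score >= medium_at:
        return "MEDIUM"
    return "LOW"


def build_decision(score: float) -> dict:
    """Assemble the score-related fields of a scoring response."""
    level = risk_level(score)
    return {
        "fraud_score": score,
        "risk_level": level,
        "is_blocked": level == "HIGH",  # assumed blocking policy
    }


if __name__ == "__main__":
    print(build_decision(72.3))
    print(build_decision(12.0))
```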
Database Schema#
Transaction Model#
```python
# backend/src/backend/models/transaction.py
from uuid import uuid4

from sqlalchemy import (
    Boolean, Column, DateTime, Enum, Float, ForeignKey, Numeric, String, Text, func,
)
from sqlalchemy.dialects.postgresql import JSONB, UUID

class Transaction(Base):  # Base is the project's declarative base
    __tablename__ = "transactions"

    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid4)
    user_id = Column(UUID(as_uuid=True), ForeignKey("users.id"), nullable=False)

    # Financial
    amount = Column(Numeric(12, 2), nullable=False)
    currency = Column(String(3), default="USD")

    # Merchant
    merchant_name = Column(String(255))
    merchant_category = Column(String(100))

    # Location
    latitude = Column(Float)
    longitude = Column(Float)
    country = Column(String(2))
    city = Column(String(100))

    # Device
    device_id = Column(String(255))
    ip_address = Column(String(45))
    user_agent = Column(Text)

    # Fraud Detection Results
    fraud_score = Column(Float, default=0.0)
    # PostgreSQL needs a type name to create the ENUM
    risk_level = Column(Enum('LOW', 'MEDIUM', 'HIGH', name='risk_level'))
    is_fraud = Column(Boolean, default=False)
    is_blocked = Column(Boolean, default=False)
    triggered_rules = Column(JSONB, default=list)

    # Timestamps
    created_at = Column(DateTime, server_default=func.now())
```
Deployment with Docker#
Docker Compose Setup#
```yaml
# docker-compose.yml
services:
  postgres:
    image: postgres:16-alpine
    environment:
      POSTGRES_DB: fraud_detection
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: postgres
    volumes:
      - postgres_data:/var/lib/postgresql/data
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U postgres"]
      interval: 10s

  redis:
    image: redis:7-alpine
    volumes:
      - redis_data:/data
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]

  backend:
    build: ./backend
    environment:
      - DATABASE_URL=postgresql+asyncpg://postgres:postgres@postgres:5432/fraud_detection
      - REDIS_HOST=redis
    ports:
      - "8000:8000"
    depends_on:
      postgres:
        condition: service_healthy
      redis:
        condition: service_healthy

  frontend:
    build: ./frontend
    environment:
      - VITE_API_URL=http://localhost:8000
    ports:
      - "3000:80"
    depends_on:
      - backend

# Named volumes referenced by the services above must be declared
volumes:
  postgres_data:
  redis_data:
```
Quick Start#
```bash
# Clone and start
git clone https://github.com/MinhQuanBuiSco/Azure.git
cd Azure/fraud_detection

# Start all services
make init   # First time setup
make up     # Start services

# Generate test transactions
make generate-txns

# Access
# Frontend: http://localhost:3000
# API Docs: http://localhost:8000/docs
```
Performance Considerations#
Caching Strategy#
```python
# User history caching with Redis
import json
from typing import List

class CacheService:
    async def get_user_history(self, user_id: str) -> List[dict]:
        cached = await self.redis.get(f"user_history:{user_id}")
        if cached:
            return json.loads(cached)

        # Fetch from DB and cache; rows must be plain dicts to be JSON-serializable
        history = await self.db.get_user_transactions(user_id, limit=100)
        await self.redis.setex(
            f"user_history:{user_id}",
            300,  # 5 minute TTL
            json.dumps(history, default=str),  # default=str covers Decimal and datetime values
        )
        return history
```
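The cache-aside pattern used here can be exercised without a running Redis instance. The sketch below swaps in a hypothetical `InMemoryTTLCache` that imitates only the `get` and `setex` calls, and counts how often the (fake) database loader is hit.

```python
import asyncio
import json
import time
from typing import Awaitable, Callable, Dict, List, Optional, Tuple


class InMemoryTTLCache:
    """Tiny stand-in for Redis GET/SETEX, for illustration only."""

    def __init__(self) -> None:
        self._store: Dict[str, Tuple[float, str]] = {}

    async def get(self, key: str) -> Optional[str]:
        entry = self._store.get(key)
        if entry is None or entry[0] < time.monotonic():
            self._store.pop(key, None)  # expired or missing
            return None
        return entry[1]

    async def setex(self, key: str, ttl_seconds: int, value: str) -> None:
        self._store[key] = (time.monotonic() + ttl_seconds, value)


async def get_user_history(
    cache: InMemoryTTLCache,
    user_id: str,
    loader: Callable[[str], Awaitable[List[dict]]],
    ttl: int = 300,
) -> List[dict]:
    """Cache-aside lookup: try the cache, fall back to the loader, then store."""
    key = f"user_history:{user_id}"
    cached = await cache.get(key)
    if cached is not None:
        return json.loads(cached)
    history = await loader(user_id)
    await cache.setex(key, ttl, json.dumps(history))
    return history


async def demo() -> int:
    calls = 0

    async def load_from_db(user_id: str) -> List[dict]:
        nonlocal calls
        calls += 1
        return [{"amount": 42.0, "merchant": "Coffee Shop"}]

    cache = InMemoryTTLCache()
    await get_user_history(cache, "user_123", load_from_db)
    await get_user_history(cache, "user_123", load_from_db)  # served from cache
    return calls


if __name__ == "__main__":
    print(asyncio.run(demo()))  # 1
```

Note the `is not None` check: an empty history serializes to `"[]"`, which is a valid cached value and should not trigger another database read.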
Async Everything#
```python
# All external calls are async
import asyncio

async def score_transaction(self, transaction: TransactionCreate):
    # Run detection methods concurrently; each call must return an awaitable,
    # so every detector needs an async method here
    rule_score, ml_score, azure_score = await asyncio.gather(
        self.rule_engine.evaluate(transaction),
        self.anomaly_detector.score(transaction),
        self.azure_detector.detect(transaction),
    )

    # Weighted ensemble: rules 85%, ML 10%, Azure 5%
    final_score = (
        rule_score * 0.85 +
        ml_score * 0.10 +
        azure_score * 0.05
    )

    return final_score
```
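`asyncio.gather` only accepts awaitables, so synchronous detectors need a wrapper before they can run concurrently with async ones. One option, sketched below, is `asyncio.to_thread` (Python 3.9+). The three detector functions are hypothetical stand-ins that sleep briefly and return fixed scores.

```python
import asyncio
import time


def rule_score(txn: dict) -> float:
    """Synchronous stand-in for the rule engine."""
    time.sleep(0.05)
    return 45.0


def ml_score(txn: dict) -> float:
    """Synchronous stand-in for the anomaly model."""
    time.sleep(0.05)
    return 60.0


async def azure_score(txn: dict) -> float:
    """Async stand-in for a remote API call."""
    await asyncio.sleep(0.05)
    return 30.0


async def score_transaction(txn: dict) -> float:
    # Sync detectors run in worker threads; the async one runs on the event loop
    rule, ml, azure = await asyncio.gather(
        asyncio.to_thread(rule_score, txn),
        asyncio.to_thread(ml_score, txn),
        azure_score(txn),
    )
    return rule * 0.85 + ml * 0.10 + azure * 0.05


if __name__ == "__main__":
    print(asyncio.run(score_transaction({"amount": 1500.0})))
```

With these stand-in values the weighted result is 45.75, and because the three calls overlap, the total wait is close to the slowest detector rather than the sum of all three.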
Key Takeaways#
Building a production fraud detection system requires:
- Multi-model ensemble - Don't rely on a single detection method
- Interpretable rules - Business rules provide explainability
- Real-time processing - WebSockets for instant dashboard updates
- AI explanations - Claude Haiku for analyst-friendly insights
- Proper caching - Redis for fast user history lookups
- Async architecture - Concurrent processing for low latency
What's Next?#
Future enhancements could include:
- Graph neural networks for network fraud detection
- A/B testing framework for rule optimization
- Feedback loop for model retraining
- Multi-region deployment with Azure Traffic Manager
Found this helpful? Check out my other posts on building production AI systems!