Back
View source
AI Engineering··15 min

Building a Real-Time Fraud Detection System with FastAPI, ML, and Azure

Learn how to build an enterprise-grade fraud detection system featuring multi-model ensemble scoring, real-time WebSocket updates, and AI-powered explanations. Complete with Rule Engine, Isolation Forest ML, and Azure integration.

Building a Real-Time Fraud Detection System with FastAPI, ML, and Azure#

Financial fraud costs businesses billions of dollars annually. In this tutorial, I'll show you how to build an enterprise-grade fraud detection system that processes transactions in real-time, combines multiple detection strategies, and provides AI-powered explanations for flagged transactions.

This isn't just another toy project—it's a production-ready system that demonstrates:

  • 🔥 Real-time scoring with <100ms latency target
  • 🧠 Multi-model ensemble combining rules, ML, and Azure AI
  • 📊 Live dashboards with WebSocket updates
  • 🤖 AI-powered explanations using Claude Haiku
  • 🐳 Production-ready with Docker, Kubernetes, and CI/CD

Live Demo#

Here's what the final system looks like:

Fraud Detection Analytics Dashboard

The analytics dashboard shows real-time metrics, fraud trends, merchant analysis, and score distribution.

Transaction Monitoring

High-risk transactions are automatically blocked and flagged for investigation.


Why Multi-Model Fraud Detection?#

The Single-Model Problem#

Relying on a single detection method has critical weaknesses:

ApproachWeakness
Rules OnlyCan't adapt to new fraud patterns
ML OnlyBlack box, hard to explain decisions
Threshold OnlyToo many false positives/negatives

Our Ensemble Solution#

We combine three detection strategies with weighted scoring:

Final Score = (Rule Score × 85%) + (ML Score × 10%) + (Azure Score × 5%)

This gives us:

  • Interpretability from business rules
  • Pattern detection from ML
  • Managed service benefits from Azure
  • Explainable decisions for analysts

System Architecture#

High-Level Overview#

┌─────────────────────────────────────────────────────────────┐
│                    Frontend (React 19)                       │
│  ┌──────────┐  ┌──────────┐  ┌──────────┐  ┌──────────┐    │
│  │Dashboard │  │Transactions│  │ Alerts  │  │Analytics │    │
│  └────┬─────┘  └─────┬─────┘  └────┬────┘  └────┬─────┘    │
│       │              │              │            │           │
│       └──────────────┴──────────────┴────────────┘           │
│                           │                                   │
│                    WebSocket + REST API                       │
└───────────────────────────┬───────────────────────────────────┘
                            │
┌───────────────────────────┴───────────────────────────────────┐
│                  FastAPI Backend (Python)                      │
│  ┌─────────────────────────────────────────────────────────┐  │
│  │                   Risk Scoring Engine                    │  │
│  │  ┌─────────────┐ ┌─────────────┐ ┌─────────────┐       │  │
│  │  │Rule Engine  │ │Isolation    │ │Azure Anomaly│       │  │
│  │  │(85% weight) │ │Forest (10%) │ │Detector (5%)│       │  │
│  │  └─────────────┘ └─────────────┘ └─────────────┘       │  │
│  │                         │                               │  │
│  │              ┌──────────┴──────────┐                   │  │
│  │              │  Claude Haiku AI    │                   │  │
│  │              │  (Explanations)     │                   │  │
│  │              └─────────────────────┘                   │  │
│  └─────────────────────────────────────────────────────────┘  │
└───────────────────────────┬───────────────────────────────────┘
                            │
        ┌───────────────────┼───────────────────┐
        │                   │                   │
┌───────┴───────┐   ┌───────┴───────┐   ┌───────┴───────┐
│  PostgreSQL   │   │    Redis      │   │  Azure APIs   │
│  (Primary DB) │   │   (Cache)     │   │  (Optional)   │
└───────────────┘   └───────────────┘   └───────────────┘

The Rule Engine (85% of Detection)#

Business rules provide interpretable, explainable fraud detection. Our engine implements 6 configurable rules:

1. Velocity Check (25 points)#

Detects rapid transaction patterns that indicate card theft or reuse.

class VelocityRule:
    """Detect rapid transaction patterns"""

    def evaluate(self, transaction, user_history):
        recent_txns = self.get_transactions_in_window(
            user_history,
            window_minutes=10
        )

        if len(recent_txns) > self.max_transactions:
            return RuleResult(
                triggered=True,
                score=25.0,
                reason=f"Velocity exceeded: {len(recent_txns)} transactions in 10 minutes"
            )
        return RuleResult(triggered=False, score=0.0)

2. High Amount Detection (20 points)#

Flags transactions significantly higher than user's typical spending.

def evaluate(self, transaction, user_history):
    avg_amount = self.calculate_average(user_history)
    if transaction.amount > avg_amount * 3:
        return RuleResult(
            triggered=True,
            score=20.0,
            reason=f"Amount ${transaction.amount} is 3x above average ${avg_amount:.2f}"
        )

3. Impossible Travel (30 points)#

Uses the Haversine formula to detect physically impossible location changes.

from math import radians, sin, cos, sqrt, atan2

def haversine_distance(lat1, lon1, lat2, lon2):
    """Calculate great-circle distance between two points"""
    R = 6371  # Earth's radius in km

    dlat = radians(lat2 - lat1)
    dlon = radians(lon2 - lon1)

    a = sin(dlat/2)**2 + cos(radians(lat1)) * cos(radians(lat2)) * sin(dlon/2)**2
    c = 2 * atan2(sqrt(a), sqrt(1-a))

    return R * c

def evaluate(self, transaction, last_transaction):
    distance = haversine_distance(
        transaction.lat, transaction.lon,
        last_transaction.lat, last_transaction.lon
    )
    time_diff_hours = (transaction.time - last_transaction.time).total_seconds() / 3600

    # 500km in 1 hour = impossible by commercial travel
    if distance > 500 and time_diff_hours < 1:
        return RuleResult(
            triggered=True,
            score=30.0,
            reason=f"Impossible travel: {distance:.0f}km in {time_diff_hours:.1f} hours"
        )

4. Unusual Hours (10 points)#

Transactions between 12am-5am are flagged.

5. New Device (15 points)#

First-time device with amount >$500 triggers this rule.

6. Country Blacklist (85 points)#

Immediate high-risk flag for sanctioned countries (KP, IR, SY).


Isolation Forest ML Model (10% of Detection)#

The ML component uses Isolation Forest, an unsupervised anomaly detection algorithm that requires no training data.

Why Isolation Forest?#

FeatureBenefit
UnsupervisedNo labeled fraud data needed
Fast inference<10ms per transaction
InterpretableAnomaly score is intuitive
Zero trainingWorks out of the box

Feature Engineering#

We extract 9 features from each transaction:

def extract_features(self, transaction, user_history):
    features = {
        'amount': transaction.amount,
        'hour_of_day': transaction.timestamp.hour,
        'day_of_week': transaction.timestamp.weekday(),
        'days_since_last_txn': self.days_since_last(user_history),
        'avg_amount_last_10': self.rolling_average(user_history, 10),
        'std_amount_last_10': self.rolling_std(user_history, 10),
        'txn_velocity_hour': self.count_last_hour(user_history),
        'txn_velocity_day': self.count_last_day(user_history),
        'geo_distance': self.distance_from_last(transaction, user_history),
    }
    return self.normalize(features)  # Z-score normalization

Model Implementation#

from sklearn.ensemble import IsolationForest

class AnomalyDetector:
    def __init__(self):
        self.model = IsolationForest(
            n_estimators=100,
            contamination=0.01,  # Expected fraud rate
            random_state=42
        )
        self.is_fitted = False

    def score(self, features):
        """Return anomaly score (0-100)"""
        if not self.is_fitted:
            # Use decision function without fitting
            return self.heuristic_score(features)

        # sklearn returns -1 to 1, we convert to 0-100
        raw_score = self.model.decision_function([features])[0]
        return max(0, min(100, (1 - raw_score) * 50))

Real-Time WebSocket Architecture#

The system uses three independent WebSocket channels for live updates:

Channel Design#

# backend/src/backend/api/v1/websocket.py

class ConnectionManager:
    def __init__(self):
        self.active_connections: Dict[str, List[WebSocket]] = {
            "transactions": [],
            "alerts": [],
            "stats": []
        }

    async def broadcast(self, channel: str, message: dict):
        """Broadcast to all connections on a channel"""
        for connection in self.active_connections[channel]:
            await connection.send_json(message)

@router.websocket("/ws/transactions")
async def transactions_websocket(websocket: WebSocket):
    await manager.connect("transactions", websocket)
    try:
        while True:
            # Heartbeat every 30 seconds
            await asyncio.sleep(30)
            await websocket.send_json({"type": "heartbeat"})
    except WebSocketDisconnect:
        manager.disconnect("transactions", websocket)

Frontend Integration#

// frontend/src/contexts/WebSocketContext.tsx

export const WebSocketProvider: React.FC = ({ children }) => {
  const [transactions, setTransactions] = useState<Transaction[]>([]);

  useEffect(() => {
    const ws = new WebSocket('ws://localhost:8000/api/v1/ws/transactions');

    ws.onmessage = (event) => {
      const data = JSON.parse(event.data);
      if (data.type === 'new_transaction') {
        setTransactions(prev => [data.transaction, ...prev].slice(0, 100));
      }
    };

    return () => ws.close();
  }, []);

  return (
    <WebSocketContext.Provider value={{ transactions }}>
      {children}
    </WebSocketContext.Provider>
  );
};

AI-Powered Explanations with Claude#

For flagged transactions, we generate human-readable explanations using Claude Haiku:

# backend/src/backend/services/claude_explainer.py

class ClaudeExplainer:
    def __init__(self):
        self.client = anthropic.Anthropic()
        self.model = "claude-3-haiku-20240307"  # Cheapest, fastest

    async def explain(self, transaction, triggered_rules, score):
        prompt = f"""
        Analyze this transaction and explain the fraud risk in 2-3 sentences:

        Transaction: ${transaction.amount} at {transaction.merchant}
        Location: {transaction.city}, {transaction.country}
        Time: {transaction.timestamp}

        Triggered Rules: {triggered_rules}
        Risk Score: {score}/100

        Provide a clear, analyst-friendly explanation.
        """

        response = await self.client.messages.create(
            model=self.model,
            max_tokens=300,
            messages=[{"role": "user", "content": prompt}]
        )

        return response.content[0].text

Example output:

"This transaction was flagged due to impossible travel detection. The cardholder made a purchase in New York at 2:15 PM, but the previous transaction was in Los Angeles just 45 minutes earlier—a physical impossibility. Combined with the unusual $2,500 amount (3x the user's average), this strongly indicates potential card cloning. Recommend immediate card freeze and customer verification."


API Design#

The system exposes a comprehensive REST API:

API Documentation

Core Endpoints#

EndpointMethodDescription
/api/v1/transactions/scorePOSTScore a transaction in real-time
/api/v1/transactionsGETList transactions with filters
/api/v1/alertsGETGet fraud alerts
/api/v1/analytics/dashboardGETDashboard metrics
/api/v1/analytics/trendsGET7-day fraud trends

Scoring Request/Response#

curl -X POST http://localhost:8000/api/v1/transactions/score \
  -H "Content-Type: application/json" \
  -d '{
    "user_id": "user_123",
    "amount": 1500.00,
    "currency": "USD",
    "merchant_name": "Electronics Store",
    "merchant_category": "electronics",
    "location": {
      "latitude": 40.7128,
      "longitude": -74.0060,
      "country": "US",
      "city": "New York"
    },
    "device_id": "device_456"
  }'

Response:

{
  "transaction_id": "txn_abc123",
  "fraud_score": 72.3,
  "risk_level": "HIGH",
  "is_blocked": true,
  "triggered_rules": ["velocity_check", "high_amount"],
  "explanation": "Transaction blocked due to rapid transaction velocity...",
  "processing_time_ms": 45
}

Database Schema#

Transaction Model#

# backend/src/backend/models/transaction.py

class Transaction(Base):
    __tablename__ = "transactions"

    id = Column(UUID, primary_key=True, default=uuid4)
    user_id = Column(UUID, ForeignKey("users.id"), nullable=False)

    # Financial
    amount = Column(Numeric(12, 2), nullable=False)
    currency = Column(String(3), default="USD")

    # Merchant
    merchant_name = Column(String(255))
    merchant_category = Column(String(100))

    # Location
    latitude = Column(Float)
    longitude = Column(Float)
    country = Column(String(2))
    city = Column(String(100))

    # Device
    device_id = Column(String(255))
    ip_address = Column(String(45))
    user_agent = Column(Text)

    # Fraud Detection Results
    fraud_score = Column(Float, default=0.0)
    risk_level = Column(Enum('LOW', 'MEDIUM', 'HIGH'))
    is_fraud = Column(Boolean, default=False)
    is_blocked = Column(Boolean, default=False)
    triggered_rules = Column(JSONB, default=list)

    # Timestamps
    created_at = Column(DateTime, server_default=func.now())

Deployment with Docker#

Docker Compose Setup#

# docker-compose.yml
services:
  postgres:
    image: postgres:16-alpine
    environment:
      POSTGRES_DB: fraud_detection
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: postgres
    volumes:
      - postgres_data:/var/lib/postgresql/data
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U postgres"]
      interval: 10s

  redis:
    image: redis:7-alpine
    volumes:
      - redis_data:/data
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]

  backend:
    build: ./backend
    environment:
      - DATABASE_URL=postgresql+asyncpg://postgres:postgres@postgres:5432/fraud_detection
      - REDIS_HOST=redis
    ports:
      - "8000:8000"
    depends_on:
      postgres:
        condition: service_healthy
      redis:
        condition: service_healthy

  frontend:
    build: ./frontend
    environment:
      - VITE_API_URL=http://localhost:8000
    ports:
      - "3000:80"
    depends_on:
      - backend

Quick Start#

# Clone and start
git clone https://github.com/MinhQuanBuiSco/Azure.git
cd Azure/fraud_detection

# Start all services
make init  # First time setup
make up    # Start services

# Generate test transactions
make generate-txns

# Access
# Frontend: http://localhost:3000
# API Docs: http://localhost:8000/docs

Performance Considerations#

Caching Strategy#

# User history caching with Redis
class CacheService:
    async def get_user_history(self, user_id: str) -> List[Transaction]:
        cached = await self.redis.get(f"user_history:{user_id}")
        if cached:
            return json.loads(cached)

        # Fetch from DB and cache
        history = await self.db.get_user_transactions(user_id, limit=100)
        await self.redis.setex(
            f"user_history:{user_id}",
            300,  # 5 minute TTL
            json.dumps(history)
        )
        return history

Async Everything#

# All external calls are async
async def score_transaction(self, transaction: TransactionCreate):
    # Run detection methods concurrently
    rule_score, ml_score, azure_score = await asyncio.gather(
        self.rule_engine.evaluate(transaction),
        self.anomaly_detector.score(transaction),
        self.azure_detector.detect(transaction)
    )

    final_score = (
        rule_score * 0.85 +
        ml_score * 0.10 +
        azure_score * 0.05
    )

    return final_score

Key Takeaways#

Building a production fraud detection system requires:

  1. Multi-model ensemble - Don't rely on single detection method
  2. Interpretable rules - Business rules provide explainability
  3. Real-time processing - WebSockets for instant updates
  4. AI explanations - Claude Haiku for analyst-friendly insights
  5. Proper caching - Redis for sub-millisecond user history lookup
  6. Async architecture - Concurrent processing for low latency

What's Next?#

Future enhancements could include:

  • Graph neural networks for network fraud detection
  • A/B testing framework for rule optimization
  • Feedback loop for model retraining
  • Multi-region deployment with Azure Traffic Manager

Resources#


Found this helpful? Check out my other posts on building production AI systems!