Smart Car AI Security Agent – Full Project Blueprint
This blueprint turns your 5 trained .pkl models into a production-style, on-vehicle
safety system with automated emergency response using a cloud LLM (Gemini) for
reasoning and coordination.
---
0) What you already have
attack.pkl → detects fuzzy/DoS/RPM anomalies on the vehicle bus/telemetry
attack2.pkl → detects GPS spoofing
attack3.pkl → detects accidents from the dashcam
attack4.pkl → detects driver drowsiness
attack5.pkl → unassigned (you can dedicate this to lane departure, distraction, or
an additional anomaly layer; placeholders are included below)
Gemini API access (for emergency reasoning, route-planning prompts, composing
concise messages, etc.)
> Note: an "LLM in a .pkl" is unusual; a pickle file suggests scikit-learn, GBM,
XGBoost, or similar. We'll wrap each file as a Model Service regardless of framework.
---
1) High-level architecture
┌───────────────────── Vehicle/Edge (Raspberry Pi 5 or Jetson) ──────────────────────┐
│                                                                                    │
│ Sensors/Feeds → Ingestion → Preprocess → Model Services → Fusion/Policy → Actions  │
│ • CAN/OBD-II    (async)     (feature     (5 .pkl)         (severity scoring,       │
│ • GPS/IMU                    extraction)                   rules, state machine)   │
│ • Dashcam video                                                                    │
│ • DMS cam                                                                          │
│                                                                                    │
│ MQTT/Kafka/Redis Streams ────────────────────────────────────────▶ Cloud           │
└────────────────────────────────────────────────────────────────────────────────────┘
                                           │
                                           ▼
                    ┌─────────────────── Cloud ──────────────────┐
                    │ Emergency Orchestrator (FastAPI)           │
                    │ • Calls Gemini for plan+reasoning          │
                    │ • Fetches police/hospital contacts         │
                    │ • Notifies: family, police, hospital       │
                    │ • Shares live location + best route        │
                    │                                            │
                    │ Observability: Prometheus + Grafana        │
                    │ Storage: Timeseries (TimescaleDB) + Logs   │
                    └────────────────────────────────────────────┘
Core ideas
Each .pkl runs in its own microservice (or process) with a common input schema and
emits a standard event.
A Fusion & Policy Engine converts per-model signals into a unified risk score +
incident type.
An Emergency Orchestrator uses Gemini to choose steps and compose outreach
messages. Your system performs the actual calls/SMS/API pings.
---
2) Unified Event Schema (JSON)
{
  "ts": 1735776482,
  "source": "attack4_drowsiness",
  "vehicle_id": "CAR-001",
  "location": {"lat": 25.615, "lon": 85.141, "speed_kmh": 53.1, "accuracy_m": 4.2},
  "evidence": {"score": 0.92, "features": {"eye_aspect_ratio": 0.14}},
  "labels": ["drowsiness"],
  "severity": 0.82,
  "confidence": 0.91,
  "ttl_s": 10
}
> All models should output this structure. severity and confidence can be derived
if your model only returns a probability.
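One possible way to derive both fields from a single probability is sketched below. The mapping is an assumption made for illustration, not something the models dictate: severity is a per-label weight scaled by the probability, and confidence is the distance from the 0.5 coin-flip point. `LABEL_WEIGHTS` and `event_from_probability` are hypothetical names.

```python
import time

# Hypothetical weights: how serious each label is when the detection is real.
LABEL_WEIGHTS = {
    "accident_suspected": 1.0,
    "bus_anomaly": 0.9,
    "drowsiness": 0.9,
    "gps_spoof": 0.8,
}


def event_from_probability(source, label, p, vehicle_id="CAR-001",
                           threshold=0.5, ttl_s=10, now=None):
    """Build a schema-shaped event dict from a positive-class probability."""
    if not 0.0 <= p <= 1.0:
        raise ValueError(f"probability out of range: {p}")
    positive = p > threshold
    # Severity: label weight scaled by probability; zero below the threshold.
    severity = LABEL_WEIGHTS.get(label, 0.5) * p if positive else 0.0
    # Confidence: distance from 0.5, rescaled to the range [0, 1].
    confidence = abs(p - 0.5) * 2.0
    return {
        "ts": int(time.time() if now is None else now),
        "source": source,
        "vehicle_id": vehicle_id,
        "location": None,
        "evidence": {"score": p},
        "labels": [label if positive else "normal"],
        "severity": round(severity, 3),
        "confidence": round(confidence, 3),
        "ttl_s": ttl_s,
    }
```

Keeping the mapping in one shared helper means all five wrappers score events the same way, which makes the fusion thresholds easier to tune.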
---
3) Directory layout
smart-car-security/
├─ services/
│ ├─ gateway/ # FastAPI entrypoint on the car
│ ├─ ingestion/
│ │ ├─ can_reader.py # CAN/OBD-II via python-can
│ │ ├─ gps_reader.py # gpsd or serial NMEA
│ │ ├─ video_reader.py # OpenCV capture for dashcam & DMS
│ ├─ models/
│ │ ├─ attack_bus/ # wraps attack.pkl
│ │ ├─ gps_spoof/
│ │ ├─ accident/
│ │ ├─ drowsiness/
│ │ └─ aux_attack5/
│ ├─ fusion/
│ │ └─ policy_engine.py # event aggregator, state machine
│ ├─ actions/
│ │ ├─ alerts.py # SMS/call/webhook/email
│ │ └─ vehicle_actuators.py # horn, hazard lights (GPIO/CAN)
│ ├─ common/
│ │ ├─ schema.py # pydantic models for event schema
│ │ ├─ bus.py # MQTT/Redis stream helpers
│ │ └─ utils.py
│
├─ cloud/
│ ├─ emergency_orchestrator/ # FastAPI in cloud VM
│ │ ├─ gemini_client.py
│ │ ├─ responders.py # police/hospital/contacts
│ │ └─ routes.py
│ └─ storage/
│ ├─ timeseries_schema.sql
│ └─ grafana_dash.json
│
├─ deploy/
│ ├─ docker-compose.yml
│ ├─ prometheus.yml
│ └─ grafana/
│
├─ tests/
│ ├─ unit/
│ └─ e2e/
└─ README.md
---
4) FastAPI Gateway (Edge) – minimal skeleton
# services/gateway/main.py
from fastapi import FastAPI
from pydantic import BaseModel

from common.schema import Event
from common.bus import publish

app = FastAPI(title="Smart Car Gateway")


class Telemetry(BaseModel):
    can: dict | None = None
    gps: dict | None = None
    dashcam_frame_path: str | None = None
    dms_frame_path: str | None = None


@app.post("/ingest")
def ingest(t: Telemetry):
    # You may fan-out to per-model topics
    if t.can:
        publish("topic.can", t.can)
    if t.gps:
        publish("topic.gps", t.gps)
    if t.dashcam_frame_path:
        publish("topic.dashcam", {"path": t.dashcam_frame_path})
    if t.dms_frame_path:
        publish("topic.dms", {"path": t.dms_frame_path})
    return {"ok": True}


@app.post("/event")
def accept_event(e: Event):
    publish("topic.events.raw", e.model_dump())
    return {"ok": True}
common/schema.py
from typing import Dict, List, Optional

from pydantic import BaseModel


class Location(BaseModel):
    lat: float
    lon: float
    speed_kmh: Optional[float] = None
    accuracy_m: Optional[float] = None


class Event(BaseModel):
    ts: int
    source: str
    vehicle_id: str
    location: Optional[Location] = None
    evidence: Dict
    labels: List[str]
    severity: float
    confidence: float
    ttl_s: int = 10
common/bus.py (MQTT sample; can swap to Redis Streams/Kafka)
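import json

# Alias the paho module so it does not clash with the publish() helper below.
import paho.mqtt.publish as mqtt_publish

BROKER = "localhost"


def publish(topic: str, payload: dict) -> None:
    # One-shot publish: connect, send the JSON payload, disconnect.
    mqtt_publish.single(topic, json.dumps(payload), hostname=BROKER)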
---
5) Model Wrappers (load .pkl safely)
Security note: Only load pickles you built. Prefer joblib and verify a local
checksum.
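The checksum check mentioned in the note could look like the sketch below, using only the standard library. `verify_checksum` and the expected-digest argument are hypothetical; where the trusted digest is stored (config file, signed manifest) is left open.

```python
import hashlib
import hmac


def sha256_of(path, chunk_size=1 << 20):
    """Hash a file in chunks so large model files are not read in one go."""
    digest = hashlib.sha256()
    with open(path, "rb") as fh:
        for chunk in iter(lambda: fh.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


def verify_checksum(path, expected_hex):
    """Return the path if its SHA-256 matches; raise ValueError otherwise."""
    actual = sha256_of(path)
    if not hmac.compare_digest(actual, expected_hex.strip().lower()):
        raise ValueError(f"checksum mismatch for {path}")
    return path


# Intended use in a wrapper (EXPECTED_SHA256 is a placeholder you record
# when you export the model):
#   MODEL = joblib.load(verify_checksum("/models/attack.pkl", EXPECTED_SHA256))
```

A matching checksum only proves the file is the one you recorded; it does not make an untrusted pickle safe to load.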
# services/models/attack_bus/service.py
import time

import joblib
from fastapi import FastAPI

from common.schema import Event
from common.bus import publish

app = FastAPI(title="attack_bus_service")
MODEL = joblib.load("/models/attack.pkl")  # verify SHA256 before load


def feature_vectorize(can_features: dict) -> list[float]:
    # TODO: map raw CAN features to the exact vector attack.pkl was trained on.
    raise NotImplementedError


@app.post("/predict")
def predict(can_features: dict):
    # features → vector
    x = feature_vectorize(can_features)
    # Assumes a binary classifier whose positive class is column 1.
    p = float(MODEL.predict_proba([x])[0][1])
    evt = Event(
        ts=int(time.time()),
        source="attack_bus",
        vehicle_id=can_features.get("vehicle_id", "CAR-001"),
        evidence={"score": p, "features": can_features},
        labels=["bus_anomaly" if p > 0.5 else "normal"],
        severity=p,
        confidence=p,
    )
    publish("topic.events.raw", evt.model_dump())
    return {"prob": p}
> Duplicate the wrapper pattern for gps_spoof, accident (frame path → features →
probability), drowsiness (DMS), and aux_attack5.
Video wrappers (OpenCV)
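# services/models/accident/service.py
import time

import cv2
import joblib
from fastapi import FastAPI, HTTPException

from common.schema import Event
from common.bus import publish

app = FastAPI(title="accident_service")
MODEL = joblib.load("/models/attack3.pkl")  # verify SHA256 before load


def extract_accident_features(frame) -> list[float]:
    # TODO: reproduce the feature vector attack3.pkl was trained on.
    raise NotImplementedError


@app.post("/predict")
def predict(frame_path: str, meta: dict | None = None):
    meta = meta or {}
    frame = cv2.imread(frame_path)  # returns None if the file cannot be read
    if frame is None:
        raise HTTPException(status_code=400, detail=f"cannot read frame: {frame_path}")
    feats = extract_accident_features(frame)
    p = float(MODEL.predict_proba([feats])[0][1])
    evt = Event(
        ts=int(time.time()),
        source="accident",
        vehicle_id=meta.get("vehicle_id", "CAR-001"),
        evidence={"score": p},
        labels=["accident_suspected" if p > 0.6 else "normal"],
        severity=p,
        confidence=p,
    )
    publish("topic.events.raw", evt.model_dump())
    return {"prob": p}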
---
6) Fusion & Policy Engine (state machine)
# services/fusion/policy_engine.py
import json
import time
from collections import deque

from common.schema import Event
from common.bus import publish

WINDOW_S = 8  # seconds of event history kept for fusion


class IncidentState:
    def __init__(self):
        self.buffer = deque()
        self.active = False

    def push(self, e: Event):
        # Drop events older than the window, then append the new one.
        now = int(time.time())
        while self.buffer and now - self.buffer[0].ts > WINDOW_S:
            self.buffer.popleft()
        self.buffer.append(e)

    def score(self):
        # Simple fusion: union of labels, max severity and max confidence.
        labels = [l for e in self.buffer for l in e.labels]
        sev = max([e.severity for e in self.buffer] + [0])
        conf = max([e.confidence for e in self.buffer] + [0])
        return labels, sev, conf


state = IncidentState()


# Subscribe to MQTT topic.events.raw and call on_event
def on_event(e_json):
    e = Event(**json.loads(e_json))
    state.push(e)
    labels, sev, conf = state.score()
    # Example policy
    if "accident_suspected" in labels and sev > 0.7:
        trigger_incident("accident", sev, conf, e.location, e.vehicle_id)
    elif "drowsiness" in labels and sev > 0.8 and "bus_anomaly" in labels:
        trigger_incident("possible_hijack_or_impairment", sev, conf, e.location, e.vehicle_id)
    elif "gps_spoof" in labels and sev > 0.8:
        trigger_incident("gps_spoofing", sev, conf, e.location, e.vehicle_id)


def trigger_incident(kind: str, sev: float, conf: float, loc, vehicle_id: str = "CAR-001"):
    payload = {
        "ts": int(time.time()),
        "vehicle_id": vehicle_id,
        "kind": kind,
        "severity": sev,
        "confidence": conf,
        "location": loc.model_dump() if loc else None,
    }
    publish("topic.incident.raised", payload)
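Because on_event re-evaluates the policy on every incoming event, the same incident would be raised repeatedly while a qualifying label stays inside the window. A per-kind cooldown is one way to limit that; the sketch below is an assumption about how it could work, and `CooldownGate` is a hypothetical name rather than part of the engine above.

```python
import time


class CooldownGate:
    """Allow each incident kind to fire at most once per cooldown period."""

    def __init__(self, cooldown_s=30.0, clock=time.monotonic):
        self.cooldown_s = cooldown_s
        self._clock = clock          # injectable so tests can fake time
        self._last_fired = {}        # incident kind -> time it last fired

    def allow(self, kind):
        now = self._clock()
        last = self._last_fired.get(kind)
        if last is not None and now - last < self.cooldown_s:
            return False             # still cooling down for this kind
        self._last_fired[kind] = now
        return True


# Intended use inside the policy:
#   if gate.allow("accident"):
#       trigger_incident("accident", sev, conf, e.location)
```

Tracking the cooldown per kind lets a GPS-spoofing incident still fire while an accident incident is cooling down.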
---
7) Emergency Orchestrator (Cloud) with Gemini
Flow
1. Receives topic.incident.raised from edge.
2. Builds a structured prompt for Gemini: context + latest telematics + map hints.
3. Gets back: plan, message drafts, route preference.
4. System executes: fetch nearest hospital/police (your data or third-party APIs),
send notifications, and start a share-link.
Prompting pattern (pseudo)
SYSTEM: You are an emergency coordinator for in-vehicle incidents in India.
INPUT:
- Incident type: {kind}
- Severity: {sev}, Confidence: {conf}
- Last known location: {lat, lon, speed}
- Passenger profile (if available): blood group, allergies.
TASKS:
1) Summarize situation in <= 2 sentences.
2) Decide priority: police / hospital / breakdown.
3) Draft concise messages (< 280 chars) for: family, police, hospital.
4) Ask for missing critical info in one line if needed.
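The prompting pattern above can be filled from the incident payload with plain string formatting. This is a minimal sketch: `build_prompt` and `PROMPT_TEMPLATE` are hypothetical names, and the exact wording sent to Gemini is yours to tune.

```python
PROMPT_TEMPLATE = (
    "SYSTEM: You are an emergency coordinator for in-vehicle incidents in India.\n"
    "INPUT:\n"
    "- Incident type: {kind}\n"
    "- Severity: {sev:.2f}, Confidence: {conf:.2f}\n"
    "- Last known location: {location}\n"
    "- Passenger profile: {profile}\n"
    "TASKS:\n"
    "1) Summarize situation in <= 2 sentences.\n"
    "2) Decide priority: police / hospital / breakdown.\n"
    "3) Draft concise messages (< 280 chars) for: family, police, hospital.\n"
    "4) Ask for missing critical info in one line if needed.\n"
)


def build_prompt(incident, profile=None):
    """Render the prompt from an incident dict shaped like topic.incident.raised."""
    loc = incident.get("location")
    if loc:
        speed = loc.get("speed_kmh")
        speed_text = f"{speed} km/h" if speed is not None else "unknown"
        location = f"lat={loc['lat']}, lon={loc['lon']}, speed={speed_text}"
    else:
        location = "unknown"
    if profile:
        # Sorted so the same profile always renders the same prompt.
        profile_text = ", ".join(f"{k}={v}" for k, v in sorted(profile.items()))
    else:
        profile_text = "not available"
    return PROMPT_TEMPLATE.format(
        kind=incident["kind"],
        sev=incident["severity"],
        conf=incident["confidence"],
        location=location,
        profile=profile_text,
    )
```

Stating "unknown" or "not available" explicitly gives the model a reason to use task 4 and ask for the missing detail.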
Code scaffolding
# cloud/emergency_orchestrator/routes.py
from fastapi import FastAPI
from pydantic import BaseModel

from responders import lookup_contacts, notify_all
from gemini_client import plan_actions

app = FastAPI(title="Emergency Orchestrator")


class Incident(BaseModel):
    ts: int
    vehicle_id: str
    kind: str
    severity: float
    confidence: float
    location: dict | None = None


@app.post("/incident")
def incident(i: Incident):
    plan = plan_actions(i.model_dump())  # calls Gemini
    contacts = lookup_contacts(i.location)
    result = notify_all(plan, contacts, i.location)
    return {"ok": True, "plan": plan, "notified": result}
gemini_client.py (pseudo; fill keys and SDK calls)
def plan_actions(incident: dict) -> dict:
    # Compose prompt from incident; call Gemini
    # Return {summary, priority, messages: {family, police, hospital}, route_prefs}
    return {
        "summary": "Collision likely; speed dropped to 0 near NH-19.",
        "priority": ["hospital", "police"],
        "messages": {
            "family": "Emergency: possible accident near {lat,lon}. We are dispatching help.",
            "police": "Possible road accident; vehicle CAR-001; coords {lat,lon}.",
            "hospital": "Incoming patient; suspected collision; coords {lat,lon}.",
        },
        "route_prefs": {"avoid_tolls": True},
    }
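The drafted messages still contain the {lat,lon} placeholder, and an LLM may not respect the length limit from the prompt, so it helps to post-process the plan before anything is sent. The sketch below assumes the plan shape returned by plan_actions; `finalize_messages` is a hypothetical helper.

```python
REQUIRED_AUDIENCES = ("family", "police", "hospital")
MAX_CHARS = 279  # the prompt asks for messages of "< 280 chars"


def finalize_messages(plan, location):
    """Check the plan has all drafts, fill in coordinates, enforce the length cap."""
    messages = plan.get("messages") or {}
    missing = [a for a in REQUIRED_AUDIENCES if a not in messages]
    if missing:
        raise ValueError(f"plan is missing messages for: {', '.join(missing)}")
    if location:
        coords = f"{location['lat']:.5f},{location['lon']:.5f}"
    else:
        coords = "unknown location"
    final = {}
    for audience in REQUIRED_AUDIENCES:
        # Literal replace, so other braces in the model's text are left alone.
        text = messages[audience].replace("{lat,lon}", coords)
        final[audience] = text[:MAX_CHARS]
    return final
```

Raising on a missing audience keeps a malformed LLM response from silently skipping the police or the hospital.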
Responders
# cloud/emergency_orchestrator/responders.py
import requests


def lookup_contacts(location):
    # 1) nearest hospital/police from your DB or third-party API
    # 2) family contacts from your user profile DB
    return {
        "family": [{"name": "Brother", "phone": "+91XXXX"}],
        "police": [{"name": "112 Control Room", "phone": "112"}],
        "hospital": [{"name": "XYZ Trauma Center", "phone": "+91YYYY"}],
    }


def notify_all(plan, contacts, location):
    # Implement SMS/call gateways (e.g., Twilio-like, local providers),
    # and webhook notifications to your mobile app.
    # Return dict of delivery receipts.
    return {"family": "sent", "police": "sent", "hospital": "sent"}
> India's emergency number is 112. You can still notify local police/hospital
numbers chosen by GPS position.
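Picking the local police station or hospital from a GPS fix can start as a great-circle distance search over a pre-loaded POI list. The sketch below uses the haversine formula; the POI record shape (`kind`, `lat`, `lon`) and the `nearest` helper are assumptions, and straight-line distance is only a first filter, not a drive-time estimate.

```python
import math

EARTH_RADIUS_KM = 6371.0088  # mean Earth radius


def haversine_km(lat1, lon1, lat2, lon2):
    """Great-circle distance between two WGS84 points, in kilometres."""
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    dphi = phi2 - phi1
    dlam = math.radians(lon2 - lon1)
    a = math.sin(dphi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(dlam / 2) ** 2
    return 2 * EARTH_RADIUS_KM * math.asin(math.sqrt(a))


def nearest(pois, lat, lon, kind):
    """Return the closest POI of the given kind, or None if there is none."""
    candidates = [p for p in pois if p["kind"] == kind]
    if not candidates:
        return None
    return min(candidates, key=lambda p: haversine_km(lat, lon, p["lat"], p["lon"]))
```

A linear scan is fine for a few thousand POIs; a spatial index only becomes worthwhile for much larger tables.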
---
8) Data & Feature Pipelines
CAN/OBD-II: python-can or cantools → parse PIDs → windowed stats (RPM jitter,
message entropy, missing IDs) → attack.pkl.
GPS Spoof: compare GPS vs IMU dead-reckoning & map-matching residuals; monitor
satellite ID churn → attack2.pkl.
Accident: frame-level motion vectors, sudden brightness change, lane boundary
violations (if available) → attack3.pkl.
Drowsiness: face landmarks, Eye Aspect Ratio, yawning rate, head pitch/roll →
attack4.pkl.
Aux (attack5): use for lane departure (Hough + CNN), phone distraction, or speeding
vs limit.
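As an illustration of the windowed statistics in the CAN/OBD-II item, the sketch below computes RPM jitter, message-ID entropy, and the count of missing IDs for one window of frames. The frame shape (`id`, optional `rpm`) and the function name are assumptions; the features your attack.pkl expects may differ.

```python
import math
from collections import Counter
from statistics import pstdev


def can_window_features(frames, expected_ids):
    """Summarize one time window of decoded CAN frames.

    frames: list of dicts like {"id": 0x0C, "rpm": 1500.0}; "rpm" is optional.
    expected_ids: IDs that normally appear in every window.
    """
    ids = [f["id"] for f in frames]
    rpms = [f["rpm"] for f in frames if f.get("rpm") is not None]
    total = len(ids)
    counts = Counter(ids)
    # Shannon entropy (bits) of the ID distribution; spikes under fuzzing,
    # collapses when one ID floods the bus.
    entropy = sum((c / total) * math.log2(total / c) for c in counts.values())
    return {
        "rpm_jitter": pstdev(rpms) if len(rpms) > 1 else 0.0,
        "id_entropy_bits": entropy,
        "missing_ids": len(set(expected_ids) - set(ids)),
        "frame_count": total,
    }
```

Whatever features you settle on, the same function should feed both training and the live wrapper so the vectors match.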
---
9) Persistence & Observability
Timeseries: TimescaleDB schema for events and incidents.
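-- Requires the timescaledb extension, plus PostGIS for the GEOGRAPHY type.
CREATE EXTENSION IF NOT EXISTS timescaledb;
CREATE EXTENSION IF NOT EXISTS postgis;

CREATE TABLE events (
    ts          TIMESTAMPTZ NOT NULL,
    vehicle_id  TEXT,
    source      TEXT,
    labels      TEXT[],
    severity    DOUBLE PRECISION,
    confidence  DOUBLE PRECISION,
    location    GEOGRAPHY(POINT),
    evidence    JSONB
);
SELECT create_hypertable('events', 'ts');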
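To write an event into this table, the JSON event has to be mapped onto the column types. The sketch below only builds the SQL string and parameter tuple; the `%s` placeholders assume a psycopg-style driver, and `event_to_row` is a hypothetical helper. Note that the PostGIS point text takes longitude first.

```python
import json
from datetime import datetime, timezone

# ST_GeogFromText is PostGIS; it returns NULL when given NULL.
INSERT_SQL = (
    "INSERT INTO events "
    "(ts, vehicle_id, source, labels, severity, confidence, location, evidence) "
    "VALUES (%s, %s, %s, %s, %s, %s, ST_GeogFromText(%s), %s::jsonb)"
)


def event_to_row(event):
    """Convert a schema-shaped event dict into parameters for INSERT_SQL."""
    loc = event.get("location")
    # EWKT uses "lon lat" order, the reverse of how coordinates are usually spoken.
    point = f"SRID=4326;POINT({loc['lon']} {loc['lat']})" if loc else None
    return (
        datetime.fromtimestamp(event["ts"], tz=timezone.utc),
        event["vehicle_id"],
        event["source"],
        list(event["labels"]),
        float(event["severity"]),
        float(event["confidence"]),
        point,
        json.dumps(event["evidence"]),
    )


# Intended use with an open cursor (not executed here):
#   cur.execute(INSERT_SQL, event_to_row(event))
```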
Metrics: Prometheus client in each service (latency, model confidence histograms,
incident counts).
Dashboards: Grafana panels for severity over time, false positives, model drift.
---
10) Deployment (Docker Compose – edge)
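# deploy/docker-compose.yml
# Build contexts and volumes resolve relative to this file, so repo-root paths
# start with "../". The models/ directory at the repo root is an assumption;
# point the volume at wherever your .pkl files live.
version: "3.9"
services:
  mqtt:
    # Mosquitto 2.x accepts only local connections unless a listener is
    # configured, so mount a mosquitto.conf if clients cannot connect.
    image: eclipse-mosquitto
    ports: ["1883:1883"]
  gateway:
    build: ../services/gateway
    network_mode: host
  attack_bus:
    build: ../services/models/attack_bus
    volumes:
      - ../models:/models:ro
    network_mode: host
  gps_spoof:
    build: ../services/models/gps_spoof
    volumes:
      - ../models:/models:ro
    network_mode: host
  accident:
    build: ../services/models/accident
    volumes:
      - ../models:/models:ro
    network_mode: host
  drowsiness:
    build: ../services/models/drowsiness
    volumes:
      - ../models:/models:ro
    network_mode: host
  aux_attack5:
    build: ../services/models/aux_attack5
    volumes:
      - ../models:/models:ro
    network_mode: host
  fusion:
    build: ../services/fusion
    network_mode: host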
---
11) Safety interlocks & legal notes
No automated calls to authorities unless your T&C and local law allow it; prefer
human-in-the-loop confirmation except for extreme confidence (e.g., accident &&
airbags_deployed).
Provide driver cancel within 10–20 seconds to prevent false alarms.
Store only necessary data; face video can be processed on-device and not uploaded.
Model versioning: keep MODEL_NAME@semver + SHA256 in every event.
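The driver-cancel interlock can be modelled as a pending incident with a deadline: nothing is escalated until the window has passed without a cancel. This is a minimal sketch; `PendingIncident` is a hypothetical class and the 15-second default is just a value inside the 10–20 second range above.

```python
import time


class PendingIncident:
    """Hold an incident until the driver's cancel window has expired."""

    def __init__(self, kind, cancel_window_s=15.0, clock=time.monotonic):
        self.kind = kind
        self._clock = clock  # injectable so tests can fake time
        self._deadline = clock() + cancel_window_s
        self.cancelled = False

    def cancel(self):
        """Driver pressed cancel; honoured only before the deadline."""
        if self._clock() < self._deadline:
            self.cancelled = True
        return self.cancelled

    def should_escalate(self):
        """True once the window has passed without a cancel."""
        return not self.cancelled and self._clock() >= self._deadline


# Intended use: poll should_escalate() in the fusion loop and publish
# topic.incident.raised only when it returns True.
```

Extreme-confidence cases such as accident && airbags_deployed could bypass the wait by constructing the object with cancel_window_s=0.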
---
12) Testing plan (quick wins)
1. Unit: mock feature vectors → assert per-model outputs.
2. Replay: feed recorded CAN & GPS logs with known anomalies.
3. Synthetic video: simulate accident/drowsy frames.
4. E2E: raise a fake incident → verify Gemini plan → check notifications delivered.
5. Latency: edge inference < 100 ms per model; fusion cycle 10 Hz.
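Items 1 and 5 can be exercised without the real pickles by swapping in a stub that mimics predict_proba. The sketch below assumes the wrappers' convention that column 1 is the positive class; `StubModel`, `label_for`, and `median_latency_ms` are hypothetical test helpers.

```python
import time


class StubModel:
    """Stands in for a loaded .pkl; always returns the same probability."""

    def __init__(self, p):
        self.p = p

    def predict_proba(self, rows):
        return [[1.0 - self.p, self.p] for _ in rows]


def label_for(model, features, threshold=0.5):
    """Same thresholding as the bus wrapper, isolated so it can be unit-tested."""
    p = float(model.predict_proba([features])[0][1])
    return ("bus_anomaly" if p > threshold else "normal"), p


def median_latency_ms(fn, repeats=50):
    """Median wall-clock time of fn() in milliseconds."""
    samples = []
    for _ in range(repeats):
        start = time.perf_counter()
        fn()
        samples.append((time.perf_counter() - start) * 1000.0)
    samples.sort()
    return samples[len(samples) // 2]
```

On the target hardware, run median_latency_ms against the real model and feature extractor, and compare the result with the 100 ms budget.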
---
13) Next steps (actionable checklist)
[ ] Convert each .pkl into a thin FastAPI model service (templates above).
[ ] Standardize feature extractors for CAN/GPS/Video.
[ ] Implement Fusion Policy with adjustable thresholds and cooldowns.
[ ] Stand up Emergency Orchestrator in cloud; wire to Gemini.
[ ] Build tests and Grafana dashboard.
[ ] Field test on a closed track / simulator; iterate thresholds.
---
14) Optional: On-device routing without maps API
If no maps API is reachable, pre-load hospital/police POIs and run A* over a road
graph built from OpenStreetMap data cached on the SD card; Gemini can still author
the outreach messages while your local router picks a path.
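A compact A* over a pre-built road graph might look like the sketch below. It assumes node coordinates are already projected to a local planar frame in kilometres and that every edge is at least as long as the straight line between its endpoints, which keeps the Euclidean heuristic admissible. Extracting the graph from OpenStreetMap data is out of scope here, and the `graph`/`coords` shapes are assumptions.

```python
import heapq
import math


def a_star(graph, coords, start, goal):
    """Shortest path by distance.

    graph:  {node: [(neighbour, edge_km), ...]}
    coords: {node: (x_km, y_km)} in a local planar frame
    Returns (path, total_km), or (None, inf) when the goal is unreachable.
    """
    def heuristic(node):
        return math.dist(coords[node], coords[goal])

    open_heap = [(heuristic(start), 0.0, start)]  # (estimated total, cost so far, node)
    best_cost = {start: 0.0}
    came_from = {}
    while open_heap:
        _, cost, node = heapq.heappop(open_heap)
        if node == goal:
            path = [node]
            while node in came_from:
                node = came_from[node]
                path.append(node)
            return path[::-1], cost
        if cost > best_cost.get(node, math.inf):
            continue  # stale heap entry; a cheaper route was found already
        for neighbour, edge_km in graph.get(node, []):
            new_cost = cost + edge_km
            if new_cost < best_cost.get(neighbour, math.inf):
                best_cost[neighbour] = new_cost
                came_from[neighbour] = node
                heapq.heappush(open_heap, (new_cost + heuristic(neighbour), new_cost, neighbour))
    return None, math.inf
```

To route to the nearest hospital, run the search once per candidate POI and keep the shortest result; edge weights could also be travel times if the heuristic is scaled to match.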
---
Drop-in TODOs you can fill now
feature_vectorize, extract_accident_features, extract_drowsiness_features
Real Gemini SDK calls in gemini_client.py
SMS/call provider integration in responders.py
Real nearest-POI lookup (local DB + reverse geocode)
> Ping me with your .pkl model input features, and I’ll tailor the exact
feature_vectorize(...) code & video extractors.