AIOps & MLOps DevOps Projects
Part-1
AIOps Projects (AI in DevOps)
1. Log and Incident Management
Project 1. Intelligent Log Analysis: Use AI/ML to analyze logs from Kubernetes,
Jenkins, or Docker and automatically detect anomalies.
Project 2. AI-Driven Log Parsing & Alerting: Train an NLP model to classify
logs (info, warning, error, critical) and generate alerts in real time.
Project 3. AI-Driven Log Aggregation & Summarization: Use NLP to analyze
and summarize logs from multiple sources (Kubernetes, Jenkins, CloudWatch).
Project 4. Self-Learning Incident Management System: Build a system that
suggests automated fixes based on past incidents.
Project 5. AI-Driven Incident Response Playbook: Create a system that suggests
incident resolution steps based on past issues.
2. Resource and Cost Optimization
Project 1. Predictive Auto-Scaling: Develop an AI-driven system to predict
server/resource usage and auto-scale Kubernetes clusters.
Project 2. AI-Powered Cost Optimization: Use ML to analyze cloud billing data
and recommend cost-saving measures.
Project 3. AI-Powered Cloud Resource Optimization: Train an ML model to
recommend the best instance types and scaling configurations.
Project 4. AI-Assisted Infrastructure Cost Forecasting: Use time-series
forecasting to predict cloud costs and prevent budget overruns.
Project 5. AI-Assisted Container Resource Allocation: Use reinforcement
learning to optimize CPU/memory allocation in Docker containers.
3. Anomaly Detection & Failure Prediction
Project 1. Anomaly Detection in DevSecOps: Train an AI model to detect
security vulnerabilities in containerized applications.
Project 2. Kubernetes Node Failure Prediction: Predict pod/node failures in
Kubernetes clusters using AI-based anomaly detection.
Project 3. Anomaly Detection for Network Traffic: Use ML to identify unusual
patterns in network traffic and detect potential DDoS attacks.
Project 4. Predictive Disk Failure Monitoring: Analyze disk I/O metrics using
ML to predict hardware failures in advance.
Project 5. Smart CI/CD Failure Prediction: Train an AI model to analyze
Jenkins pipeline logs and predict build failures before they occur.
4. Incident Prediction and Root Cause Analysis
Project 1. Incident Prediction & Root Cause Analysis: Build a machine learning
model that predicts system failures based on historical monitoring data.
Project 2. AI-Based Root Cause Analysis (RCA): Build a model that correlates
incidents, logs, and metrics to identify the root cause of failures.
5. Security and Compliance
Project 1. Automated Security Policy Enforcement with AI: Use AI to detect
misconfigurations in firewall rules, IAM policies, and network security.
Project 2. AI-Powered SLA Compliance Monitoring: Analyze service response
times and uptime metrics using ML to predict SLA violations.
6. Self-Healing and Automation
Project 1. Self-Healing Infrastructure: Use AI to detect and auto-remediate
cloud infrastructure issues (e.g., restarting failed pods in Kubernetes).
Project 2. AI-Based Configuration Drift Detection: Build a model that monitors
infrastructure-as-code (Terraform, Ansible) for unintended changes.
7. AI for Log Analysis & Monitoring
Project 1. AI-Powered Log Filtering & Categorization: Implementing AI to
automatically filter out noise in logs and categorize relevant events for quicker
analysis.
Project 2. Real-Time Anomaly Detection in Logs: AI system that processes logs
in real time and raises alerts when unusual patterns or behavior are detected.
Project 3. Log Correlation for Performance Issues: Using AI to correlate logs
from different services to identify root causes of performance degradation or
service outages.
Project 4. AI-Based Multi-Source Log Aggregation: Aggregating logs from
diverse sources (cloud, on-prem, containers, etc.) using AI to spot cross-system
anomalies.
Project 5. Automated Log Tagging: Using AI to automatically tag logs with
metadata for faster identification and analysis.
8. AI for Predictive Scaling & Performance Optimization
Project 1. Predictive Load Balancing: AI model that predicts incoming traffic
and adjusts load balancing strategies accordingly to optimize resource usage and
minimize latency.
Project 2. AI-Driven Predictive Resource Allocation: Using AI to dynamically
allocate resources (CPU, memory, storage) based on predicted workloads in
containers and VMs.
Project 3. Predictive Autoscaling with Customizable Metrics: AI-based
auto-scaling system that considers custom application-specific metrics in addition
to CPU/memory load.
Project 4. AI-Powered Resource Bottleneck Detection: AI to analyze
performance metrics and detect resource bottlenecks that may affect scaling
decisions.
Project 5. Multi-Tenant Cloud Optimization: Using AI to ensure efficient
resource sharing in multi-tenant cloud environments without compromising
performance.
9. AI for Incident Prediction & Automated Remediation
Project 1. Automated Health Checks with AI: AI-powered health check system
that automatically checks infrastructure health and suggests fixes before failure.
Project 2. Dynamic Incident Severity Prediction: AI model that predicts the
potential severity of an incident based on past data, helping teams prioritize
responses.
Project 3. Proactive Failure Prevention System: AI-based system that uses
failure trends to predict and prevent critical infrastructure failures before they
happen.
Project 4. Predictive Incident Management in Multi-Cloud: AI to predict
incidents across different cloud environments and suggest remediation actions.
Project 5. AI-Powered Predictive Alerting: Using machine learning models to
identify patterns that precede incidents and proactively alert teams before failure
occurs.
10. AI for CI/CD & DevSecOps
Project 1. AI-Driven Test Suite Optimization: Using AI to automatically
optimize the sequence of tests in CI/CD pipelines to reduce the overall pipeline
runtime.
Project 2. AI for Continuous Security Assessment: Real-time security
vulnerability detection during the CI/CD pipeline, integrated into DevSecOps
practices.
Project 3. AI-Based Dependency Vulnerability Scanning: Implement AI-based
scanning of dependencies in code repositories for potential vulnerabilities or
license compliance issues.
Project 4. Automated Code Quality Review with AI: AI models that scan code
during CI/CD builds and provide insights into code quality, security, and
performance improvements.
Project 5. AI-Enhanced Test Failure Analysis: Using AI to automatically
analyze failed tests in CI/CD pipelines and suggest possible causes and fixes.
11. AI for Infrastructure & Network Monitoring
Project 1. AI-Powered Load Forecasting for Infrastructure: Predicting
infrastructure load for upcoming days or weeks using historical data and adjusting
resource allocation accordingly.
Project 2. Proactive Infrastructure Health Monitoring: AI model for identifying
potential infrastructure failures before they occur by monitoring system health in
real time.
Project 3. Network Traffic Anomaly Detection with AI: Using machine learning
to detect outliers in network traffic data (e.g., unusual spikes or drops), potentially
identifying attacks.
Project 4. Distributed Network Monitoring with AI: AI to monitor network
performance across distributed environments (hybrid clouds, multi-region setups)
and provide insights.
AIOps Projects (AI in DevOps)
1. Log and Incident Management
Project 1. Intelligent Log Analysis: Use AI/ML to analyze logs from Kubernetes,
Jenkins, or Docker and automatically detect anomalies.
In modern cloud-native environments like Kubernetes, Jenkins, and Docker, logs
are crucial for monitoring and troubleshooting applications. However, manually
analyzing vast amounts of log data can be overwhelming. Intelligent log analysis
powered by AI/ML automates the detection of anomalies, such as errors or unusual
behavior, in real-time. By leveraging models like Isolation Forest and LSTM, this
project aims to automatically identify issues from logs, reducing manual effort and
enabling quicker responses. It also integrates real-time monitoring with
Prometheus and visualization using Grafana, enhancing operational efficiency and
system reliability.
Intelligent Log Analysis with AI/ML
1. Log Collection and Integration
Logs from Kubernetes, Jenkins, and Docker will be collected using respective tools
and commands.
Kubernetes Logs:
To collect logs from Kubernetes:
kubectl logs <pod-name> -n <namespace> > kubernetes_logs.txt
Jenkins Logs:
Jenkins stores logs for jobs and system logs. You can extract logs using:
tail -f /var/log/jenkins/jenkins.log > jenkins_logs.txt
Docker Logs:
For Docker containers:
docker logs <container-id> > docker_logs.txt
Alternatively, set up Logstash or Fluentd to ingest logs from these services in
real-time.
2. Log Preprocessing
Logs will be preprocessed to clean, parse, and structure them for analysis.
Python Script for Log Preprocessing:
Install necessary libraries:
pip install pandas numpy
Preprocess the logs by reading them, cleaning, and structuring them:
python
import pandas as pd
import numpy as np

def preprocess_logs(log_file):
    # Load logs (expects "timestamp|level|message" lines)
    logs_df = pd.read_csv(log_file, sep="|", header=None,
                          names=["timestamp", "level", "message"])
    # Convert timestamp to datetime
    logs_df['timestamp'] = pd.to_datetime(logs_df['timestamp'])
    # Create additional features
    logs_df['hour'] = logs_df['timestamp'].dt.hour
    logs_df['error_level'] = logs_df['level'].apply(lambda x: 1 if x == 'ERROR' else 0)
    return logs_df

logs_df = preprocess_logs('logs.txt')
print(logs_df.head())
3. Anomaly Detection
Unsupervised Learning Model (Isolation Forest)
Isolation Forest can be used for detecting anomalies in log patterns.
Install necessary libraries:
pip install scikit-learn
Use Isolation Forest to detect anomalies:
python
from sklearn.ensemble import IsolationForest
# Prepare feature columns (hour and error_level)
X = logs_df[['hour', 'error_level']]
# Initialize Isolation Forest model
model = IsolationForest(contamination=0.05)
# Fit the model to the data
logs_df['anomaly'] = model.fit_predict(X)
# Mark anomalies
anomalies = logs_df[logs_df['anomaly'] == -1]
print(anomalies)
Deep Learning Model (LSTM)
For more advanced anomaly detection, a Long Short-Term Memory (LSTM)
model can be used for time-series data.
Install necessary libraries:
pip install tensorflow
LSTM model for anomaly detection in logs:
python
import tensorflow as tf
from sklearn.preprocessing import MinMaxScaler

# Normalize the data
scaler = MinMaxScaler(feature_range=(0, 1))
logs_df[['hour', 'error_level']] = scaler.fit_transform(logs_df[['hour', 'error_level']])

# Prepare the data for LSTM (time-series format)
features = logs_df[['hour', 'error_level']].values
X = features.reshape((features.shape[0], features.shape[1], 1))  # Reshape for LSTM input

# Define the LSTM model (it learns to reconstruct the features;
# large reconstruction errors can be flagged as anomalies)
model = tf.keras.Sequential([
    tf.keras.layers.LSTM(50, activation='relu', input_shape=(X.shape[1], 1)),
    tf.keras.layers.Dense(2)
])
model.compile(optimizer='adam', loss='mean_squared_error')

# Train the model to reconstruct the (scaled) features
model.fit(X, features, epochs=10, batch_size=32)
4. Real-time Anomaly Detection
To integrate real-time log collection and anomaly detection:
Set up Fluentd or Logstash to Collect Logs:
Logstash example configuration:
input {
  file {
    path => "/var/log/containers/*.log"
    start_position => "beginning"
  }
}
output {
  elasticsearch {
    hosts => ["http://localhost:9200"]
    index => "logs"
  }
}
Prometheus and Alertmanager for Monitoring and Alerting:
1. Install Prometheus and Alertmanager for monitoring and alerting on
anomalies.
Prometheus rule to alert when anomalies are detected:
yaml
groups:
  - name: anomaly_detection
    rules:
      - alert: AnomalyDetected
        expr: anomaly_rate > 5
        for: 1m
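The rule above fires on a metric named anomaly_rate, which nothing earlier in the pipeline publishes. A minimal sketch of how the detector could expose it, assuming the prometheus_client library (pip install prometheus-client) and that Prometheus is configured to scrape the port used below:
python
from prometheus_client import Gauge, start_http_server
import time

# Gauge scraped by Prometheus; feeds the AnomalyDetected alert rule above
anomaly_rate = Gauge('anomaly_rate', 'Percentage of recent log lines flagged as anomalous')

def publish_anomaly_rate(logs_df):
    # logs_df['anomaly'] comes from the Isolation Forest step (-1 = anomaly)
    rate = float((logs_df['anomaly'] == -1).mean() * 100)
    anomaly_rate.set(rate)

if __name__ == '__main__':
    start_http_server(8001)  # expose /metrics on port 8001 for Prometheus
    while True:
        publish_anomaly_rate(logs_df)  # logs_df refreshed by your ingestion pipeline
        time.sleep(30)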
5. Visualization and Reporting
Grafana for Visualization:
Install Grafana:
sudo apt-get install grafana
● Create a Grafana dashboard that queries Elasticsearch for logs and displays
anomalies in real-time.
Kibana for Log Exploration:
Install Kibana:
sudo apt-get install kibana
● Configure Kibana to connect to Elasticsearch and create visualizations for
error trends, anomaly counts, and other key metrics.
6. Model Evaluation and Retraining
Model Evaluation:
Evaluate the anomaly detection model using classification metrics like
Precision, Recall, and F1-Score:
python
from sklearn.metrics import classification_report

# Assuming `y_true` is the actual labels and `y_pred` is the predicted anomalies
print(classification_report(y_true, y_pred))
Retraining the Model:
To ensure the model adapts to new log patterns, retrain it periodically with
fresh logs:
python
model.fit(new_log_data, new_labels)
7. Complete Workflow for Logs, Model, and Alerting
1. Log Collection: Collect logs from Kubernetes, Jenkins, or Docker.
2. Preprocessing: Clean and structure the logs.
3. Model Training: Train an unsupervised model like Isolation Forest or a
time-series LSTM model for anomaly detection.
4. Real-time Detection: Use Fluentd or Logstash for real-time log collection
and integrate it with Prometheus for alerting.
5. Visualization: Use Grafana and Kibana for visualizing anomalies and log
trends.
6. Evaluation and Retraining: Continuously evaluate and retrain the model as
new logs come in.
Conclusion:
This project provides a comprehensive framework for analyzing logs from
Kubernetes, Jenkins, and Docker, leveraging AI/ML models to detect anomalies. It
integrates log collection, preprocessing, anomaly detection, and real-time
monitoring with visualization tools like Grafana and Kibana. Additionally, it
provides a feedback loop for evaluating and retraining the model as new data
comes in.
Project 2. AI-Driven Log Parsing & Alerting: Train an NLP model to classify
logs (info, warning, error, critical) and generate alerts in real time.
This project aims to teach how to use Artificial Intelligence (AI) to process logs
(records of system activities) and classify them into categories like info, warning,
error, and critical. Once classified, the system will alert you if something goes
wrong (for example, when an error or critical event happens).
Steps to Build the Project
1. Setting Up Your Environment
● Install Python, which is the programming language we will use.
● Install libraries that will help us process and analyze text. These libraries are
like tools that make tasks easier.
Command to install necessary libraries:
pip install scikit-learn pandas nltk tensorflow
2. Prepare Your Log Data
Logs are records that show what happens in a system. For example, a log could say
"The server started" or "Database connection failed."
● Collect your logs, either from a file or a live system.
● Make sure your logs have a "log level" (like info, error) and a message (like
"System started").
Example log data:
2025-02-06 00:12:45 [INFO] System started
2025-02-06 00:15:30 [ERROR] Database connection failed
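The snippets that follow assume the logs are already loaded into a pandas DataFrame named data with log_level and message columns. A minimal sketch of how to build it from a file of lines in the format shown above (the file name logs.txt is an assumption):
python
import re
import pandas as pd

LOG_PATTERN = re.compile(r'^(\S+ \S+) \[(\w+)\] (.*)$')  # "timestamp [LEVEL] message"

def load_logs(path):
    rows = []
    with open(path) as f:
        for line in f:
            match = LOG_PATTERN.match(line.strip())
            if match:
                timestamp, level, message = match.groups()
                rows.append({"timestamp": timestamp,
                             "log_level": level.lower(),
                             "message": message})
    return pd.DataFrame(rows)

data = load_logs("logs.txt")  # hypothetical file containing lines like the example above
print(data.head())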
3. Preprocessing the Data
The logs need to be cleaned up so the AI can understand them better. We'll:
● Make all the text lowercase (so the system doesn’t get confused by different
capitalizations).
● Remove punctuation and unnecessary words.
Example code to clean the logs:
python
import string
import nltk
from nltk.tokenize import word_tokenize
from nltk.corpus import stopwords

nltk.download('punkt')      # tokenizer data (first run only)
nltk.download('stopwords')  # stop-word list (first run only)

def preprocess_text(text):
    text = text.lower()  # Convert everything to lowercase
    text = ''.join([char for char in text if char not in string.punctuation])  # Remove punctuation
    tokens = word_tokenize(text)  # Split the text into words
    stop_words = set(stopwords.words('english'))
    tokens = [word for word in tokens if word not in stop_words]  # Remove unnecessary words
    return ' '.join(tokens)

data['processed_message'] = data['message'].apply(preprocess_text)
4. Labeling the Log Levels
To help the AI understand the log's type, we need to label the log levels (info,
warning, error, critical) into numbers. This makes it easier for the machine to work
with the data.
Code to convert log levels to numbers:
python
from sklearn.preprocessing import LabelEncoder
le = LabelEncoder()
data['label'] = le.fit_transform(data['log_level'])
5. Train the AI to Classify Logs
Now, we train a machine learning model. This model learns from past logs and
tries to classify new logs into categories like info, error, etc.
● Split the data into training data (which the model will learn from) and testing
data (which we will use to check if the model is working well).
● We’ll use a method called Logistic Regression to train the model. It’s like
teaching the AI how to recognize patterns in logs.
Example code to train the model:
python
from sklearn.model_selection import train_test_split
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import classification_report
X_train, X_test, y_train, y_test = train_test_split(data['processed_message'],
data['label'], test_size=0.2)
tfidf = TfidfVectorizer(max_features=5000)
X_train_tfidf = tfidf.fit_transform(X_train)
X_test_tfidf = tfidf.transform(X_test)
model = LogisticRegression()
model.fit(X_train_tfidf, y_train)
y_pred = model.predict(X_test_tfidf)
print(classification_report(y_test, y_pred))
6. Real-Time Log Classification and Alerts
● Now, we set up the system to keep checking for new logs. Whenever a new
log appears, the system will classify it and send an alert if it’s an error or
critical log.
Example code to classify and send alerts:
python
import time

def classify_and_alert(log_message):
    processed_message = preprocess_text(log_message)
    message_tfidf = tfidf.transform([processed_message])
    log_class = model.predict(message_tfidf)
    log_level = le.inverse_transform(log_class)[0]
    if log_level in ['error', 'critical']:
        send_alert(log_message, log_level)

def send_alert(log_message, log_level):
    # Logic for sending alerts (email, SMS, Slack, etc.)
    print(f"ALERT: {log_level.upper()} log detected: {log_message}")

while True:
    new_log = get_new_log_from_file_or_stream()  # Implement log fetching logic
    classify_and_alert(new_log)
    time.sleep(1)  # Check for new logs every second
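The helper get_new_log_from_file_or_stream() is left for you to implement. A minimal sketch that tails a log file, assuming new lines are appended to a file named app.log (adapt the path, or swap in a message queue or syslog stream):
python
# Minimal sketch of the missing helper: tail a file and return one new line per call.
log_file = open("app.log", "r")
log_file.seek(0, 2)  # jump to the end so only newly appended lines are read

def get_new_log_from_file_or_stream():
    while True:
        line = log_file.readline()
        if line:
            return line.strip()
        time.sleep(0.5)  # wait briefly before checking again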
7. Testing and Deploying
● Test the system with some logs to see how it works.
● Once it works, you can deploy it on a server or in the cloud, where it can
monitor logs in real-time.
8. Improvement & Scaling
● You can improve the model by training it with more data.
● You can also connect this system with log management tools like ELK Stack
or Splunk for better monitoring.
Conclusion
This AI-driven log parsing and alerting system helps you monitor logs, detect
problems, and get alerts when something goes wrong in real-time. It’s a great
starting point for learning about machine learning, AI, and how to handle logs in a
system.
Project 3. AI-Driven Log Aggregation & Summarization: Use NLP to analyze
and summarize logs from multiple sources (Kubernetes, Jenkins, CloudWatch).
Project Introduction
This project focuses on log aggregation and summarization using Natural
Language Processing (NLP). Logs from various sources like Kubernetes,
Jenkins, and AWS CloudWatch are collected, analyzed, and summarized using
AI. This helps in quick issue detection, reducing noise, and improving
observability.
Tech Stack
● Python (FastAPI for API, Pandas for log processing)
● NLP (spaCy, OpenAI/GPT, Transformers for summarization)
● Log Sources (Kubernetes logs, Jenkins logs, AWS CloudWatch)
● Elasticsearch (Optional, for centralized storage)
● Docker & Kubernetes (Deployment)
Step 1: Environment Setup
Install dependencies:
# Create and activate a virtual environment
python3 -m venv env
source env/bin/activate # On Windows, use `env\Scripts\activate`
# Install necessary libraries
pip install fastapi uvicorn pandas transformers spacy boto3 elasticsearch
Step 2: Collect Logs from Different Sources
Kubernetes Logs
kubectl logs <pod-name> -n <namespace> > logs/k8s_logs.txt
Jenkins Logs
tail -n 100 /var/log/jenkins/jenkins.log > logs/jenkins_logs.txt
AWS CloudWatch Logs (Using Boto3)
python
import boto3

def get_cloudwatch_logs(log_group, start_time, end_time):
    client = boto3.client('logs', region_name='us-east-1')
    response = client.filter_log_events(
        logGroupName=log_group,
        startTime=start_time,
        endTime=end_time
    )
    logs = [event['message'] for event in response['events']]
    return "\n".join(logs)

logs = get_cloudwatch_logs('/aws/lambda/my-function', 1700000000000, 1700100000000)
with open('logs/cloudwatch_logs.txt', 'w') as f:
    f.write(logs)
Step 3: Process & Clean Logs
python
import pandas as pd

def clean_logs(file_path):
    with open(file_path, 'r') as f:
        logs = f.readlines()
    logs = [log.strip() for log in logs if log.strip()]
    return pd.DataFrame({'log_entry': logs})

df = clean_logs('logs/k8s_logs.txt')
print(df.head())  # Check processed logs
Step 4: Summarize Logs Using NLP
python
from transformers import pipeline

# Load a pre-trained summarization model
summarizer = pipeline("summarization", model="facebook/bart-large-cnn")

def summarize_logs(logs):
    text = " ".join(logs[:500])  # Limit input size to stay under the model's token limit
    summary = summarizer(text, max_length=100, min_length=30, do_sample=False)
    return summary[0]['summary_text']

logs = df['log_entry'].tolist()
summary = summarize_logs(logs)
print("Summary:", summary)
Step 5: Deploy as API using FastAPI
python
from fastapi import FastAPI

app = FastAPI()

@app.post("/summarize/")
async def summarize_endpoint(logs: list[str]):
    summary = summarize_logs(logs)
    return {"summary": summary}

# Run API server
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)
Start API server:
uvicorn main:app --reload
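To check the endpoint once the server is up, a quick test from Python (a sketch assuming the API is reachable on localhost:8000; the sample log lines are placeholders):
python
import requests

# Send a small batch of log lines to the /summarize/ endpoint
sample_logs = [
    "2024-02-06 10:00:01 ERROR Failed to connect to database",
    "2024-02-06 10:00:05 INFO Retrying connection",
]
response = requests.post("http://localhost:8000/summarize/", json=sample_logs)
print(response.json())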
Step 6: Dockerize & Deploy on Kubernetes
Dockerfile
FROM python:3.9
WORKDIR /app
COPY . /app
RUN pip install -r requirements.txt
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]
Build & Run Docker Image
docker build -t log-summarizer .
docker run -p 8000:8000 log-summarizer
Kubernetes Deployment
yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: log-summarizer
spec:
  replicas: 1
  selector:
    matchLabels:
      app: log-summarizer
  template:
    metadata:
      labels:
        app: log-summarizer
    spec:
      containers:
        - name: log-summarizer
          image: log-summarizer:latest
          ports:
            - containerPort: 8000
Apply in Kubernetes:
kubectl apply -f deployment.yaml
Conclusion
This project automates log aggregation and summarization using NLP-based
AI. The API can be integrated with monitoring tools like Grafana for better
observability.
Project 4. Self-Learning Incident Management System: Build a system that
suggests automated fixes based on past incidents.
Introduction
Incident management is crucial for IT and DevOps teams. A Self-Learning
Incident Management System automates issue resolution by analyzing past
incidents and suggesting fixes. Using Flask, MongoDB, and Machine Learning,
this project helps reduce downtime and improve operational efficiency.
Project Features
● Incident Logging: Users can report incidents with descriptions.
● Database Storage: Incidents are stored in MongoDB.
● Machine Learning Model: Suggests fixes based on past incidents.
● Web Interface: Users can log and view incident details.
● REST API: Allows integration with other tools.
Technology Stack
● Backend: Flask (Python)
● Database: MongoDB
● Machine Learning: scikit-learn (TF-IDF & Logistic Regression)
● Frontend: HTML, Bootstrap
● Deployment: Docker, Kubernetes (Optional)
Step-by-Step Guide
1. Setup Environment
mkdir incident-management
cd incident-management
python3 -m venv venv
source venv/bin/activate # On Windows: venv\Scripts\activate
pip install flask pymongo scikit-learn pandas nltk
2. MongoDB Installation (Ubuntu)
sudo apt update
sudo apt install -y mongodb
sudo systemctl start mongodb
sudo systemctl enable mongodb
Verify MongoDB is running:
mongo --eval "db.runCommand({ connectionStatus: 1 })"
3. Create a MongoDB Database
Connect to MongoDB and create a collection:
python
from pymongo import MongoClient

client = MongoClient("mongodb://localhost:27017/")
db = client["incident_db"]
collection = db["incidents"]

sample_incident = {
    "title": "Server Down",
    "description": "The application server is not responding",
    "solution": "Restart the server"
}

collection.insert_one(sample_incident)
print("Sample Incident Added!")
4. Create a Flask API
Create a file app.py:
python
from flask import Flask, request, jsonify
from pymongo import MongoClient
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import LogisticRegression
import pandas as pd

app = Flask(__name__)

# Connect to MongoDB
client = MongoClient("mongodb://localhost:27017/")
db = client["incident_db"]
collection = db["incidents"]

# Train ML Model
def train_model():
    data = list(collection.find({}, {"_id": 0, "description": 1, "solution": 1}))
    df = pd.DataFrame(data)
    if df.empty:
        return None, None
    vectorizer = TfidfVectorizer()
    X = vectorizer.fit_transform(df["description"])
    y = df["solution"]
    model = LogisticRegression()
    model.fit(X, y)
    return model, vectorizer

model, vectorizer = train_model()

@app.route("/log_incident", methods=["POST"])
def log_incident():
    data = request.json
    collection.insert_one(data)
    return jsonify({"message": "Incident Logged!"})

@app.route("/suggest_fix", methods=["POST"])
def suggest_fix():
    if not model:
        return jsonify({"error": "No data to train the model"}), 400
    data = request.json
    desc_vector = vectorizer.transform([data["description"]])
    suggestion = model.predict(desc_vector)[0]
    return jsonify({"suggested_fix": suggestion})

if __name__ == "__main__":
    # Bind to all interfaces so the API is reachable when run inside Docker
    app.run(host="0.0.0.0", debug=True)
5. Run Flask App
export FLASK_APP=app.py
flask run
The API will be available at http://127.0.0.1:5000.
6. Test API (Using curl or Postman)
Log an Incident
curl -X POST http://127.0.0.1:5000/log_incident \
-H "Content-Type: application/json" \
-d '{"title": "Database Error", "description": "Connection timeout issue",
"solution": "Check network and restart DB"}'
Get Suggested Fix
curl -X POST http://127.0.0.1:5000/suggest_fix \
-H "Content-Type: application/json" \
-d '{"description": "The server is down"}'
7. Build a Simple Frontend
Create templates/index.html:
html
<!DOCTYPE html>
<html>
<head>
  <title>Incident Management</title>
  <link rel="stylesheet"
        href="https://cdn.jsdelivr.net/npm/bootstrap@5.3.0/dist/css/bootstrap.min.css">
</head>
<body class="container mt-5">
  <h2>Incident Management System</h2>
  <form id="incidentForm">
    <input type="text" id="description" placeholder="Enter incident description"
           class="form-control mb-2">
    <button type="button" class="btn btn-primary" onclick="suggestFix()">Get Fix</button>
  </form>
  <h4 class="mt-3" id="solution"></h4>
  <script>
    async function suggestFix() {
      const desc = document.getElementById("description").value;
      const response = await fetch('/suggest_fix', {
        method: 'POST',
        headers: {'Content-Type': 'application/json'},
        body: JSON.stringify({"description": desc})
      });
      const data = await response.json();
      document.getElementById("solution").innerText = "Suggested Fix: " + data.suggested_fix;
    }
  </script>
</body>
</html>
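Note that app.py above does not yet serve this page. A minimal addition (a sketch, assuming index.html lives in the templates/ folder created above) is a root route that renders it with Flask's render_template:
python
from flask import render_template  # add to the imports in app.py

@app.route("/")
def home():
    # Serves templates/index.html so the form is reachable at http://127.0.0.1:5000
    return render_template("index.html")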
Run the Flask app and open http://127.0.0.1:5000 in a browser.
8. Containerize with Docker
Create a Dockerfile:
dockerfile
FROM python:3.9
WORKDIR /app
COPY requirements.txt .
RUN pip install -r requirements.txt
COPY . .
CMD ["python", "app.py"]
Build and Run:
docker build -t incident-management .
docker run -p 5000:5000 incident-management
9. Deploy with Kubernetes (Optional)
Create deployment.yaml:
yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: incident-management
spec:
  replicas: 2
  selector:
    matchLabels:
      app: incident-management
  template:
    metadata:
      labels:
        app: incident-management
    spec:
      containers:
        - name: incident-app
          image: incident-management:latest
          ports:
            - containerPort: 5000
Apply Deployment:
kubectl apply -f deployment.yaml
Code Explanation
1. Flask API: Handles logging incidents and suggesting fixes.
2. MongoDB Storage: Stores incidents and solutions.
3. Machine Learning: Uses TF-IDF Vectorization and Logistic Regression
to suggest solutions.
4. Frontend (HTML, Bootstrap): Simple form to get incident fixes.
5. Docker & Kubernetes: Containerization and deployment for scalability.
Conclusion
This Self-Learning Incident Management System helps automate issue
resolution based on past incidents. By integrating Flask, MongoDB, and Machine
Learning, it improves IT incident response, reducing downtime and manual effort.
Project 5. AI-Driven Incident Response Playbook: Create a system that suggests
incident resolution steps based on past issues.
Incident management is crucial in IT operations. Traditional methods rely on
manual playbooks, which can be time-consuming and inconsistent. This project
introduces an AI-Driven Incident Response Playbook, which learns from past
incidents and suggests resolution steps automatically.
We will use:
● Python & Flask (Backend API)
● MongoDB (Storing past incidents)
● Machine Learning (Scikit-learn) (AI model for recommendations)
● Docker (Containerization)
● Jenkins (CI/CD pipeline)
Step-by-Step Implementation
1. Install Dependencies
Ensure you have the required tools installed:
sudo apt update && sudo apt install python3 python3-pip docker.io -y
pip3 install flask pymongo scikit-learn joblib
2. Set Up MongoDB for Incident Storage
MongoDB will store previous incidents and their resolutions.
Start MongoDB
docker run -d --name mongo -p 27017:27017 mongo
Create Incident Database
python
from pymongo import MongoClient

client = MongoClient("mongodb://localhost:27017/")
db = client["incident_db"]
collection = db["incidents"]

incident_data = {
    "issue": "Server down",
    "resolution": "Restart the service using systemctl restart apache2"
}

collection.insert_one(incident_data)
print("Sample incident inserted")
3. Build AI Model
The model will predict the best resolution based on historical data.
Train AI Model
python
import pandas as pd
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.neighbors import KNeighborsClassifier
import joblib

# Sample Data
data = [
    {"issue": "CPU usage high", "resolution": "Kill unnecessary processes"},
    {"issue": "Server down", "resolution": "Restart the service"},
    {"issue": "Memory leak", "resolution": "Check for memory-intensive apps"}
]

df = pd.DataFrame(data)

vectorizer = TfidfVectorizer()
X = vectorizer.fit_transform(df["issue"])
y = df["resolution"]

model = KNeighborsClassifier(n_neighbors=1)
model.fit(X, y)

joblib.dump(model, "incident_model.pkl")
joblib.dump(vectorizer, "vectorizer.pkl")
print("Model trained and saved")
4. Create Flask API
This API will suggest resolutions based on user input.
Install Flask
pip3 install flask
Create app.py
python
from flask import Flask, request, jsonify
import joblib
import pymongo

app = Flask(__name__)

# Load model
model = joblib.load("incident_model.pkl")
vectorizer = joblib.load("vectorizer.pkl")

# MongoDB Connection
client = pymongo.MongoClient("mongodb://localhost:27017/")
db = client["incident_db"]
collection = db["incidents"]

@app.route("/predict", methods=["POST"])
def predict():
    data = request.json
    issue_text = data["issue"]
    vectorized_text = vectorizer.transform([issue_text])
    prediction = model.predict(vectorized_text)[0]
    # Save to MongoDB
    collection.insert_one({"issue": issue_text, "suggested_resolution": prediction})
    return jsonify({"resolution": prediction})

if __name__ == "__main__":
    # Bind to all interfaces so the API is reachable when run inside Docker
    app.run(host="0.0.0.0", debug=True)
5. Dockerize the Project
Create Dockerfile
FROM python:3.9
WORKDIR /app
COPY . .
RUN pip install flask pymongo joblib scikit-learn
CMD ["python", "app.py"]
Build & Run
docker build -t ai-playbook .
docker run -p 5000:5000 ai-playbook
6. Testing the API
Send an Incident
curl -X POST http://localhost:5000/predict -H "Content-Type: application/json" \
  -d '{"issue": "CPU usage high"}'
Expected Response
json
{"resolution": "Kill unnecessary processes"}
7. Set Up CI/CD in Jenkins
Create Jenkinsfile
groovy
pipeline {
    agent any
    stages {
        stage('Build') {
            steps {
                sh 'docker build -t ai-playbook .'
            }
        }
        stage('Test') {
            steps {
                sh 'docker run -d --name test-ai -p 5000:5000 ai-playbook'
                sh 'curl -X POST http://localhost:5000/predict -H "Content-Type: application/json" -d \'{"issue": "Server down"}\''
            }
        }
        stage('Deploy') {
            steps {
                sh 'docker tag ai-playbook your-dockerhub-username/ai-playbook:latest'
                sh 'docker push your-dockerhub-username/ai-playbook:latest'
            }
        }
    }
}
Run Pipeline
Commit the Jenkinsfile to your repository and run the pipeline from the Jenkins UI.
Conclusion
This AI-Driven Incident Response Playbook:
✅ Uses AI to suggest solutions
✅ Stores incidents in MongoDB
✅ Exposes predictions via an API
✅ Runs in Docker for easy deployment
2. Resource and Cost Optimization
Project 1. Predictive Auto-Scaling: Develop an AI-driven system to predict
server/resource usage and auto-scale Kubernetes clusters.
Introduction
Auto-scaling is essential in cloud environments to handle traffic spikes efficiently.
This project builds an AI-driven predictive auto-scaler for Kubernetes clusters,
using machine learning to forecast resource usage and adjust cluster size
dynamically.
Step-by-Step Implementation
1. Prerequisites
Ensure you have the following installed:
● Kubernetes (kind/minikube/EKS/GKE/AKS)
● kubectl (Kubernetes CLI)
● Prometheus (for collecting metrics)
● Grafana (for visualization)
● Python (for ML model)
● Flask (to serve predictions)
● Docker (for containerization)
● KEDA (Kubernetes Event-Driven Autoscaling)
● Helm (for managing applications)
2. Set Up Kubernetes Cluster
kind create cluster --name auto-scaler
kubectl cluster-info
3. Deploy Prometheus for Metrics Collection
Install Prometheus using Helm:
helm repo add prometheus-community https://prometheus-community.github.io/helm-charts
helm repo update
helm install prometheus prometheus-community/kube-prometheus-stack
Check Prometheus is running:
kubectl get pods -n default | grep prometheus
4. Deploy Grafana for Monitoring
kubectl port-forward svc/prometheus-grafana 3000:80
Access Grafana at http://localhost:3000
(Default username: admin, password: prom-operator)
5. Collect Metrics Using Prometheus API
Check resource utilization:
kubectl top nodes
kubectl top pods
Prometheus Query Example:
http://localhost:9090/api/v1/query?query=node_cpu_seconds_total
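The same data can be pulled programmatically to build a training set. A minimal sketch using the requests library against the Prometheus HTTP API (assumes Prometheus has been made reachable at localhost:9090, e.g. via kubectl port-forward):
python
import time
import requests
import pandas as pd

PROM_URL = "http://localhost:9090/api/v1/query_range"

# Pull cluster-average CPU usage for the last hour at 1-minute resolution
end = int(time.time())
params = {
    "query": "avg(rate(node_cpu_seconds_total[2m])) * 100",
    "start": end - 3600,
    "end": end,
    "step": "60",
}
resp = requests.get(PROM_URL, params=params).json()
values = resp["data"]["result"][0]["values"]   # list of [timestamp, value] pairs
df = pd.DataFrame(values, columns=["timestamp", "cpu_usage"])
df["cpu_usage"] = df["cpu_usage"].astype(float)
print(df.head())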
6. Train a Machine Learning Model for Prediction
Install Python Dependencies:
pip install pandas scikit-learn flask requests
Train the ML Model (train_model.py)
import pandas as pd
import numpy as np
from sklearn.linear_model import LinearRegression
import joblib
# Simulated CPU Usage Data
data = pd.DataFrame({
'timestamp': np.arange(1, 101),
'cpu_usage': np.random.randint(30, 90, 100)
})
X = data[['timestamp']]
y = data['cpu_usage']
model = LinearRegression()
model.fit(X, y)
joblib.dump(model, 'cpu_predictor.pkl')
print("Model trained and saved.")
Run the script:
python train_model.py
7. Create a Flask API for Predictions
Create app.py:
python
from flask import Flask, request, jsonify
import joblib
import numpy as np

app = Flask(__name__)
model = joblib.load('cpu_predictor.pkl')

@app.route('/predict', methods=['POST'])
def predict():
    data = request.json
    timestamp = np.array(data['timestamp']).reshape(-1, 1)
    prediction = model.predict(timestamp).tolist()
    return jsonify({'predicted_cpu': prediction})

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000)
Run the API:
python app.py
Test API:
curl -X POST http://localhost:5000/predict -H "Content-Type: application/json" \
  -d '{"timestamp": [101, 102, 103]}'
8. Containerize the Flask App
Create Dockerfile:
Dockerfile
FROM python:3.9
WORKDIR /app
COPY . /app
RUN pip install -r requirements.txt
CMD ["python", "app.py"]
Build and run the container:
docker build -t auto-scaler .
docker run -p 5000:5000 auto-scaler
9. Deploy Flask API in Kubernetes
Create deployment.yaml:
apiVersion: apps/v1
kind: Deployment
metadata:
  name: auto-scaler
spec:
  replicas: 1
  selector:
    matchLabels:
      app: auto-scaler
  template:
    metadata:
      labels:
        app: auto-scaler
    spec:
      containers:
        - name: auto-scaler
          image: auto-scaler:latest
          ports:
            - containerPort: 5000
---
apiVersion: v1
kind: Service
metadata:
  name: auto-scaler-service
spec:
  selector:
    app: auto-scaler
  ports:
    - protocol: TCP
      port: 80
      targetPort: 5000
  type: LoadBalancer
Apply deployment:
kubectl apply -f deployment.yaml
10. Configure KEDA for Auto-Scaling
Install KEDA:
helm repo add kedacore https://kedacore.github.io/charts
helm repo update
helm install keda kedacore/keda
Create scaledobject.yaml:
yaml
apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
  name: auto-scaler
spec:
  scaleTargetRef:
    name: auto-scaler
  minReplicaCount: 1
  maxReplicaCount: 5
  triggers:
    - type: prometheus
      metadata:
        serverAddress: http://prometheus-server.default.svc.cluster.local
        query: avg(rate(node_cpu_seconds_total[2m])) * 100
        threshold: '70'
Apply scaling rule:
kubectl apply -f scaledobject.yaml
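As written, the ScaledObject scales on measured node CPU rather than the model's forecast. One way to close that loop (a sketch, assuming the prometheus_client package and a Prometheus scrape job for port 8001, neither of which is set up above) is to publish the prediction as a custom metric and point the KEDA query at it:
python
# Sketch: expose the model's forecast as a custom Prometheus metric.
import time
import itertools
import joblib
from prometheus_client import Gauge, start_http_server

model = joblib.load('cpu_predictor.pkl')
predicted_cpu = Gauge('predicted_cpu_usage', 'Forecasted CPU usage (%)')

start_http_server(8001)  # serves /metrics for Prometheus to scrape
for step in itertools.count(101):  # continue the toy time index used in training
    predicted_cpu.set(float(model.predict([[step]])[0]))
    time.sleep(30)
# The KEDA trigger query could then be changed to: predicted_cpu_usage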
11. Test Auto-Scaling
Simulate load using hey (install via apt install hey):
hey -n 10000 -c 50 http://localhost:5000/predict
Check pods scaling up:
kubectl get pods -w
12. Monitor Auto-Scaling with Grafana
1. Open Grafana (http://localhost:3000)
2. Add Prometheus as a data source
Use queries like:
avg(rate(node_cpu_seconds_total[2m])) * 100
Code Explanation
1. ML Model (train_model.py):
○ Generates fake CPU data
○ Trains a Linear Regression model
○ Saves the model using joblib
2. Flask API (app.py):
○ Loads the trained model
○ Accepts a timestamp and predicts CPU usage
○ Returns prediction in JSON format
3. Dockerfile:
○ Defines a Python-based container
○ Copies app files and installs dependencies
○ Runs the Flask server
4. Kubernetes (deployment.yaml):
○ Deploys the Flask app
○ Exposes it as a LoadBalancer service
5. KEDA (scaledobject.yaml):
○ Uses Prometheus metrics to trigger auto-scaling
○ Scales when CPU usage exceeds 70%
Final Outcome
● Machine Learning predicts CPU usage
● KEDA auto-scales Kubernetes pods based on predictions
● Prometheus collects real-time metrics
● Grafana visualizes performance
Project 2. AI-Powered Cost Optimization: Use ML to analyze cloud billing data
and recommend cost-saving measures.
Introduction
Cloud costs can quickly spiral out of control if not monitored effectively. This
project leverages Machine Learning to analyze cloud billing data and suggest
cost-saving strategies. By using Python, Pandas, Scikit-Learn, and Matplotlib,
we’ll process billing data, detect cost anomalies, and predict future cloud expenses.
Project Steps
Step 1: Setup the Environment
Install Required Packages
Ensure you have Python installed and set up a virtual environment:
python3 -m venv cost-opt-env
source cost-opt-env/bin/activate # On Windows: cost-opt-env\Scripts\activate
pip install pandas numpy scikit-learn matplotlib seaborn
Step 2: Prepare the Cloud Billing Data
Obtain your cloud billing data from AWS, Azure, or GCP. The format should
include:
● Service Name (EC2, S3, RDS, etc.)
● Cost (USD)
● Usage Hours
● Region
● Instance Type
Example CSV File (cloud_billing.csv):
Service, Cost, Usage_Hours, Region, Instance_Type
EC2, 120, 500, us-east-1, t3.medium
S3, 30, 200, us-east-1, N/A
RDS, 80, 300, us-west-2, db.m5.large
Step 3: Load and Preprocess Data
Create a Python script cost_optimization.py
python
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.preprocessing import StandardScaler
from sklearn.cluster import KMeans
# Load cloud billing data
df = pd.read_csv("cloud_billing.csv")
# Check for missing values
print(df.isnull().sum())
# Convert categorical data to numerical values
df = pd.get_dummies(df, columns=["Service", "Region", "Instance_Type"],
drop_first=True)
# Scale the data
scaler = StandardScaler()
scaled_data = scaler.fit_transform(df.drop(columns=["Cost"]))
# Display processed data
print(df.head())
Step 4: Detect Cost Anomalies with K-Means Clustering
We'll use K-Means Clustering to detect outliers (high-cost services).
python
# Apply K-Means Clustering
kmeans = KMeans(n_clusters=3, random_state=42)
df["Cluster"] = kmeans.fit_predict(scaled_data)
# Visualize clusters
plt.figure(figsize=(8,5))
sns.scatterplot(x=df["Usage_Hours"], y=df["Cost"], hue=df["Cluster"],
palette="viridis")
plt.xlabel("Usage Hours")
plt.ylabel("Cost")
plt.title("Cloud Cost Clustering")
plt.show()
📌 Interpretation:
● Services in high-cost clusters can be optimized (switch to reserved
instances, downgrade instance types, reduce unused services).
Step 5: Predict Future Cloud Costs using Linear Regression
We’ll train a model to predict next month’s cost based on historical usage.
python
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_absolute_error
# Define input (X) and output (y) variables
X = df.drop(columns=["Cost", "Cluster"])
y = df["Cost"]
# Split dataset into training and testing sets
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2,
random_state=42)
# Train Linear Regression Model
model = LinearRegression()
model.fit(X_train, y_train)
# Make predictions
y_pred = model.predict(X_test)
# Evaluate model
mae = mean_absolute_error(y_test, y_pred)
print(f"Mean Absolute Error: {mae}")
# Predict cost for new data
# Note: the feature vector must match X's one-hot encoded columns, so build it
# from an existing encoded row instead of a hand-typed array.
new_data = X.iloc[[0]].copy()      # template row with the right columns
new_data["Usage_Hours"] = 400      # e.g. 400 usage hours next month
predicted_cost = model.predict(new_data)
print(f"Predicted Next Month's Cost: ${predicted_cost[0]:.2f}")
Step 6: Automate Cost-Saving Recommendations
We can automate cost-saving tips based on thresholds:
python
def suggest_cost_savings(row):
    if row["Cost"] > 100:
        return "Consider Reserved Instances or Auto-scaling"
    elif row["Usage_Hours"] > 400:
        return "Optimize Instance Usage or Rightsize"
    else:
        return "No changes needed"

df["Recommendation"] = df.apply(suggest_cost_savings, axis=1)
# "Service" was one-hot encoded earlier, so print the remaining numeric columns
print(df[["Cost", "Usage_Hours", "Recommendation"]])
Step 7: Deploy as a Flask API (Optional)
You can create a Flask API to accept billing data and return cost-saving
recommendations.
Install Flask
pip install flask
Create app.py
python
from flask import Flask, request, jsonify
import pandas as pd

app = Flask(__name__)

# Same rule-based helper used in cost_optimization.py
def suggest_cost_savings(row):
    if row["Cost"] > 100:
        return "Consider Reserved Instances or Auto-scaling"
    elif row["Usage_Hours"] > 400:
        return "Optimize Instance Usage or Rightsize"
    else:
        return "No changes needed"

@app.route('/predict', methods=['POST'])
def predict():
    data = request.json
    df = pd.DataFrame([data])
    df["Recommendation"] = df.apply(suggest_cost_savings, axis=1)
    return jsonify(df.to_dict(orient="records"))

if __name__ == '__main__':
    app.run(debug=True)
Run API
python app.py
Test API using cURL
curl -X POST -H "Content-Type: application/json" \
  -d '{"Service": "EC2", "Cost": 150, "Usage_Hours": 500, "Region": "us-east-1", "Instance_Type": "t3.medium"}' \
  http://127.0.0.1:5000/predict
Conclusion
● We used K-Means Clustering to detect high-cost services.
● Linear Regression was used to predict future costs.
● Automated cost-saving recommendations help optimize cloud spending.
● Optional API enables integration with real-world applications.
Project 3. AI-Powered Cloud Resource Optimization: Train an ML model to
recommend the best instance types and scaling configurations.
Introduction
Cloud computing offers flexibility, but choosing the right instance type and scaling
strategy can be complex. This project focuses on training a Machine Learning
(ML) model to analyze past resource usage data and recommend optimal cloud
instance types and auto-scaling configurations. The goal is to minimize cost while
maintaining performance.
Step-by-Step Guide
1. Setup the Environment
Prerequisites
● Python 3.x
● AWS CLI (or any cloud provider SDK)
● Jupyter Notebook
● Required Python libraries: pandas, numpy, scikit-learn, matplotlib, seaborn
Install Required Libraries
pip install pandas numpy scikit-learn matplotlib seaborn boto3
2. Collect and Prepare Data
Cloud resource optimization requires data such as:
● CPU, memory, and network usage logs
● Instance type and cost details
● Scaling history
Fetch Cloud Metrics Using AWS CLI
aws cloudwatch get-metric-statistics --namespace AWS/EC2 \
--metric-name CPUUtilization --start-time 2024-02-01T00:00:00Z \
--end-time 2024-02-07T00:00:00Z --period 300 --statistics Average \
--dimensions Name=InstanceId,Value=i-1234567890abcdef \
--region us-east-1
3. Load and Explore Data
python
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
# Load dataset (assuming we have a CSV file)
df = pd.read_csv("cloud_metrics.csv")
# Display first few rows
print(df.head())
# Basic statistics
print(df.describe())
# Visualize CPU usage
plt.figure(figsize=(10,5))
sns.lineplot(x=df["timestamp"], y=df["cpu_utilization"])
plt.title("CPU Utilization Over Time")
plt.show()
4. Feature Engineering
python
# Extract useful features
df['hour'] = pd.to_datetime(df['timestamp']).dt.hour
df['day'] = pd.to_datetime(df['timestamp']).dt.dayofweek
# Drop unnecessary columns
df.drop(columns=['timestamp'], inplace=True)
5. Train the Machine Learning Model
python
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score
import pickle

# Define input features and target variable
X = df.drop(columns=["instance_type"])
y = df["instance_type"]  # Labels: Optimal instance types

# Split data
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Train model (a classifier, since instance types are categorical labels)
model = RandomForestClassifier(n_estimators=100, random_state=42)
model.fit(X_train, y_train)

# Predict and evaluate
predictions = model.predict(X_test)
print("Accuracy:", accuracy_score(y_test, predictions))

# Save the model for the API in step 7
with open("ml_model.pkl", "wb") as file:
    pickle.dump(model, file)
6. Make Predictions
python
# Example: Predict the best instance type for new usage data.
# Supply one value per feature column in X; with the features built above this
# would be something like [cpu_utilization, hour, day].
new_data = [[30, 14, 4]]  # Example: 30% CPU utilization at 14:00 on a Friday
predicted_instance = model.predict(new_data)
print("Recommended Instance Type:", predicted_instance)
7. Deploy Model as an API (Flask)
python
from flask import Flask, request, jsonify
import pickle

app = Flask(__name__)

# Load trained model
with open("ml_model.pkl", "rb") as file:
    model = pickle.load(file)

@app.route("/predict", methods=["POST"])
def predict():
    data = request.json
    prediction = model.predict([data["features"]])
    return jsonify({"recommended_instance": prediction.tolist()})

if __name__ == "__main__":
    app.run(port=5000)
Run API
python app.py
Test API
curl -X POST http://127.0.0.1:5000/predict -H "Content-Type: application/json" \
-d '{"features": [30, 4]}'
Conclusion
This project leverages ML to suggest the best cloud instances based on historical
usage. It reduces cost and improves performance by recommending optimal scaling
configurations.
Project 4. AI-Assisted Infrastructure Cost Forecasting: Use time-series
forecasting to predict cloud costs and prevent budget overruns.
Introduction
Cloud cost forecasting is crucial for optimizing infrastructure expenses and
avoiding budget overruns. This project leverages time-series forecasting
techniques using Python, Pandas, Matplotlib, Scikit-learn, and Facebook's
Prophet to analyze past cloud usage data and predict future costs.
By implementing AI-assisted forecasting, businesses can make informed
decisions about resource allocation, cost-saving strategies, and scaling policies.
Project Setup and Execution
Step 1: Prerequisites
Ensure you have the required dependencies installed.
# Update package list
sudo apt update
# Install Python and pip if not already installed
sudo apt install python3 python3-pip -y
# Create and activate a virtual environment (optional but recommended)
python3 -m venv cost_forecast_env
source cost_forecast_env/bin/activate
Step 2: Install Required Python Libraries
pip install pandas numpy matplotlib scikit-learn prophet
Step 3: Data Collection & Preprocessing
Create a file cloud_cost_data.csv with historical cost data.
Example CSV Format:
Date,Cost ($)
2024-01-01,1200
2024-02-01,1250
2024-03-01,1300
2024-04-01,1100
2024-05-01,1350
Step 4: Implement AI-Based Forecasting
Create a Python script forecast_cost.py and add the following code:
python
import pandas as pd
import matplotlib.pyplot as plt
from prophet import Prophet
# Load dataset
df = pd.read_csv("cloud_cost_data.csv")
# Rename columns for Prophet
df.rename(columns={"Date": "ds", "Cost ($)": "y"}, inplace=True)
# Initialize Prophet model
model = Prophet()
model.fit(df)
# Create future dataframe (next 6 months)
future = model.make_future_dataframe(periods=6, freq='M')
# Predict future costs
forecast = model.predict(future)
# Plot results
fig = model.plot(forecast)
plt.title("Cloud Cost Forecast")
plt.xlabel("Date")
plt.ylabel("Cost ($)")
plt.show()
# Save forecast to CSV
forecast[['ds', 'yhat', 'yhat_lower', 'yhat_upper']].to_csv("cost_forecast.csv",
index=False)
Step 5: Running the Project
Execute the script:
python3 forecast_cost.py
This will generate a forecast graph and save the predicted values in
cost_forecast.csv.
Step 6: Explanation of Code
● Loading Data: Reads the historical cloud cost data from CSV.
● Preprocessing: Renames columns for compatibility with Prophet.
● Training Model: Fits the Prophet model to learn patterns from past data.
● Forecasting: Generates predictions for the next 6 months.
● Visualization: Displays a graph of historical and forecasted costs.
● Saving Output: Stores predicted values in a CSV file for further analysis.
Conclusion
This AI-based cost forecasting solution helps businesses anticipate infrastructure
expenses, optimize cloud usage, and prevent unexpected budget spikes. You can
further enhance the model by integrating real-time cloud billing data using APIs
from AWS, GCP, or Azure.
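For example, AWS billing data can be pulled directly instead of maintaining the CSV by hand. A minimal sketch using boto3's Cost Explorer client (assumes AWS credentials with permission to call GetCostAndUsage; dates are placeholders):
python
import boto3
import pandas as pd

# Fetch monthly unblended cost for a one-year window from AWS Cost Explorer
ce = boto3.client("ce", region_name="us-east-1")
response = ce.get_cost_and_usage(
    TimePeriod={"Start": "2024-01-01", "End": "2025-01-01"},
    Granularity="MONTHLY",
    Metrics=["UnblendedCost"],
)

rows = [
    {"Date": item["TimePeriod"]["Start"],
     "Cost ($)": float(item["Total"]["UnblendedCost"]["Amount"])}
    for item in response["ResultsByTime"]
]
# Same column names as the hand-made CSV, so forecast_cost.py works unchanged
pd.DataFrame(rows).to_csv("cloud_cost_data.csv", index=False)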
Project 5. AI-Assisted Container Resource Allocation: Use reinforcement
learning to optimize CPU/memory allocation in Docker containers.
Introduction
Managing CPU and memory allocation in Docker containers is challenging.
Allocating too many resources wastes capacity, while allocating too few degrades
performance. Reinforcement Learning (RL) can dynamically adjust these
allocations based on real-time usage, maximizing efficiency.
We will build an AI model using OpenAI Gym, Stable-Baselines3, and Docker
SDK for Python to optimize resource allocation.
Step 1: Setting Up the Environment
Install Dependencies
Ensure you have Python 3.8+, Docker, and required libraries installed.
# Update system and install Docker
sudo apt update && sudo apt install docker.io -y
sudo systemctl start docker
sudo systemctl enable docker
# Install Python and dependencies
python3 -m venv rl-container-env
source rl-container-env/bin/activate
pip install numpy pandas gym docker stable-baselines3
Check if Docker is working:
docker run hello-world
Step 2: Creating a Custom Gym Environment for Resource Allocation
Reinforcement Learning works by training an agent to interact with an
environment and learn the best actions. We will create a custom Gym
environment to simulate resource allocation for containers.
Create the Environment File
Create a new Python file docker_env.py
python
import gym
import docker
import numpy as np
from gym import spaces

class DockerResourceEnv(gym.Env):
    def __init__(self):
        super(DockerResourceEnv, self).__init__()
        # Connect to Docker
        self.client = docker.from_env()
        self.container_name = "test_container"
        # Action Space: CPU (0.1 to 2 cores), Memory (128MB to 2GB)
        self.action_space = spaces.Box(low=np.array([0.1, 128]),
                                       high=np.array([2.0, 2048]),
                                       dtype=np.float32)
        # Observation Space: CPU usage and Memory usage
        self.observation_space = spaces.Box(low=0, high=np.inf, shape=(2,),
                                            dtype=np.float32)
        # Start a test container
        self.container = self.client.containers.run(
            "nginx", detach=True, name=self.container_name,
            cpu_period=100000, cpu_quota=10000, mem_limit="128m")

    def step(self, action):
        cpu, memory = action
        # Apply new resource limits (memory action is in MB)
        self.container.update(cpu_quota=int(cpu * 100000),
                              mem_limit=f"{int(memory)}m")
        # Read performance from actual Docker stats
        stats = self.container.stats(stream=False)
        cpu_usage = (stats["cpu_stats"]["cpu_usage"]["total_usage"] /
                     stats["cpu_stats"]["system_cpu_usage"])
        memory_usage = stats["memory_stats"]["usage"]  # bytes
        # Penalize large deviations from ~50% utilization of the allocated limit
        mem_limit_bytes = int(memory) * 1024 * 1024
        reward = -abs(cpu_usage - 0.5) - abs(memory_usage / mem_limit_bytes - 0.5)
        return np.array([cpu_usage, memory_usage]), reward, False, {}

    def reset(self):
        return np.array([0.5, 128])

    def render(self, mode="human"):
        pass

    def close(self):
        self.container.stop()
        self.container.remove()
Step 3: Training the RL Model
Create a new file train_rl.py to train the model.
python
import gym
from stable_baselines3 import PPO
from docker_env import DockerResourceEnv
# Create the environment
env = DockerResourceEnv()
# Load the RL model
model = PPO("MlpPolicy", env, verbose=1)
model.learn(total_timesteps=50000)
# Save the trained model
model.save("rl_docker_allocator")
env.close()
This trains an AI agent using the Proximal Policy Optimization (PPO) algorithm
to optimize resource allocation.
Step 4: Running the AI Model for Real-Time Resource Allocation
Create run_ai.py to apply the trained model to live containers.
python
from stable_baselines3 import PPO
from docker_env import DockerResourceEnv

# Load trained model
model = PPO.load("rl_docker_allocator")

# Create environment
env = DockerResourceEnv()

# Run optimization loop
obs = env.reset()
for _ in range(100):
    action, _states = model.predict(obs)
    obs, reward, done, _ = env.step(action)
    print(f"CPU: {action[0]}, Memory: {action[1]}, Reward: {reward}")

env.close()
Step 5: Testing the AI Model
Run the AI-powered resource allocator:
python run_ai.py
It will dynamically adjust CPU and memory allocation based on real-time
container usage.
Code Explanation for New Learners
1. Custom Gym Environment (docker_env.py)
○ Defines an RL environment where Docker containers act as agents.
○ The RL agent learns to optimize CPU/memory.
○ Uses Docker SDK to control container resources dynamically.
2. Training the RL Model (train_rl.py)
○ Uses Stable-Baselines3's PPO algorithm to train an AI model.
○ The AI learns the best CPU/memory allocation over time.
3. Applying AI Model (run_ai.py)
○ Loads the trained AI model.
○ Dynamically adjusts CPU/memory allocation based on live data.
Conclusion
This project demonstrates how AI and Reinforcement Learning can optimize
container resource allocation in real time. By training an RL model with OpenAI
Gym and Docker, we can efficiently manage CPU and memory in Docker
containers, improving performance and resource utilization.
3. Anomaly Detection & Failure Prediction
Project 1. Anomaly Detection in DevSecOps: Train an AI model to detect
security vulnerabilities in containerized applications.
Anomaly Detection in DevSecOps involves identifying unusual patterns that may
indicate security vulnerabilities in applications. Using machine learning (ML)
and security scanning tools, we can train a model to predict vulnerabilities based
on historical data.
2. Prerequisites
Ensure you have the following installed:
● Python (>=3.8)
● TensorFlow or PyTorch
● Docker & Kubernetes
● Trivy (for vulnerability scanning)
● Jupyter Notebook (for ML training)
3. Setup Environment
Install necessary dependencies:
sudo apt update && sudo apt install python3-pip -y
pip install tensorflow pandas numpy matplotlib scikit-learn seaborn
Trivy is not a Python package; install it separately from its official releases for your platform.
4. Collect Security Data
Scan a Docker image using Trivy and save the output as a JSON file.
trivy image --format json -o vulnerabilities.json nginx:latest
This will provide a dataset containing vulnerabilities.
5. Preprocess Data
Convert JSON to CSV for ML training.
python
import json
import pandas as pd

# Load Trivy scan result
with open("vulnerabilities.json") as f:
    data = json.load(f)

# Extract relevant fields (some results may contain no vulnerabilities)
df = pd.DataFrame([
    {
        "package": vuln["PkgName"],
        "severity": vuln["Severity"],
        "vulnerability_id": vuln["VulnerabilityID"],
    }
    for result in data["Results"] for vuln in result.get("Vulnerabilities", [])
])

# Save to CSV
df.to_csv("vulnerabilities.csv", index=False)
6. Train an AI Model
Using TensorFlow to detect vulnerabilities.
python
import tensorflow as tf
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder

# Load dataset
df = pd.read_csv("vulnerabilities.csv")

# Encode categorical data
le = LabelEncoder()
df["severity_encoded"] = le.fit_transform(df["severity"])

# Binary target: 1 if the finding is HIGH or CRITICAL, 0 otherwise
df["is_high_risk"] = df["severity"].isin(["HIGH", "CRITICAL"]).astype(int)

# Train-test split
X_train, X_test, y_train, y_test = train_test_split(
    df[["severity_encoded"]], df["is_high_risk"], test_size=0.2, random_state=42)

# Build a simple ML model
model = tf.keras.Sequential([
    tf.keras.layers.Dense(16, activation='relu'),
    tf.keras.layers.Dense(1, activation='sigmoid')
])
model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])
model.fit(X_train, y_train, epochs=10, batch_size=8)
7. Containerize the Application
Create a Dockerfile for the trained model:
dockerfile
FROM python:3.8
WORKDIR /app
COPY . /app
RUN pip install tensorflow pandas numpy scikit-learn
CMD ["python", "predict.py"]
Build and run the container:
docker build -t anomaly-detector .
docker run -it anomaly-detector
8. Deploy on Kubernetes
Create a deployment.yaml file:
yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: anomaly-detector
spec:
  replicas: 1
  selector:
    matchLabels:
      app: anomaly-detector
  template:
    metadata:
      labels:
        app: anomaly-detector
    spec:
      containers:
        - name: anomaly-detector
          image: anomaly-detector:latest
          ports:
            - containerPort: 5000
Apply the deployment:
kubectl apply -f deployment.yaml
9. Monitor the Deployment
Check running pods and logs:
kubectl get pods
kubectl logs -f <pod-name>
10. Summary
● Used Trivy to scan for vulnerabilities.
● Processed the scan data for ML training.
● Built a TensorFlow-based anomaly detection model.
● Containerized and deployed it on Kubernetes.
● Monitored and tested the deployment.
This project integrates AI into DevSecOps to enhance automated vulnerability
detection in CI/CD pipelines.
Project 2. Kubernetes Node Failure Prediction: Predict pod/node failures in
Kubernetes clusters using AI-based anomaly detection.
Kubernetes is widely used for managing containerized applications, but node
failures can impact availability and performance. This project leverages AI-based
anomaly detection to predict failures in advance, allowing proactive measures like
workload redistribution or auto-scaling.
Project Overview
● Use Case: Monitor Kubernetes node metrics and detect anomalies using
machine learning.
● Technology Stack: Kubernetes, Prometheus, Grafana, Python, Scikit-learn
(or TensorFlow/PyTorch), Flask (optional for API), Docker.
● Workflow:
○ Collect real-time metrics from Kubernetes nodes using Prometheus.
○ Process data and extract features.
○ Train an anomaly detection model.
○ Deploy the model in Kubernetes for real-time predictions.
Step-by-Step Implementation
Step 1: Set Up a Kubernetes Cluster
If using a local cluster:
kind create cluster --name k8s-ai
kubectl cluster-info
For a cloud-based setup (EKS, AKS, GKE), follow their respective guides.
Step 2: Install Prometheus for Metrics Collection
Create a monitoring namespace:
kubectl create namespace monitoring
Deploy Prometheus:
kubectl apply -f https://github.com/prometheus-operator/prometheus-operator/releases/latest/download/bundle.yaml
Verify Prometheus is running:
kubectl get pods -n monitoring
Step 3: Set Up Node Exporter to Collect Node Metrics
kubectl apply -f https://raw.githubusercontent.com/prometheus/node_exporter/main/examples/kubernetes/node-exporter-daemonset.yaml
Check logs:
kubectl logs -l app=node-exporter -n monitoring
Step 4: Build the Machine Learning Model
Install dependencies:
pip install pandas scikit-learn prometheus-api-client flask
Python Script (Train the Model)
Create train_model.py:
python
import pandas as pd
import numpy as np
from sklearn.ensemble import IsolationForest
import joblib
# Simulated dataset (replace with Prometheus metrics in real implementation)
data = pd.DataFrame({
'cpu_usage': np.random.normal(50, 10, 1000),
'memory_usage': np.random.normal(60, 15, 1000),
'disk_io': np.random.normal(30, 5, 1000),
})
# Train an anomaly detection model
model = IsolationForest(contamination=0.05)
model.fit(data)
# Save the model
joblib.dump(model, "failure_prediction_model.pkl")
print("Model trained and saved.")
Run the script:
python train_model.py
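For a real deployment, the simulated DataFrame above can be replaced with live node metrics pulled from Prometheus via the prometheus-api-client library installed earlier. A minimal sketch, assuming Prometheus is reachable at localhost:9090 (for example via kubectl port-forward) and Node Exporter metrics are being scraped:
python
from prometheus_api_client import PrometheusConnect
import pandas as pd

prom = PrometheusConnect(url="http://localhost:9090", disable_ssl=True)

# Busy-CPU fraction and memory-usage fraction per instance (standard Node Exporter metrics)
cpu = prom.custom_query('1 - avg by (instance) (rate(node_cpu_seconds_total{mode="idle"}[5m]))')
mem = prom.custom_query('1 - node_memory_MemAvailable_bytes / node_memory_MemTotal_bytes')

rows = {}
for r in cpu:
    rows.setdefault(r["metric"]["instance"], {})["cpu_usage"] = float(r["value"][1])
for r in mem:
    rows.setdefault(r["metric"]["instance"], {})["memory_usage"] = float(r["value"][1])

live_df = pd.DataFrame.from_dict(rows, orient="index")
print(live_df.head())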
Step 5: Deploy the Prediction API in Kubernetes
Create predictor.py:
python
from flask import Flask, request, jsonify
import joblib
import numpy as np

app = Flask(__name__)
model = joblib.load("failure_prediction_model.pkl")

@app.route('/predict', methods=['POST'])
def predict():
    data = request.get_json()
    features = np.array([data['cpu_usage'], data['memory_usage'],
                         data['disk_io']]).reshape(1, -1)
    prediction = model.predict(features)
    result = "Anomaly detected (Possible Failure)" if prediction[0] == -1 else "Normal"
    return jsonify({'prediction': result})

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000)
Run locally to test:
python predictor.py
Test API:
curl -X POST http://localhost:5000/predict -H "Content-Type: application/json" -d
'{"cpu_usage": 80, "memory_usage": 90, "disk_io": 50}'
Step 6: Containerize and Deploy in Kubernetes
Create a Dockerfile:
dockerfile
FROM python:3.9
WORKDIR /app
COPY predictor.py failure_prediction_model.pkl /app/
RUN pip install flask joblib numpy
CMD ["python", "predictor.py"]
Build and push the image:
docker build -t <your-dockerhub-username>/k8s-failure-predictor .
docker push <your-dockerhub-username>/k8s-failure-predictor
Create a Kubernetes Deployment (predictor-deployment.yaml):
yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: predictor
  labels:
    app: predictor
spec:
  replicas: 1
  selector:
    matchLabels:
      app: predictor
  template:
    metadata:
      labels:
        app: predictor
    spec:
      containers:
        - name: predictor
          image: <your-dockerhub-username>/k8s-failure-predictor
          ports:
            - containerPort: 5000
---
apiVersion: v1
kind: Service
metadata:
  name: predictor-service
spec:
  selector:
    app: predictor
  ports:
    - protocol: TCP
      port: 80
      targetPort: 5000
  type: LoadBalancer
Apply the deployment:
kubectl apply -f predictor-deployment.yaml
Check running pods:
kubectl get pods
Step 7: Visualizing Anomalies with Grafana
Deploy Grafana:
kubectl apply -f https://raw.githubusercontent.com/grafana/grafana/main/deploy/kubernetes/grafana.yaml
Access Grafana:
kubectl port-forward svc/grafana 3000:80 -n monitoring
Login (default: admin/admin) and configure Prometheus as a data source.
Explanation for New Learners
● Kubernetes Cluster: Manages applications and resources.
● Prometheus: Collects real-time node metrics.
● Node Exporter: Exposes system-level metrics.
● Machine Learning Model: Detects anomalies using IsolationForest.
● Flask API: Serves predictions via REST API.
● Docker & Kubernetes: Containerizes and deploys the predictor service.
● Grafana: Visualizes anomalies for monitoring.
Project 3. Anomaly Detection for Network Traffic: Use ML to identify unusual
patterns in network traffic and detect potential DDoS attacks.
Anomaly detection in network traffic is essential for cybersecurity. Machine
learning models can analyze network patterns and identify unusual activities that
may indicate potential attacks, such as Distributed Denial-of-Service (DDoS)
attacks. This project will guide you through building an anomaly detection model
using Python and Scikit-learn.
Project Steps
Step 1: Set Up Your Environment
Ensure you have Python installed and required libraries. Run the following
commands:
pip install numpy pandas scikit-learn matplotlib seaborn
Step 2: Import Required Libraries
python
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
Step 3: Load and Prepare the Dataset
We'll use a synthetic network traffic dataset. You can also download a real dataset
like the CICIDS2017 dataset.
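If you don't have a capture handy, a small synthetic network_traffic.csv with the columns used below can be generated first (values are purely illustrative):
python
import numpy as np
import pandas as pd

rng = np.random.default_rng(42)
n = 2000
pd.DataFrame({
    "packet_size": rng.normal(500, 150, n).clip(40, 1500).round(),
    "flow_duration": rng.exponential(2.0, n).round(3),
    "num_bytes": rng.normal(20000, 8000, n).clip(100, None).round(),
    "src_port": rng.integers(1024, 65535, n),
    "dst_port": rng.choice([80, 443, 22, 8080, 3306], n),
}).to_csv("network_traffic.csv", index=False)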
python
# Load dataset (simulated data for network traffic)
data = pd.read_csv("network_traffic.csv")
# Display first few rows
print(data.head())
# Check for missing values
print(data.isnull().sum())
Step 4: Data Preprocessing
Normalize and clean the data to prepare for model training.
python
# Select relevant features (assuming numerical columns)
features = ['packet_size', 'flow_duration', 'num_bytes', 'src_port', 'dst_port']
X = data[features]
# Normalize data
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)
# Split into training and testing sets
X_train, X_test = train_test_split(X_scaled, test_size=0.2, random_state=42)
Step 5: Train the Isolation Forest Model
The Isolation Forest is an unsupervised learning algorithm for anomaly
detection.
python
# Train Isolation Forest model
model = IsolationForest(contamination=0.05, random_state=42)
model.fit(X_train)
# Predict anomalies
y_pred = model.predict(X_test)
# Convert predictions (-1: Anomaly, 1: Normal) to readable format
y_pred = np.where(y_pred == -1, "Anomaly", "Normal")
# Add results to DataFrame
results = pd.DataFrame(X_test, columns=features)
results['Prediction'] = y_pred
# Display some predictions
print(results.head(10))
Step 6: Visualize Anomalies
python
# Convert predictions to numeric values (1: Normal, -1: Anomaly)
results['Prediction'] = np.where(results['Prediction'] == "Anomaly", -1, 1)
# Plot anomalies
plt.figure(figsize=(10, 6))
sns.scatterplot(x=results['flow_duration'], y=results['num_bytes'],
hue=results['Prediction'], palette={1: "blue", -1: "red"})
plt.title("Anomalies in Network Traffic")
plt.xlabel("Flow Duration")
plt.ylabel("Number of Bytes")
plt.show()
1. Importing Libraries
○ We use numpy and pandas for data handling.
○ matplotlib and seaborn help in visualization.
○ IsolationForest detects anomalies based on data distribution.
2. Loading and Preprocessing Data
○ We select relevant numerical features for model training.
○ The data is scaled to ensure consistent value ranges.
3. Training the Model
○ The IsolationForest algorithm identifies outliers in the dataset.
○ A contamination value of 0.05 means 5% of data is considered
anomalous.
4. Predicting and Visualizing Results
○ The model classifies network traffic as normal or anomalous.
○ A scatter plot visualizes unusual patterns in network traffic.
Project 4. Predictive Disk Failure Monitoring: Analyze disk I/O metrics using
ML to predict hardware failures in advance.
Hard drive failures can lead to data loss and downtime. Predicting failures in
advance helps in preventive maintenance. This project will use Machine
Learning (ML) to analyze disk I/O metrics and predict potential failures based
on SMART (Self-Monitoring, Analysis, and Reporting Technology) data.
Step 1: Set Up the Environment
1.1 Install Required Packages
Ensure your system has Python installed. Install the required libraries:
pip install pandas numpy scikit-learn matplotlib seaborn
For handling SMART data, install smartmontools:
sudo apt update && sudo apt install smartmontools
Step 2: Collect Disk Metrics
2.1 Enable SMART Monitoring
Check if SMART is enabled on your disk:
sudo smartctl -i /dev/sda
If it's disabled, enable it:
sudo smartctl -s on /dev/sda
2.2 Fetch SMART Data
To get disk health data:
sudo smartctl -A /dev/sda
To export SMART data to a file:
sudo smartctl -A /dev/sda > smart_data.txt
Step 3: Preprocess Data
3.1 Convert SMART Data to CSV
We extract attributes like Reallocated Sectors, Power-On Hours, Temperature,
and Error Rates into a CSV.
Create extract_smart_data.py:
python
import pandas as pd

def parse_smart_data(file_path):
    data = {}
    with open(file_path, 'r') as file:
        for line in file:
            parts = line.split()
            if len(parts) > 9 and parts[0].isdigit():
                attr_id = int(parts[0])
                value = int(parts[9])  # Raw value
                data[attr_id] = value
    return data

# Read SMART data file
smart_data = parse_smart_data("smart_data.txt")

# Convert to DataFrame
df = pd.DataFrame([smart_data])

# Save as CSV
df.to_csv("smart_metrics.csv", index=False)
print("SMART data extracted and saved as smart_metrics.csv")
Run the script:
python extract_smart_data.py
Step 4: Train Machine Learning Model
4.1 Load and Prepare Data
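The training script below assumes a labelled disk_failure_dataset.csv with historical SMART attributes and a failure column (public datasets such as the Backblaze drive stats can be adapted). If you just want to exercise the pipeline, a toy dataset matching the columns used here and in the visualization step can be generated first:
python
import numpy as np
import pandas as pd

rng = np.random.default_rng(0)
n = 1000
power_on_hours = rng.integers(100, 40000, n)
reallocated = rng.poisson(0.5, n) + (power_on_hours > 30000) * rng.poisson(5, n)
pd.DataFrame({
    "Power_On_Hours": power_on_hours,
    "Reallocated_Sector_Ct": reallocated,
    "Temperature_Celsius": rng.normal(38, 5, n).round(1),
    "failure": (reallocated > 3).astype(int),  # crude label, for demonstration only
}).to_csv("disk_failure_dataset.csv", index=False)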
Create train_model.py:
python
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score
# Load dataset (Assuming you have past failure data)
df = pd.read_csv("disk_failure_dataset.csv")
# Define features and labels
X = df.drop(columns=["failure"]) # Features: SMART attributes
y = df["failure"] # Labels: 0 (healthy), 1 (failed)
# Split into training and test sets
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2,
random_state=42)
# Train Random Forest Model
model = RandomForestClassifier(n_estimators=100, random_state=42)
model.fit(X_train, y_train)
# Predict and check accuracy
y_pred = model.predict(X_test)
accuracy = accuracy_score(y_test, y_pred)
print(f"Model Accuracy: {accuracy:.2f}")
# Save model
import joblib
joblib.dump(model, "disk_failure_model.pkl")
print("Model saved as disk_failure_model.pkl")
Run the script:
python train_model.py
Step 5: Predict Disk Failure in Real-Time
5.1 Create Prediction Script
Create predict_failure.py:
python
import joblib
import pandas as pd
import subprocess

# Load trained model
model = joblib.load("disk_failure_model.pkl")

# Function to get live SMART data
def get_smart_metrics():
    result = subprocess.run(["sudo", "smartctl", "-A", "/dev/sda"],
                            capture_output=True, text=True)
    data = {}
    for line in result.stdout.split("\n"):
        parts = line.split()
        if len(parts) > 9 and parts[0].isdigit():
            attr_id = int(parts[0])
            value = int(parts[9])
            data[attr_id] = value
    return data

# Get live disk data
# NOTE: the columns here must match the features the model was trained on
smart_metrics = get_smart_metrics()
df_live = pd.DataFrame([smart_metrics])

# Predict failure
prediction = model.predict(df_live)
status = "Failure Predicted! Backup your data!" if prediction[0] == 1 else "Disk is healthy."
print(status)
Run the script:
python predict_failure.py
Step 6: Automate with a Cron Job
To automate failure detection, schedule a cron job:
crontab -e
Add this line to run the prediction script every hour:
0 * * * * /usr/bin/python3 /path/to/predict_failure.py
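Because predict_failure.py calls smartctl with sudo, schedule it from root's crontab (sudo crontab -e) or grant passwordless sudo for smartctl; redirecting output to a log file also makes it easy to verify the job runs, for example:
0 * * * * /usr/bin/python3 /path/to/predict_failure.py >> /var/log/disk_failure_predict.log 2>&1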
Step 7: Visualizing Disk Health Metrics
Create visualize_metrics.py:
python
import pandas as pd
import matplotlib.pyplot as plt
df = pd.read_csv("disk_failure_dataset.csv")
# Plot SMART attribute trends
plt.figure(figsize=(10, 6))
plt.plot(df["Power_On_Hours"], df["Reallocated_Sector_Ct"], marker="o",
label="Reallocated Sectors")
plt.xlabel("Power-On Hours")
plt.ylabel("Reallocated Sectors")
plt.title("Disk Health Over Time")
plt.legend()
plt.show()
Run:
python visualize_metrics.py
Conclusion
We successfully:
✅ Collected SMART disk metrics
✅ Trained an ML model to predict failures
✅ Automated real-time failure prediction
✅ Visualized disk health trends
Project 5. Smart CI/CD Failure Prediction: Train an AI model to analyze
Jenkins pipeline logs and predict build failures before they occur.
Introduction
CI/CD pipelines are critical in modern DevOps workflows, but frequent build
failures slow down development. This project aims to train an AI model to
analyze Jenkins pipeline logs and predict build failures before they happen, helping
teams take preventive action.
We will:
● Collect Jenkins logs
● Preprocess and clean data
● Train an AI/ML model
● Deploy the model in a Jenkins pipeline for real-time failure prediction
Step-by-Step Guide
Step 1: Set Up Your Environment
Install Required Tools
Ensure you have:
● Python (3.8+)
● Jenkins (with logs available)
● Docker (optional for containerization)
● Jupyter Notebook (for model development)
Install dependencies:
pip install pandas numpy scikit-learn joblib flask
Step 2: Collect Jenkins Logs
Jenkins stores logs in /var/log/jenkins/jenkins.log, or you can extract them via the
Jenkins API.
To get logs using API:
curl -u USER:TOKEN
http://JENKINS_URL/job/JOB_NAME/lastBuild/consoleText > logs.txt
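A single build gives very little training data; the same Jenkins API can be used to pull the console logs of recent builds into separate files. A rough sketch (the URL, job name, and credentials below are placeholders for your own instance):
python
import requests

JENKINS_URL = "http://JENKINS_URL"
JOB_NAME = "JOB_NAME"
AUTH = ("USER", "TOKEN")

# List recent build numbers for the job
builds = requests.get(f"{JENKINS_URL}/job/{JOB_NAME}/api/json?tree=builds[number]",
                      auth=AUTH).json()["builds"]

# Download the console log of each build
for b in builds[:20]:
    text = requests.get(f"{JENKINS_URL}/job/{JOB_NAME}/{b['number']}/consoleText",
                        auth=AUTH).text
    with open(f"logs_{b['number']}.txt", "w") as f:
        f.write(text)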
Step 3: Preprocess the Logs
Load and clean log data in Python:
python
import pandas as pd
import re

def load_logs(file_path):
    with open(file_path, 'r') as f:
        logs = f.readlines()
    return logs

def preprocess_logs(logs):
    cleaned_logs = []
    for log in logs:
        log = re.sub(r'\d+', '', log)  # Remove numbers
        log = log.lower().strip()      # Convert to lowercase
        cleaned_logs.append(log)
    return cleaned_logs

logs = load_logs("logs.txt")
cleaned_logs = preprocess_logs(logs)
Step 4: Prepare Data for AI Model
Convert logs into numerical features for AI training.
python
from sklearn.feature_extraction.text import CountVectorizer
vectorizer = CountVectorizer()
X = vectorizer.fit_transform(cleaned_logs)
● X is now a matrix representation of logs for training.
Label failed builds as 1 and successful builds as 0:
python
y = [1 if 'error' in log or 'failed' in log else 0 for log in cleaned_logs]
Step 5: Train an AI Model
Use Logistic Regression to predict failures.
python
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2,
random_state=42)
model = LogisticRegression()
model.fit(X_train, y_train)
y_pred = model.predict(X_test)
print("Model Accuracy:", accuracy_score(y_test, y_pred))
Step 6: Save & Deploy the Model
Save the model:
python
import joblib
joblib.dump(model, "failure_predictor.pkl")
joblib.dump(vectorizer, "vectorizer.pkl")
Step 7: Deploy AI Model in a Flask API
Create app.py to expose an API:
python
from flask import Flask, request, jsonify
import joblib

app = Flask(__name__)
model = joblib.load("failure_predictor.pkl")
vectorizer = joblib.load("vectorizer.pkl")

@app.route('/predict', methods=['POST'])
def predict():
    data = request.json['log']
    transformed_log = vectorizer.transform([data])
    prediction = model.predict(transformed_log)
    return jsonify({"failure": bool(prediction[0])})

if __name__ == '__main__':
    app.run(port=5000)
Run the API:
python app.py
Step 8: Integrate AI Model into Jenkins
Modify your Jenkinsfile to send logs to the API:
groovy
pipeline {
    agent any
    stages {
        stage('Build') {
            steps {
                script {
                    def logText = sh(script: 'cat logs.txt', returnStdout: true).trim()
                    def response = sh(script: """
                        curl -X POST http://localhost:5000/predict \
                             -H "Content-Type: application/json" \
                             -d '{"log": "${logText}"}'
                    """, returnStdout: true).trim()
                    def failure = readJSON text: response
                    if (failure.failure) {
                        error "Build Failure Predicted! Stopping pipeline..."
                    }
                }
            }
        }
    }
}
Note: readJSON is provided by the Pipeline Utility Steps plugin.
Step 9: Test Your Pipeline
Trigger a Jenkins build and check if the AI model predicts failures correctly.
Conclusion
This project helps prevent build failures in CI/CD by analyzing logs with AI. You
can further:
● Train with real historical build logs.
● Use advanced NLP models (e.g., BERT) for better accuracy.
● Integrate with Slack for alerts.
4. Incident Prediction and Root Cause
Analysis
Project 1. Incident Prediction & Root Cause Analysis: Build a machine learning
model that predicts system failures based on historical monitoring data.
Introduction
Incident prediction and root cause analysis help organizations prevent system
failures by leveraging machine learning on historical monitoring data. This project
involves collecting system logs, training a model to predict failures, and providing
insights into root causes.
Step 1: Setup Environment
Install Required Dependencies
Ensure you have Python and necessary libraries installed:
pip install pandas numpy scikit-learn matplotlib seaborn xgboost
Step 2: Data Collection
For this project, we'll assume a dataset containing system metrics like CPU usage,
memory, disk I/O, network traffic, and failure logs. You can generate synthetic data
if no real dataset is available.
Sample Dataset (system_logs.csv)
csv
timestamp,cpu_usage,memory_usage,disk_io,network_traffic,error_code
2024-02-01 10:00:00,70,65,120,300,0
2024-02-01 10:05:00,85,75,140,400,1
2024-02-01 10:10:00,90,80,160,450,1
...
● error_code=1 → System failure
● error_code=0 → No failure
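A quick way to produce a synthetic system_logs.csv in exactly this format (failures skewed toward high CPU and memory, purely for illustration):
python
import numpy as np
import pandas as pd

rng = np.random.default_rng(7)
n = 1000
cpu = rng.normal(60, 15, n).clip(0, 100)
mem = rng.normal(55, 20, n).clip(0, 100)
pd.DataFrame({
    "timestamp": pd.date_range("2024-02-01 10:00:00", periods=n, freq="5min"),
    "cpu_usage": cpu.round(1),
    "memory_usage": mem.round(1),
    "disk_io": rng.normal(120, 30, n).clip(0, None).round(1),
    "network_traffic": rng.normal(350, 80, n).clip(0, None).round(1),
    "error_code": ((cpu > 85) & (mem > 75)).astype(int),
}).to_csv("system_logs.csv", index=False)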
Step 3: Load & Preprocess Data
Python Code for Data Loading
python
import pandas as pd
import numpy as np
# Load data
df = pd.read_csv("system_logs.csv")
# Convert timestamp to datetime
df['timestamp'] = pd.to_datetime(df['timestamp'])
# Fill any missing values with column means (numeric columns only)
df.fillna(df.mean(numeric_only=True), inplace=True)
print(df.head())
Step 4: Exploratory Data Analysis (EDA)
Before model training, visualize data trends.
Data Distribution
python
import matplotlib.pyplot as plt
import seaborn as sns
# Plot CPU Usage vs Failures
plt.figure(figsize=(8,5))
sns.boxplot(x=df["error_code"], y=df["cpu_usage"])
plt.title("CPU Usage vs System Failures")
plt.show()
Step 5: Feature Engineering
Convert categorical variables and scale numerical features.
python
from sklearn.preprocessing import StandardScaler
features = ["cpu_usage", "memory_usage", "disk_io", "network_traffic"]
X = df[features]
y = df["error_code"]
# Scale data
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)
Step 6: Train Machine Learning Model
Using XGBoost for Prediction
python
from xgboost import XGBClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
# Split data
X_train, X_test, y_train, y_test = train_test_split(X_scaled, y, test_size=0.2,
random_state=42)
# Train model
model = XGBClassifier()
model.fit(X_train, y_train)
# Predictions
y_pred = model.predict(X_test)
# Evaluate model
accuracy = accuracy_score(y_test, y_pred)
print(f"Model Accuracy: {accuracy:.2f}")
Step 7: Root Cause Analysis
Find key features contributing to failures.
python
importances = model.feature_importances_
# Plot feature importance
plt.figure(figsize=(8,5))
sns.barplot(x=features, y=importances)
plt.title("Feature Importance in System Failures")
plt.show()
Interpretation:
● If CPU Usage has the highest importance, optimizing CPU-heavy processes
can reduce failures.
● If Memory Usage is critical, increasing RAM or memory management
tuning might help.
Step 8: Deployment (Optional)
You can deploy the model as a REST API using Flask:
Flask API for Real-time Prediction
python
from flask import Flask, request, jsonify
import numpy as np

app = Flask(__name__)

# Assumes the trained XGBoost `model` (and the scaler) from the steps above are
# available in this process; alternatively persist them with joblib and load them here.

@app.route('/predict', methods=['POST'])
def predict():
    data = request.json
    features = np.array([data["cpu_usage"], data["memory_usage"], data["disk_io"],
                         data["network_traffic"]]).reshape(1, -1)
    prediction = model.predict(features)
    return jsonify({"failure_prediction": int(prediction[0])})

if __name__ == '__main__':
    app.run(debug=True)
Step 9: Run & Test API
Start the API:
python app.py
Test API with curl:
curl -X POST http://127.0.0.1:5000/predict -H "Content-Type: application/json" -d
'{"cpu_usage": 90, "memory_usage": 85, "disk_io": 180, "network_traffic": 500}'
Conclusion
This project provides a real-world approach to predicting system failures using
ML and performing root cause analysis. You can extend it with real-time
monitoring, alert systems, or integrations with DevOps tools.
Project 2. AI-Based Root Cause Analysis (RCA): Build a model that correlates
incidents, logs, and metrics to identify the root cause of failures.
Introduction
Root Cause Analysis (RCA) is crucial in IT operations to diagnose failures by
analyzing logs, metrics, and incidents. An AI-based RCA system automates this
process using machine learning, helping teams quickly identify and resolve issues.
In this project, we will develop a model that processes logs and metrics to
determine the root cause of failures.
Project Steps
1. Setup Environment
Ensure Python and necessary dependencies are installed.
# Update packages
sudo apt update && sudo apt upgrade -y
# Install Python and virtual environment
sudo apt install python3 python3-pip python3-venv -y
# Create and activate a virtual environment
python3 -m venv rca_env
source rca_env/bin/activate
# Install required Python libraries
pip install numpy pandas scikit-learn tensorflow keras matplotlib seaborn loguru
2. Prepare Dataset
We will use synthetic log data or fetch logs from a real system.
python
import pandas as pd
# Simulated log data
data = {
    "timestamp": ["2024-02-10 10:00:00", "2024-02-10 10:01:00", "2024-02-10 10:02:00"],
    "service": ["Database", "API", "Server"],
    "log_message": ["Timeout error", "Slow response", "CPU overload"],
    "error_level": ["High", "Medium", "Critical"]
}

df = pd.DataFrame(data)
print(df.head())
3. Data Preprocessing
Convert text-based logs into numerical form using NLP techniques like TF-IDF.
python
from sklearn.feature_extraction.text import TfidfVectorizer
vectorizer = TfidfVectorizer()
log_features = vectorizer.fit_transform(df["log_message"])
print("Transformed log messages:", log_features.toarray())
4. Build Machine Learning Model
Use a simple classification model to identify failure patterns.
python
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
# Simulated labels for training
labels = [1, 0, 1] # 1 = Failure, 0 = No Failure
X_train, X_test, y_train, y_test = train_test_split(log_features, labels,
test_size=0.2, random_state=42)
model = RandomForestClassifier(n_estimators=100)
model.fit(X_train, y_train)
print("Model trained successfully.")
5. Predict Root Causes
Make predictions on new log entries.
python
new_logs = ["Database connection lost", "Server overheating detected"]
new_features = vectorizer.transform(new_logs)
predictions = model.predict(new_features)
print("Predictions:", predictions)
6. Visualizing Results
Plot logs and failure trends.
python
import matplotlib.pyplot as plt
failure_counts = df["error_level"].value_counts()
failure_counts.plot(kind="bar", title="Error Levels Distribution", color=["red",
"orange", "green"])
plt.show()
Explanation for Beginners
1. Data Collection: Logs from system services (Database, API, Server) are
collected.
2. Preprocessing: Logs are converted into numerical form using TF-IDF.
3. Model Training: A RandomForest model is trained to detect failure
patterns.
4. Prediction: The model predicts potential failures in new logs.
5. Visualization: Error levels are visualized for better insights.
This project provides a foundational AI-based RCA system, and it can be extended
with deep learning models and real-time log streaming.
5. Security and Compliance
Project 1. Automated Security Policy Enforcement with AI: Use AI to detect
misconfigurations in firewall rules, IAM policies, and network security.
Project Overview
This project automates security policy enforcement using AI by detecting
misconfigurations in firewall rules, IAM policies, and network security settings. It
leverages machine learning to analyze security policies and identify potential risks.
The project can be integrated into DevOps pipelines to ensure continuous security
compliance.
Project Implementation Steps
Step 1: Setup Environment
Ensure you have the necessary tools installed:
● Python 3.8+
● Virtual environment (venv)
● AWS CLI (for IAM policy analysis)
● Docker (for containerizing the application)
● Terraform (optional, for managing infrastructure)
Commands to Install Dependencies
# Update the system
sudo apt update && sudo apt upgrade -y
# Install Python and Virtual Environment
sudo apt install python3 python3-venv -y
# Create a virtual environment
python3 -m venv venv
source venv/bin/activate
# Install dependencies
pip install boto3 scikit-learn pandas requests flask
Step 2: Define AI Model for Policy Analysis
We will use machine learning to classify security configurations as secure or
misconfigured.
Code: ai_security_model.py
python
import pandas as pd
from sklearn.ensemble import RandomForestClassifier
import joblib
# Sample dataset for training
data = {
"rule_id": [1, 2, 3, 4, 5],
"port": [22, 80, 443, 8080, 3389],
"action": [1, 0, 1, 0, 1], # 1 = Allow, 0 = Deny
"risk_level": [3, 1, 2, 4, 5] # Higher is riskier
}
df = pd.DataFrame(data)
# Define features and labels
X = df[["port", "action"]]
y = df["risk_level"]
# Train model
model = RandomForestClassifier()
model.fit(X, y)
# Save model
joblib.dump(model, "security_model.pkl")
print("Model trained and saved successfully.")
📌 Explanation:
● Creates a sample dataset with firewall rules
● Uses RandomForestClassifier to train a security risk model
● Saves the trained model for later use
Step 3: Detect Misconfigurations in IAM Policies
Using AWS IAM policies, we check for excessive permissions.
Code: iam_policy_checker.py
python
import boto3

# Initialize AWS IAM client
iam = boto3.client("iam")

def check_policy(policy_arn):
    policy = iam.get_policy(PolicyArn=policy_arn)
    policy_version = iam.get_policy_version(
        PolicyArn=policy_arn,
        VersionId=policy["Policy"]["DefaultVersionId"]
    )
    document = policy_version["PolicyVersion"]["Document"]

    # Analyze permissions
    for statement in document["Statement"]:
        if statement["Effect"] == "Allow" and statement["Action"] == "*":
            print(f"Warning: Overly permissive policy detected in {policy_arn}")

check_policy("arn:aws:iam::aws:policy/AdministratorAccess")
📌 Explanation:
● Retrieves IAM policies from AWS
● Checks for overly permissive permissions ("Action": "*")
🔹 Commands to Run:
export AWS_ACCESS_KEY_ID="your-access-key"
export AWS_SECRET_ACCESS_KEY="your-secret-key"
export AWS_REGION="us-east-1"
python iam_policy_checker.py
Step 4: Firewall Rule Misconfiguration Detection
This script analyzes firewall rules to detect open ports.
Code: firewall_analyzer.py
python
import json
firewall_rules = """
[
  {"port": 22, "protocol": "TCP", "action": "ALLOW"},
  {"port": 3389, "protocol": "TCP", "action": "ALLOW"},
  {"port": 443, "protocol": "TCP", "action": "ALLOW"}
]
"""

rules = json.loads(firewall_rules)
for rule in rules:
    if rule["port"] in [22, 3389]:
        print(f"Warning: High-risk port {rule['port']} is open.")
📌 Explanation:
● Reads firewall rules
● Detects risky open ports (22 for SSH, 3389 for RDP)
Step 5: Containerize the Application
Use Docker to package the security tool.
Dockerfile
dockerfile
FROM python:3.8
WORKDIR /app
COPY . /app
RUN pip install -r requirements.txt
CMD ["python", "firewall_analyzer.py"]
🔹 Commands to Build and Run:
docker build -t security-check .
docker run security-check
Step 6: Automate in CI/CD Pipeline (Jenkinsfile)
groovy
pipeline {
    agent any
    stages {
        stage('Checkout') {
            steps {
                git 'https://github.com/your-repo/security-policy-check.git'
            }
        }
        stage('Run Security Checks') {
            steps {
                sh 'python firewall_analyzer.py'
                sh 'python iam_policy_checker.py'
            }
        }
        stage('Deploy') {
            steps {
                sh 'docker build -t security-check .'
                sh 'docker run security-check'
            }
        }
    }
}
📌 Explanation:
● Pulls code from GitHub
● Runs security scripts
● Builds and runs Docker container
Step 7: Monitor Security Violations with Grafana & Prometheus
Use Prometheus to log security findings and visualize in Grafana.
Commands to Set Up Prometheus
docker run -d -p 9090:9090 --name=prometheus prom/prometheus
Commands to Set Up Grafana
docker run -d -p 3000:3000 --name=grafana grafana/grafana
This project provides a full-stack AI-powered security enforcement tool that
detects misconfigurations in firewall rules and IAM policies. You can integrate it
with CI/CD for automated security compliance and visual monitoring using
Grafana and Prometheus.
Project 2. AI-Powered SLA Compliance Monitoring: Analyze service response
times and uptime metrics using ML to predict SLA violations.
Service Level Agreements (SLAs) define the expected performance and reliability
of a service. This project builds an AI-powered monitoring system that analyzes
response times and uptime metrics, using machine learning (ML) to predict SLA
violations. It helps businesses proactively address performance issues before
breaching SLAs.
Project Overview
We will develop a Python-based solution using Flask for the API, PostgreSQL for
data storage, and Scikit-learn for ML-based SLA violation prediction. The system
will:
● Collect real-time service response times and uptime metrics
● Store data in PostgreSQL
● Train an ML model to predict SLA violations
● Visualize insights using Grafana
Step-by-Step Implementation
Step 1: Install Dependencies
Ensure your system has Python and PostgreSQL installed. Then, install the
required Python libraries:
pip install flask psycopg2 pandas scikit-learn requests matplotlib grafana-api
Step 2: Set Up PostgreSQL Database
Create the database and table to store service metrics.
CREATE DATABASE sla_monitor;
\c sla_monitor
CREATE TABLE service_metrics (
id SERIAL PRIMARY KEY,
timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
response_time FLOAT,
uptime BOOLEAN
);
Step 3: Create a Flask API to Collect Metrics
Create a server.py file to collect and store service metrics.
python
from flask import Flask, request, jsonify
import psycopg2

app = Flask(__name__)

# Database connection
conn = psycopg2.connect("dbname=sla_monitor user=postgres password=yourpassword")
cur = conn.cursor()

@app.route('/metrics', methods=['POST'])
def collect_metrics():
    data = request.get_json()
    response_time = data['response_time']
    uptime = data['uptime']
    cur.execute("INSERT INTO service_metrics (response_time, uptime) VALUES (%s, %s)",
                (response_time, uptime))
    conn.commit()
    return jsonify({"message": "Metrics saved!"}), 201

if __name__ == '__main__':
    app.run(debug=True)
Step 4: Collect Metrics from a Service
Write a script to simulate collecting data from an API:
python
import requests
import time
import random

API_URL = "http://127.0.0.1:5000/metrics"

while True:
    response_time = round(random.uniform(100, 1000), 2)  # Simulated response time (ms)
    uptime = random.choice([True, False])                # Simulated uptime status
    data = {"response_time": response_time, "uptime": uptime}
    requests.post(API_URL, json=data)
    time.sleep(5)  # Collect metrics every 5 seconds
Step 5: Train an ML Model to Predict SLA Violations
Create a script to analyze historical data and predict SLA violations using
Scikit-learn.
python
import psycopg2
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score
# Connect to database
conn = psycopg2.connect("dbname=sla_monitor user=postgres password=yourpassword")
cur = conn.cursor()
# Load data
cur.execute("SELECT response_time, uptime FROM service_metrics")
data = cur.fetchall()
df = pd.DataFrame(data, columns=['response_time', 'uptime'])
# Prepare data
X = df[['response_time']]
y = df['uptime'].astype(int)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2,
random_state=42)
# Train model
model = RandomForestClassifier(n_estimators=100)
model.fit(X_train, y_train)
# Test model
y_pred = model.predict(X_test)
print(f"Model Accuracy: {accuracy_score(y_test, y_pred) * 100:.2f}%")
# Save model
import joblib
joblib.dump(model, "sla_violation_predictor.pkl")
Step 6: Deploy ML Model as an API
Modify server.py to include an endpoint for prediction.
python
import joblib
import numpy as np

model = joblib.load("sla_violation_predictor.pkl")

@app.route('/predict', methods=['POST'])
def predict_sla_violation():
    data = request.get_json()
    response_time = np.array(data['response_time']).reshape(-1, 1)
    # Note: the model above was trained on the uptime flag; adjust the label
    # mapping so that a predicted outage is reported as a violation.
    prediction = model.predict(response_time)
    return jsonify({"sla_violation": bool(prediction[0] == 0)})
Step 7: Visualize Metrics with Grafana
Install Grafana:
sudo apt update
sudo apt install -y grafana
sudo systemctl start grafana-server
● Connect PostgreSQL to Grafana and create dashboards for response times
and SLA violations.
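When adding the PostgreSQL data source in Grafana, point it at the sla_monitor database; a time-series panel can then use a query along these lines (column names as created in Step 2):
SELECT
  timestamp AS "time",
  response_time
FROM service_metrics
ORDER BY 1;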
Conclusion
This project builds an end-to-end AI-powered SLA monitoring system. It collects
real-time metrics, trains an ML model, and predicts SLA violations while
providing a Grafana dashboard for visualization.
6. Self-Healing and Automation
Project 1. Self-Healing Infrastructure: Use AI to detect and auto-remediate
cloud infrastructure issues (e.g., restarting failed pods in Kubernetes).
Self-healing infrastructure is an approach where cloud environments
automatically detect and remediate failures without human intervention. This
ensures high availability, reduced downtime, and improved system reliability.
In this project, we will:
● Use Prometheus to monitor Kubernetes pods.
● Apply AI/ML models to predict failures.
● Use Python automation to trigger remediation (e.g., restarting failed pods).
Step-by-Step Implementation
Step 1: Set Up a Kubernetes Cluster
If you don’t have a cluster, you can use kind (Kubernetes in Docker):
kind create cluster --name self-healing-cluster
To verify:
kubectl get nodes
Step 2: Deploy Prometheus for Monitoring
1. Install Prometheus in Kubernetes:
kubectl create namespace monitoring
helm repo add prometheus-community
https://prometheus-community.github.io/helm-charts
helm install prometheus prometheus-community/kube-prometheus-stack -n
monitoring
2. Verify the installation:
kubectl get pods -n monitoring
Step 3: Deploy a Sample Application
Let’s create a simple Nginx deployment to test self-healing:
yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: nginx
  labels:
    app: nginx
spec:
  replicas: 3
  selector:
    matchLabels:
      app: nginx
  template:
    metadata:
      labels:
        app: nginx
    spec:
      containers:
        - name: nginx
          image: nginx:latest
          ports:
            - containerPort: 80
Apply the deployment:
kubectl apply -f nginx-deployment.yaml
Step 4: Create an AI-based Failure Prediction Model
We will use a simple Python AI model to detect failures using Prometheus
metrics.
1. Install dependencies:
pip install requests pandas scikit-learn
2. Python script to collect metrics from Prometheus:
python
import requests
import pandas as pd
from sklearn.ensemble import RandomForestClassifier
import time
import os

PROMETHEUS_URL = "http://localhost:9090/api/v1/query"

def fetch_pod_status():
    query = 'kube_pod_status_ready'
    response = requests.get(PROMETHEUS_URL, params={'query': query})
    data = response.json()
    pod_data = []
    for result in data['data']['result']:
        pod_name = result['metric']['pod']
        status = int(result['value'][1])  # 1 = Running, 0 = Failed
        pod_data.append([pod_name, status])
    return pd.DataFrame(pod_data, columns=['pod', 'status'])

def train_model():
    # NOTE: this demo trains on the status flag itself as a placeholder;
    # in practice use richer historical metrics (CPU, memory, restarts, ...)
    df = fetch_pod_status()
    X = df[['status']]
    y = df['status']  # Labels: 1 (healthy), 0 (failed)
    model = RandomForestClassifier()
    model.fit(X, y)
    return model

def detect_failure(model):
    df = fetch_pod_status()
    failed_pods = df[df['status'] == 0]['pod'].tolist()
    if failed_pods:
        print(f"Detected failed pods: {failed_pods}")
        for pod in failed_pods:
            restart_pod(pod)

def restart_pod(pod_name):
    print(f"Restarting pod: {pod_name}")
    os.system(f"kubectl delete pod {pod_name}")

if __name__ == "__main__":
    model = train_model()
    while True:
        detect_failure(model)
        time.sleep(10)
Step 5: Automate Self-Healing with a Kubernetes CronJob
1. Create a Kubernetes CronJob to run the script periodically:
yaml
apiVersion: batch/v1
kind: CronJob
metadata:
  name: self-healing
spec:
  schedule: "*/1 * * * *"  # Runs every minute
  jobTemplate:
    spec:
      template:
        spec:
          containers:
            - name: self-healing
              image: python:3.9
              command: ["python", "/app/self-healing.py"]
              volumeMounts:
                - name: script-volume
                  mountPath: /app
          volumes:
            - name: script-volume
              configMap:
                name: self-healing-script
          restartPolicy: OnFailure
2. Apply the CronJob
kubectl apply -f self-healing-cronjob.yaml
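The CronJob mounts the monitoring script from a ConfigMap named self-healing-script; assuming the script above is saved as self-healing.py, create it with:
kubectl create configmap self-healing-script --from-file=self-healing.py
Note that the stock python:3.9 image ships without the script's Python dependencies or kubectl, and the pod needs RBAC permission to delete pods, so in practice you would bake a small custom image and attach a suitable ServiceAccount.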
Step 6: Test the Self-Healing System
1. List the running pods:
kubectl get pods
2. Manually delete a pod to simulate failure:
kubectl delete pod <nginx-pod-name>
3. Check if the self-healing system restarts it:
kubectl get pods
How the Code Works
● The Python script fetches Prometheus metrics and predicts failures.
● If a pod is detected as failed (status == 0), the script automatically restarts
it using kubectl delete pod.
● A Kubernetes CronJob ensures the script runs periodically for continuous
monitoring.
● The AI model is trained on historical data and improves failure predictions
over time.
Conclusion
With this project, we created a self-healing Kubernetes cluster that:
✔ Monitors pod health using Prometheus
✔ Uses AI-based failure detection
✔ Auto-remediates failures using Python automation
Project 2. AI-Based Configuration Drift Detection: Build a model that monitors
infrastructure-as-code (Terraform, Ansible) for unintended changes.
Configuration drift occurs when infrastructure configurations deviate from their
intended state due to manual changes, updates, or other unexpected modifications.
This project aims to automate drift detection using an AI-based model that
identifies anomalies in Terraform and Ansible configurations.
Project Setup & Step-by-Step Execution
Step 1: Install Required Tools
Ensure you have the following installed:
● Python 3
● Terraform
● Ansible
● Git
● Jenkins (Optional, for CI/CD Automation)
Install Python & Required Libraries
sudo apt update
sudo apt install python3 python3-pip -y
pip install numpy pandas scikit-learn watchdog
Install Terraform
wget -O terraform.zip
https://releases.hashicorp.com/terraform/1.6.0/terraform_1.6.0_linux_amd64.zip
unzip terraform.zip
sudo mv terraform /usr/local/bin/
terraform --version
Install Ansible
sudo apt install ansible -y
ansible --version
Step 2: Create a Terraform Configuration
Create a Terraform script to provision an AWS EC2 instance.
File: main.tf
hcl
provider "aws" {
region = "us-east-1"
resource "aws_instance" "web" {
ami = "ami-12345678"
instance_type = "t2.micro"
tags = {
Name = "Drift-Detection-Instance"
}
Initialize & Apply Terraform
terraform init
terraform apply -auto-approve
Step 3: Create an Ansible Playbook
Ansible will configure the server.
File: playbook.yml
yaml
- name: Configure Web Server
  hosts: all
  become: yes
  tasks:
    - name: Install Nginx
      apt:
        name: nginx
        state: present
Run Ansible Playbook
ansible-playbook -i inventory.ini playbook.yml
Step 4: Implement AI-Based Drift Detection
We will use Python and Machine Learning to detect unexpected changes.
File: drift_detector.py
python
import hashlib
import pandas as pd
import numpy as np
from sklearn.ensemble import IsolationForest

# Function to calculate hash of configuration files
def get_file_hash(file_path):
    hasher = hashlib.md5()
    with open(file_path, "rb") as f:
        hasher.update(f.read())
    return hasher.hexdigest()

# List of configuration files to monitor
config_files = ["main.tf", "playbook.yml"]

# Generate initial baseline hashes
baseline = {file: get_file_hash(file) for file in config_files}

# Function to detect drift
def detect_drift():
    current_hashes = [get_file_hash(file) for file in config_files]
    baseline_hashes = list(baseline.values())

    # Convert hex digests to a numerical representation the model can work with
    data = np.array([
        [int(h, 16) % 10**8 for h in baseline_hashes],
        [int(h, 16) % 10**8 for h in current_hashes],
    ])
    df = pd.DataFrame(data.T, columns=["baseline", "current"])

    # Train Isolation Forest for anomaly detection
    model = IsolationForest(contamination=0.1)
    model.fit(df)

    # Predict anomalies (drift)
    anomalies = model.predict(df)
    for i, file in enumerate(config_files):
        if anomalies[i] == -1:
            print(f"Configuration drift detected in: {file}")

# Run drift detection
detect_drift()
Step 5: Automate Drift Detection with Jenkins
Create a Jenkins pipeline to automate drift detection.
File: Jenkinsfile
groovy
pipeline {
    agent any
    stages {
        stage('Checkout Code') {
            steps {
                git 'https://github.com/your-repo/drift-detection.git'
            }
        }
        stage('Run Drift Detector') {
            steps {
                sh 'python3 drift_detector.py'
            }
        }
    }
}
Explanation of Code
1. Terraform Configuration (main.tf)
○ Defines an AWS EC2 instance using Terraform.
○ terraform apply provisions the infrastructure.
2. Ansible Playbook (playbook.yml)
○ Installs Nginx on the EC2 instance.
○ Ensures infrastructure consistency.
3. Drift Detector (drift_detector.py)
○ Uses MD5 hashing to detect file changes.
○ Uses Machine Learning (Isolation Forest) to detect anomalies.
○ Compares current Terraform & Ansible configurations with the
baseline.
4. Jenkins Pipeline (Jenkinsfile)
○ Automates drift detection.
○ Runs Python script to check for configuration drifts.
Conclusion
This project automates drift detection using AI-based anomaly detection. The
model continuously monitors Terraform & Ansible configurations, alerting when
unintended changes occur. By integrating with Jenkins, we ensure automated
monitoring for infrastructure stability.
7. AI for Log Analysis & Monitoring
Project 1. AI-Powered Log Filtering & Categorization: Implementing AI to
automatically filter out noise in logs and categorize relevant events for quicker
analysis.
● Introduction
○ The goal of this project is to build an AI-powered system that
processes log data, filters out noise, and categorizes important events
using Python, Machine Learning (ML), and NLP (Natural
Language Processing).
○ This helps DevOps engineers, SREs (Site Reliability Engineers),
and security teams quickly analyze logs and detect issues.
○ We’ll use Python, Flask (for API), Scikit-learn, NLP libraries
(spaCy or NLTK), and a simple ML model for classification.
Step-by-Step Guide
1. Set Up the Environment
Install dependencies:
pip install flask pandas numpy scikit-learn nltk spacy
python -m spacy download en_core_web_sm
2. Prepare Log Data
Logs are usually in text files. Example:
log
[2024-02-08 12:30:00] ERROR Database connection failed
[2024-02-08 12:31:00] INFO User login successful
[2024-02-08 12:32:00] WARNING Disk space running low
We'll preprocess logs to extract key parts.
3. Preprocessing Logs (Python Code)
python
import re
import pandas as pd
import spacy

nlp = spacy.load("en_core_web_sm")

# Sample logs
logs = [
    "[2024-02-08 12:30:00] ERROR Database connection failed",
    "[2024-02-08 12:31:00] INFO User login successful",
    "[2024-02-08 12:32:00] WARNING Disk space running low"
]

# Function to clean and extract log messages
def preprocess_log(log):
    log = re.sub(r"\[.*?\]", "", log)  # Remove timestamp
    return log.strip()

# Process logs
clean_logs = [preprocess_log(log) for log in logs]

# Convert logs to structured format
df = pd.DataFrame({"log": clean_logs})
print(df.head())
Explanation:
● We remove timestamps to focus on the message.
● Store logs in a structured format using Pandas.
4. Implement AI Model for Categorization
Using TF-IDF Vectorization + Naïve Bayes Classifier:
python
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.naive_bayes import MultinomialNB
from sklearn.pipeline import make_pipeline
# Sample log data with labels
data = [
("ERROR Database connection failed", "Error"),
("INFO User login successful", "Info"),
("WARNING Disk space running low", "Warning"),
("ERROR Unable to reach API", "Error"),
("INFO Server restarted", "Info")
]
# Splitting logs and labels
texts, labels = zip(*data)
# Create text classification model
model = make_pipeline(TfidfVectorizer(), MultinomialNB())
# Train model
model.fit(texts, labels)
# Test on new log
test_log = ["CRITICAL: System overload detected"]
predicted_category = model.predict(test_log)[0]
print(f"Predicted Category: {predicted_category}")
Explanation:
● TF-IDF (Term Frequency-Inverse Document Frequency) converts logs
into numerical format.
● Naïve Bayes is used for classification.
● The model predicts the category of an unseen log message.
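To reuse the trained pipeline in the API below without retraining on every start, it can optionally be persisted with joblib (an extra step on top of the original walkthrough):
python
import joblib

# Persist the TF-IDF + Naive Bayes pipeline trained above
joblib.dump(model, "log_classifier.pkl")

# Later, e.g. at the top of app.py:
# model = joblib.load("log_classifier.pkl")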
5. Build Flask API for Real-Time Log Processing
python
from flask import Flask, request, jsonify

app = Flask(__name__)

# Assumes the TF-IDF + Naive Bayes pipeline `model` from step 4 is available
# (train it in this file, or persist it with joblib and load it here).

@app.route('/classify', methods=['POST'])
def classify_log():
    data = request.json
    log_message = data.get("log")
    if not log_message:
        return jsonify({"error": "No log provided"}), 400
    category = model.predict([log_message])[0]
    return jsonify({"log": log_message, "category": category})

if __name__ == '__main__':
    app.run(debug=True)
Run the API:
python app.py
Test API (Using cURL or Postman):
curl -X POST http://127.0.0.1:5000/classify -H "Content-Type: application/json" -d
'{"log": "CRITICAL: System overload detected"}'
6. Deploying the API using Docker
Dockerfile:
FROM python:3.9
WORKDIR /app
COPY . /app
RUN pip install -r requirements.txt
CMD ["python", "app.py"]
Build & Run Docker Container:
docker build -t log-ai .
docker run -p 5000:5000 log-ai
Summary
✔ Preprocessed logs using regex & NLP
✔ Built a text classifier using Naïve Bayes
✔ Created a Flask API for real-time log categorization
✔ Deployed using Docker
Project 2. Real-Time Anomaly Detection in Logs: AI system that processes logs
in real time and raises alerts when unusual patterns or behavior are detected.
This project builds an AI-based system that processes logs in real time, detects
anomalies, and raises alerts when unusual behavior is found.
Tech Stack
● Python (for log processing & AI model)
● Flask (to expose API for log ingestion)
● Scikit-learn (for anomaly detection)
● Elasticsearch & Kibana (for storage & visualization)
● Docker (for containerization)
Step 1: Set Up the Environment
1. Install Dependencies
Run the following command:
pip install pandas numpy scikit-learn flask elasticsearch requests
If using Docker for Elasticsearch, run:
docker pull elasticsearch:8.11.2
docker run -d --name es -p 9200:9200 -e "discovery.type=single-node"
elasticsearch:8.11.2
To check if Elasticsearch is running:
curl -X GET "localhost:9200"
Step 2: Prepare Log Data
Create a sample log file (logs.json):
json
[
{"timestamp": "2025-02-08T12:00:00", "message": "User login", "status": 200,
"response_time": 120},
{"timestamp": "2025-02-08T12:01:00", "message": "File upload", "status": 200,
"response_time": 350},
{"timestamp": "2025-02-08T12:02:00", "message": "Failed login attempt",
"status": 401, "response_time": 90}
]
Step 3: Implement Anomaly Detection
We'll use Isolation Forest, an unsupervised machine learning algorithm, to detect
anomalies in logs.
Create anomaly_detector.py
python
import pandas as pd
from sklearn.ensemble import IsolationForest

class AnomalyDetector:
    def __init__(self):
        self.model = IsolationForest(contamination=0.1, random_state=42)

    def train(self, log_data):
        df = pd.DataFrame(log_data)
        features = df[['status', 'response_time']]
        self.model.fit(features)

    def predict(self, log_entry):
        df = pd.DataFrame([log_entry])
        features = df[['status', 'response_time']]
        result = self.model.predict(features)
        return "Anomaly" if result[0] == -1 else "Normal"
Explanation
● We use Isolation Forest to detect anomalies.
● The model trains on status and response_time fields.
● When new log data is received, the model predicts if it’s an anomaly.
Step 4: Create an API to Ingest Logs
We will use Flask to expose an API that receives logs, analyzes them, and stores
them in Elasticsearch.
Create app.py
python
from flask import Flask, request, jsonify
from elasticsearch import Elasticsearch
from anomaly_detector import AnomalyDetector

app = Flask(__name__)
es = Elasticsearch("http://localhost:9200")
detector = AnomalyDetector()

@app.route('/train', methods=['POST'])
def train():
    data = request.get_json()
    detector.train(data)
    return jsonify({"message": "Model trained successfully"})

@app.route('/log', methods=['POST'])
def log_event():
    data = request.get_json()
    anomaly_result = detector.predict(data)
    # Store in Elasticsearch
    es.index(index="logs", document={"log": data, "anomaly": anomaly_result})
    return jsonify({"status": "logged", "anomaly": anomaly_result})

if __name__ == "__main__":
    app.run(debug=True)
Explanation
● /train API trains the model using past logs.
● /log API:
○ Receives new log entries.
○ Predicts if they are anomalies.
○ Stores the results in Elasticsearch.
Step 5: Train and Test the Model
Train the Model
Run:
curl -X POST "http://127.0.0.1:5000/train" -H "Content-Type: application/json" -d
@logs.json
Send a New Log for Analysis
curl -X POST "http://127.0.0.1:5000/log" -H "Content-Type: application/json" -d '{
"timestamp": "2025-02-08T12:10:00", "message": "Unusual traffic spike",
"status": 500, "response_time": 2000
}'
Expected response:
json
{"status": "logged", "anomaly": "Anomaly"}
Step 6: Visualizing in Kibana
If using Kibana:
Start Kibana
docker run -d --name kibana --link es:elasticsearch -p 5601:5601 kibana:8.11.2
Open http://localhost:5601, go to "Discover", and view logs.
Step 7: Running the Project
Run the Flask API
python app.py
Test with Log Data
● Use the /train API to train.
● Use the /log API to detect anomalies.
Conclusion
This project:
✅ Detects anomalies in real-time logs
✅ Uses Isolation Forest for AI-based detection
✅ Stores logs in Elasticsearch for analysis
✅ Exposes APIs using Flask
Project 3. Log Correlation for Performance Issues: Using AI to correlate logs
from different services to identify root causes of performance degradation or
service outages.
Modern applications generate logs across multiple services, making it difficult to
pinpoint performance issues. Log correlation using AI helps analyze logs from
various sources, detect patterns, and identify root causes of performance
degradation or service outages.
In this project, we will:
● Collect logs from multiple services using Fluentd or Filebeat.
● Store logs in Elasticsearch for indexing and searching.
● Use Python and Machine Learning (ML) (Scikit-learn) to analyze logs
and detect anomalies.
● Visualize insights with Kibana or Grafana.
Step-by-Step Implementation
Step 1: Setup Log Collection
We use Fluentd or Filebeat to collect logs from different services.
Install Fluentd (Ubuntu Example)
curl -fsSL https://toolbelt.treasuredata.com/sh/install-ubuntu-bionic-td-agent4.sh | sh
Install Filebeat (Alternative to Fluentd)
sudo apt-get install filebeat
Configure Filebeat to Send Logs to Elasticsearch
Edit /etc/filebeat/filebeat.yml:
yaml
output.elasticsearch:
  hosts: ["localhost:9200"]
  username: "elastic"
  password: "yourpassword"
Restart Filebeat:
sudo systemctl restart filebeat
Step 2: Store Logs in Elasticsearch
Install Elasticsearch (Ubuntu Example)
sudo apt-get install elasticsearch
sudo systemctl start elasticsearch
sudo systemctl enable elasticsearch
Verify installation:
curl -X GET "localhost:9200/_cat/indices?v"
Step 3: Visualize Logs in Kibana
Install Kibana
sudo apt-get install kibana
sudo systemctl start kibana
Access Kibana at: http://localhost:5601
Step 4: Implement AI-Based Log Correlation with Python
We use Python with Scikit-learn to detect performance anomalies.
Install Dependencies
pip install pandas numpy elasticsearch scikit-learn matplotlib seaborn
Python Code for Log Correlation
python
import pandas as pd
import numpy as np
from elasticsearch import Elasticsearch
from sklearn.ensemble import IsolationForest
import matplotlib.pyplot as plt
import seaborn as sns
# Connect to Elasticsearch
es = Elasticsearch(["http://localhost:9200"])
# Fetch logs from Elasticsearch
query = {
"size": 1000,
"query": {
"range": {
"@timestamp": {
"gte": "now-1d/d",
"lt": "now/d"
}
}
}
}
response = es.search(index="logs", body=query)
logs = [hit["_source"] for hit in response["hits"]["hits"]]
# Convert logs to DataFrame
df = pd.DataFrame(logs)
# Feature extraction (Example: Response time)
df['response_time'] = df['message'].str.extract(r'Response time: (\d+)').astype(float)
# Detect anomalies using Isolation Forest
model = IsolationForest(contamination=0.05)
df['anomaly'] = model.fit_predict(df[['response_time']])
# Visualize anomalies
plt.figure(figsize=(10, 5))
sns.scatterplot(data=df, x=df.index, y="response_time", hue="anomaly",
palette={1: 'blue', -1: 'red'})
plt.title("Log Correlation for Performance Issues")
plt.show()
# Print potential issues
anomalies = df[df['anomaly'] == -1]
print("Potential performance issues detected:")
print(anomalies)
Step 5: Automate and Deploy the Solution
Run Python script every 5 minutes using cron:
crontab -e
Add:
*/5 * * * * /usr/bin/python3 /home/user/log_analysis.py
Deploy with Docker (Optional)
docker build -t log-analysis .
docker run -d --name log_analysis log-analysis
Explanation of Code
1. Connect to Elasticsearch to fetch logs.
2. Extract performance metrics (e.g., response time).
3. Use Machine Learning (Isolation Forest) to detect anomalies.
4. Visualize performance issues using Matplotlib.
5. Print logs of potential issues for debugging.
This project helps DevOps teams correlate logs, detect bottlenecks, and prevent
outages.
Project 4. AI-Based Multi-Source Log Aggregation: Aggregating logs from
diverse sources (cloud, on-prem, containers, etc.) using AI to spot cross-system
anomalies.
Log aggregation is crucial for monitoring applications running in different
environments like cloud, on-premises, and containers. This project builds an
AI-powered log aggregation system that:
● Collects logs from multiple sources (AWS CloudWatch, Kubernetes logs,
local files, etc.)
● Uses Elasticsearch for storage and Kibana for visualization
● Applies AI (Machine Learning) to detect anomalies in logs
Tech Stack
● Python (Flask for API, Pandas for data processing)
● ELK Stack (Elasticsearch, Logstash, Kibana)
● Docker & Kubernetes (for deployment)
● Machine Learning (scikit-learn for anomaly detection)
Project Setup with All Commands
1. Install Dependencies
Ensure Python, Docker, and Elasticsearch are installed.
# Install Python dependencies
pip install flask pandas elasticsearch scikit-learn docker
2. Set Up Elasticsearch & Kibana
# Pull and run Elasticsearch
docker run -d --name elasticsearch -p 9200:9200 -e "discovery.type=single-node"
docker.elastic.co/elasticsearch/elasticsearch:8.0.0
# Pull and run Kibana
docker run -d --name kibana -p 5601:5601 --link elasticsearch
docker.elastic.co/kibana/kibana:8.0.0
3. Deploy Logstash
Create a logstash.conf file to read logs from various sources and push to
Elasticsearch:
input {
file {
path => "/var/log/app.log"
start_position => "beginning"
}
}
filter {
grok {
match => { "message" => "%{TIMESTAMP_ISO8601:timestamp}
%{LOGLEVEL:level} %{GREEDYDATA:msg}" }
}
}
output {
elasticsearch {
hosts => ["http://elasticsearch:9200"]
index => "logs"
}
}
Run Logstash:
docker run --rm -v $(pwd)/logstash.conf:/usr/share/logstash/pipeline/logstash.conf
--link elasticsearch logstash:8.0.0
4. Python Flask API to Aggregate Logs
Create app.py:
python
from flask import Flask, request, jsonify
from elasticsearch import Elasticsearch

app = Flask(__name__)
es = Elasticsearch(["http://localhost:9200"])

@app.route("/logs", methods=["POST"])
def ingest_logs():
    log_data = request.json
    es.index(index="logs", body=log_data)
    return jsonify({"message": "Log received"}), 200

if __name__ == "__main__":
    app.run(debug=True, port=5000)
Run API
python app.py
5. AI-Based Anomaly Detection
Create anomaly_detection.py:
python
import pandas as pd
from sklearn.ensemble import IsolationForest

def detect_anomalies(logs):
    df = pd.DataFrame(logs)
    df["length"] = df["message"].apply(len)
    model = IsolationForest(contamination=0.1)
    df["anomaly"] = model.fit_predict(df[["length"]])
    anomalies = df[df["anomaly"] == -1]
    return anomalies.to_dict(orient="records")
Use it in Flask API:
python
@app.route("/anomalies", methods=["GET"])
def get_anomalies():
logs = es.search(index="logs", size=1000)["hits"]["hits"]
log_messages = [{"message": log["_source"]["msg"]} for log in logs]
anomalies = detect_anomalies(log_messages)
return jsonify(anomalies)
6. Testing the System
# Send a sample log
curl -X POST "http://localhost:5000/logs" -H "Content-Type: application/json" -d
'{"message": "Error: Connection timeout"}'
# Get detected anomalies
curl -X GET "http://localhost:5000/anomalies"
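Logs do not have to arrive only through Logstash or manual curl calls. A minimal forwarder sketch (the file name ship_logs.py and the log path are illustrative, not part of the original setup) shows how another machine or container could stream its log file into the same /logs endpoint:
python
# ship_logs.py - forward lines from a local log file to the aggregation API
import requests

API_URL = "http://localhost:5000/logs"  # adjust to the host running the Flask API

with open("/var/log/app.log") as log_file:
    for line in log_file:
        line = line.strip()
        if line:
            # Each non-empty line becomes one document in the "logs" index
            requests.post(API_URL, json={"message": line, "source": "local-file"})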
Code Explanation
1. Flask API for Log Collection
● Flask is used to create API endpoints
● /logs endpoint receives logs and stores them in Elasticsearch
2. Elasticsearch for Log Storage
● Used to index and store log data
● Querying Elasticsearch retrieves logs for AI processing
3. Machine Learning for Anomaly Detection
● IsolationForest is trained to identify unusual log patterns
● It assigns -1 (anomaly) or 1 (normal) based on log message lengths
Project 5. Automated Log Tagging: Using AI to automatically tag logs with
metadata for faster identification and analysis.
Log files contain valuable insights, but manually analyzing them can be
time-consuming. This project leverages AI/ML to automatically tag logs with
metadata like severity, source, and category. This helps in faster identification,
filtering, and analysis in DevOps and security monitoring.
Project Workflow
1. Collect log data
2. Preprocess logs (cleaning, tokenization)
3. Train an AI model to classify logs
4. Use the trained model to tag new logs automatically
5. Store results for further analysis
Step-by-Step Implementation
Step 1: Set Up the Environment
Ensure Python and required libraries are installed.
mkdir automated-log-tagging
cd automated-log-tagging
python3 -m venv env
source env/bin/activate # On Windows: env\Scripts\activate
pip install pandas numpy scikit-learn nltk joblib
Step 2: Prepare Sample Log Data
Create a sample log file logs.txt:
nano logs.txt
Add some sample logs:
pgsql
[ERROR] 2025-02-08 12:00:01 Database connection failed.
[INFO] 2025-02-08 12:05:02 User logged in successfully.
[WARNING] 2025-02-08 12:10:03 High memory usage detected.
[ERROR] 2025-02-08 12:15:04 Unauthorized access attempt.
Step 3: Preprocess the Log Data
Create preprocess.py to clean and prepare the logs.
python
import re
import pandas as pd
import nltk
from nltk.tokenize import word_tokenize
nltk.download('punkt')
def preprocess_log(log):
"""Clean and tokenize logs"""
log = re.sub(r"[\[\]]", "", log) # Remove brackets
log = log.lower()
tokens = word_tokenize(log)
return " ".join(tokens)
def load_logs(filename):
"""Load logs from file"""
with open(filename, "r") as file:
logs = file.readlines()
return [preprocess_log(log.strip()) for log in logs]
if __name__ == "__main__":
logs = load_logs("logs.txt")
df = pd.DataFrame(logs, columns=["log"])
df.to_csv("processed_logs.csv", index=False)
print("Logs preprocessed and saved.")
Run the script
python preprocess.py
Step 4: Train a Simple AI Model
Create train_model.py to train a log classifier using scikit-learn.
python
import pandas as pd
import joblib
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.naive_bayes import MultinomialNB
from sklearn.pipeline import make_pipeline
from sklearn.model_selection import train_test_split
# Load processed logs
df = pd.read_csv("processed_logs.csv")
# Add labels manually (ERROR, INFO, WARNING)
df["label"] = ["ERROR", "INFO", "WARNING", "ERROR"]
# Split data
X_train, X_test, y_train, y_test = train_test_split(df["log"], df["label"],
test_size=0.2, random_state=42)
# Create model pipeline
model = make_pipeline(TfidfVectorizer(), MultinomialNB())
# Train model
model.fit(X_train, y_train)
# Save model
joblib.dump(model, "log_classifier.pkl")
print("Model trained and saved.")
Run the training:
python train_model.py
Step 5: Automatically Tag New Logs
Create tag_logs.py to tag logs using the trained model.
python
import joblib
import pandas as pd
# Load model
model = joblib.load("log_classifier.pkl")
def tag_log(log):
"""Predict log category"""
return model.predict([log])[0]
# Load new logs
df = pd.read_csv("processed_logs.csv")
df["predicted_label"] = df["log"].apply(tag_log)
# Save results
df.to_csv("tagged_logs.csv", index=False)
print("Logs tagged and saved.")
Run the tagging:
python tag_logs.py
Step 6: View Tagged Logs
cat tagged_logs.csv
Example Output:
pgsql
log,predicted_label
"error 2025-02-08 database connection failed.",ERROR
"info 2025-02-08 user logged in successfully.",INFO
"warning 2025-02-08 high memory usage detected.",WARNING
"error 2025-02-08 unauthorized access attempt.",ERROR
● Data Preprocessing: Cleans logs by removing unwanted characters and
tokenizing words.
● Model Training: Uses TF-IDF (Term Frequency-Inverse Document
Frequency) for feature extraction and Naïve Bayes for classification.
● Log Tagging: Predicts the category (ERROR, INFO, WARNING) for new
logs.
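To tag a single raw log line outside the CSV workflow, the saved pipeline can be reused directly. A minimal sketch (the file name tag_new_log.py is illustrative) chains preprocess_log() with the trained classifier:
python
# tag_new_log.py - classify one raw log line end to end
import joblib
from preprocess import preprocess_log  # reuse the cleaning/tokenizing step

model = joblib.load("log_classifier.pkl")

raw_log = "[ERROR] 2025-02-08 13:00:00 Disk write failure on /dev/sda1."
clean_log = preprocess_log(raw_log)
print(f"{raw_log} -> {model.predict([clean_log])[0]}")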
8. AI for Predictive Scaling & Performance
Optimization
Project 1. Predictive Load Balancing: AI model that predicts incoming traffic
and adjusts load balancing strategies accordingly to optimize resource usage and
minimize latency.
Load balancing distributes network traffic across multiple servers to ensure no
single server is overwhelmed. Traditional load balancing techniques rely on static
rules or real-time traffic metrics. However, predictive load balancing uses AI/ML
models to anticipate traffic surges and adjust strategies proactively, minimizing
latency and optimizing resource usage.
Key Technologies Used:
● Python (for AI model and API)
● Flask (to serve predictions)
● Scikit-learn / TensorFlow (for training ML models)
● Nginx / HAProxy (as load balancers)
● Docker & Kubernetes (for deployment)
● Prometheus & Grafana (for monitoring)
Step 1: Setting Up the Environment
Before starting, install the required dependencies:
# Update system and install required packages
sudo apt update && sudo apt install python3 python3-pip docker-compose -y
# Install Python dependencies
pip3 install flask numpy pandas scikit-learn tensorflow joblib requests
Step 2: Building the AI Model
The AI model predicts traffic based on historical data.
2.1: Create Training Data
Create a dataset (traffic_data.csv) with columns: time, requests_per_minute,
cpu_usage, memory_usage, response_time, and server_allocation.
python
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestRegressor
import joblib
# Load dataset
df = pd.read_csv("traffic_data.csv")
# Define features and target variable
X = df[['time', 'requests_per_minute', 'cpu_usage', 'memory_usage',
'response_time']]
y = df['server_allocation']
# Split data
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2,
random_state=42)
# Train model
model = RandomForestRegressor(n_estimators=100)
model.fit(X_train, y_train)
# Save model
joblib.dump(model, "load_balancer_model.pkl")
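The training script expects a traffic_data.csv that this guide does not provide. If no historical data is available yet, a synthetic dataset with the same columns can be generated for experimentation (the relationships below are assumptions chosen only to make the example trainable):
python
# generate_traffic_data.py - create a synthetic traffic_data.csv for experimentation
import numpy as np
import pandas as pd

rng = np.random.default_rng(42)
n = 1000
requests = rng.integers(50, 2000, n)  # requests per minute
df = pd.DataFrame({
    "time": rng.integers(0, 24, n),   # hour of day
    "requests_per_minute": requests,
    "cpu_usage": np.clip(requests / 25 + rng.normal(0, 5, n), 1, 100),
    "memory_usage": np.clip(requests / 30 + rng.normal(0, 5, n), 1, 100),
    "response_time": np.clip(50 + requests / 10 + rng.normal(0, 10, n), 10, None),
    "server_allocation": np.clip(np.ceil(requests / 400), 1, 10).astype(int),
})
df.to_csv("traffic_data.csv", index=False)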
Step 3: Creating API to Serve Predictions
We create a Flask API to serve predictions to the load balancer.
python
from flask import Flask, request, jsonify
import joblib
import numpy as np
# Load trained model
model = joblib.load("load_balancer_model.pkl")
app = Flask(__name__)
@app.route('/predict', methods=['POST'])
def predict():
data = request.get_json()
features = np.array([[data['time'], data['requests_per_minute'],
data['cpu_usage'], data['memory_usage'], data['response_time']]])
prediction = model.predict(features)[0]
return jsonify({"server_allocation": int(prediction)})
if __name__ == '__main__':
app.run(host='0.0.0.0', port=5000)
Run the API:
python3 api.py
Step 4: Configuring Nginx as a Load Balancer
Modify nginx.conf to balance application traffic across the backend pool and to expose the prediction API at /predict.
nginx
http {
upstream backend_servers {
server server1.example.com;
server server2.example.com;
server server3.example.com;
}
server {
listen 80;
location / {
proxy_pass http://backend_servers;
}
location /predict {
proxy_pass http://127.0.0.1:5000;
}
}
}
Restart Nginx:
sudo systemctl restart nginx
Step 5: Automating with Docker & Kubernetes
5.1: Create Dockerfile
dockerfile
FROM python:3.9
WORKDIR /app
COPY . /app
RUN pip install -r requirements.txt
CMD ["python3", "api.py"]
Build and Run Container:
docker build -t predictive-load-balancer .
docker run -d -p 5000:5000 predictive-load-balancer
5.2: Deploy with Kubernetes
Create deployment.yaml:
yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: predictive-load-balancer
spec:
replicas: 2
selector:
matchLabels:
app: load-balancer
template:
metadata:
labels:
app: load-balancer
spec:
containers:
- name: load-balancer
image: predictive-load-balancer
ports:
- containerPort: 5000
Apply Deployment:
kubectl apply -f deployment.yaml
Step 6: Monitoring with Prometheus & Grafana
6.1: Install Prometheus
sudo apt install prometheus -y
sudo systemctl start prometheus
6.2: Configure Prometheus for API Metrics
Modify prometheus.yml:
yaml
scrape_configs:
- job_name: 'load-balancer-api'
metrics_path: '/metrics'
static_configs:
- targets: ['localhost:5000']
Restart Prometheus:
sudo systemctl restart prometheus
6.3: Install Grafana
sudo apt install grafana -y
sudo systemctl start grafana
Log in to Grafana (http://localhost:3000), add Prometheus as a data source, and create dashboards.
Conclusion
This project demonstrates how AI-driven predictive load balancing optimizes
resource allocation by anticipating traffic surges. It integrates:
● Machine Learning for Traffic Prediction
● Flask API for Predictions
● Nginx Load Balancer
● Docker & Kubernetes for Deployment
● Prometheus & Grafana for Monitoring
Project 2. AI-Driven Predictive Resource Allocation: Using AI to dynamically
allocate resources (CPU, memory, storage) based on predicted workloads in
containers and VMs.
This project focuses on AI-Driven Predictive Resource Allocation, where AI
models analyze past workloads and predict future resource demands. Based on
predictions, the system dynamically adjusts CPU, memory, and storage allocation
for containers and VMs to optimize performance and cost efficiency.
Step-by-Step Guide
1. Prerequisites
● Ubuntu 20.04+ (or any Linux-based OS)
● Docker & Kubernetes (for containerized environments)
● Python 3.8+ (for AI model development)
● TensorFlow/PyTorch (for predictive modeling)
● Prometheus & Grafana (for monitoring)
● Kubernetes Horizontal Pod Autoscaler (HPA) & Vertical Pod Autoscaler
(VPA)
● Terraform (for infrastructure automation)
● Ansible (for automation)
● Jupyter Notebook (for model development)
2. Project Setup
Step 1: Install Required Tools
# Update packages
sudo apt update && sudo apt upgrade -y
# Install Docker
sudo apt install docker.io -y
sudo systemctl start docker
sudo systemctl enable docker
# Install Kubernetes (kind for local setup)
curl -Lo ./kind https://kind.sigs.k8s.io/dl/v0.20.0/kind-linux-amd64
chmod +x kind
sudo mv kind /usr/local/bin/
# Install kubectl
curl -LO "https://dl.k8s.io/release/$(curl -L -s
https://dl.k8s.io/release/stable.txt)/bin/linux/amd64/kubectl"
chmod +x kubectl
sudo mv kubectl /usr/local/bin/
# Install Prometheus & Grafana
kubectl apply -f
https://raw.githubusercontent.com/prometheus-operator/prometheus-operator/main/
bundle.yaml
kubectl apply -f
https://raw.githubusercontent.com/grafana/grafana/main/deploy/kubernetes/grafana
.yaml
# Install Terraform
wget
https://releases.hashicorp.com/terraform/1.5.0/terraform_1.5.0_linux_amd64.zip
unzip terraform_1.5.0_linux_amd64.zip
sudo mv terraform /usr/local/bin/
3. AI Model Development (Predicting Resource Usage)
Step 2: Install Python Libraries
pip install numpy pandas tensorflow torch matplotlib seaborn scikit-learn
Step 3: Load & Preprocess Data
python
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler
import tensorflow as tf
from tensorflow import keras
# Load dataset (Assuming CSV format with 'CPU', 'Memory', 'Storage',
'Timestamp')
data = pd.read_csv("resource_usage.csv")
# Convert timestamp to numerical values
data['Timestamp'] = pd.to_datetime(data['Timestamp'])
data['Timestamp'] = data['Timestamp'].astype('int64') // 10**9 # Convert to Unix seconds
# Normalize data
scaler = MinMaxScaler()
data_scaled = scaler.fit_transform(data)
# Split dataset: every column except the last is a feature; the last column is the
# resource value to predict (arrange the CSV so the target column comes last)
X = data_scaled[:, :-1]
y = data_scaled[:, -1]
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2,
random_state=42)
# Build AI Model
model = keras.Sequential([
keras.layers.Dense(64, activation='relu', input_shape=(X_train.shape[1],)),
keras.layers.Dense(32, activation='relu'),
keras.layers.Dense(1) # Predict next resource allocation
])
model.compile(optimizer='adam', loss='mse')
model.fit(X_train, y_train, epochs=50, batch_size=16, validation_data=(X_test,
y_test))
# Save model
model.save("resource_predictor.h5")
4. Deploy AI Model in Kubernetes
Step 4: Create a Flask API for AI Model
python
from flask import Flask, request, jsonify
import tensorflow as tf
import numpy as np
app = Flask(__name__)
# Load trained model
model = tf.keras.models.load_model("resource_predictor.h5")
@app.route('/predict', methods=['POST'])
def predict():
data = request.get_json()
input_data = np.array(data["features"]).reshape(1, -1)
prediction = model.predict(input_data)
return jsonify({"predicted_allocation": prediction.tolist()})
if __name__ == '__main__':
app.run(host='0.0.0.0', port=5000)
Step 5: Create Dockerfile for Deployment
dockerfile
FROM python:3.8-slim
WORKDIR /app
COPY requirements.txt .
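# NOTE: requirements.txt is not shown in this guide; it should list at least flask, tensorflow, and numpy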
RUN pip install -r requirements.txt
COPY app.py .
COPY resource_predictor.h5 .
CMD ["python", "app.py"]
Step 6: Build and Push Docker Image
docker build -t myrepo/resource-predictor:latest .
docker push myrepo/resource-predictor:latest
Step 7: Deploy to Kubernetes
yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: ai-resource-predictor
spec:
replicas: 1
selector:
matchLabels:
app: ai-resource-predictor
template:
metadata:
labels:
app: ai-resource-predictor
spec:
containers:
- name: ai-resource-predictor
image: myrepo/resource-predictor:latest
ports:
- containerPort: 5000
kubectl apply -f deployment.yaml
5. Implement Auto-Scaling Based on Predictions
Step 8: Enable Kubernetes HPA
kubectl autoscale deployment ai-resource-predictor --cpu-percent=50 --min=1
--max=5
Step 9: Enable Kubernetes VPA
yaml
apiVersion: autoscaling.k8s.io/v1
kind: VerticalPodAutoscaler
metadata:
name: ai-resource-predictor-vpa
spec:
targetRef:
apiVersion: "apps/v1"
kind: Deployment
name: ai-resource-predictor
updatePolicy:
updateMode: "Auto"
kubectl apply -f vpa.yaml
6. Monitor Resource Allocation
Step 10: Setup Prometheus & Grafana Dashboards
kubectl port-forward svc/prometheus 9090
kubectl port-forward svc/grafana 3000
● Open Grafana at http://localhost:3000
● Add Prometheus as a data source
● Create a dashboard with metrics:
○ container_memory_usage_bytes
○ container_cpu_usage_seconds_total
○ container_fs_usage_bytes
7. Automate Infrastructure with Terraform
Step 11: Create Terraform Script
hcl
provider "aws" {
region = "us-east-1"
}
resource "aws_instance" "k8s_node" {
ami = "ami-0abcdef1234567890"
instance_type = "t3.medium"
tags = {
Name = "KubernetesNode"
}
}
terraform init
terraform apply -auto-approve
Conclusion
This project predicts future resource usage and automatically scales
Kubernetes workloads using AI. It improves efficiency, cost optimization, and
performance for dynamic cloud environments.
Project 3. Predictive Autoscaling with Customizable Metrics: AI-based
auto-scaling system that considers custom application-specific metrics in addition
to CPU/memory load.
Autoscaling is essential in cloud environments to manage application performance
and cost efficiently. Traditional autoscaling methods rely on CPU and memory
utilization, but predictive autoscaling enhances this by using AI-based models to
forecast future resource demands.
This project implements a Predictive Autoscaling System that uses machine
learning models to scale resources based on both system (CPU/Memory) and
custom application-specific metrics, such as request rates, latency, or database
queries per second.
Project Overview
● Step 1: Setup Kubernetes cluster (K3s/Kind/Minikube)
● Step 2: Install and configure Prometheus for monitoring metrics
● Step 3: Train and deploy a Machine Learning model for prediction
● Step 4: Implement a custom Kubernetes autoscaler using Python
● Step 5: Deploy a sample application and test autoscaling
Step 1: Setup Kubernetes Cluster
Using Kind (Kubernetes in Docker)
kind create cluster --name predictive-autoscale
kubectl cluster-info --context kind-predictive-autoscale
Step 2: Install Prometheus for Metrics Collection
Deploy Prometheus using Helm
helm repo add prometheus-community
https://prometheus-community.github.io/helm-charts
helm repo update
helm install prometheus prometheus-community/kube-prometheus-stack
--namespace monitoring --create-namespace
Verify Installation
kubectl get pods -n monitoring
Step 3: Train and Deploy a Machine Learning Model
We use a simple Linear Regression Model trained with past CPU usage and
request rates to predict future resource needs.
Python Code for Training (train_model.py)
python
import numpy as np
import pandas as pd
from sklearn.linear_model import LinearRegression
import pickle
# Sample Data: CPU Usage & Requests
data = {
"cpu_usage": [20, 30, 50, 60, 80],
"request_rate": [100, 200, 400, 600, 900],
"replicas": [1, 2, 3, 4, 5] # Expected scaling
}
df = pd.DataFrame(data)
# Train Model
X = df[["cpu_usage", "request_rate"]]
y = df["replicas"]
model = LinearRegression()
model.fit(X, y)
# Save Model
with open("autoscaler_model.pkl", "wb") as f:
pickle.dump(model, f)
Deploy Model as a Microservice
Create a Flask API to serve predictions.
pip install flask scikit-learn pandas numpy
autoscaler_service.py
python
from flask import Flask, request, jsonify
import pickle
import numpy as np
app = Flask(__name__)
# Load model
with open("autoscaler_model.pkl", "rb") as f:
model = pickle.load(f)
@app.route("/predict", methods=["POST"])
def predict():
data = request.get_json()
cpu_usage = data["cpu_usage"]
request_rate = data["request_rate"]
prediction = model.predict(np.array([[cpu_usage, request_rate]]))
return jsonify({"recommended_replicas": int(round(prediction[0]))})
if __name__ == "__main__":
app.run(host="0.0.0.0", port=5000)
Run API
python autoscaler_service.py
Test the API:
curl -X POST http://localhost:5000/predict -H "Content-Type: application/json" -d
'{"cpu_usage": 60, "request_rate": 700}'
Step 4: Implement Custom Kubernetes Autoscaler
We create a Python script that fetches Prometheus metrics and scales
deployments.
autoscaler.py
python
import requests
import json
import subprocess
PROMETHEUS_URL =
"http://prometheus-server.monitoring.svc.cluster.local:9090/api/v1/query"
PREDICTOR_URL =
"http://autoscaler-service.default.svc.cluster.local:5000/predict"
DEPLOYMENT_NAME = "my-app"
NAMESPACE = "default"
def get_metrics():
cpu_query =
'sum(rate(container_cpu_usage_seconds_total{namespace="default"}[5m]))'
request_query = 'sum(rate(http_requests_total{namespace="default"}[5m]))'
cpu_response =
requests.get(f"{PROMETHEUS_URL}?query={cpu_query}").json()
request_response =
requests.get(f"{PROMETHEUS_URL}?query={request_query}").json()
cpu_usage = float(cpu_response["data"]["result"][0]["value"][1])
request_rate = float(request_response["data"]["result"][0]["value"][1])
return cpu_usage, request_rate
def scale_deployment(replicas):
cmd = f"kubectl scale deployment {DEPLOYMENT_NAME}
--replicas={replicas}"
subprocess.run(cmd, shell=True)
def main():
cpu_usage, request_rate = get_metrics()
payload = {"cpu_usage": cpu_usage, "request_rate": request_rate}
prediction_response = requests.post(PREDICTOR_URL, json=payload).json()
recommended_replicas = prediction_response["recommended_replicas"]
scale_deployment(recommended_replicas)
if __name__ == "__main__":
main()
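Shelling out to kubectl means the CronJob image must bundle the kubectl binary and a kubeconfig. An alternative for the scaling step, sketched here as an option rather than part of the original script, uses the official Kubernetes Python client with in-cluster credentials:
python
# Alternative scale_deployment() using the Kubernetes Python client
from kubernetes import client, config

def scale_deployment(replicas, name="my-app", namespace="default"):
    # Inside a CronJob pod, credentials come from the mounted ServiceAccount
    config.load_incluster_config()
    apps = client.AppsV1Api()
    apps.patch_namespaced_deployment_scale(
        name=name,
        namespace=namespace,
        body={"spec": {"replicas": int(replicas)}},
    )
The pod's ServiceAccount still needs RBAC permission to patch deployments/scale in the target namespace.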
Run Autoscaler in a Cron Job
Create a Kubernetes CronJob to run every minute.
yaml
apiVersion: batch/v1
kind: CronJob
metadata:
name: predictive-autoscaler
spec:
schedule: "* * * * *"
jobTemplate:
spec:
template:
spec:
containers:
- name: autoscaler
image: myrepo/autoscaler:latest
command: ["python", "autoscaler.py"]
restartPolicy: OnFailure
Step 5: Deploy a Sample Application
kubectl create deployment my-app --image=nginx
kubectl expose deployment my-app --type=LoadBalancer --port=80
Step 6: Test Predictive Autoscaling
Increase traffic:
kubectl run load-test --image=busybox --restart=Never -- wget -qO- http://my-app
Check replicas:
kubectl get deployment my-app
Summary
● We set up Kubernetes and installed Prometheus to collect metrics.
● We trained a predictive ML model to estimate the required replicas.
● We built a Flask API to serve predictions.
● We created a Python-based Kubernetes autoscaler that dynamically
scales deployments.
● We automated the scaling process with a Kubernetes CronJob.
Project 4. AI-Powered Resource Bottleneck Detection: AI to analyze
performance metrics and detect resource bottlenecks that may affect scaling
decisions.
Scaling applications efficiently requires understanding resource usage. This project
uses AI/ML techniques to analyze system performance metrics (CPU, memory,
network, and disk usage) and detect resource bottlenecks that may impact scaling
decisions. We will use Python, Prometheus, Grafana, and Scikit-Learn for data
collection, visualization, and AI-based anomaly detection.
Project Setup & Steps
1. Install Required Tools
Ensure your system has the following installed:
● Python (v3.8+)
● Prometheus (for monitoring)
● Grafana (for visualization)
● Docker (optional for containerization)
Install required Python packages:
pip install pandas numpy scikit-learn prometheus_api_client flask
2. Set Up Prometheus for Data Collection
Create a Prometheus configuration file prometheus.yml:
yaml
global:
scrape_interval: 5s
scrape_configs:
- job_name: 'system_metrics'
static_configs:
- targets: ['localhost:9090']
Run Prometheus using Docker:
docker run -p 9090:9090 -v
$(pwd)/prometheus.yml:/etc/prometheus/prometheus.yml prom/prometheus
3. Fetch Performance Metrics
Use Python to query Prometheus and retrieve system metrics.
Create a file fetch_metrics.py:
python
from prometheus_api_client import PrometheusConnect
import pandas as pd
import time
# Connect to Prometheus
prom = PrometheusConnect(url="http://localhost:9090", disable_ssl=True)
def fetch_metrics():
query_cpu = '100 - (avg by (instance)
(irate(node_cpu_seconds_total{mode="idle"}[5m])) * 100)'
query_memory = 'node_memory_Active_bytes /
node_memory_MemTotal_bytes * 100'
cpu_usage = prom.custom_query(query=query_cpu)
memory_usage = prom.custom_query(query=query_memory)
return cpu_usage, memory_usage
if __name__ == "__main__":
while True:
cpu, mem = fetch_metrics()
print("CPU Usage:", cpu)
print("Memory Usage:", mem)
time.sleep(10)
Run the script:
python fetch_metrics.py
4. Implement AI Model for Bottleneck Detection
Create bottleneck_detector.py:
python
import numpy as np
import pandas as pd
from sklearn.ensemble import IsolationForest
# Simulated sample data
data = {
"cpu": [20, 30, 50, 90, 95, 15, 40, 80, 85, 10],
"memory": [40, 50, 75, 85, 90, 35, 60, 80, 95, 20]
}
df = pd.DataFrame(data)
# Train Isolation Forest for anomaly detection
model = IsolationForest(contamination=0.2)
df["anomaly"] = model.fit_predict(df[["cpu", "memory"]])
# Print detected anomalies
print(df[df["anomaly"] == -1])
Run:
python bottleneck_detector.py
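The detector above runs on simulated numbers. A small loop (live_detection.py is a name used here for illustration) can feed the live Prometheus values from fetch_metrics.py into the same model:
python
# live_detection.py - check live Prometheus metrics against the trained model
import time
import pandas as pd
from fetch_metrics import fetch_metrics   # Prometheus queries from Step 3
from bottleneck_detector import model     # Isolation Forest from Step 4

def latest_value(result):
    """Return the newest sample from a Prometheus instant-query result (0.0 if empty)."""
    return float(result[0]["value"][1]) if result else 0.0

while True:
    cpu, mem = fetch_metrics()
    sample = pd.DataFrame([{"cpu": latest_value(cpu), "memory": latest_value(mem)}])
    if model.predict(sample)[0] == -1:
        print("Potential bottleneck:", sample.to_dict(orient="records")[0])
    time.sleep(10)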
5. Build a Flask API for Live Bottleneck Detection
Create app.py:
python
from flask import Flask, jsonify
from bottleneck_detector import model, df
app = Flask(__name__)
@app.route("/detect", methods=["GET"])
def detect():
anomalies = df[df["anomaly"] == -1].to_dict(orient="records")
return jsonify({"bottlenecks": anomalies})
if __name__ == "__main__":
app.run(debug=True, port=5000)
Run Flask API:
python app.py
Test with:
curl http://127.0.0.1:5000/detect
6. Visualize in Grafana
● Connect Grafana to Prometheus
● Create dashboards to monitor CPU and Memory usage
Conclusion
This project uses Prometheus for monitoring, a Flask API for serving results, and AI (Isolation Forest) to detect bottlenecks in real time. These insights support scaling decisions and help ensure efficient resource utilization.
Project 5. Multi-Tenant Cloud Optimization: Using AI to ensure efficient
resource sharing in multi-tenant cloud environments without compromising
performance.
Multi-tenant cloud environments host multiple users (tenants) on a shared
infrastructure, making efficient resource allocation crucial. AI-driven optimization
ensures fair resource distribution, cost savings, and performance stability without
compromising security.
This project will leverage Python, Kubernetes, Prometheus, Grafana, and
Machine Learning (ML) to build an AI-based resource allocation system.
Project Steps with Commands
Step 1: Set Up the Environment
Ensure you have the necessary tools installed:
● Python 3.x
● Kubernetes (kind or Minikube)
● Docker
● Helm
● Prometheus & Grafana
# Install Python dependencies
pip install numpy pandas scikit-learn flask requests kubernetes prometheus_client
# Install Kubernetes cluster (if not already)
kind create cluster --name multi-tenant
# Install Prometheus & Grafana for monitoring
helm repo add prometheus-community
https://prometheus-community.github.io/helm-charts
helm repo update
helm install prometheus prometheus-community/kube-prometheus-stack
Step 2: Create a Kubernetes Multi-Tenant Setup
Create Namespaces for Tenants
kubectl create namespace tenant-a
kubectl create namespace tenant-b
Define Resource Quotas for Each Tenant
Save this as quota.yaml:
yaml
apiVersion: v1
kind: ResourceQuota
metadata:
name: tenant-quota
namespace: tenant-a
spec:
hard:
cpu: "2"
memory: "4Gi"
pods: "10"
Apply it:
kubectl apply -f quota.yaml
Step 3: Deploy Sample Workloads
Create a simple web app (Flask) and deploy it in Kubernetes.
Flask App (app.py)
python
from flask import Flask
import os
app = Flask(__name__)
@app.route("/")
def home():
return f"Running in {os.environ.get('TENANT', 'default')} namespace"
if __name__ == "__main__":
app.run(host="0.0.0.0", port=5000)
Dockerize the App
# Dockerfile
FROM python:3.9
WORKDIR /app
COPY app.py .
RUN pip install flask
CMD ["python", "app.py"]
docker build -t multi-tenant-app .
docker tag multi-tenant-app myrepo/multi-tenant-app:latest
docker push myrepo/multi-tenant-app:latest
Deploy in Kubernetes
yaml
# deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: tenant-app
namespace: tenant-a
spec:
replicas: 2
selector:
matchLabels:
app: tenant-app
template:
metadata:
labels:
app: tenant-app
spec:
containers:
- name: tenant-app
image: myrepo/multi-tenant-app:latest
ports:
- containerPort: 5000
Apply it:
kubectl apply -f deployment.yaml
Step 4: AI-Based Optimization Model
Create an AI model to predict and optimize resource allocation.
AI Model (optimize.py)
python
import numpy as np
from sklearn.linear_model import LinearRegression
# Sample data (CPU usage vs. requests)
X = np.array([10, 20, 30, 40, 50]).reshape(-1, 1) # Requests
y = np.array([1, 2, 2.5, 3, 4]) # CPU usage in cores
model = LinearRegression()
model.fit(X, y)
def predict_cpu(requests):
return model.predict(np.array([[requests]]))[0]
# Example prediction
print(f"Predicted CPU for 60 requests: {predict_cpu(60)} cores")
Step 5: Monitor and Optimize in Real-Time
Expose Prometheus Metrics
python
# metrics.py
from prometheus_client import start_http_server, Gauge
import random
import time
cpu_usage = Gauge("cpu_usage", "Current CPU usage")
def monitor():
start_http_server(8000)
while True:
cpu_usage.set(random.uniform(1, 4)) # Simulating CPU usage
time.sleep(5)
monitor()
View Metrics in Prometheus
kubectl port-forward svc/prometheus 9090
Access: http://localhost:9090
Visualize in Grafana
kubectl port-forward svc/grafana 3000
Access: http://localhost:3000 (Default Login: admin/admin)
Conclusion
This project sets up an AI-driven multi-tenant cloud resource optimization system.
● AI predicts CPU needs
● Prometheus monitors usage
● Kubernetes enforces quotas
● Grafana visualizes performance
9. AI for Incident Prediction & Automated Remediation
Project 1. Automated Health Checks with AI: AI-powered health check system
that automatically checks infrastructure health and suggests fixes before failure.
Automated Health Checks with AI is a system that monitors infrastructure
(servers, databases, applications) using AI. It detects issues like high CPU usage,
low memory, or failing services and suggests or applies fixes automatically.
Technologies Used:
● Python (Flask for API, TensorFlow for AI model)
● Prometheus (Monitoring)
● Grafana (Visualization)
● Docker (Containerization)
● Kubernetes (Orchestration)
● Jenkins (CI/CD)
Step-by-Step Implementation
1. Install Dependencies
Ensure Python, Docker, and Kubernetes are installed.
sudo apt update && sudo apt install -y python3 python3-pip docker.io kubectl
pip3 install flask prometheus_client tensorflow numpy pandas
2. Set Up Prometheus for Monitoring
Create a prometheus.yml config file:
yaml
global:
scrape_interval: 15s
scrape_configs:
- job_name: 'health-checks'
static_configs:
- targets: ['localhost:8000']
Run Prometheus in Docker:
docker run -d --name=prometheus -p 9090:9090 -v
$(pwd)/prometheus.yml:/etc/prometheus/prometheus.yml prom/prometheus
3. Create a Flask API for Health Checks
python
from flask import Flask, jsonify
import psutil
import tensorflow as tf
import numpy as np
app = Flask(__name__)
# AI Model (Dummy Model for Prediction)
model = tf.keras.models.Sequential([
tf.keras.layers.Dense(10, activation='relu', input_shape=(3,)),
tf.keras.layers.Dense(1, activation='sigmoid')
])
model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])
@app.route('/health', methods=['GET'])
def check_health():
cpu = psutil.cpu_percent(interval=1)
memory = psutil.virtual_memory().percent
disk = psutil.disk_usage('/').percent
prediction = model.predict(np.array([[cpu, memory, disk]]))
health_status = "Critical" if prediction[0][0] > 0.5 else "Healthy"
return jsonify({'cpu': cpu, 'memory': memory, 'disk': disk, 'status': health_status})
if __name__ == '__main__':
app.run(host='0.0.0.0', port=8000)
Run the API:
python3 health_check.py
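The model in the API above is created but never trained, so its predictions are essentially random. A small training sketch (train_health_model.py, with made-up labeled samples) shows how a fitted model could be produced and then loaded by the API instead:
python
# train_health_model.py - fit the health model on labeled (cpu, memory, disk) samples
import numpy as np
import tensorflow as tf

# Toy training data: 1 = critical, 0 = healthy (replace with real monitoring history)
X = np.array([[20, 30, 40], [90, 85, 70], [35, 40, 50], [95, 92, 88]], dtype=float)
y = np.array([0, 1, 0, 1])

model = tf.keras.models.Sequential([
    tf.keras.layers.Dense(10, activation='relu', input_shape=(3,)),
    tf.keras.layers.Dense(1, activation='sigmoid')
])
model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])
model.fit(X, y, epochs=50, verbose=0)
model.save("health_model.h5")
# In the Flask API, load it with: model = tf.keras.models.load_model("health_model.h5")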
4. Set Up Grafana for Visualization
Run Grafana:
docker run -d --name=grafana -p 3000:3000 grafana/grafana
Log in to http://localhost:3000 and configure Prometheus as a data source.
5. Deploy in Kubernetes
Create a deployment file health-check-deployment.yaml:
yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: health-check
spec:
replicas: 2
selector:
matchLabels:
app: health-check
template:
metadata:
labels:
app: health-check
spec:
containers:
- name: health-check
image: your-dockerhub-username/health-check:latest
ports:
- containerPort: 8000
Apply it:
kubectl apply -f health-check-deployment.yaml
6. Automate with Jenkins
Create a Jenkinsfile:
groovy
pipeline {
agent any
stages {
stage('Build') {
steps {
sh 'docker build -t your-dockerhub-username/health-check .'
}
}
stage('Push') {
steps {
withDockerRegistry([credentialsId: 'docker-hub', url: '']) {
sh 'docker push your-dockerhub-username/health-check'
}
}
}
stage('Deploy') {
steps {
sh 'kubectl apply -f health-check-deployment.yaml'
}
}
}
}
Run Jenkins Pipeline.
● Flask API: Hosts a simple server that checks CPU, memory, and disk usage.
● AI Model: Uses TensorFlow to analyze the system's health and predict
failures.
● Prometheus: Collects real-time system metrics.
● Grafana: Visualizes data from Prometheus.
● Kubernetes: Deploys and scales the application.
● Jenkins: Automates build and deployment.
Project 2. Dynamic Incident Severity Prediction: AI model that predicts the
potential severity of an incident based on past data, helping teams prioritize
responses.
Incident management is crucial in IT operations, cybersecurity, and customer
support. A quick response to critical incidents can prevent business losses. This
project develops a Machine Learning (ML) model to predict incident severity
using historical data, helping teams prioritize responses efficiently.
Technologies Used
● Python (for data processing and model training)
● Pandas, NumPy (for data handling)
● Scikit-learn (for machine learning)
● Flask (to create an API for predictions)
● Docker (for containerization)
● Jupyter Notebook (for experimentation)
2. Steps to Build the Project
Step 1: Set Up the Environment
Install required libraries:
pip install pandas numpy scikit-learn flask joblib
Step 2: Prepare Dataset
For simplicity, we use a CSV dataset with fields like:
● incident_type (e.g., network failure, security breach)
● time_of_day (morning, afternoon, night)
● affected_users (number of users impacted)
● downtime_minutes (how long the issue lasted)
● severity (Low, Medium, High)
Example Dataset (incident_data.csv):
incident_type,time_of_day,affected_users,downtime_minutes,severity
network_issue,morning,100,30,Medium
security_breach,night,500,120,High
hardware_fail,afternoon,50,20,Low
Step 3: Load & Process Data
Create a script (data_processing.py) to preprocess data.
python
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
# Load data
df = pd.read_csv("incident_data.csv")
# Encode categorical values
encoder = LabelEncoder()
df["incident_type"] = encoder.fit_transform(df["incident_type"])
df["time_of_day"] = encoder.fit_transform(df["time_of_day"])
df["severity"] = encoder.fit_transform(df["severity"]) # Convert labels to numbers
# Split data
X = df.drop(columns=["severity"])
y = df["severity"]
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2,
random_state=42)
print("Data processed successfully!")
Step 4: Train the ML Model
Create a script (train_model.py) to train a classification model.
python
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score
import joblib
from data_processing import X_train, X_test, y_train, y_test  # reuse the split from Step 3
# Train model
clf = RandomForestClassifier(n_estimators=100, random_state=42)
clf.fit(X_train, y_train)
# Save model
joblib.dump(clf, "incident_model.pkl")
# Evaluate model
y_pred = clf.predict(X_test)
print(f"Accuracy: {accuracy_score(y_test, y_pred) * 100:.2f}%")
Step 5: Build API for Prediction
Create a Flask API (app.py) to take input and predict severity.
python
from flask import Flask, request, jsonify
import joblib
import pandas as pd
app = Flask(__name__)
# Load model
model = joblib.load("incident_model.pkl")
@app.route("/predict", methods=["POST"])
def predict():
data = request.get_json()
df = pd.DataFrame([data])
prediction = model.predict(df)
    # LabelEncoder assigns codes alphabetically: High=0, Low=1, Medium=2
    severity_map = {0: "High", 1: "Low", 2: "Medium"}
    return jsonify({"severity_prediction": severity_map[prediction[0]]})
if __name__ == "__main__":
app.run(debug=True)
Step 6: Test the API
Run the Flask app:
python app.py
Then, send a test request using Postman or cURL:
curl -X POST http://127.0.0.1:5000/predict -H "Content-Type: application/json" -d
'{"incident_type": 1, "time_of_day": 2, "affected_users": 200,
"downtime_minutes": 45}'
Expected Response:
json
{"severity_prediction": "Medium"}
Step 7: Containerize the Application
Create a Dockerfile for the API:
dockerfile
FROM python:3.9
WORKDIR /app
COPY . /app
RUN pip install -r requirements.txt
CMD ["python", "app.py"]
Build & Run the Docker container:
docker build -t incident-severity .
docker run -p 5000:5000 incident-severity
● Data Preprocessing: Converts raw data into a usable format.
● Label Encoding: Transforms categorical data (e.g., "morning") into
numbers.
● Model Training: Uses past incidents to learn patterns.
● Flask API: Exposes a web service to take new incidents as input and predict
severity.
● Docker: Ensures the project runs the same way everywhere.
Project 3. Proactive Failure Prevention System: AI-based system that uses
failure trends to predict and prevent critical infrastructure failures before they
happen.
Introduction
In critical infrastructure systems like manufacturing plants, cloud servers, or
railway tracks, failures can cause significant downtime and financial loss. A
Proactive Failure Prevention System leverages machine learning to predict
failures before they happen. The system analyzes past failure data, identifies
trends, and alerts users about potential failures so preventive actions can be taken.
Project Breakdown
1. Set up the environment (Python, dependencies, database)
2. Collect and store sensor data (Simulated dataset)
3. Train a Machine Learning model (Failure prediction using Scikit-Learn)
4. Build an API using Flask (Serve ML predictions)
5. Store predictions in MongoDB (Historical tracking)
6. Deploy using Docker (Containerize and run anywhere)
Step-by-Step Implementation
1. Set Up the Environment
Install the required dependencies:
# Update the system and install dependencies
sudo apt update && sudo apt install python3-pip -y
# Create and activate a virtual environment
python3 -m venv venv
source venv/bin/activate
# Install necessary Python libraries
pip install flask pandas scikit-learn pymongo numpy joblib
2. Prepare the Sensor Data
We'll create a simulated dataset representing sensor readings and failure records.
sensor_data.csv (Example dataset)
temperature,pressure,vibration,failure
80,100,0.5,1
60,85,0.3,0
75,95,0.4,1
50,70,0.2,0
3. Train a Machine Learning Model
We'll use a Random Forest Classifier to predict failures based on sensor data.
train_model.py
python
import pandas as pd
import numpy as np
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
import joblib
# Load dataset
df = pd.read_csv("sensor_data.csv")
# Features and target variable
X = df[['temperature', 'pressure', 'vibration']]
y = df['failure']
# Split data into training and testing
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2,
random_state=42)
# Train the model
model = RandomForestClassifier(n_estimators=100, random_state=42)
model.fit(X_train, y_train)
# Evaluate the model
y_pred = model.predict(X_test)
print(f"Model Accuracy: {accuracy_score(y_test, y_pred)}")
# Save the trained model
joblib.dump(model, "failure_model.pkl")
Run the script:
python train_model.py
4. Create a Flask API
Flask will serve predictions via an API.
app.py
python
from flask import Flask, request, jsonify
import joblib
import numpy as np
from pymongo import MongoClient
# Load trained model
model = joblib.load("failure_model.pkl")
# Connect to MongoDB
client = MongoClient("mongodb://localhost:27017/")
db = client["failure_db"]
collection = db["predictions"]
app = Flask(__name__)
@app.route('/predict', methods=['POST'])
def predict():
data = request.json
temperature = data["temperature"]
pressure = data["pressure"]
vibration = data["vibration"]
# Make prediction
features = np.array([[temperature, pressure, vibration]])
prediction = model.predict(features)[0]
# Store in MongoDB
collection.insert_one({"temperature": temperature, "pressure": pressure,
"vibration": vibration, "prediction": int(prediction)})
return jsonify({"failure": bool(prediction)})
if __name__ == '__main__':
app.run(host='0.0.0.0', port=5000, debug=True)
5. Run MongoDB
Start MongoDB to store predictions.
sudo systemctl start mongod
# No manual database creation is needed; failure_db is created automatically on the first insert
6. Test the API
Run the Flask app:
python app.py
Send a test request:
curl -X POST "http://127.0.0.1:5000/predict" -H "Content-Type: application/json"
-d '{"temperature": 75, "pressure": 90, "vibration": 0.4}'
Expected output:
json
{"failure": true}
7. Deploy Using Docker
Create a Dockerfile:
dockerfile
FROM python:3.9
WORKDIR /app
COPY . .
RUN pip install -r requirements.txt
CMD ["python", "app.py"]
Build and run the container:
docker build -t failure-predictor .
docker run -p 5000:5000 failure-predictor
● Machine Learning (ML) Model: We trained a model to predict failures
using historical data.
● Flask API: The API accepts real-time sensor data and predicts failure risks.
● MongoDB: Stores historical predictions to analyze failure trends.
● Docker: Enables the application to run in any environment.
Conclusion
This project showcases how AI-driven predictive maintenance can prevent
failures. By continuously improving the ML model and integrating real-time IoT
sensor data, this system can be scaled for smart manufacturing, cloud reliability,
and critical infrastructure monitoring.
Project 4. Predictive Incident Management in Multi-Cloud: AI to predict
incidents across different cloud environments and suggest remediation actions.
Cloud environments generate vast amounts of logs and monitoring data. This
project builds an AI-powered system that predicts incidents across AWS, Azure,
and GCP and suggests remediation actions.
Technologies Used
● Machine Learning (ML): Python, Scikit-learn, Pandas
● Cloud APIs: AWS CloudWatch, Azure Monitor, GCP Logging
● Infrastructure: Docker, Kubernetes, Terraform
● Monitoring: Prometheus, Grafana
● DevOps Tools: Jenkins, GitHub Actions
2. Project Setup
Install Required Tools
sudo apt update && sudo apt install python3-pip -y
pip install pandas numpy scikit-learn flask requests boto3 google-cloud-monitoring
azure-mgmt-monitor joblib
3. Collecting Incident Data
AWS CloudWatch Logs
python
import boto3
client = boto3.client('logs')
def get_logs(log_group, start_time, end_time):
response = client.filter_log_events(
logGroupName=log_group,
startTime=start_time,
endTime=end_time
)
return response['events']
logs = get_logs('/aws/lambda/error-logs', 1700000000, 1700003600)
print(logs)
Azure Monitor Logs
python
from azure.mgmt.monitor import MonitorManagementClient
from azure.identity import DefaultAzureCredential
credential = DefaultAzureCredential()
client = MonitorManagementClient(credential, "<Subscription_ID>")
def get_logs():
logs =
client.metrics.list("subscriptions/<Subscription_ID>/resourceGroups/<ResourceGr
oup>/providers/Microsoft.Compute/virtualMachines/<VM_Name>")
return logs
print(get_logs())
4. Machine Learning Model
Preprocessing Data
python
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
import joblib
# Load dataset
data = pd.read_csv("incident_logs.csv")
# Feature selection
X = data[['cpu_usage', 'memory_usage', 'response_time']]
y = data['incident_occurred']
# Train-Test Split
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)
# Train Model
model = RandomForestClassifier()
model.fit(X_train, y_train)
# Save model
joblib.dump(model, 'incident_predictor.pkl')
5. API for Predictions
Flask API
python
from flask import Flask, request, jsonify
import joblib
app = Flask(__name__)
model = joblib.load("incident_predictor.pkl")
@app.route('/predict', methods=['POST'])
def predict():
data = request.get_json()
prediction = model.predict([[data['cpu_usage'], data['memory_usage'],
data['response_time']]])
return jsonify({'incident_predicted': bool(prediction[0])})
if __name__ == '__main__':
app.run(host='0.0.0.0', port=5000)
Test API
curl -X POST http://localhost:5000/predict -H "Content-Type: application/json" -d
'{"cpu_usage": 85, "memory_usage": 70, "response_time": 500}'
6. Docker & Kubernetes Deployment
Dockerfile
dockerfile
FROM python:3.9
WORKDIR /app
COPY . .
RUN pip install -r requirements.txt
CMD ["python", "app.py"]
Build and Push Docker Image
docker build -t your_dockerhub/incident-predictor:latest .
docker push your_dockerhub/incident-predictor:latest
Kubernetes Deployment
yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: incident-predictor
spec:
replicas: 2
selector:
matchLabels:
app: incident-predictor
template:
metadata:
labels:
app: incident-predictor
spec:
containers:
- name: predictor
image: your_dockerhub/incident-predictor:latest
ports:
- containerPort: 5000
---
apiVersion: v1
kind: Service
metadata:
name: incident-predictor
spec:
type: LoadBalancer
ports:
- port: 80
targetPort: 5000
selector:
app: incident-predictor
Deploy to Kubernetes
kubectl apply -f deployment.yaml
kubectl get pods
kubectl get svc
7. Monitoring with Prometheus & Grafana
Prometheus Config
yaml
scrape_configs:
- job_name: 'incident-predictor'
metrics_path: /metrics
static_configs:
- targets: ['incident-predictor:5000']
Start Prometheus
docker run -d -p 9090:9090 -v ./prometheus.yml:/etc/prometheus/prometheus.yml
prom/prometheus
Start Grafana
docker run -d -p 3000:3000 grafana/grafana
8. Auto-Remediation with AWS Lambda
● If an incident is predicted, AWS Lambda triggers an action.
AWS Lambda Code
python
import boto3
def lambda_handler(event, context):
client = boto3.client('ec2')
instances = ['i-0abcd1234efgh5678']
response = client.reboot_instances(InstanceIds=instances)
return response
Trigger Lambda from API
Modify Flask API to trigger AWS Lambda if an incident is predicted:
python
import boto3
lambda_client = boto3.client('lambda')
def trigger_lambda():
response = lambda_client.invoke(FunctionName="AutoRemediationLambda")
return response
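The helper above only defines the Lambda call; it still has to be invoked from the prediction route. One way to wire it in, shown here as a sketch that replaces the earlier /predict handler rather than code from the original guide:
python
# Updated /predict route: trigger auto-remediation when an incident is predicted
@app.route('/predict', methods=['POST'])
def predict():
    data = request.get_json()
    prediction = model.predict([[data['cpu_usage'], data['memory_usage'],
                                 data['response_time']]])
    incident = bool(prediction[0])
    if incident:
        trigger_lambda()  # reboot the affected instance via AutoRemediationLambda
    return jsonify({'incident_predicted': incident})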
9. CI/CD with Jenkins
Jenkins Pipeline
groovy
pipeline {
agent any
stages {
stage('Build') {
steps {
sh 'docker build -t your_dockerhub/incident-predictor:latest .'
}
}
stage('Push') {
steps {
withDockerRegistry([credentialsId: 'docker-hub-credentials', url: '']) {
sh 'docker push your_dockerhub/incident-predictor:latest'
}
}
}
stage('Deploy') {
steps {
sh 'kubectl apply -f deployment.yaml'
}
}
}
}
10. Conclusion
● Predict incidents using AI.
● Deploy in Kubernetes for scalability.
● Monitor with Prometheus & Grafana.
● Automate remediation using AWS Lambda.
● CI/CD with Jenkins.
Project 5. AI-Powered Predictive Alerting: Using machine learning models to
identify patterns that precede incidents and proactively alert teams before failure
occurs.
In modern IT operations, system failures can lead to downtime, loss of revenue,
and customer dissatisfaction. This project focuses on AI-powered predictive
alerting, where we use machine learning models to analyze system logs and
metrics, identify patterns leading to failures, and proactively alert teams before
incidents occur.
This project is useful for DevOps engineers, SREs, and IT teams to implement
predictive monitoring instead of reactive troubleshooting.
Tech Stack
● Programming Language: Python
● Machine Learning: Scikit-learn, Pandas, NumPy
● Data Visualization: Matplotlib, Seaborn
● Alerting: Prometheus & Alertmanager
● Deployment: Docker, Kubernetes
● Data Storage: PostgreSQL or InfluxDB
● Logging & Monitoring: Grafana, Prometheus
Project Steps
Step 1: Setup Environment
Install the necessary dependencies:
pip install pandas numpy scikit-learn matplotlib seaborn prometheus-client flask
requests
Step 2: Collect & Preprocess Data
We'll use system logs or synthetic failure logs.
Example dataset structure (CSV)
Timestamp,CPU Usage (%),Memory Usage (%),Disk I/O (MB/s),Error Count,Failure
10:01:00,85,76,120,5,0
10:02:00,90,80,130,10,1
Load dataset in Python
python
import pandas as pd
df = pd.read_csv("system_logs.csv")
print(df.head())
Preprocessing
python
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
# Drop the non-numeric Timestamp column along with the target label
X = df.drop(columns=["Timestamp", "Failure"])
y = df["Failure"]
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)
X_train, X_test, y_train, y_test = train_test_split(X_scaled, y, test_size=0.2,
random_state=42)
Step 3: Train Machine Learning Model
We'll use a Random Forest classifier to predict failures.
python
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score
model = RandomForestClassifier(n_estimators=100, random_state=42)
model.fit(X_train, y_train)
y_pred = model.predict(X_test)
print(f"Accuracy: {accuracy_score(y_test, y_pred):.2f}")
Step 4: Deploy Model as an API
We'll use Flask to create an API for real-time predictions.
app.py
python
from flask import Flask, request, jsonify
import joblib
import numpy as np
app = Flask(__name__)
model = joblib.load("predictor.pkl")
scaler = joblib.load("scaler.pkl")
@app.route('/predict', methods=['POST'])
def predict():
data = request.json["features"]
scaled_data = scaler.transform([data])
prediction = model.predict(scaled_data)[0]
return jsonify({"prediction": int(prediction)})
if __name__ == "__main__":
app.run(host="0.0.0.0", port=5000)
Save the model
python
import joblib
joblib.dump(model, "predictor.pkl")
joblib.dump(scaler, "scaler.pkl")
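Once predictor.pkl and scaler.pkl exist, the running API can be exercised with a quick client script. The feature order follows the training columns after dropping Timestamp and Failure (CPU, memory, disk I/O, error count); the values below are made up:
python
# test_predict.py - send one sample to the prediction API
import requests

sample = {"features": [92, 81, 135, 12]}  # CPU %, memory %, disk I/O MB/s, error count
response = requests.post("http://localhost:5000/predict", json=sample)
print(response.json())  # e.g. {"prediction": 1} means a failure is expected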
Step 5: Alerting with Prometheus & Alertmanager
Expose metrics for monitoring
Modify app.py:
python
from prometheus_client import Counter, start_http_server
failure_alerts = Counter('system_failure_alerts', 'Number of predicted failures')
@app.route('/predict', methods=['POST'])
def predict():
data = request.json["features"]
scaled_data = scaler.transform([data])
prediction = model.predict(scaled_data)[0]
if prediction == 1:
failure_alerts.inc() # Increment alert count
return jsonify({"prediction": int(prediction)})
if __name__ == "__main__":
start_http_server(8000) # Expose metrics at port 8000
app.run(host="0.0.0.0", port=5000)
Configure Prometheus to scrape Flask app
Edit prometheus.yml:
yaml
scrape_configs:
- job_name: 'predictive_alerts'
static_configs:
- targets: ['localhost:8000']
Run Prometheus:
./prometheus --config.file=prometheus.yml
Alertmanager Rules
Create alert_rules.yml:
yaml
groups:
- name: system_alerts
rules:
- alert: SystemFailure
expr: increase(system_failure_alerts_total[5m]) > 0  # prometheus_client exposes the Counter as system_failure_alerts_total
for: 1m
labels:
severity: critical
annotations:
summary: "Potential system failure detected!"
Run Alertmanager:
./alertmanager --config.file=alertmanager.yml
Step 6: Containerize & Deploy
Dockerfile
FROM python:3.9
WORKDIR /app
COPY . /app
RUN pip install -r requirements.txt
CMD ["python", "app.py"]
Build & Run:
docker build -t predictive-alerts .
docker run -p 5000:5000 predictive-alerts
Deploy on Kubernetes:
kubectl create deployment predictive-alerts --image=predictive-alerts
kubectl expose deployment predictive-alerts --type=NodePort --port=5000
Conclusion
This project enables proactive incident management by:
● Analyzing system logs to detect failure patterns
● Predicting failures using AI models
● Alerting teams via Prometheus & Alertmanager
● Deploying the solution using Docker & Kubernetes
10. AI for CI/CD & DevSecOps
Project 1. AI-Driven Test Suite Optimization: Using AI to automatically
optimize the sequence of tests in CI/CD pipelines to reduce the overall pipeline
runtime.
In modern CI/CD pipelines, running a full test suite can be time-consuming,
delaying deployments. This project leverages AI to optimize test execution order,
prioritizing tests based on past failures, execution time, and code changes. By
running critical tests first, we can detect failures earlier and reduce the overall
pipeline runtime.
Project Setup
Tech Stack
● Python (Machine Learning & Optimization)
● Pytest (Test framework)
● GitHub Actions/Jenkins (CI/CD)
● SQLite (Storing test history)
● Docker (Containerization)
Step 1: Set Up the Project
mkdir ai-test-optimizer && cd ai-test-optimizer
python3 -m venv venv
source venv/bin/activate # On Windows: venv\Scripts\activate
pip install pytest numpy pandas scikit-learn  # sqlite3 ships with Python's standard library
This creates a virtual environment and installs necessary dependencies.
Step 2: Create a Sample Test Suite
Create a tests/ directory with sample test cases.
mkdir tests
Example: Sample Pytest Test Cases (tests/test_sample.py)
python
import time
import random
def test_fast():
"""A fast test case"""
time.sleep(1)
assert True
def test_slow():
"""A slow test case"""
time.sleep(3)
assert True
def test_unstable():
"""A test that sometimes fails"""
time.sleep(2)
assert random.choice([True, False])
● test_fast() runs quickly
● test_slow() takes more time
● test_unstable() is flaky
Step 3: Store Test History in SQLite
We store execution time and failure history in a database to optimize the order of
execution.
Create Database and Logger (test_logger.py)
python
import sqlite3
import time
DB_FILE = "test_history.db"
def setup_db():
"""Initialize the test history database"""
conn = sqlite3.connect(DB_FILE)
cursor = conn.cursor()
cursor.execute("""
CREATE TABLE IF NOT EXISTS test_history (
test_name TEXT PRIMARY KEY,
avg_runtime REAL,
failure_count INTEGER
)
""")
conn.commit()
conn.close()
def log_test_result(test_name, runtime, failed):
"""Update test execution history"""
conn = sqlite3.connect(DB_FILE)
cursor = conn.cursor()
cursor.execute("SELECT avg_runtime, failure_count FROM test_history
WHERE test_name=?", (test_name,))
row = cursor.fetchone()
if row:
avg_runtime, failure_count = row
new_runtime = (avg_runtime + runtime) / 2
new_failures = failure_count + (1 if failed else 0)
cursor.execute("UPDATE test_history SET avg_runtime=?, failure_count=?
WHERE test_name=?",
(new_runtime, new_failures, test_name))
else:
cursor.execute("INSERT INTO test_history (test_name, avg_runtime,
failure_count) VALUES (?, ?, ?)",
(test_name, runtime, 1 if failed else 0))
conn.commit()
conn.close()
setup_db()
This script:
● Creates an SQLite database to track test runtime and failures
● Logs test execution results
Step 4: AI Model to Prioritize Tests
We use scikit-learn to prioritize tests based on past failures and execution time.
Create AI Model (ai_test_optimizer.py)
python
import sqlite3
import pandas as pd
from sklearn.preprocessing import MinMaxScaler
DB_FILE = "test_history.db"
def get_prioritized_tests():
"""Fetch and sort tests based on AI-driven priority"""
conn = sqlite3.connect(DB_FILE)
df = pd.read_sql_query("SELECT * FROM test_history", conn)
conn.close()
if df.empty:
return []
# Normalize data
scaler = MinMaxScaler()
df[["avg_runtime", "failure_count"]] = scaler.fit_transform(df[["avg_runtime",
"failure_count"]])
# Prioritize: Sort by failures (descending) & runtime (ascending)
df["priority_score"] = df["failure_count"] - df["avg_runtime"]
df = df.sort_values(by="priority_score", ascending=False)
return df["test_name"].tolist()
print(get_prioritized_tests())
This script:
● Fetches test data from the database
● Normalizes runtime and failure count
● Assigns priority (run failure-prone tests first, fast tests before slow ones)
Step 5: Run Tests in Optimized Order
Modify the test runner to execute prioritized tests.
Run Optimized Test Execution (run_tests.py)
python
import pytest
import time
from ai_test_optimizer import get_prioritized_tests
from test_logger import log_test_result
def run_test(test_name):
    """Run a single test file (tests/<test_name>.py) and log the result"""
    start = time.time()
    result = pytest.main(["-q", f"tests/{test_name}.py"])
    end = time.time()
    # A non-zero pytest exit code means at least one test in the file failed
    log_test_result(test_name, end - start, result != 0)
def run_tests():
"""Run tests in AI-optimized order"""
test_order = get_prioritized_tests()
if not test_order:
test_order = ["test_sample"] # Default if no history
for test in test_order:
run_test(test)
if __name__ == "__main__":
run_tests()
This script:
● Fetches prioritized tests
● Runs them one by one
● Logs results in the database
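Timing whole test files is coarse. If per-test granularity is wanted, pytest can report each test's duration and outcome through a small conftest.py hook; this is a sketch, and if adopted, run_tests.py should pass the stored node IDs straight to pytest instead of building file paths:
python
# conftest.py - log every individual test's runtime and outcome automatically
from test_logger import log_test_result

def pytest_runtest_logreport(report):
    # "call" is the phase in which the test body itself runs
    if report.when == "call":
        # report.nodeid looks like "tests/test_sample.py::test_slow"
        log_test_result(report.nodeid, report.duration, report.failed)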
Step 6: Integrate with CI/CD (GitHub Actions or Jenkins)
GitHub Actions Workflow (.github/workflows/test_optimization.yml)
yaml
name: AI-Test-Optimization
on: [push, pull_request]
jobs:
run-tests:
runs-on: ubuntu-latest
steps:
- name: Checkout Code
uses: actions/checkout@v3
- name: Set Up Python
uses: actions/setup-python@v4
with:
python-version: "3.9"
- name: Install Dependencies
run: |
python -m venv venv
source venv/bin/activate
pip install pytest numpy pandas scikit-learn
- name: Run Optimized Tests
run: |
source venv/bin/activate
python run_tests.py
Step 7: Run Everything
Run the following commands to test locally:
python test_logger.py # Initialize database
python ai_test_optimizer.py # Check test order
python run_tests.py # Run optimized tests
Conclusion
● This AI-driven approach prioritizes failure-prone and fast tests to detect
bugs earlier and reduce pipeline runtime.
● The system continuously learns from test results, improving efficiency
over time.
● It can be integrated into any CI/CD pipeline like Jenkins, GitHub Actions,
or GitLab CI.
Project 2. AI for Continuous Security Assessment: Real-time security
vulnerability detection during the CI/CD pipeline, integrated into DevSecOps
practices.
Introduction
As security threats evolve, organizations must integrate continuous security
assessment within their CI/CD pipelines. This project implements AI-driven
real-time security vulnerability detection, ensuring DevSecOps compliance. By
integrating AI-based tools, we automate security scanning and risk analysis at
various CI/CD stages.
Project Overview
Technology Stack
● CI/CD Tools: Jenkins/GitHub Actions/GitLab CI
● AI/ML for Security: OpenAI API, ML Models (Scikit-learn, TensorFlow)
● Security Tools: OWASP Dependency-Check, Trivy, SonarQube
● Containerization: Docker, Kubernetes
● Infrastructure as Code: Terraform
● Monitoring: Prometheus, Grafana
● Database: PostgreSQL/MongoDB (for storing vulnerabilities)
● Scripting: Python, Shell
Step-by-Step Implementation
Step 1: Setup CI/CD Pipeline
1.1 Install Jenkins/GitHub Actions/GitLab CI
# Install Jenkins (Ubuntu)
sudo apt update
sudo apt install openjdk-11-jdk -y
wget -q -O - https://pkg.jenkins.io/debian-stable/jenkins.io.key | sudo apt-key add -
sudo sh -c 'echo deb http://pkg.jenkins.io/debian-stable binary/ > /etc/apt/sources.list.d/jenkins.list'
sudo apt update
sudo apt install jenkins -y
sudo systemctl start jenkins
sudo systemctl enable jenkins
For GitHub Actions or GitLab CI, configure .github/workflows/security.yml or
.gitlab-ci.yml.
Step 2: AI-based Security Scanning
2.1 Integrate OWASP Dependency-Check for Vulnerability Analysis
# Install OWASP Dependency-Check
wget https://github.com/jeremylong/DependencyCheck/releases/download/v7.0.4/dependency-check-7.0.4-release.zip
unzip dependency-check-7.0.4-release.zip
cd dependency-check/bin
./dependency-check.sh --project "AI-Security-Scan" --scan /path/to/project
2.2 Automate Security Scanning in CI/CD
yaml
# GitHub Actions - .github/workflows/security.yml
name: Security Scan
on: [push]
jobs:
  security-check:
    runs-on: ubuntu-latest
    steps:
      - name: Checkout code
        uses: actions/checkout@v3
      - name: Run OWASP Dependency-Check
        run: ./dependency-check/bin/dependency-check.sh --project "AI-Security" --scan .
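For teams on GitLab CI (the .gitlab-ci.yml mentioned in Step 1), a roughly equivalent job might look like the sketch below. The image name, package installation, and report path are assumptions; adjust them to your runner and Dependency-Check version.
yaml
# GitLab CI - .gitlab-ci.yml (illustrative sketch)
stages:
  - security

dependency-scan:
  stage: security
  image: openjdk:11          # assumption: any Debian-based JDK image works for the CLI
  script:
    - apt-get update && apt-get install -y wget unzip
    - wget -q https://github.com/jeremylong/DependencyCheck/releases/download/v7.0.4/dependency-check-7.0.4-release.zip
    - unzip -q dependency-check-7.0.4-release.zip
    - ./dependency-check/bin/dependency-check.sh --project "AI-Security" --scan .
  artifacts:
    paths:
      - dependency-check-report.html   # default HTML report; name may vary by version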
Step 3: AI Integration for Threat Analysis
3.1 Build AI Model for Security
python
# ai_security_model.py - Machine Learning Model for Security Analysis
import joblib
import pandas as pd
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score

# Load vulnerability dataset
data = pd.read_csv("vulnerability_data.csv")
X = data.drop(columns=["Risk_Level"])
y = data["Risk_Level"]

# Train ML Model
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
model = RandomForestClassifier(n_estimators=100)
model.fit(X_train, y_train)

# Evaluate model
predictions = model.predict(X_test)
print("Model Accuracy:", accuracy_score(y_test, predictions))

# Save model
joblib.dump(model, "security_model.pkl")
Step 4: AI-based Risk Prediction in Pipeline
4.1 Integrate AI Model into CI/CD
yaml
- name: AI Security Check
  run: python security_check.py
4.2 Security Assessment with AI
python
# security_check.py - Use AI model in CI/CD
import joblib
import pandas as pd

# Load trained model
model = joblib.load("security_model.pkl")

# Scan new code vulnerabilities
new_scan = pd.read_csv("new_vulnerabilities.csv")
risk_predictions = model.predict(new_scan)

# Generate security report
for idx, risk in enumerate(risk_predictions):
    print(f"Vulnerability {idx+1}: Risk Level - {risk}")
Step 5: Deploy Secure Infrastructure using Terraform
5.1 Define Secure Cloud Resources
hcl
# main.tf - Terraform configuration
provider "aws" {
  region = "us-east-1"
}

resource "aws_s3_bucket" "security_logs" {
  bucket = "ai-security-logs"
  acl    = "private"
}
5.2 Apply Terraform Configuration
terraform init
terraform apply -auto-approve
Step 6: Security Monitoring & Alerts
6.1 Setup Prometheus & Grafana
docker run -d -p 9090:9090 --name prometheus prom/prometheus
docker run -d -p 3000:3000 --name grafana grafana/grafana
6.2 Monitor Vulnerabilities in Real-time
yaml
# Prometheus Alert for High-Risk Vulnerabilities
groups:
  - name: security_alerts
    rules:
      - alert: HighSeverityVulnerability
        expr: security_risk > 8
        for: 2m
        labels:
          severity: critical
        annotations:
          summary: "High-risk security vulnerability detected!"
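The alert above fires on a security_risk metric, which Prometheus only sees if something exports it. A minimal, illustrative exporter sketch (the file name, port, and the assumption that Risk_Level is a numeric score are all placeholders):
python
# security_metrics.py - expose the AI risk score to Prometheus (illustrative sketch)
import time

import joblib
import pandas as pd
from prometheus_client import Gauge, start_http_server

# Gauge name matches the alert expression above
security_risk = Gauge("security_risk", "Highest predicted vulnerability risk score")
model = joblib.load("security_model.pkl")

if __name__ == "__main__":
    start_http_server(9100)  # assumption: Prometheus is configured to scrape this port
    while True:
        scan = pd.read_csv("new_vulnerabilities.csv")
        predictions = model.predict(scan)
        # assumption: Risk_Level is encoded as a numeric score (e.g., CVSS-like 0-10)
        security_risk.set(float(max(predictions)))
        time.sleep(60)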
Project Summary
✅ Implemented CI/CD security scanning with OWASP Dependency-Check
✅ Integrated AI model for real-time threat assessment
✅ Automated security risk classification using Machine Learning
✅ Deployed secure infrastructure with Terraform
✅ Monitored vulnerabilities using Prometheus & Grafana
Project 3. AI-Based Dependency Vulnerability Scanning: Implement AI-based
scanning of dependencies in code repositories for potential vulnerabilities or
license compliance issues.
Dependency vulnerabilities in software projects can lead to security risks and
compliance violations. Traditional scanning tools like OWASP
Dependency-Check, Snyk, or Trivy detect vulnerabilities, but AI can improve
detection accuracy and predict potential risks. This project builds an AI-powered
scanner that integrates machine learning models with existing vulnerability
databases to enhance security scanning.
Project Steps
1. Set Up Environment
○ Install Python and required libraries
○ Set up a virtual environment
2. Get Project Dependencies
○ Clone a sample code repository
○ Extract dependencies (Maven, npm, pip, etc.)
3. Collect Vulnerability Data
○ Use sources like the National Vulnerability Database (NVD)
○ Parse Common Vulnerabilities and Exposures (CVE) data
4. AI-Based Vulnerability Analysis
○ Train a simple AI model to predict risk levels
○ Use NLP to analyze package descriptions
5. Implement License Compliance Check
○ Extract license information from dependencies
○ Cross-check against approved licenses
6. Generate Reports and Alerts
○ Store results in a database
○ Send alerts for critical vulnerabilities
7. Integrate with CI/CD Pipeline
○ Automate scanning in GitHub Actions or Jenkins
Step-by-Step Implementation
1. Set Up Environment
Install Python and create a virtual environment:
sudo apt update && sudo apt install python3 python3-venv -y
python3 -m venv venv
source venv/bin/activate
pip install --upgrade pip
Install required dependencies:
pip install requests beautifulsoup4 pandas scikit-learn tensorflow nltk
2. Clone a Sample Repository & Extract Dependencies
Clone a test project (Java, Node.js, Python, etc.):
git clone https://github.com/your-test-repo.git
cd your-test-repo
Extract dependencies:
For Python (pip):
pip freeze > requirements.txt
For Node.js (npm):
npm list --json > dependencies.json
For Java (Maven):
mvn dependency:tree -DoutputType=text -DoutputFile=dependencies.txt
3. Fetch Vulnerability Data
Fetch vulnerability data from the National Vulnerability Database (NVD):
python
import requests

# NVD REST API (the 1.0 endpoint has been retired; 2.0 is the current version)
NVD_API = "https://services.nvd.nist.gov/rest/json/cves/2.0"

def get_cve_data():
    response = requests.get(NVD_API, timeout=30)
    response.raise_for_status()
    return response.json()

cve_data = get_cve_data()
print(cve_data)  # Sample CVE JSON output
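The raw response is verbose, and this step also calls for parsing the CVE records. A short sketch that pulls out IDs and English descriptions, assuming the NVD 2.0 response layout (adjust the field names if your schema differs):
python
def summarize_cves(cve_json):
    """Extract CVE IDs and English descriptions (assumes the NVD 2.0 response layout)."""
    records = []
    for item in cve_json.get("vulnerabilities", []):
        cve = item.get("cve", {})
        english = next((d["value"] for d in cve.get("descriptions", []) if d.get("lang") == "en"), "")
        records.append({"id": cve.get("id"), "description": english})
    return records

for record in summarize_cves(cve_data)[:5]:
    print(record["id"], "-", record["description"][:80])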
4. AI-Based Vulnerability Detection
Use AI to classify dependency risks:
python
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import LogisticRegression

# Sample training data
vulnerabilities = ["Critical SQL Injection vulnerability in package X",
                   "Minor dependency update issue in package Y"]
labels = [1, 0]  # 1 = High Risk, 0 = Low Risk

vectorizer = TfidfVectorizer()
X_train = vectorizer.fit_transform(vulnerabilities)
model = LogisticRegression()
model.fit(X_train, labels)

# Predict new risks
def predict_risk(description):
    X_test = vectorizer.transform([description])
    return model.predict(X_test)[0]

print(predict_risk("Security flaw found in package Z"))  # Output: 1 (High Risk) or 0 (Low Risk)
5. License Compliance Check
Extract and verify licenses:
python
import json

def check_license():
    with open("dependencies.json", "r") as f:
        data = json.load(f)
    for package, info in data["dependencies"].items():
        print(f"Package: {package}, License: {info.get('license', 'Unknown')}")

check_license()
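Step 5 also calls for cross-checking against approved licenses. A minimal sketch of that comparison (the allowlist is a placeholder policy, and the license field is only present if the dependency file includes it):
python
# license_compliance.py - flag dependencies whose license is not on the approved list (sketch)
import json

ALLOWED_LICENSES = {"MIT", "Apache-2.0", "BSD-3-Clause", "ISC"}  # assumption: example policy

def check_license_compliance(path="dependencies.json"):
    with open(path, "r") as f:
        data = json.load(f)
    violations = []
    for package, info in data.get("dependencies", {}).items():
        license_name = info.get("license", "Unknown")  # only present if the dependency file includes it
        if license_name not in ALLOWED_LICENSES:
            violations.append((package, license_name))
    return violations

if __name__ == "__main__":
    for package, license_name in check_license_compliance():
        print(f"Non-compliant or unknown license: {package} ({license_name})")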
6. Generate Reports
Save results in a CSV file:
python
import pandas as pd

results = [{"package": "numpy", "risk": "High"}, {"package": "requests", "risk": "Low"}]
df = pd.DataFrame(results)
df.to_csv("scan_results.csv", index=False)
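To cover the "send alerts for critical vulnerabilities" step, a small sketch that posts high-risk rows from the CSV to a chat webhook (the SLACK_WEBHOOK_URL variable and the message format are assumptions):
python
# alert_high_risk.py - post high-risk findings to a chat webhook (illustrative sketch)
import os

import pandas as pd
import requests

WEBHOOK_URL = os.environ.get("SLACK_WEBHOOK_URL")  # assumption: an incoming-webhook URL is configured

def alert_on_high_risk(csv_path="scan_results.csv"):
    df = pd.read_csv(csv_path)
    high_risk = df[df["risk"] == "High"]
    if high_risk.empty or not WEBHOOK_URL:
        return
    message = "High-risk dependencies detected: " + ", ".join(high_risk["package"])
    requests.post(WEBHOOK_URL, json={"text": message}, timeout=10)

if __name__ == "__main__":
    alert_on_high_risk()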
7. Integrate with CI/CD (Jenkins Example)
Add this to your Jenkinsfile:
groovy
pipeline {
    agent any
    stages {
        stage('Dependency Scan') {
            steps {
                sh 'python3 scan.py'
            }
        }
        stage('Check Results') {
            steps {
                sh 'cat scan_results.csv'
            }
        }
    }
}
Conclusion
This project builds an AI-based Dependency Vulnerability Scanner that:
● Extracts dependencies from code repositories
● Fetches vulnerability data from NVD
● Uses AI to classify risk levels
● Checks licenses for compliance
● Generates reports and integrates with CI/CD
Project 4. Automated Code Quality Review with AI: AI models that scan code
during CI/CD builds and provide insights into code quality, security, and
performance improvements.
Objective: Implement an AI-driven code quality review system in a CI/CD
pipeline to analyze code for security, performance, and best practices.
Step-by-Step Guide
Step 1: Set Up the Project
Create a directory for the project
mkdir ai-code-review
cd ai-code-review
Initialize a Git repository
git init
Set up a Python virtual environment
python3 -m venv venv
source venv/bin/activate # On Windows: venv\Scripts\activate
Install dependencies
pip install flake8 bandit "openai<1.0"  # the review script uses the pre-1.0 openai.ChatCompletion interface
Step 2: Implement AI-Powered Code Review Script
● Create a Python script code_review.py to analyze code using Flake8 (for
style), Bandit (for security), and OpenAI API (for AI-driven insights).
python
import os
import subprocess

import openai

# Read the API key from the environment (set via the CI secret below) rather than hardcoding it
openai.api_key = os.environ.get("OPENAI_API_KEY")

def run_command(command):
    """Execute a shell command and return output"""
    result = subprocess.run(command, shell=True, capture_output=True, text=True)
    return result.stdout.strip()

def analyze_code():
    """Run static analysis tools"""
    flake8_result = run_command("flake8 . --exclude=venv")
    bandit_result = run_command("bandit -r .")
    return f"Flake8 Report:\n{flake8_result}\n\nBandit Security Report:\n{bandit_result}"

def ai_code_review(code_analysis):
    """Send analysis to OpenAI for insights"""
    response = openai.ChatCompletion.create(
        model="gpt-4",
        messages=[
            {"role": "system", "content": "You are an expert code reviewer."},
            {"role": "user", "content": f"Analyze this report and provide suggestions:\n{code_analysis}"},
        ],
    )
    return response["choices"][0]["message"]["content"]

if __name__ == "__main__":
    report = analyze_code()
    ai_suggestions = ai_code_review(report)
    print("=== AI Code Review Suggestions ===")
    print(ai_suggestions)
Step 3: Set Up a CI/CD Pipeline in GitHub Actions
● Create .github/workflows/code_review.yml
yaml
name: AI Code Review
on: [push, pull_request]
jobs:
  review:
    runs-on: ubuntu-latest
    steps:
      - name: Checkout Code
        uses: actions/checkout@v4
      - name: Set Up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.10'
      - name: Install Dependencies
        run: |
          python -m venv venv
          source venv/bin/activate
          pip install flake8 bandit "openai<1.0"
      - name: Run AI Code Review
        run: |
          source venv/bin/activate
          python code_review.py
        env:
          OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }}
Step 4: Commit and Push Code
git add .
git commit -m "Add AI Code Review"
git push origin main
Step 5: Review AI Code Analysis in GitHub Actions
Once the GitHub Action runs, it will analyze your code, check for issues, and
provide AI-generated suggestions.
1. analyze_code()
○ Runs flake8 for style checks.
○ Runs bandit for security scans.
○ Collects reports for AI processing.
2. ai_code_review()
○ Sends the analysis to OpenAI’s GPT-4 model for review.
○ Receives feedback on improvements.
3. CI/CD Pipeline
○ Runs automatically on every push/pull request.
○ Installs dependencies and executes the review script.
Project 5. AI-Enhanced Test Failure Analysis: Using AI to automatically
analyze failed tests in CI/CD pipelines and suggest possible causes and fixes.
Introduction
In CI/CD pipelines, test failures can slow down development. This project
automates test failure analysis using AI. It collects failure logs from Jenkins,
processes them using NLP (Natural Language Processing), and uses OpenAI
GPT to suggest possible causes and fixes.
Step 1: Setting Up the Environment
Prerequisites
● Jenkins installed and running
● Python (>=3.8) installed
● Docker installed
● OpenAI API key
Required Python Libraries
pip install requests flask "openai<1.0"  # the analysis service uses the pre-1.0 openai.ChatCompletion interface
Step 2: Jenkins Job Setup
Jenkinsfile Configuration
This pipeline will run tests and send failure logs to our AI-powered analysis tool.
groovy
pipeline {
    agent any
    stages {
        stage('Checkout') {
            steps {
                git 'https://github.com/your-repo/your-project.git'
            }
        }
        stage('Run Tests') {
            steps {
                script {
                    // returnStatus already returns pytest's exit code; no extra "echo $?" needed
                    def testResult = sh(script: 'pytest --tb=short > test_output.log', returnStatus: true)
                    archiveArtifacts artifacts: 'test_output.log', fingerprint: true
                    if (testResult != 0) {
                        sh 'curl -X POST -F "file=@test_output.log" http://localhost:5000/analyze'
                        error("Tests failed. Check AI analysis.")
                    }
                }
            }
        }
    }
}
● Runs tests with pytest
● Captures failures in test_output.log
● Sends the log to the AI-powered Flask service
Step 3: Creating the AI Service with Flask
Flask API (ai_analysis.py)
python
from flask import Flask, request, jsonify
import openai
import os

app = Flask(__name__)

# Set your OpenAI API key
openai.api_key = os.getenv("OPENAI_API_KEY")

@app.route('/analyze', methods=['POST'])
def analyze():
    if 'file' not in request.files:
        return jsonify({'error': 'No file uploaded'}), 400
    file = request.files['file']
    log_data = file.read().decode('utf-8')
    prompt = f"Analyze the following test failure logs and suggest possible causes and fixes:\n\n{log_data}"
    response = openai.ChatCompletion.create(
        model="gpt-4",
        messages=[{"role": "user", "content": prompt}]
    )
    ai_suggestion = response['choices'][0]['message']['content']
    return jsonify({'suggestion': ai_suggestion})

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000)
● Reads the test logs
● Sends them to GPT-4
● Returns possible causes & fixes
Step 4: Running the AI Service in Docker
Dockerfile
dockerfile
FROM python:3.8
WORKDIR /app
COPY ai_analysis.py .
RUN pip install flask "openai<1.0"  # ai_analysis.py uses the pre-1.0 openai.ChatCompletion interface
CMD ["python", "ai_analysis.py"]
Build and Run the Container
docker build -t ai-test-analyzer .
docker run -d -p 5000:5000 --env OPENAI_API_KEY=your_api_key ai-test-analyzer
Step 5: Running the Complete Setup
Start Jenkins Pipeline
1. Push your code to GitHub
2. Trigger the Jenkins job
3. Jenkins runs tests, collects failures
4. Failed logs sent to AI service
5. AI suggests fixes in Jenkins logs
Example Output
Test Failure Log (test_output.log)
AssertionError: Expected 200 but got 500
AI Suggestion
Possible Cause: The API endpoint might be returning a 500 due to an unhandled exception.
Fix: Check application logs for errors. Validate input parameters. Ensure database connection is active.
Summary
● Automates test failure analysis using AI
● Saves developers time debugging failures
● Easily integrates into CI/CD pipelines
11. AI for Infrastructure & Network Monitoring
Project 1. AI-Powered Load Forecasting for Infrastructure: Predicting
infrastructure load for upcoming days or weeks using historical data and adjusting
resource allocation accordingly.
This project predicts infrastructure load (such as CPU, memory, or network usage)
for upcoming days or weeks using historical data. The goal is to optimize resource
allocation by analyzing past trends and forecasting future demands with machine
learning.
Step 1: Setting Up the Environment
Before starting, ensure you have Python and essential libraries installed.
Install Required Packages
pip install pandas numpy scikit-learn matplotlib seaborn tensorflow
Step 2: Data Collection & Preprocessing
We assume the dataset contains historical infrastructure usage data, including
timestamps, CPU load, memory usage, and network activity.
Load the Dataset
python
import pandas as pd
# Load dataset
df = pd.read_csv("infrastructure_usage.csv", parse_dates=["timestamp"])
# Display first few rows
print(df.head())
Handle Missing Data
python
df = df.ffill()  # Forward-fill missing values (fillna(method="ffill") is deprecated in newer pandas)
Feature Engineering
python
df["hour"] = df["timestamp"].dt.hour
df["day_of_week"] = df["timestamp"].dt.dayofweek
df["month"] = df["timestamp"].dt.month
Step 3: Data Visualization
Plot CPU Usage Over Time
python
import matplotlib.pyplot as plt
plt.figure(figsize=(10,5))
plt.plot(df["timestamp"], df["cpu_load"], label="CPU Load")
plt.xlabel("Time")
plt.ylabel("CPU Load")
plt.title("CPU Load Over Time")
plt.legend()
plt.show()
Step 4: Train-Test Split
python
from sklearn.model_selection import train_test_split
X = df[["hour", "day_of_week", "month", "cpu_load"]].values
y = df["cpu_load"].shift(-1).fillna(0).values # Predicting next time step
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, shuffle=False)
Step 5: Building a Machine Learning Model
We will use LSTM (Long Short-Term Memory), a type of neural network
effective for time-series forecasting.
Prepare Data for LSTM
python
import numpy as np
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense
# Reshape data for LSTM
X_train = np.reshape(X_train, (X_train.shape[0], 1, X_train.shape[1]))
X_test = np.reshape(X_test, (X_test.shape[0], 1, X_test.shape[1]))
Define LSTM Model
python
model = Sequential([
    LSTM(50, return_sequences=True, input_shape=(1, X_train.shape[2])),
    LSTM(50, return_sequences=False),
    Dense(25),
    Dense(1)
])
model.compile(optimizer="adam", loss="mean_squared_error")
model.fit(X_train, y_train, epochs=10, batch_size=32)

# Save the trained model so the Flask API in Step 7 can load it
model.save("load_forecasting_model.h5")
Step 6: Model Evaluation & Prediction
python
predictions = model.predict(X_test)
plt.figure(figsize=(10,5))
plt.plot(y_test, label="Actual Load")
plt.plot(predictions, label="Predicted Load", linestyle="dashed")
plt.xlabel("Time")
plt.ylabel("CPU Load")
plt.title("Infrastructure Load Forecasting")
plt.legend()
plt.show()
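To complement the plot with numbers, a quick sketch using scikit-learn's regression metrics on the same predictions:
python
from sklearn.metrics import mean_absolute_error, mean_squared_error

mae = mean_absolute_error(y_test, predictions)
rmse = np.sqrt(mean_squared_error(y_test, predictions))
print(f"MAE: {mae:.3f}, RMSE: {rmse:.3f}")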
Step 7: Deployment (Optional - Using Flask)
To deploy the model as an API, create a Flask app.
Install Flask
pip install flask
Create app.py
python
from flask import Flask, request, jsonify
import numpy as np
import tensorflow as tf

app = Flask(__name__)
model = tf.keras.models.load_model("load_forecasting_model.h5")

@app.route("/predict", methods=["POST"])
def predict():
    data = request.json
    input_data = np.array(data["features"]).reshape(1, 1, -1)
    prediction = model.predict(input_data)
    return jsonify({"prediction": float(prediction[0][0])})

if __name__ == "__main__":
    app.run(debug=True)
Run the API
python app.py
Test API with Curl
curl -X POST http://127.0.0.1:5000/predict -H "Content-Type: application/json" -d '{"features": [10, 3, 7, 50]}'
Conclusion
This project used LSTM to forecast infrastructure load and built an API for
real-world integration. It helps DevOps teams optimize resource allocation and
prevent over-provisioning or downtime.
Project 2. Proactive Infrastructure Health Monitoring: AI model for identifying
potential infrastructure failures before they occur by monitoring system health in
real time.
Infrastructure failures in IT systems can lead to downtime, security risks, and
financial losses. A Proactive Infrastructure Health Monitoring System
leverages AI and real-time monitoring to detect potential failures before they
occur. It analyzes system health metrics, predicts issues, and alerts administrators
to take preventive action.
In this project, we will build an AI-driven monitoring system using Python,
Flask, Prometheus, Grafana, and Machine Learning (Scikit-learn/PyTorch).
This system collects system health metrics (CPU, memory, disk usage), trains an
AI model to predict failures, and visualizes real-time data.
Project Setup & Steps
Step 1: Install Dependencies
Before starting, ensure you have Python and necessary tools installed.
sudo apt update && sudo apt upgrade -y
sudo apt install python3 python3-pip -y
pip install flask prometheus_client psutil pandas scikit-learn matplotlib
Step 2: Build the System Metrics Collector
Create a Python script to collect CPU, memory, and disk usage metrics.
Create metrics_collector.py
python
from flask import Flask, Response
import psutil
from prometheus_client import Gauge, generate_latest

app = Flask(__name__)

# Define Prometheus metrics
cpu_usage = Gauge("cpu_usage", "CPU Usage Percentage")
memory_usage = Gauge("memory_usage", "Memory Usage Percentage")
disk_usage = Gauge("disk_usage", "Disk Usage Percentage")

@app.route("/metrics")
def metrics():
    cpu_usage.set(psutil.cpu_percent(interval=1))
    memory_usage.set(psutil.virtual_memory().percent)
    disk_usage.set(psutil.disk_usage("/").percent)
    return Response(generate_latest(), mimetype="text/plain")

if __name__ == "__main__":
    app.run(host="0.0.0.0", port=5000)
Run the Metrics Collector
python3 metrics_collector.py
Your system's health metrics will be available at http://localhost:5000/metrics.
Step 3: Train an AI Model to Predict Failures
We will use a simple machine learning model to predict system failures based on
collected data.
Create train_model.py
python
import pandas as pd
import joblib
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score

# Generate synthetic data
data = {
    "cpu_usage": [10, 20, 50, 90, 95, 80, 60, 40],
    "memory_usage": [30, 40, 50, 85, 90, 70, 60, 50],
    "disk_usage": [40, 50, 60, 80, 85, 70, 65, 55],
    "failure": [0, 0, 0, 1, 1, 1, 0, 0]  # 1 = Failure, 0 = Normal
}
df = pd.DataFrame(data)

# Split dataset
X = df[["cpu_usage", "memory_usage", "disk_usage"]]
y = df["failure"]
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Train model
model = RandomForestClassifier(n_estimators=100)
model.fit(X_train, y_train)

# Save the model
joblib.dump(model, "failure_prediction_model.pkl")

# Evaluate
y_pred = model.predict(X_test)
print(f"Model Accuracy: {accuracy_score(y_test, y_pred) * 100:.2f}%")
Run the Model Training
python3 train_model.py
The trained model will be saved as failure_prediction_model.pkl.
Step 4: Deploy an API for AI Predictions
We will create a Flask API that takes real-time metrics and predicts potential
failures.
Create predict_failure.py
python
from flask import Flask, request, jsonify
import joblib
import psutil

app = Flask(__name__)

# Load trained model
model = joblib.load("failure_prediction_model.pkl")

@app.route("/predict", methods=["GET"])
def predict():
    # Get real-time system metrics
    data = {
        "cpu_usage": psutil.cpu_percent(interval=1),
        "memory_usage": psutil.virtual_memory().percent,
        "disk_usage": psutil.disk_usage("/").percent,
    }
    # Make prediction
    prediction = model.predict([[data["cpu_usage"], data["memory_usage"], data["disk_usage"]]])
    result = "Failure predicted! Take action!" if prediction[0] == 1 else "System is healthy."
    return jsonify({"metrics": data, "prediction": result})

if __name__ == "__main__":
    app.run(host="0.0.0.0", port=5001)
Run the AI Prediction API
python3 predict_failure.py
Now, visit http://localhost:5001/predict to see real-time predictions.
Step 5: Setup Prometheus for Monitoring
Prometheus will scrape our metrics and store them for analysis.
Install Prometheus
wget https://github.com/prometheus/prometheus/releases/latest/download/prometheus-linux-amd64.tar.gz
tar -xvf prometheus-linux-amd64.tar.gz
cd prometheus-linux-amd64
Edit prometheus.yml
Add the following under scrape_configs:
yaml
scrape_configs:
  - job_name: 'system_metrics'
    static_configs:
      - targets: ['localhost:5000']
Run Prometheus
./prometheus --config.file=prometheus.yml
Prometheus UI will be available at http://localhost:9090.
Step 6: Setup Grafana for Visualization
Grafana will display real-time system health data.
Install Grafana
sudo apt install -y software-properties-common
sudo add-apt-repository "deb https://packages.grafana.com/oss/deb stable main"
sudo apt update
sudo apt install grafana -y
Start Grafana
sudo systemctl start grafana-server
sudo systemctl enable grafana-server
Access Grafana UI
Visit http://localhost:3000 (default username/password: admin/admin).
Add Prometheus as a Data Source
● Go to Settings > Data Sources > Add Prometheus
● URL: http://localhost:9090
Create Dashboards
● Import a dashboard and select cpu_usage, memory_usage, and disk_usage
as metrics.
Final Architecture
1. Metrics Collector (Flask) → Sends system health data to Prometheus
2. AI Model (Scikit-learn) → Predicts failures
3. Prediction API (Flask) → Provides real-time failure warnings
4. Prometheus → Stores and queries metrics
5. Grafana → Visualizes data for monitoring
Step 7: Automate with Docker (Optional)
Create Dockerfile
dockerfile
FROM python:3.9
WORKDIR /app
COPY . .
RUN pip install -r requirements.txt
CMD ["python3", "metrics_collector.py"]
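The Dockerfile installs from requirements.txt, which this project has not created yet; a minimal file mirroring the packages from Step 1 (joblib is pulled in by scikit-learn but is listed explicitly because the prediction API imports it) could be:
Create requirements.txt
flask
prometheus_client
psutil
pandas
scikit-learn
joblib
matplotlib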
Build & Run the Container
docker build -t infra-monitor .
docker run -d -p 5000:5000 infra-monitor
Conclusion
This Proactive Infrastructure Health Monitoring System allows organizations
to predict and prevent system failures using AI-driven monitoring. By integrating
Flask, Prometheus, Grafana, and ML models, we gain real-time insights into
system health, reducing downtime risks.
Project 3. Network Traffic Anomaly Detection with AI: Using machine learning
to detect outliers in network traffic data (e.g., unusual spikes or drops), potentially
identifying attacks.
Network security is a crucial aspect of modern digital infrastructure. Detecting
anomalies in network traffic can help identify potential security threats, such as
DDoS attacks, data exfiltration, or unauthorized access.
In this project, we will use Machine Learning (ML) to detect unusual traffic
patterns using unsupervised learning techniques like Isolation Forest and
One-Class SVM.
Project Setup
1. Install Required Libraries
Before starting, install the necessary Python libraries:
pip install pandas numpy scikit-learn matplotlib seaborn
Step-by-Step Implementation
Step 1: Import Libraries
python
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.ensemble import IsolationForest
from sklearn.svm import OneClassSVM
from sklearn.preprocessing import StandardScaler
Step 2: Load and Explore the Dataset
For this project, we will use a synthetic dataset. However, you can also use real
datasets like CICIDS2017 or KDDCup99.
python
# Create synthetic network traffic data
np.random.seed(42)
normal_traffic = np.random.normal(loc=50, scale=10, size=(1000, 2))
anomalous_traffic = np.random.normal(loc=100, scale=20, size=(50, 2))  # Simulating attacks
# Combine normal and anomalous traffic
data = np.vstack((normal_traffic, anomalous_traffic))
labels = np.array([0] * 1000 + [1] * 50) # 0 = normal, 1 = anomaly
# Convert to DataFrame
df = pd.DataFrame(data, columns=['Packets_Per_Second', 'Bytes_Per_Second'])
df['Anomaly'] = labels
# Display first few rows
print(df.head())
# Plot data distribution
sns.scatterplot(x=df['Packets_Per_Second'], y=df['Bytes_Per_Second'],
hue=df['Anomaly'])
plt.title('Network Traffic Data')
plt.show()
📌 Explanation:
● We generate normal traffic using a normal distribution.
● We introduce anomalies to simulate unusual traffic patterns.
● The dataset contains two features: Packets per Second and Bytes per
Second.
Step 3: Preprocess the Data
python
scaler = StandardScaler()
df[['Packets_Per_Second', 'Bytes_Per_Second']] = scaler.fit_transform(df[['Packets_Per_Second', 'Bytes_Per_Second']])
📌 Why Standardization?
● Since ML models work better with normalized data, we use StandardScaler
to bring all values into a common range.
Step 4: Train the Isolation Forest Model
python
iso_forest = IsolationForest(contamination=0.05, random_state=42)
df['Anomaly_Score'] = iso_forest.fit_predict(df[['Packets_Per_Second', 'Bytes_Per_Second']])

# Isolation Forest returns -1 for anomalies; convert that to a 0/1 flag
df['Anomaly_Detected'] = (df['Anomaly_Score'] == -1).astype(int)

# Display detected anomalies
print(df[df['Anomaly_Detected'] == 1].head())
📌 Explanation:
● Isolation Forest isolates anomalies by recursively partitioning data.
● contamination=0.05 assumes 5% of data is anomalous.
● The model predicts -1 for anomalies and 1 for normal data.
Step 5: Train One-Class SVM Model (Alternative Approach)
python
oc_svm = OneClassSVM(nu=0.05, kernel="rbf", gamma='scale')
df['SVM_Anomaly_Score'] = oc_svm.fit_predict(df[['Packets_Per_Second', 'Bytes_Per_Second']])
df['SVM_Anomaly_Detected'] = (df['SVM_Anomaly_Score'] == -1).astype(int)
# Display detected anomalies
print(df[df['SVM_Anomaly_Detected'] == 1].head())
📌 Explanation:
● One-Class SVM is another unsupervised anomaly detection method.
● It learns the normal behavior and flags deviations.
Step 6: Visualize the Anomalies
python
plt.figure(figsize=(10, 6))
sns.scatterplot(data=df, x='Packets_Per_Second', y='Bytes_Per_Second',
hue='Anomaly_Detected', palette={0: 'blue', 1: 'red'})
plt.title('Anomaly Detection using Isolation Forest')
plt.show()
📌 Visualization:
● Normal traffic points are shown in blue.
● Detected anomalies are marked in red.
Step 7: Evaluate the Model
python
from sklearn.metrics import classification_report
print("Isolation Forest Report:")
print(classification_report(df['Anomaly'], df['Anomaly_Detected']))
print("One-Class SVM Report:")
print(classification_report(df['Anomaly'], df['SVM_Anomaly_Detected']))
📌 Evaluation Metrics:
● Precision: How many detected anomalies are actual anomalies?
● Recall: How many actual anomalies were detected?
● F1-Score: Balances precision and recall.
Conclusion
🔹 This project demonstrated how Machine Learning can detect network anomalies.
🔹 Isolation Forest and One-Class SVM help find outliers in network traffic data.
🔹 The model can be extended using real-time data from Wireshark, NetFlow, or cloud monitoring logs.
🔹 Future improvements include deep learning models like Autoencoders for better accuracy.
Project 4. Distributed Network Monitoring with AI: AI to monitor network
performance across distributed environments (hybrid clouds, multi-region setups)
and provide insights.
In modern IT infrastructure, network monitoring is crucial, especially in hybrid
cloud and multi-region setups. Traditional monitoring tools often struggle with
scalability and real-time insights. This project leverages AI-powered monitoring
to:
● Track network performance across distributed environments
● Detect anomalies in network traffic
● Provide predictive insights using Machine Learning (ML)
We'll use:
● Python for backend development
● Prometheus & Grafana for monitoring & visualization
● Scapy & TShark for packet analysis
● TensorFlow/PyTorch for AI-based anomaly detection
● Docker & Kubernetes for deployment
Step-by-Step Implementation
Step 1: Install Dependencies
Ensure you have Python, Prometheus, and Grafana installed.
# Update system
sudo apt update && sudo apt upgrade -y
# Install Python & Virtual Environment
sudo apt install python3 python3-pip python3-venv -y
# Create a virtual environment
python3 -m venv venv
source venv/bin/activate
# Install required Python libraries
pip install scapy tensorflow pandas numpy matplotlib prometheus_client flask requests
Step 2: Set Up Prometheus for Network Metrics Collection
Download & install Prometheus
wget https://github.com/prometheus/prometheus/releases/latest/download/prometheus-linux-amd64.tar.gz
tar -xvf prometheus-linux-amd64.tar.gz
cd prometheus-linux-amd64
Configure Prometheus (prometheus.yml)
yaml
global:
  scrape_interval: 10s

scrape_configs:
  - job_name: "network-monitor"
    static_configs:
      - targets: ["localhost:8000"]  # Flask API exposing network metrics
Start Prometheus
sh
./prometheus --config.file=prometheus.yml
Step 3: Build the Network Monitoring Script (Python API)
Create network_monitor.py
python
from flask import Flask, jsonify
from prometheus_client import start_http_server, Gauge
import scapy.all as scapy

app = Flask(__name__)

# Prometheus metrics
packet_count = Gauge('network_packet_count', 'Number of packets captured')
packet_size = Gauge('network_packet_size', 'Total size of packets captured')

def capture_traffic():
    # Note: packet sniffing with Scapy requires root/administrator privileges
    packets = scapy.sniff(count=10)
    total_size = sum(len(p) for p in packets)
    packet_count.set(len(packets))
    packet_size.set(total_size)

@app.route('/metrics')
def metrics():
    capture_traffic()
    return jsonify({'packet_count': packet_count._value.get(), 'packet_size': packet_size._value.get()})

if __name__ == '__main__':
    start_http_server(8000)  # Prometheus scrapes this port (see prometheus.yml above)
    app.run(host='0.0.0.0', port=5000)
Step 4: Implement AI for Anomaly Detection
Create anomaly_detection.py
python
import random

import pandas as pd
import numpy as np
import tensorflow as tf
from sklearn.preprocessing import MinMaxScaler

# Simulated network data
data = np.array([[random.randint(100, 5000), random.randint(10, 200)] for _ in range(100)])
df = pd.DataFrame(data, columns=["packet_size", "latency"])
# Normalize data
scaler = MinMaxScaler()
df_scaled = scaler.fit_transform(df)
# Create simple autoencoder for anomaly detection
model = tf.keras.models.Sequential([
tf.keras.layers.Dense(8, activation='relu', input_shape=(2,)),
tf.keras.layers.Dense(4, activation='relu'),
tf.keras.layers.Dense(8, activation='relu'),
tf.keras.layers.Dense(2, activation='sigmoid')
])
model.compile(optimizer='adam', loss='mse')
model.fit(df_scaled, df_scaled, epochs=10, batch_size=8)
# Predict on new data
new_data = np.array([[4500, 180]]) # Example high packet size & latency
new_data_scaled = scaler.transform(new_data)
reconstruction = model.predict(new_data_scaled)
# Compute anomaly score
anomaly_score = np.mean(np.abs(new_data_scaled - reconstruction))
print("Anomaly Score:", anomaly_score)
Step 5: Deploy on Docker & Kubernetes
Dockerfile
FROM python:3.9
WORKDIR /app
COPY . .
RUN pip install -r requirements.txt
CMD ["python", "network_monitor.py"]
Build & Run Docker Container
docker build -t network-monitor .
docker run -p 5000:5000 network-monitor
Deploy to Kubernetes (network-monitor.yaml)
yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: network-monitor
spec:
  replicas: 2
  selector:
    matchLabels:
      app: network-monitor
  template:
    metadata:
      labels:
        app: network-monitor
    spec:
      containers:
        - name: network-monitor
          image: network-monitor:latest
          ports:
            - containerPort: 5000
---
apiVersion: v1
kind: Service
metadata:
  name: network-monitor-service
spec:
  selector:
    app: network-monitor
  ports:
    - protocol: TCP
      port: 80
      targetPort: 5000
  type: LoadBalancer
Deploy on Kubernetes:
kubectl apply -f network-monitor.yaml
Step 6: Visualize Metrics in Grafana
Install Grafana
sudo apt install -y grafana
sudo systemctl start grafana-server
sudo systemctl enable grafana-server
Configure Data Source:
○ Go to http://localhost:3000
○ Log in (admin/admin)
○ Add Prometheus as a data source
○ Query the network_packet_count and network_packet_size metrics
● Flask API (network_monitor.py):
○ Captures network packets and exposes Prometheus metrics
○ Used to integrate with Grafana
● AI Model (anomaly_detection.py):
○ Uses TensorFlow Autoencoder for detecting unusual network
activity
● Docker & Kubernetes:
○ Docker: Packages the app into a container
○ Kubernetes: Deploys across distributed cloud environments
● Grafana:
○ Visualizes network metrics
Conclusion
This project provides real-time network monitoring with AI-powered anomaly
detection. It integrates with Prometheus & Grafana for visualization and can be
scaled across multi-cloud & hybrid environments using Kubernetes.