Automating DevOps and Cloud Operations
A Script Collection
● Python Scripts
Parse Log Files: Extract specific entries from log files for troubleshooting.
with open('server.log', 'r') as file:
    for line in file:
        if "ERROR" in line:
            print(line.strip())
Generate Passwords: Create secure passwords for automation.
import secrets
import string

def generate_password(length=12):
    # Use secrets rather than random: it draws from a CSPRNG, which "secure passwords" requires
    chars = string.ascii_letters + string.digits + string.punctuation
    return ''.join(secrets.choice(chars) for _ in range(length))

print(generate_password())
API Calls to Cloud Providers: Automate interactions with AWS, Azure, or GCP.
import boto3

ec2 = boto3.client('ec2')
instances = ec2.describe_instances()
for reservation in instances['Reservations']:
    for instance in reservation['Instances']:
        print(f"Instance ID: {instance['InstanceId']} - State: {instance['State']['Name']}")
File Integrity Monitoring: Monitor file changes.
import hashlib
import time

def get_file_hash(filepath):
    with open(filepath, 'rb') as f:
        return hashlib.sha256(f.read()).hexdigest()

initial_hash = get_file_hash('/path/to/file')
while True:
    if initial_hash != get_file_hash('/path/to/file'):
        print("File has been modified!")
        break
    time.sleep(5)  # poll every few seconds instead of spinning at full CPU
Continuous Integration Pipeline: Trigger Jenkins pipelines.
import requests

def trigger_jenkins_build(job_name, jenkins_url, user, token):
    url = f"{jenkins_url}/job/{job_name}/build"
    response = requests.post(url, auth=(user, token))
    print(response.status_code, response.text)

trigger_jenkins_build('my-job', 'http://jenkins.local', 'admin', 'my-token')
Database Backups: Automate backups for databases.
import subprocess
from datetime import datetime

db_name = "my_database"
backup_file = f"{db_name}_backup_{datetime.now().strftime('%Y%m%d%H%M%S')}.sql"
# Note: a bare -p makes mysqldump prompt for the password interactively
subprocess.run(["mysqldump", "-u", "root", "-p", db_name, "-r", backup_file])
print(f"Backup saved as {backup_file}")
Infrastructure as Code Validation: Validate Terraform configurations.
import subprocess

def validate_terraform(path):
    # Recent Terraform versions no longer accept a directory argument,
    # so run the command from inside the configuration directory
    result = subprocess.run(["terraform", "validate"], cwd=path,
                            capture_output=True, text=True)
    if result.returncode == 0:
        print("Validation passed")
    else:
        print("Validation failed:", result.stderr)

validate_terraform("/path/to/terraform")
Kubernetes Cluster Monitoring: Custom monitoring script.
from kubernetes import client, config

config.load_kube_config()
v1 = client.CoreV1Api()
for pod in v1.list_pod_for_all_namespaces().items:
    print(f"{pod.metadata.namespace}/{pod.metadata.name} - {pod.status.phase}")
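The same loop can be turned into a quick health summary. A small sketch that flags pods outside a healthy phase (treating Running and Succeeded as healthy is an assumption; adjust as needed):
healthy = {"Running", "Succeeded"}
unhealthy = [
    f"{p.metadata.namespace}/{p.metadata.name} ({p.status.phase})"
    for p in v1.list_pod_for_all_namespaces().items
    if p.status.phase not in healthy
]
print(f"{len(unhealthy)} unhealthy pods:", *unhealthy, sep="\n")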
● Shell Scripts
Disk Space Alert
#!/bin/bash
THRESHOLD=80
# Pipe into "while read" so usage and partition arrive as a pair;
# a plain for-loop would split them into separate iterations
df -h | grep '^/dev' | awk '{ print $5 " " $1 }' | while read -r usage partition
do
    usage=${usage%\%}  # strip the trailing % sign
    if [ "$usage" -gt "$THRESHOLD" ]; then
        echo "Disk space critical: $partition (${usage}%)"
    fi
done
Create Backup
#!/bin/bash
tar -czvf /backup/mybackup_$(date +%F).tar.gz /important_data
User Management
#!/bin/bash
# Read one username per line; quoting guards against stray whitespace
while read -r user
do
    useradd "$user"
    echo "Created user: $user"
done < users.txt
Service Health Check
#!/bin/bash
if ! systemctl is-active --quiet nginx; then
    echo "Nginx is down. Restarting..."
    systemctl restart nginx
fi
Automated Deployment
#!/bin/bash
git pull origin main
docker-compose down
docker-compose up -d --build
Network Latency Monitoring
#!/bin/bash
HOST="google.com"
# Extract the round-trip time from a single ping; an empty result means the host is unreachable
LATENCY=$(ping -c 1 "$HOST" | grep -oP 'time=\K[0-9.]+')
if [ -z "$LATENCY" ]; then
    echo "Network is down"
else
    echo "Network is up (latency: ${LATENCY} ms)"
fi
Kubernetes Node Autoscaler
#!/bin/bash
NODE_COUNT=$(kubectl get nodes --no-headers | grep -c ' Ready')
POD_COUNT=$(kubectl get pods --all-namespaces --no-headers | wc -l)
if [ "$POD_COUNT" -gt $((NODE_COUNT * 50)) ]; then
    echo "Scaling nodes..."
    # kubectl cannot resize node pools directly; call your cloud provider's CLI here,
    # e.g. "gcloud container clusters resize" or an Auto Scaling group update for EKS
fi
High Availability Cluster Failover
#!/bin/bash
VIP="192.168.1.100"
CHECK_PORT=80
if ! nc -z "$VIP" "$CHECK_PORT"; then
    echo "Primary is down. Switching to secondary."
    ip addr add "$VIP/24" dev eth0
fi
● Python Scripts
Check Website Availability
import requests

url = "http://example.com"
try:
    response = requests.get(url)
    if response.status_code == 200:
        print(f"Website {url} is up.")
    else:
        print(f"Website {url} returned status code {response.status_code}.")
except requests.ConnectionError:
    print(f"Website {url} is down.")
Environment Variable Loader
import os
env_var = os.getenv("APP_ENV", "development")
print(f"Application environment: {env_var}")
Server Health Monitoring
import psutil
print(f"CPU Usage: {psutil.cpu_percent()}%")
print(f"Memory Usage: {psutil.virtual_memory().percent}%")
print(f"Disk Usage: {psutil.disk_usage('/').percent}%")
Rotate Logs
import os
import shutil

log_dir = "/var/log/myapp"
archive_dir = "/var/log/myapp/archive"
if not os.path.exists(archive_dir):
    os.makedirs(archive_dir)
for log_file in os.listdir(log_dir):
    if log_file.endswith(".log"):
        shutil.move(os.path.join(log_dir, log_file), archive_dir)
print("Logs rotated successfully.")
SSH Automation
import paramiko
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.connect('your-server-ip', username='user', password='password')
stdin, stdout, stderr = ssh.exec_command('ls -l /var/log')
print(stdout.read().decode())
ssh.close()
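Hardcoding a password is fragile; key-based authentication is the usual alternative. The same connection with a private key (the key path is a placeholder):
import paramiko

ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
# key_filename points at a private key; the path below is a placeholder
ssh.connect('your-server-ip', username='user', key_filename='/home/user/.ssh/id_rsa')
stdin, stdout, stderr = ssh.exec_command('uptime')
print(stdout.read().decode())
ssh.close()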
Container Cleanup
import docker

client = docker.from_env()
containers = client.containers.list(all=True)
for container in containers:
    if container.status == "exited":
        container.remove()
        print(f"Removed container: {container.name}")
AWS Cost Optimization Script
import boto3

client = boto3.client('ec2')
instances = client.describe_instances()
for reservation in instances['Reservations']:
    for instance in reservation['Instances']:
        if instance['State']['Name'] == 'stopped':
            # Stopped instances stop compute billing, but attached EBS volumes still incur charges
            print(f"Instance {instance['InstanceId']} is stopped.")
Custom CI/CD Notifications
import smtplib

def send_email(subject, message, to_email):
    smtp_server = "smtp.gmail.com"
    smtp_port = 587
    sender_email = "your-email@gmail.com"
    sender_password = "your-password"
    with smtplib.SMTP(smtp_server, smtp_port) as server:
        server.starttls()
        server.login(sender_email, sender_password)
        email_message = f"Subject: {subject}\n\n{message}"
        server.sendmail(sender_email, to_email, email_message)

send_email("Build Success", "Your latest CI build succeeded.", "recipient@example.com")
● Shell Scripts
Check Process Running
#!/bin/bash
PROCESS_NAME="nginx"
if pgrep -x "$PROCESS_NAME" > /dev/null; then
    echo "$PROCESS_NAME is running"
else
    echo "$PROCESS_NAME is not running"
fi
Simple Backup with Tar
#!/bin/bash
BACKUP_DIR="/backup"
SOURCE_DIR="/data"
tar -czf "$BACKUP_DIR/backup_$(date +%Y%m%d).tar.gz" "$SOURCE_DIR"
SSL Certificate Expiry Checker
#!/bin/bash
DOMAIN="example.com"
EXPIRY_DATE=$(echo | openssl s_client -servername "$DOMAIN" -connect "$DOMAIN:443" 2>/dev/null \
    | openssl x509 -noout -dates | grep 'notAfter' | cut -d= -f2)
echo "SSL Certificate for $DOMAIN expires on $EXPIRY_DATE"
Docker Image Cleanup
#!/bin/bash
# prune -f already removes dangling (<none>) images; the second line is a fallback
docker image prune -f
docker images | grep "<none>" | awk '{print $3}' | xargs -r docker rmi -f
Automated System Update
#!/bin/bash
apt-get update -y && apt-get upgrade -y
Dynamic Inventory for Ansible
#!/bin/bash
echo '{
"web": {
"hosts": ["web1.example.com", "web2.example.com"],
"vars": {
"ansible_user": "ubuntu"
}
}
}'
Autoscaling Alerts
#!/bin/bash
CPU_USAGE=$(top -bn1 | grep "Cpu(s)" | awk '{print $2 + $4}')
THRESHOLD=80
if (( $(echo "$CPU_USAGE > $THRESHOLD" | bc -l) )); then
    echo "High CPU usage: $CPU_USAGE%"
    # Trigger alert or scale action here
fi
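The placeholder comment could, for example, publish to an AWS SNS topic so on-call staff get notified. A minimal Python sketch using boto3 (the topic ARN is a placeholder):
import boto3

sns = boto3.client('sns')
sns.publish(
    TopicArn='arn:aws:sns:us-east-1:123456789012:alerts',  # placeholder ARN
    Subject='High CPU usage',
    Message='CPU usage crossed the 80% threshold.',
)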
Automated Canary Deployment
#!/bin/bash
kubectl apply -f canary-deployment.yaml
sleep 30
SUCCESS_RATE=$(curl -s "http://metrics-server/api/success-rate")
if (( $(echo "$SUCCESS_RATE > 95" | bc -l) )); then
    kubectl apply -f stable-deployment.yaml
else
    echo "Canary deployment failed. Rolling back."
    kubectl delete -f canary-deployment.yaml
fi
● Python Scripts
Monitor Open Ports on a Server
● Purpose: Check which ports are open on a host.
import socket

def check_ports(ports, host='127.0.0.1'):
    for port in ports:
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            s.settimeout(1)  # timeout in seconds
            result = s.connect_ex((host, port))
            if result == 0:
                print(f"Port {port} is open.")
            else:
                print(f"Port {port} is closed.")

# Check common ports
check_ports([22, 80, 443, 8080])
Parameters:
● ports: A list of port numbers to check.
● host: Host IP to check the ports on (127.0.0.1 is the local machine by default).
● s.settimeout(1): Sets a timeout for connection attempts to avoid hanging on closed
ports.
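To scan a host other than the local machine, pass its address explicitly (the IP below is a made-up example):
check_ports([22, 443, 3306], host='10.0.0.5')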
Custom Load Balancer Checker
● Purpose: Verify if backend servers behind a load balancer are responding.
import requests

servers = ["http://backend1.local", "http://backend2.local"]
for server in servers:
    try:
        response = requests.get(server, timeout=5)
        if response.status_code == 200:
            print(f"{server} is healthy.")
        else:
            print(f"{server} responded with status: {response.status_code}")
    except requests.exceptions.RequestException as e:
        print(f"Error connecting to {server}: {e}")
Parameters:
● servers: A list of server URLs to check.
● timeout=5: Sets a maximum time (in seconds) for waiting for a response.
● response.status_code: Checks the HTTP response code (200 means OK).
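A single transient timeout can mark a healthy backend as down. One way to soften that is a requests Session with automatic retries; the retry counts below are arbitrary:
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

session = requests.Session()
retries = Retry(total=3, backoff_factor=0.5)  # up to 3 retries with exponential backoff
session.mount("http://", HTTPAdapter(max_retries=retries))
response = session.get("http://backend1.local", timeout=5)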
AWS S3 Bucket Cleaner
● Purpose: Delete old files from an S3 bucket to save storage costs.
import boto3
from datetime import datetime, timedelta, timezone

s3 = boto3.client('s3')
bucket_name = "my-bucket"
retention_days = 30

def delete_old_files():
    # S3 LastModified is timezone-aware, so compare against an aware datetime
    cutoff_date = datetime.now(timezone.utc) - timedelta(days=retention_days)
    objects = s3.list_objects_v2(Bucket=bucket_name)
    if "Contents" in objects:
        for obj in objects["Contents"]:
            last_modified = obj["LastModified"]
            if last_modified < cutoff_date:
                print(f"Deleting {obj['Key']} (Last Modified: {last_modified})")
                s3.delete_object(Bucket=bucket_name, Key=obj["Key"])

delete_old_files()
Parameters:
● bucket_name: The S3 bucket name.
● retention_days: Days after which files are considered old.
● last_modified: The timestamp of when a file was last modified in S3.
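Note that list_objects_v2 returns at most 1,000 keys per call, so larger buckets need pagination. A sketch of the same cleanup using boto3's built-in paginator (reusing s3, bucket_name, and retention_days from above):
from datetime import datetime, timedelta, timezone

cutoff_date = datetime.now(timezone.utc) - timedelta(days=retention_days)
paginator = s3.get_paginator('list_objects_v2')
for page in paginator.paginate(Bucket=bucket_name):
    for obj in page.get('Contents', []):  # pages of an empty bucket have no Contents key
        if obj['LastModified'] < cutoff_date:
            s3.delete_object(Bucket=bucket_name, Key=obj['Key'])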
Log Analyzer with Regex
● Purpose: Extract specific information from logs using regular expressions.
import re

log_file = "application.log"
error_pattern = re.compile(r"ERROR (\d{3}): (.+)")
with open(log_file, 'r') as file:
    for line in file:
        match = error_pattern.search(line)
        if match:
            error_code, error_message = match.groups()
            print(f"Error Code: {error_code}, Message: {error_message}")
Parameters:
● error_pattern: A regex to match errors (e.g., ERROR 404: Not Found).
● match.groups(): Captures matched groups for further processing.
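To summarize rather than print every match, the captured error codes can be aggregated, for example with collections.Counter (reusing log_file and error_pattern from above):
from collections import Counter

code_counts = Counter()
with open(log_file, 'r') as file:
    for line in file:
        match = error_pattern.search(line)
        if match:
            code_counts[match.group(1)] += 1  # group(1) is the error code
print(code_counts.most_common(5))  # the five most frequent error codes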
● Shell Scripts
Backup and Remove Old Logs
● Purpose: Compress log files older than 7 days and delete the originals.
#!/bin/bash
LOG_DIR="/var/log/myapp"
ARCHIVE_DIR="/var/log/myapp/archive"
mkdir -p "$ARCHIVE_DIR"
find "$LOG_DIR" -type f -name "*.log" -mtime +7 \
    -exec tar -czvf "$ARCHIVE_DIR/logs_$(date +%F).tar.gz" {} + -exec rm -f {} +
Parameters:
● -type f: Search for files only.
● -name "*.log": Filter files ending in .log.
● -mtime +7: Files modified more than 7 days ago.
● -exec: Execute commands (e.g., compress and delete) on matched files.
Monitor Free Memory and Trigger Alert
● Purpose: Send an alert if free memory falls below a threshold.
#!/bin/bash
THRESHOLD=500  # Free memory in MB
free_mem=$(free -m | awk '/^Mem:/ { print $4 }')
if [ "$free_mem" -lt "$THRESHOLD" ]; then
    echo "Low memory: ${free_mem}MB available!" | mail -s "Memory Alert" admin@example.com
fi
Parameters:
● free -m: Shows free memory in MB.
● awk '/^Mem:/ { print $4 }': Extracts the free memory field from the output.
Service Restart on Failure
● Purpose: Check if a service is running and restart it if it's not.
#!/bin/bash
SERVICE_NAME="nginx"
if ! systemctl is-active --quiet "$SERVICE_NAME"; then
    echo "$SERVICE_NAME is down. Restarting..."
    systemctl restart "$SERVICE_NAME"
fi
Parameters:
● systemctl is-active: Checks if the service is active.
● systemctl restart: Restarts the service if it's inactive.
Dynamic Cloud Instance Scaling
● Purpose: Scale cloud instances up or down based on CPU load.
#!/bin/bash
CPU_THRESHOLD=75
avg_cpu=$(top -bn1 | grep "Cpu(s)" | awk '{print $2 + $4}')
if (( $(echo "$avg_cpu > $CPU_THRESHOLD" | bc -l) )); then
    echo "High CPU usage detected: ${avg_cpu}%. Scaling up instances."
    # Add cloud CLI commands to scale up instances
fi
Parameters:
● top -bn1: Gets CPU usage stats in a non-interactive mode.
● awk '{print $2 + $4}': Sums the user and system CPU usage.
● Python Scripts
Automated SSL Certificate Expiry Checker
● Purpose: Check the expiry date of SSL certificates for a list of domains and send an
alert if the expiry is near.
import ssl
import socket
from datetime import datetime

def get_cert_expiry(domain, port=443):
    context = ssl.create_default_context()
    with socket.create_connection((domain, port)) as sock:
        with context.wrap_socket(sock, server_hostname=domain) as ssock:
            cert = ssock.getpeercert()
            expiry_date = datetime.strptime(cert['notAfter'], '%b %d %H:%M:%S %Y %Z')
            return expiry_date

domains = ['example.com', 'myapp.com']
threshold_days = 30
for domain in domains:
    expiry_date = get_cert_expiry(domain)
    days_left = (expiry_date - datetime.now()).days
    if days_left < threshold_days:
        print(f"WARNING: {domain}'s SSL certificate expires in {days_left} days!")
    else:
        print(f"{domain}'s SSL certificate is valid for {days_left} more days.")
Explanation:
● get_cert_expiry: Connects to the domain over SSL and retrieves the certificate's
expiry date.
● threshold_days: The number of days before expiry to trigger a warning.
● socket.create_connection: Establishes a socket connection to the domain.
AWS EC2 Instance Health Check
● Purpose: Verify the health status of EC2 instances in a specific region.
import boto3

ec2_client = boto3.client('ec2', region_name='us-east-1')

def check_instance_health():
    statuses = ec2_client.describe_instance_status(IncludeAllInstances=True)
    for status in statuses['InstanceStatuses']:
        instance_id = status['InstanceId']
        state = status['InstanceState']['Name']
        health = status['InstanceStatus']['Status']
        print(f"Instance {instance_id}: State={state}, Health={health}")

check_instance_health()
Explanation:
● boto3.client('ec2'): Connects to the EC2 service in the specified region.
● describe_instance_status: Retrieves health and state information for EC2
instances.
● InstanceStatus: Indicates the operational status of the instance.
Dynamic Inventory Script for Ansible
● Purpose: Generate a dynamic inventory of EC2 instances for use with Ansible.
import boto3
import json

def generate_inventory():
    ec2 = boto3.client('ec2', region_name='us-east-1')
    instances = ec2.describe_instances()
    inventory = {"all": {"hosts": []}}
    for reservation in instances['Reservations']:
        for instance in reservation['Instances']:
            if instance['State']['Name'] == 'running':
                inventory['all']['hosts'].append(instance['PrivateIpAddress'])
    print(json.dumps(inventory, indent=2))

generate_inventory()
Explanation:
● describe_instances: Fetches all EC2 instances in the region.
● instance['State']['Name']: Filters running instances.
● json.dumps: Converts the inventory to JSON for Ansible.
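Ansible calls a dynamic inventory script with --list (and --host <name> for per-host variables), so a small argument handler makes the script usable directly with ansible -i. A minimal sketch:
import json
import sys

if __name__ == "__main__":
    if len(sys.argv) > 1 and sys.argv[1] == "--host":
        print(json.dumps({}))  # no per-host variables in this sketch
    else:  # Ansible's default invocation is --list
        generate_inventory()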
● Bash Scripts
Dynamic Disk Space Monitoring
● Purpose: Monitor disk usage and send an alert if usage exceeds a threshold.
#!/bin/bash
THRESHOLD=80  # Percentage
ALERT_EMAIL="admin@example.com"
df -h | awk '$5 ~ /[0-9]+%/ { gsub("%", "", $5); if ($5 > '"$THRESHOLD"') print $1, $5 }' \
    | while read -r fs usage; do
    echo "Disk usage on $fs is $usage%" | mail -s "Disk Space Alert" "$ALERT_EMAIL"
done
Explanation:
● df -h: Displays human-readable disk space usage.
● awk: Extracts and checks disk usage percentage.
● mail: Sends an email alert if usage exceeds the threshold.
Kubernetes Pod Restart on Failure
● Purpose: Restart a failed Kubernetes pod automatically.
#!/bin/bash
NAMESPACE="default"
POD_NAME="my-app-pod"
if kubectl get pod "$POD_NAME" -n "$NAMESPACE" | grep -q "Error"; then
    echo "Restarting failed pod $POD_NAME in namespace $NAMESPACE..."
    kubectl delete pod "$POD_NAME" -n "$NAMESPACE"
else
    echo "Pod $POD_NAME is running fine."
fi
Explanation:
● kubectl get pod: Checks the pod's status.
● grep -q "Error": Identifies if the pod is in an error state.
● kubectl delete pod: Deletes the failed pod, prompting Kubernetes to restart it.
MySQL Database Backup
● Purpose: Create automated backups of a MySQL database.
#!/bin/bash
DB_USER="root"
DB_PASS="password"
DB_NAME="mydatabase"
BACKUP_DIR="/backups/mysql"
DATE=$(date +%F)
mkdir -p "$BACKUP_DIR"
mysqldump -u "$DB_USER" -p"$DB_PASS" "$DB_NAME" > "$BACKUP_DIR/${DB_NAME}_${DATE}.sql"
if [ $? -eq 0 ]; then
    echo "Backup successful: ${BACKUP_DIR}/${DB_NAME}_${DATE}.sql"
else
    echo "Backup failed."
fi
Explanation:
● mysqldump: Dumps the database to an SQL file.
● mkdir -p: Ensures the backup directory exists.
● $?: Checks if the previous command was successful.
AWS CLI Instance Scaling
● Purpose: Scale EC2 instances up or down based on CPU utilization.
#!/bin/bash
INSTANCE_ID="i-1234567890abcdef"
THRESHOLD=75
cpu_utilization=$(aws cloudwatch get-metric-statistics \
    --metric-name CPUUtilization \
    --start-time $(date -u -d "-5 minutes" +%Y-%m-%dT%H:%M:%SZ) \
    --end-time $(date -u +%Y-%m-%dT%H:%M:%SZ) \
    --period 300 \
    --namespace AWS/EC2 \
    --statistics Average \
    --dimensions Name=InstanceId,Value=$INSTANCE_ID \
    --query "Datapoints[0].Average" --output text)
if (( $(echo "$cpu_utilization > $THRESHOLD" | bc -l) )); then
    echo "High CPU detected ($cpu_utilization%). Scaling up."
    # The instance must be stopped before its type can be changed
    aws ec2 modify-instance-attribute --instance-id "$INSTANCE_ID" --instance-type Value=t2.large
fi
Explanation:
● aws cloudwatch get-metric-statistics: Retrieves the average CPU usage.
● bc -l: Performs a floating-point comparison.
● modify-instance-attribute: Changes the instance type to scale up (the instance must be stopped first).
Elasticsearch Index Cleaner
● Purpose: Remove Elasticsearch indices older than 30 days.
#!/bin/bash
ELASTIC_URL="http://localhost:9200"
RETENTION_DAYS=30
curl -s "$ELASTIC_URL/_cat/indices?h=index,creation.date.string" | while read -r index created_date; do
    index_date=$(date -d "$created_date" +%s)
    cutoff_date=$(date -d "-$RETENTION_DAYS days" +%s)
    if [ "$index_date" -lt "$cutoff_date" ]; then
        echo "Deleting index: $index"
        curl -XDELETE "$ELASTIC_URL/$index"
    fi
done
Explanation:
● curl: Interacts with the Elasticsearch REST API.
● date -d: Converts the creation date to a timestamp.
● cutoff_date: Determines the oldest allowed index date.
Aman Kumar Choudhary