Self-Hosted Push Notifications Specification
Part 5: Advanced Features & Production Deployment
Version: 1.0
Last Updated: October 2025
Prerequisites: Part 4: Service Worker & PWA Configuration
Author: Bunty9
License: MIT (Free to use and adapt)
Table of Contents
- Advanced Targeting Strategies
- Notification Scheduling
- Rate Limiting & Throttling
- Batch Processing
- Priority Queuing
- Retry Strategies
- Production Deployment
- Docker Configuration
- Kubernetes Deployment
- Load Balancing
- Horizontal Scaling
- Database Optimization
Advanced Targeting Strategies
Multi-User Targeting
Use Case: Send a notification to the booking owner and all assigned users.
// services/push_service.go
// SendToMultipleUsers fans a payload out to every user in userIDs.
//
// Each user is handled in its own goroutine and the call returns without
// waiting for any of them. A per-user failure is only logged, so the returned
// error is always nil.
func (ps *PushService) SendToMultipleUsers(userIDs []uuid.UUID, payload types.PushPayload) error {
	total := len(userIDs)
	if total == 0 {
		return nil
	}

	log.Printf("[PushService] Sending to %d users", total)

	// deliver pushes to a single user and logs (rather than returns) a failure.
	deliver := func(target uuid.UUID) {
		err := ps.SendToUser(target, payload)
		if err == nil {
			return
		}
		log.Printf("[PushService] Failed to send to user %s: %v", target, err)
	}

	for idx := range userIDs {
		go deliver(userIDs[idx])
	}

	return nil
}
// Example: Notify booking owner + assigned users
//
// NotifyAllBookingParticipants sends payload to the booking owner and to every
// user who has a seat assignment on the booking.
//
// It returns the database error if the seat assignments cannot be loaded, so a
// failed lookup is not silently turned into an owner-only notification.
// Otherwise it returns whatever SendToMultipleUsers returns.
func (bs *BookingService) NotifyAllBookingParticipants(booking *models.Booking, payload types.PushPayload) error {
	// The owner is always notified.
	userIDs := []uuid.UUID{booking.UserID}

	// Add assigned users.
	var seatAssignments []models.SeatAssignment
	if err := bs.db.Where("booking_id = ?", booking.ID).Find(&seatAssignments).Error; err != nil {
		return err
	}
	for _, sa := range seatAssignments {
		// Unassigned seats have a nil AssignedUserID; the owner is already in the list.
		if sa.AssignedUserID != nil && *sa.AssignedUserID != booking.UserID {
			userIDs = append(userIDs, *sa.AssignedUserID)
		}
	}

	// The same user may hold several seats on one booking.
	userIDs = uniqueUUIDs(userIDs)

	return bs.pushService.SendToMultipleUsers(userIDs, payload)
}
Role-Based Targeting
Use Case: Send to all admins with a specific role (e.g., caretakers).
// SendToAdminRole sends payload to every active admin that has the given role.
//
// It returns an error only when the admin lookup fails. Delivery happens in one
// goroutine per admin and is not awaited; a delivery failure is logged rather
// than returned.
func (ps *PushService) SendToAdminRole(role string, payload types.PushPayload) error {
	var adminIDs []uuid.UUID

	// Query admins by role.
	err := ps.db.Table("admins").
		Where("role = ? AND is_active = ?", role, true).
		Pluck("id", &adminIDs).Error
	if err != nil {
		return fmt.Errorf("failed to fetch admins: %w", err)
	}

	if len(adminIDs) == 0 {
		log.Printf("[PushService] No admins found with role: %s", role)
		return nil
	}

	log.Printf("[PushService] Sending to %d %s admins", len(adminIDs), role)

	for _, adminID := range adminIDs {
		// Pass the ID as an argument so each goroutine owns its own copy.
		go func(id uuid.UUID) {
			if err := ps.SendToAdmin(id, payload); err != nil {
				log.Printf("[PushService] Failed to send to admin %s: %v", id, err)
			}
		}(adminID)
	}

	return nil
}
Caretaker Targeting by Space
Use Case: Notify caretakers assigned to a specific space.
// SendToCaretakersOfSpace sends payload to every caretaker listed on the space
// identified by spaceID.
//
// It returns an error only when the space cannot be loaded. Caretaker IDs that
// do not parse as UUIDs are logged and skipped. Delivery happens in one
// goroutine per caretaker and is not awaited; a delivery failure is logged
// rather than returned.
func (ps *PushService) SendToCaretakersOfSpace(spaceID uuid.UUID, payload types.PushPayload) error {
	var space models.Space

	// Use an explicit condition: GORM's inline primary-key shorthand is only
	// documented for numeric keys, not for UUID values.
	err := ps.db.First(&space, "id = ?", spaceID).Error
	if err != nil {
		return fmt.Errorf("space not found: %w", err)
	}

	if len(space.CaretakerIDs) == 0 {
		log.Printf("[PushService] No caretakers assigned to space: %s", space.Name)
		return nil
	}

	log.Printf("[PushService] Sending to %d caretakers for space: %s",
		len(space.CaretakerIDs), space.Name)

	for _, caretakerIDStr := range space.CaretakerIDs {
		caretakerID, err := uuid.Parse(caretakerIDStr)
		if err != nil {
			log.Printf("[PushService] Invalid caretaker ID: %s", caretakerIDStr)
			continue
		}

		go func(id uuid.UUID) {
			if err := ps.SendToAdmin(id, payload); err != nil {
				log.Printf("[PushService] Failed to send to caretaker %s: %v", id, err)
			}
		}(caretakerID)
	}

	return nil
}
Conditional Targeting
Use Case: Send only to users matching specific criteria.
// SendToUsersWithCondition sends payload to every active, user-owned push
// subscription that also matches condition.
//
// condition is appended to the query as a SQL fragment and args are bound to
// its placeholders. Callers must never build condition from untrusted input;
// pass values through args instead. An empty condition selects all active user
// subscriptions.
//
// It returns an error only when the query fails. Delivery happens in one
// goroutine per subscription and is not awaited; a delivery failure is logged
// rather than returned.
func (ps *PushService) SendToUsersWithCondition(
	condition string,
	args []interface{},
	payload types.PushPayload,
) error {
	var subscriptions []models.PushSubscription

	query := ps.db.Where("is_active = ? AND user_id IS NOT NULL", true)
	if condition != "" {
		query = query.Where(condition, args...)
	}

	err := query.Find(&subscriptions).Error
	if err != nil {
		return fmt.Errorf("failed to query subscriptions: %w", err)
	}

	// One user may own several subscriptions, so this counts subscriptions.
	log.Printf("[PushService] Sending to %d subscriptions matching condition", len(subscriptions))

	for _, subscription := range subscriptions {
		go func(sub models.PushSubscription) {
			if err := ps.sendToSubscription(sub, payload); err != nil {
				log.Printf("[PushService] Failed to send to subscription %s: %v", sub.ID, err)
			}
		}(subscription)
	}

	return nil
}
// Example: Notify users with bookings starting in next 24 hours
//
// NotifyUpcomingBookings pushes a fixed "starting soon" reminder to every user
// selected by the sub-query below (confirmed bookings whose start_time lies
// between now and 24 hours from now). The result of
// SendToUsersWithCondition is returned unchanged.
func (bs *BookingService) NotifyUpcomingBookings() error {
	// Sub-query selecting the owners of confirmed bookings starting soon.
	const upcomingFilter = `user_id IN (
SELECT user_id FROM bookings
WHERE start_time BETWEEN NOW() AND NOW() + INTERVAL '24 hours'
AND status = 'confirmed'
)`

	// The filter has no placeholders, so there is nothing to bind.
	noArgs := []interface{}{}

	reminder := types.PushPayload{
		Title: "Booking Starting Soon",
		Body:  "Your booking starts within 24 hours",
		Type:  "booking_reminder_24h",
		URL:   "/bookings",
	}

	return bs.pushService.SendToUsersWithCondition(upcomingFilter, noArgs, reminder)
}
Notification Scheduling
Background Worker Implementation
File: workers/notification_scheduler.go
package workers
import (
"log"
"time"
"gorm.io/gorm"
"your-app/models"
"your-app/services"
"your-app/types"
)
// NotificationScheduler handles scheduled notifications.
//
// Start launches a goroutine that wakes on every ticker tick and dispatches
// the notifications that are due; Stop ends that goroutine.
type NotificationScheduler struct {
db *gorm.DB // used to load due ScheduledNotification records and update their status
pushService *services.PushService // performs the actual delivery
ticker *time.Ticker // created by Start; nil until Start has been called
done chan bool // unbuffered; Stop sends on it to end the polling goroutine
}
// NewNotificationScheduler builds a scheduler bound to the given database
// handle and push service. The returned scheduler is idle until Start is
// called; its ticker is not created here.
func NewNotificationScheduler(db *gorm.DB, pushService *services.PushService) *NotificationScheduler {
	scheduler := new(NotificationScheduler)
	scheduler.db = db
	scheduler.pushService = pushService
	// Unbuffered stop-signal channel.
	scheduler.done = make(chan bool)
	return scheduler
}
// Start creates a one-minute ticker and launches the polling goroutine.
// On every tick the goroutine processes due notifications; it exits when a
// value arrives on the done channel. Start itself returns immediately.
func (ns *NotificationScheduler) Start() {
	ns.ticker = time.NewTicker(time.Minute)

	poll := func() {
		log.Println("[NotificationScheduler] Started")
		for {
			select {
			case <-ns.done:
				log.Println("[NotificationScheduler] Stopped")
				return
			case <-ns.ticker.C:
				ns.processScheduledNotifications()
			}
		}
	}

	go poll()
}
// Stop halts the scheduler: it stops the ticker and signals the polling
// goroutine to exit, blocking until that goroutine has received the signal.
//
// Calling Stop on a scheduler that was never started is a no-op. Without this
// guard the nil ticker would panic and the send on done would block forever,
// because no goroutine is receiving. Stop must be called at most once per
// Start.
func (ns *NotificationScheduler) Stop() {
	if ns.ticker == nil {
		return // never started: nothing to stop and nobody to signal
	}
	ns.ticker.Stop()
	ns.done <- true
}
// processScheduledNotifications loads every pending notification whose
// scheduled time has passed and dispatches each one in its own goroutine.
//
// Before dispatching, each row is claimed with a conditional update
// (pending -> processing). If the update touches no row, another tick or
// another replica has already taken it and it is skipped. This stops a send
// that outlives the one-minute tick, or a second scheduler instance, from
// delivering the same notification twice.
//
// NOTE: a row stays in "processing" if the process dies mid-send; such rows
// need manual or periodic recovery.
func (ns *NotificationScheduler) processScheduledNotifications() {
	var scheduledNotifications []models.ScheduledNotification

	// Find notifications due now.
	err := ns.db.Where("scheduled_at <= ? AND status = ?", time.Now(), "pending").
		Find(&scheduledNotifications).Error
	if err != nil {
		log.Printf("[NotificationScheduler] Error fetching scheduled notifications: %v", err)
		return
	}

	if len(scheduledNotifications) == 0 {
		return
	}

	log.Printf("[NotificationScheduler] Processing %d scheduled notifications", len(scheduledNotifications))

	for _, sn := range scheduledNotifications {
		// Atomically claim the row; only the claimant sends it.
		claim := ns.db.Model(&models.ScheduledNotification{}).
			Where("id = ? AND status = ?", sn.ID, "pending").
			Update("status", "processing")
		if claim.Error != nil {
			log.Printf("[NotificationScheduler] Error claiming notification %s: %v", sn.ID, claim.Error)
			continue
		}
		if claim.RowsAffected == 0 {
			continue // already claimed elsewhere
		}

		go ns.sendScheduledNotification(sn)
	}
}
// sendScheduledNotification delivers one scheduled notification and records
// the outcome on its row.
//
// The recipient type selects the delivery call: "user", "admin" or
// "broadcast". Any other value is logged and recorded as a failure, so the row
// leaves the pending state instead of being picked up again on every tick. The
// row ends up with status "sent", or "failed" plus the error text, and
// processed_at is set in both cases. A failure to write that outcome is
// logged.
func (ns *NotificationScheduler) sendScheduledNotification(sn models.ScheduledNotification) {
	payload := types.PushPayload{
		Title: sn.Title,
		Body:  sn.Body,
		Icon:  sn.Icon,
		URL:   sn.URL,
		Data:  sn.Data,
	}

	var (
		failed  bool
		failure string
	)

	// Send based on recipient type.
	var err error
	switch sn.RecipientType {
	case "user":
		err = ns.pushService.SendToUser(sn.RecipientID, payload)
	case "admin":
		err = ns.pushService.SendToAdmin(sn.RecipientID, payload)
	case "broadcast":
		err = ns.pushService.SendBroadcastToAllUsers(payload)
	default:
		log.Printf("[NotificationScheduler] Unknown recipient type: %s", sn.RecipientType)
		failed = true
		failure = "unknown recipient type: " + sn.RecipientType
	}
	if err != nil {
		failed = true
		failure = err.Error()
	}

	// Record the outcome.
	updates := map[string]interface{}{
		"status":       "sent",
		"processed_at": time.Now(),
	}
	if failed {
		updates["status"] = "failed"
		updates["error"] = failure
	}
	if dbErr := ns.db.Model(&sn).Updates(updates).Error; dbErr != nil {
		log.Printf("[NotificationScheduler] Error updating notification %s: %v", sn.ID, dbErr)
	}
}
Scheduled Notification Model
// models/scheduled_notification.go
// ScheduledNotification is one notification queued for later delivery.
// The scheduler selects rows by ScheduledAt and Status and writes Status,
// Error and ProcessedAt back after the send attempt.
type ScheduledNotification struct {
// NOTE(review): no default is declared for ID and the scheduling handler does
// not set it; presumably a hook elsewhere generates it — confirm.
ID uuid.UUID `gorm:"type:uuid;primary_key"`
RecipientType string `gorm:"type:varchar(20)"` // user, admin, broadcast
RecipientID uuid.UUID `gorm:"type:uuid"` // target for user/admin; not read for broadcast
Title string `gorm:"type:varchar(255)"`
Body string `gorm:"type:text"`
Icon string `gorm:"type:varchar(255)"`
URL string `gorm:"type:varchar(255)"`
Data datatypes.JSON `gorm:"type:jsonb"` // copied into the push payload's Data field
ScheduledAt time.Time `gorm:"not null;index"` // earliest time the notification may be sent
Status string `gorm:"type:varchar(20);default:'pending'"` // pending, sent, failed
Error string `gorm:"type:text"` // error text recorded when a send fails
ProcessedAt *time.Time // set when the send attempt finishes; nil until then
CreatedAt time.Time
}
Scheduling API
// controllers/notification_controller.go
// ScheduleNotification handles POST /api/notifications/schedule.
//
// It binds the JSON body, rejects recipient types the scheduler cannot deliver
// ("user", "admin" and "broadcast" are the only ones it dispatches), stores
// the notification with status "pending", and responds with the stored record.
//
// Responses: 400 for a malformed body or an unsupported recipient type, 500
// when the record cannot be stored.
//
// NOTE(review): the record's ID is not assigned here; confirm it is generated
// by the model or the database.
func (nc *NotificationController) ScheduleNotification(c *gin.Context) {
	var req types.ScheduleNotificationRequest
	if err := c.ShouldBindJSON(&req); err != nil {
		utils.ErrorResponse(c, http.StatusBadRequest, "Invalid request")
		return
	}

	// Reject early anything the scheduler would be unable to send.
	switch req.RecipientType {
	case "user", "admin", "broadcast":
	default:
		utils.ErrorResponse(c, http.StatusBadRequest, "Invalid recipient type")
		return
	}

	scheduledNotification := models.ScheduledNotification{
		RecipientType: req.RecipientType,
		RecipientID:   req.RecipientID,
		Title:         req.Title,
		Body:          req.Body,
		Icon:          req.Icon,
		URL:           req.URL,
		Data:          req.Data,
		ScheduledAt:   req.ScheduledAt,
		Status:        "pending",
	}

	if err := nc.db.Create(&scheduledNotification).Error; err != nil {
		utils.ErrorResponse(c, http.StatusInternalServerError, "Failed to schedule notification")
		return
	}

	utils.SuccessResponse(c, "Notification scheduled successfully", scheduledNotification)
}
Rate Limiting & Throttling
Per-User Rate Limiting
// middleware/rate_limiter.go
import (
"net/http"
"sync"
"time"
"github.com/gin-gonic/gin"
"golang.org/x/time/rate"
)
// RateLimiter keeps one token-bucket limiter per key, all configured with the
// same rate and burst. Entries are created on first use and are never removed
// by the code shown here.
type RateLimiter struct {
limiters map[string]*rate.Limiter // per-key limiters, guarded by mu
mu sync.RWMutex // protects limiters
limit rate.Limit // sustained rate given to each new limiter
burst int // burst size given to each new limiter
}
// NewRateLimiter returns a RateLimiter whose per-key limiters allow
// requestsPerMinute requests per minute on average, with bursts of up to
// burst requests.
func NewRateLimiter(requestsPerMinute int, burst int) *RateLimiter {
	// rate.Limit is expressed in events per second.
	perSecond := rate.Limit(requestsPerMinute) / 60

	rl := new(RateLimiter)
	rl.limiters = make(map[string]*rate.Limiter)
	rl.limit = perSecond
	rl.burst = burst
	return rl
}
// getLimiter returns the limiter for key, creating it on first use.
//
// The common case (limiter already exists) takes only the read lock. On a
// miss the map is checked again under the write lock before inserting, so two
// concurrent first requests for the same key share one limiter instead of the
// second overwriting the first.
func (rl *RateLimiter) getLimiter(key string) *rate.Limiter {
	rl.mu.RLock()
	limiter, exists := rl.limiters[key]
	rl.mu.RUnlock()
	if exists {
		return limiter
	}

	rl.mu.Lock()
	defer rl.mu.Unlock()

	// Another goroutine may have inserted the limiter between the two locks.
	if limiter, exists = rl.limiters[key]; exists {
		return limiter
	}

	limiter = rate.NewLimiter(rl.limit, rl.burst)
	rl.limiters[key] = limiter
	return limiter
}
// Middleware returns a gin handler that rate-limits requests per user.
//
// The user is identified by the "userID" value in the gin context:
//   - missing value: 401 Unauthorized
//   - value that is neither a string nor has a String() method: 500
//   - user's limiter has no token available: 429 Too Many Requests
//
// Otherwise the request continues down the chain. Non-string IDs (for example
// UUID values) are keyed by their String() form instead of panicking on a
// failed type assertion.
func (rl *RateLimiter) Middleware() gin.HandlerFunc {
	return func(c *gin.Context) {
		userID, exists := c.Get("userID")
		if !exists {
			c.JSON(http.StatusUnauthorized, gin.H{"error": "Unauthorized"})
			c.Abort()
			return
		}

		var key string
		switch id := userID.(type) {
		case string:
			key = id
		case interface{ String() string }:
			key = id.String()
		default:
			c.JSON(http.StatusInternalServerError, gin.H{"error": "Invalid user identity"})
			c.Abort()
			return
		}

		limiter := rl.getLimiter(key)
		if !limiter.Allow() {
			c.JSON(http.StatusTooManyRequests, gin.H{
				"error": "Rate limit exceeded. Try again later.",
			})
			c.Abort()
			return
		}

		c.Next()
	}
}
// Usage in routes
//
// SetupPushRoutes registers the subscribe and unsubscribe endpoints under
// /api/push. Every route in the group passes through authentication first and
// then through a per-user rate limiter (10 requests per minute, burst of 5).
func SetupPushRoutes(router *gin.Engine, pushController *controllers.PushController) {
	limiter := NewRateLimiter(10, 5)

	group := router.Group("/api/push")

	// Order matters: the limiter reads the user ID from the request context.
	group.Use(middleware.AuthMiddleware())
	group.Use(limiter.Middleware())

	group.POST("/subscribe", pushController.Subscribe)
	group.POST("/unsubscribe", pushController.Unsubscribe)
}
Global Rate Limiting
// GlobalRateLimiter caps the total number of push notifications sent per
// second across all recipients by sharing a single token bucket.
type GlobalRateLimiter struct {
	limiter *rate.Limiter
}

// NewGlobalRateLimiter returns a limiter that sustains notificationsPerSecond
// sends per second and tolerates bursts of twice that number.
func NewGlobalRateLimiter(notificationsPerSecond int) *GlobalRateLimiter {
	sustained := rate.Limit(notificationsPerSecond)
	burst := notificationsPerSecond * 2

	return &GlobalRateLimiter{
		limiter: rate.NewLimiter(sustained, burst),
	}
}

// Wait blocks until one send is permitted or ctx is done, and returns the
// underlying limiter's error unchanged.
func (grl *GlobalRateLimiter) Wait(ctx context.Context) error {
	err := grl.limiter.Wait(ctx)
	return err
}
// In PushService
//
// sendToSubscription is shown here only up to the rate-limiting step; the
// actual delivery code is elided from this excerpt.
//
// When a global rate limiter is configured, the call waits up to five seconds
// for a token and returns a wrapped error if none is granted in that time.
func (ps *PushService) sendToSubscription(subscription models.PushSubscription, payload types.PushPayload) error {
// Wait for rate limit token
if ps.globalRateLimiter != nil {
// Bound the wait so a saturated limiter cannot stall the caller indefinitely.
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if err := ps.globalRateLimiter.Wait(ctx); err != nil {
return fmt.Errorf("rate limit wait failed: %w", err)
}
}
// Send notification...
}
Batch Processing
Batch Send Implementation
// SendBatch sends payload to subscriptions in batches of 100.
//
// The subscriptions within a batch are sent in parallel and the batch is
// awaited before the next one starts, with a 100 ms pause between batches.
// Delivery is best effort: a failed send is logged and counted, never
// returned, so the result is always nil. The total failure count is logged at
// the end.
func (ps *PushService) SendBatch(subscriptions []models.PushSubscription, payload types.PushPayload) error {
	const (
		batchSize           = 100
		delayBetweenBatches = 100 * time.Millisecond
	)

	log.Printf("[PushService] Sending to %d subscriptions in batches of %d", len(subscriptions), batchSize)

	var (
		mu     sync.Mutex // guards failed
		failed int
	)

	for i := 0; i < len(subscriptions); i += batchSize {
		end := i + batchSize
		if end > len(subscriptions) {
			end = len(subscriptions)
		}
		batch := subscriptions[i:end]

		// Send batch in parallel.
		var wg sync.WaitGroup
		for _, subscription := range batch {
			wg.Add(1)
			go func(sub models.PushSubscription) {
				defer wg.Done()
				if err := ps.sendToSubscription(sub, payload); err != nil {
					mu.Lock()
					failed++
					mu.Unlock()
					log.Printf("[PushService] Failed to send to subscription %s: %v", sub.ID, err)
				}
			}(subscription)
		}
		wg.Wait()

		log.Printf("[PushService] Processed batch %d-%d of %d", i+1, end, len(subscriptions))

		// Delay between batches to avoid overwhelming the system.
		if end < len(subscriptions) {
			time.Sleep(delayBetweenBatches)
		}
	}

	if failed > 0 {
		log.Printf("[PushService] Batch send finished with %d of %d sends failed", failed, len(subscriptions))
	}

	return nil
}
Priority Queuing
Priority Queue Implementation
// types/priority_queue.go
// PriorityNotification is one queued send: a payload, the subscription it is
// addressed to, and the priority that orders it within a PriorityQueue.
type PriorityNotification struct {
Priority int // 0 = highest, 10 = lowest
Subscription models.PushSubscription // destination of the push
Payload types.PushPayload // content to deliver
}
// PriorityQueue is a min-heap of notifications ordered by Priority, so the
// entry with the numerically lowest Priority is popped first. It provides the
// methods required by container/heap and must be modified only through
// heap.Push and heap.Pop.
type PriorityQueue []*PriorityNotification

// Len returns the number of queued notifications.
func (pq PriorityQueue) Len() int { return len(pq) }

// Less reports whether entry i must be served before entry j.
func (pq PriorityQueue) Less(i, j int) bool {
	return pq[i].Priority < pq[j].Priority
}

// Swap exchanges entries i and j.
func (pq PriorityQueue) Swap(i, j int) {
	pq[i], pq[j] = pq[j], pq[i]
}

// Push appends x, which must be a *PriorityNotification. Call heap.Push
// instead of calling this directly.
func (pq *PriorityQueue) Push(x interface{}) {
	*pq = append(*pq, x.(*PriorityNotification))
}

// Pop removes and returns the last element. Call heap.Pop instead of calling
// this directly.
func (pq *PriorityQueue) Pop() interface{} {
	old := *pq
	n := len(old)
	item := old[n-1]
	// Clear the vacated slot so the backing array does not keep the popped
	// notification reachable after the caller is done with it.
	old[n-1] = nil
	*pq = old[0 : n-1]
	return item
}
// Worker that processes priority queue
//
// StartPriorityWorker loops forever: while the queue holds entries it pops the
// most urgent one and sends it; when the queue is empty it sleeps for 100 ms
// before checking again. The function never returns, so callers are expected
// to run it in its own goroutine.
//
// NOTE(review): the queue is a variable local to this function and nothing
// here pushes to it, so as written the loop never finds work. The queue has to
// be shared with producers (with synchronization) for the worker to be useful.
func (ps *PushService) StartPriorityWorker() {
pq := make(PriorityQueue, 0)
heap.Init(&pq)
for {
if pq.Len() > 0 {
notif := heap.Pop(&pq).(*PriorityNotification)
// The error returned by sendToSubscription is discarded here.
ps.sendToSubscription(notif.Subscription, notif.Payload)
} else {
// Idle poll interval while the queue is empty.
time.Sleep(100 * time.Millisecond)
}
}
}
Retry Strategies
Exponential Backoff
// sendWithRetry sends a notification, retrying failed attempts with
// exponential backoff.
//
// The send is attempted once and then retried up to maxRetries more times. A
// negative maxRetries is treated as zero, so at least one attempt is always
// made. The wait before retry k is 2^(k-1) seconds (1s, 2s, 4s, ...), capped at
// one hour so the shift cannot overflow for large attempt counts.
//
// It returns nil on the first success, the error itself as soon as
// isNonRetryableError reports it as permanent, and otherwise the last error
// wrapped with the retry count.
func (ps *PushService) sendWithRetry(
	subscription models.PushSubscription,
	payload types.PushPayload,
	maxRetries int,
) error {
	const (
		maxBackoff = time.Hour
		maxShift   = 12 // 1<<11 seconds is the last step below maxBackoff
	)

	if maxRetries < 0 {
		maxRetries = 0
	}

	var lastErr error

	for attempt := 0; attempt <= maxRetries; attempt++ {
		err := ps.sendToSubscription(subscription, payload)
		if err == nil {
			return nil // Success
		}

		lastErr = err

		// Don't retry on certain errors.
		if isNonRetryableError(err) {
			return err
		}

		if attempt < maxRetries {
			// Exponential backoff: 1s, 2s, 4s, 8s, ...
			backoff := maxBackoff
			if shift := uint(attempt); shift < maxShift {
				backoff = time.Duration(1<<shift) * time.Second
			}
			log.Printf("[PushService] Retry %d/%d after %v for subscription %s",
				attempt+1, maxRetries, backoff, subscription.ID)
			time.Sleep(backoff)
		}
	}

	return fmt.Errorf("failed after %d retries: %w", maxRetries, lastErr)
}
// isNonRetryableError reports whether err signals a permanent failure that a
// retry cannot fix: HTTP 410 Gone or 403 Forbidden, i.e. the subscription is
// expired or invalid.
//
// The status code is looked up in the error text as a standalone token, so
// digits that merely contain it (a port such as 4100, part of an ID) do not
// count. A nil error is never non-retryable.
func isNonRetryableError(err error) bool {
	if err == nil {
		return false
	}
	msg := err.Error()
	return hasStatusCodeToken(msg, "410") || hasStatusCodeToken(msg, "403")
}

// hasStatusCodeToken reports whether code occurs in msg with no ASCII letter
// or digit directly before or after it.
func hasStatusCodeToken(msg, code string) bool {
	for from := 0; from < len(msg); {
		rel := strings.Index(msg[from:], code)
		if rel < 0 {
			return false
		}
		start := from + rel
		end := start + len(code)
		leftOK := start == 0 || !isASCIIAlnum(msg[start-1])
		rightOK := end == len(msg) || !isASCIIAlnum(msg[end])
		if leftOK && rightOK {
			return true
		}
		from = start + 1
	}
	return false
}

// isASCIIAlnum reports whether b is an ASCII letter or digit.
func isASCIIAlnum(b byte) bool {
	return (b >= '0' && b <= '9') || (b >= 'a' && b <= 'z') || (b >= 'A' && b <= 'Z')
}
Production Deployment
Environment Configuration
Production .env.prod:
# Server
PORT=8081
ENV=production
# Database
DB_HOST=prod-postgres.internal
DB_PORT=5432
DB_USER=push_service
# From secrets
DB_PASSWORD=${DATABASE_PASSWORD}
DB_NAME=ohhspaces_prod
DB_SSLMODE=require
DB_MAX_OPEN_CONNS=100
DB_MAX_IDLE_CONNS=10
# VAPID (from secrets)
VAPID_PUBLIC_KEY=${VAPID_PUBLIC_KEY}
VAPID_PRIVATE_KEY=${VAPID_PRIVATE_KEY}
VAPID_SUBJECT=mailto:admin@yourdomain.com
# Rate Limiting
RATE_LIMIT_ENABLED=true
RATE_LIMIT_REQUESTS=100
RATE_LIMIT_WINDOW=1m
# Logging
LOG_LEVEL=info
LOG_FORMAT=json
# Workers
ENABLE_REMINDER_WORKER=true
REMINDER_WORKER_INTERVAL=5m
ENABLE_SCHEDULER_WORKER=true
SCHEDULER_WORKER_INTERVAL=1m
# Performance
MAX_GOROUTINES=1000
BATCH_SIZE=100
BATCH_DELAY_MS=100
Docker Configuration
Dockerfile
# Build stage
FROM golang:1.21-alpine AS builder
WORKDIR /app
# Copy go mod files first so the dependency layer is cached across source changes
COPY go.mod go.sum ./
RUN go mod download
# Copy source code
COPY . .
# Build a static binary (CGO disabled) so it runs on the minimal runtime image
RUN CGO_ENABLED=0 GOOS=linux go build -a -installsuffix cgo -o user-service ./user-service

# Production stage
# Pin the base image so rebuilds are reproducible
FROM alpine:3.19
# CA certificates are needed for outbound HTTPS calls to the push services
RUN apk --no-cache add ca-certificates \
    && addgroup -S app \
    && adduser -S -G app app
WORKDIR /app
# Copy binary from builder
COPY --from=builder /app/user-service .
# Copy migrations (optional)
COPY --from=builder /app/migrations ./migrations
# Run as an unprivileged user rather than root
USER app
EXPOSE 8081
CMD ["./user-service"]
docker-compose.yml
version: '3.8'

services:
  postgres:
    image: postgres:14-alpine
    environment:
      POSTGRES_DB: ohhspaces_prod
      POSTGRES_USER: push_service
      POSTGRES_PASSWORD: ${DATABASE_PASSWORD}
    volumes:
      - postgres_data:/var/lib/postgresql/data
    # Publishes the database on the host; drop this mapping if only
    # user-service needs to reach it.
    ports:
      - "5432:5432"
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U push_service"]
      interval: 10s
      timeout: 5s
      retries: 5

  user-service:
    build:
      context: .
      dockerfile: Dockerfile
    environment:
      DB_HOST: postgres
      DB_PORT: 5432
      DB_USER: push_service
      DB_PASSWORD: ${DATABASE_PASSWORD}
      DB_NAME: ohhspaces_prod
      VAPID_PUBLIC_KEY: ${VAPID_PUBLIC_KEY}
      VAPID_PRIVATE_KEY: ${VAPID_PRIVATE_KEY}
      VAPID_SUBJECT: mailto:admin@yourdomain.com
    ports:
      - "8081:8081"
    # Start only after the database healthcheck passes.
    depends_on:
      postgres:
        condition: service_healthy
    restart: unless-stopped

volumes:
  postgres_data:
Kubernetes Deployment
Deployment YAML
File: k8s/user-service-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: user-service
  namespace: production
spec:
  replicas: 3
  selector:
    matchLabels:
      app: user-service
  template:
    metadata:
      labels:
        app: user-service
    spec:
      containers:
        - name: user-service
          # Prefer an immutable version tag or digest over :latest so that
          # rollouts and rollbacks are reproducible.
          image: gcr.io/your-project/user-service:latest
          ports:
            - containerPort: 8081
          env:
            - name: PORT
              value: "8081"
            - name: ENV
              value: "production"
            - name: DB_HOST
              value: "postgres-service"
            - name: DB_PORT
              value: "5432"
            - name: DB_USER
              valueFrom:
                secretKeyRef:
                  name: db-credentials
                  key: username
            - name: DB_PASSWORD
              valueFrom:
                secretKeyRef:
                  name: db-credentials
                  key: password
            - name: DB_NAME
              value: "ohhspaces_prod"
            - name: VAPID_PUBLIC_KEY
              valueFrom:
                secretKeyRef:
                  name: vapid-keys
                  key: public-key
            - name: VAPID_PRIVATE_KEY
              valueFrom:
                secretKeyRef:
                  name: vapid-keys
                  key: private-key
            - name: VAPID_SUBJECT
              value: "mailto:admin@yourdomain.com"
          resources:
            requests:
              memory: "256Mi"
              cpu: "250m"
            limits:
              memory: "512Mi"
              cpu: "500m"
          livenessProbe:
            httpGet:
              path: /health
              port: 8081
            initialDelaySeconds: 30
            periodSeconds: 10
          readinessProbe:
            httpGet:
              path: /health
              port: 8081
            initialDelaySeconds: 5
            periodSeconds: 5
---
apiVersion: v1
kind: Service
metadata:
  name: user-service
  namespace: production
spec:
  # Routes to the pods labelled app=user-service by the Deployment.
  selector:
    app: user-service
  ports:
    - protocol: TCP
      port: 8081
      targetPort: 8081
  # ClusterIP: reachable only from inside the cluster.
  type: ClusterIP
Secrets Configuration
# Create database credentials secret
# (read by the Deployment as DB_USER / DB_PASSWORD via secretKeyRef)
# NOTE(review): values passed with --from-literal end up in shell history;
# consider --from-file or --from-env-file for real credentials.
kubectl create secret generic db-credentials \
--from-literal=username=push_service \
--from-literal=password='your-secure-password' \
--namespace=production
# Create VAPID keys secret
# (read by the Deployment as VAPID_PUBLIC_KEY / VAPID_PRIVATE_KEY)
# The key values below are truncated placeholders, not usable keys.
kubectl create secret generic vapid-keys \
--from-literal=public-key='BOzzMgOMwVyuziwHiI8...' \
--from-literal=private-key='BNw7fc1ayj3-Az-OJ8...' \
--namespace=production
Load Balancing
Nginx Configuration
# Pool of user-service instances; least_conn sends each request to the server
# with the fewest active connections.
upstream user_service {
least_conn;
# A server is taken out of rotation for 30s after 3 failed attempts.
server user-service-1:8081 max_fails=3 fail_timeout=30s;
server user-service-2:8081 max_fails=3 fail_timeout=30s;
server user-service-3:8081 max_fails=3 fail_timeout=30s;
}
# TLS-terminating virtual host that proxies the push API to the pool above.
server {
listen 443 ssl http2;
server_name api.yourdomain.com;
ssl_certificate /etc/nginx/ssl/cert.pem;
ssl_certificate_key /etc/nginx/ssl/key.pem;
location /api/push/ {
proxy_pass http://user_service;
proxy_http_version 1.1;
# NOTE(review): the Upgrade/Connection headers are WebSocket boilerplate and
# force "Connection: upgrade" on every request; the push endpoints shown in
# this document are plain HTTP POSTs — confirm these are needed.
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection 'upgrade';
# Forward the original host, client address and scheme to the backend.
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
proxy_cache_bypass $http_upgrade;
# Timeouts
proxy_connect_timeout 60s;
proxy_send_timeout 60s;
proxy_read_timeout 60s;
}
}
Horizontal Scaling
Scaling Considerations
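- Stateless Services: Ensure the push service is stateless. Note that the in-memory per-user rate limiter shown earlier is per-instance state: with N replicas behind a load balancer, a user's effective limit can be up to N times the configured value unless the limiter is backed by a shared store.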
- Database Connection Pooling: Use connection pool to avoid exhaustion
- Worker Coordination: Only one worker instance should run per task type
Worker Leader Election
// Use distributed locking for worker coordination
//
// Start loops forever trying to become the leader for the booking-reminder
// task. When the lock is acquired it calls rw.run(); otherwise it waits 30
// seconds and tries again. The function never returns.
//
// NOTE(review): acquireLock and rw.run are not shown here. The lock is
// requested with a 60-second duration and nothing in this function renews or
// releases it; presumably rw.run handles renewal — confirm, otherwise a second
// instance can take over while this one is still running.
// NOTE(review): if rw.run returns, the loop immediately tries to re-acquire
// the lock with no pause.
func (rw *ReminderWorker) Start() {
// Acquire distributed lock (e.g., Redis, etcd)
lockKey := "worker:booking_reminder:leader"
for {
acquired := acquireLock(lockKey, 60*time.Second)
if acquired {
log.Println("[ReminderWorker] Acquired leadership")
rw.run()
} else {
log.Println("[ReminderWorker] Waiting for leadership")
// Back off before the next leadership attempt.
time.Sleep(30 * time.Second)
}
}
}
Database Optimization
Connection Pooling
// setupDatabase opens a PostgreSQL connection through GORM using dsn and
// configures the underlying connection pool. It returns the GORM handle, or
// the first error encountered while opening the connection or obtaining the
// pool.
func setupDatabase(dsn string) (*gorm.DB, error) {
	const (
		maxOpenConns    = 100       // upper bound on open connections
		maxIdleConns    = 10        // connections kept idle for reuse
		connMaxLifetime = time.Hour // connections older than this are recycled
	)

	// PrepareStmt makes GORM cache prepared statements.
	cfg := &gorm.Config{PrepareStmt: true}

	db, err := gorm.Open(postgres.Open(dsn), cfg)
	if err != nil {
		return nil, err
	}

	pool, err := db.DB()
	if err != nil {
		return nil, err
	}

	pool.SetMaxOpenConns(maxOpenConns)
	pool.SetMaxIdleConns(maxIdleConns)
	pool.SetConnMaxLifetime(connMaxLifetime)

	return db, nil
}
Query Optimization
-- Add composite index for common queries.
-- is_active is left out of the key columns: the partial-index predicate
-- already restricts the index to is_active = TRUE, so the column would hold
-- the same value in every entry.
CREATE INDEX idx_push_subscriptions_user_active_last_used
    ON push_subscriptions(user_id, last_used_at DESC)
    WHERE is_active = TRUE;

-- Index for notification logs
CREATE INDEX idx_push_notification_logs_user_sent_status
    ON push_notification_logs(user_id, sent_at DESC, status);

-- Partial index for active subscriptions
CREATE INDEX idx_active_subscriptions
    ON push_subscriptions(user_id, endpoint)
    WHERE is_active = TRUE;
Summary
You now have:
✅ Advanced targeting strategies (multi-user, role-based, conditional)
✅ Notification scheduling with background workers
✅ Rate limiting and throttling mechanisms
✅ Batch processing for large-scale sends
✅ Priority queuing for critical notifications
✅ Retry strategies with exponential backoff
✅ Docker containerization
✅ Kubernetes production deployment
✅ Load balancing configuration
✅ Horizontal scaling patterns
✅ Database optimization techniques
Next Steps
➡️ Part 6: Monitoring, Debugging & Troubleshooting
Part 6 will cover:
- Metrics and monitoring
- Logging best practices
- Debugging techniques
- Performance profiling
- Error tracking
- Alerting strategies
- Health checks
- Incident response
Top comments (0)