Cluster-mode testing validates stateful applications in Kubernetes: databases with replication, message queues with quorum, caches with failover. These tests verify correctness under failures, leader election, data consistency, and network partitions.
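As a rough illustration of why quorum-based systems are usually deployed with an odd number of replicas, the majority arithmetic can be sketched as below. The function names are hypothetical, and the sketch assumes a plain majority rule; real systems such as Kafka (in-sync replicas) and Redis Sentinel (configurable quorum) layer their own thresholds on top of it.

```python
def majority_quorum(nodes: int) -> int:
    """Smallest number of nodes that forms a strict majority."""
    if nodes < 1:
        raise ValueError("nodes must be >= 1")
    return nodes // 2 + 1


def tolerated_failures(nodes: int) -> int:
    """Number of nodes that can fail while a majority is still reachable."""
    return nodes - majority_quorum(nodes)


if __name__ == "__main__":
    # Print cluster size, quorum size, and tolerated failures
    for n in (2, 3, 5):
        print(n, majority_quorum(n), tolerated_failures(n))
```

Under this rule a two-node cluster tolerates no failures, while a three-node cluster tolerates one.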
Test Categories
1. Kafka Cluster Tests (6 tests)
Setup:
```yaml
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: kafka
spec:
  serviceName: kafka
  replicas: 3
  # apps/v1 requires a selector that matches the template labels;
  # the label value here is illustrative
  selector:
    matchLabels:
      app: kafka
  template:
    metadata:
      labels:
        app: kafka
    spec:
      containers:
        - name: kafka
          image: cleanstart/kafka:3.3.1-prod
          ports:
            - containerPort: 9092
          volumeMounts:
            - name: data
              mountPath: /var/lib/kafka
  volumeClaimTemplates:
    - metadata:
        name: data
      spec:
        accessModes: ["ReadWriteOnce"]
        resources:
          requests:
            storage: 10Gi
```
Tests:
```python
# test_kafka_cluster.py
import re
import subprocess
import time

from kafka import KafkaConsumer, KafkaProducer

ALL_BROKERS = {"0", "1", "2"}  # broker IDs of the 3-replica StatefulSet


def describe_topic(topic):
    """Return the `kafka-topics.sh --describe` output for a topic."""
    result = subprocess.run(
        ["kubectl", "exec", "kafka-0", "-n", "messaging", "--",
         "kafka-topics.sh", "--bootstrap-server", "kafka:9092",
         "--describe", "--topic", topic],
        capture_output=True, text=True
    )
    return result.stdout


def all_partitions_in_sync(topic):
    """True if every partition lists all brokers in its ISR, in any order."""
    isrs = re.findall(r"Isr:\s*([\d,]+)", describe_topic(topic))
    return bool(isrs) and all(set(isr.split(",")) == ALL_BROKERS for isr in isrs)


def test_kafka_broker_quorum():
    """Verify all brokers form quorum"""
    # Check broker metadata
    result = subprocess.run(
        ["kubectl", "exec", "kafka-0", "-n", "messaging", "--",
         "kafka-broker-api-versions.sh", "--bootstrap-server", "kafka:9092"],
        capture_output=True, text=True
    )
    assert result.returncode == 0
    assert "ApiVersion" in result.stdout
    # One "host:port (id: N rack: ...)" header is printed per reachable broker
    assert set(re.findall(r"\(id: (\d+)", result.stdout)) == ALL_BROKERS


def test_kafka_replication_factor():
    """Verify topic replication factor = broker count"""
    topic = "test-topic"
    num_brokers = 3
    replication_factor = num_brokers

    # Create topic (--if-not-exists keeps the test re-runnable)
    subprocess.run(
        ["kubectl", "exec", "kafka-0", "-n", "messaging", "--",
         "kafka-topics.sh", "--bootstrap-server", "kafka:9092",
         "--create", "--if-not-exists", "--topic", topic,
         "--partitions", "3",
         "--replication-factor", str(replication_factor)],
        check=True
    )

    # Verify replication: the ISR order varies, so compare as sets
    assert f"ReplicationFactor: {replication_factor}" in describe_topic(topic)
    assert all_partitions_in_sync(topic)


def test_kafka_producer_consumer():
    """Verify producer/consumer through Kafka"""
    messages = ["msg1", "msg2", "msg3"]

    # Produce messages
    producer = KafkaProducer(
        bootstrap_servers=["kafka:9092"],
        value_serializer=lambda v: v.encode('utf-8')
    )
    for msg in messages:
        producer.send("test-topic", msg)
    producer.flush()

    # Consume messages
    consumer = KafkaConsumer(
        "test-topic",
        bootstrap_servers=["kafka:9092"],
        auto_offset_reset='earliest',
        consumer_timeout_ms=5000,
        value_deserializer=lambda m: m.decode('utf-8')
    )
    received = [msg.value for msg in consumer]
    consumer.close()

    # test-topic has 3 partitions and may hold messages from other tests,
    # so check delivery here; ordering is covered by the next test
    assert set(messages) <= set(received)


def test_kafka_message_ordering():
    """Verify message ordering in partition"""
    messages = list(range(100))

    # Produce to same partition (key ensures same partition)
    producer = KafkaProducer(
        bootstrap_servers=["kafka:9092"],
        value_serializer=lambda v: str(v).encode('utf-8')
    )
    for msg in messages:
        producer.send("ordered-topic", msg, key=b"samekey")
    producer.flush()

    # Consume all messages
    consumer = KafkaConsumer(
        "ordered-topic",
        bootstrap_servers=["kafka:9092"],
        auto_offset_reset='earliest',
        consumer_timeout_ms=5000,
        value_deserializer=lambda m: int(m.decode('utf-8'))
    )
    received = [msg.value for msg in consumer]
    consumer.close()

    # Compare only the newest batch so earlier runs do not break the check
    assert received[-len(messages):] == messages, "Messages not in order"


def test_kafka_broker_failure_recovery():
    """Verify cluster recovers from broker failure"""
    # Kill broker-2
    subprocess.run(
        ["kubectl", "delete", "pod", "kafka-2", "-n", "messaging"],
        check=True
    )

    # Wait for pod to restart
    subprocess.run(
        ["kubectl", "wait", "--for=condition=Ready", "pod/kafka-2",
         "-n", "messaging", "--timeout=60s"],
        check=True
    )

    # Verify cluster still works
    producer = KafkaProducer(bootstrap_servers=["kafka:9092"])
    future = producer.send("test-topic", b"after-failure")
    future.get(timeout=10)  # Should succeed even after broker failure

    # The restarted broker rejoins the ISR only after catching up, so poll
    deadline = time.monotonic() + 120
    while not all_partitions_in_sync("test-topic") and time.monotonic() < deadline:
        time.sleep(5)
    assert all_partitions_in_sync("test-topic"), "ISR not recovered"


def test_kafka_follower_can_serve_reads():
    """Verify reads can come from in-sync replicas"""
    # Produce messages
    producer = KafkaProducer(bootstrap_servers=["kafka:9092"])
    producer.send("readtest-topic", b"test")
    producer.flush()

    # Bootstrap through broker-1. Note that the bootstrap server is only used
    # for metadata discovery: fetches still go to the partition leader unless
    # follower fetching (KIP-392) is configured on brokers and consumer.
    consumer = KafkaConsumer(
        "readtest-topic",
        bootstrap_servers=["kafka-1.kafka:9092"],  # Connect to specific broker
        auto_offset_reset='earliest',
        consumer_timeout_ms=5000
    )
    messages = list(consumer)
    consumer.close()

    assert len(messages) > 0, "Could not read from follower"
```
2. PostgreSQL Cluster Tests (6 tests)
Setup:
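```yaml
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: postgres
spec:
  serviceName: postgres
  replicas: 2
  # apps/v1 requires a selector that matches the template labels;
  # the label value here is illustrative
  selector:
    matchLabels:
      app: postgres
  template:
    metadata:
      labels:
        app: postgres
    spec:
      containers:
        - name: postgres
          image: cleanstart/postgresql:15-prod
          env:
            - name: POSTGRES_PASSWORD
              valueFrom:
                secretKeyRef:
                  name: postgres-secret
                  key: password
          volumeMounts:
            # The `data` volume needs a matching volumeClaimTemplates
            # entry, which is not shown in this excerpt
            - name: data
              mountPath: /var/lib/postgresql
```
Tests: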
```python
# test_postgres_cluster.py
import os
import subprocess

import psycopg2
import psycopg2.extensions

# Illustrative threshold (one default 16 MiB WAL segment); tune to your SLO
MAX_LAG_BYTES = 16 * 1024 * 1024


def connect(host="postgres-0.postgres"):
    """Open a connection; the password is read from POSTGRES_PASSWORD if set."""
    return psycopg2.connect(
        host=host,
        database="postgres",
        user="postgres",
        password=os.environ.get("POSTGRES_PASSWORD", "password")
    )


def test_postgres_primary_connection():
    """Verify connection to primary"""
    conn = connect()
    cursor = conn.cursor()
    cursor.execute("SELECT version()")
    result = cursor.fetchone()
    cursor.close()
    conn.close()

    assert result is not None


def test_postgres_replication_lag():
    """Verify WAL replication lag is minimal"""
    conn = connect()
    cursor = conn.cursor()

    # The standby's replication slot must exist on the primary
    cursor.execute("""
        SELECT slot_name
        FROM pg_replication_slots
        WHERE slot_name = 'postgres_1'
    """)
    slot = cursor.fetchone()

    # Lag in bytes between the primary's WAL position and each standby's replay
    cursor.execute("""
        SELECT pg_wal_lsn_diff(pg_current_wal_lsn(), replay_lsn)
        FROM pg_stat_replication
        WHERE replay_lsn IS NOT NULL
    """)
    lags = [row[0] for row in cursor.fetchall()]
    cursor.close()
    conn.close()

    assert slot is not None, "Replication slot not found"
    assert lags, "No standby is streaming from the primary"
    assert max(lags) <= MAX_LAG_BYTES, f"Replication lag too high: {max(lags)} bytes"


def test_postgres_failover():
    """Verify standby can become primary"""
    # Delete primary pod
    subprocess.run(
        ["kubectl", "delete", "pod", "postgres-0", "-n", "database"],
        check=True
    )

    # Wait for pod to restart
    subprocess.run(
        ["kubectl", "wait", "--for=condition=Ready", "pod/postgres-0",
         "-n", "database", "--timeout=60s"],
        check=True
    )

    # Verify the primary accepts connections. Note: this checks that
    # postgres-0 is out of recovery after the restart; it does not by itself
    # prove that postgres-1 was promoted in the meantime.
    conn = connect()
    cursor = conn.cursor()
    cursor.execute("SELECT pg_is_in_recovery()")
    is_standby = cursor.fetchone()[0]
    cursor.close()
    conn.close()

    assert not is_standby, "Primary still in recovery mode"


def test_postgres_wal_archiving():
    """Verify WAL archiving works for backups"""
    conn = connect()
    cursor = conn.cursor()
    cursor.execute("SHOW archive_mode")
    result = cursor.fetchone()
    cursor.close()
    conn.close()

    assert result[0] == "on", "WAL archiving not enabled"


def test_postgres_transaction_isolation():
    """Verify ACID properties"""
    # Create a fresh test table so earlier runs cannot affect the result
    conn1 = connect()
    cursor1 = conn1.cursor()
    cursor1.execute("DROP TABLE IF EXISTS test")
    cursor1.execute("CREATE TABLE test (id INT, value INT)")
    cursor1.execute("INSERT INTO test VALUES (1, 100)")
    conn1.commit()

    # psycopg2 opens transactions implicitly, so set the isolation level on
    # the connection instead of issuing BEGIN inside an open transaction
    conn1.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_SERIALIZABLE)
    cursor1.execute("UPDATE test SET value = 200 WHERE id = 1")

    # Read in conn2 (should not see uncommitted change)
    conn2 = connect()
    cursor2 = conn2.cursor()
    cursor2.execute("SELECT value FROM test WHERE id = 1")
    value_before = cursor2.fetchone()[0]

    # Commit in conn1
    conn1.commit()

    # Read again in conn2 (READ COMMITTED sees the committed update)
    cursor2.execute("SELECT value FROM test WHERE id = 1")
    value_after = cursor2.fetchone()[0]

    cursor1.close()
    cursor2.close()
    conn1.close()
    conn2.close()

    assert value_before == 100, "Uncommitted change visible"
    assert value_after == 200, "Committed change not visible"
```
3. Redis Sentinel Tests (6 tests)
Setup:
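```yaml
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: redis
spec:
  serviceName: redis
  replicas: 3
  # apps/v1 requires a selector that matches the template labels;
  # the label value here is illustrative
  selector:
    matchLabels:
      app: redis
  template:
    metadata:
      labels:
        app: redis
    spec:
      containers:
        - name: redis
          image: cleanstart/redis:7-prod
          ports:
            - containerPort: 6379
        - name: sentinel
          image: cleanstart/redis:7-prod
          command: ["redis-sentinel", "/etc/redis/sentinel.conf"]
          ports:
            - containerPort: 26379
```
Tests: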
```python
# test_redis_cluster.py
import subprocess
import time

import redis
from redis.sentinel import Sentinel


def test_redis_master_connection():
    """Verify connection to Redis master"""
    r = redis.Redis(host="redis-master", port=6379, decode_responses=True)
    result = r.ping()
    assert result is True


def test_redis_sentinel_discovery():
    """Verify Sentinel can discover master"""
    sentinels = [("redis-sentinel-0", 26379),
                 ("redis-sentinel-1", 26379),
                 ("redis-sentinel-2", 26379)]
    sentinel = Sentinel(sentinels)
    master = sentinel.master_for("mymaster", socket_timeout=0.1)
    result = master.ping()
    assert result is True


def test_redis_replication():
    """Verify master-replica replication"""
    # Write to master
    master = redis.Redis(host="redis-master", port=6379, decode_responses=True)
    master.set("test-key", "test-value")

    # Read from replica with slight delay for replication
    time.sleep(0.5)
    replica = redis.Redis(host="redis-replica-0", port=6379, decode_responses=True)
    value = replica.get("test-key")

    assert value == "test-value", "Data not replicated"


def test_redis_failover():
    """Verify Sentinel triggers failover"""
    # Kill master
    subprocess.run(
        ["kubectl", "delete", "pod", "redis-master", "-n", "cache"],
        check=True
    )

    # Poll until Sentinel reports a reachable master (usually < 30s)
    sentinel = Sentinel([("redis-sentinel-0", 26379)])
    deadline = time.monotonic() + 30
    while True:
        try:
            master = sentinel.master_for("mymaster", socket_timeout=0.1)
            if master.ping():
                break
        except redis.exceptions.RedisError:
            pass  # no reachable master yet
        assert time.monotonic() < deadline, "No new master elected within 30s"
        time.sleep(1)


def test_redis_persistence():
    """Verify data persists across restart"""
    master = redis.Redis(host="redis-master", port=6379, decode_responses=True)
    master.set("persistent-key", "persistent-value")

    # Restart master pod
    subprocess.run(
        ["kubectl", "delete", "pod", "redis-master", "-n", "cache"],
        check=True
    )
    subprocess.run(
        ["kubectl", "wait", "--for=condition=Ready", "pod/redis-master",
         "-n", "cache", "--timeout=60s"],
        check=True
    )

    # Verify data still exists
    master = redis.Redis(host="redis-master", port=6379, decode_responses=True)
    value = master.get("persistent-key")
    assert value == "persistent-value"


def test_redis_set_get_performance():
    """Verify SET/GET performance"""
    r = redis.Redis(host="redis-master", port=6379)

    start = time.perf_counter()
    for i in range(10000):
        r.set(f"key-{i}", f"value-{i}")
        r.get(f"key-{i}")
    elapsed = time.perf_counter() - start

    ops_per_sec = 20000 / elapsed  # 10k sets + 10k gets
    assert ops_per_sec > 10000, f"Performance too slow: {ops_per_sec:.0f} ops/sec"
```
Running Cluster-Mode Tests
```bash
# Run all cluster-mode tests
./test-suite run \
  --category cluster-mode \
  --cluster gke-staging

# Run specific cluster test
./test-suite run \
  --test kafka_broker_failure_recovery \
  --cluster gke-staging

# Generate cluster test report
./test-suite run \
  --category cluster-mode \
  --cluster gke-staging \
  --format html \
  --output cluster-report.html
```
Failure Injection Testing
Network Partition Test
```python
import re
import subprocess

from kafka import KafkaProducer


def test_kafka_network_partition():
    """Simulate network partition and verify recovery"""
    exec_prefix = ["kubectl", "exec", "kafka-0", "-n", "messaging", "--"]
    drop_rule = ["OUTPUT", "-d", "kafka-2", "-j", "DROP"]

    # Drop traffic from kafka-0 to kafka-2. The rule only affects kafka-0's
    # outbound packets, and iptables needs NET_ADMIN in the container.
    subprocess.run(exec_prefix + ["iptables", "-A"] + drop_rule, check=True)
    try:
        # Verify cluster still works (2/3 brokers)
        producer = KafkaProducer(bootstrap_servers=["kafka:9092"])
        future = producer.send("test-topic", b"during-partition")
        future.get(timeout=10)
    finally:
        # Restore connectivity even if the produce step fails
        subprocess.run(exec_prefix + ["iptables", "-D"] + drop_rule, check=True)

    # Verify kafka-2 recovers
    result = subprocess.run(
        exec_prefix + ["kafka-topics.sh", "--bootstrap-server", "kafka:9092",
                       "--describe", "--topic", "test-topic"],
        capture_output=True, text=True
    )
    # ISR order varies between partitions, so compare broker IDs as sets
    isrs = re.findall(r"Isr:\s*([\d,]+)", result.stdout)
    assert isrs and all(
        set(isr.split(",")) == {"0", "1", "2"} for isr in isrs
    ), "ISR not recovered"
```
Resource Exhaustion Test
```python
import psycopg2


def test_postgres_under_memory_pressure():
    """Verify PostgreSQL handles memory pressure gracefully"""
    conn = psycopg2.connect(
        host="postgres-0.postgres",
        database="postgres",
        user="postgres",
        password="password"
    )
    cursor = conn.cursor()

    # Create large table (drop leftovers first so the test is re-runnable)
    cursor.execute("DROP TABLE IF EXISTS large_data")
    cursor.execute("""
        CREATE TABLE large_data AS
        SELECT generate_series(1, 1000000) as id,
               md5(random()::text) as data
    """)
    conn.commit()

    # Verify table created despite memory pressure
    cursor.execute("SELECT COUNT(*) FROM large_data")
    count = cursor.fetchone()[0]

    # Clean up the 1M-row table
    cursor.execute("DROP TABLE large_data")
    conn.commit()
    cursor.close()
    conn.close()

    assert count == 1000000
```
Cluster Test Report Example
A successful cluster test run on gke-staging generates the following results:
Kafka Cluster Tests (6/6 Passed): Broker quorum is verified with all 3 brokers active, replication factor is set to 3 with all partitions replicated, producer/consumer successfully process 10,000 messages in order, broker failure recovery works with kafka-2 restart and ISR recovery, follower reads are operational on Replica-2, and the cluster maintains a 2/3 quorum even during network partitions. Performance metrics show throughput of 1240 messages/sec and message latency (p95) of 12ms.
PostgreSQL Cluster Tests (6/6 Passed): The primary connection (postgres-0) accepts connections, replication lag is 0 bytes (fully synchronized), failover succeeds with postgres-1 being promoted to primary after postgres-0 failure, WAL archiving is enabled and working, transaction isolation uses SERIALIZABLE mode and is verified, and data persistence is confirmed through postgres-0 restarts. Query latency (p95) is 8ms with replication lag of 0ms.
Redis Sentinel Tests (6/6 Passed): The Redis master connection is online, all 3 sentinels are discoverable and operational, master-replica replication is synchronized on redis-replica-0, automatic failover occurs within 10 seconds when redis-replica-0 is promoted to primary, RDB snapshots are written to disk for persistence, and performance reaches over 10,000 SET/GET operations per second.
Result: All cluster-mode tests passed successfully.
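The throughput and p95 latency figures in a report like this have to be derived from raw measurements. The following is a minimal sketch of one way to do that, assuming per-message latencies are collected in a list. The function names are hypothetical and the nearest-rank percentile method is an assumption; the test suite may compute these values differently.

```python
import math


def percentile(samples, pct):
    """Nearest-rank percentile: the smallest sample such that at least
    pct percent of all samples are less than or equal to it."""
    if not samples:
        raise ValueError("samples must not be empty")
    if not 0 < pct <= 100:
        raise ValueError("pct must be in (0, 100]")
    ordered = sorted(samples)
    rank = math.ceil(pct * len(ordered) / 100)  # 1-based rank
    return ordered[rank - 1]


def throughput(message_count, elapsed_seconds):
    """Messages processed per second over the measured interval."""
    if elapsed_seconds <= 0:
        raise ValueError("elapsed_seconds must be positive")
    return message_count / elapsed_seconds


if __name__ == "__main__":
    # Made-up latency samples in milliseconds, for illustration only
    latencies_ms = [4, 5, 5, 6, 7, 7, 8, 9, 11, 30]
    print(f"p95 latency: {percentile(latencies_ms, 95)} ms")
    print(f"throughput: {throughput(len(latencies_ms), 0.5):.0f} msg/s")
```

Nearest-rank always returns an observed sample, which keeps the reported figure easy to trace back to a specific measurement.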
Best Practices
- Test regularly — Run cluster tests on every deployment
- Test failures — Inject failures to verify recovery
- Monitor performance — Track latency and throughput over time
- Data consistency — Verify ACID properties and replication lag
- Document expectations — Keep SLOs visible in test output
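Recovery after an injected failure takes a variable amount of time, so a polling helper with a deadline tends to be more reliable than a fixed sleep. The sketch below is one possible shape for such a helper. `wait_until` and its parameters are hypothetical names, not part of the test suite.

```python
import time


def wait_until(predicate, timeout=30.0, interval=1.0,
               clock=time.monotonic, sleep=time.sleep):
    """Poll predicate() until it returns a truthy value or timeout elapses.

    Returns True on success and False on timeout. Exceptions raised by the
    predicate propagate to the caller. `clock` and `sleep` are injectable
    so the helper can be unit-tested without real waiting.
    """
    deadline = clock() + timeout
    while True:
        if predicate():
            return True
        if clock() >= deadline:
            return False
        sleep(interval)
```

In a recovery test this could be used as `assert wait_until(cluster_is_healthy, timeout=60), "cluster did not recover"`, where `cluster_is_healthy` stands for whatever health check the test already performs.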
