Apache Kafka Guide: Architecture, Producers, Consumers, Streams, Connect & Performance

27 min read, by DevToolBox Team

Apache Kafka is a distributed event streaming platform used by thousands of companies for high-throughput, low-latency data pipelines, streaming analytics, and mission-critical applications. Originally developed at LinkedIn and open-sourced in 2011, Kafka has become the de facto standard for event-driven architectures. This comprehensive guide covers Kafka architecture, producers, consumers, client libraries for Node.js, Java, and Python, Kafka Streams, Kafka Connect, Schema Registry, exactly-once semantics, security, performance tuning, and monitoring.

TL;DR

Kafka is a distributed commit log that organizes data into topics and partitions across a cluster of brokers. Producers write events, consumers read them via consumer groups, and Kafka guarantees ordering within partitions. Use KRaft mode (no ZooKeeper) for new deployments, enable idempotent producers for safety, and leverage Kafka Streams or Connect for processing pipelines. Target 10-100 partitions per topic, tune batch.size and linger.ms for throughput, and monitor consumer lag as your primary health metric.

Key Takeaways

  • Kafka organizes data into topics split across partitions for parallel processing. Each partition is an ordered, immutable append-only log.
  • KRaft mode replaces ZooKeeper for metadata management in Kafka 3.3+, simplifying operations and improving scalability.
  • Consumer groups enable horizontal scaling — each partition is consumed by exactly one consumer within a group.
  • Idempotent producers (enable.idempotence=true) prevent duplicate messages during retries at no significant performance cost.
  • Exactly-once semantics require transactional producers + read_committed isolation level on consumers.
  • Kafka Connect provides pre-built connectors for databases, cloud storage, and search engines without custom code.
  • Consumer lag is the most important operational metric — monitor it with Prometheus, Burrow, or Cruise Control.

Kafka Architecture

Kafka is a distributed system composed of brokers, topics, partitions, and replicas. Understanding these building blocks is essential before writing any code.

Brokers and Cluster

A Kafka cluster consists of multiple brokers (servers). Each broker stores a subset of topic partitions and serves client requests. Brokers are identified by unique integer IDs. One broker acts as the controller, managing partition leadership and cluster metadata.

Topics and Partitions

A topic is a named category of messages. Each topic is split into one or more partitions. A partition is an ordered, immutable sequence of records, each assigned a sequential offset. Partitions are distributed across brokers for parallelism and fault tolerance.

# Kafka Cluster Architecture
#
# Cluster: 3 Brokers, Topic "orders" with 3 partitions, replication factor 3
#
# Broker 0 (Controller)      Broker 1                  Broker 2
# ┌──────────────────┐       ┌──────────────────┐      ┌──────────────────┐
# │ orders-P0 (L)    │       │ orders-P0 (F)    │      │ orders-P0 (F)    │
# │ orders-P1 (F)    │       │ orders-P1 (L)    │      │ orders-P1 (F)    │
# │ orders-P2 (F)    │       │ orders-P2 (F)    │      │ orders-P2 (L)    │
# └──────────────────┘       └──────────────────┘      └──────────────────┘
#
# L = Leader, F = Follower
# ISR (In-Sync Replicas) = all replicas that are caught up with the leader

Replication

Each partition has a configurable replication factor (typically 3). One replica is the leader (handles reads/writes) and the others are followers (replicate the leader). If the leader fails, a follower is elected as the new leader. ISR (In-Sync Replicas) tracks which replicas are caught up.

ZooKeeper vs KRaft

Historically, Kafka used Apache ZooKeeper for metadata management (broker registration, topic configuration, controller election). KRaft (Kafka Raft) mode, production-ready since Kafka 3.3, moves metadata into Kafka itself using a Raft consensus protocol. KRaft eliminates the ZooKeeper dependency, supports more partitions, and simplifies deployment.

# KRaft mode server.properties (no ZooKeeper required)
# Kafka 3.3+ — production-ready metadata management

# Node roles: controller, broker, or both
process.roles=broker,controller

# Unique node ID
node.id=1

# Controller quorum voters: id@host:port
controller.quorum.voters=1@kafka-1:9093,2@kafka-2:9093,3@kafka-3:9093

# Listener configuration
listeners=PLAINTEXT://:9092,CONTROLLER://:9093
inter.broker.listener.name=PLAINTEXT
controller.listener.names=CONTROLLER

# Log directories
log.dirs=/var/kafka-logs
metadata.log.dir=/var/kafka-metadata

# Cluster ID (generate with kafka-storage.sh random-uuid)
# kafka-storage.sh format -t <uuid> -c server.properties
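
The three entries in controller.quorum.voters are not arbitrary: a Raft quorum needs a majority of voters to be reachable. The arithmetic can be sketched as follows (the function names are illustrative, not part of any Kafka API):

```python
def quorum_size(voters: int) -> int:
    """Smallest number of voters that still forms a majority."""
    return voters // 2 + 1


def tolerated_failures(voters: int) -> int:
    """How many voters can be down while a majority remains."""
    return voters - quorum_size(voters)


for n in (1, 3, 4, 5):
    print(f"{n} voters: majority={quorum_size(n)}, "
          f"tolerates {tolerated_failures(n)} failure(s)")
```

An even number of voters buys nothing: four voters tolerate the same single failure as three, which is why controller quorums are usually sized at three or five.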

Producers

Producers publish records to Kafka topics. Understanding partitioning, acknowledgments, and idempotency is critical for reliable data ingestion.

Partitioning Strategies

Producers determine which partition receives a record using a partitioner. The default partitioner hashes the record key to select a partition, so records with the same key always go to the same partition as long as the topic's partition count does not change. If no key is provided, the sticky partitioner fills a batch for one partition before moving to the next, which keeps batches large and efficient.
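
A minimal sketch of key-based partitioning, assuming a stand-in hash: Kafka's Java client uses murmur2, while this example uses zlib.crc32 purely to stay within the Python standard library. The function name partition_for is hypothetical.

```python
import zlib


def partition_for(key: bytes, num_partitions: int) -> int:
    # Stand-in hash (crc32). Kafka's Java client uses murmur2, so the actual
    # partition numbers differ; the "same key, same partition" property holds.
    return zlib.crc32(key) % num_partitions


keys = [b"USR-1", b"USR-2", b"USR-1", b"USR-3", b"USR-1"]
for key in keys:
    print(key.decode(), "->", "partition", partition_for(key, 6))
```

Because the result depends on the modulus, adding partitions to a topic can send an existing key to a different partition, which breaks per-key ordering for records written before and after the change.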

Acknowledgments (acks)

The acks setting controls durability guarantees:

  • acks=0: Fire and forget. Producer does not wait for any acknowledgment. Fastest but risks data loss.
  • acks=1: Leader acknowledgment. Producer waits for the partition leader to write the record. Data loss possible if leader fails before replication.
  • acks=all: Full ISR acknowledgment. Producer waits for all in-sync replicas to write. Strongest durability guarantee. Use with min.insync.replicas=2 for production.

# Producer configuration: production settings
bootstrap.servers=kafka-1:9092,kafka-2:9092,kafka-3:9092

# Durability
acks=all
enable.idempotence=true
max.in.flight.requests.per.connection=5
retries=2147483647

# Batching and throughput
batch.size=65536
linger.ms=10
buffer.memory=67108864
compression.type=lz4

# Serialization
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer

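# Partitioner: leave unset to keep the default key-hash behavior.
# RoundRobinPartitioner ignores record keys, so enabling it gives up per-key ordering.
# partitioner.class=org.apache.kafka.clients.producer.RoundRobinPartitioner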
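
How the acks levels interact with min.insync.replicas can be modeled in a few lines. This is a simplified decision table, not broker code; write_outcome and its return strings are made up for illustration.

```python
def write_outcome(acks: str, isr_size: int, min_insync_replicas: int) -> str:
    """Simplified model of what a producer learns about a write."""
    if acks == "0":
        return "not confirmed"  # the producer never waits for the broker
    if acks == "1":
        return "acknowledged by leader"  # followers may not have the record yet
    if acks == "all":
        # min.insync.replicas is only enforced for acks=all
        if isr_size < min_insync_replicas:
            return "rejected (not enough in-sync replicas)"
        return f"acknowledged by {isr_size} in-sync replicas"
    raise ValueError(f"unknown acks value: {acks!r}")


for isr in (3, 2, 1):
    print(f"ISR={isr}:", write_outcome("all", isr, min_insync_replicas=2))
```

With a replication factor of 3 and min.insync.replicas=2, the topic keeps accepting acks=all writes with one replica down and starts rejecting them when only the leader is left, which trades availability for durability.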

Idempotent Producers

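Set enable.idempotence=true to prevent duplicate records during retries. The broker assigns each producer a producer ID, and the producer attaches a per-partition sequence number to every batch, so the broker can recognize and discard a retried batch it has already written. The overhead is negligible; since Kafka 3.0 the Java client enables idempotence by default, and it is worth setting explicitly on older clients.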
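
The deduplication idea can be illustrated with a toy partition log. This is a deliberately simplified model (one record per sequence number, one partition); the class PartitionLog is hypothetical and the real broker tracks sequences per batch.

```python
class PartitionLog:
    """Toy model of broker-side deduplication for one partition."""

    def __init__(self):
        self.records = []
        self.last_sequence = {}  # producer_id -> last accepted sequence number

    def append(self, producer_id: int, sequence: int, value: str) -> bool:
        last = self.last_sequence.get(producer_id, -1)
        if sequence <= last:
            return False  # retry of something already written: drop it
        if sequence != last + 1:
            raise ValueError("out-of-order sequence number")
        self.records.append(value)
        self.last_sequence[producer_id] = sequence
        return True


log = PartitionLog()
log.append(producer_id=7, sequence=0, value="order-1")
log.append(producer_id=7, sequence=0, value="order-1")  # network retry
log.append(producer_id=7, sequence=1, value="order-2")
print(log.records)
```

The retried write is acknowledged to the producer but not stored a second time, so the log ends up with each order exactly once.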

Compression

Producers can compress batches before sending. Compression reduces network bandwidth and broker storage at the cost of CPU. Supported codecs: none, gzip, snappy, lz4, zstd. Use lz4 or zstd for the best balance of speed and compression ratio.
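
Compression is applied to whole batches, which is why batching settings affect the compression ratio. The effect is easy to see with the standard-library gzip module and some made-up order events (the record layout below is illustrative only):

```python
import gzip
import json

# 200 hypothetical order events with a similar structure
records = [
    json.dumps({"id": f"ORD-{i:04d}", "user_id": "USR-123",
                "status": "PENDING", "total": 19.98}).encode("utf-8")
    for i in range(200)
]

raw_bytes = sum(len(r) for r in records)
one_by_one = sum(len(gzip.compress(r)) for r in records)
as_batch = len(gzip.compress(b"".join(records)))

print(f"raw: {raw_bytes} B")
print(f"compressed individually: {one_by_one} B")
print(f"compressed as one batch: {as_batch} B")
```

Repeated field names and values only compress away when they share a batch, so a larger batch.size and a small linger.ms tend to improve the ratio as well as throughput.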

Consumers

Consumers read records from Kafka topics. Consumer groups enable parallel processing, and offset management controls exactly which records are processed.

Consumer Groups

A consumer group is a set of consumers that cooperatively consume a topic. Each partition is assigned to exactly one consumer in the group. If there are more consumers than partitions, some consumers are idle. If a consumer fails, its partitions are reassigned to the remaining consumers (rebalancing).
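
A small sketch of how partitions might be spread over a group, using a plain round-robin rule. The function assign_round_robin is an illustration of the idea, not Kafka's actual assignor implementation.

```python
def assign_round_robin(partitions, consumers):
    """Give each partition to exactly one consumer, cycling through members."""
    if not consumers:
        raise ValueError("a consumer group needs at least one member")
    members = sorted(consumers)
    assignment = {member: [] for member in members}
    for index, partition in enumerate(sorted(partitions)):
        assignment[members[index % len(members)]].append(partition)
    return assignment


# Three partitions, four consumers: one consumer has nothing to do
before = assign_round_robin(range(3), ["c1", "c2", "c3", "c4"])
print(before)

# c2 fails: its partition moves to a surviving member
after = assign_round_robin(range(3), ["c1", "c3", "c4"])
print(after)
```

The partition count is therefore the upper bound on useful parallelism within one group; extra consumers only act as hot standbys.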

Offset Management

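Each consumer tracks its position in each partition via an offset. Offsets are committed to the internal __consumer_offsets topic. Auto-commit (enable.auto.commit=true) commits offsets periodically, while manual commit (commitSync/commitAsync) gives you control over exactly when a record counts as done. Commit only after a batch has been processed successfully: committing earlier can lose records if the consumer crashes mid-batch, whereas committing afterwards means a crash causes reprocessing (at-least-once delivery), so handlers should tolerate duplicates.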
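
The effect of committing after processing can be simulated without a broker. In this sketch the log is a Python list, the committed offset is an integer, and crash_after is a hypothetical knob that kills the consumer after processing a record but before committing it.

```python
def run_consumer(log, committed, crash_after=None):
    """Process records from the committed offset, committing after each one."""
    processed = []
    for count, record in enumerate(log[committed:], start=1):
        processed.append(record)  # the side effect happens here
        if crash_after == count:
            return processed, committed  # crashed before the commit
        committed += 1  # committed offset = next record to read
    return processed, committed


log = ["a", "b", "c", "d"]
first_run, committed = run_consumer(log, committed=0, crash_after=3)
second_run, committed = run_consumer(log, committed)

print("first run processed:", first_run)
print("second run processed:", second_run)
print("final committed offset:", committed)
```

Record "c" is processed in both runs: nothing is lost, but the handler sees it twice. That is the at-least-once trade-off of commit-after-processing.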

Rebalancing Strategies

When consumers join or leave a group, partitions are rebalanced:

  • Eager: All partitions are revoked and reassigned. Causes a stop-the-world pause.
  • Cooperative Sticky: Only affected partitions are reassigned. Other consumers continue processing. Use CooperativeStickyAssignor for production.
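
The difference between the two strategies comes down to which partitions a consumer has to give up during a rebalance. A rough model, with hypothetical consumer names and a made-up revoked_partitions helper:

```python
def revoked_partitions(old, new, strategy):
    """Partitions each existing consumer must stop reading during a rebalance."""
    if strategy == "eager":
        # Everyone gives up everything, then receives a fresh assignment
        return {consumer: sorted(parts) for consumer, parts in old.items()}
    # Cooperative: only partitions that change owner are revoked
    return {
        consumer: sorted(set(parts) - set(new.get(consumer, [])))
        for consumer, parts in old.items()
    }


old = {"c1": [0, 1, 2], "c2": [3, 4, 5]}
new = {"c1": [0, 1], "c2": [3, 4], "c3": [2, 5]}  # c3 joined the group

print("eager:      ", revoked_partitions(old, new, "eager"))
print("cooperative:", revoked_partitions(old, new, "cooperative"))
```

When c3 joins, the eager strategy pauses all six partitions, while the cooperative one interrupts only the two that actually move.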

Kafka with Node.js (KafkaJS)

KafkaJS is the most popular pure JavaScript client for Apache Kafka. It supports producers, consumers, admin operations, and exactly-once semantics.

Node.js Producer Example

// Node.js Producer with KafkaJS
const { Kafka, CompressionTypes } = require('kafkajs');

const kafka = new Kafka({
  clientId: 'order-service',
  brokers: ['kafka-1:9092', 'kafka-2:9092', 'kafka-3:9092'],
  retry: { retries: 5, initialRetryTime: 300 },
});

const producer = kafka.producer({
  idempotent: true,            // prevent duplicate messages
  maxInFlightRequests: 1,      // the KafkaJS transaction docs configure 1 in-flight request
  transactionalId: 'order-tx', // for exactly-once semantics
});

async function sendOrder(order) {
  await producer.connect();

  // Transactional send (exactly-once)
  const transaction = await producer.transaction();
  try {
    await transaction.send({
      topic: 'orders',
      compression: CompressionTypes.GZIP, // built in; LZ4, Snappy, and ZSTD need a separate codec package
      messages: [
        {
          key: order.userId,      // ensures per-user ordering
          value: JSON.stringify(order),
          headers: {
            'event-type': 'ORDER_CREATED',
            'correlation-id': order.id,
          },
        },
      ],
    });

    // Send to analytics topic in same transaction
    await transaction.send({
      topic: 'order-analytics',
      messages: [
        {
          key: order.userId,
          value: JSON.stringify({
            orderId: order.id,
            total: order.total,
            timestamp: Date.now(),
          }),
        },
      ],
    });

    await transaction.commit();
    console.log('Order sent successfully:', order.id);
  } catch (err) {
    await transaction.abort();
    console.error('Transaction aborted:', err.message);
    throw err;
  }
}

Node.js Consumer Example

// Node.js Consumer with KafkaJS
const { Kafka } = require('kafkajs');

const kafka = new Kafka({
  clientId: 'order-processor',
  brokers: ['kafka-1:9092', 'kafka-2:9092', 'kafka-3:9092'],
});

const consumer = kafka.consumer({
  groupId: 'order-processing-group',
  sessionTimeout: 30000,
  heartbeatInterval: 3000,
  maxWaitTimeInMs: 5000,
  // KafkaJS ships only a round-robin assigner; a cooperative-sticky
  // strategy would have to be supplied as a custom assigner.
  partitionAssigners: [
    require('kafkajs').PartitionAssigners.roundRobin,
  ],
});

async function startConsumer() {
  await consumer.connect();
  await consumer.subscribe({
    topics: ['orders'],
    fromBeginning: false,
  });

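  await consumer.run({
    autoCommit: false,           // manual offset management
    eachBatchAutoResolve: false, // only offsets resolved below count as processed
    eachBatch: async ({ batch, resolveOffset, heartbeat, isRunning, isStale }) => {
      for (const message of batch.messages) {
        if (!isRunning() || isStale()) break; // stop on shutdown or rebalance

        console.log(
          'Processing:', message.headers?.['event-type']?.toString(),
          'Partition:', batch.partition,
          'Offset:', message.offset
        );

        try {
          const order = JSON.parse(message.value.toString());
          await processOrder(order); // your business logic
        } catch (err) {
          console.error('Processing failed:', err);
          // Dead letter queue pattern: park the message and move on
          await sendToDeadLetterQueue(message, err);
        }

        resolveOffset(message.offset);
        // With autoCommit disabled, commit explicitly. The committed offset
        // is the offset of the next message to read, hence the + 1.
        await consumer.commitOffsets([{
          topic: batch.topic,
          partition: batch.partition,
          offset: (Number(message.offset) + 1).toString(),
        }]);
        await heartbeat(); // keep the session alive during long batches
      }
    },
  });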
}

// Graceful shutdown
process.on('SIGTERM', async () => {
  console.log('Shutting down consumer...');
  await consumer.disconnect();
  process.exit(0);
});

Error Handling

// Error handling patterns for KafkaJS

const { Kafka } = require('kafkajs');
const kafka = new Kafka({
  clientId: 'resilient-service',
  brokers: ['kafka-1:9092', 'kafka-2:9092'],
  retry: {
    retries: 8,
    initialRetryTime: 100,
    maxRetryTime: 30000,
    factor: 2, // exponential backoff
  },
  connectionTimeout: 10000,
  requestTimeout: 30000,
});

const producer = kafka.producer();

// Retry wrapper with exponential backoff (assumes producer.connect() was awaited at startup)
async function sendWithRetry(topic, messages, maxRetries = 3) {
  let attempt = 0;
  while (attempt < maxRetries) {
    try {
      await producer.send({ topic, messages });
      return; // success
    } catch (err) {
      attempt++;
      if (err.type === 'LEADER_NOT_AVAILABLE') {
        // Transient error — retry after backoff
        const delay = Math.min(1000 * Math.pow(2, attempt), 30000);
        console.warn('Leader not available, retrying in ' + delay + 'ms');
        await new Promise(r => setTimeout(r, delay));
      } else if (err.type === 'REQUEST_TIMED_OUT') {
        console.warn('Request timed out, attempt ' + attempt);
      } else {
        // Non-retriable error
        console.error('Fatal producer error:', err);
        throw err;
      }
    }
  }
  throw new Error('Max retries exceeded for topic: ' + topic);
}

Kafka with Java / Spring Boot

Spring Kafka provides deep integration with the Spring ecosystem. It wraps the Apache Kafka Java client with convenient abstractions like KafkaTemplate and @KafkaListener.

Spring Kafka Producer

// Spring Kafka Producer: OrderProducerService.java
// (written for Spring Kafka 3.x, where send() returns a CompletableFuture)
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import java.util.concurrent.CompletableFuture;

@Service
public class OrderProducerService {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public OrderProducerService(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void sendOrder(String orderId, String orderJson) {
        CompletableFuture<SendResult<String, String>> future =
            kafkaTemplate.send("orders", orderId, orderJson);

        future.whenComplete((result, ex) -> {
            if (ex != null) {
                System.err.println("Failed to send order: " + ex.getMessage());
            } else {
                System.out.println("Order sent to partition "
                    + result.getRecordMetadata().partition()
                    + " offset " + result.getRecordMetadata().offset());
            }
        });
    }

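    // Transactional send (needs a transactional producer factory,
    // for example via spring.kafka.producer.transaction-id-prefix)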
    public void sendOrderTransactional(String orderId, String orderJson) {
        kafkaTemplate.executeInTransaction(ops -> {
            ops.send("orders", orderId, orderJson);
            ops.send("order-audit", orderId, "CREATED:" + orderJson);
            return true;
        });
    }
}

Spring Kafka Consumer

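// Spring Kafka Consumer: OrderConsumerService.java
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Service;
import java.util.List;

// Order and OrderService are application classes that are not shown here.
@Service
public class OrderConsumerService {

    private final ObjectMapper objectMapper;
    private final OrderService orderService;
    private final KafkaTemplate<String, String> kafkaTemplate;

    public OrderConsumerService(ObjectMapper objectMapper,
                                OrderService orderService,
                                KafkaTemplate<String, String> kafkaTemplate) {
        this.objectMapper = objectMapper;
        this.orderService = orderService;
        this.kafkaTemplate = kafkaTemplate;
    }
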
    @KafkaListener(
        topics = "orders",
        groupId = "order-processing-group",
        concurrency = "3",   // 3 consumer threads
        containerFactory = "kafkaListenerContainerFactory"
    )
    public void handleOrder(
            @Payload String orderJson,
            @Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
            @Header(KafkaHeaders.OFFSET) long offset,
            Acknowledgment ack) {

        try {
            System.out.println("Received order on partition "
                + partition + " offset " + offset);

            // Process the order
            Order order = objectMapper.readValue(orderJson, Order.class);
            orderService.process(order);

            // Manual acknowledgment after successful processing
            ack.acknowledge();
        } catch (Exception e) {
            System.err.println("Error processing order: " + e.getMessage());
            // Send to dead letter topic or retry topic
            kafkaTemplate.send("orders.DLT", orderJson);
            ack.acknowledge(); // still ack to avoid infinite loop
        }
    }

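    // Batch listener for high-throughput scenarios
    // (batch = "true" needs Spring Kafka 2.8+; older versions enable
    // batching on the container factory instead)
    @KafkaListener(topics = "events", groupId = "event-batch-group", batch = "true")
    public void handleEventBatch(List<String> events, Acknowledgment ack) {
        System.out.println("Batch of " + events.size() + " events");
        events.forEach(this::processEvent);
        ack.acknowledge();
    }

    // Placeholder for per-event business logic
    private void processEvent(String event) {
        System.out.println("Processing event: " + event);
    }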
}

application.yml Configuration

# application.yml — Spring Kafka Configuration
spring:
  kafka:
    bootstrap-servers: kafka-1:9092,kafka-2:9092,kafka-3:9092

    producer:
      acks: all
      retries: 3
      batch-size: 65536
      compression-type: lz4
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      properties:
        # linger.ms has no dedicated Spring Boot property, so it is passed through here
        linger.ms: 10
        enable.idempotence: true
        max.in.flight.requests.per.connection: 5

    consumer:
      group-id: order-processing-group
      auto-offset-reset: earliest
      enable-auto-commit: false
      max-poll-records: 100
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      properties:
        isolation.level: read_committed
        partition.assignment.strategy: org.apache.kafka.clients.consumer.CooperativeStickyAssignor

    listener:
      ack-mode: manual
      concurrency: 3

Kafka with Python

confluent-kafka-python is the recommended Python client, built on librdkafka for high performance. It supports Avro serialization with Schema Registry integration.

Python Producer Example

# Python Producer with confluent-kafka
from confluent_kafka import Producer
import json
import socket

conf = {
    'bootstrap.servers': 'kafka-1:9092,kafka-2:9092',
    'client.id': socket.gethostname(),
    'acks': 'all',
    'enable.idempotence': True,
    'compression.type': 'lz4',
    'batch.size': 65536,
    'linger.ms': 10,
    'retries': 5,
}

producer = Producer(conf)

def delivery_callback(err, msg):
    if err:
        print(f'Message delivery failed: {err}')
    else:
        print(f'Delivered to {msg.topic()} '
              f'partition [{msg.partition()}] '
              f'offset {msg.offset()}')

def send_order(order):
    producer.produce(
        topic='orders',
        key=order['user_id'],
        value=json.dumps(order).encode('utf-8'),
        headers={'event-type': 'ORDER_CREATED'},
        callback=delivery_callback,
    )
    # Trigger delivery callbacks
    producer.poll(0)

# Flush remaining messages on shutdown
def shutdown():
    remaining = producer.flush(timeout=30)
    if remaining > 0:
        print(f'Warning: {remaining} messages not delivered')

# Usage
order = {
    'id': 'ORD-001',
    'user_id': 'USR-123',
    'items': [{'sku': 'WIDGET-A', 'qty': 2, 'price': 9.99}],
    'total': 19.98,
}
send_order(order)
shutdown()  # flush before exit; otherwise the buffered message may never be sent

Python Consumer with Avro

# Python Consumer with Avro deserialization
from confluent_kafka import Consumer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroDeserializer
from confluent_kafka.serialization import (
    SerializationContext, MessageField
)

# Schema Registry client
sr_client = SchemaRegistryClient({
    'url': 'http://schema-registry:8081'
})

# Avro deserializer (auto-fetches schema from registry)
avro_deserializer = AvroDeserializer(
    sr_client,
    schema_str=None,  # auto-detect from schema registry
)

consumer_conf = {
    'bootstrap.servers': 'kafka-1:9092,kafka-2:9092',
    'group.id': 'order-avro-consumer',
    'auto.offset.reset': 'earliest',
    'enable.auto.commit': False,
    'isolation.level': 'read_committed',
}

consumer = Consumer(consumer_conf)
consumer.subscribe(['orders-avro'])

try:
    while True:
        msg = consumer.poll(timeout=1.0)
        if msg is None:
            continue
        if msg.error():
            print(f'Consumer error: {msg.error()}')
            continue

        # Deserialize Avro message
        order = avro_deserializer(
            msg.value(),
            SerializationContext(msg.topic(), MessageField.VALUE),
        )

        print(f'Order: {order["id"]} '
              f'Total: ${order["total"]:.2f} '
              f'Partition: {msg.partition()} '
              f'Offset: {msg.offset()}')

        # Process order...
        process_order(order)

        # Manual commit after processing
        consumer.commit(asynchronous=False)

except KeyboardInterrupt:
    pass
finally:
    consumer.close()

Kafka Streams

Kafka Streams is a client library for building real-time stream processing applications. It processes data from input topics and writes results to output topics, with built-in support for stateful operations, windowing, and exactly-once processing.

Core Concepts

  • KStream: An unbounded stream of records. Each record is an independent event (insert semantics).
  • KTable: A changelog stream. Each record is an update to a key (upsert semantics). Represents the latest value for each key.
  • Windowing: Groups records by time windows (tumbling, hopping, sliding, session) for time-based aggregations.
  • Joins: KStream-KStream (windowed), KStream-KTable (enrichment), KTable-KTable (materialized view).
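
These concepts can be made concrete without the Streams library. The sketch below models insert versus upsert semantics with plain Python collections and computes which windows a timestamp falls into; all function names are illustrative, and windows are assumed to be aligned to the epoch (timestamp 0).

```python
def as_kstream(records):
    """KStream view: every record is kept as an independent event."""
    return list(records)


def as_ktable(records):
    """KTable view: a later record for the same key replaces the earlier one."""
    table = {}
    for key, value in records:
        table[key] = value
    return table


def tumbling_window_start(ts_ms, size_ms):
    """Start of the single tumbling window that contains ts_ms."""
    return ts_ms - (ts_ms % size_ms)


def hopping_window_starts(ts_ms, size_ms, advance_ms):
    """Starts of all hopping windows [start, start + size) that contain ts_ms."""
    starts = []
    start = ts_ms - (ts_ms % advance_ms)
    while start > ts_ms - size_ms and start >= 0:
        starts.append(start)
        start -= advance_ms
    return sorted(starts)


events = [("alice", 1), ("bob", 2), ("alice", 3)]
print(as_kstream(events))
print(as_ktable(events))
print(tumbling_window_start(125_000, 60_000))
print(hopping_window_starts(400_000, 300_000, 60_000))
```

A tumbling window places each record in exactly one bucket, while a 5-minute hopping window advancing every minute places it in up to five overlapping buckets.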

Word Count Example

// Kafka Streams — Word Count Application (Java)
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.common.serialization.Serdes;
import java.util.Arrays;
import java.util.Properties;

public class WordCountApp {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "word-count-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
            Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
            Serdes.String().getClass());
        // Exactly-once processing
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG,
            StreamsConfig.EXACTLY_ONCE_V2);

        StreamsBuilder builder = new StreamsBuilder();

        // KStream: read from "text-input" topic
        KStream<String, String> textLines =
            builder.stream("text-input");

        // Process: split lines into words, group, count
        KTable<String, Long> wordCounts = textLines
            .flatMapValues(line ->
                Arrays.asList(line.toLowerCase().split("\\W+")))
            .filter((key, word) -> word.length() > 0)
            .groupBy((key, word) -> word)
            .count(Materialized.as("word-counts-store"));

        // Write results to "word-counts" output topic
        wordCounts.toStream()
            .to("word-counts", Produced.with(
                Serdes.String(), Serdes.Long()));

        KafkaStreams streams = new KafkaStreams(
            builder.build(), props);

        // Graceful shutdown
        Runtime.getRuntime().addShutdownHook(
            new Thread(streams::close));

        streams.start();
    }
}

Windowed Aggregation

// Windowed Aggregation: orders per minute per region
// Order, OrderStats, orderSerde, and orderStatsSerde are application-specific
// and not shown. Needs java.time.Duration and org.apache.kafka.streams.kstream.*.
StreamsBuilder builder = new StreamsBuilder();

KStream<String, Order> orders = builder.stream(
    "orders",
    Consumed.with(Serdes.String(), orderSerde)
);

// groupBy changes the key and triggers a repartition topic, so pass the
// serdes explicitly instead of relying on the configured defaults.
// Tumbling window: 1-minute non-overlapping windows
KTable<Windowed<String>, Long> ordersPerMinute = orders
    .groupBy((key, order) -> order.getRegion(),
        Grouped.with(Serdes.String(), orderSerde))
    .windowedBy(TimeWindows.ofSizeWithNoGrace(
        Duration.ofMinutes(1)))
    .count(Materialized.as("orders-per-minute"));

// Hopping window: 5-minute window, advancing every 1 minute
KTable<Windowed<String>, Double> avgOrderValue = orders
    .groupBy((key, order) -> order.getRegion(),
        Grouped.with(Serdes.String(), orderSerde))
    .windowedBy(TimeWindows.ofSizeAndGrace(
        Duration.ofMinutes(5), Duration.ofSeconds(30))
        .advanceBy(Duration.ofMinutes(1)))
    .aggregate(
        () -> new OrderStats(0, 0.0),
        (key, order, stats) -> stats.add(order.getTotal()),
        Materialized.with(Serdes.String(), orderStatsSerde)
    )
    .mapValues(stats -> stats.getAverage());

// Session window: group by user session (30-min inactivity gap)
KTable<Windowed<String>, Long> userSessions = orders
    .groupBy((key, order) -> order.getUserId(),
        Grouped.with(Serdes.String(), orderSerde))
    .windowedBy(SessionWindows.ofInactivityGapWithNoGrace(
        Duration.ofMinutes(30)))
    .count(Materialized.as("user-sessions"));

Kafka Connect

Kafka Connect is a framework for streaming data between Kafka and external systems without writing code. It uses pre-built connectors that handle serialization, offset management, and fault tolerance automatically.

Source Connectors (External to Kafka)

Source connectors read data from external systems and write to Kafka topics:

// Kafka Connect — JDBC Source Connector Configuration
// Reads from PostgreSQL, writes to Kafka topics
{
  "name": "postgres-source-orders",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "connection.url": "jdbc:postgresql://db:5432/ecommerce",
    "connection.user": "kafka_connect",
    "connection.password": "${POSTGRES_PASSWORD}",
    "table.whitelist": "orders,order_items,customers",
    "mode": "timestamp+incrementing",
    "timestamp.column.name": "updated_at",
    "incrementing.column.name": "id",
    "topic.prefix": "db.",
    "poll.interval.ms": 5000,
    "batch.max.rows": 1000,
    "transforms": "createKey,extractId",
    "transforms.createKey.type":
      "org.apache.kafka.connect.transforms.ValueToKey",
    "transforms.createKey.fields": "id",
    "transforms.extractId.type":
      "org.apache.kafka.connect.transforms.ExtractField$Key",
    "transforms.extractId.field": "id"
  }
}

Sink Connectors (Kafka to External)

Sink connectors read from Kafka topics and write to external systems:

// Kafka Connect — S3 Sink Connector Configuration
// Reads from Kafka, writes Parquet files to S3
{
  "name": "s3-sink-orders",
  "config": {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "tasks.max": 3,
    "topics": "db.orders,db.order_items",
    "s3.region": "us-east-1",
    "s3.bucket.name": "data-lake-raw",
    "s3.part.size": 5242880,
    "flush.size": 10000,
    "rotate.interval.ms": 600000,
    "storage.class":
      "io.confluent.connect.s3.storage.S3Storage",
    "format.class":
      "io.confluent.connect.s3.format.parquet.ParquetFormat",
    "parquet.codec": "snappy",
    "partitioner.class":
      "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
    "partition.duration.ms": 3600000,
    "path.format": "'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH",
    "locale": "en-US",
    "timezone": "UTC",
    "timestamp.extractor": "RecordField",
    "timestamp.field": "updated_at"
  }
}

Debezium CDC Example

Debezium captures row-level database changes (inserts, updates, deletes) from the database's transaction log and streams them as events, which makes it a widely used choice for Change Data Capture (CDC):

// Debezium CDC — MySQL Source Connector
// Captures INSERT, UPDATE, DELETE events from MySQL binlog
{
  "name": "mysql-cdc-source",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "mysql-primary",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "${MYSQL_PASSWORD}",
    "database.server.id": "184054",
    "topic.prefix": "cdc",
    "database.include.list": "ecommerce",
    "table.include.list": "ecommerce.orders,ecommerce.customers",
    "include.schema.changes": true,
    "schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
    "schema.history.internal.kafka.topic": "schema-changes",
    "snapshot.mode": "initial",
    "transforms": "unwrap",
    "transforms.unwrap.type":
      "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.drop.tombstones": false,
    "transforms.unwrap.delete.handling.mode": "rewrite",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081"
  }
}

Schema Registry

Schema Registry provides a centralized repository for schemas (Avro, Protobuf, JSON Schema) used by Kafka producers and consumers. It enforces compatibility rules to prevent breaking changes and enables schema evolution.

Supported Formats

  • Avro: Compact binary format with schema evolution. Most widely used with Kafka. Schemas defined in JSON.
  • Protobuf: Google Protocol Buffers. Strongly typed, code-generated serialization. Good for cross-language services.
  • JSON Schema: Validates JSON payloads. Easiest to adopt but largest wire format.

Compatibility Modes

Schema Registry enforces compatibility rules when evolving schemas:

Mode       Allowed Changes                       Use Case
BACKWARD   Remove fields, add optional fields    Default. New consumers can read old data
FORWARD    Add fields, remove optional fields    Old consumers can read new data
FULL       Add/remove optional fields only       Both directions. Most restrictive
NONE       Any change allowed                    Development only. No checks
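
The rules in the table reduce to one question: when the reader's schema has a field the writer did not send, is there a default to fall back on? A simplified sketch for flat record schemas follows. It ignores type changes, unions, and aliases, and the function names are illustrative rather than part of the Schema Registry API.

```python
def is_backward_compatible(old_fields, new_fields):
    """Can a reader using the NEW schema decode data written with the OLD one?"""
    for name, field in new_fields.items():
        if name not in old_fields and "default" not in field:
            return False  # new required field has no value in old data
    return True


def is_forward_compatible(old_fields, new_fields):
    """Can a reader using the OLD schema decode data written with the NEW one?"""
    return is_backward_compatible(new_fields, old_fields)


def is_fully_compatible(old_fields, new_fields):
    return (is_backward_compatible(old_fields, new_fields)
            and is_forward_compatible(old_fields, new_fields))


v1 = {"id": {"type": "string"}, "total": {"type": "double"}}
add_optional = {**v1, "status": {"type": "string", "default": "PENDING"}}
add_required = {**v1, "region": {"type": "string"}}
drop_required = {"id": {"type": "string"}}

for label, candidate in [("add optional", add_optional),
                         ("add required", add_required),
                         ("drop required", drop_required)]:
    print(label,
          "backward:", is_backward_compatible(v1, candidate),
          "forward:", is_forward_compatible(v1, candidate))
```

Only the change that adds an optional field passes in both directions, which matches the FULL row of the table.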

Registering and Using Schemas

# Schema Registry Compatibility Modes
#
# Mode             Allowed Changes               Use Case
# ──────────────── ──────────────────────────── ──────────────────────
# BACKWARD         - Remove fields               Default. New consumers
#                  - Add optional fields          can read old data.
#
# FORWARD          - Add fields                  Old consumers can read
#                  - Remove optional fields       new data.
#
# FULL             - Add optional fields          Both directions.
#                  - Remove optional fields       Most restrictive.
#
# NONE             - Any change allowed           No compatibility check.
#                                                 Use for development only.

# Set compatibility mode via REST API
curl -X PUT http://schema-registry:8081/config/orders-value \
  -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  -d '{"compatibility": "BACKWARD"}'

# Register a new Avro schema
# The "schema" value is a JSON document passed as one escaped string, so it
# stays on a single line (raw newlines inside a JSON string are invalid).
# logicalType belongs inside the type definition, not on the field itself.
curl -X POST http://schema-registry:8081/subjects/orders-value/versions \
  -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  -d '{"schemaType": "AVRO", "schema": "{\"type\":\"record\",\"name\":\"Order\",\"namespace\":\"com.example.orders\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"user_id\",\"type\":\"string\"},{\"name\":\"total\",\"type\":\"double\"},{\"name\":\"status\",\"type\":\"string\",\"default\":\"PENDING\"},{\"name\":\"created_at\",\"type\":{\"type\":\"long\",\"logicalType\":\"timestamp-millis\"}}]}"}'

# Check compatibility before registering
curl -X POST http://schema-registry:8081/compatibility/subjects/orders-value/versions/latest \
  -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  -d '{"schemaType": "AVRO", "schema": "..."}'

Exactly-Once Semantics (EOS)

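Exactly-once semantics ensure that the effect of each record is applied exactly once, even when failures force retries. The guarantee covers reads from and writes to Kafka itself and requires coordination between producers, brokers, and consumers; side effects in external systems still need their own idempotency.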

Three Pillars of EOS

  1. Idempotent Producer: Prevents duplicate writes. Enabled with enable.idempotence=true. Each produce request has a unique sequence number that the broker deduplicates.
  2. Transactional Producer: Groups multiple writes into an atomic transaction. Either all records are committed or none. Requires transactional.id.
  3. Read Committed Consumer: Only reads records from committed transactions. Set isolation.level=read_committed.
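
How the third pillar changes what a consumer sees can be shown with a toy log. The tuple layout and the visible_records function are invented for this sketch; a real partition stores transaction markers in a binary format, and the stop-at-open-transaction rule stands in for the broker's last stable offset.

```python
def visible_records(log, isolation_level):
    """Return the values a consumer would see under the given isolation level.

    log entries: ("data", txn_id, value), ("commit", txn_id), ("abort", txn_id)
    """
    outcome = {entry[1]: entry[0] for entry in log if entry[0] != "data"}
    visible = []
    for entry in log:
        if entry[0] != "data":
            continue
        _, txn_id, value = entry
        if isolation_level == "read_uncommitted":
            visible.append(value)
            continue
        if txn_id not in outcome:
            break  # open transaction: a read_committed consumer waits here
        if outcome[txn_id] == "commit":
            visible.append(value)
    return visible


log = [
    ("data", "t1", "order-1"),
    ("data", "t1", "audit-1"),
    ("commit", "t1"),
    ("data", "t2", "order-2"),
    ("abort", "t2"),
    ("data", "t3", "order-3"),  # transaction t3 is still open
]

print(visible_records(log, "read_uncommitted"))
print(visible_records(log, "read_committed"))
```

The read_committed consumer sees both records of t1 together, never sees the aborted order-2, and does not advance past order-3 until t3 is resolved.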

Transactional Producer Example

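// Exactly-Once Semantics: Transactional Producer (Java)
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.serialization.StringSerializer;

// Assumed to be in scope: orderId, orderJson, productId, quantity, plus
// (for consume-transform-produce) a KafkaConsumer named consumer and a
// Map<TopicPartition, OffsetAndMetadata> named offsets.
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
    StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
    StringSerializer.class);
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "order-tx-001");
props.put(ProducerConfig.ACKS_CONFIG, "all");

KafkaProducer<String, String> producer = new KafkaProducer<>(props);

// Initialize transactions (call once per producer instance)
producer.initTransactions();

try {
    // Begin transaction
    producer.beginTransaction();

    // Send to multiple topics atomically
    producer.send(new ProducerRecord<>(
        "orders", orderId, orderJson));
    producer.send(new ProducerRecord<>(
        "order-events", orderId, "ORDER_CREATED"));
    producer.send(new ProducerRecord<>(
        "inventory-updates", productId,
        "DECREMENT:" + quantity));

    // Consume-transform-produce only: commit the consumed offsets as
    // part of the same transaction. offsets holds the next offset to
    // read for each input partition.
    producer.sendOffsetsToTransaction(
        offsets, consumer.groupMetadata());

    // Commit: all records become visible atomically
    producer.commitTransaction();

} catch (ProducerFencedException | OutOfOrderSequenceException
        | AuthorizationException e) {
    // Fatal: for example, another producer with the same
    // transactional.id took over. This producer cannot continue.
    producer.close();
} catch (KafkaException e) {
    // Recoverable: abort so none of the records become visible
    producer.abortTransaction();
}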
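What a consumer sees during and after a transaction depends on its isolation level. The toy model below illustrates the idea; it is a sketch under simplifying assumptions (one partition, a hypothetical TxLog class, transaction state held in plain sets) and not how the broker stores transaction markers.

```python
from dataclasses import dataclass, field


@dataclass
class TxLog:
    """Toy log where each record is tagged with the transaction that wrote it."""
    records: list = field(default_factory=list)   # (tx_id, value) pairs
    committed: set = field(default_factory=set)
    aborted: set = field(default_factory=set)

    def send(self, tx_id: str, value: str) -> None:
        self.records.append((tx_id, value))

    def commit(self, tx_id: str) -> None:
        self.committed.add(tx_id)

    def abort(self, tx_id: str) -> None:
        self.aborted.add(tx_id)

    def read(self, isolation_level: str) -> list:
        """Return the values visible at the given isolation level."""
        if isolation_level == "read_uncommitted":
            return [value for _, value in self.records]
        visible = []
        for tx_id, value in self.records:
            if tx_id in self.committed:
                visible.append(value)
            elif tx_id in self.aborted:
                continue  # aborted records are filtered out
            else:
                break     # open transaction: reading stops here
        return visible


log = TxLog()
log.send("tx-1", "ORDER_CREATED")
log.send("tx-1", "DECREMENT:2")
before = log.read("read_committed")   # transaction still open
log.commit("tx-1")
log.send("tx-2", "ORDER_CREATED")
log.abort("tx-2")
after = log.read("read_committed")
print(before, after)
```

A read_committed consumer sees nothing from tx-1 until the commit, and never sees the records of the aborted tx-2, while a read_uncommitted consumer would see all three records.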

Kafka Security

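Production Kafka clusters require authentication, authorization, and encryption. Kafka's built-in security features cover authentication (SASL or mutual TLS), authorization (ACLs), and encryption in transit (TLS); encryption at rest has to be provided by the disk or filesystem layer.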

SASL Authentication

Kafka supports multiple SASL mechanisms:

  • SASL/PLAIN: Username/password. Simple but credentials sent in cleartext (use with TLS).
  • SASL/SCRAM: Salted challenge-response. More secure than PLAIN, credentials stored as hashes.
  • SASL/GSSAPI: Kerberos-based authentication that integrates with enterprise SSO. Complex to set up, but a strong choice for organizations that already run Kerberos.
  • SASL/OAUTHBEARER: OAuth 2.0 token-based auth. Modern and cloud-friendly.
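To illustrate why SCRAM credentials can be stored as hashes, the sketch below derives the StoredKey and ServerKey defined in RFC 5802 using Python's standard library. It is an illustration only: the function name scram_stored_credentials is hypothetical, the iteration count is an example value, and the SASLprep password normalization required by the RFC is omitted.

```python
import hashlib
import hmac
import os


def scram_stored_credentials(password: str, salt: bytes,
                             iterations: int = 4096,
                             hash_name: str = "sha512") -> dict:
    """Derive the values a SCRAM server stores instead of the password."""
    # SaltedPassword = Hi(password, salt, i), which is PBKDF2 with HMAC.
    salted = hashlib.pbkdf2_hmac(hash_name, password.encode("utf-8"),
                                 salt, iterations)
    client_key = hmac.new(salted, b"Client Key", hash_name).digest()
    server_key = hmac.new(salted, b"Server Key", hash_name).digest()
    # StoredKey = H(ClientKey); the server never needs ClientKey itself.
    stored_key = hashlib.new(hash_name, client_key).digest()
    return {"salt": salt, "iterations": iterations,
            "stored_key": stored_key, "server_key": server_key}


creds = scram_stored_credentials("app-password", os.urandom(16))
print(creds["stored_key"].hex())
```

The server keeps only the salt, iteration count, and derived keys, so a leaked credential store does not directly reveal the password.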

SSL/TLS Encryption

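TLS encrypts data in transit between clients and brokers, and between brokers (inter-broker communication). Configure ssl.keystore.location and ssl.truststore.location (with their passwords) on the brokers; clients need a truststore, plus a keystore only if the broker requires client certificates.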

# Broker configuration — SASL/SCRAM + TLS
# server.properties

# Enable SASL_SSL for client connections
listeners=SASL_SSL://:9093
advertised.listeners=SASL_SSL://kafka-1.example.com:9093

# Inter-broker communication
# Set either inter.broker.listener.name or security.inter.broker.protocol,
# not both; Kafka treats setting the two together as an error.
inter.broker.listener.name=SASL_SSL

# SASL mechanism
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-512
sasl.enabled.mechanisms=SCRAM-SHA-512

# TLS configuration
ssl.keystore.location=/etc/kafka/ssl/kafka.keystore.jks
ssl.keystore.password=keystore-pass
ssl.key.password=key-pass
ssl.truststore.location=/etc/kafka/ssl/kafka.truststore.jks
ssl.truststore.password=truststore-pass
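# Clients authenticate with SASL on this listener and present no
# certificate (the client config below has no keystore)
ssl.client.auth=none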
ssl.endpoint.identification.algorithm=https

# --- Client configuration (producer/consumer) ---
# client.properties

security.protocol=SASL_SSL
sasl.mechanism=SCRAM-SHA-512
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
  username="app-user" \
  password="app-password";
ssl.truststore.location=/etc/kafka/ssl/client.truststore.jks
ssl.truststore.password=truststore-pass

ACLs (Access Control Lists)

Kafka ACLs control which principals can perform which operations on which resources:

# Kafka ACL Examples

# Grant producer permissions to user "order-service"
kafka-acls.sh --bootstrap-server kafka:9092 \
  --command-config admin.properties \
  --add --allow-principal User:order-service \
  --operation Write --operation Describe \
  --topic orders

# Grant consumer permissions to user "analytics-service"
kafka-acls.sh --bootstrap-server kafka:9092 \
  --command-config admin.properties \
  --add --allow-principal User:analytics-service \
  --operation Read --operation Describe \
  --topic orders \
  --group analytics-group

# Deny all access to a topic from a specific IP
kafka-acls.sh --bootstrap-server kafka:9092 \
  --command-config admin.properties \
  --add --deny-principal 'User:*' \
  --deny-host 10.0.0.50 \
  --operation All \
  --topic sensitive-data

# List all ACLs
kafka-acls.sh --bootstrap-server kafka:9092 \
  --command-config admin.properties \
  --list

# Remove an ACL
kafka-acls.sh --bootstrap-server kafka:9092 \
  --command-config admin.properties \
  --remove --allow-principal User:old-service \
  --operation Write --topic orders
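The way these rules combine can be sketched as a small evaluation function. This is a simplified model, not Kafka's authorizer: it covers only topic ACLs with literal names, ignores super users and implied operations, and the Acl class and the User:audit principal are hypothetical. It does reflect two behaviors worth remembering: a matching Deny overrides any Allow, and access without a matching Allow is refused.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Acl:
    principal: str   # e.g. "User:order-service" or "User:*"
    host: str        # e.g. "10.0.0.50" or "*"
    operation: str   # e.g. "Read", "Write", "All"
    topic: str
    permission: str  # "Allow" or "Deny"


def _matches(acl: Acl, principal: str, host: str,
             operation: str, topic: str) -> bool:
    return (acl.topic == topic
            and acl.principal in (principal, "User:*")
            and acl.host in (host, "*")
            and acl.operation in (operation, "All"))


def is_authorized(acls: list, principal: str, host: str,
                  operation: str, topic: str) -> bool:
    """Deny wins over Allow; with no matching Allow, access is refused."""
    matching = [a for a in acls
                if _matches(a, principal, host, operation, topic)]
    if any(a.permission == "Deny" for a in matching):
        return False
    return any(a.permission == "Allow" for a in matching)


ACLS = [
    Acl("User:order-service", "*", "Write", "orders", "Allow"),
    Acl("User:analytics-service", "*", "Read", "orders", "Allow"),
    Acl("User:*", "10.0.0.50", "All", "sensitive-data", "Deny"),
    Acl("User:audit", "*", "Read", "sensitive-data", "Allow"),
]

print(is_authorized(ACLS, "User:audit", "10.0.0.50", "Read", "sensitive-data"))
print(is_authorized(ACLS, "User:audit", "10.0.0.9", "Read", "sensitive-data"))
```

The same principal is refused from the denied host and allowed from any other, which mirrors the deny-by-IP example above.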

Performance Tuning

Kafka can achieve millions of messages per second when properly tuned. The key levers are batching, compression, partition count, and OS-level settings.

Producer Tuning

  • batch.size: Maximum bytes per batch. Increase to 65536-131072 for throughput workloads.
  • linger.ms: Time to wait for additional records before sending a batch. Set to 5-20ms to fill batches better.
  • buffer.memory: Total producer buffer memory. Increase for high-throughput producers.
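  • compression.type: Use lz4 or zstd. Batches are compressed as a whole, so larger batches compress better. Text-heavy payloads such as JSON often shrink by 50-80%, which reduces network I/O and broker disk usage accordingly; the actual ratio depends on the data.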
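The interaction between batching and compression is easy to see with a quick experiment. The sketch below uses zlib from the standard library as a stand-in for lz4 or zstd (an assumption made so the example runs without extra packages), and the sample records are invented; the absolute numbers will differ from a real producer, but the comparison holds.

```python
import json
import zlib


def compressed_sizes(records: list) -> tuple:
    """Return (total bytes when each record is compressed alone,
    bytes when all records are compressed as one batch)."""
    individually = sum(len(zlib.compress(r)) for r in records)
    batched = len(zlib.compress(b"".join(records)))
    return individually, batched


# 200 similar JSON records, as a producer might send for one topic.
records = [
    json.dumps({"id": f"order-{i}", "user_id": f"user-{i % 10}",
                "total": 19.99, "status": "PENDING"}).encode("utf-8")
    for i in range(200)
]

raw = sum(len(r) for r in records)
individually, batched = compressed_sizes(records)
print(f"raw={raw} individually={individually} batched={batched}")
```

Repeated field names and values only pay off when many records share one compression context, which is why raising batch.size and linger.ms tends to improve the compression ratio as well as throughput.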

Consumer Tuning

  • fetch.min.bytes: Minimum data for a fetch response. Increase to 1024-65536 to reduce fetch frequency.
  • max.poll.records: Maximum records per poll. Tune based on processing time per record.
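  • session.timeout.ms: Consumer failure detection timeout. Reduce for faster failover, but keep heartbeat.interval.ms at no more than about one third of it so that brief pauses do not trigger rebalances.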
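For max.poll.records, the constraint is that one full poll must be processed before max.poll.interval.ms (300000 ms by default) elapses, otherwise the consumer is removed from the group. A rough sizing helper might look like this; the function name and the 50% safety factor are assumptions chosen for illustration.

```python
def safe_max_poll_records(per_record_ms: float,
                          max_poll_interval_ms: int = 300_000,
                          safety_factor: float = 0.5) -> int:
    """Largest max.poll.records whose worst-case processing time stays
    within safety_factor * max.poll.interval.ms."""
    if per_record_ms <= 0:
        raise ValueError("per_record_ms must be positive")
    budget_ms = max_poll_interval_ms * safety_factor
    return max(1, int(budget_ms // per_record_ms))


# A handler that needs about 200 ms per record in the worst case.
limit = safe_max_poll_records(per_record_ms=200)
print(limit)
```

If the result is very small, it is usually a sign that processing should move off the polling thread or that max.poll.interval.ms needs to be raised.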

Broker and OS Tuning

# Broker and OS performance tuning

# --- server.properties (broker) ---

# Increase network and I/O threads
num.network.threads=8
num.io.threads=16
num.replica.fetchers=4

# Socket buffer sizes
socket.send.buffer.bytes=1048576
socket.receive.buffer.bytes=1048576
socket.request.max.bytes=104857600

# Log flush: leave log.flush.interval.messages and log.flush.interval.ms
# unset so the OS page cache decides when to flush. Durability comes
# from replication (acks=all together with min.insync.replicas).

# Replication
replica.lag.time.max.ms=30000
min.insync.replicas=2
unclean.leader.election.enable=false

# Log retention
log.retention.hours=168
log.retention.bytes=-1
log.segment.bytes=1073741824
log.cleanup.policy=delete

# --- OS-level tuning (Linux) ---

# Increase file descriptor limit
# /etc/security/limits.conf
# kafka  soft  nofile  100000
# kafka  hard  nofile  100000

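# Tune dirty-page writeback (this does not change the page cache size):
# start background flushing early, but allow a large share of dirty pages
# before writers are blocked
sysctl -w vm.dirty_background_ratio=5
sysctl -w vm.dirty_ratio=60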

# Increase max socket buffer size
sysctl -w net.core.wmem_max=2097152
sysctl -w net.core.rmem_max=2097152

# Disable swap
swapoff -a

# Use XFS filesystem for log directories
# XFS provides better sequential write performance than ext4

Partition Count Guidelines

Partition count determines parallelism. Guidelines:

  1. Start with 10-50 partitions per topic for most workloads.
  2. Use formula: max(T/P, T/C) where T=target throughput, P=producer throughput per partition, C=consumer throughput per partition.
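  3. More partitions mean more open file handles, slower leader elections, and higher end-to-end latency. As a rule of thumb, do not exceed 4000 partitions per broker.
  4. The partition count of a topic can be increased but never decreased, and increasing it changes which partition a given key maps to. Start conservatively.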
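The formula from guideline 2 can be applied directly. A minimal sketch, assuming all three throughput figures are measured in the same unit (MB/s here); the function name and the sample numbers are illustrative.

```python
import math


def required_partitions(target_mb_s: float, producer_mb_s: float,
                        consumer_mb_s: float) -> int:
    """max(T/P, T/C), rounded up to a whole number of partitions."""
    if min(target_mb_s, producer_mb_s, consumer_mb_s) <= 0:
        raise ValueError("throughput values must be positive")
    return math.ceil(max(target_mb_s / producer_mb_s,
                         target_mb_s / consumer_mb_s))


# Target 100 MB/s; one partition sustains 10 MB/s on the producer side
# and 5 MB/s on the consumer side.
partitions = required_partitions(100, 10, 5)
print(partitions)
```

In this example the consumer side is the bottleneck, so it determines the partition count.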

Kafka Monitoring

Effective monitoring is critical for Kafka operations. Focus on broker health, producer/consumer metrics, and consumer lag.

Key JMX Metrics

Kafka exposes hundreds of JMX metrics. These are the most important:

# Key Kafka JMX Metrics to Monitor
#
# Broker Metrics
# ─────────────────────────────────────────────────────────────
# kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec
#   → Messages received per second by this broker (sum across brokers for cluster throughput)
#
# kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec
#   → Bytes received per second
#
# kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec
#   → Bytes sent per second (consumer fetch)
#
# kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions
#   → Partitions where ISR < replication factor (ALERT if > 0)
#
# kafka.server:type=ReplicaManager,name=IsrShrinksPerSec
#   → Rate of ISR shrinking (follower falling behind)
#
# kafka.controller:type=KafkaController,name=OfflinePartitionsCount
#   → Partitions with no leader (CRITICAL ALERT if > 0)
#
# kafka.controller:type=KafkaController,name=ActiveControllerCount
#   → 1 on the active controller and 0 elsewhere; the sum across the cluster should be exactly 1
#
# Producer Metrics
# ─────────────────────────────────────────────────────────────
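# (client-side metrics, exposed by the producer JVM per client-id)
# kafka.producer:type=producer-metrics,client-id=<client-id>, attribute record-send-rate
#   → Records sent per second
#
# kafka.producer:type=producer-metrics,client-id=<client-id>, attribute request-latency-avg
#   → Average produce request latency in ms
#
# Consumer Metrics
# ─────────────────────────────────────────────────────────────
# (client-side metrics, exposed by the consumer JVM per client-id)
# kafka.consumer:type=consumer-fetch-manager-metrics,client-id=<client-id>, attribute records-lag-max
#   → Maximum lag in records across the partitions assigned to this consumer (KEY METRIC)
#
# kafka.consumer:type=consumer-coordinator-metrics,client-id=<client-id>, attribute rebalance-rate-per-hour
#   → Consumer group rebalance frequency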

Prometheus + Grafana Setup

The standard monitoring stack for Kafka uses JMX Exporter to expose metrics to Prometheus, with Grafana dashboards for visualization:

# Prometheus + JMX Exporter configuration for Kafka

# 1. Download JMX Exporter jar
# wget https://repo1.maven.org/maven2/io/prometheus/jmx/jmx_prometheus_javaagent/0.20.0/jmx_prometheus_javaagent-0.20.0.jar

# 2. Create kafka-jmx-config.yml
rules:
  - pattern: 'kafka.server<type=BrokerTopicMetrics, name=(\w+)><>Count'
    name: 'kafka_server_broker_topic_metrics_$1_total'
    type: COUNTER
  - pattern: 'kafka.server<type=ReplicaManager, name=(\w+)><>Value'
    name: 'kafka_server_replica_manager_$1'
    type: GAUGE
  - pattern: 'kafka.controller<type=KafkaController, name=(\w+)><>Value'
    name: 'kafka_controller_$1'
    type: GAUGE
  - pattern: 'kafka.server<type=KafkaRequestHandlerPool, name=RequestHandlerAvgIdlePercent><>MeanRate'
    name: 'kafka_request_handler_avg_idle_percent'
    type: GAUGE
  - pattern: 'kafka.network<type=RequestMetrics, name=(\w+), request=(\w+)><>Count'
    name: 'kafka_network_request_$1_$2_total'
    type: COUNTER

# 3. Start Kafka broker with JMX Exporter
# KAFKA_OPTS="-javaagent:/opt/jmx_prometheus_javaagent.jar=7071:/opt/kafka-jmx-config.yml"

# 4. Prometheus scrape config (prometheus.yml)
scrape_configs:
  - job_name: 'kafka'
    static_configs:
      - targets:
        - 'kafka-1:7071'
        - 'kafka-2:7071'
        - 'kafka-3:7071'
    scrape_interval: 15s

# 5. Grafana Dashboard
# Import dashboard ID 7589 (Kafka Overview)
# or ID 12460 (Confluent Kafka)

Consumer Lag Monitoring

Consumer lag is the difference between the latest offset in a partition and the consumer group committed offset. Increasing lag indicates consumers cannot keep up with producers. Monitor with:

  • kafka-consumer-groups.sh: Built-in CLI tool showing lag per partition.
  • Burrow: LinkedIn's open-source consumer lag monitor with anomaly detection.
  • Prometheus: JMX Exporter or kafka-lag-exporter.
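The lag definition above translates into a few lines of arithmetic. In this sketch the offsets are passed in as plain dictionaries keyed by partition number; fetching them from a cluster is left out, the function name is illustrative, and treating a partition without a committed offset as lagging from offset 0 is an assumption of the example.

```python
def consumer_lag(end_offsets: dict, committed_offsets: dict) -> dict:
    """Per-partition lag: log end offset minus the group's committed offset.

    Partitions without a committed offset are counted from offset 0.
    """
    return {partition: max(0, end - committed_offsets.get(partition, 0))
            for partition, end in end_offsets.items()}


end_offsets = {0: 1500, 1: 1480, 2: 1510}
committed = {0: 1500, 1: 1200}   # partition 2 has no committed offset yet

lag = consumer_lag(end_offsets, committed)
total_lag = sum(lag.values())
print(lag, total_lag)
```

A single snapshot says little on its own; what matters for alerting is whether the per-partition values keep growing over time.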

Cruise Control

LinkedIn's Cruise Control automates Kafka cluster operations: partition rebalancing, broker decommissioning, and anomaly detection. It monitors load distribution across brokers and generates rebalance proposals.

# Cruise Control configuration and usage

# cruise-control.properties
bootstrap.servers=kafka-1:9092,kafka-2:9092,kafka-3:9092
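# ZooKeeper-based clusters only; for KRaft clusters, check the Cruise Control
# documentation for the equivalent setup. Properties files do not support
# trailing comments, so notes must stay on their own line.
zookeeper.connect=zk:2181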
webserver.http.port=9090
capacity.config.file=/opt/cruise-control/capacity.json

# --- Common API calls ---

# Get cluster state
curl http://cruise-control:9090/kafkacruisecontrol/state

# Check partition load distribution
curl "http://cruise-control:9090/kafkacruisecontrol/load?json=true"

# Generate rebalance proposal (dry run)
curl "http://cruise-control:9090/kafkacruisecontrol/proposals?json=true"

# Execute rebalance
curl -X POST "http://cruise-control:9090/kafkacruisecontrol/rebalance?dryrun=false&json=true"

# Decommission a broker (move all partitions away)
curl -X POST "http://cruise-control:9090/kafkacruisecontrol/remove_broker?brokerid=2&dryrun=false"

# Add a new broker to the cluster (rebalance onto it)
curl -X POST "http://cruise-control:9090/kafkacruisecontrol/add_broker?brokerid=4&dryrun=false"

Frequently Asked Questions

What is Apache Kafka and when should I use it?

Kafka is a distributed event streaming platform for building real-time data pipelines and streaming applications. Use it when you need high-throughput event ingestion, decoupled microservices communication, event sourcing, log aggregation, or real-time analytics. Kafka handles millions of messages per second with low latency and strong durability guarantees.

What is the difference between Kafka and a traditional message queue like RabbitMQ?

Kafka is a distributed log: messages are retained and can be replayed, while traditional queues delete messages after consumption. Kafka lets multiple consumer groups read the same data independently, provides ordering within partitions, and scales horizontally. RabbitMQ is the better fit for complex routing and for workloads that need per-message acknowledgment.

How many partitions should I create for a topic?

Start with 10-50 partitions for most workloads. The formula is max(T/P, T/C) where T is target throughput, P is producer throughput per partition, and C is consumer throughput per partition. More partitions increase parallelism but also increase metadata overhead, file handles, and rebalancing time. Partitions cannot be decreased once created.

Should I use ZooKeeper or KRaft mode?

For new deployments, use KRaft mode. It is production-ready since Kafka 3.3, eliminates the ZooKeeper dependency, supports more partitions per cluster, and simplifies operations. Existing ZooKeeper-based clusters can be migrated to KRaft using the migration tool provided by Apache Kafka.

How do I achieve exactly-once semantics in Kafka?

Enable idempotent producers (enable.idempotence=true), use transactional producers (set transactional.id and wrap sends in beginTransaction/commitTransaction), and configure consumers with isolation.level=read_committed. For Kafka Streams, set processing.guarantee=exactly_once_v2.

What is consumer lag and why does it matter?

Consumer lag is the difference between the latest message offset in a partition and the offset committed by a consumer group. Increasing lag means consumers are falling behind producers. This leads to stale data, increased end-to-end latency, and potential data loss if records are deleted by retention before they are consumed. Monitor lag with Burrow, Prometheus, or the kafka-consumer-groups.sh CLI tool.

How does Kafka guarantee message ordering?

Kafka guarantees ordering within a single partition only. Messages with the same key are sent to the same partition (via key hashing) as long as the partition count does not change, so per-key ordering is guaranteed. There is no ordering guarantee across partitions. If you need global ordering, use a single partition (which limits throughput) or implement ordering logic at the application level.
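Key-based partitioning can be sketched in a few lines. Note the assumption: zlib.crc32 stands in for the murmur2 hash used by the default partitioner in Kafka's Java client, so the partition numbers here will not match a real cluster; only the principle carries over.

```python
import zlib


def partition_for(key: str, num_partitions: int) -> int:
    """Map a key to a partition by hashing it (crc32 as a stand-in hash)."""
    return zlib.crc32(key.encode("utf-8")) % num_partitions


# The same key always lands on the same partition...
first = partition_for("user-42", 6)
second = partition_for("user-42", 6)

# ...but only while the partition count stays the same.
moved = [key for key in (f"user-{i}" for i in range(100))
         if partition_for(key, 6) != partition_for(key, 12)]
print(first == second, len(moved))
```

Some keys map to a different partition once the count changes from 6 to 12, which is why adding partitions to a keyed topic can break per-key ordering for records already written.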

What is Schema Registry and do I need it?

Schema Registry is a centralized service for managing schemas (Avro, Protobuf, JSON Schema) used by Kafka producers and consumers. It enables schema evolution with compatibility checks, preventing producers from publishing data that consumers cannot read. You need it if you have multiple teams producing/consuming data, require data governance, or want to enforce contracts between services.
