
Event-Driven Architecture Guide: Kafka, RabbitMQ, Event Sourcing, CQRS & Saga Patterns

28 min read · by DevToolBox Team


Master event-driven architecture — Apache Kafka deep dive, RabbitMQ patterns, event sourcing with projections and snapshots, CQRS command/query separation, saga orchestration vs choreography, async messaging patterns, schema evolution (Avro/Protobuf), serverless event processing, stream processing with Kafka Streams and Flink, testing strategies, and monitoring with distributed tracing.

TL;DR — Event-Driven Architecture in 60 Seconds
  • Events are immutable facts describing "something that happened"
  • Kafka for high-throughput event streaming and durable logs; RabbitMQ for flexible routing and task queues
  • Event sourcing stores every state change as an immutable event; replay to derive current state
  • CQRS separates read and write models, allowing each to scale and optimize independently
  • Saga pattern for distributed transactions — orchestration for complex flows, choreography for loose coupling
  • Use Avro/Protobuf + Schema Registry for event schema evolution
  • Kafka Streams / Flink for stateful stream processing (windowing, aggregation, joins)
  • Monitor consumer lag, dead letter queues, and end-to-end latency as core health indicators
Key Takeaways
  • EDA achieves temporal and spatial decoupling between services via async communication
  • Kafka partitions and consumer groups natively support horizontal scaling and message replay
  • RabbitMQ exchange types (direct/topic/fanout/headers) provide flexible message routing
  • Event sourcing + projections = full audit trail + multi-view read optimization
  • Schema evolution requires backward compatibility: only add optional fields, never remove required ones
  • Use contract testing, embedded brokers, and event replay to ensure system correctness
  • Distributed tracing (OpenTelemetry) + consumer lag monitoring are the two pillars of observability

1. Event-Driven Architecture Fundamentals

Event-driven architecture (EDA) is a software design paradigm where systems communicate by producing and consuming events. An event represents a state change — a fact that has already occurred. Unlike request-response models, producers in EDA do not know or care who consumes their events.

Core Concepts

| Concept | Description | Example |
|---|---|---|
| Event | Immutable fact recording a past state change | OrderPlaced, UserRegistered |
| Command | Intent to perform an action (may be rejected) | PlaceOrder, RegisterUser |
| Query | Request for data without modifying state | GetOrderById, ListUsers |
| Event Bus | Transport channel connecting producers and consumers | Kafka, RabbitMQ, NATS |
| Producer | Service or component that publishes events | Order Service, Payment Service |
| Consumer | Service or component that subscribes to and processes events | Notification Service, Analytics |
// Event interface — the foundation of EDA
interface DomainEvent {
  eventId: string;           // Unique event identifier (UUID)
  eventType: string;         // e.g., "OrderPlaced"
  aggregateId: string;       // ID of the entity that produced the event
  aggregateType: string;     // e.g., "Order"
  timestamp: string;         // ISO 8601 timestamp
  version: number;           // Schema version for evolution
  payload: Record<string, unknown>;
  metadata: {
    correlationId: string;   // Traces a request across services
    causationId: string;     // ID of the event/command that caused this
    userId?: string;
  };
}

// Example: OrderPlaced event
const orderPlaced: DomainEvent = {
  eventId: "evt-a1b2c3d4",
  eventType: "OrderPlaced",
  aggregateId: "order-12345",
  aggregateType: "Order",
  timestamp: "2026-02-28T10:30:00Z",
  version: 1,
  payload: {
    customerId: "cust-67890",
    items: [
      { productId: "prod-001", quantity: 2, price: 29.99 },
      { productId: "prod-002", quantity: 1, price: 49.99 },
    ],
    totalAmount: 109.97,
    currency: "USD",
  },
  metadata: {
    correlationId: "corr-xyz-789",
    causationId: "cmd-place-order-456",
    userId: "user-abc",
  },
};
Tip: Name events in past tense (OrderPlaced, not OrderPlace) because events describe facts that already happened. Name commands in imperative form (PlaceOrder) because they express intent.

2. Apache Kafka Deep Dive

Apache Kafka is a distributed event streaming platform designed for high-throughput, durable, and replayable event processing. Kafka organizes data into topics, each divided into partitions for parallel processing and horizontal scaling.

Topics, Partitions, and Consumer Groups

// Kafka producer — Node.js with kafkajs
import { Kafka, Partitioners } from "kafkajs";

const kafka = new Kafka({
  clientId: "order-service",
  brokers: ["kafka-1:9092", "kafka-2:9092", "kafka-3:9092"],
});

const producer = kafka.producer({
  createPartitioner: Partitioners.DefaultPartitioner,
  idempotent: true,            // Prevent duplicates on producer retries
  maxInFlightRequests: 1,      // kafkajs requires <= 1 when idempotent
  transactionalId: "order-producer-txn",
});

async function publishOrderEvent(order: Order): Promise<void> {
  await producer.connect();

  // Key determines partition — same orderId always goes to same partition
  await producer.send({
    topic: "orders.events",
    messages: [
      {
        key: order.id,
        value: JSON.stringify({
          eventType: "OrderPlaced",
          payload: order,
          timestamp: new Date().toISOString(),
        }),
        headers: {
          "correlation-id": order.correlationId,
          "event-type": "OrderPlaced",
          "schema-version": "1",
        },
      },
    ],
  });
}
// Kafka consumer with consumer group
const consumer = kafka.consumer({
  groupId: "inventory-service-group",
  sessionTimeout: 30000,
  heartbeatInterval: 3000,
  maxBytesPerPartition: 1048576, // 1MB
});

async function startConsumer(): Promise<void> {
  await consumer.connect();
  await consumer.subscribe({
    topics: ["orders.events"],
    fromBeginning: false,
  });

  await consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
      const event = JSON.parse(message.value!.toString());
      const correlationId = message.headers?.["correlation-id"]?.toString();

      console.log(
        `[Partition ${partition}] Processing ${event.eventType}`,
        `offset=${message.offset}, key=${message.key}`
      );

      switch (event.eventType) {
        case "OrderPlaced":
          await reserveInventory(event.payload);
          break;
        case "OrderCancelled":
          await releaseInventory(event.payload);
          break;
        default:
          console.warn(`Unknown event type: ${event.eventType}`);
      }
    },
  });
}

Kafka Exactly-Once Semantics

Kafka achieves exactly-once semantics (EOS) through idempotent producers and the transactional API, ensuring messages are not duplicated on producer retries and consumers only see messages after transaction commit.

// Exactly-once: transactional produce + consume
async function processAndForward(inputTopic: string, outputTopic: string) {
  const transaction = await producer.transaction();

  try {
    // Read from input topic
    const messages = await consumeBatch(inputTopic);

    for (const msg of messages) {
      const enriched = await enrichEvent(msg);

      // Write to output topic within the same transaction
      await transaction.send({
        topic: outputTopic,
        messages: [{ key: msg.key, value: JSON.stringify(enriched) }],
      });
    }

    // Commit offsets and produced messages atomically
    await transaction.sendOffsets({
      consumerGroupId: "enrichment-group",
      topics: [{ topic: inputTopic, partitions: messages.map(m => ({
        partition: m.partition,
        offset: (parseInt(m.offset) + 1).toString(),
      }))}],
    });

    await transaction.commit();
  } catch (err) {
    await transaction.abort();
    throw err;
  }
}

3. RabbitMQ Patterns

RabbitMQ is a feature-rich message broker that provides flexible routing through exchanges and queues. It supports multiple exchange types, each suited for different messaging scenarios.

| Exchange Type | Routing | Use Case |
|---|---|---|
| Direct | Exact routing key match | Task distribution, log levels |
| Topic | Wildcard pattern match on routing key | Multi-tenant events, geo routing |
| Fanout | Broadcast to all bound queues | Notifications, cache invalidation |
| Headers | Match on message header attributes | Complex routing, content-type routing |
// RabbitMQ with amqplib — Topic exchange + Dead Letter Queue
import amqp from "amqplib";

async function setupRabbitMQ() {
  const connection = await amqp.connect("amqp://localhost:5672");
  const channel = await connection.createChannel();

  // Dead letter exchange for failed messages
  await channel.assertExchange("dlx.events", "fanout", { durable: true });
  await channel.assertQueue("dlq.events", {
    durable: true,
    arguments: { "x-message-ttl": 86400000 }, // 24h retention
  });
  await channel.bindQueue("dlq.events", "dlx.events", "");

  // Main topic exchange
  await channel.assertExchange("domain.events", "topic", { durable: true });

  // Order events queue with DLQ
  await channel.assertQueue("order.processing", {
    durable: true,
    arguments: {
      "x-dead-letter-exchange": "dlx.events",
      "x-dead-letter-routing-key": "order.failed",
      // Retry count is tracked manually via the x-retry-count header below;
      // RabbitMQ has no built-in per-queue retry limit argument
    },
  });

  // Bind queue with topic pattern
  // order.* matches order.placed, order.shipped, etc.
  await channel.bindQueue("order.processing", "domain.events", "order.*");

  // Publish an event
  channel.publish(
    "domain.events",
    "order.placed",
    Buffer.from(JSON.stringify({
      eventType: "OrderPlaced",
      orderId: "order-12345",
      timestamp: new Date().toISOString(),
    })),
    {
      persistent: true,
      messageId: "msg-uuid-123",
      contentType: "application/json",
      headers: { "x-retry-count": 0 },
    }
  );

  // Consumer with manual acknowledgment
  await channel.prefetch(10); // At most 10 unacknowledged messages in flight per consumer
  await channel.consume("order.processing", async (msg) => {
    if (!msg) return;
    try {
      const event = JSON.parse(msg.content.toString());
      await processOrderEvent(event);
      channel.ack(msg); // Acknowledge success
    } catch (err) {
      const retryCount = ((msg.properties.headers?.["x-retry-count"] as number) ?? 0) + 1;
      if (retryCount >= 3) {
        channel.nack(msg, false, false); // Reject without requeue — dead-lettered to DLQ
      } else {
        // Requeueing with nack would keep the old header value, so republish
        // a copy with the incremented retry count and ack the original
        channel.publish("domain.events", msg.fields.routingKey, msg.content, {
          persistent: true,
          contentType: "application/json",
          headers: { ...msg.properties.headers, "x-retry-count": retryCount },
        });
        channel.ack(msg);
      }
    }
  });
}

4. Event Sourcing

Event sourcing stores state as a sequence of immutable events rather than just the current state. Current state is derived by replaying events in order. This provides a complete audit trail, time-travel query capability, and the ability to build multiple read-optimized views from the event stream.

Event Store Implementation

// Event Store — append-only with optimistic concurrency
interface StoredEvent {
  streamId: string;
  version: number;
  eventType: string;
  payload: Record<string, unknown>;
  metadata: Record<string, string>;
  timestamp: Date;
}

class EventStore {
  constructor(private db: Database) {}

  async appendToStream(
    streamId: string,
    events: StoredEvent[],
    expectedVersion: number
  ): Promise<void> {
    const currentVersion = await this.getStreamVersion(streamId);

    // Optimistic concurrency check
    if (currentVersion !== expectedVersion) {
      throw new ConcurrencyError(
        `Expected version ${expectedVersion}, got ${currentVersion}`
      );
    }

    const transaction = await this.db.beginTransaction();
    try {
      for (let i = 0; i < events.length; i++) {
        await transaction.execute(
          `INSERT INTO events (stream_id, version, event_type, payload, metadata, timestamp)
           VALUES (?, ?, ?, ?, ?, ?)`,
          [
            streamId,
            expectedVersion + i + 1,
            events[i].eventType,
            JSON.stringify(events[i].payload),
            JSON.stringify(events[i].metadata),
            events[i].timestamp,
          ]
        );
      }
      await transaction.commit();
    } catch (err) {
      await transaction.rollback();
      throw err;
    }
  }

  async readStream(streamId: string, fromVersion = 0): Promise<StoredEvent[]> {
    return this.db.query(
      `SELECT * FROM events
       WHERE stream_id = ? AND version > ?
       ORDER BY version ASC`,
      [streamId, fromVersion]
    );
  }
}

Projections and Snapshots

// Aggregate with snapshot support
class OrderAggregate {
  private state: OrderState = { status: "draft", items: [], total: 0 };
  private version = 0;
  private uncommittedEvents: StoredEvent[] = [];

  // Rebuild from events with snapshot optimization
  static async load(
    eventStore: EventStore,
    snapshotStore: SnapshotStore,
    orderId: string
  ): Promise<OrderAggregate> {
    const aggregate = new OrderAggregate();

    // Try loading from snapshot first
    const snapshot = await snapshotStore.get(orderId);
    if (snapshot) {
      aggregate.state = snapshot.state;
      aggregate.version = snapshot.version;
    }

    // Replay events after the snapshot
    const events = await eventStore.readStream(orderId, aggregate.version);
    for (const event of events) {
      aggregate.apply(event, false);
    }

    // Refresh the snapshot if more than 100 events had to be replayed
    if (events.length > 100) {
      await snapshotStore.save(orderId, aggregate.state, aggregate.version);
    }

    return aggregate;
  }

  placeOrder(customerId: string, items: OrderItem[]): void {
    if (this.state.status !== "draft") {
      throw new Error("Order already placed");
    }
    this.apply({
      eventType: "OrderPlaced",
      payload: { customerId, items, total: this.calculateTotal(items) },
    } as StoredEvent, true);
  }

  private apply(event: StoredEvent, isNew: boolean): void {
    // State mutation based on event type
    switch (event.eventType) {
      case "OrderPlaced":
        this.state.status = "placed";
        this.state.items = event.payload.items as OrderItem[];
        this.state.total = event.payload.total as number;
        break;
      case "OrderShipped":
        this.state.status = "shipped";
        break;
      case "OrderCancelled":
        this.state.status = "cancelled";
        break;
    }
    // version tracks the last persisted event; uncommitted events are
    // counted only after they are appended to the event store
    if (isNew) {
      this.uncommittedEvents.push(event);
    } else {
      this.version++;
    }
  }
}

Projections (Read Models)

// Projection — build read-optimized views from events
class OrderSummaryProjection {
  constructor(private readDb: Database) {}

  async handle(event: StoredEvent): Promise<void> {
    switch (event.eventType) {
      case "OrderPlaced":
        await this.readDb.execute(
          `INSERT INTO order_summaries
           (order_id, customer_id, status, total, item_count, placed_at)
           VALUES (?, ?, ?, ?, ?, ?)`,
          [
            event.streamId,
            event.payload.customerId,
            "placed",
            event.payload.total,
            (event.payload.items as unknown[]).length,
            event.timestamp,
          ]
        );
        break;

      case "OrderShipped":
        await this.readDb.execute(
          `UPDATE order_summaries
           SET status = ?, shipped_at = ?
           WHERE order_id = ?`,
          ["shipped", event.timestamp, event.streamId]
        );
        break;

      case "OrderCancelled":
        await this.readDb.execute(
          `UPDATE order_summaries
           SET status = ?, cancelled_at = ?
           WHERE order_id = ?`,
          ["cancelled", event.timestamp, event.streamId]
        );
        break;
    }
  }
}

5. CQRS Implementation

CQRS (Command Query Responsibility Segregation) separates write operations (commands) and read operations (queries) into distinct models. The write model is optimized for data consistency and business rule validation, while the read model is optimized for query performance. The two models are synchronized via events.

// CQRS — Command side
interface Command {
  type: string;
  payload: Record<string, unknown>;
  metadata: { userId: string; correlationId: string };
}

class CommandBus {
  private handlers = new Map<string, CommandHandler>();

  register(commandType: string, handler: CommandHandler): void {
    this.handlers.set(commandType, handler);
  }

  async dispatch(command: Command): Promise<void> {
    const handler = this.handlers.get(command.type);
    if (!handler) throw new Error(`No handler for: ${command.type}`);
    await handler.execute(command);
  }
}

class PlaceOrderHandler implements CommandHandler {
  constructor(
    private eventStore: EventStore,
    private snapshotStore: SnapshotStore
  ) {}

  async execute(command: Command): Promise<void> {
    const { customerId, items } = command.payload;

    // Load aggregate from event store
    const order = await OrderAggregate.load(
      this.eventStore, this.snapshotStore, command.payload.orderId as string
    );

    // Execute business logic (may throw if invalid)
    order.placeOrder(customerId as string, items as OrderItem[]);

    // Persist new events
    await this.eventStore.appendToStream(
      command.payload.orderId as string,
      order.getUncommittedEvents(),
      order.getVersion()
    );
  }
}
// CQRS — Query side
interface Query {
  type: string;
  params: Record<string, unknown>;
}

class QueryBus {
  private handlers = new Map<string, QueryHandler>();

  register(queryType: string, handler: QueryHandler): void {
    this.handlers.set(queryType, handler);
  }

  async dispatch<T>(query: Query): Promise<T> {
    const handler = this.handlers.get(query.type);
    if (!handler) throw new Error(`No handler for: ${query.type}`);
    return handler.execute(query) as Promise<T>;
  }
}

class GetOrderSummaryHandler implements QueryHandler {
  constructor(private readDb: Database) {}

  async execute(query: Query): Promise<OrderSummary> {
    // Query the read-optimized projection table
    const result = await this.readDb.queryOne(
      `SELECT order_id, customer_id, status, total, item_count,
              placed_at, shipped_at, cancelled_at
       FROM order_summaries WHERE order_id = ?`,
      [query.params.orderId]
    );
    if (!result) throw new NotFoundError("Order not found");
    return result;
  }
}
Tip: CQRS does not require event sourcing. You can use a regular database as the write model and sync changes to read models via change data capture (CDC) or domain events. Start with simple CQRS when system complexity is low.
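A minimal sketch of that simpler variant: a state-based write model publishes a change event after each commit, and a subscriber applies it to an in-memory read model. All names here are illustrative, and in production the subscriber would sit behind a broker or CDC pipeline rather than an in-process callback.

```typescript
// Simple CQRS without event sourcing: the write model stores current
// state and emits a change event; a subscriber syncs the read model.
type OrderChanged = { orderId: string; status: string; total: number };

class OrderWriteModel {
  private orders = new Map<string, { status: string; total: number }>();
  private subscribers: Array<(e: OrderChanged) => void> = [];

  onChange(fn: (e: OrderChanged) => void): void {
    this.subscribers.push(fn);
  }

  placeOrder(orderId: string, total: number): void {
    if (this.orders.has(orderId)) throw new Error("Order already exists");
    this.orders.set(orderId, { status: "placed", total });
    // Publish after the write succeeds — same idea as CDC or an outbox table
    this.subscribers.forEach(fn => fn({ orderId, status: "placed", total }));
  }
}

// Read model: denormalized view optimized for queries
class OrderReadModel {
  private summaries = new Map<string, OrderChanged>();

  apply(event: OrderChanged): void {
    this.summaries.set(event.orderId, event);
  }

  getSummary(orderId: string): OrderChanged | undefined {
    return this.summaries.get(orderId);
  }
}

const writeModel = new OrderWriteModel();
const readModel = new OrderReadModel();
writeModel.onChange(e => readModel.apply(e));

writeModel.placeOrder("order-1", 109.97);
// readModel.getSummary("order-1") → { orderId: "order-1", status: "placed", total: 109.97 }
```

The same two-model shape later accommodates event sourcing without changing the query side: only the write model's persistence strategy changes.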

6. Saga Pattern

The saga pattern manages distributed transactions across multiple services. Each saga step is a local transaction paired with a compensating action for rollback. Sagas come in two flavors: orchestration (central coordinator) and choreography (event-driven).

| Aspect | Orchestration | Choreography |
|---|---|---|
| Coordination | Central orchestrator directs all steps | Each service listens for events and decides |
| Coupling | Orchestrator knows all steps | Services only know their own events |
| Visibility | Centralized state tracking | Distributed; needs tracing tools |
| Best For | Complex multi-step workflows (5+ steps) | Simple flows (2-4 steps) |

Orchestration Saga Implementation

// Orchestration Saga — Order fulfillment
type SagaStep = {
  name: string;
  execute: (context: SagaContext) => Promise<void>;
  compensate: (context: SagaContext) => Promise<void>;
};

class OrderSaga {
  private steps: SagaStep[] = [
    {
      name: "ReserveInventory",
      execute: async (ctx) => {
        ctx.inventoryReservationId = await inventoryService.reserve(
          ctx.orderId, ctx.items
        );
      },
      compensate: async (ctx) => {
        await inventoryService.cancelReservation(ctx.inventoryReservationId);
      },
    },
    {
      name: "ProcessPayment",
      execute: async (ctx) => {
        ctx.paymentId = await paymentService.charge(
          ctx.customerId, ctx.totalAmount
        );
      },
      compensate: async (ctx) => {
        await paymentService.refund(ctx.paymentId);
      },
    },
    {
      name: "ArrangeShipping",
      execute: async (ctx) => {
        ctx.shipmentId = await shippingService.createShipment(
          ctx.orderId, ctx.shippingAddress
        );
      },
      compensate: async (ctx) => {
        await shippingService.cancelShipment(ctx.shipmentId);
      },
    },
  ];

  async run(context: SagaContext): Promise<void> {
    const completedSteps: SagaStep[] = [];

    for (const step of this.steps) {
      try {
        console.log(`Executing step: ${step.name}`);
        await step.execute(context);
        completedSteps.push(step);
      } catch (error) {
        console.error(`Step ${step.name} failed:`, error);
        // Compensate in reverse order
        for (const completed of completedSteps.reverse()) {
          try {
            console.log(`Compensating step: ${completed.name}`);
            await completed.compensate(context);
          } catch (compError) {
            console.error(`Compensation failed for ${completed.name}:`, compError);
            // Log to dead letter queue for manual intervention
            await deadLetterQueue.publish({
              sagaId: context.sagaId,
              failedStep: completed.name,
              error: compError,
            });
          }
        }
        throw new SagaFailedError(step.name, error);
      }
    }
  }
}
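For contrast, the choreography variant from the comparison table can be sketched with an in-memory event bus: there is no coordinator, each service subscribes to the events it cares about and publishes its own outcome events, and compensation is itself triggered by an event. Service and event names here are illustrative.

```typescript
// Choreography saga: no central orchestrator — services react to events
// and publish new ones; a failure event triggers compensation.
type BusEvent = { type: string; payload: Record<string, unknown> };

class EventBus {
  private handlers = new Map<string, Array<(e: BusEvent) => void>>();

  subscribe(type: string, handler: (e: BusEvent) => void): void {
    this.handlers.set(type, [...(this.handlers.get(type) ?? []), handler]);
  }

  publish(event: BusEvent): void {
    (this.handlers.get(event.type) ?? []).forEach(h => h(event));
  }
}

const bus = new EventBus();
const log: string[] = [];

// Inventory service reacts to OrderPlaced
bus.subscribe("OrderPlaced", (e) => {
  log.push("inventory: reserved");
  bus.publish({ type: "InventoryReserved", payload: e.payload });
});

// Payment service reacts to InventoryReserved
bus.subscribe("InventoryReserved", (e) => {
  const amount = e.payload.totalAmount as number;
  if (amount > 1000) {
    // Failure publishes a compensating event instead of calling a rollback
    bus.publish({ type: "PaymentFailed", payload: e.payload });
  } else {
    log.push("payment: charged");
    bus.publish({ type: "PaymentCompleted", payload: e.payload });
  }
});

// Inventory compensates itself when payment fails
bus.subscribe("PaymentFailed", () => log.push("inventory: reservation released"));

bus.publish({ type: "OrderPlaced", payload: { orderId: "o-1", totalAmount: 99 } });
// log → ["inventory: reserved", "payment: charged"]
```

Note how the failure path never appears in any one place: it emerges from each service's subscriptions, which is exactly why choreography needs distributed tracing once flows grow beyond a few steps.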

7. Domain Events in DDD

In Domain-Driven Design, domain events represent meaningful state changes in the business domain. Aggregate roots produce domain events after executing commands, and event handlers respond to these events within the same bounded context or across bounded contexts, achieving eventual consistency.

// Domain Events in DDD — Aggregate producing events
abstract class AggregateRoot {
  private domainEvents: DomainEvent[] = [];
  protected version = 0;

  protected addDomainEvent(event: DomainEvent): void {
    this.domainEvents.push(event);
  }

  getDomainEvents(): DomainEvent[] {
    return [...this.domainEvents];
  }

  clearDomainEvents(): void {
    this.domainEvents = [];
  }
}

class Order extends AggregateRoot {
  private status: OrderStatus = "draft";
  private items: OrderItem[] = [];

  place(customerId: string, items: OrderItem[]): void {
    // Business rule validation
    if (items.length === 0) throw new Error("Order must have items");
    if (this.status !== "draft") throw new Error("Order already placed");

    this.status = "placed";
    this.items = items;

    this.addDomainEvent({
      eventId: generateUUID(),
      eventType: "OrderPlaced",
      aggregateId: this.id,
      aggregateType: "Order",
      timestamp: new Date().toISOString(),
      version: ++this.version,
      payload: { customerId, items, total: this.calculateTotal() },
      metadata: { correlationId: generateUUID(), causationId: "" },
    });
  }

  cancel(reason: string): void {
    if (this.status === "shipped") {
      throw new Error("Cannot cancel shipped order");
    }
    this.status = "cancelled";

    this.addDomainEvent({
      eventId: generateUUID(),
      eventType: "OrderCancelled",
      aggregateId: this.id,
      aggregateType: "Order",
      timestamp: new Date().toISOString(),
      version: ++this.version,
      payload: { reason },
      metadata: { correlationId: generateUUID(), causationId: "" },
    });
  }
}

// Event dispatcher — publish domain events after persistence
class DomainEventDispatcher {
  private handlers = new Map<string, DomainEventHandler[]>();

  subscribe(eventType: string, handler: DomainEventHandler): void {
    const existing = this.handlers.get(eventType) || [];
    this.handlers.set(eventType, [...existing, handler]);
  }

  async dispatch(events: DomainEvent[]): Promise<void> {
    for (const event of events) {
      const handlers = this.handlers.get(event.eventType) || [];
      await Promise.all(
        handlers.map(h => h.handle(event))
      );
    }
  }
}

8. Async Messaging Patterns

Async messaging patterns define how services exchange messages. Choosing the right pattern depends on message semantics (event vs command), number of consumers, and delivery guarantee requirements.

| Pattern | Description | Consumers | Use Case |
|---|---|---|---|
| Pub/Sub | One-to-many broadcast; all subscribers receive a copy | Multiple | Event notification, cache invalidation |
| Point-to-Point | One-to-one; message processed by exactly one consumer | One | Task queues, command processing |
| Request-Reply | Async request-response via temporary reply queue | One | Async RPC, long operations |
| Competing Consumers | Multiple consumers compete for messages from one queue | Multiple (competing) | Load balancing, horizontal scaling |
// Request-Reply pattern with correlation IDs
class AsyncRequestReply {
  private pendingRequests = new Map<string, {
    resolve: (value: unknown) => void;
    reject: (reason: unknown) => void;
    timer: NodeJS.Timeout;
  }>();

  constructor(
    private channel: MessageChannel,
    private replyQueue: string,
    private timeoutMs = 30000
  ) {
    // Listen for replies on the dedicated reply queue
    this.channel.consume(this.replyQueue, (msg) => {
      if (!msg) return;
      const correlationId = msg.properties.correlationId;
      const pending = this.pendingRequests.get(correlationId);

      if (pending) {
        clearTimeout(pending.timer);
        this.pendingRequests.delete(correlationId);
        const reply = JSON.parse(msg.content.toString());
        if (reply.error) {
          pending.reject(new Error(reply.error));
        } else {
          pending.resolve(reply.data);
        }
      }
      this.channel.ack(msg);
    });
  }

  async request(queue: string, payload: unknown): Promise<unknown> {
    const correlationId = generateUUID();

    return new Promise((resolve, reject) => {
      const timer = setTimeout(() => {
        this.pendingRequests.delete(correlationId);
        reject(new Error("Request timed out"));
      }, this.timeoutMs);

      this.pendingRequests.set(correlationId, { resolve, reject, timer });

      this.channel.sendToQueue(queue, Buffer.from(JSON.stringify(payload)), {
        correlationId,
        replyTo: this.replyQueue,
        expiration: String(this.timeoutMs),
      });
    });
  }
}
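The competing-consumers row from the table can likewise be sketched in memory: several workers pull from one shared queue, and each message is handled by exactly one of them. The round-robin draining below is a simplification of what a broker's scheduler does; all names are illustrative.

```typescript
// Competing consumers: one queue, N workers — each message is delivered
// to exactly one worker, spreading load across instances.
class WorkQueue<T> {
  private items: T[] = [];

  enqueue(item: T): void {
    this.items.push(item);
  }

  dequeue(): T | undefined {
    return this.items.shift();
  }
}

function runWorkers<T>(
  queue: WorkQueue<T>,
  workerCount: number,
  handle: (workerId: number, item: T) => void
): void {
  // Workers take turns pulling from the shared queue until it drains
  let worker = 0;
  for (let item = queue.dequeue(); item !== undefined; item = queue.dequeue()) {
    handle(worker, item);
    worker = (worker + 1) % workerCount;
  }
}

const taskQueue = new WorkQueue<string>();
["task-1", "task-2", "task-3", "task-4"].forEach(t => taskQueue.enqueue(t));

const processedBy = new Map<string, number>();
runWorkers(taskQueue, 2, (workerId, task) => processedBy.set(task, workerId));
// Every task processed exactly once, alternating between workers 0 and 1
```

Adding a worker scales throughput without touching producers, which is the pattern's main appeal over pub/sub for task processing.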

9. Event Schema Evolution

In long-lived event-driven systems, event schemas inevitably need to evolve. The key to schema evolution is maintaining backward compatibility, ensuring new consumers can read old events and old consumers are not broken by new events.

| Compatibility Type | Description | Rules |
|---|---|---|
| Backward | Consumers on the new schema can read old events | Add fields only with defaults; fields may be removed |
| Forward | Consumers on the old schema can read new events | Fields may be added; remove only fields that have defaults |
| Full | Both backward and forward compatible | Only add or remove optional fields with defaults |
// Avro schema evolution example
// Version 1 — Original OrderPlaced schema
const orderPlacedV1 = {
  type: "record",
  name: "OrderPlaced",
  namespace: "com.example.events",
  fields: [
    { name: "orderId", type: "string" },
    { name: "customerId", type: "string" },
    { name: "totalAmount", type: "double" },
    { name: "currency", type: "string" },
    { name: "timestamp", type: "string" },
  ],
};

// Version 2 — Added optional fields (backward compatible)
const orderPlacedV2 = {
  type: "record",
  name: "OrderPlaced",
  namespace: "com.example.events",
  fields: [
    { name: "orderId", type: "string" },
    { name: "customerId", type: "string" },
    { name: "totalAmount", type: "double" },
    { name: "currency", type: "string" },
    { name: "timestamp", type: "string" },
    // New optional fields with defaults — backward compatible
    { name: "discountCode", type: ["null", "string"], default: null },
    { name: "priority", type: "string", default: "normal" },
    { name: "channel", type: "string", default: "web" },
  ],
};

// Schema Registry client
import { SchemaRegistry } from "@kafkajs/confluent-schema-registry";

const registry = new SchemaRegistry({
  host: "http://schema-registry:8081",
});

// Register schema with compatibility check
async function registerSchema() {
  const { id } = await registry.register({
    type: "AVRO",
    schema: JSON.stringify(orderPlacedV2),
  }, {
    subject: "orders.events-value",
  });
  console.log("Registered schema with id:", id);
}

// Produce with schema validation
async function produceWithSchema(event: OrderPlacedEvent) {
  const schemaId = await registry.getLatestSchemaId("orders.events-value");
  const encodedValue = await registry.encode(schemaId, event);

  await producer.send({
    topic: "orders.events",
    messages: [{ key: event.orderId, value: encodedValue }],
  });
}

10. Serverless Event Processing

Serverless platforms are a natural fit for event-driven architecture because functions are inherently event handlers. AWS Lambda + EventBridge and Azure Functions + Event Grid are two mainstream serverless event processing solutions.

// AWS Lambda + EventBridge — Order event processing
// serverless.yml (Serverless Framework)
// service: order-processor
// provider:
//   name: aws
//   runtime: nodejs20.x
// functions:
//   processOrder:
//     handler: handler.processOrder
//     events:
//       - eventBridge:
//           pattern:
//             source: ["order-service"]
//             detail-type: ["OrderPlaced", "OrderCancelled"]

import { EventBridgeEvent, Context } from "aws-lambda";
import { EventBridgeClient, PutEventsCommand } from "@aws-sdk/client-eventbridge";

const eventBridge = new EventBridgeClient({});

// Producer — publish event to EventBridge
export async function publishOrderEvent(order: Order): Promise<void> {
  await eventBridge.send(new PutEventsCommand({
    Entries: [{
      Source: "order-service",
      DetailType: "OrderPlaced",
      Detail: JSON.stringify({
        orderId: order.id,
        customerId: order.customerId,
        totalAmount: order.total,
        items: order.items,
      }),
      EventBusName: "orders-bus",
    }],
  }));
}

// Consumer — Lambda handler triggered by EventBridge
export async function processOrder(
  event: EventBridgeEvent<"OrderPlaced", OrderPayload>,
  context: Context
): Promise<void> {
  const { orderId, customerId, totalAmount } = event.detail;

  console.log(
    `Processing ${event["detail-type"]} for order ${orderId}`,
    `requestId=${context.awsRequestId}`
  );

  // Idempotency check — prevent duplicate processing
  const alreadyProcessed = await checkIdempotencyKey(orderId);
  if (alreadyProcessed) {
    console.log(`Order ${orderId} already processed, skipping`);
    return;
  }

  await reserveInventory(orderId);
  await sendConfirmationEmail(customerId, orderId);
  await storeIdempotencyKey(orderId);

  // Chain next event
  await eventBridge.send(new PutEventsCommand({
    Entries: [{
      Source: "fulfillment-service",
      DetailType: "InventoryReserved",
      Detail: JSON.stringify({ orderId, customerId }),
      EventBusName: "orders-bus",
    }],
  }));
}
Tip: Lambda functions may be retried, so idempotency is essential. Use DynamoDB conditional writes or Redis SET NX for idempotency keys, ensuring the same event is not processed twice.
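A minimal sketch of that idempotency check, behind a small interface with an in-memory implementation. In production the same interface would wrap a DynamoDB conditional write or Redis SET NX as the tip suggests; the names here are illustrative.

```typescript
// Idempotency guard: atomically record a key before side effects run.
// tryAcquire returns false if the key was already recorded.
interface IdempotencyStore {
  tryAcquire(key: string): Promise<boolean>;
}

class InMemoryIdempotencyStore implements IdempotencyStore {
  private seen = new Set<string>();

  async tryAcquire(key: string): Promise<boolean> {
    if (this.seen.has(key)) return false; // duplicate delivery
    this.seen.add(key);
    return true;
  }
}

async function handleOnce(
  store: IdempotencyStore,
  eventId: string,
  process: () => Promise<void>
): Promise<boolean> {
  // Acquire first so a concurrent duplicate sees the key immediately;
  // a real store would expire the key (TTL) if processing fails
  if (!(await store.tryAcquire(eventId))) return false;
  await process();
  return true;
}

const store = new InMemoryIdempotencyStore();
let calls = 0;
await handleOnce(store, "evt-1", async () => { calls++; });
await handleOnce(store, "evt-1", async () => { calls++; }); // duplicate — skipped
// calls === 1
```

Acquiring before processing trades a possible lost event (if processing crashes after the write) for never processing twice; acquiring after processing trades the opposite way. The TTL noted in the comment is the usual middle ground.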

11. Stream Processing

Stream processing engines like Kafka Streams and Apache Flink perform real-time stateful computations over unbounded event streams, including windowed aggregations, stream-stream joins, and complex event processing.

Kafka Streams Example

// Kafka Streams topology — Java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;
import java.time.Duration;

public class OrderAnalyticsTopology {

  public static Topology build() {
    StreamsBuilder builder = new StreamsBuilder();

    // Source: read order events
    KStream<String, OrderEvent> orders = builder
      .stream("orders.events",
        Consumed.with(Serdes.String(), orderEventSerde));

    // Windowed aggregation: order count per customer per hour
    KTable<Windowed<String>, Long> hourlyOrderCounts = orders
      .filter((key, event) -> "OrderPlaced".equals(event.getType()))
      .selectKey((key, event) -> event.getCustomerId())
      .groupByKey()
      .windowedBy(
        TimeWindows.ofSizeWithNoGrace(Duration.ofHours(1))
      )
      .count(Materialized.as("hourly-order-counts"));

    // Real-time revenue per product
    KTable<String, Double> revenueByProduct = orders
      .filter((key, event) -> "OrderPlaced".equals(event.getType()))
      .flatMapValues(event -> event.getItems())
      .selectKey((key, item) -> item.getProductId())
      .groupByKey()
      .aggregate(
        () -> 0.0,
        (productId, item, total) -> total + item.getPrice() * item.getQuantity(),
        Materialized.as("revenue-by-product")
      );

    // Stream-stream join: match orders with payments within 5 min
    KStream<String, PaymentEvent> payments = builder
      .stream("payments.events",
        Consumed.with(Serdes.String(), paymentEventSerde));

    KStream<String, OrderWithPayment> matched = orders
      .join(
        payments,
        (order, payment) -> new OrderWithPayment(order, payment),
        JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(5)),
        StreamJoined.with(Serdes.String(), orderEventSerde, paymentEventSerde)
      );

    matched.to("orders.matched",
      Produced.with(Serdes.String(), orderWithPaymentSerde));

    return builder.build();
  }
}

Apache Flink Windowing Example

// Apache Flink — Python (PyFlink) windowed aggregation
from pyflink.common import Duration
from pyflink.common.serialization import SimpleStringSchema
from pyflink.common.time import Time
from pyflink.common.watermark_strategy import WatermarkStrategy
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.kafka import KafkaSource, KafkaOffsetsInitializer
from pyflink.datastream.window import TumblingEventTimeWindows
import json

env = StreamExecutionEnvironment.get_execution_environment()

# Configure Kafka source
kafka_source = KafkaSource.builder() \
    .set_bootstrap_servers("kafka:9092") \
    .set_topics("orders.events") \
    .set_group_id("flink-analytics") \
    .set_starting_offsets(KafkaOffsetsInitializer.earliest()) \
    .set_value_only_deserializer(SimpleStringSchema()) \
    .build()

orders = env.from_source(
    kafka_source,
    WatermarkStrategy.for_bounded_out_of_orderness(Duration.of_seconds(5)),
    "OrderEvents"
)

# Tumbling window: revenue per minute. Each event is normalized to the
# accumulator shape before reducing, so the first element of a window
# combines cleanly with the rest.
revenue_per_minute = orders \
    .map(lambda raw: json.loads(raw)) \
    .filter(lambda e: e["eventType"] == "OrderPlaced") \
    .map(lambda e: {"totalRevenue": e["payload"]["totalAmount"], "orderCount": 1}) \
    .key_by(lambda e: "global") \
    .window(TumblingEventTimeWindows.of(Time.minutes(1))) \
    .reduce(lambda a, b: {
        "totalRevenue": a["totalRevenue"] + b["totalRevenue"],
        "orderCount": a["orderCount"] + b["orderCount"],
    })

revenue_per_minute.print()
env.execute("Order Revenue Analytics")

12. Testing Event-Driven Systems

Testing event-driven systems requires special strategies: contract testing validates schema compatibility between producers and consumers, event replay testing verifies projection correctness, and chaos engineering tests system resilience under broker failures.

// Contract testing with a real broker via Testcontainers (Jest)
import { KafkaContainer, StartedKafkaContainer } from "@testcontainers/kafka";
import { Kafka } from "kafkajs";

describe("OrderPlaced event contract", () => {
  let container: StartedKafkaContainer;
  let kafka: Kafka;

  beforeAll(async () => {
    container = await new KafkaContainer().start();
    // External clients connect on the mapped port for listener 9093
    kafka = new Kafka({
      brokers: [`${container.getHost()}:${container.getMappedPort(9093)}`],
    });
  });

  afterAll(async () => {
    await container.stop();
  });

  test("producer publishes valid OrderPlaced event", async () => {
    const producer = kafka.producer();
    await producer.connect();

    // Produce event
    await producer.send({
      topic: "orders.events",
      messages: [{
        key: "order-123",
        value: JSON.stringify({
          eventType: "OrderPlaced",
          orderId: "order-123",
          customerId: "cust-456",
          totalAmount: 99.99,
          items: [{ productId: "p1", quantity: 2, price: 49.995 }],
          timestamp: new Date().toISOString(),
        }),
      }],
    });

    // Consume and validate schema
    const consumer = kafka.consumer({ groupId: "test-group" });
    await consumer.connect();
    await consumer.subscribe({ topics: ["orders.events"] });

    const received = await consumeOneMessage(consumer);
    const event = JSON.parse(received.value!.toString());

    // Contract assertions
    expect(event).toHaveProperty("eventType", "OrderPlaced");
    expect(event).toHaveProperty("orderId");
    expect(event).toHaveProperty("customerId");
    expect(event).toHaveProperty("totalAmount");
    expect(typeof event.totalAmount).toBe("number");
    expect(Array.isArray(event.items)).toBe(true);
    expect(event.items[0]).toHaveProperty("productId");
    expect(event.items[0]).toHaveProperty("quantity");
    expect(event.items[0]).toHaveProperty("price");
  });
});

// Event replay testing — verify projections
describe("OrderSummaryProjection", () => {
  test("rebuilds correct state from event sequence", async () => {
    const events: StoredEvent[] = [
      {
        streamId: "order-001",
        version: 1,
        eventType: "OrderPlaced",
        payload: {
          customerId: "cust-1",
          items: [{ productId: "p1", quantity: 2, price: 25 }],
          total: 50,
        },
        metadata: {},
        timestamp: new Date("2026-01-01"),
      },
      {
        streamId: "order-001",
        version: 2,
        eventType: "OrderShipped",
        payload: { trackingNumber: "TRACK-123" },
        metadata: {},
        timestamp: new Date("2026-01-02"),
      },
    ];

    const projection = new OrderSummaryProjection(testDb);
    for (const event of events) {
      await projection.handle(event);
    }

    const summary = await testDb.queryOne(
      "SELECT * FROM order_summaries WHERE order_id = ?",
      ["order-001"]
    );

    expect(summary.status).toBe("shipped");
    expect(summary.total).toBe(50);
    expect(summary.item_count).toBe(1);
  });
});

Chaos Engineering Tests

// Chaos engineering — simulate broker failures
describe("Resilience under Kafka broker failure", () => {
  test("consumer recovers after broker restart", async () => {
    const kafka = await new KafkaContainer().start();

    // Produce messages
    await produceTestMessages(kafka, 100);

    // Start consuming
    const consumed: string[] = [];
    const consumer = startTestConsumer(kafka, (msg) => {
      consumed.push(msg);
    });

    // Wait for partial consumption
    await waitUntil(() => consumed.length >= 50);

    // Simulate a broker outage: restart the container and let the
    // client library reconnect
    await kafka.restart();

    // Verify consumer reconnects and processes remaining messages
    await waitUntil(() => consumed.length === 100, 30000);
    expect(consumed.length).toBe(100);

    // Verify no duplicates (idempotent processing)
    const uniqueIds = new Set(consumed.map(m => JSON.parse(m).eventId));
    expect(uniqueIds.size).toBe(100);
  });
});

13. Monitoring & Observability

Observability in event-driven systems revolves around three key dimensions: distributed tracing (request chains across services), consumer lag monitoring (message processing speed), and dead letter queue alerting (failed messages).

// Distributed tracing with OpenTelemetry
import { trace, context, propagation, SpanKind, SpanStatusCode } from "@opentelemetry/api";
import { NodeTracerProvider } from "@opentelemetry/sdk-trace-node";
import { JaegerExporter } from "@opentelemetry/exporter-jaeger";

const tracer = trace.getTracer("order-service", "1.0.0");

// Producer — inject trace context into message headers
async function produceWithTracing(
  topic: string,
  event: DomainEvent
): Promise<void> {
  const span = tracer.startSpan(`publish ${event.eventType}`, {
    kind: SpanKind.PRODUCER,
    attributes: {
      "messaging.system": "kafka",
      "messaging.destination": topic,
      "messaging.message_id": event.eventId,
      "event.type": event.eventType,
    },
  });

  // Inject trace context into Kafka headers
  const headers: Record<string, string> = {};
  propagation.inject(context.active(), headers);

  try {
    await producer.send({
      topic,
      messages: [{
        key: event.aggregateId,
        value: JSON.stringify(event),
        headers,
      }],
    });
    span.setStatus({ code: SpanStatusCode.OK });
  } catch (err) {
    span.setStatus({ code: SpanStatusCode.ERROR, message: String(err) });
    throw err;
  } finally {
    span.end();
  }
}

// Consumer — extract trace context from message headers
async function consumeWithTracing(
  message: KafkaMessage
): Promise<void> {
  // Kafka header values arrive as Buffers — convert them to strings
  // before handing the carrier to the propagator
  const carrier: Record<string, string> = {};
  for (const [key, value] of Object.entries(message.headers ?? {})) {
    if (value != null) carrier[key] = value.toString();
  }
  const parentCtx = propagation.extract(context.active(), carrier);

  const span = tracer.startSpan(
    "process OrderPlaced",
    { kind: SpanKind.CONSUMER },
    parentCtx
  );

  return context.with(
    trace.setSpan(parentCtx, span),
    async () => {
      try {
        const event = JSON.parse(message.value!.toString());
        await processEvent(event);
        span.setStatus({ code: SpanStatusCode.OK });
      } catch (err) {
        span.setStatus({ code: SpanStatusCode.ERROR, message: String(err) });
        throw err;
      } finally {
        span.end();
      }
    }
  );
}

Consumer Lag & DLQ Monitoring

// Consumer lag monitoring with Prometheus metrics
import { Counter, Gauge, Histogram } from "prom-client";

const consumerLag = new Gauge({
  name: "kafka_consumer_lag",
  help: "Consumer lag in messages",
  labelNames: ["topic", "partition", "consumer_group"],
});

const messagesProcessed = new Counter({
  name: "messages_processed_total",
  help: "Total messages processed",
  labelNames: ["topic", "event_type", "status"],
});

const processingDuration = new Histogram({
  name: "message_processing_duration_seconds",
  help: "Time to process a single message",
  labelNames: ["topic", "event_type"],
  buckets: [0.01, 0.05, 0.1, 0.5, 1, 5, 10],
});

const dlqMessages = new Counter({
  name: "dlq_messages_total",
  help: "Messages sent to dead letter queue",
  labelNames: ["topic", "error_type"],
});

// Monitor consumer lag periodically
async function monitorLag(admin: KafkaAdmin, groupId: string) {
  const offsets = await admin.fetchOffsets({ groupId });
  const topicOffsets = await admin.fetchTopicOffsets("orders.events");

  for (const partition of offsets) {
    const latest = topicOffsets.find(
      t => t.partition === partition.partition
    );
    if (latest) {
      const lag = parseInt(latest.offset) - parseInt(partition.offset);
      consumerLag.set(
        { topic: "orders.events", partition: String(partition.partition), consumer_group: groupId },
        lag
      );
    }
  }
}

// Alerting rules (Prometheus/Grafana)
// ALERT ConsumerLagHigh
//   IF kafka_consumer_lag > 10000
//   FOR 5m
//   LABELS { severity = "warning" }
//   ANNOTATIONS { summary = "Consumer lag exceeding 10K messages" }
//
// ALERT DLQMessagesIncreasing
//   IF rate(dlq_messages_total[5m]) > 0
//   FOR 1m
//   LABELS { severity = "critical" }
//   ANNOTATIONS { summary = "Messages being sent to DLQ" }

Technology Comparison

| Dimension | Kafka | RabbitMQ | AWS EventBridge |
| --- | --- | --- | --- |
| Throughput | Very high (millions/sec) | High (tens of thousands/sec) | Moderate (quota-limited) |
| Message Retention | Configurable (days/infinite) | Deleted after consumption | 24h replay |
| Message Replay | Native (offset seeking) | Not supported | Limited (event archive) |
| Routing Flexibility | Key-based partitioning | Very high (exchanges + bindings) | Rule-based content routing |
| Ops Complexity | High (ZooKeeper/KRaft) | Moderate | Low (fully managed) |
| Best For | Event sourcing, streaming, data pipelines | Task queues, RPC, complex routing | Serverless event routing, AWS integration |

Frequently Asked Questions

What is event-driven architecture and when should I use it?

Event-driven architecture (EDA) is a software design pattern where state changes are captured as immutable events and propagated asynchronously between loosely coupled services. Use EDA when you need temporal decoupling between producers and consumers, high throughput and scalability, real-time data processing, audit trails, or when multiple downstream services need to react to the same business event independently.

What is the difference between Apache Kafka and RabbitMQ?

Kafka is a distributed log designed for high-throughput event streaming with persistent storage, consumer groups, and replay capability. RabbitMQ is a traditional message broker optimized for flexible routing, message acknowledgment, and complex exchange patterns. Use Kafka for event sourcing, stream processing, and high-volume data pipelines. Use RabbitMQ for task queues, RPC-style communication, and scenarios requiring complex routing logic with exchanges.

What is event sourcing and how does it differ from traditional CRUD?

Event sourcing stores every state change as an immutable event in an append-only event store, rather than overwriting the current state in a database. The current state is derived by replaying events. This provides a complete audit trail, enables temporal queries, supports event replay for debugging, and allows building multiple read-optimized projections. Traditional CRUD only stores the latest state and loses change history.
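The replay idea fits in a few lines. Here is a minimal sketch with a hypothetical order event stream (the event and state shapes are illustrative, not from the article's earlier examples):

```typescript
// Current state is never stored directly — it is derived by folding
// the immutable event stream in order
type OrderEvent =
  | { type: "OrderPlaced"; total: number }
  | { type: "OrderShipped"; trackingNumber: string }
  | { type: "OrderCancelled" };

interface OrderState {
  status: "none" | "placed" | "shipped" | "cancelled";
  total: number;
}

function replay(events: OrderEvent[]): OrderState {
  return events.reduce<OrderState>((state, event) => {
    switch (event.type) {
      case "OrderPlaced":
        return { status: "placed", total: event.total };
      case "OrderShipped":
        return { ...state, status: "shipped" };
      case "OrderCancelled":
        return { ...state, status: "cancelled" };
    }
  }, { status: "none", total: 0 });
}
```

Because the fold is a pure function of the events, the same stream can feed any number of differently shaped projections, and snapshots are just cached intermediate fold results.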

What is CQRS and why combine it with event sourcing?

CQRS (Command Query Responsibility Segregation) separates write operations (commands) from read operations (queries) into distinct models. When combined with event sourcing, commands produce events that are stored in the event store, and projections consume these events to build read-optimized views. This allows independent scaling of reads and writes, optimized query models, and eventual consistency between write and read sides.
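A minimal sketch of the command/query split (types and names are illustrative): the write side validates a command and emits an event; the read side maintains a denormalized view that queries hit directly:

```typescript
type PlaceOrderCommand = { type: "PlaceOrder"; orderId: string; total: number };
type OrderPlacedEvent = { type: "OrderPlaced"; orderId: string; total: number };

// Write side: enforce invariants, emit an event — no query concerns here
function handleCommand(cmd: PlaceOrderCommand): OrderPlacedEvent {
  if (cmd.total <= 0) throw new Error("total must be positive");
  return { type: "OrderPlaced", orderId: cmd.orderId, total: cmd.total };
}

// Read side: a view updated from events, shaped purely for queries
const orderView = new Map<string, { total: number }>();

function project(event: OrderPlacedEvent): void {
  orderView.set(event.orderId, { total: event.total });
}
```

In a real system the event would travel through the event store or a broker before reaching the projection, which is where the eventual consistency between the two sides comes from.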

What is the saga pattern for distributed transactions?

The saga pattern manages distributed transactions across multiple microservices by breaking them into a sequence of local transactions, each paired with a compensating action for rollback. Orchestration uses a central coordinator to direct the saga steps. Choreography uses events where each service listens for events and triggers the next step. Use orchestration for complex workflows with many steps, and choreography for simpler flows with fewer services.
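The orchestration variant can be sketched as a list of steps, each paired with its compensation, run in reverse on failure (step contents are hypothetical):

```typescript
// Each saga step pairs a local transaction with a compensating action
type SagaStep = {
  name: string;
  execute: () => Promise<void>;
  compensate: () => Promise<void>;
};

// Orchestrator: run steps in order; on failure, compensate the
// already-completed steps in reverse order and report failure
async function runSaga(steps: SagaStep[]): Promise<boolean> {
  const completed: SagaStep[] = [];
  for (const step of steps) {
    try {
      await step.execute();
      completed.push(step);
    } catch {
      for (const done of completed.reverse()) {
        await done.compensate();
      }
      return false;
    }
  }
  return true;
}
```

A production orchestrator would also persist saga progress so it can resume compensation after a crash, and compensations themselves must be idempotent since they may be retried.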

How do I handle schema evolution in event-driven systems?

Use a schema registry (Confluent Schema Registry) with serialization formats like Avro or Protobuf that support schema evolution. Follow compatibility rules: backward compatibility allows new consumers to read old events, forward compatibility allows old consumers to read new events. Always add optional fields with defaults, never remove required fields, and version your event schemas. Use the schema registry to enforce compatibility checks at publish time.
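The backward-compatibility rule can be illustrated with a hypothetical versioned event: the new field is optional and carries a default, so events written before the field existed still deserialize cleanly (TypeScript stands in here for what Avro/Protobuf defaults do at decode time):

```typescript
// v1 of a hypothetical event; currency did not exist yet
interface OrderPlacedV1 {
  orderId: string;
  totalAmount: number;
}

// v2 adds an optional field — old events simply lack it
interface OrderPlacedV2 extends OrderPlacedV1 {
  currency?: string;
}

// On read, the default fills the gap, so consumers written against v2
// can still process events produced under v1
function withDefaults(event: OrderPlacedV2): Required<OrderPlacedV2> {
  return { ...event, currency: event.currency ?? "USD" };
}
```

Removing or renaming `orderId` would break this: old events could no longer satisfy the new schema, which is exactly the change a schema registry's compatibility check rejects at publish time.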

How do I test event-driven systems effectively?

Use contract testing to verify event schemas between producers and consumers. Use embedded brokers (EmbeddedKafka, TestContainers) for integration tests. Implement event replay testing by storing known event sequences and verifying projections produce correct state. Use consumer-driven contract testing with tools like Pact. For resilience, apply chaos engineering to simulate broker failures, network partitions, and consumer lag.

How do I monitor and observe event-driven architectures?

Monitor consumer lag (difference between latest offset and consumer offset) as the primary health indicator. Use distributed tracing (OpenTelemetry) to propagate correlation IDs through event headers across service boundaries. Set up dead letter queue monitoring with alerts for poison messages. Track end-to-end latency from event production to final consumption. Use tools like Kafka Manager, Grafana with Prometheus exporters, and Jaeger for trace visualization.

Conclusion

Event-driven architecture is a powerful paradigm for building scalable, resilient, and loosely coupled distributed systems. Apache Kafka and RabbitMQ provide messaging infrastructure for different scenarios. Event sourcing and CQRS offer audit trails and performance optimization for complex business logic. The saga pattern addresses distributed transaction challenges. Schema evolution ensures long-term maintainability. Stream processing engines enable real-time analytics. Comprehensive testing and monitoring strategies ensure production reliability. Choose the right combination of patterns and tools for your specific needs, start simple, and introduce more patterns as complexity grows.

Golden Rule: Do not introduce complex patterns prematurely. Start with simple pub/sub and introduce event sourcing, CQRS, or sagas only when you encounter specific scalability or consistency challenges. Every additional pattern increases operational complexity.
