Kafka Notes

Broker, ZooKeeper, and KRaft

A broker is a Kafka server that stores topic partitions, handles producer writes, and serves consumer reads. A Kafka cluster is simply a group of brokers. For each partition, one broker acts as the partition leader: it handles all writes and, by default, all reads for that partition, while the other replicas copy it as followers.

ZooKeeper (legacy — removed in Kafka 4.0) was the external coordination service Kafka relied on to store cluster metadata: which brokers are alive, who is the controller, and which replicas are in sync. One broker was elected Controller by racing to write a node in ZooKeeper; the Controller then managed partition leadership changes whenever brokers joined or left. The downside was an extra cluster to operate and a slow metadata path that went through ZooKeeper.

KRaft (production-ready since Kafka 3.3) replaced ZooKeeper by embedding the Raft consensus algorithm directly into Kafka. A small quorum of controller nodes (typically 3 or 5) elects an Active Controller among themselves and replicates all cluster metadata through an internal topic called __cluster_metadata. Brokers receive metadata updates as a stream from the Active Controller — no separate ZooKeeper process needed.
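The preference for 3 or 5 controllers follows from majority arithmetic: a Raft quorum stays available only while more than half of its voters are reachable. The sketch below just works that arithmetic through; the class and method names are hypothetical and nothing here is Kafka code.

```java
// Hypothetical helper: majority arithmetic for a Raft-style controller quorum.
final class QuorumSketch {
    // Smallest number of voters that forms a majority.
    static int majority(int voters) {
        return voters / 2 + 1;
    }

    // Voters that can fail while a majority is still reachable.
    static int tolerableFailures(int voters) {
        return voters - majority(voters);
    }

    public static void main(String[] args) {
        for (int n : new int[] {3, 4, 5}) {
            System.out.println(n + " controllers: majority " + majority(n)
                    + ", tolerates " + tolerableFailures(n) + " failure(s)");
        }
    }
}
```

A fourth controller tolerates no more failures than three do, which is why odd quorum sizes are the usual choice.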


Producer

Replication Factor and In-Sync Replicas

Replication factor is the total number of copies Kafka keeps for each partition — one leader and the rest as followers. With a replication factor of 3, there is 1 leader + 2 followers.

In-Sync Replicas (ISR) is the subset of those replicas that are currently caught up with the leader. A replica stays in the ISR as long as it has fetched up to the leader's latest offset within the replica.lag.time.max.ms window (default 30 s). If a follower falls behind — due to network issues, GC pause, or broker slowness — it is removed from the ISR. When it catches up again, it is re-added.

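min.insync.replicas (min.ISR) is the key knob that connects the two: it is the minimum number of in-sync replicas (leader included) that must hold a write before a producer using acks=all receives an acknowledgement. If the ISR shrinks below this number, the leader rejects such writes with a NotEnoughReplicas error rather than accepting them with weaker durability.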

Typical production setup:

Config                 Value   Meaning
replication.factor     3       3 copies total
min.insync.replicas    2       At least 2 must acknowledge
acks                   all     Producer waits for all ISR replicas

This tolerates 1 broker failure while maintaining strong durability. You never want min.insync.replicas == replication.factor in production — if any single replica is unavailable the partition becomes unwritable.

The relationship in plain terms:

replication.factor = 3   →  [Leader | Follower-1 | Follower-2]
ISR (all healthy)        →  [Leader | Follower-1 | Follower-2]  ✓ 3 in ISR

Follower-2 crashes:
ISR shrinks              →  [Leader | Follower-1]               ✓ 2 in ISR ≥ min.ISR (2), writes still allowed

Follower-1 also crashes:
ISR shrinks              →  [Leader]                            ✗ 1 in ISR < min.ISR (2), writes blocked
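
The walkthrough above reduces to a single comparison. The following is a minimal sketch, assuming a producer that uses acks=all; IsrSketch and canWrite are hypothetical names, not broker code.

```java
// Hypothetical model of the min.insync.replicas rule for acks=all writes.
final class IsrSketch {
    // A write is accepted only while the ISR holds at least minIsr replicas.
    static boolean canWrite(int isrSize, int minIsr) {
        return isrSize >= minIsr;
    }

    public static void main(String[] args) {
        int minIsr = 2;
        System.out.println(canWrite(3, minIsr)); // all healthy: true
        System.out.println(canWrite(2, minIsr)); // Follower-2 down: true
        System.out.println(canWrite(1, minIsr)); // only the leader left: false
    }
}
```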

Acknowledgements (acks)

linger.ms and batch.size

Idempotent Producer

The Problem It Solves

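Without idempotence, a transient network failure during a produce request can cause a duplicate: the broker appends the batch, the acknowledgement is lost on the way back, the producer times out and retries, and the broker appends the same batch a second time.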

How It Works Internally

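Enable with enable.idempotence=true (the default since Kafka 3.0). Idempotence requires acks=all, retries > 0, and max.in.flight.requests.per.connection ≤ 5. The defaults (acks=all, retries=Integer.MAX_VALUE, 5 in-flight requests) satisfy all three; explicitly enabling idempotence together with a conflicting value fails at startup with a ConfigException.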

Producer ID (PID): On first connect, the producer sends an InitProducerId request to the broker. The broker assigns a globally unique Producer ID (PID) and returns it. The PID identifies this producer session.

Sequence Numbers: The producer maintains a monotonically increasing sequence number per (PID, TopicPartition), starting at 0. Each batch is stamped with the sequence number of its first record before it is sent, and the remaining records in the batch are numbered consecutively from it. The sequence number travels in the batch header on the wire.

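Broker-side deduplication: The broker tracks the last appended sequence number for every (PID, partition) pair. A batch whose sequence is exactly one past the last is appended; a batch whose sequence was already appended is acknowledged again without being written; a batch that would leave a gap is rejected with an OutOfOrderSequenceException.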

This means retries are safe — the producer resends the same batch with the same sequence numbers and the broker simply ignores the duplicate.
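
The broker-side check can be sketched as a small state machine. This is a simplified model under stated assumptions: one record per batch, a single "last sequence" per (PID, partition), and hypothetical names throughout (SequenceCheckSketch, append, Result). The real broker keeps richer per-producer state.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of broker-side sequence checking.
final class SequenceCheckSketch {
    enum Result { APPENDED, DUPLICATE, OUT_OF_ORDER }

    // Last appended sequence per "pid:partition"; absent means nothing appended yet.
    private final Map<String, Integer> lastSeq = new HashMap<>();

    Result append(long pid, String partition, int seq) {
        String key = pid + ":" + partition;
        int last = lastSeq.getOrDefault(key, -1);
        if (seq <= last) {
            return Result.DUPLICATE;      // retry of something already written
        }
        if (seq != last + 1) {
            return Result.OUT_OF_ORDER;   // gap: an earlier batch is missing
        }
        lastSeq.put(key, seq);
        return Result.APPENDED;
    }
}
```

Sending sequence 0 twice yields APPENDED then DUPLICATE; sending sequence 2 before sequence 1 yields OUT_OF_ORDER, after which sequence 1 is still accepted.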

Internal Batching Behaviour

The producer client has two internal components:

RecordAccumulator — an in-memory buffer organised as a map of TopicPartition → Deque<RecordBatch>. send() calls are non-blocking: they append the record to the appropriate batch in the accumulator and return a Future.

Sender thread — a single background thread that drains the accumulator and dispatches batches to brokers over the network.

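With the idempotent producer the Sender adds two guarantees: a batch that fails with a retriable error is put back at the front of its partition's deque so that it is sent before any newer batch, and it is resent with its original PID and sequence numbers so the broker can recognise it as a retry.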

Application thread(s)         Sender thread           Broker
   send(record) ──►  RecordAccumulator  ──batch──►  seq check
                      [TP-A: [batch0]]              deduplicate
                      [TP-B: [batch1]]              commit & ack

                     on retry: requeue
                     batch at front with
                     same PID + seq

Why max.in.flight.requests.per.connection=5 is safe: With plain retries (no idempotence) having multiple in-flight batches means a retry of batch-1 could arrive after batch-2 has already been committed — messages reorder. With idempotence the broker rejects any batch that arrives with a sequence number that creates a gap, so reordering is caught at the broker. Five in-flight batches is a deliberate balance: enough parallelism for throughput, small enough to bound the sequence tracking window on the broker.
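
The requeue-at-front behaviour can be illustrated with a plain deque per partition. This is only a sketch of the idea: AccumulatorSketch and its methods are hypothetical, batches are represented as strings, and the real client does more bookkeeping when several batches are in flight.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the accumulator: one deque of batches per partition.
final class AccumulatorSketch {
    private final Map<String, Deque<String>> batches = new HashMap<>();

    // Application side: new batches join the back of the partition's deque.
    void append(String topicPartition, String batch) {
        batches.computeIfAbsent(topicPartition, k -> new ArrayDeque<>()).addLast(batch);
    }

    // Sender side: take the oldest batch, or null when nothing is waiting.
    String drain(String topicPartition) {
        Deque<String> queue = batches.get(topicPartition);
        return queue == null ? null : queue.pollFirst();
    }

    // On a retriable failure the batch goes back to the front, ahead of newer ones.
    void requeue(String topicPartition, String batch) {
        batches.computeIfAbsent(topicPartition, k -> new ArrayDeque<>()).addFirst(batch);
    }
}
```

After a failed send of batch0 is requeued, the next drain returns batch0 again before batch1, so the retry keeps its place in the order.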


Consumer

Consumer Group and Partition Assignment

A consumer group is a set of consumer instances sharing a single group.id. Kafka guarantees that each partition of a topic is assigned to exactly one consumer within the group at any given time — this is the fundamental unit of parallelism.

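Rebalance is triggered when a consumer joins, leaves, or crashes, or when topic partitions change. Under the classic eager protocol every consumer gives up its partitions, so all consumption in the group pauses until the new assignment is in place. The Sticky Assignor minimises disruption by keeping existing assignments where it can and moving only the partitions that have to move; the CooperativeStickyAssignor goes further and lets consumers keep processing the partitions they retain while the rebalance is in progress.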

Message ordering is guaranteed within a partition — consumers receive records in the exact order they were produced to that partition. There is no ordering guarantee across partitions. Since each partition is assigned to exactly one consumer in a group, that consumer processes its partition's records in offset order.
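
The one-owner-per-partition rule can be made concrete with a toy assignment. The sketch below uses a simple modulo scheme chosen for illustration; it is not one of Kafka's built-in assignors, and AssignmentSketch and partitionsFor are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Toy assignment: partition p is owned by consumer (p % consumers).
final class AssignmentSketch {
    // Partitions owned by the consumer at consumerIndex (0-based).
    static List<Integer> partitionsFor(int consumerIndex, int consumers, int partitions) {
        List<Integer> owned = new ArrayList<>();
        for (int p = consumerIndex; p < partitions; p += consumers) {
            owned.add(p);
        }
        return owned;
    }
}
```

With 6 partitions and 3 consumers each consumer owns two partitions. With 2 partitions and 3 consumers the third consumer owns nothing, which is why adding consumers beyond the partition count adds no parallelism.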

Batch Consumer

Consumers do not pull one record at a time — poll(duration) returns a batch of up to max.poll.records records (default 500) in a single call.

while (true) {
    ConsumerRecords<K, V> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<K, V> record : records) {
        process(record);
    }
    consumer.commitSync();
}

Key configs:

Config                     Default  Purpose
max.poll.records           500      Max records returned per poll() call
fetch.min.bytes            1        Broker waits until this many bytes are available before responding
fetch.max.wait.ms          500 ms   Max time broker will wait to satisfy fetch.min.bytes
max.partition.fetch.bytes  1 MB     Max bytes fetched per partition per request

Tuning fetch.min.bytes and fetch.max.wait.ms together controls the latency-throughput tradeoff on the broker side: higher fetch.min.bytes means larger batches but more latency before the first record arrives.

Manual Acknowledgement

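By default Kafka auto-commits offsets every auto.commit.interval.ms (default 5 s). The commit happens inside poll() and covers the records returned by earlier polls, whether or not the application has finished with them. This can cause at-most-once semantics: if processing has been handed to another thread or is otherwise still in progress when the commit goes out, and the consumer then crashes, those records are skipped.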

Set enable.auto.commit=false and commit explicitly for at-least-once guarantees.

Common pattern: use commitAsync() in the hot path and commitSync() only in the finally block on shutdown to ensure the last offsets are flushed.

try {
    while (running) {
        var records = consumer.poll(Duration.ofMillis(100));
        process(records);
        consumer.commitAsync((offsets, ex) -> { if (ex != null) log.warn(...); });
    }
} finally {
    consumer.commitSync();   // best-effort flush on shutdown
    consumer.close();
}

Nack / Retry (Single-Record Failure)

Kafka has no native negative-acknowledgement. When a single record fails processing:

Option 2 (seek back) is simple but blocks the partition — all records after the failed one are stuck until the failure is resolved. Option 3 (retry topic) is preferred for production as it decouples failure handling from the main consumer's throughput.

Batch Acknowledge

When processing records as a batch (e.g., bulk-inserting into a database), commit only after the entire batch succeeds:

var records = consumer.poll(Duration.ofMillis(100));
bulkInsert(records);      // if this throws, do NOT commit
consumer.commitSync();    // commit only on success

This gives at-least-once semantics at batch granularity. If the bulk insert fails partway through, the whole batch is reprocessed on the next poll — so bulkInsert should be idempotent (e.g., use upserts or de-duplicate on the database side).

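To commit at record granularity within the batch, use commitSync(Map<TopicPartition, OffsetAndMetadata>) with a precise offset map rather than committing all assigned partitions at once. The offset committed for a partition is the offset of the next record to read, that is, the last processed offset plus one.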
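
Building that offset map comes down to "highest processed offset plus one, per partition". The sketch below shows only that calculation with plain Java types; CommitOffsetSketch and offsetsToCommit are hypothetical names, and in a real consumer the map would be keyed by TopicPartition with OffsetAndMetadata values.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: derive the offsets to commit from processed records.
final class CommitOffsetSketch {
    // processed holds {partition, offset} pairs for records that were handled successfully.
    static Map<Integer, Long> offsetsToCommit(long[][] processed) {
        Map<Integer, Long> next = new HashMap<>();
        for (long[] record : processed) {
            int partition = (int) record[0];
            long offset = record[1];
            // Commit the position of the NEXT record to read: last processed + 1.
            next.merge(partition, offset + 1, Long::max);
        }
        return next;
    }
}
```

Processing offsets 5 and 6 on partition 0 and offset 10 on partition 1 gives a commit map of {0=7, 1=11}.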

Heartbeat Thread and Liveness Detection

The consumer runs a dedicated background heartbeat thread separate from the application thread that calls poll(). Liveness has two independent signals:

Heartbeat thread    ──heartbeat──►  Group Coordinator  ← session.timeout.ms
Application thread  ──poll()──►     (same coordinator)  ← max.poll.interval.ms

Both timeouts must be satisfied to keep the consumer in the group. If max.poll.interval.ms is exceeded, the consumer voluntarily leaves the group and triggers a rebalance — even if the heartbeat thread is still healthy.

Tuning guidance: if record processing is genuinely slow (e.g., calling an external API per record), increase max.poll.interval.ms or reduce max.poll.records so each poll() returns fewer records to process within the window.
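
A quick way to sanity-check these two settings is to divide the poll interval by the worst-case time per record. The helper below is a back-of-the-envelope sketch with hypothetical names; it assumes records are processed sequentially on the polling thread and ignores any safety margin.

```java
// Hypothetical helper for sizing max.poll.records against max.poll.interval.ms.
final class PollBudgetSketch {
    // Largest batch that can be processed before the poll interval expires.
    static long maxRecordsPerPoll(long maxPollIntervalMs, long worstCasePerRecordMs) {
        return maxPollIntervalMs / worstCasePerRecordMs;
    }

    // True when a full batch fits inside the poll interval.
    static boolean fits(long maxPollRecords, long maxPollIntervalMs, long worstCasePerRecordMs) {
        return maxPollRecords * worstCasePerRecordMs <= maxPollIntervalMs;
    }
}
```

With the default max.poll.interval.ms of 300 000 ms and 1 s per record, at most 300 records fit in one poll, so the default max.poll.records of 500 would overrun the window.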

Non-Blocking Retry Pattern (Retry Topics)

The retry-topic pattern decouples failure handling from the main consumer loop, keeping throughput high on the happy path.

main-topic  ──►  [Consumer]  ──failure──►  topic.retry-1
                     │                          │
                   success              [Retry Consumer, delay=30s]
                     │                          │──failure──►  topic.retry-2
                  commit                        │                    │
                                             success        [Retry Consumer, delay=5m]
                                              commit               │──failure──►  topic.dlt

Spring Kafka's @RetryableTopic implements this pattern automatically with configurable backoff and DLT routing.
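
For readers not using Spring, the routing decision in the diagram can be sketched as a lookup over an ordered chain of topics. The topic names and delays are taken from the diagram; RetryRouteSketch, nextTopic, and delayMs are hypothetical names, and the actual produce/consume calls are left out.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical routing table for the retry chain shown in the diagram.
final class RetryRouteSketch {
    // Ordered chain; the last entry is the dead-letter topic.
    static final List<String> CHAIN =
            Arrays.asList("main-topic", "topic.retry-1", "topic.retry-2", "topic.dlt");
    // Delay applied by the consumer of each topic before it processes a record.
    static final long[] DELAY_MS = {0L, 30_000L, 300_000L, 0L};

    // Topic a record is forwarded to after failing in failedIn.
    // Records that are already in the DLT stay there.
    static String nextTopic(String failedIn) {
        int i = indexOf(failedIn);
        return CHAIN.get(Math.min(i + 1, CHAIN.size() - 1));
    }

    static long delayMs(String topic) {
        return DELAY_MS[indexOf(topic)];
    }

    private static int indexOf(String topic) {
        int i = CHAIN.indexOf(topic);
        if (i < 0) {
            throw new IllegalArgumentException("unknown topic: " + topic);
        }
        return i;
    }
}
```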

Circuit Breaker

When a downstream dependency (database, external API) is degraded, continuing to consume and fail records wastes resources and floods retry topics. A circuit breaker pauses consumption during outages.

States: Closed (normal) → Open (paused) → Half-Open (testing recovery).

Kafka-specific implementation:

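// On repeated failures, open the circuit:
consumer.pause(consumer.assignment());   // stop fetching, stay in group

// Keep calling poll() while paused so max.poll.interval.ms is not breached
// (heartbeats come from the background thread, not from poll()).
// Partitions newly assigned by a rebalance are not paused; pause them
// in a ConsumerRebalanceListener if the circuit is still open.
while (circuitOpen) {
    consumer.poll(Duration.ofMillis(100));  // returns empty while partitions are paused
    if (dependencyHealthy()) {
        consumer.resume(consumer.paused()); // resume exactly what was paused
        circuitOpen = false;
    }
}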

consumer.pause() stops the consumer from fetching records for those partitions but keeps it in the group: the heartbeat thread keeps running and poll() keeps returning (empty) batches. This is critical: if you simply stop calling poll() you will breach max.poll.interval.ms and trigger a rebalance.

The circuit opens after N consecutive failures (configurable threshold), waits a cooldown period, then enters half-open state where a single test record is processed to confirm recovery before fully resuming.
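
The state transitions described above can be sketched independently of the consumer. In this minimal version the clock is passed in explicitly so the behaviour is deterministic; CircuitBreakerSketch and its methods are hypothetical names, and wiring it to pause()/resume() is left to the consumer loop.

```java
// Hypothetical three-state circuit breaker driven by the consumer loop.
final class CircuitBreakerSketch {
    enum State { CLOSED, OPEN, HALF_OPEN }

    private final int failureThreshold;
    private final long cooldownMs;
    private State state = State.CLOSED;
    private int consecutiveFailures = 0;
    private long openedAtMs = 0L;

    CircuitBreakerSketch(int failureThreshold, long cooldownMs) {
        this.failureThreshold = failureThreshold;
        this.cooldownMs = cooldownMs;
    }

    State state() {
        return state;
    }

    // Record a processing failure observed at nowMs.
    void onFailure(long nowMs) {
        if (state == State.OPEN) {
            return;                       // already paused
        }
        consecutiveFailures++;
        // A failed test record in HALF_OPEN reopens the circuit immediately.
        if (state == State.HALF_OPEN || consecutiveFailures >= failureThreshold) {
            state = State.OPEN;
            openedAtMs = nowMs;
            consecutiveFailures = 0;
        }
    }

    // Record a success; a successful test record in HALF_OPEN closes the circuit.
    void onSuccess() {
        consecutiveFailures = 0;
        if (state == State.HALF_OPEN) {
            state = State.CLOSED;
        }
    }

    // Call on every loop iteration; moves OPEN to HALF_OPEN once the cooldown has passed.
    void tick(long nowMs) {
        if (state == State.OPEN && nowMs - openedAtMs >= cooldownMs) {
            state = State.HALF_OPEN;
        }
    }
}
```

In a consumer loop, the transition to OPEN is where pause() would be called, HALF_OPEN is where a single partition or record is let through, and the transition back to CLOSED is where resume() would be called.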


Transactions

Kafka transactions give you exactly-once semantics (EOS) across produce and consume operations. Without understanding what the broker actually writes — and what the consumer actually reads — the guarantee is easy to break silently.

What a Transaction Actually Writes to a Partition

When a producer sends messages inside a transaction, Kafka does not immediately make those messages visible to consumers. The broker writes them to the partition log but marks them as part of an open transaction. Two types of special records are involved:

1. Regular data records — written with the producer's PID (Producer ID) and a transaction flag set in the record batch header. They sit in the log at their assigned offsets, but are invisible to read_committed consumers until the transaction commits.

2. Transaction markers (control records) — written by the broker after the producer calls commitTransaction() or abortTransaction(). A marker is a special log entry at a new offset that signals COMMIT or ABORT for a given PID.

Partition log (offsets):

offset 0  │ data record  (PID=42, txn=open)   ← written during transaction
offset 1  │ data record  (PID=42, txn=open)   ← written during transaction
offset 2  │ COMMIT marker (PID=42)             ← written by broker on commitTransaction()

Without the COMMIT marker at offset 2, offsets 0 and 1 are not considered committed data — they are in limbo. An ABORT marker instead would tell consumers to discard offsets 0 and 1 entirely.

The Transaction Coordinator and __transaction_state

Kafka has a dedicated internal component called the Transaction Coordinator, and every broker runs one. The coordinator responsible for a given producer is the broker that leads the __transaction_state partition to which the producer's transactional.id hashes. It manages the transaction lifecycle and persists its state in that internal topic.

Transaction flow step by step:

Producer                        Transaction Coordinator           Partition Leaders
   │                                     │                              │
   ├─ initTransactions() ───────────────►│ assign PID + epoch           │
   │◄─ PID, epoch ───────────────────────┤                              │
   │                                     │                              │
   ├─ beginTransaction() [local only]    │                              │
   │                                     │                              │
   ├─ send(records) ─────────────────────┼──────────────────────────────► written, txn=open
   │                                     │                              │
   ├─ sendOffsetsToTransaction() ───────►│ add __consumer_offsets       │
   │                                     │  partition to the txn        │
   │                                     │                              │
   ├─ commitTransaction() ──────────────►│ write COMMIT to              │
   │                                     │  __transaction_state         │
   │                                     ├─ write COMMIT marker ────────► offset 2: COMMIT(PID=42)
   │◄─ ack ──────────────────────────────┤                              │

The epoch prevents zombie producers: if a producer crashes and restarts with the same transactional.id, it gets a new higher epoch. The old epoch is fenced — any in-flight batches from the old instance are rejected by brokers.
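
Epoch fencing can be sketched as a counter per transactional.id. This is a simplified model with hypothetical names (EpochFenceSketch, init, accepts); in the real client a fenced producer sees an error such as ProducerFencedException rather than a boolean.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of epoch-based zombie fencing.
final class EpochFenceSketch {
    private final Map<String, Integer> currentEpoch = new HashMap<>();

    // Models initTransactions(): the first instance gets epoch 0,
    // every later instance with the same transactional.id bumps it by one.
    int init(String transactionalId) {
        return currentEpoch.merge(transactionalId, 0, (old, ignored) -> old + 1);
    }

    // A write is accepted only if it carries the latest epoch for that id.
    boolean accepts(String transactionalId, int epoch) {
        Integer latest = currentEpoch.get(transactionalId);
        return latest != null && latest == epoch;
    }
}
```

Once a restarted instance has called init(), writes carrying the previous epoch are no longer accepted, which is what stops a zombie from finishing its old transaction.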

isolation.level=read_committed — What It Does

This is a consumer-side config. It controls which records from the partition log the consumer is allowed to see.

read_uncommitted (default) — the consumer reads every record at every offset as soon as it lands in the log, regardless of transaction state. It sees records from committed transactions, in-flight (open) transactions, and aborted transactions alike.

read_committed — the consumer reads only records that are part of a committed transaction (or records written outside of any transaction). It does this by tracking the Last Stable Offset (LSO):

LSO = the offset up to which all transactions are resolved (committed or aborted)

Partition log:
offset 0  │ data (PID=42, txn=open)
offset 1  │ data (PID=42, txn=open)
offset 2  │ COMMIT marker (PID=42)   ← LSO advances past here once this is written
offset 3  │ data (PID=99, txn=open)  ← LSO stops here; txn still open
offset 4  │ data (PID=99, txn=open)

A read_committed consumer can consume everything below the LSO, which here is offset 3, so it reads up to and including offset 2. Offsets 3 and 4 are held back until PID=99's transaction resolves. Additionally, the broker returns the list of aborted transactions for the fetched range alongside the fetch response, and the consumer uses it to silently skip records that belong to aborted transactions.
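
The LSO rule and the abort filtering can be tried out on the example log with a small simulation. This is a sketch under simplifying assumptions: every data record is transactional, one transaction per PID is open at a time, and all names (ReadCommittedSketch, Entry, exampleLog) are hypothetical rather than client internals.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical simulation of what a read_committed consumer may see.
final class ReadCommittedSketch {
    enum Kind { DATA, COMMIT, ABORT }

    static final class Entry {
        final Kind kind;
        final long pid;
        Entry(Kind kind, long pid) { this.kind = kind; this.pid = pid; }
    }

    // The log from the text, with the marker at offset 2 set to COMMIT or ABORT.
    static List<Entry> exampleLog(Kind markerAtOffset2) {
        return Arrays.asList(new Entry(Kind.DATA, 42), new Entry(Kind.DATA, 42),
                new Entry(markerAtOffset2, 42), new Entry(Kind.DATA, 99), new Entry(Kind.DATA, 99));
    }

    // LSO: first offset of the earliest transaction that is still open, else the log end.
    static int lastStableOffset(List<Entry> log) {
        Map<Long, Integer> openTxnStart = new HashMap<>();
        for (int off = 0; off < log.size(); off++) {
            Entry e = log.get(off);
            if (e.kind == Kind.DATA) openTxnStart.putIfAbsent(e.pid, off);
            else openTxnStart.remove(e.pid);          // a marker resolves the transaction
        }
        int lso = log.size();
        for (int start : openTxnStart.values()) lso = Math.min(lso, start);
        return lso;
    }

    // Offsets handed to the application: data below the LSO that was not aborted.
    static List<Integer> visibleOffsets(List<Entry> log) {
        Map<Long, List<Integer>> pending = new HashMap<>();
        Set<Integer> aborted = new HashSet<>();
        for (int off = 0; off < log.size(); off++) {
            Entry e = log.get(off);
            if (e.kind == Kind.DATA) {
                pending.computeIfAbsent(e.pid, k -> new ArrayList<>()).add(off);
            } else {
                List<Integer> txn = pending.remove(e.pid);
                if (txn != null && e.kind == Kind.ABORT) aborted.addAll(txn);
            }
        }
        List<Integer> visible = new ArrayList<>();
        int lso = lastStableOffset(log);
        for (int off = 0; off < lso; off++) {
            if (log.get(off).kind == Kind.DATA && !aborted.contains(off)) visible.add(off);
        }
        return visible;
    }
}
```

For the example log the LSO is 3 and the visible offsets are [0, 1]; with an ABORT marker at offset 2 instead, nothing is visible.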

The consumer never sees the transaction markers themselves — those are internal control records filtered out by the client library.

Why read_uncommitted Makes Transactions Useless

Suppose a producer reads from topic A, transforms the records, and writes to topic B — all inside a transaction. The goal is: either all writes to B succeed atomically, or none are visible.

With read_uncommitted on the consumer of topic B:

t=0  Producer begins transaction
t=1  Producer writes record-1 to topic B  (offset 0, txn=open)
t=2  Producer writes record-2 to topic B  (offset 1, txn=open)
t=3  Consumer of B reads offset 0 → sees record-1   ← PROBLEM
t=4  Producer crashes before commitTransaction()
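t=5  Transaction Coordinator writes ABORT marker (on transaction timeout or when a restarted producer re-initialises)
t=6  Consumer of B reads offset 1 → sees record-2 too; the abort is invisible to it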

The consumer already processed record-1 at t=3 — before the transaction was resolved. The producer aborted, so record-1 was never meant to be visible. But the read_uncommitted consumer has no way to know that — it already committed the offset and acted on the data. The atomicity guarantee is completely broken.

With read_committed, the fetch at t=3 is bounded by the LSO. Offsets 0 and 1 belong to an open transaction, so the LSO has not advanced past them and the fetch returns nothing. At t=5, when the ABORT marker lands, the LSO advances, the broker reports the aborted transaction with the fetch response, and the consumer skips both records. To the application the partition appears empty, which is exactly the correct behaviour.

Exactly-Once Consume-Transform-Produce

The canonical EOS pattern in Kafka is:

[Source Topic] ──► Consumer (read_committed) ──► Transform ──► Producer (transactional) ──► [Sink Topic]

                                                          sendOffsetsToTransaction()
                                                          (commits consumed offsets
                                                           atomically with the produce)

sendOffsetsToTransaction() registers the consumer group's offsets as part of the current transaction. When the transaction commits, both the output records in the sink topic and the input offsets in __consumer_offsets are committed atomically. If the transaction aborts, neither is visible. This prevents the double-processing that would happen if the produce succeeded but the offset commit failed (or vice versa).

Component   Config / API call               Effect
Producer    transactional.id                Enables transactional API; assigns stable PID + epoch
Producer    enable.idempotence=true         Automatically enabled with transactional.id
Producer    sendOffsetsToTransaction()      Ties the consumer group's offset commit to the current transaction
Consumer    isolation.level=read_committed  Only sees records from committed transactions

Without isolation.level=read_committed on every consumer downstream of the transactional producer, the transaction boundary means nothing — consumers will read and act on data from transactions that may still abort.


Schema (Schema Registry)

Compatibility modes:

Mode               Meaning
Backward           New schema can read data written with the old schema. Upgrade consumers first.
Forward            Old schema can read data written with the new schema. Upgrade producers first.
Full (Transitive)  Both backward and forward compatible.

Field changes and compatibility:
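
A rough way to reason about field changes is Avro's resolution rule: a reader can decode a writer's data when every field the reader expects is either present in the writer's schema or has a default. The sketch below applies only that rule; it ignores type changes and aliases, models a schema as a map from field name to "has a default", and uses hypothetical names throughout (CompatSketch, schema, canRead). A trailing "?" in the helper marks a field with a default and is purely a convention of this sketch.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical field-level compatibility check (names and defaults only).
final class CompatSketch {
    // Build a schema from field names; a trailing "?" marks a field that has a default.
    static Map<String, Boolean> schema(String... fields) {
        Map<String, Boolean> result = new LinkedHashMap<>();
        for (String field : fields) {
            boolean hasDefault = field.endsWith("?");
            result.put(hasDefault ? field.substring(0, field.length() - 1) : field, hasDefault);
        }
        return result;
    }

    // The reader can decode if each of its fields exists in the writer or has a default.
    static boolean canRead(Map<String, Boolean> reader, Map<String, Boolean> writer) {
        for (Map.Entry<String, Boolean> field : reader.entrySet()) {
            if (!writer.containsKey(field.getKey()) && !field.getValue()) {
                return false;
            }
        }
        return true;
    }

    // Backward: the new schema reads data written with the old one.
    static boolean backward(Map<String, Boolean> oldSchema, Map<String, Boolean> newSchema) {
        return canRead(newSchema, oldSchema);
    }

    // Forward: the old schema reads data written with the new one.
    static boolean forward(Map<String, Boolean> oldSchema, Map<String, Boolean> newSchema) {
        return canRead(oldSchema, newSchema);
    }
}
```

Under this rule, adding a field with a default is both backward and forward compatible, adding a field without a default is forward compatible only, and removing a field that has no default is backward compatible only.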


Avro — Binary Serialization and Payload Efficiency

Avro is a binary serialization format maintained as an Apache project. In Kafka it is a common choice for encoding messages because it significantly reduces wire size compared to JSON or XML.

Why Avro Saves Payload Size

1. Schema is stored separately — not in every message.

With the Confluent Schema Registry, the schema is registered once and assigned an integer ID. Each Kafka message contains only a compact 5-byte header:

[0x00]  [schema-id: 4 bytes]  [binary payload...]
 magic     registry pointer

Field names, types, and structure are never repeated in the message itself.

2. No key-value overhead.

JSON must embed field names as strings in every record:

{"userId": 12345, "name": "Alice", "active": true}

Avro writes only the values in the schema-defined order, binary-encoded — no keys, no quotes, no braces, no colons.

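3. Compact binary encoding for values. Integers and longs are written as variable-length zig-zag values, so small numbers take a single byte; a string is a length prefix followed by its UTF-8 bytes; a boolean is one byte.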

4. Concrete size comparison — a simple record: userId=1, name="Alice", active=true:

Format                                  Encoded size (approx.)
JSON                                    ~40 bytes
Avro binary (5-byte header + payload)   ~13 bytes

That is roughly 60–70% smaller for this record. Savings tend to grow as the number of fields increases, because JSON repeats every field name in every message while Avro never does.
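
The Avro figure can be reproduced by hand-encoding the record. The sketch below is not the Avro library: it implements just enough of the binary encoding (zig-zag varints, length-prefixed strings, one-byte booleans) plus the 5-byte header described above to count bytes. AvroSizeSketch and its methods are hypothetical names, and the schema ID passed in is arbitrary.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical hand-rolled encoder for the User record, used only to count bytes.
final class AvroSizeSketch {
    // Avro int/long: zig-zag encode, then emit 7 bits per byte, low-order bits first.
    static void writeLong(ByteArrayOutputStream out, long value) {
        long z = (value << 1) ^ (value >> 63);
        while ((z & ~0x7FL) != 0) {
            out.write((int) ((z & 0x7F) | 0x80));   // continuation bit set
            z >>>= 7;
        }
        out.write((int) z);
    }

    static byte[] encodeUser(int schemaId, int userId, String name, boolean active) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        out.write(0x00);                                        // magic byte
        byte[] id = ByteBuffer.allocate(4).putInt(schemaId).array();
        out.write(id, 0, id.length);                            // schema ID, 4 bytes big-endian
        writeLong(out, userId);                                 // userId
        byte[] utf8 = name.getBytes(StandardCharsets.UTF_8);
        writeLong(out, utf8.length);                            // string length prefix
        out.write(utf8, 0, utf8.length);                        // string bytes
        out.write(active ? 1 : 0);                              // boolean
        return out.toByteArray();
    }
}
```

For userId=1, name="Alice", active=true this produces 5 header bytes plus 8 payload bytes (1 for the int, 1 + 5 for the string, 1 for the boolean), 13 bytes in total.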

Avro Schema Example

{
  "type": "record",
  "name": "User",
  "namespace": "com.example",
  "fields": [
    {"name": "userId", "type": "int"},
    {"name": "name",   "type": "string"},
    {"name": "active", "type": "boolean"}
  ]
}

How the Schema Registry Fits In

The schema is fetched once per unique schema ID and cached locally — subsequent deserialization is a pure in-memory lookup.
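
The fetch-once-then-cache behaviour can be sketched with a map keyed by schema ID. This is an illustration only: SchemaCacheSketch is a hypothetical class, the registry call is replaced by a function supplied by the caller, and schemas are represented as plain strings.

```java
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import java.util.function.IntFunction;

// Hypothetical consumer-side cache of schemas keyed by registry ID.
final class SchemaCacheSketch {
    private final Map<Integer, String> cache = new HashMap<>();
    private final IntFunction<String> registryLookup;   // stands in for the registry call
    int lookups = 0;                                     // how often the registry was asked

    SchemaCacheSketch(IntFunction<String> registryLookup) {
        this.registryLookup = registryLookup;
    }

    // Read the schema ID from the 5-byte header: magic byte 0x00, then a 4-byte big-endian ID.
    static int schemaId(byte[] message) {
        ByteBuffer buffer = ByteBuffer.wrap(message);
        if (buffer.get() != 0x00) {
            throw new IllegalArgumentException("unknown magic byte");
        }
        return buffer.getInt();
    }

    // First sight of an ID triggers one lookup; later messages are served from memory.
    String schemaFor(byte[] message) {
        return cache.computeIfAbsent(schemaId(message), id -> {
            lookups++;
            return registryLookup.apply(id);
        });
    }
}
```

Calling schemaFor twice with messages that carry the same schema ID results in a single registry lookup.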