KPipe: A Modern, High-Performance Kafka Consumer in Java — Powered by Java 25 Features
Howdy!
If you’re building Kafka-based systems in Java and dealing with high-throughput pipelines, you’ve probably faced the usual pain points:
Multiple serialization/deserialization cycles per message (one per transformation step)
Thread pool exhaustion under load
Scattered retry logic and error handling
High GC pressure from object allocations in parallel processing
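To see why the repeated ser/de point hurts, here's a minimal, KPipe-free sketch in plain Java (the `naiveStep` helper and `String`-as-payload are illustrative stand-ins, not anyone's real API): each naive step pays a full deserialize/serialize round trip, while a composed pipeline parses once and serializes only at the end.

```java
import java.util.function.UnaryOperator;

public class SerDePain {
    // Naive approach: every step receives bytes, so every step must
    // deserialize, transform, and re-serialize.
    static byte[] naiveStep(byte[] payload, UnaryOperator<String> transform) {
        String decoded = new String(payload);          // deserialize
        String transformed = transform.apply(decoded); // transform
        return transformed.getBytes();                 // re-serialize
    }

    public static void main(String[] args) {
        byte[] msg = "hello".getBytes();

        // Three steps -> three full ser/de round trips
        byte[] naive = naiveStep(
                naiveStep(
                        naiveStep(msg, String::toUpperCase),
                        s -> s + "!"),
                s -> "[" + s + "]");

        // Single cycle: deserialize once, compose transforms in memory,
        // serialize only at the very end.
        UnaryOperator<String> pipeline = s -> "[" + s.toUpperCase() + "!" + "]";
        byte[] single = pipeline.apply(new String(msg)).getBytes();

        System.out.println(new String(naive));  // [HELLO!]
        System.out.println(new String(single)); // [HELLO!]
    }
}
```

Same result, but the second version touches bytes exactly twice, no matter how many transforms you add.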
Enter KPipe — a fresh, functional, and blazing-fast Kafka consumer library that leverages modern Java to solve these issues elegantly.
KPipe hit version 1.0.0 today (March 9, 2026), and it's built for the Java era of virtual threads and composable pipelines.
And the best part? It pairs perfectly with Java 25’s finalized Scoped Values (JEP 506) and preview Structured Concurrency (JEP 505, fifth preview) — two features from Project Loom that make concurrent code safer, cleaner, and more observable.
Let’s dive into what KPipe is, why it’s exciting, and walk through a practical example using Java 25.
What is KPipe?
KPipe is a modern Kafka consumer created by Mariano Gonzalez (@eschizoid), available on GitHub.
Key highlights from the project:
Virtual threads (Project Loom) for massive concurrency with almost zero overhead
Composable functional pipelines: Define pure transformations that run in memory after a single deserialization
Single Ser/De cycle: Deserialize once → transform → serialize only at the sink (or skip serialization entirely)
Native support for JSON (via ultra-fast DslJson) and Avro (including Confluent Schema Registry wire format)
Built-in metrics, configurable retries, graceful shutdown, and gap-free offset management
Extensible via MessageProcessorRegistry and MessageSinkRegistry
JMH benchmarks included — showing better throughput and lower GC pressure than traditional parallel consumers or Confluent’s Parallel Consumer
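As a rough illustration of the registry-plus-pipeline idea (hypothetical names and `String`-based transforms, not KPipe's actual API, which works over deserialized messages), named pure functions can be composed into one in-memory pipeline:

```java
import java.util.Map;
import java.util.function.UnaryOperator;

public class PipelineSketch {
    // A registry of named, pure, side-effect-free transforms
    static final Map<String, UnaryOperator<String>> REGISTRY = Map.of(
            "uppercase", String::toUpperCase,
            "addSource", s -> s + " | source=demo",
            "trim", String::trim
    );

    // Compose named steps, left to right, into a single function.
    static UnaryOperator<String> pipeline(String... steps) {
        return input -> {
            String result = input;
            for (String step : steps) {
                result = REGISTRY.get(step).apply(result);
            }
            return result;
        };
    }

    public static void main(String[] args) {
        var p = pipeline("trim", "uppercase", "addSource");
        System.out.println(p.apply("  hello  "));
        // HELLO | source=demo
    }
}
```

Because each step is a pure function, every step is unit-testable in isolation, and the whole pipeline runs in memory between a single deserialize and a single serialize.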
As of today: ~33 stars, actively maintained (latest commit ~March 8, 2026), Apache 2.0 license, requires Java 25+.
Why KPipe in 2026?
Compared to raw KafkaConsumer + custom parallelism, Spring Kafka, or Kafka Streams, KPipe gives you:
Cleaner, more testable code (pure functions)
Dramatically reduced GC and memory pressure
Natural scaling to thousands of concurrent records via virtual threads
Easy integration with Quarkus, Micronaut, and Spring Boot
Focus on real-world performance — single Ser/De + virtual threads = win
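The virtual-thread half of that equation is easy to demo without KPipe at all. This standalone sketch (Java 21+, no preview flags needed) submits thousands of blocking tasks, one cheap virtual thread each, with no pool sizing to tune:

```java
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

public class VirtualThreadsDemo {
    // Run n simulated blocking lookups, one virtual thread per task.
    static int processAll(int n) {
        var processed = new AtomicInteger();
        try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
            IntStream.range(0, n).forEach(i ->
                    executor.submit(() -> {
                        Thread.sleep(5); // simulated blocking I/O per record
                        return processed.incrementAndGet();
                    }));
        } // close() waits for every submitted task to finish
        return processed.get();
    }

    public static void main(String[] args) {
        System.out.println("processed=" + processAll(10_000)); // processed=10000
    }
}
```

With platform threads, 10,000 concurrent blocking tasks would exhaust a fixed pool or the OS; with virtual threads, each blocked task simply unmounts from its carrier thread.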
Java 25 + KPipe: Structured Concurrency & Scoped Values in Action
Java 25 (released September 2025 as an LTS) finalized Scoped Values — a lightweight, immutable, leak-free alternative to ThreadLocal — and kept Structured Concurrency in preview with major API improvements.
Scoped Values → propagate immutable context (e.g., trace IDs, tenant info) safely across virtual threads
StructuredTaskScope → treat related subtasks as one unit: automatic cancellation on failure, clean error propagation, better observability
Here’s a complete, runnable example that:
Processes JSON messages from Kafka
Enriches each message with parallel “external” lookups (simulated)
Propagates a request ID via Scoped Value
Uses Structured Concurrency for safe parallelism
// KPipeJava25Demo.java
// Compile & run with Java 25 preview features (Structured Concurrency, JEP 505):
//   javac --enable-preview --release 25 KPipeJava25Demo.java
//   java --enable-preview KPipeJava25Demo

import kpipe.consumer.ConsumerRunner;
import kpipe.consumer.FunctionalConsumer;
import kpipe.config.KafkaConsumerConfig;
import kpipe.processor.MessageProcessorRegistry;
import kpipe.registry.MessageSinkRegistry;
import kpipe.sink.MessageSink;
import kpipe.OffsetManager;

import java.time.Duration;
import java.util.concurrent.*;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;

public class KPipeJava25Demo {

  // Scoped Value to propagate trace/correlation ID safely
  private static final ScopedValue<String> REQUEST_ID = ScopedValue.newInstance();

  public static void main(String[] args) throws Exception {
    String bootstrap = "localhost:9092";
    String group = "java25-kpipe-group";
    String topic = "events-json";

    var processorRegistry = new MessageProcessorRegistry("java25-demo");

    // Custom operator: enrich message with parallel external calls
    processorRegistry.registerJsonOperator("enrichWithExternal", jsonNode -> {
      // StructuredTaskScope.open() uses the default joiner: if any subtask
      // fails, the remaining subtasks are cancelled and join() throws.
      try (var scope = StructuredTaskScope.open()) {
        // Fork lightweight subtasks on virtual threads
        var nameTask = scope.fork(() -> fetchNameFromExternal());
        var ageTask = scope.fork(() -> fetchAgeFromExternal());
        scope.join(); // Wait for all; cancels the rest + propagates if any fails
        ((ObjectNode) jsonNode).put("enrichedName", nameTask.get());
        ((ObjectNode) jsonNode).put("enrichedAge", ageTask.get());
        return jsonNode;
      } catch (Exception e) {
        throw new RuntimeException("Enrichment failed for request " + REQUEST_ID.get(), e);
      }
    });

    // Define the pipeline: uppercase → enrich (parallel) → timestamp → source
    var pipeline = processorRegistry.jsonPipeline(
        "uppercase",
        "enrichWithExternal", // ← Structured Concurrency magic
        "addTimestamp",
        "addSource"
    );

    var sinkRegistry = new MessageSinkRegistry();
    MessageSink<byte[], byte[]> loggingSink = sinkRegistry.pipeline("logging"); // logs to console

    var functionalConsumer = FunctionalConsumer.<byte[], byte[]>builder()
        .withProperties(KafkaConsumerConfig.createConsumerConfig(bootstrap, group))
        .withTopic(topic)
        .withProcessor((key, value, ctx) -> {
          // Generate and bind request ID via Scoped Value
          String reqId = "req-" + System.currentTimeMillis() + "-" + Thread.currentThread().threadId();
          return ScopedValue.where(REQUEST_ID, reqId).call(() -> {
            System.out.println("Processing message - requestId: " + REQUEST_ID.get());
            return pipeline.process(key, value, ctx);
          });
        })
        .withMessageSink(loggingSink)
        .withOffsetManagerProvider(consumer ->
            OffsetManager.builder(consumer)
                .withCommitInterval(Duration.ofSeconds(10))
                .build())
        .withMetrics(true)
        .build();

    var runner = ConsumerRunner.builder(functionalConsumer)
        .withMetricsInterval(5000)
        .withShutdownTimeout(15000)
        .withShutdownHook(true)
        .build();

    runner.start();
    Thread.currentThread().join(); // Keep running until interrupted
  }

  // Simulated external calls (replace with real HttpClient/DB in production)
  private static String fetchNameFromExternal() throws InterruptedException {
    Thread.sleep(300);
    return "Alice - traced via " + REQUEST_ID.get();
  }

  private static int fetchAgeFromExternal() throws InterruptedException {
    Thread.sleep(400);
    return 42;
  }
}

What’s Happening Here?
ScopedValue propagates the request ID to all subtasks (even nested virtual threads) without ThreadLocal leaks.
StructuredTaskScope runs enrichments in parallel — if one fails, others cancel automatically, and errors bubble up cleanly.
KPipe handles the Kafka plumbing: single deserialization, functional transforms, metrics, offsets.
Everything scales naturally thanks to virtual threads.
Final Thoughts
KPipe feels like one of those “why didn’t this exist earlier?” libraries. It combines Kafka’s power with modern Java’s elegance: virtual threads for scale, functional pipelines for clarity, and now Java 25’s concurrency primitives for safety.
Whether you’re processing IoT events, logs at scale, or real-time data, give KPipe a spin — run the JMH benchmarks in the repo and see the difference.
What do you think? Have you tried similar approaches? Drop a comment!
Useful Links
- Java 25 JEPs: JEP 506 (Scoped Values), JEP 505 (Structured Concurrency, Fifth Preview)
Happy coding! 🚀

