Using Avro with a Quarkus Application + Kafka + Schema Registry (Product Example)

This guide walks you through building a Quarkus application that:

  • Sends and receives Kafka messages

  • Uses Avro for message serialization

  • Integrates with a Schema Registry

  • Works with a custom Product object


Prerequisites

You'll need:

  • Java 17+

  • Maven

  • Docker (for Kafka & Schema Registry)

  • Basic understanding of Quarkus, Kafka, and Avro


Architecture Overview

We’ll build a system with:

  • A REST API to send Product data to a Kafka topic

  • A Kafka consumer that reads Product messages

  • A REST API to stream consumed products using SSE (Server-Sent Events)

  • Avro used for data serialization

  • Schema stored and retrieved from a Schema Registry


1. Create the Quarkus Project

Generate a project with required extensions:

quarkus create app org.acme:product-avro-kafka \
    --extension='rest-jackson,messaging-kafka,apicurio-registry-avro' \
    --no-code
cd product-avro-kafka

2. Define the Avro Schema

Create the Avro schema file product.avsc inside src/main/avro:

{ "namespace": "org.acme.kafka.quarkus", "type": "record", "name": "Product", "fields": [ { "name": "name", "type": "string" }, { "name": "price", "type": "double" } ] }

When you build the app, Quarkus automatically generates the Java class Product.java based on this schema.
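
The generated class follows Avro's standard builder pattern, so you can construct instances fluently. A minimal sketch (the exact generated API depends on your Avro version, but newBuilder() with per-field setters is the usual shape):

// Build a Product instance via the Avro-generated builder.
Product product = Product.newBuilder()
        .setName("Laptop")
        .setPrice(999.99)
        .build();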


3. Implement the Kafka Producer

Create ProductResource.java to expose a REST endpoint that sends product data to Kafka:

package org.acme.kafka;

import org.acme.kafka.quarkus.Product;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;
import org.jboss.logging.Logger;

import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.core.Response;

@Path("/products")
public class ProductResource {

    private static final Logger LOGGER = Logger.getLogger(ProductResource.class);

    @Channel("products")
    Emitter<Product> emitter;

    @POST
    public Response sendProduct(Product product) {
        LOGGER.infof("Sending product: %s - $%.2f", product.getName(), product.getPrice());
        emitter.send(product);
        return Response.accepted().build();
    }
}
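
If you need to control the Kafka record key (for example, to partition by product name), you can wrap the payload in a Message and attach SmallRye's outgoing Kafka metadata. A hedged sketch; the sendProductKeyed endpoint is illustrative and not part of the resource above:

import org.eclipse.microprofile.reactive.messaging.Message;
import io.smallrye.reactive.messaging.kafka.api.OutgoingKafkaRecordMetadata;

// Sends the product with its name as the Kafka record key.
@POST
@Path("/keyed")
public Response sendProductKeyed(Product product) {
    OutgoingKafkaRecordMetadata<String> metadata = OutgoingKafkaRecordMetadata.<String>builder()
            .withKey(product.getName())
            .build();
    emitter.send(Message.of(product).addMetadata(metadata));
    return Response.accepted().build();
}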

4. Kafka Configuration

In src/main/resources/application.properties, add:

# Kafka producer config
mp.messaging.outgoing.products.connector=smallrye-kafka
mp.messaging.outgoing.products.topic=products
mp.messaging.outgoing.products.apicurio.registry.auto-register=true

If you are using the Confluent Schema Registry instead, add the quarkus-confluent-registry-avro extension in place of apicurio-registry-avro and replace the apicurio property with:

mp.messaging.outgoing.products.auto.register.schemas=true

5. Implement the Kafka Consumer and SSE Streaming

Create a class ConsumedProductResource.java:

package org.acme.kafka;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;

import org.acme.kafka.quarkus.Product;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.jboss.resteasy.reactive.RestStreamElementType;

import io.smallrye.mutiny.Multi;

@ApplicationScoped
@Path("/consumed-products")
public class ConsumedProductResource {

    @Channel("products-from-kafka")
    Multi<Product> products;

    @GET
    @Produces(MediaType.SERVER_SENT_EVENTS)
    @RestStreamElementType(MediaType.TEXT_PLAIN)
    public Multi<String> stream() {
        return products.map(p -> String.format("Product: %s - $%.2f", p.getName(), p.getPrice()));
    }
}
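
Because the channel is exposed as a Mutiny Multi, you can apply stream operators before events reach the client. For example, this variant of stream() (illustrative; the $100 threshold is arbitrary) forwards only the more expensive products:

@GET
@Produces(MediaType.SERVER_SENT_EVENTS)
@RestStreamElementType(MediaType.TEXT_PLAIN)
public Multi<String> stream() {
    return products
            // keep only products above the (arbitrary) threshold
            .select().where(p -> p.getPrice() > 100.0)
            .map(p -> String.format("Product: %s - $%.2f", p.getName(), p.getPrice()));
}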

6. Kafka Consumer Configuration

Add to application.properties:

# Kafka consumer config
mp.messaging.incoming.products-from-kafka.connector=smallrye-kafka
mp.messaging.incoming.products-from-kafka.topic=products
mp.messaging.incoming.products-from-kafka.auto.offset.reset=earliest
mp.messaging.incoming.products-from-kafka.enable.auto.commit=false
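
Instead of injecting the channel as a Multi, you can also consume records with a method-level subscriber. A minimal sketch of this alternative (note that a channel supports a single subscriber by default, so this would replace the SSE resource above unless you enable broadcast on the channel):

package org.acme.kafka;

import jakarta.enterprise.context.ApplicationScoped;

import org.acme.kafka.quarkus.Product;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.jboss.logging.Logger;

@ApplicationScoped
public class ProductConsumer {

    private static final Logger LOGGER = Logger.getLogger(ProductConsumer.class);

    // Called once per record arriving on the products-from-kafka channel.
    @Incoming("products-from-kafka")
    void consume(Product product) {
        LOGGER.infof("Received product: %s - $%.2f", product.getName(), product.getPrice());
    }
}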

7. Running the Application in Dev Mode

Run the application:

./mvnw quarkus:dev

Thanks to Dev Services, Kafka and Apicurio Schema Registry are started automatically in dev mode. No extra Docker setup needed.
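
Dev Services also kick in for tests, so you can verify the producer endpoint without any manual broker setup. A minimal test sketch, assuming quarkus-junit5 and rest-assured are on the test classpath (the class is illustrative):

package org.acme.kafka;

import static io.restassured.RestAssured.given;

import org.junit.jupiter.api.Test;

import io.quarkus.test.junit.QuarkusTest;

@QuarkusTest
class ProductResourceTest {

    @Test
    void productIsAccepted() {
        // POST a product and expect the 202 Accepted returned by the resource.
        given()
            .contentType("application/json")
            .body("{\"name\":\"Laptop\",\"price\":999.99}")
        .when()
            .post("/products")
        .then()
            .statusCode(202);
    }
}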

Test the endpoints:

  • Send a product:

curl -X POST -H "Content-Type: application/json" \
    -d '{"name":"Laptop","price":999.99}' \
    http://localhost:8080/products
  • View the streamed products:

curl -N http://localhost:8080/consumed-products

You’ll see output like:

Product: Laptop - $999.99

8. Running in Production (JVM or Native)

In production, you'll need to run Kafka and Schema Registry yourself (e.g., via Docker). Here's a minimal docker-compose.yml:

version: '2'
services:
  zookeeper:
    image: quay.io/strimzi/kafka:0.41.0-kafka-3.7.0
    ...
  kafka:
    image: quay.io/strimzi/kafka:0.41.0-kafka-3.7.0
    ...
  schema-registry:
    image: apicurio/apicurio-registry-mem:2.4.2.Final
    ports:
      - "8081:8080"
    environment:
      - REGISTRY_STORAGE=mem

You will also need to configure these properties:

kafka.bootstrap.servers=localhost:9092
mp.messaging.connector.smallrye-kafka.apicurio.registry.url=http://localhost:8081/apis/registry/v2

(With the Confluent registry, the property is mp.messaging.connector.smallrye-kafka.schema.registry.url, pointing at your Confluent registry URL.)

9. Building the Application

For JVM build:

./mvnw package
java -jar target/quarkus-app/quarkus-run.jar

For native build (requires GraalVM):

./mvnw package -Pnative
./target/product-avro-kafka-1.0.0-SNAPSHOT-runner

10. Summary

You've just built a Quarkus application that:

✅ Defines a Product Avro schema
✅ Generates Java code from .avsc files
✅ Produces and consumes Avro messages using Kafka
✅ Uses Apicurio (or Confluent) Schema Registry
✅ Leverages Quarkus Dev Services for quick setup
✅ Streams data with Server-Sent Events (SSE)
