12/03/2024

Schema-Driven Stream Processing with Apache Flink

    

Schema-Driven Stream Processing with Apache Flink

How to produce simple JSON messages into a Kafka topic using Apache Flink. JSON is just one of many formats commonly used in stream processing. Other popular formats, such as Avro and Protobuf, are widely adopted for their ability to enforce structured definitions on processed data. One of their main advantages is their reliance on a schema, which provides a predefined structure to the data. Typically, this schema is stored and validated with the help of Schema Registry, which is a fundamental component of the Kafka ecosystem that ensures data consistency and compatibility. In the following sections, I’ll guide you through the steps to process such schema-driven messages in your Flink applications.


Schema Registry

I mentioned above about defining a structure for processed data. But you may wonder where that should be defined. As usual, there are many answers to the same question or technical challenge you have. In this case, the schema can be defined directly in your code, embedded in the messages, or stored in static files such as .avsc or .proto.While this approach works, it comes with some disadvantages, such as the need to maintain changes directly in the code and limited accessibility to the defined structure.

Therefore, the most common solution for schema management is a Schema Registry. This small yet powerful service offers many features that make it easier to manage topic schemas:

  • Centralized schema storage
  • Easy access via API
  • Schema versioning
  • Schema evolution support
  • Compatibility enforcement
  • Scalable design
  • Easy of deployment

If you remember, in my previous article, I was producing a message that looked like the one below:

Using this example, I will show you how to define a schema in the registry. If you followed all the steps in my previous article, you should have the Schema Registry service running already on your machine. You can verify it by executing the following command:

Otherwise check the steps here, and that should take you just a few minutes to make it running.


Avro Messages

Avro is one of the most commonly used formats for Kafka topics, alongside JSON and Protobuf. Its main advantages are its robust schema evolution capabilities and the ability to embed schemas within messages.

Before processing any Avro messages, we need to define a schema, which we will eventually store in the Schema Registry.


As you can see, it’s quite self-explanatory. It contains an array of field names along with their types, as well as the header part. Essentially, the record type represents a structured object with multiple fields, while the namespace helps to organize schemas, especially in large organizations.

Let’s register this schema in the Schema Registry. But before doing this, we can check if it’s accessible by sending a GET request to the following endpoint:

curl -X GET http://localhost:8081/subjects

Here, we’re hitting the /subjects endpoint, which is how schemas are organized in the Schema Registry. Under each subject, all versions of associated schema can be found. The subject names usually follow the convention topic-value for message payloads and topic-key for message keys. In our case, we are only interested in the message payload.

We should see that there are currently no existing subjects in the registry.

We’ll change it by running the command below:

curl -X POST \
     -H "Content-Type: application/vnd.schemaregistry.v1+json" \
     --data '{
         "schema": "{\"type\":\"record\",\"name\":\"Message\",\"namespace\":\"com.example.kafka\",\"fields\":[{\"name\":\"key1\",\"type\":\"string\"},{\"name\":\"key2\",\"type\":\"string\"},{\"name\":\"key3\",\"type\":\"int\"},{\"name\":\"key4\",\"type\":\"double\"}]}"
     }' \
     http://localhost:8081/subjects/test.topic.avro-value/versions

The schema should be created now, what could be verified by running the following command:

curl -X GET http://localhost:8081/subjects/test.topic.avro-value/versions/1

Brilliant, now we can start working on our Flink application. Let’s use JSON data that is already in the topic test.topic from the previous post, convert it into Avro format with the specific schema, and write it into another topic.

First, we need to add the Schema Registry connection details to the config.yaml file.

schema.registry: http://localhost:8081

After that, the Flink environment needs to be initialized and schema retrieved.

private static final String inputTopic = "test.topic";
private static final String targetTopic = "test.topic.avro";
private static final String  schemaSubject = "test.topic.avro-value";

// Load global configuration and setup Flink execution environment
        Configuration config = GlobalConfiguration.loadConfiguration();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

//  Retrieve brokers and Schema Registry URLs
        String brokers = config.get(ConfigOptions.key("bootstrap.servers").stringType().noDefaultValue());
        String schemaRegistryUrl = config.get(ConfigOptions.key("schema.registry").stringType().noDefaultValue());

 // Retrieve schema
 String schemaString = loadSchemaStringFromRegistry(schemaRegistryUrl);

// Method for schema retrieval
private static String loadSchemaStringFromRegistry(String schemaRegistryUrl) {
        LOG.info("Retrieving schema for subject : {}", schemaSubject);

        SchemaRegistryClient client = new CachedSchemaRegistryClient(schemaRegistryUrl, 100);
        try {
            return client.getLatestSchemaMetadata(schemaSubject).getSchema();
        } catch (IOException | RestClientException e) {
            throw new RuntimeException("Failed to fetch schema from registry", e);
        }
    }

The consumer code for the JSON topic looks like below. You can define a consumer group or starting offset as per your preference.

// Consume JSON topic
   DataStream<String> inputStream = kafkaJsonSourceStream(env, inputTopic, brokers);

private static DataStream<String> kafkaJsonSourceStream(StreamExecutionEnvironment env,
                    String topic, String brokers) {
        LOG.info("Consuming JSON data from topic: {}", topic);

         KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers(brokers)
                .setGroupId("testGroup-" + topic)
                .setTopics(topic)
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

         return env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source - " + topic);
    }

Now, the application should consume the data. However, we want to have it in the Avro format, so this code below will make the conversion. We use transient objects and initialization for schema in the open method, mostly to avoid serialization errors, which very likely might appear.

// Convert topic data to Avro format
        DataStream<GenericRecord> avroStream = inputStream.map(new JsonToAvroMapper(schemaString))
                .returns(new GenericRecordAvroTypeInfo(schema));

public static class JsonToAvroMapper extends RichMapFunction<String, GenericRecord> {
        private transient ObjectMapper objectMapper;
        private transient Schema schema;
        private final String schemaString;

        public JsonToAvroMapper(String schemaString) {
            this.schemaString = schemaString;
        }

        @Override
        public void open(Configuration parameters) throws Exception {
            objectMapper = new ObjectMapper();
            schema = new Schema.Parser().parse(schemaString);
        }

        @Override
        public GenericRecord map(String json) throws Exception {
            Map dataMap = objectMapper.readValue(json, Map.class);
            GenericRecord record = new GenericData.Record(schema);
            for (Schema.Field field : schema.getFields()) {
                record.put(field.name(), dataMap.get(field.name()));
            }
            return record;
        }
    }

Finally, we reached the last step, when the data will be flowing to the Avro topic.

// Sink data to Kafka topic
avroStream.sinkTo(kafkaAvroSinkStream(targetTopic, brokers, schema, schemaRegistryUrl));

private static KafkaSink<GenericRecord> kafkaAvroSinkStream(String topic, String brokers, Schema avroSchema,
                    String schemaRegistryUrl) {
        LOG.info("Sink Avro topic " + topic);


        KafkaSink<GenericRecord> sink = KafkaSink.<GenericRecord>builder()
                .setBootstrapServers(brokers)
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic(topic)
                        .setValueSerializationSchema(ConfluentRegistryAvroSerializationSchema.forGeneric(
                                topic + "-value",
                                avroSchema,
                                schemaRegistryUrl
                        ))
                        .build()
                )
                .build();

        return sink;
    }

Let’s check if the data is the Avro topic. We can utilize Kafka’s console consumer for this purpose:

kafka-avro-console-consumer --bootstrap-server localhost:9092 \
                                --topic test.topic.avro \
                                --property schema.registry.url=http://localhost:8081 
--from-beginning

We made it! 👏 As you can see it wasn’t too difficult to process and produce Avro messages using Flink.


Protobuf Messages

Protocol Buffers (Protobuf) is another popular data format used in Kafka. Its main advantages are efficient binary serialization and strong data typing, resulting in smaller message sizes and faster processing. Protobuf also supports schema evolution, allowing updates to message schemas without breaking compatibility with existing consumers.

Similarly to the previous case, we need to push the respective schema to the registry. here is the Protobuf schema:


As you see, in this format, field numbers are used for encoding, which makes messages more compact. Field names are ignored during serialization, therefore changing the field names is fine as long as the number remains unchanged.

Schema needs to be registered, as it was done for Avro:

curl -X POST \
  -H "Content-Type: application/json" \
  -d '{
        "schemaType": "PROTOBUF",
        "schema": "syntax = \"proto3\";\npackage com.example.kafka;\n\nmessage Message {\n    string key1 = 1;\n    string key2 = 2;\n    int32 key3 = 3;\n    double key4 = 4;\n}"
      }' \
  http://localhost:8081/subjects/test.topic.proto-value/versions

In our application, similar to the case with Avro, we start by consuming data from the JSON topic. Later, it’s converted to DynamicMessage type, which is the best choice when the schema changes dynamically.

DataStream<DynamicMessage> protobufStream = inputStream.map(new JsonToProtobufMapper(schemaString));

private static class JsonToProtobufMapper extends RichMapFunction<String, DynamicMessage> {
        private  Descriptors.Descriptor schemaDescriptor;
        private final String schemaString;
        private transient ObjectMapper objectMapper;

        public JsonToProtobufMapper(String schemaString) {
            this.schemaString = schemaString;
        }

        @Override
        public void open(Configuration parameters) throws Exception {
            objectMapper = new ObjectMapper();
            ProtobufSchema protobufSchema = new ProtobufSchema(schemaString);
            schemaDescriptor = protobufSchema.toDescriptor();
        }

        @Override
        public DynamicMessage map(String json) throws Exception {
            Map<String, Object> jsonMap = objectMapper.readValue(json, Map.class);
            DynamicMessage.Builder builder = DynamicMessage.newBuilder(schemaDescriptor);

            for (Descriptors.FieldDescriptor field : schemaDescriptor.getFields()) {
                if (jsonMap.containsKey(field.getName())) {
                    builder.setField(field, jsonMap.get(field.getName()));
                }
            }
            return builder.build();
        }
    }

In our case, Flink cannot use its native serializers to handle data in the DynamicMessage format. Therefore, we need to implement a custom serializer and register it with Kryo, which will be used for serialization and deserialization of DynamicMessage objects within the Flink pipeline.

env.getConfig().registerTypeWithKryoSerializer(DynamicMessage.class, new DynamicMessageSerializer(schemaString));

private static class DynamicMessageSerializer extends Serializer<DynamicMessage> implements Serializable {

        private transient Descriptors.Descriptor schemaDescriptor;
        private final String schemaString;

        public DynamicMessageSerializer(String schemaString) { this.schemaString = schemaString; }

        @Override
        public void write(Kryo kryo, Output output, DynamicMessage dynamicMessage) {
            byte[] bytes = dynamicMessage.toByteArray();
            output.writeInt(bytes.length);
            output.writeBytes(bytes);
        }

        @Override
        public DynamicMessage read(Kryo kryo, Input input, Class<DynamicMessage> type) {
            int length = input.readInt();
            byte[] bytes = input.readBytes(length);

            try {
                ProtobufSchema protobufSchema = new ProtobufSchema(schemaString);
                schemaDescriptor = protobufSchema.toDescriptor();
                return DynamicMessage.parseFrom(schemaDescriptor, bytes);
            } catch (Exception e) {
                throw new RuntimeException("Failed to deserialize message", e);
            }
        }
    }

We’re almost there, the data is already in the right format to send it into the Protobuf topic. The last missing piece is to create a sink that will allow us to get the data flowing.

protobufStream.sinkTo(kafkaProtobufSinkStream(targetTopic, brokers, schemaRegistryUrl, schemaRegistryClient));

private static KafkaSink<DynamicMessage> kafkaProtobufSinkStream(String topic, String brokers,
                                                                     String schemaRegistryUrl, SchemaRegistryClient schemaRegistryClient) {
        LOG.info("Sink Protobuf topic " + topic);

        Map<String, Object> serdeConfig = new HashMap<>();
        serdeConfig.put("schema.registry.url", schemaRegistryUrl);


        KafkaSink<DynamicMessage> sink = KafkaSink.<DynamicMessage>builder()
                .setBootstrapServers(brokers)
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic(topic)
                        .setValueSerializationSchema(new FlinkKafkaProtobufSerializationSchema(topic, schemaRegistryClient, serdeConfig))
                        .build()
                )
                .build();

        return sink;
    }

Obviously, the data needs to be serialized from DynamicMessage into the Protobuf binary format before Kafka can write it to a topic. Therefore, we need also a serializer for that purpose.

 private static class FlinkKafkaProtobufSerializationSchema implements SerializationSchema<DynamicMessage> {

        private final String topic;
        private transient KafkaProtobufSerializer<DynamicMessage> protobufSerializer;
        private final transient SchemaRegistryClient schemaRegistryClient;
        private final Map<String, ?> configs;

        public FlinkKafkaProtobufSerializationSchema(String topic, SchemaRegistryClient schemaRegistryClient,
                                                     Map<String, ?> configs) {

            this.topic = topic;
            this.schemaRegistryClient = schemaRegistryClient;
            this.configs = configs;
        }

        @Override
        public void open(InitializationContext context) throws Exception {
            if (protobufSerializer == null) {
                protobufSerializer = new KafkaProtobufSerializer<>(schemaRegistryClient);
                protobufSerializer.configure(configs, false);
            }
        }

        @Override
        public byte[] serialize(DynamicMessage message) {
            if (protobufSerializer == null) {
                throw new IllegalStateException("Serializer is not initialized");
            }
            return protobufSerializer.serialize(topic, message);
        }
    }

Let’s execute our application and verify whether the data is being processed.

./kafka-protobuf-console-consumer --bootstrap-server localhost:9092 \
                                  --topic test.topic.proto \
                                  --property schema.registry.url=http://localhost:8081 \
                                  --from-beginning


The data is now in the topic, so it looks like we’ve made it ! Now, you know how to consume and produce messages on Kafka topics and handle different formats such as Avro and Protobuf with Schema Registry integration.

Equipped with this knowledge and building on insights from the previous article, you’re now prepared to run production pipelines and deploy them using the Flink Kubernetes Operator.

Happy Streaming!

Note: Ensure you have all the necessary dependencies in your build configuration (e.g., Maven or Gradle), including Flink, Kafka, Avro, Protobuf, and Schema Registry client libraries.

If you have any questions or need further clarification on any of the steps, feel free to leave a comment or reach out directly. Happy coding!

QUARKUS & GraphQL

 QUARKUS & GraphQL https://www.geeksforgeeks.org/graphql-tutorial/ https://quarkus.io/guides/smallrye-graphql-client https://www.mastert...