12/03/2024

Kafka Producer with Apache Flink in a few steps


In this and the following articles, I’ll show you how to create and deploy Flink applications. We’ll cover common use cases that you’re likely to encounter in your projects.


Where to start

I should probably start with a proper theoretical introduction to the architecture, the main purpose of the tool, and so on. I’ll skip that part, because I want to focus on a quick path to building and running a Flink application. The official project website has very good documentation that explains all of this, in case you need more information.

The easiest way to start is to write the code in your favorite IDE (in my case, IntelliJ IDEA) and run it locally on your laptop. But which language to pick? Flink offers APIs for Java, Scala, Python, and SQL. I decided on Java, because it’s the core language for Flink, and most of the core libraries, APIs, and features are developed primarily for Java.

Now, we can just sit and start developing Java code and execute it in the IDE, which should produce messages to Kafka. Well, that would work if you already have running Kafka brokers. If you don’t, then we need to resolve this small obstacle.


Set up Kafka brokers

We can have Kafka running locally in just a few steps with the Confluent Community Docker setup, which is available and documented in the cp-all-in-one GitHub repository.

Here are the steps to follow:

git clone https://github.com/confluentinc/cp-all-in-one.git
cd cp-all-in-one/cp-all-in-one-community

In our case, we only need Kafka, Zookeeper, and Schema-Registry from the docker-compose.yml file. The rest can be commented out, as we don’t need the other services.

Now, we can start the services by executing:

docker-compose up -d

After that, docker ps should list the Kafka broker, Zookeeper, and Schema Registry containers as running.


And voilà, we have Kafka available where data can be stored! As a final step, we can create a topic where we will eventually push the data.

docker-compose exec broker kafka-topics --create --topic test.topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1

Developing Producer

Alright, let’s start writing our producer using Java 11 and the latest Flink version, which is 1.20 at the moment. We’ll start with something simple: generating small JSON messages that will subsequently be written to Kafka.


The code below loads the Flink configuration from the file config.yaml and creates an execution environment. When the application is started from the IDE, this is a local environment: it simulates a cluster but runs all tasks on local threads.

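import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KafkaJsonProducer {

    private static final String TOPIC_DATA = "test.topic";

    public static void main(String[] args) throws Exception {

        // Reads config.yaml from the Flink configuration directory
        Configuration config = GlobalConfiguration.loadConfiguration();
        // Local environment in the IDE, cluster environment when deployed
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();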

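The config file used in this application is stored in the “conf” directory of the main project. GlobalConfiguration.loadConfiguration() looks for it in the directory named by the FLINK_CONF_DIR environment variable, so point that variable at “conf” in your run configuration; if it is not set, an empty configuration is returned.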

taskmanager.memory.process.size: 1024m
taskmanager.numberOfTaskSlots: 1
parallelism.default: 1
rest.port: 8086
bootstrap.servers: localhost:9092
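
Since bootstrap.servers is a custom key that Flink itself ignores, a typo in it only shows up once the sink is built. Below is a minimal, stand-alone sketch for checking the file content before starting the job. It is not Flink's own parser: it only understands flat "key: value" lines like the ones above, and the class and method names (FlatConfigSketch, parse, require) are made up for this illustration.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class FlatConfigSketch {

    /** Parses flat "key: value" lines; blank lines and '#' comments are skipped. */
    static Map<String, String> parse(List<String> lines) {
        Map<String, String> settings = new LinkedHashMap<>();
        for (String line : lines) {
            String trimmed = line.trim();
            int separator = trimmed.indexOf(": ");
            if (trimmed.isEmpty() || trimmed.startsWith("#") || separator < 0) {
                continue;
            }
            settings.put(trimmed.substring(0, separator),
                    trimmed.substring(separator + 2).trim());
        }
        return settings;
    }

    /** Returns the value for a key or fails fast with a readable message. */
    static String require(Map<String, String> settings, String key) {
        String value = settings.get(key);
        if (value == null || value.isEmpty()) {
            throw new IllegalStateException("Missing required setting: " + key);
        }
        return value;
    }

    public static void main(String[] args) {
        Map<String, String> settings = parse(List.of(
                "taskmanager.memory.process.size: 1024m",
                "taskmanager.numberOfTaskSlots: 1",
                "parallelism.default: 1",
                "rest.port: 8086",
                "bootstrap.servers: localhost:9092"));
        // prints "localhost:9092"
        System.out.println(require(settings, "bootstrap.servers"));
    }
}
```

The same fail-fast idea can be applied to the Flink Configuration object in the real application, where a missing key without a default value comes back as null.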

In the following lines of code, a source of the JSON messages is created:

DataStream<String> stream = streamProducer(env);

private static DataStream<String> streamProducer(StreamExecutionEnvironment env) {
        return env.addSource(new SourceFunction<>() {

            private static final long serialVersionUID = 1L;
            private volatile boolean isRunning = true;
            private final ObjectMapper objectMapper = new ObjectMapper();
            private final Random random = new Random();

            final String[] predefinedValues = {"value1", "value2", "value3", "value4", "value5"};

            @Override
            public void run(SourceContext<String> sourceContext) throws Exception {
                while (isRunning) {
                    Map<String, Object> message = new HashMap<>();
                    message.put("key1", getRandomValue(predefinedValues));
                    message.put("key2", getRandomValue(predefinedValues));
                    message.put("key3", random.nextInt(100));
                    message.put("key4", random.nextDouble());

                    String jsonMessage = objectMapper.writeValueAsString(message);
                    sourceContext.collect(jsonMessage);
                }
            }

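            @Override
            public void cancel() {
                isRunning = false;
            }

            // Assumed helper (not shown in the original listing): picks a random element.
            private String getRandomValue(String[] values) {
                return values[random.nextInt(values.length)];
            }
        });
    }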
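
If you want to look at the message shape without starting Flink at all, the generation logic can be pulled out into plain Java. The sketch below is only an illustration under a few assumptions: the class name JsonMessageSketch is made up, the JSON is rendered by hand instead of with Jackson's ObjectMapper, and the rendering only handles flat maps of strings and numbers.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Random;
import java.util.stream.Collectors;

class JsonMessageSketch {

    private static final String[] PREDEFINED_VALUES =
            {"value1", "value2", "value3", "value4", "value5"};

    /** Builds one message with the same four keys as the Flink source above. */
    static Map<String, Object> nextMessage(Random random) {
        Map<String, Object> message = new LinkedHashMap<>();
        message.put("key1", PREDEFINED_VALUES[random.nextInt(PREDEFINED_VALUES.length)]);
        message.put("key2", PREDEFINED_VALUES[random.nextInt(PREDEFINED_VALUES.length)]);
        message.put("key3", random.nextInt(100));
        message.put("key4", random.nextDouble());
        return message;
    }

    /** Quotes strings and prints numbers as they are. */
    private static String render(Object value) {
        return value instanceof String ? "\"" + value + "\"" : String.valueOf(value);
    }

    /** Hand-rolled JSON rendering for flat maps of strings and numbers only. */
    static String toJson(Map<String, Object> message) {
        return message.entrySet().stream()
                .map(e -> "\"" + e.getKey() + "\":" + render(e.getValue()))
                .collect(Collectors.joining(",", "{", "}"));
    }

    public static void main(String[] args) {
        // A fixed seed makes the output repeatable between runs.
        System.out.println(toJson(nextMessage(new Random(42))));
    }
}
```

Passing the Random in from outside is a deliberate choice here: with a fixed seed the messages are reproducible, which makes the generator easy to unit test.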

Now that the data is being generated, the last missing piece is to create a Kafka sink.

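stream.sinkTo(buildKafkaGeneratorSink(config));

// Nothing runs until the job is submitted; the job name here is arbitrary.
env.execute("Kafka JSON Producer");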

private static KafkaSink<String> buildKafkaGeneratorSink(Configuration config) {
        return KafkaSink.<String>builder()
                .setBootstrapServers(config.getString(ConfigOptions.key("bootstrap.servers").stringType().noDefaultValue()))
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic(TOPIC_DATA)
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build())
                .setKafkaProducerConfig(new java.util.Properties())
                .build();
    }

We can run our application now, and verify whether the data is being loaded into the topic:

docker-compose exec broker kafka-console-consumer --bootstrap-server localhost:9092 --topic test.topic --from-beginning
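
Eyeballing the console output is usually enough, but the check can also be automated. The sketch below is one simple way to test whether a consumed line looks like a message from our producer. The class name MessageShapeCheck and the sample line are invented for illustration, and the check is deliberately shallow: it looks for the four expected keys and does not parse the JSON.

```java
import java.util.List;

class MessageShapeCheck {

    private static final List<String> EXPECTED_KEYS = List.of("key1", "key2", "key3", "key4");

    /** Returns true when the line is a JSON-like object containing all expected keys. */
    static boolean looksLikeProducerMessage(String line) {
        String trimmed = line.trim();
        if (!trimmed.startsWith("{") || !trimmed.endsWith("}")) {
            return false;
        }
        // Key order is not checked, since a HashMap does not guarantee it.
        return EXPECTED_KEYS.stream()
                .allMatch(key -> trimmed.contains("\"" + key + "\":"));
    }

    public static void main(String[] args) {
        // Hypothetical sample line, shaped like the messages built by the source.
        String sample = "{\"key1\":\"value3\",\"key2\":\"value1\",\"key3\":42,\"key4\":0.731}";
        System.out.println(looksLikeProducerMessage(sample));       // prints "true"
        System.out.println(looksLikeProducerMessage("not a json")); // prints "false"
    }
}
```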

Success! In just a few steps, we’ve managed to load data with Flink into a Kafka topic. However, running Flink locally is suitable only for development and simple testing. If we want to build a data product for a wider audience, the Flink Kubernetes Operator is a good choice. It provides a production-grade environment with scalability, reliability, efficient resource usage, and streamlined operations. I’ll show you how to get it running in just a few steps.


Flink Kubernetes Operator

Using the Flink Operator simplifies deployment on Kubernetes and allows for an easy transition from your local development environment to production platforms. In just a few steps, you can have a Flink job running on k8s, and I’ll show you how below.


Alright, everything seems straightforward. We don’t have a JAR file, but we already have our Flink application code, so we just need to build the package. We can also find on the operator’s project page how to create the Flink deployment and install the operator. The only missing piece is a Kubernetes cluster, but we can quickly create one with Minikube.

# Install Minikube (macOS)
brew install minikube
# Start the Minikube cluster
minikube start
# Install Helm
brew install helm

# Deploy cert-manager on k8s
kubectl apply -f https://github.com/jetstack/cert-manager/releases/download/v1.15.3/cert-manager.yaml

# Add the operator Helm repository
helm repo add flink-operator-repo https://downloads.apache.org/flink/flink-kubernetes-operator-1.9.0/

# Install the operator
helm install flink-kubernetes-operator flink-operator-repo/flink-kubernetes-operator

Now we should see the operator’s pod running on Kubernetes.


Cool, so far so good. Now it’s time to deploy our Flink job on Kubernetes. Before that, let’s build the JAR file:

mvn clean package

We’re ready to create our Flink deployment. Its YAML might look like this:

apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: kafka-producer
spec:
  image: flink:1.17
  flinkVersion: v1_17
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "1"
    rest.port: "8086"
    bootstrap.servers: "host.minikube.internal:9092"
  serviceAccount: flink
  podTemplate:
    metadata:
      name: flink-pod-template
    spec:
      containers:
        - name: flink-main-container
          volumeMounts:
            - name: flink-job-jar
              mountPath: /opt/flink/job
      volumes:
        - name: flink-job-jar
          persistentVolumeClaim:
            claimName: flink-job-jar-pvc
  jobManager:
    resource:
      memory: "1024m"
      cpu: 0.5
  taskManager:
    resource:
      memory: "1024m"
      cpu: 0.5
  job:
    jarURI: local:///opt/flink/job/KafkaProducer-1.0-SNAPSHOT.jar
    entryClass: org.example.KafkaJsonProducer
    parallelism: 1
    upgradeMode: stateless

As you can see, I specified the key parameters from which the job manager and task manager will be created. The last remaining step is to move our JAR file to the Minikube cluster. In my case, I created a PVC, and the uber JAR landed on that storage.

apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: flink-job-jar-pvc
spec:
  accessModes:
    - ReadWriteOnce
  resources:
    requests:
      storage: 1Gi 
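
One detail that is easy to get wrong is that the jarURI in the deployment has to point inside the directory where the PVC is mounted (mountPath). The sketch below shows that relationship as a small stand-alone check. The class and method names (JarUriCheck, isUnderMount) are made up for this illustration, and the check only compares paths; it does not look into the cluster.

```java
import java.net.URI;
import java.nio.file.Path;

class JarUriCheck {

    /** Returns true when a local:// jar URI points inside the given mount path. */
    static boolean isUnderMount(String jarUri, String mountPath) {
        URI uri = URI.create(jarUri);
        if (!"local".equals(uri.getScheme()) || uri.getPath() == null) {
            return false;
        }
        Path jar = Path.of(uri.getPath()).normalize();
        return jar.startsWith(Path.of(mountPath).normalize());
    }

    public static void main(String[] args) {
        // Values taken from the FlinkDeployment above.
        String jarUri = "local:///opt/flink/job/KafkaProducer-1.0-SNAPSHOT.jar";
        String mountPath = "/opt/flink/job";
        System.out.println(isUnderMount(jarUri, mountPath));
    }
}
```

If the check fails for your own values, the job manager will most likely not find the JAR, even though the PVC itself is mounted correctly.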

Now, we are ready to deploy the Flink application on the Kubernetes cluster with just one command:

kubectl apply -f deployment.yml

After a few seconds, the pods should be visible in the cluster. The JSON data is flowing to the Kafka topic again, but this time from the application running within the Minikube cluster.


As you can imagine, you can run the same code on your production Kubernetes clusters such as EKS, GKE, or any other that you use.

To stop running the job, you just need to delete the deployment like you would do for other k8s resources:

kubectl delete flinkdeployment kafka-producer

I hope you find this tutorial useful and that it helps you start your adventure with Flink. I tried to keep it simple and focus on the specific steps needed to get the application running. I understand you might have more specific questions, so I recommend reviewing all the resources I included in this article.
