11/24/2024

Kafka Producer with Apache Flink in a few steps

In the following article, I’ll show you how to create and deploy a Flink application:

Where to start:

The easiest way to start is by writing the code in your favorite IDE (in my case it’s IntelliJ IDEA) and running it locally on your laptop. Flink offers several APIs and languages, such as Java, Scala, Python, and SQL. I decided on Java because it’s the core language for Flink, and most of the core libraries, APIs, and features are developed primarily for Java.



Set up Kafka brokers:

We can have Kafka running locally in just a few steps with the Confluent Community Docker setup, which is available and documented in this GitHub repository.

Here are the steps to follow:

git clone https://github.com/confluentinc/cp-all-in-one.git 

cd cp-all-in-one/cp-all-in-one-community

In our case, we only need Kafka, ZooKeeper, and Schema Registry from the docker-compose.yml file; the remaining services can be commented out.

Now, we can start the services by executing: 
docker-compose up -d

After that, running docker ps should show the following Docker containers:


And voilà, we have Kafka available where data can be stored! As a final step, we can create a topic where we will eventually push the data.

docker-compose exec broker kafka-topics --create --topic test.topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
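If you want to confirm that the topic was created, you can list the topics on the broker with the same docker-compose exec pattern:

docker-compose exec broker kafka-topics --list --bootstrap-server localhost:9092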


Developing the Producer

Alright, let’s start writing our producer using Java 11 and the latest Flink version, which is 1.20 at the moment. We’ll start with something simple: generating small JSON messages that will subsequently be written to Kafka.


The code below loads the Flink configuration from the file config.yaml and also creates a local execution environment. It simulates a cluster environment but runs all tasks on local threads.


public class KafkaJsonProducer {

    private static final String TOPIC_DATA = "test.topic";

    public static void main(String[] args) throws Exception {

        // Load the Flink configuration (config.yaml, resolved from the directory FLINK_CONF_DIR points to)
        Configuration config = GlobalConfiguration.loadConfiguration();

        // Create the execution environment with that configuration; when run from the IDE this is a local environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);


The config file used in this application is stored in the “conf” directory of the main project:

taskmanager.memory.process.size: 1024m

taskmanager.numberOfTaskSlots: 1

parallelism.default: 1

rest.port: 8086

bootstrap.servers: localhost:9092
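Note that bootstrap.servers is not one of Flink’s built-in options, but the loaded Configuration keeps the key, so we can read it back through a ConfigOption. A small sketch of that lookup (the same pattern is used later in the sink builder):

// Describe the custom bootstrap.servers key so it can be read from the Configuration
ConfigOption<String> bootstrapServers =
        ConfigOptions.key("bootstrap.servers").stringType().noDefaultValue();

// Returns the value loaded from config.yaml, e.g. "localhost:9092"
String servers = config.getString(bootstrapServers);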


In the following lines of code, a source of JSON messages is created:

DataStream<String> stream = streamProducer(env);

private static DataStream<String> streamProducer(StreamExecutionEnvironment env) {

       return env.addSource(new SourceFunction<>() {

            private static final long serialVersionUID = 1L;
            private volatile boolean isRunning = true;
            private final ObjectMapper objectMapper = new ObjectMapper();
            private final Random random = new Random();

            final String[] predefinedValues = {"value1", "value2", "value3", "value4", "value5"};

            @Override
            public void run(SourceContext<String> sourceContext) throws Exception {
                while (isRunning) {
                    // Build a small random message and serialize it to a JSON string
                    Map<String, Object> message = new HashMap<>();
                    message.put("key1", getRandomValue(predefinedValues));
                    message.put("key2", getRandomValue(predefinedValues));
                    message.put("key3", random.nextInt(100));
                    message.put("key4", random.nextDouble());

                    String jsonMessage = objectMapper.writeValueAsString(message);
                    sourceContext.collect(jsonMessage);

                    // Throttle the generator so the topic is not flooded
                    Thread.sleep(1000);
                }
            }

            @Override
            public void cancel() {
                isRunning = false;
            }

            // Pick one of the predefined values at random
            private String getRandomValue(String[] values) {
                return values[random.nextInt(values.length)];
            }
        });
    }
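A side note: SourceFunction is deprecated in recent Flink versions. The code above still works in 1.20, but if you prefer the newer unified Source API, a roughly equivalent generator can be built with DataGeneratorSource from the flink-connector-datagen module. This is just a sketch under that assumption (the rate, field values, and source name are arbitrary):

// Requires the flink-connector-datagen dependency
// Creating the ObjectMapper per record keeps the sketch short; reuse it in real code
GeneratorFunction<Long, String> generator = index ->
        new ObjectMapper().writeValueAsString(
                Map.of("key1", "value" + (index % 5 + 1),
                       "key3", index % 100));

DataGeneratorSource<String> source = new DataGeneratorSource<>(
        generator,
        Long.MAX_VALUE,                    // effectively unbounded
        RateLimiterStrategy.perSecond(1),  // one message per second
        Types.STRING);

DataStream<String> stream = env.fromSource(
        source, WatermarkStrategy.noWatermarks(), "json-generator");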

Now that the data is being generated, the last missing piece is to create a Kafka sink.

stream.sinkTo(buildKafkaGeneratorSink(config));

private static KafkaSink<String> buildKafkaGeneratorSink(Configuration config) {
        return KafkaSink.<String>builder()
                .setBootstrapServers(config.getString(
                        ConfigOptions.key("bootstrap.servers").stringType().noDefaultValue()))
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic(TOPIC_DATA)
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build())
                .setKafkaProducerConfig(new java.util.Properties())
                .build();
    }
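One detail the snippets above don’t show explicitly: a Flink job only starts when the environment is executed, so main has to end with an execute call (the job name below is arbitrary):

        // Submit the assembled dataflow; for this streaming job it runs until cancelled
        env.execute("kafka-json-producer");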


We can run our application now, and verify whether the data is being loaded into the topic:

docker-compose exec broker kafka-console-consumer --bootstrap-server localhost:9092 --topic test.topic --from-beginning



Success! In just a few steps, we’ve managed to load data into a Kafka topic with Flink. However, running Flink locally is suitable only for development and simple testing. If we want to build a data product for a wider audience, the Flink Kubernetes Operator is a good choice. It provides a production-grade environment with scalability, reliability, efficient resource usage, and streamlined operations. I’ll show you how to get it running in just a few steps.

Flink Kubernetes Operator

Using the Flink Operator simplifies deployment on Kubernetes and allows for an easy transition from your local development environment to production platforms. In just a few steps, you can have a Flink job running on k8s, and I’ll show you how below.


Alright, everything seems straightforward. We don’t have a JAR file yet, but we already have our Flink application code, so we just need to build the package (see the command below). The operator’s project page also documents how to install the operator and create a Flink deployment. The only missing piece is a Kubernetes cluster, which we can quickly create with Minikube.
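First, the package. Assuming a standard Maven project whose pom is set up to produce an uber JAR (for example via the Maven Shade plugin; the exact build configuration isn’t shown here), this comes down to:

mvn clean package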

brew install minikube --> install Minikube on macOS

minikube start --> start the Minikube cluster

brew install helm --> install Helm

kubectl apply -f https://github.com/jetstack/cert-manager/releases/download/v1.15.3/cert-manager.yaml --> deploy cert-manager on k8s

helm repo add flink-operator-repo https://downloads.apache.org/flink/flink-kubernetes-operator-1.9.0 --> add the operator Helm repo

helm install flink-kubernetes-operator flink-operator-repo/flink-kubernetes-operator --> install the operator


Now we should see the operator’s pod running on Kubernetes.
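A quick way to check is to list the pods in the default namespace:

kubectl get pods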



We’re ready to create our Flink deployment, whose YAML might look like the one below:

apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: kafka-producer
spec:
  image: flink:1.17
  flinkVersion: v1_17
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "1"
    rest.port: "8086"
    bootstrap.servers: "host.minikube.internal:9092"
  serviceAccount: flink
  podTemplate:
    metadata:
      name: flink-pod-template
    spec:
      containers:
        - name: flink-main-container
          volumeMounts:
            - name: flink-job-jar
              mountPath: /opt/flink/job
      volumes:
        - name: flink-job-jar
          persistentVolumeClaim:
            claimName: flink-job-jar-pvc
  jobManager:
    resource:
      memory: "1024m"
      cpu: 0.5
  taskManager:
    resource:
      memory: "1024m"
      cpu: 0.5
  job:
    jarURI: local:///opt/flink/job/KafkaProducer-1.0-SNAPSHOT.jar
    entryClass: org.example.KafkaJsonProducer
    parallelism: 1
    upgradeMode: stateless

As you can see, the manifest specifies the key parameters from which the job manager and task managers will be created. The last remaining step is to move our JAR file to the Minikube cluster. In my case, I created a PVC, and the uber JAR landed on that storage (one way to do this is sketched after the PVC manifest below).

apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: flink-job-jar-pvc
spec:
  accessModes:
    - ReadWriteOnce
  resources:
    requests:
      storage: 1Gi 
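The original steps don’t show the copy itself, but one simple way to get the uber JAR onto that PVC is to start a short-lived helper pod that mounts the claim and then copy the file into it with kubectl cp (the pod name jar-loader and the JAR path are just placeholders for illustration):

apiVersion: v1
kind: Pod
metadata:
  name: jar-loader
spec:
  containers:
    - name: loader
      image: busybox
      command: ["sleep", "3600"]
      volumeMounts:
        - name: flink-job-jar
          mountPath: /opt/flink/job
  volumes:
    - name: flink-job-jar
      persistentVolumeClaim:
        claimName: flink-job-jar-pvc

kubectl apply -f jar-loader.yml

kubectl cp target/KafkaProducer-1.0-SNAPSHOT.jar jar-loader:/opt/flink/job/

kubectl delete pod jar-loader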

Now, we are ready to deploy the Flink application on the Kubernetes cluster with just one command:

kubectl apply -f deployment.yml

After a few seconds, the pods should be visible in the cluster. The JSON data is flowing to the Kafka topic again, but this time from the application running inside the Minikube cluster.


As you can imagine, you can run the same code on your production Kubernetes clusters, such as EKS, GKE, or any other that you use. To stop the job, you just need to delete the deployment, like you would for any other k8s resource:

kubectl delete flinkdeployment kafka-producer

I hope you find this tutorial useful and that it helps you start your adventure with Flink. I tried to keep it simple and to focus on the specific steps needed to get the application running. You might have more specific questions, so I recommend reviewing the resources included in this article.


