In this article, I’ll show you how to create and deploy a Flink application.
Where to start:
The easiest way to start is by writing code in your favorite IDE (in my case it’s IntelliJ IDEA) and running it locally on your laptop. In Flink, you can choose between different APIs such as Java, Scala, Python, and SQL. I decided on Java, because it’s the core language for Flink, and most of the core libraries, APIs, and features are developed primarily for Java.
Set up Kafka brokers:
We can have Kafka running locally in just a few steps with the Confluent Community Docker setup, which is available and documented in this GitHub repository.
Here are the steps to follow:
git clone https://github.com/confluentinc/cp-all-in-one.git
cd cp-all-in-one/cp-all-in-one-community
In our case, we only need Kafka, ZooKeeper, and Schema Registry from the docker-compose.yml file; the remaining services can be commented out, as we don’t need them.
Now, we can start the services by executing:
docker-compose up -d
After that, running docker ps should show the following Docker containers:
And voilà, we have a local Kafka where data can be stored! As a final step, we can create the topic to which we will eventually push the data.
docker-compose exec broker kafka-topics --create --topic test.topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
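As an aside, the same topic could also be created from Java with Kafka’s AdminClient. The sketch below simply mirrors the CLI command above; the class name TopicSetup is made up for illustration, and it assumes the kafka-clients dependency is on the classpath.

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

// Hypothetical helper class (name made up for this example)
public class TopicSetup {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // 1 partition, replication factor 1 -- the same settings as the CLI command above
            admin.createTopics(Collections.singleton(new NewTopic("test.topic", 1, (short) 1)))
                 .all()
                 .get();   // block until the topic has been created
        }
    }
}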
Developing the Producer
Alright, let’s start writing our producer using Java 11 and the latest Flink version, which is 1.20 at the moment. We’ll start with something simple: generating small JSON messages that will subsequently be written to Kafka.
The code below loads the Flink configuration from the file config.yaml and also creates a local execution environment. It simulates a cluster environment but runs all tasks on local threads.
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KafkaJsonProducer {
    private static final String TOPIC_DATA = "test.topic";

    public static void main(String[] args) throws Exception {
        Configuration config = GlobalConfiguration.loadConfiguration();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);
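A small optional tweak for local development: if you also want the Flink web UI (served on the rest.port defined in the config below) while running from the IDE, you can create the local environment explicitly. This is just a sketch and assumes the flink-runtime-web dependency is on the classpath:

// Optional, local development only: starts an embedded web UI on rest.port (8086 in our config).
// Assumes the flink-runtime-web dependency is available.
StreamExecutionEnvironment env =
        StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(config);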
The config file used in this application is stored in the “conf” directory of the main project:
taskmanager.memory.process.size: 1024m
taskmanager.numberOfTaskSlots: 1
parallelism.default: 1
rest.port: 8086
bootstrap.servers: localhost:9092
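Note that GlobalConfiguration.loadConfiguration() without arguments looks for the config file in the directory pointed to by the FLINK_CONF_DIR environment variable. When running from the IDE, you can either set that variable to the project’s conf directory or, as a sketch of an alternative, pass the directory explicitly:

// Alternative for local runs: point Flink at the project's "conf" directory explicitly,
// instead of relying on the FLINK_CONF_DIR environment variable being set.
Configuration config = GlobalConfiguration.loadConfiguration("conf");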
In the following lines of code, a source of the JSON messages is created:
DataStream<String> stream = streamProducer(env);
private static DataStream<String> streamProducer(StreamExecutionEnvironment env) {
    return env.addSource(new SourceFunction<>() {
        private static final long serialVersionUID = 1L;
        private volatile boolean isRunning = true;
        private final ObjectMapper objectMapper = new ObjectMapper();
        private final Random random = new Random();
        final String[] predefinedValues = {"value1", "value2", "value3", "value4", "value5"};

        @Override
        public void run(SourceContext<String> sourceContext) throws Exception {
            while (isRunning) {
                Map<String, Object> message = new HashMap<>();
                // getRandomValue(...) is a small helper (not shown here) that picks a random element from the array
                message.put("key1", getRandomValue(predefinedValues));
                message.put("key2", getRandomValue(predefinedValues));
                message.put("key3", random.nextInt(100));
                message.put("key4", random.nextDouble());
                String jsonMessage = objectMapper.writeValueAsString(message);
                sourceContext.collect(jsonMessage);
            }
        }

        @Override
        public void cancel() {
            isRunning = false;
        }
    });
}
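A side note: the SourceFunction interface used above is deprecated in recent Flink releases. If you prefer the newer unified Source API, a roughly equivalent generator could be built with DataGeneratorSource. The sketch below is only an illustration, not the original code, and it assumes the flink-connector-datagen dependency is on the classpath:

// Sketch using DataGeneratorSource and GeneratorFunction
// (both from org.apache.flink.connector.datagen.source; requires flink-connector-datagen).
GeneratorFunction<Long, String> generator = index -> {
    Map<String, Object> message = new HashMap<>();
    message.put("key3", new Random().nextInt(100));          // simplified payload for illustration
    return new ObjectMapper().writeValueAsString(message);
};
DataGeneratorSource<String> source =
        new DataGeneratorSource<>(generator, Long.MAX_VALUE, Types.STRING);
DataStream<String> stream =
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "json-generator");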
Now that the data is being generated, the last missing piece is to create a Kafka sink.
stream.sinkTo(buildKafkaGeneratorSink(config));
private static KafkaSink<String> buildKafkaGeneratorSink(Configuration config) {
    return KafkaSink.<String>builder()
            .setBootstrapServers(config.getString(
                    ConfigOptions.key("bootstrap.servers").stringType().noDefaultValue()))
            .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                    .setTopic(TOPIC_DATA)
                    .setValueSerializationSchema(new SimpleStringSchema())
                    .build())
            .setKafkaProducerConfig(new java.util.Properties())
            .build();
}
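For completeness, here is how the pieces above fit together in the main method. The important detail is the final env.execute(...) call: without it, the pipeline is only defined, never run. This is a minimal sketch reusing the streamProducer and buildKafkaGeneratorSink helpers shown above; the job name is arbitrary.

public static void main(String[] args) throws Exception {
    Configuration config = GlobalConfiguration.loadConfiguration();
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);

    DataStream<String> stream = streamProducer(env);      // the JSON-generating source from above
    stream.sinkTo(buildKafkaGeneratorSink(config));       // the Kafka sink from above

    env.execute("kafka-json-producer");                   // submits and actually runs the job
}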
We can run our application now, and verify whether the data is being loaded into the topic:
docker-compose exec broker kafka-console-consumer --bootstrap-server localhost:9092 --topic test.topic --from-beginning
Success! In just a few steps, we’ve managed to load data with Flink into a Kafka topic. However, running Flink locally is suitable only for development and simple testing. If we want to build a data product for a wider audience, the Flink Kubernetes Operator is a good choice. It provides a production-grade environment with scalability, reliability, efficient resource usage, and streamlined operations. I’ll show you how to get it running in just a few steps.
Flink Kubernetes Operator
Using the Flink Operator simplifies deployment on Kubernetes and allows for an easy transition from your local development environment to production platforms. In just a few steps, you can have a Flink job running on k8s, and I’ll show you how below.
Alright, everything seems straightforward. We don’t have a JAR file yet, but we already have our Flink application code, so we just need to build the package. The operator’s project page also explains how to install the operator and create the Flink deployment. The only missing piece is a Kubernetes cluster, which we can quickly create with Minikube.
brew install minikube --> install Minikube on macOS
minikube start --> start the Minikube cluster
brew install helm --> install Helm
kubectl apply -f https://github.com/jetstack/cert-manager/releases/download/v1.15.3/cert-manager.yaml --> deploy cert-manager on k8s
helm repo add flink-operator-repo https://downloads.apache.org/flink/flink-kubernetes-operator-1.9.0 --> add the operator Helm repo
helm install flink-kubernetes-operator flink-operator-repo/flink-kubernetes-operator --> install the operator
Now we should see the operator’s pod running on Kubernetes.
We’re ready to create our Flink deployment, whose YAML might look like the one below:
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: kafka-producer
spec:
  image: flink:1.17
  flinkVersion: v1_17
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "1"
    rest.port: "8086"
    bootstrap.servers: "host.minikube.internal:9092"
  serviceAccount: flink
  podTemplate:
    metadata:
      name: flink-pod-template
    spec:
      containers:
        - name: flink-main-container
          volumeMounts:
            - name: flink-job-jar
              mountPath: /opt/flink/job
      volumes:
        - name: flink-job-jar
          persistentVolumeClaim:
            claimName: flink-job-jar-pvc
  jobManager:
    resource:
      memory: "1024m"
      cpu: 0.5
  taskManager:
    resource:
      memory: "1024m"
      cpu: 0.5
  job:
    jarURI: local:///opt/flink/job/KafkaProducer-1.0-SNAPSHOT.jar
    entryClass: org.example.KafkaJsonProducer
    parallelism: 1
    upgradeMode: stateless
As you can see, I specified the key parameters based on which the job manager and task manager will be created. The last remaining step is to make our JAR file available in the Minikube cluster. In my case, I created a PVC, and the uber JAR landed on that storage.
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: flink-job-jar-pvc
spec:
  accessModes:
    - ReadWriteOnce
  resources:
    requests:
      storage: 1Gi
Now, we are ready to deploy the Flink application on the Kubernetes cluster with just one command:
kubectl apply -f deployment.yml
After a few seconds, the pods should be visible in the cluster. The JSON data is flowing to the Kafka topic again, but this time from the application running inside the Minikube cluster.
As you can imagine, you can run the same code on your production Kubernetes clusters such as EKS, GKE, or any other that you use. To stop the running job, you just need to delete the deployment, as you would for any other k8s resource:
kubectl delete flinkdeployment kafka-producer
I hope you find this tutorial useful and that it helps you start your adventure with Flink. I tried to keep it simple and focus on the specific steps needed to get the application running. I understand you might have more specific questions, so I recommend reviewing all the resources I included in this article.