Kubernetes Operator with Java and Quarkus
 
                                The goal of this blog post
A Kubernetes Operator, referred to as an operator in the remainder of this article, can be used for various purposes. The most common ones are managing database clusters, preparing database schemas, and restoring backups. Even though Kubernetes provides extensive support for multiple container management operations, it doesn’t cover all potential cases related to application management. Some applications need the manual intervention of a human operator. A Kubernetes operator is an application that runs in the cluster and manages resources. It allows the transfer of operational knowledge of the human operator into software components. Operators can be used for multiple purposes such as restoring data, creating cloud resources, resilience testing, and many others.
The purpose of this blog post is to demonstrate how to quickly write and deploy a simple Kubernetes operator using Java, and provide an example of tasks that can be performed using an operator in the context of the deployment of a service built with Quarkus.
I decided to use a Quarkus service written in Java for the following reasons:
- 
Being able to develop operators in Java can bring a lot of benefits for companies and individuals that are focused mainly on the Java stack. 
- 
Technologies such as containers, microservices, immutable infrastructure, deployed via declarative code are common elements of cloud-native computing approach. An Operator needs to be deployed in a container, and the state of resources managed by an operator is defined in a declarative way. Quarkus with the main focus on cloud-native, and microservice solutions looks like a good fit for building an operator. 
- 
The operator SDK also has neat features such as generating Kubernetes resources for operator deployments and creating custom resource definition yaml files based on the Java code. 
- 
The Java kubernetes-client supports multiple operations: such as creating resources, patching, copying files, executing commands, fetching logs and many others. 
What this operator does?
This simple hello world operator demonstrates how to create a stateful set via Kubernetes API and make sure that the data is loaded into each pod and consumed by the application before creating new pods.
 
It performs the following operations with Kubernetes API using the java kubernetes-client and quarkus-operator-sdk :
- 
watches custom resource creation/modification 
- 
watches pod creation/modification 
- 
creates and scales statefulsets 
- 
checks content of log output 
- 
executes commands in a container 
Project setup
Quarkus project provides developers with tools that allow bootstrapping an application without spending a lot of time on boilerplate code preparation https://code.quarkus.io. The generated project also includes a Dockerfile for building Docker images.
Dockerfile
FROM registry.access.redhat.com/ubi8/ubi-minimal:8.3
WORKDIR /work/
RUN chown 1001 /work \
    && chmod "g+rwX" /work \
    && chown 1001:root /work
COPY --chown=1001:root target/*-runner /work/application
EXPOSE 8080
USER 1001
CMD ["./application", "-Dquarkus.http.host=0.0.0.0"]Add dependencies into a project:
The next step is to add following dependencies:
Quarkus Extension to write operators in Java.
<dependency>
<groupId>io.quarkiverse.operatorsdk</groupId>
<artifactId>quarkus-operator-sdk</artifactId>
<version>1.8.2</version>
</dependency>Kubernetes Java Client for interacting and managing kubernetes resources via REST API. In recent years, this client has evolved to a full-fledged alternative to the kubectl command-line tool for Java-based development.
<dependency>
 <groupId>io.kubernetes</groupId>
 <artifactId>client-java</artifactId>
 <version>12.0.0</version>
</dependency>To facilitate local development we need to create a proxy connection between Kubernetes cluster API and a local machine.
kubectl --namespace kube-system --disable-filter=true proxy kube-apiserver-kind-control-plane
Additionally, some extra steps are required to configure a local development environment. In the following snippet of code, I disabled certificates validation and provided Kubernetes API endpoint name and default namespace.
quarkus.kubernetes-client.trust-certs=false quarkus.kubernetes-client.namespace=default quarkus.kubernetes-client.master-url=http://127.0.0.1:8001/
Kubernetes custom resource definitions
It’s possible to extend the Kubernetes API with custom resources in order to store and modify the desired object specification and state.
Based on the content of the classes in the below application, it generates custom-resource-definitions which can be used to create custom-resources. When the application starts it outputs yaml, which is the custom resource definitions to a file in the ./target/kubernetes/ directory.
Create a custom resource definition which extends the existing Kubernetes API.
@Group("example.com")
@Version("v1alpha1")
@ShortNames("hw")
public class HelloWorld extends CustomResource<HelloWorldSpec, HelloWorldStatus> implements Namespaced {
   private HelloWorldSpec spec;
   private HelloWorldStatus status;
   // {...}
}Creating specification fields of the custom resource.
public class HelloWorldSpec {
   private String name;
   private String image;
   private String data;
   private int replicas;
   // {...}
}The status fields of the custom resource is used to store the status of the replica set. In this particular case, it is used to count the number of deployed pods.
public class HelloWorldStatus {
   private Integer readyReplicas = 0;
   // {...}
}Create a custom resource of kind HelloWorld.
apiVersion: example.com/v1alpha1
kind: HelloWorld
metadata:
  name: hello-world-example
spec:
  name: hello-world
  image: busybox
  replicas: 3
  data: |
   Example of injected datakubectl create -f hello-world-example.yaml
A client to connect to the Kubernetes API
@Singleton
public class KubernetesClientProducer {
   @Produces
   public KubernetesClient kubernetesClient() {
       return new DefaultKubernetesClient(command in container);
   }
}A controller which listens to custom resource creation and updates
@Controller(namespaces = "default")
public class HelloWorldController  implements ResourceController<HelloWorld> {
   / ... /
   @Override
   public DeleteControl deleteResource(HelloWorld resource, Context<HelloWorld> context) {...}
   @Override
   public UpdateControl<HelloWorld> createOrUpdateResource(HelloWorld helloWorldRequest, Context<HelloWorld> context) {...}
   @Override
   public void init(EventSourceManager eventSourceManager) {...}
}Some logic to give an example of what can be done with an operator
When the custom resource HelloWorld is created or updated the operator receives an event. Based on this event it creates a stateful set according to custom object content.
 
HelloWorldController.java
@Override
public UpdateControl<HelloWorld> createOrUpdateResource(HelloWorld helloWorldRequest, Context<HelloWorld> context) {
   final var spec = helloWorldRequest.getSpec();
   StatefulSet statefulset = kubernetesClient.apps().statefulSets().withName(spec.getName()).get();
   if(statefulset == null) {
       log.info("Create statefulset " + spec.getName());
       statefulset = new StatefulSetBuilder()
               .withNewMetadata()
               .withName(spec.getName())
               .endMetadata()
               .withNewSpec()
               .withReplicas(1)
               .withNewTemplate()
               .withNewMetadata()
               .addToLabels("app", spec.getName())
               .endMetadata()
               .withNewSpec()
               .addNewContainer()
               .withName(spec.getName())
               .withImage(spec.getImage())
               .withCommand("sh", "-c", "while sleep 5; do cat /tmp/data.txt; done")
               .addNewPort()
               .withContainerPort(80)
               .endPort()
               .endContainer()
               .endSpec()
               .endTemplate()
               .withNewSelector()
               .addToMatchLabels("app", spec.getName())
               .endSelector()
               .endSpec()
               .build();
       kubernetesClient.apps().statefulSets().inNamespace(namespace).create(statefulset);
       return UpdateControl.noUpdate();
   } else if (statefulset.getStatus().getReplicas() < spec.getReplicas()) {
       log.info("Scale statefulset up: " + spec.getName());
       kubernetesClient.apps()
               .statefulSets()
               .inNamespace(namespace).withName(spec.getName()).scale(statefulset.getStatus().getReplicas() + 1, true);
       return UpdateControl.noUpdate();
   } else if (statefulset.getStatus().getReplicas() > spec.getReplicas()) {
       log.info("Scale statefulset down: " + spec.getName());
       kubernetesClient.apps()
               .statefulSets()
               .inNamespace(namespace).withName(spec.getName()).scale(spec.getReplicas(), true);
       return UpdateControl.noUpdate();
   }
   return UpdateControl.noUpdate();Containers are running the following command to check if the data is present in the container, and outputs an error to a log if data is not present.
"while sleep 5; do cat /tmp/data.txt; done"
PodWatcher watches pod creation and modification. This functionality provides the ability to dynamically react to changes of the cluster resources.
public class PodWatcher {
private final KubernetesClient kubernetesClient;
void onStartup(@Observes StartupEvent startupEvent) throws IOException {
   public PodWatcher(KubernetesClient kubernetesClient) {
      this.kubernetesClient = kubernetesClient;
   }
   kubernetesClient.pods().watch(new Watcher<Pod>() {
   @Override
   public void eventReceived(Action action, Pod pod) {...}
   @Override
   public void onClose(WatcherException e) {...}
});
}Start an operator
@QuarkusMain
public class Main implements QuarkusApplication {
   @Inject
   Operator operator;
   public static void main(String... args) {
       Quarkus.run(Main.class, args);
   }
   @Override
   public int run(String... args) throws Exception {
       operator.start();
       Quarkus.waitForExit();
       return 0;
   }
}When a pod is created, the operator fetches logs and checks if the data from the custom object is present in the pod, in case if it is not present operator outputs data to a file in the pod.
log.info("Get pod logs " + pod.getMetadata().getName());
String logs = kubernetesClient.pods()
        .inNamespace(namespace)
        .withName(podName)
        .getLog();
log.info("Check if data is available in pod " + pod.getMetadata().getName());
if (!logs.contains("Example of injected data")) {
    log.info("Inject data into pod " + podName);
    newExecWatch(kubernetesClient, namespace, podName, spec.getData());
}echo "Example of injected data" > /tmp/data.txt
After executing a command, the operator checks the log of the current pod again. If the application has consumed the data, it adds one pod to the replica set cluster and repeats this operation until all desired pods are present and contain the data.
kubernetesClient.apps().statefulSets().inNamespace(namespace).withName(name).scale(replicaSize + 1, true);Full code of PodWatcher
PodWatcher.java
public class PodWatcher {
    @ConfigProperty(name = "quarkus.kubernetes-client.namespace")
    String namespace;
    ObjectMapper mapper = new ObjectMapper();
    private final KubernetesClient kubernetesClient;
    public PodWatcher(KubernetesClient kubernetesClient) {
        this.kubernetesClient = kubernetesClient;
    }
    private final Logger log = LoggerFactory.getLogger(PodWatcher.class);
    void onStartup(@Observes StartupEvent startupEvent) throws IOException {
        List<Pod> pods = kubernetesClient.pods().inNamespace(namespace).list().getItems();
        kubernetesClient.pods().watch(new Watcher<Pod>() {
            @Override
            public void eventReceived(Action action, Pod pod) {
                log.info("Received " + action + ", pod name " + pod.getMetadata().getName());
                CustomResourceDefinitionContext helloWorldCustomResource = CustomResourceDefinitionContext.fromCustomResourceType(HelloWorld.class);
                Map<String, Object> cr = kubernetesClient.customResource(helloWorldCustomResource).get(namespace, "hello-world-example");
                HelloWorldSpec spec = mapper.convertValue(cr.get("spec"), HelloWorldSpec.class);
                if (action == Action.ADDED) {
                    StatefulSet statefulset = kubernetesClient.apps().statefulSets().withName(spec.getName()).get();
                    String podName = pod.getMetadata().getName();
                    try {
                        Thread.sleep(10 * 1000L);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    log.info("Get pod logs " + pod.getMetadata().getName());
                    String logs = kubernetesClient.pods()
                            .inNamespace(namespace)
                            .withName(podName)
                            .getLog();
                    log.info("Check if data is available in pod " + pod.getMetadata().getName());
                    if (!logs.contains("Example of injected data")) {
                        log.info("Inject data into pod " + podName);
                        newExecWatch(kubernetesClient, namespace, podName, spec.getData());
                    }
                    try {
                        Thread.sleep(10 * 1000L);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    logs = kubernetesClient.pods()
                            .inNamespace(pod.getMetadata().getNamespace())
                            .withName(pod.getMetadata().getName())
                            .getLog();
                    if (logs.contains("Example of injected data")) {
                        log.info("Data is available in pod " + pod.getMetadata().getName());
                        if (spec.getReplicas() > statefulset.getStatus().getReplicas()) {
                            log.info("Scale statefulset size, current size " + spec.getName());
                            try {
                                scaleStatefulSet(namespace, spec.getName(), statefulset.getStatus().getReplicas());
                            } catch (Exception ex) {
                                log.error(ex.getMessage());
                            }
                        }
                        {
                            log.info("Statefulset size: " + statefulset.getStatus().getReplicas() + ", desired: " + statefulset.getStatus().getReplicas());
                        }
                    }
                }
            }
            @Override
            public void onClose(WatcherException e) {
            }
        });
    }
    private ExecWatch newExecWatch(KubernetesClient client, String namespace, String podName, String data) {
        return client.pods().inNamespace(namespace).withName(podName)
                .readingInput(System.in)
                .writingOutput(System.out)
                .writingError(System.err)
                .withTTY()
                .usingListener(new SimpleListener())
                .exec("sh", "-c", "echo \"" + data + "\" > /tmp/data.txt");
    }
    void scaleStatefulSet(String namespace, String name, int replicaSize) {
        log.info("Scale statefulset " + name);
        kubernetesClient.apps().statefulSets().inNamespace(namespace).withName(name).scale(replicaSize + 1, true);
    }
}Operator deployment in the cluster
The files for building docker images are generated during project creation by quarkus.io and the Kubernetes resources for operator deployment are generated by the operator SDK.
Build the application
./mvnw package
We can effortlessly build a docker image using a Dockerfile which was generated previously by Quarkus bootstrap page.
docker build -f \ src/main/docker/Dockerfile.jvm \ -t user/k8-operator-quarkus-hello-world:1.0.0-SNAPSHOT .
Deploy the application
Before deploying an Operator to a cluster we need to apply the cluster role and role binding to provide an operator with all necessary permissions to execute API calls.
cluster-rolebinding.yaml
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: quarkus-operatork-example
rules:
  - apiGroups:
      - ""
    resources:
      - pods
    verbs:
      - get
      - list
      - watch
      - create
      - update
      - delete
      - patch
  - apiGroups:
      - apiextensions.k8s.io
    resources:
      - customresourcedefinitions
    verbs:
      - get
      - list
      - watch
      - create
      - update
      - delete
      - patch
  - apiGroups:
      - example.com
      - helloworlds.example.com
    resources:
      - helloworlds
    verbs:
      - get
      - list
      - watch
      - create
      - update
      - delete
      - patch
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: code-with-quarkus-admin
subjects:
  - kind: ServiceAccount
    name: code-with-quarkus
    namespace: default
roleRef:
  kind: ClusterRole
  name: quarkus-operator-example
  apiGroup: ""kubectl apply -f deployment/cluster-rolebinding.yaml
Deploy an operator in a cluster using auto-generated kubernetes resources such as RBAC, Service, ServiceAccount, or/and Deployment.
kubectl apply -f target/kubernetes/kubernetes.yml
kubernetes.yml
---
apiVersion: v1
kind: ServiceAccount
metadata:
  annotations:
    app.quarkus.io/build-timestamp: 2021-04-18 - 14:25:30 +0000
  labels:
    app.kubernetes.io/version: 1.0.0-SNAPSHOT
    app.kubernetes.io/name: k8-operator-quarkus-hello-world
  name: k8-operator-quarkus-hello-world
---
apiVersion: v1
kind: Service
metadata:
  annotations:
    app.quarkus.io/build-timestamp: 2021-04-18 - 14:25:30 +0000
  labels:
    app.kubernetes.io/name: k8-operator-quarkus-hello-world
    app.kubernetes.io/version: 1.0.0-SNAPSHOT
  name: k8-operator-quarkus-hello-world
spec:
  ports:
  - name: http
    port: 8877
    targetPort: 8877
  selector:
    app.kubernetes.io/name: k8-operator-quarkus-hello-world
    app.kubernetes.io/version: 1.0.0-SNAPSHOT
  type: ClusterIP
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: k8-operator-quarkus-hello-world-view
roleRef:
  kind: ClusterRole
  apiGroup: rbac.authorization.k8s.io
  name: view
subjects:
- kind: ServiceAccount
  name: k8-operator-quarkus-hello-world
---
apiVersion: apps/v1
kind: Deployment
metadata:
  annotations:
    app.quarkus.io/build-timestamp: 2021-04-18 - 14:25:30 +0000
  labels:
    app.kubernetes.io/version: 1.0.0-SNAPSHOT
    app.kubernetes.io/name: k8-operator-quarkus-hello-world
  name: k8-operator-quarkus-hello-world
spec:
  replicas: 1
  selector:
    matchLabels:
      app.kubernetes.io/version: 1.0.0-SNAPSHOT
      app.kubernetes.io/name: k8-operator-quarkus-hello-world
  template:
    metadata:
      annotations:
        app.quarkus.io/build-timestamp: 2021-04-18 - 14:25:30 +0000
      labels:
        app.kubernetes.io/version: 1.0.0-SNAPSHOT
        app.kubernetes.io/name: k8-operator-quarkus-hello-world
    spec:
      containers:
      - env:
        - name: KUBERNETES_NAMESPACE
          valueFrom:
            fieldRef:
              fieldPath: metadata.namespace
        image: user/k8-operator-quarkus-hello-world:1.0.0-SNAPSHOT
        imagePullPolicy: Always
        livenessProbe:
          failureThreshold: 3
          httpGet:
            path: /q/health/live
            port: 8877
            scheme: HTTP
          initialDelaySeconds: 0
          periodSeconds: 30
          successThreshold: 1
          timeoutSeconds: 10
        name: k8-operator-quarkus-hello-world
        ports:
        - containerPort: 8877
          name: http
          protocol: TCP
        readinessProbe:
          failureThreshold: 3
          httpGet:
            path: /q/health/ready
            port: 8877
            scheme: HTTP
          initialDelaySeconds: 0
          periodSeconds: 30
          successThreshold: 1
          timeoutSeconds: 10
      serviceAccount: k8-operator-quarkus-hello-worldThe final check
After some time we can validate that the replica set is deployed, and the data is injected into each pod.
 
Helpful links:
