Watchers in Kubernetes



The Kubernetes API is a resource-based (RESTful) programmatic interface provided via HTTP. It supports retrieving, creating, updating, and deleting primary resources via the standard HTTP verbs (POST, PUT, PATCH, DELETE, GET), includes additional subresources for many objects that allow fine grained authorization (such as binding a pod to a node), and can accept and serve those resources in different representations for convenience or efficiency. It also supports efficient change notifications on resources via “watches” and consistent lists to allow other components to effectively cache and synchronize the state of resources.

Kubernetes API basic concepts

First we are going to see how we can communicate directly with the Kubernetes API and how we can list resources that already exist in the cluster.

Before we start, you need to have a Kubernetes cluster, and the kubectl command-line tool must be configured to communicate with your cluster.

Run the commands below in a terminal.

Start a proxy to the Kubernetes API server:

kubectl proxy --port 8080

Exploring the Kubernetes API

Open a new terminal and try the commands below.

Get the API versions:

curl http://localhost:8080/api/

Get a list of pods:

curl http://localhost:8080/api/v1/namespaces/default/pods

The output should look similar to this:

{
  "kind": "PodList",
  "apiVersion": "v1",
  "metadata": {
    "resourceVersion": "33074"
  },
  "items": [
    {
      "metadata": {
        "name": "kubernetes-bootcamp-2321272333-ix8pt",
        "generateName": "kubernetes-bootcamp-2321272333-",
        "namespace": "default",
        "uid": "ba21457c-6b1d-11e6-85f7-1ef9f1dab92b",
        "resourceVersion": "33003",
        "creationTimestamp": "2016-08-25T23:43:30Z",
        "labels": {
          "pod-template-hash": "2321272333",
          "run": "kubernetes-bootcamp"
        },
        ...
}
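As a rough illustration of how a client might consume this response, the sketch below decodes a trimmed copy of the PodList shown above and extracts the collection's resourceVersion and the pod names. The type podList and the function parsePodList are names invented for this example. In real use the body would come from an HTTP GET against the proxy URL rather than from a constant.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// podList mirrors only the fields of a PodList response that this sketch reads.
type podList struct {
	Kind     string `json:"kind"`
	Metadata struct {
		ResourceVersion string `json:"resourceVersion"`
	} `json:"metadata"`
	Items []struct {
		Metadata struct {
			Name      string `json:"name"`
			Namespace string `json:"namespace"`
		} `json:"metadata"`
	} `json:"items"`
}

// parsePodList decodes a PodList body and returns the collection's
// resourceVersion together with the names of the pods it contains.
func parsePodList(body []byte) (string, []string, error) {
	var list podList
	if err := json.Unmarshal(body, &list); err != nil {
		return "", nil, err
	}
	names := make([]string, 0, len(list.Items))
	for _, item := range list.Items {
		names = append(names, item.Metadata.Name)
	}
	return list.Metadata.ResourceVersion, names, nil
}

// sampleBody is a trimmed copy of the response shown above.
const sampleBody = `{
  "kind": "PodList",
  "apiVersion": "v1",
  "metadata": {"resourceVersion": "33074"},
  "items": [
    {"metadata": {"name": "kubernetes-bootcamp-2321272333-ix8pt", "namespace": "default"}}
  ]
}`

func main() {
	rv, names, err := parsePodList([]byte(sampleBody))
	if err != nil {
		panic(err)
	}
	fmt.Println("resourceVersion:", rv)
	for _, n := range names {
		fmt.Println("pod:", n)
	}
}
```

The resourceVersion read here belongs to the collection, and it is the value a client would later hand to a watch.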

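Get the list of ingresses in a given namespace (for example, the default namespace). Note that the extensions/v1beta1 Ingress API used here was removed in Kubernetes 1.22; on newer clusters the equivalent path is /apis/networking.k8s.io/v1/namespaces/default/ingresses: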

curl http://localhost:8080/apis/extensions/v1beta1/namespaces/default/ingresses

Code a client to watch for events in a Kubernetes cluster

To enable clients to build a model of the current state of a cluster, all Kubernetes object resource types are required to support consistent lists and an incremental change notification feed called a watch. Every Kubernetes object has a resourceVersion field representing the version of that resource as stored in the underlying database.

When retrieving a collection of resources (either namespace or cluster scoped), the response from the server will contain a resourceVersion value that can be used to initiate a watch against the server. The server will return all changes (creates, deletes, and updates) that occur after the supplied resourceVersion. This allows a client to fetch the current state and then watch for changes without missing any updates.

If the client's watch is disconnected, the client can start a new watch from the last returned resourceVersion, or perform a new collection request and begin again. See Resource Version Semantics for more detail.
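To make the list-then-watch flow concrete, here is a minimal sketch of a watch over plain HTTP using only the Go standard library. It relies on the documented behaviour that adding ?watch=1&resourceVersion=... to a collection URL returns a stream of JSON documents, each with a type and an object. The names watchEvent, watchPods and newFakeAPIServer are invented for this example, and the stand-in server with its two canned events (for a hypothetical pod web-1) exists only so the sketch can run without a cluster. Pointing baseURL at http://localhost:8080 would target the kubectl proxy instead.

```go
package main

import (
	"encoding/json"
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
)

// watchEvent is one JSON document in a watch stream. Only the metadata
// fields this sketch reads are decoded.
type watchEvent struct {
	Type   string `json:"type"`
	Object struct {
		Metadata struct {
			Name            string `json:"name"`
			ResourceVersion string `json:"resourceVersion"`
		} `json:"metadata"`
	} `json:"object"`
}

// watchPods opens a watch starting after resourceVersion, calls handle for
// each event, and returns the last resourceVersion seen once the stream ends.
func watchPods(baseURL, resourceVersion string, handle func(watchEvent)) (string, error) {
	url := baseURL + "/api/v1/namespaces/default/pods?watch=1&resourceVersion=" + resourceVersion
	resp, err := http.Get(url)
	if err != nil {
		return resourceVersion, err
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return resourceVersion, fmt.Errorf("watch failed: %s", resp.Status)
	}

	dec := json.NewDecoder(resp.Body)
	last := resourceVersion
	for {
		var ev watchEvent
		if err := dec.Decode(&ev); err == io.EOF {
			return last, nil // the server closed the watch
		} else if err != nil {
			return last, err
		}
		if rv := ev.Object.Metadata.ResourceVersion; rv != "" {
			last = rv // remember where to resume from
		}
		handle(ev)
	}
}

// newFakeAPIServer is a hypothetical stand-in for the kubectl proxy that
// replays two canned events and then closes the stream.
func newFakeAPIServer() *httptest.Server {
	return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if r.URL.Query().Get("watch") != "1" {
			http.Error(w, "expected a watch request", http.StatusBadRequest)
			return
		}
		fmt.Fprintln(w, `{"type":"ADDED","object":{"metadata":{"name":"web-1","resourceVersion":"33080"}}}`)
		fmt.Fprintln(w, `{"type":"DELETED","object":{"metadata":{"name":"web-1","resourceVersion":"33095"}}}`)
	}))
}

func main() {
	srv := newFakeAPIServer()
	defer srv.Close()

	last, err := watchPods(srv.URL, "33074", func(ev watchEvent) {
		fmt.Println(ev.Type, ev.Object.Metadata.Name)
	})
	if err != nil {
		panic(err)
	}
	fmt.Println("resume from resourceVersion:", last)
}
```

When the stream ends, the returned value is what a client would pass to the next watch so that no updates are missed. A real client would also need to handle ERROR events, which this sketch does not.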

All the code used in the examples is located in this GitHub repository.

In this demo, we are going to write a program that monitors changes to an annotation on any Ingress within a Kubernetes cluster.

In main.go we first define a constant with the annotation we want to watch: const Annotation = "external-dns.alpha.kubernetes.io/hostname".

Here we are using an annotation read by External DNS, a project that synchronizes exposed Kubernetes Services and Ingresses with DNS providers. In general, we can monitor any annotation, including one of our own, thanks to the flexibility of Kubernetes annotations.

When the client starts, the first thing we need to do is retrieve the current state of the world, that is, get all Ingresses that already exist in Kubernetes:

// An empty namespace ("") selects Ingresses across all namespaces.
var api = clientset.ExtensionsV1beta1().Ingresses("")
ingresses, err := api.List(context.TODO(), metav1.ListOptions{})

We then iterate over the list returned by the Kubernetes API and print every annotation found in the cluster, across all namespaces. This is happening in this function.

At this point you can check whether the state of the world is what you expect and trigger any action it calls for. For example, by comparing against a database or config file, you can detect that:

- An annotation was added to an existing Ingress.

- A new Ingress with this annotation was added while the client was not running.

- An Ingress was deleted while the client was not running.

Now we are going to set up a watcher, and the client will listen for events on it.
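One possible way to run the comparison described above is sketched below. It assumes the previously stored state and the freshly listed state have both been reduced to maps from "namespace/name" to the hostname annotation value. The function diffHostnames and the sample Ingress names are invented for illustration and are not part of the repository code.

```go
package main

import (
	"fmt"
	"sort"
)

// diffHostnames compares a previously stored snapshot with the state just
// listed from the cluster. Both maps go from "namespace/name" to the value
// of the hostname annotation.
func diffHostnames(stored, current map[string]string) (added, removed, changed []string) {
	for key, host := range current {
		old, ok := stored[key]
		switch {
		case !ok:
			added = append(added, key) // appeared while the client was down
		case old != host:
			changed = append(changed, key) // annotation value differs
		}
	}
	for key := range stored {
		if _, ok := current[key]; !ok {
			removed = append(removed, key) // deleted while the client was down
		}
	}
	// Map iteration order is random, so sort for stable output.
	sort.Strings(added)
	sort.Strings(removed)
	sort.Strings(changed)
	return added, removed, changed
}

func main() {
	stored := map[string]string{
		"default/shop": "shop.example.com",
		"default/blog": "blog.example.com",
	}
	current := map[string]string{
		"default/shop": "store.example.com", // annotation changed while offline
		"default/docs": "docs.example.com",  // Ingress created while offline
	}
	added, removed, changed := diffHostnames(stored, current)
	fmt.Println("added:", added)
	fmt.Println("removed:", removed)
	fmt.Println("changed:", changed)
}
```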

// SETUP WATCHER CHANNEL
resourceVersion := ingresses.ListMeta.ResourceVersion
log.Info("Setting up a watcher channel")
watcher, err := api.Watch(context.TODO(), metav1.ListOptions{ResourceVersion: resourceVersion})
if err != nil {
  panic(err.Error())
}
ch := watcher.ResultChan()

Here the watcher gives us a channel (ResultChan) on which events are delivered. A watcher only watches a single Kubernetes resource type; in this case, it only reports changes to Ingresses.

You can also set up multiple watchers; I will write a post about that in the future. This is how some Kubernetes core components work. For example, the Scheduler watches resources such as pods, services and others in order to react to changes.

Now that we have a watcher set up, we just listen on the channel:
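As a small preview of the multiple-watcher idea, one common Go pattern is to fan several event channels into a single one so that one loop can react to all of them. The sketch below is only an illustration of that pattern under simplified assumptions: the event type and the merge function are invented here, and plain channels stand in for the result channels of real watchers.

```go
package main

import (
	"fmt"
	"sync"
)

// event is a simplified stand-in for what a watcher would deliver.
type event struct {
	Kind string // resource type, e.g. "Ingress"
	Type string // "ADDED", "MODIFIED" or "DELETED"
	Name string
}

// merge forwards events from every input channel to one output channel and
// closes the output once all inputs have been closed.
func merge(inputs ...<-chan event) <-chan event {
	out := make(chan event)
	var wg sync.WaitGroup
	for _, in := range inputs {
		wg.Add(1)
		go func(in <-chan event) {
			defer wg.Done()
			for ev := range in {
				out <- ev
			}
		}(in)
	}
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

func main() {
	ingresses := make(chan event, 1)
	services := make(chan event, 1)
	ingresses <- event{"Ingress", "ADDED", "shop"}
	services <- event{"Service", "MODIFIED", "shop-svc"}
	close(ingresses)
	close(services)

	// The relative order of events from different sources is not guaranteed.
	for ev := range merge(ingresses, services) {
		fmt.Println(ev.Kind, ev.Type, ev.Name)
	}
}
```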

// LISTEN TO CHANNEL
log.Info("Watching for changes...")
// Ranging over the channel exits the loop once the watch is closed.
for event := range ch {
  ingress, ok := event.Object.(*extensionsV1beta1.Ingress)
  if !ok {
    panic("Could not cast to Ingress")
  }
  log.WithFields(log.Fields{
    "Domain": ingress.Annotations[Annotation],
    "Event":  event.Type,
  }).Info("A change has been detected")
}
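Logging is enough for a demo, but a client that wants to keep a model of the cluster would typically fold each event into a local cache. The sketch below shows one way that could look, assuming each event has already been reduced to its type, the Ingress key, and the annotation value. The ingressEvent type and the applyEvent function are invented for this example and stand in for the client-go types used in the loop above.

```go
package main

import "fmt"

// ingressEvent is a simplified stand-in for a client-go watch event that
// carries only what this sketch needs.
type ingressEvent struct {
	Type     string // "ADDED", "MODIFIED" or "DELETED"
	Key      string // "namespace/name" of the Ingress
	Hostname string // value of the watched annotation, "" if absent
}

// applyEvent folds one event into the cache and reports whether the cache
// actually changed as a result.
func applyEvent(cache map[string]string, ev ingressEvent) bool {
	old, known := cache[ev.Key]
	switch ev.Type {
	case "ADDED", "MODIFIED":
		if ev.Hostname == "" {
			// Annotation missing or removed: stop tracking this Ingress.
			delete(cache, ev.Key)
			return known
		}
		cache[ev.Key] = ev.Hostname
		return !known || old != ev.Hostname
	case "DELETED":
		delete(cache, ev.Key)
		return known
	default:
		return false // other event types are ignored in this sketch
	}
}

func main() {
	cache := map[string]string{}
	events := make(chan ingressEvent, 3)
	events <- ingressEvent{"ADDED", "default/shop", "shop.example.com"}
	events <- ingressEvent{"MODIFIED", "default/shop", "store.example.com"}
	events <- ingressEvent{"DELETED", "default/shop", "store.example.com"}
	close(events)

	for ev := range events {
		changed := applyEvent(cache, ev)
		fmt.Printf("%-8s %s changed=%v\n", ev.Type, ev.Key, changed)
	}
	fmt.Println("tracked ingresses:", len(cache))
}
```

Returning whether the cache changed lets the caller skip downstream work, such as a DNS update, for MODIFIED events that did not touch the watched annotation.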

This is example output from my cluster, where 10 Ingresses had been created before the client ran, 5 of which have the annotation defined in the code.

Then I added a new Ingress with the annotation, deleted it, added it again, modified another Ingress resource (mytestdomain2.example.com), and then deleted it. All these changes are detected by the watcher set up in the code.

INFO[0000] Using out of cluster config                  
INFO[0000] Getting Services resources version           
INFO[0000] Getting current state of the world. Show all annotations that already exists in the cluster 
INFO[0000] mytestdomain1.example.com                  
INFO[0000] mytestdomain12.example.com                   
INFO[0000] mytestdomain3.example.com                       
INFO[0000] mytestdomain4.example.com                         
INFO[0000] mytestdomain5.example.com              
INFO[0000] Setting up a watcher channel                 
INFO[0000] Watching for changes...                      
INFO[0010] A change has been detected                    Domain=mytestdomain6.example.com Event=ADDED
INFO[0021] A change has been detected                    Domain=mytestdomain6.example.com Event=DELETED
INFO[0028] A change has been detected                    Domain=mytestdomain6.example.com Event=ADDED
INFO[0042] A change has been detected                    Domain=mytestdomain6.example.com Event=MODIFIED
INFO[0048] A change has been detected                    Domain=mytestdomain2.example.com Event=MODIFIED
INFO[0052] A change has been detected                    Domain=mytestdomain2.example.com Event=DELETED

I think that is enough about Kubernetes API internals and watchers to digest in one blog post. I also want to recommend this very helpful presentation about Kubernetes watchers:

Stay tuned for more updates about watchers!

Resources:

Kubernetes API Concepts

Use an HTTP Proxy to Access the Kubernetes API

External DNS

Kubernetes Annotations
