Building a Cloud Storage Kubernetes Operator with Go and Operator SDK
In the last article, we looked at Mutating Admission Webhooks as a way to extend Kubernetes. In this article we’ll explore another concept: Kubernetes Operators.
Kubernetes Operators
The Kubernetes docs define operators as:
Operators are software extensions to Kubernetes that make use of custom resources to manage applications and their components
This might seem a little abstract right now, but we’ll try to explore this concept further by actually implementing an Operator.
Autobucket Operator
In this article we’ll build “Autobucket Operator”, a Kubernetes operator that automatically manages Cloud Object Storage (like GCP Cloud Storage buckets or S3 buckets) for a Kubernetes Deployment. Here is a diagram that illustrates the general idea:
Whenever a Kubernetes Deployment with a specific set of annotations is created, we’d like the operator controller to create a Bucket Custom Resource (CR), and whenever a Bucket CR is created, we’d like the operator controller to create a Cloud Storage bucket.
Let’s code
The companion repo for this article is available on GitHub, so you can follow along.
The Operator is built with Operator SDK/Kubebuilder and Go. To get started, we define a Bucket Custom Resource in Go code:
// BucketSpec defines the desired state of Bucket
type BucketSpec struct {
    // Cloud platform
    // +kubebuilder:validation:Enum=gcp
    // +kubebuilder:validation:Required
    Cloud BucketCloud `json:"cloud"`

    // FullName is the cloud storage bucket full name
    // +kubebuilder:validation:Required
    FullName string `json:"fullName"`

    // OnDeletePolicy defines the behavior when the Deployment/Bucket objects are deleted
    // +kubebuilder:validation:Enum=destroy;ignore
    // +kubebuilder:validation:Required
    OnDeletePolicy BucketOnDeletePolicy `json:"onDeletePolicy"`
}

// BucketStatus defines the observed state of Bucket
type BucketStatus struct {
    // CreatedAt is the cloud storage bucket creation time
    CreatedAt string `json:"createdAt,omitempty"`
}
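The BucketCloud and BucketOnDeletePolicy types referenced in the spec are simple string types with a couple of allowed values, and the Bucket object itself ties the spec and status together. Below is a minimal sketch of what they might look like in the same api/v1 package (the printcolumn markers are an assumption based on the kubectl get bucket output shown later, not necessarily the exact code from the repo):
// BucketCloud identifies the cloud provider hosting the bucket
type BucketCloud string

const (
    // BucketCloudGCP is the only cloud supported so far
    BucketCloudGCP BucketCloud = "gcp"
)

// BucketOnDeletePolicy controls what happens to the storage bucket on deletion
type BucketOnDeletePolicy string

const (
    BucketOnDeletePolicyDestroy BucketOnDeletePolicy = "destroy"
    BucketOnDeletePolicyIgnore  BucketOnDeletePolicy = "ignore"
)

// +kubebuilder:object:root=true
// +kubebuilder:subresource:status
// +kubebuilder:printcolumn:name="Cloud",type=string,JSONPath=`.spec.cloud`
// +kubebuilder:printcolumn:name="FullName",type=string,JSONPath=`.spec.fullName`
// +kubebuilder:printcolumn:name="CreatedAt",type=string,JSONPath=`.status.createdAt`

// Bucket is the Schema for the buckets API
type Bucket struct {
    metav1.TypeMeta   `json:",inline"`
    metav1.ObjectMeta `json:"metadata,omitempty"`

    Spec   BucketSpec   `json:"spec,omitempty"`
    Status BucketStatus `json:"status,omitempty"`
}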
Kubebuilder then generates the corresponding Kubernetes Custom Resource Definition for us, which allows us to define a Bucket in YAML like this:
apiVersion: ab.leclouddev.com/v1
kind: Bucket
metadata:
  name: my-app
spec:
  cloud: gcp
  fullName: ab-default-my-app
  onDeletePolicy: destroy
However, we won’t really need to create the Bucket CR manually. The idea is to add custom annotations to a Deployment resource and have the operator create and manage the Bucket Custom Resource for us automatically, as in this example:
apiVersion: apps/v1
kind: Deployment
metadata:
  name: bucket-text-api
  labels:
    app: bucket-text-api
  annotations:
    ## Custom Annotations start here
    ab.leclouddev.com/cloud: gcp
    ab.leclouddev.com/name-prefix: ab
    ab.leclouddev.com/on-delete-policy: destroy
    ## Custom Annotations end here
spec:
  replicas: 2
  selector:
    matchLabels:
      app: bucket-text-api
  template:
    metadata:
      labels:
        app: bucket-text-api
    spec:
      containers:
        - name: bucket-text-api
          image: quay.io/didil/bucket-text-api
          ports:
            - containerPort: 8000
          env:
            - name: PORT
              value: "8000"
            - name: GCP_PROJECT
              value: autobucket-demo
            - name: BUCKET_NAME
              value: ab-default-bucket-text-api
            - name: GOOGLE_APPLICATION_CREDENTIALS
              value: /var/secrets/gcp/sa.json
          volumeMounts:
            - mountPath: /var/secrets/gcp
              name: storage-writer-key
          resources:
            limits:
              cpu: "500m"
              memory: "128Mi"
      volumes:
        - name: storage-writer-key
          secret:
            secretName: storage-writer-key
The example above is a pretty regular k8s Deployment, but if you look at the metadata.annotations section, you’ll see a series of custom annotations whose keys start with “ab.leclouddev.com”; these are the special instructions that tell our operator controller to create the Bucket CR.
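Inside the controller code, these annotation keys are referenced through constants (bucketCloudKey and bucketOnDeletePolicyKey show up in the Reconcile function below). A minimal sketch of what they could look like, with the name-prefix constant name being an assumption on my part:
// annotation keys the operator looks for on Deployments
const (
    bucketCloudKey          = "ab.leclouddev.com/cloud"
    bucketNamePrefixKey     = "ab.leclouddev.com/name-prefix" // name assumed for illustration
    bucketOnDeletePolicyKey = "ab.leclouddev.com/on-delete-policy"
)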
But how does the controller actually work?
The Reconcile Loop
The operator controller watches Deployments, and whenever it finds one with the special annotation “ab.leclouddev.com/cloud”, it creates a matching Bucket CR if it is missing. Luckily, kubebuilder and controller-runtime do the heavy lifting for us here: we basically just have to define the deployment “Reconcile Loop”, which checks Deployments and reconciles the Bucket resources:
func (r *DeploymentReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
    ctx := context.Background()
    log := r.Log.WithValues("deployment", req.NamespacedName)

    dep := &appsv1.Deployment{}
    err := r.Get(ctx, req.NamespacedName, dep)
    if err != nil {
        if errors.IsNotFound(err) {
            // Request object not found, could have been deleted after reconcile request.
            // Owned objects are automatically garbage collected. For additional cleanup logic use finalizers.
            // Return and don't requeue
            log.Info("Deployment resource not found. Ignoring since object must be deleted")
            return ctrl.Result{}, nil
        }
        // Error reading the object - requeue the request.
        log.Error(err, "Failed to get deployment")
        return ctrl.Result{}, err
    }

    bucketCloud := abv1.BucketCloud(dep.Annotations[bucketCloudKey])
    if bucketCloud == "" {
        // no autobucket annotation
        return ctrl.Result{}, nil
    }

    // Check if the bucket object already exists, if not create a new one
    bucket := &abv1.Bucket{}
    err = r.Get(ctx, types.NamespacedName{Name: dep.Name, Namespace: dep.Namespace}, bucket)
    if err != nil && errors.IsNotFound(err) {
        // Define new
        bucket, err := r.bucketForDeployment(dep)
        if err != nil {
            log.Error(err, "Failed to build new Bucket", "Bucket.Name", dep.Name)
            return ctrl.Result{}, err
        }

        log.Info("Creating a new Bucket", "Bucket.Name", bucket.Name)
        err = r.Create(ctx, bucket)
        if err != nil {
            log.Error(err, "Failed to create new Bucket", "Bucket.Name", bucket.Name)
            return ctrl.Result{}, err
        }

        // created successfully - return and requeue
        return ctrl.Result{Requeue: true}, nil
    } else if err != nil {
        log.Error(err, "Failed to get Bucket")
        return ctrl.Result{}, err
    }

    // check if bucket ondelete policy must be updated
    bucketOnDeletePolicy := abv1.BucketOnDeletePolicy(dep.Annotations[bucketOnDeletePolicyKey])
    if bucketOnDeletePolicy != bucket.Spec.OnDeletePolicy {
        bucket.Spec.OnDeletePolicy = bucketOnDeletePolicy
        log.Info("Updating Bucket OnDeletePolicy", "Bucket.Name", bucket.Name, "Bucket.OnDeletePolicy", bucketOnDeletePolicy)
        if err := r.Update(context.Background(), bucket); err != nil {
            log.Error(err, "Failed to update bucket")
            return ctrl.Result{}, err
        }
        // updated successfully - return and requeue
        return ctrl.Result{Requeue: true}, nil
    }

    return ctrl.Result{}, nil
}
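Before moving on, note that this Reconcile function only runs because the controller is registered with the manager and told what to watch. A minimal sketch of that wiring with controller-runtime (the exact setup in the repo may differ slightly):
// SetupWithManager registers the controller with the manager and declares
// which resources trigger reconciliation.
func (r *DeploymentReconciler) SetupWithManager(mgr ctrl.Manager) error {
    return ctrl.NewControllerManagedBy(mgr).
        For(&appsv1.Deployment{}). // reconcile on Deployment events
        Owns(&abv1.Bucket{}).      // requeue the owner Deployment when its Bucket changes
        Complete(r)
}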
The Reconcile function above mainly creates the Bucket resources when they’re missing. The next step is to create the actual Cloud Storage buckets when a Bucket CR is created; we’ll take care of that in the Bucket controller’s reconcile loop:
func (r *BucketReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
    ctx := context.Background()
    log := r.Log.WithValues("bucket", req.NamespacedName)

    bucket := &abv1.Bucket{}
    err := r.Get(ctx, req.NamespacedName, bucket)
    if err != nil {
        if errors.IsNotFound(err) {
            // Request object not found, could have been deleted after reconcile request.
            // Owned objects are automatically garbage collected. For additional cleanup logic use finalizers.
            // Return and don't requeue
            log.Info("Bucket resource not found. Ignoring since object must be deleted")
            return ctrl.Result{}, nil
        }
        // Error reading the object - requeue the request.
        log.Error(err, "Failed to get bucket")
        return ctrl.Result{}, err
    }

    // CODE SKIPPED: Bucket Deletion Code

    // check if the storage bucket has been created yet
    if bucket.Status.CreatedAt == "" {
        // bucket not yet created
        log.Info("Creating Bucket", "Bucket.Cloud", bucket.Spec.Cloud, "Bucket.Name", bucket.Name)

        switch bucket.Spec.Cloud {
        case abv1.BucketCloudGCP:
            err := r.createGCPBucket(ctx, bucket)
            if err != nil {
                log.Error(err, "Failed to create gcp Bucket", "Bucket.Name", bucket.Name)
                return ctrl.Result{}, err
            }
        default:
            log.Info("Bucket Cloud unknown.", "Bucket.Cloud", bucket.Spec.Cloud)
            return ctrl.Result{}, nil
        }

        bucket.Status.CreatedAt = time.Now().Format(time.RFC3339)
        err = r.Client.Status().Update(ctx, bucket)
        if err != nil {
            log.Error(err, "Failed to update bucket status")
            return ctrl.Result{}, err
        }

        // Status updated - return and requeue
        return ctrl.Result{Requeue: true}, nil
    }

    return ctrl.Result{}, nil
}
This last piece of code creates the Cloud Storage bucket and updates our Bucket CR status with a creation timestamp, CreatedAt, so we can track whether the Cloud Storage bucket has been created yet.
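The createGCPBucket call above delegates the actual cloud work; its implementation isn’t shown here, and (as the test section below suggests) the repo routes these calls through a small storage service so they can be mocked. Purely as a sketch, assuming such a service struct holding the GCP project id, bucket creation with the official cloud.google.com/go/storage client could look like this:
// gcpService wraps the Cloud Storage calls; a sketch, not the repo's exact code.
type gcpService struct {
    projectID string
}

// CreateBucket creates a Cloud Storage bucket with default attributes.
func (s *gcpService) CreateBucket(ctx context.Context, fullName string) error {
    client, err := storage.NewClient(ctx)
    if err != nil {
        return err
    }
    defer client.Close()

    // nil attrs means default bucket settings (STANDARD class, multi-region US)
    return client.Bucket(fullName).Create(ctx, s.projectID, nil)
}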
How about testing?
As a big fan of automated testing, I find it comforting that we can easily write tests for our controllers using envtest (which runs a local Kubernetes control plane so we can run our tests against it), the Ginkgo testing framework, and the Gomega matching/assertion library:
Context("When creating a deployment", func() {
    var deployment *appsv1.Deployment
    var bucket *abv1.Bucket

    It("Should create the bucket crd", func() {
        ctx := context.Background()

        gcpSvc.On("CreateBucket", mock.AnythingOfType("*context.emptyCtx"), BucketFullName).Return(nil)

        deployment = &appsv1.Deployment{
            ObjectMeta: metav1.ObjectMeta{
                Name:      DeploymentName,
                Namespace: NamespaceName,
                Annotations: map[string]string{
                    "ab.leclouddev.com/cloud":            "gcp",
                    "ab.leclouddev.com/name-prefix":      "abtest",
                    "ab.leclouddev.com/on-delete-policy": "ignore",
                },
            },
            Spec: appsv1.DeploymentSpec{
                Selector: &metav1.LabelSelector{
                    MatchLabels: map[string]string{
                        "app": "test",
                    },
                },
                Template: corev1.PodTemplateSpec{
                    ObjectMeta: metav1.ObjectMeta{
                        Labels: map[string]string{
                            "app": "test",
                        },
                    },
                    Spec: corev1.PodSpec{
                        Containers: []corev1.Container{
                            {
                                Name:  "test",
                                Image: "busybox",
                            },
                        },
                    },
                },
            },
        }
        Expect(k8sClient.Create(ctx, deployment)).Should(Succeed())

        // wait for bucket creation
        Eventually(func() error {
            bucket = &abv1.Bucket{}
            err := k8sClient.Get(ctx, types.NamespacedName{Name: DeploymentName, Namespace: deployment.Namespace}, bucket)
            if err != nil {
                return err
            }
            if cloud := bucket.Spec.Cloud; cloud != "gcp" {
                return fmt.Errorf("wrong cloud %v", cloud)
            }
            if fullName := bucket.Spec.FullName; fullName != BucketFullName {
                return fmt.Errorf("wrong full name %v", fullName)
            }
            return nil
        }, timeout, interval).Should(BeNil())
    })
})
In the test above, we create a Deployment carrying our special annotations via the Kubernetes API client, and then check that the operator controller creates a Bucket CR with the right spec.
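The gcpSvc.On("CreateBucket", ...) expectation works because the controllers don’t call GCP directly in tests: they depend on a small storage-service interface that is swapped for a testify mock in the suite. A hedged sketch of what that interface and mock might look like (the names are assumptions; the real ones live in the repo):
// GCPService abstracts the cloud storage calls so they can be mocked in tests.
type GCPService interface {
    CreateBucket(ctx context.Context, fullName string) error
    DeleteBucket(ctx context.Context, fullName string) error
}

// GCPServiceMock is a testify mock implementing GCPService for the envtest suite.
type GCPServiceMock struct {
    mock.Mock
}

func (m *GCPServiceMock) CreateBucket(ctx context.Context, fullName string) error {
    args := m.Called(ctx, fullName)
    return args.Error(0)
}

func (m *GCPServiceMock) DeleteBucket(ctx context.Context, fullName string) error {
    args := m.Called(ctx, fullName)
    return args.Error(0)
}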
Does it work?
Let’s try our operator in a little demo. For this, I have created another GitHub repository, bucket-text-api, a simple Go REST API that takes a JSON input and saves text to a Cloud Storage bucket.
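The demo app itself isn’t part of the operator, but for context, its /save handler roughly does something like the sketch below (only an illustration; the real code is in the bucket-text-api repo and may differ):
// saveRequest matches the JSON payload sent to /save
type saveRequest struct {
    Name    string `json:"name"`
    Content string `json:"content"`
}

func saveHandler(w http.ResponseWriter, r *http.Request) {
    var req saveRequest
    if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
        http.Error(w, err.Error(), http.StatusBadRequest)
        return
    }

    ctx := r.Context()
    client, err := storage.NewClient(ctx)
    if err != nil {
        http.Error(w, err.Error(), http.StatusInternalServerError)
        return
    }
    defer client.Close()

    // write the text content to an object in the bucket configured via BUCKET_NAME
    wc := client.Bucket(os.Getenv("BUCKET_NAME")).Object(req.Name).NewWriter(ctx)
    if _, err := wc.Write([]byte(req.Content)); err != nil {
        http.Error(w, err.Error(), http.StatusInternalServerError)
        return
    }
    if err := wc.Close(); err != nil {
        http.Error(w, err.Error(), http.StatusInternalServerError)
        return
    }

    w.WriteHeader(http.StatusOK)
}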
For this demo I have created a Kubernetes cluster (v1.18.6), a GCP project, and a GCP service account (detailed GCP instructions here). Then we’ll install our resources and controller manager:
# install the k8s resources
$ make install
# deploy the controller manager
$ GCP_PROJECT=autobucket-demo make deploy
Our operator is now running in the cluster, so we’ll create the sample deployment (bucket-text-api):
$ kubectl apply -f config/samples/deployment.yaml
Let’s check that our deployment is running:
$ kubectl get deployment
NAME READY UP-TO-DATE AVAILABLE AGE
bucket-text-api 2/2 2 2 1m
and let’s check that our Bucket CR was created:
$ kubectl get bucket
NAME CLOUD FULLNAME CREATEDAT
bucket-text-api gcp ab-default-bucket-text-api 2020-11-19T15:31:34Z
As expected, the Bucket CR was created and we can see its full name and that it has a CreatedAt timestamp, which means our Cloud Storage bucket was also created.
Let’s check that the Cloud Storage bucket is actually there using the gsutil tool:
$ gsutil ls -L -p autobucket-demo
gs://ab-default-bucket-text-api/ :
Storage class: STANDARD
Location type: multi-region
Location constraint: US
Versioning enabled: None
Logging configuration: None
Website configuration: None
CORS configuration: None
Lifecycle configuration: None
Requester Pays enabled: None
Labels: None
Default KMS key: None
Time created: Thu, 19 Nov 2020 15:31:34 GMT
Time updated: Thu, 19 Nov 2020 15:31:34 GMT
Great! Let’s now try to use our bucket-text-api app to save a text file to the bucket. The Deployment is exposed via a NodePort Service on port 30008 of the cluster nodes, so we can access it with:
$ curl --request POST \
--url http://<kubernetes-node-ip>:30008/save \
--header 'Content-Type: application/json' \
--data '{
"name": "test.txt",
"content": "hello operator !"
}'
And finally let’s see if the file is saved on the Cloud Storage bucket:
$ gsutil cat gs://ab-default-bucket-text-api/test.txt
hello operator !
The file is there and everything seems to be working as expected 🎉.
CR Deletion and Finalizers
One last bit I didn’t mention yet is what happens when a Deployment or Bucket CR is deleted.
The operator supports a special deployment annotation, “ab.leclouddev.com/on-delete-policy”, which can be set to “destroy” or “ignore”. If it is set to “destroy”, as in our example above, the operator deletes the Cloud Storage bucket when the Bucket CR is deleted, and also when the Deployment is deleted, since a Deployment deletion triggers a Bucket CR deletion (use this carefully, as you might lose data). This is implemented with Kubernetes Finalizers, which I highly encourage you to read up on, and you can check the full code here.
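The deletion code skipped earlier in the Bucket reconcile loop relies on this finalizer mechanism. Purely as a hedged sketch of the general pattern (the finalizer name, the containsString/removeString helpers, and the deleteGCPBucket method are illustrative assumptions, not necessarily the repo’s code):
// bucketFinalizer is a hypothetical finalizer name used for illustration.
const bucketFinalizer = "finalizers.ab.leclouddev.com/bucket"

// handleDeletion implements the usual finalizer dance for the Bucket CR.
// It returns true when the object is being deleted and reconciliation should stop.
func (r *BucketReconciler) handleDeletion(ctx context.Context, bucket *abv1.Bucket) (bool, error) {
    if bucket.ObjectMeta.DeletionTimestamp.IsZero() {
        // the object is not being deleted: make sure our finalizer is registered
        if !containsString(bucket.ObjectMeta.Finalizers, bucketFinalizer) {
            bucket.ObjectMeta.Finalizers = append(bucket.ObjectMeta.Finalizers, bucketFinalizer)
            return false, r.Update(ctx, bucket)
        }
        return false, nil
    }

    // the object is being deleted: clean up the cloud bucket if the policy asks for it
    if containsString(bucket.ObjectMeta.Finalizers, bucketFinalizer) {
        if bucket.Spec.OnDeletePolicy == abv1.BucketOnDeletePolicyDestroy {
            if err := r.deleteGCPBucket(ctx, bucket); err != nil {
                return true, err
            }
        }
        // remove the finalizer so Kubernetes can finish deleting the CR
        bucket.ObjectMeta.Finalizers = removeString(bucket.ObjectMeta.Finalizers, bucketFinalizer)
        if err := r.Update(ctx, bucket); err != nil {
            return true, err
        }
    }
    return true, nil
}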
Let’s try to delete the deployment:
$ kubectl delete deployment bucket-text-api
deployment.apps "bucket-text-api" deleted
$ kubectl get bucket
No resources found in default namespace.
$ gsutil ls -p autobucket-demo gs://ab-default-bucket-text-api
BucketNotFoundException: 404 gs://ab-default-bucket-text-api bucket does not exist.
Deleting the deployment also deleted the Bucket CR and the Cloud Storage bucket as expected.
Conclusion
We have seen in this example how Kubernetes Operators allow us to automate cloud infrastructure logic. There are of course many more uses for operators, and you can check out this list of operators in the wild.
While functional, this first version of Autobucket Operator is pretty basic and only handles Google Cloud Platform storage buckets, but there are already a few items on the todo list, such as support for AWS S3 buckets and more advanced configuration options.
I hope you have found this article useful. As usual, please let me know if you have any questions or remarks, and if you’d like to contribute to Autobucket Operator, please open GitHub issues and send pull requests!