Gracias a un reto propuesto por un compañero de Paradigma Digital descubrimos KEDA, una herramienta tan ligera como práctica y poderosa. Con KEDA podemos escalar recursos desde “cero” en respuesta a eventos procedentes de dentro y fuera de nuestro cluster Kubernetes. Es una herramienta en estado Graduated por la CNCF y, como hemos comprobado, posee una excelente documentación, facilidad de uso y un nivel de soporte suficiente como para ser considerada una tecnología a adoptar en nuestros entornos productivos.

Pertenezco a un grupo habilitador de la tecnología dentro de Paradigma Digital, y vamos a empezar una serie de artículos que nos permitan ir demostrando las soluciones aportadas por este grupo a los distintos proyectos reales de la compañía.

Hoy veremos cómo encontrar la mejor solución a un reto propuesto por uno de nuestros compañeros. Para ello usaremos Me-hab (Método Habilitador: es sentido común, pero si le pones un nombre es más comercial 😋) que consiste en un proceso iterativo de 5 pasos:

  1. Entender bien y hacer preguntas con dos objetivos.
  1. Con las manos vacías, imaginar una solución (solución de papel en blanco).
  2. Buscar en el conocimiento humano accesible (vamos, internet) soluciones y herramientas que encajen con nuestra solución imaginada.
  3. Evaluar la solución imaginada usando las herramientas y los conocimientos encontrados. En este paso, es necesario realizar pruebas de concepto para poder probar y medir cómo de eficiente es la solución que hemos imaginado usando las herramientas y conocimientos que hemos encontrado.
  4. Comunicar lo mejor posible nuestra solución al peticionario para ponerla a prueba y, si es necesario, volver al punto 1.

El reto

Para un proyecto actual de Paradigma Digital, un compañero (Arquitecto de Soluciones) se puso en contacto con nosotros para proponernos un reto:

“Necesitamos levantar procesos batch en un entorno Kubernetes a partir de mensajes presentes en un topic de kafka”.

Solución de folio en blanco

Partiendo de un cluster Kubernetes levantado, pero con el uso de recursos en el mínimo posible:

  1. Necesitamos un proceso que esté escuchando la presencia de mensajes en un topic kafka (o alguna métrica asociada que nos indique esta presencia, por ejemplo, el lag de un consumer group).
  2. Ese proceso deberá tener la capacidad de crear objetos Job y el consiguiente Pod dentro de nuestro cluster kubernetes.
  3. El job y el pod deberán tener asignados recursos suficientes para que la ejecución sea posible. Para ello, deberá existir algún mecanismo que permita escalar el cluster si no hay recursos disponibles.
  4. Una vez terminada la ejecución, deberíamos mantener trazas y métricas correspondientes al trabajo realizado.

La búsqueda de herramientas

Empezamos investigando dentro de las herramientas serverless compatibles con Kubernetes, puesto que lo que necesitamos es crear recursos al vuelo para la ejecución de trabajos en respuesta a eventos externos a la plataforma, a ser posible con estados latentes con el menor consumo posible.
Buscando en CNCF Landscape encontramos varias herramientas que se ajustan a nuestra solución imaginada.

Estas son las más destacadas:

De su página web obtenemos la definición de KEDA, que parece un buen candidato para solucionar nuestro reto:

Definición de KEDA

KEDA es un Autoscaler Dirigido por Eventos basado en Kubernetes. Con KEDA, puedes gestionar el escalado de cualquier contenedor en Kubernetes basándote en el número de eventos que necesitan ser procesados.
KEDA es un componente ligero y de un solo propósito que se puede añadir a cualquier clúster de Kubernetes. KEDA funciona junto con componentes estándares de Kubernetes como el Horizontal Pod Autoscaler y puede extender la funcionalidad sin sobrescribir ni duplicar. Con KEDA puedes mapear explícitamente las aplicaciones que quieres usar para escalado basado en eventos, mientras que otras aplicaciones continúan funcionando. Esto hace de KEDA una opción flexible y segura para funcionar al lado de cualquier número de otras aplicaciones o marcos de Kubernetes.

De momento nos vamos a centrar en KEDA, puesto que en una evaluación preliminar encontramos que es el candidato más fuerte:

Ahora sólo nos queda hacer una prueba y constatar los puntos antes mencionados.

Preparación de la prueba

Conexión a un cluster kubernetes

En mi caso estoy usando un cluster Kubernetes en local, pero cualquier conexión nos valdría.
Necesitamos tener instalado:

kubectl

$ kubectl cluster-info
Kubernetes control plane is running at https://127.0.0.1:6443
CoreDNS is running at https://127.0.0.1:6443/api/v1/namespaces/kube-system/services/kube-dns:dns/proxy
Metrics-server is running at https://127.0.0.1:6443/api/v1/namespaces/kube-system/services/https:metrics-server:https/proxy

$ kubectl version
Client Version: v1.29.2
Kustomize Version: v5.0.4-0.20230601165947-6ce0bf390ce3
Server Version: v1.28.3+k3s2

y helm

$ helm version
version.BuildInfo{Version:"v3.14.1", GitCommit:"e8858f8696b144ee7c533bd9d49a353ee6c4b98d", GitTreeState:"clean", GoVersion:"go1.21.7"}

Instalación de Kubeview

Para tener cierta visibilidad sobre los Resources Kubernetes, vamos a usar una herramienta muy sencilla pero muy potente que nos muestra los recursos desplegados y el estado de salud que tienen.

git clone https://github.com/benc-uk/kubeview.git
cd kubeview/charts
helm install kubeview ./kubeview -f example-values.yaml -n kubeview --create-namespace

Para probar que está correctamente instalado, levantamos un port-forward:

kubectl port-forward svc/kubeview -n kubeview 8080:80

Y probamos en http://localhost:8080. Si nos vamos al namespace kubeview, podremos ver todos los resources de la propia herramienta.

Instalación del cluster de Kafka

Antes de nada, tenemos que instalar un cluster de Kafka para poder hacer la prueba. Usando esta documentación con helm:

helm repo add confluentinc https://packages.confluent.io/helm
helm repo update
helm upgrade --install confluent-operator \
  confluentinc/confluent-for-kubernetes \
  --namespace confluent --create-namespace

Esto despliega el operador:

Ahora tenemos que configurar nuestro cluster, lo mínimo para hacer la prueba que nos ocupa. Para ello, seguimos el quickstart deploy de este git

Una vez instalado el operador, el siguiente paso es instalar el cluster. Pero necesitamos uno muy sencillo, sin ksqlDB, connect y restProxy.

Creamos el fichero confluent-platform-simple.yaml:

apiVersion: platform.confluent.io/v1beta1
kind: Zookeeper
metadata:
  name: zookeeper
  namespace: confluent
spec:
  replicas: 1
  image:
    application: confluentinc/cp-zookeeper:7.6.0
    init: confluentinc/confluent-init-container:2.8.0
  dataVolumeCapacity: 10Gi
  logVolumeCapacity: 10Gi
  podTemplate:
    resources:
      requests:
        cpu: 100m
        memory: 256Mi
    podSecurityContext:
      fsGroup: 1000
      runAsUser: 1000
      runAsNonRoot: true
apiVersion: platform.confluent.io/v1beta1
kind: Kafka
metadata:
  name: kafka
  namespace: confluent
spec:
  replicas: 1
  image:
    application: confluentinc/cp-server:7.6.0
    init: confluentinc/confluent-init-container:2.8.0
  dataVolumeCapacity: 10Gi
  configOverrides:
    server:
      - "confluent.license.topic.replication.factor=1"
      - "confluent.metrics.reporter.topic.replicas=1"
      - "confluent.tier.metadata.replication.factor=1"
      - "confluent.metadata.topic.replication.factor=1"
      - "confluent.balancer.topic.replication.factor=1"
      - "confluent.security.event.logger.exporter.kafka.topic.replicas=1"
      - "event.logger.exporter.kafka.topic.replicas=1"
      - "offsets.topic.replication.factor=1"
      - "confluent.cluster.link.enable=true"
      - "password.encoder.secret=secret"
  podTemplate:
    resources:
      requests:
        cpu: 200m
        memory: 512Mi
    podSecurityContext:
      fsGroup: 1000
      runAsUser: 1000
      runAsNonRoot: true
  metricReporter:
    enabled: true
apiVersion: platform.confluent.io/v1beta1
kind: ControlCenter
metadata:
  name: controlcenter
  namespace: confluent
spec:
  replicas: 1
  image:
    application: confluentinc/cp-enterprise-control-center:7.6.0
    init: confluentinc/confluent-init-container:2.8.0
  dataVolumeCapacity: 10Gi
  configOverrides:
    server:
      - confluent.controlcenter.command.topic.replication=1
      - confluent.controlcenter.replication.factor=1
      - confluent.metrics.reporter.topic.replicas=1
      - confluent.metrics.topic.replication=1
      - confluent.monitoring.interceptor.topic.replication=1
      - confluent.controlcenter.internal.topics.replication=1
  externalAccess:
    type: loadBalancer
    loadBalancer:
      domain: minikube.domain
  podTemplate:
    resources:
      requests:
        cpu: 500m
        memory: 512Mi
    probe:
      liveness:
        periodSeconds: 10
        failureThreshold: 5
        timeoutSeconds: 500
    podSecurityContext:
      fsGroup: 1000
      runAsUser: 1000
      runAsNonRoot: true
  dependencies:
    schemaRegistry:
      url: http://schemaregistry.confluent.svc.cluster.local:8081
apiVersion: platform.confluent.io/v1beta1
kind: SchemaRegistry
metadata:
  name: schemaregistry
  namespace: confluent
spec:
  replicas: 1
  image:
    application: confluentinc/cp-schema-registry:7.6.0
    init: confluentinc/confluent-init-container:2.8.0
  podTemplate:
    resources:
      requests:
        cpu: 100m
        memory: 256Mi
    podSecurityContext:
      fsGroup: 1000
      runAsUser: 1000
      runAsNonRoot: true

Instalamos los elementos del cluster:

kubectl apply -f confluent-platform-simple.yaml

Al cabo de un rato, comprobamos mediante kubeview (o usando kubectl get pods) que todo está creado y funcionando correctamente.

k get pods
NAME                                 READY   STATUS    RESTARTS   AGE
confluent-operator-d8b8d5b65-tdndj   1/1     Running   0          33m
zookeeper-0                          1/1     Running   0          4m55s
kafka-0                              1/1     Running   0          3m40s
controlcenter-0                      1/1     Running   0          2m25s
schemaregistry-0                     1/1     Running   0          2m25s

Comprobamos que podemos acceder al controlcenter:

kubectl port-forward svc/controlcenter -n confluent 9021:9021

Comprobamos en http://localhost:9021

La prueba de concepto

Las características que tiene que cumplir KEDA para poder ser usado en la solución de nuestro reto son las siguientes:

Parece que es un buen candidato para resolver nuestro reto.

Instalación de KEDA

Pre-condiciones

Para instalar KEDA, vamos a seguir sus indicaciones de la página de deploy. Hay múltiples formas de instalar KEDA, pero nosotros vamos a utilizar Helm:

helm repo add kedacore https://kedacore.github.io/charts
helm repo update 
helm install keda kedacore/keda --namespace keda --create-namespace

En la consola de kubeview, en el namespace keda, podemos ver cuando esté todo levantado:

Ya tenemos instalado KEDA. Ahora vamos a ver su funcionalidad.

Configurar el trigger con el conector Kafka

Según el reto, los eventos que van a provocar la ejecución de un job son eventos provenientes de un topic de Kafka. Según la documentación, existe un conector que nos permite realizar está acción, veamos cómo hacerlo.

Lo primero es crear un topic que nos sirva como ejemplo de trigger. En el control center, pulsamos en el cluster > topics > + Add Topic

Comprobamos que el topic está correctamente creado:

Una vez creado nuestro topic, procedemos a crear un ScaledJob cuyo trigger sea un topic Kafka llamado test-topic en un consumer group llamado my-group:

Fichero scaledjob.yaml:

apiVersion: keda.sh/v1alpha1
kind: ScaledJob
metadata:
  name: kafka-scaledjob
  namespace: default
spec:
  jobTargetRef:
    template:
      spec:
        containers:
          - name: kafka
            image: confluentinc/cp-kafka:7.6.1
            command: [ "/bin/sh", "-c" ]
            args:
              - echo start;
                kafka-console-consumer
                --bootstrap-server kafka.confluent.svc.cluster.local:9092
                --topic test-topic
                --group my-group
                --max-messages 1
                --property "print.key=true";
                echo end!;
        restartPolicy: Never
    backoffLimit: 4
  pollingInterval: 10             # Optional. Default: 30 seconds
  maxReplicaCount: 30             # Optional. Default: 100
  successfulJobsHistoryLimit: 3   # Optional. Default: 100. How many completed jobs should be kept.
  failedJobsHistoryLimit: 2       # Optional. Default: 100. How many failed jobs should be kept.
  triggers:
    - type: kafka
      metadata:
        bootstrapServers: kafka.confluent.svc.cluster.local:9092
        consumerGroup: my-group       # Make sure that this consumer group name is the same one as the one that is consuming topics
        topic: test-topic
        # Optional
        lagThreshold: "50"
        offsetResetPolicy: latest

Creamos el objeto en kubernetes:

kubectl apply -f scaledjob.yaml

Comprobamos que se ha creado correctamente. El CRD ScaledJob no aparecerá en kubeview, pero podemos listarlo usando kubectl:

$ kubectl get ScaledJob
NAME              MIN   MAX   TRIGGERS   AUTHENTICATION   READY   ACTIVE   PAUSED    AGE
kafka-scaledjob         30    kafka                       True    True    Unknown   63s

Comprobamos el funcionamiento del ScaledJob

Una vez creado el objeto ScaledJob, tenemos que probar que funciona para nuestro reto. Para ello tenemos que entender su funcionamiento.

  1. Cuando no hay mensajes esperando ser procesados, no se crean jobs.
  2. Cuando un mensaje llega a la cola, KEDA crea un job.
  3. Cuando el job comienza a ejecutarse, extrae un único mensaje y lo procesa hasta su finalización.
  4. A medida que llegan mensajes adicionales, se crean jobs adicionales. Cada job procesa un único mensaje hasta su finalización.
  5. Periódicamente, elimina los trabajos completados o fallidos según el límite de historial de trabajos exitosos (SuccessfulJobsHistoryLimit) y el límite de historial de trabajos fallidos (FailedJobsHistoryLimit).

Es decir, nuestro job tiene la responsabilidad de consumir un mensaje y el scaledJob se centra en la métrica del lag del consumer group para el topic configurado. Por lo tanto:

Se puede usar cualquier consumer de Kafka para leer un mensaje y procesarlo, por ejemplo Spring Kafka o cualquier cliente. Para la prueba, voy a usar una imagen de Confluent con el script kafka-console-consumer presente y ejecutaré dicho script para que consuma un mensaje cada vez que se ejecute.

echo start;
kafka-console-consumer 
  --bootstrap-server kafka.confluent.svc.cluster.local:9092 
  --topic test-topic
  --group my-group 
  --max-messages 1
  --property "print.key=true";
echo end!;

Ahora solo nos queda lanzar un mensaje por el topic test-topic, comprobar que se levanta el job por cada mensaje recibido y ejecuta el trabajo que tiene programado.

Comprobamos que el scaledJob está listo y preparado para escuchar mensajes por el topic adecuado:

$ kubectl describe scaledJob kafka-scaledJob
Name:         kafka-scaledjob
Namespace:    default
Labels:       <none>
Annotations:  <none>
API Version:  keda.sh/v1alpha1
Kind:         ScaledJob
Metadata:


Status:
  Conditions:
    Message:  ScaledJob is defined correctly and is ready to scaling
    Reason:   ScaledJobReady
    Status:   True
    Type:     Ready
    Message:  Scaling is performed because triggers are active
    Reason:   ScalerActive
    Status:   True
    Type:     Active
    Status:   Unknown
    Type:     Fallback
    Status:   Unknown
    Type:     Paused
Events:
  Type    Reason         Age             From           Message
   ...     ...           ...             ...              ...

  Normal  KEDAScalersStarted  2m34s                  scale-handler  Started scalers watch
  Normal  ScaledJobReady      2m34s                  keda-operator  ScaledJob is ready for scaling
  Normal  KEDAScalersStarted  2m34s (x3 over 2m34s)  scale-handler  Scaler kafka is built.
  Normal  KEDAJobsCreated     4s (x17 over 2m34s)    scale-handler  Created 0 jobs

Nota: si el topic no existe, encontrarás estos eventos. Al crear el topic, los jobs comenzarán a crearse correctamente.

Normal   KEDAScalersStarted  12m                     scale-handler  Started scalers watch
  Normal   ScaledJobReady      12m                     keda-operator  ScaledJob is ready for scaling
  Warning  KEDAScalerFailed    12m                     scale-handler  scaler with id 0 not found, len = 0, cache has been probably already invalidated
  Normal   KEDAScalersStarted  12m (x2 over 12m)       scale-handler  Scaler kafka is built.
  Warning  KEDAScalerFailed    7m20s (x31 over 12m)    scale-handler  expected at least one active partition within the topic 'test-topic'
  Normal   KEDAJobsCreated     2m20s (x15 over 4m40s)  scale-handler  Created 0 jobs

Creamos un mensaje cualquiera para comprobar si KEDA detecta el cambio del lag y ejecuta el job. Para ello, pulsamos en el topic > Messages > + Produce a new message to this topic.

Mantenemos el mensaje por defecto y pulsamos produce. Comprobamos que se ha enviado y que el mensaje está en el topic:

Comprobamos en los eventos del ScaledJob que se ha ejecutado un job (ejecutando el comando antes mencionado kubectl describe scaledJob kafka-scaledJob).

Events:
  Type    Reason             Age             From        Message
  ...      ...               ...             ...         ...
Normal  KEDAScalersStarted 6s              scale-handler Started scalers watch
Normal  ScaledJobReady     6s              keda-operator ScaledJob is ready for scaling
Normal  KEDAScalersStarted 6s (x3 over 6s) scale-handler Scaler kafka is built.
Normal  KEDAJobsCreated    6s (x2 over 6s) scale-handler Created 1 jobs

Comprobamos en kubernetes que, efectivamente, el job se ha creado y se ha ejecutado:

➜  ~ kubectl get jobs
NAME                    COMPLETIONS   DURATION   AGE
kafka-scaledjob-tvgtv   1/1           8s         33m

Comprobamos que se ha levantado un pod asociado al job:

➜  ~ kubectl get pods
NAME                          READY   STATUS      RESTARTS   AGE
kafka-scaledjob-tvgtv-rd5t4   0/1     Completed   0          33m

Y, revisando el log del pod, comprobamos que nuestro mensaje ha sido procesado por el job:

$ kubectl get pods|tail -1|awk '{print $1}'|xargs kubectl logs
start
18      {"ordertime":1497014222380,"orderid":18,"itemid":"Item_184","address":{"city":"Mountain View","state":"CA","zipcode":94041}}
Processed a total of 1 messages
end!

En breve publicaremos un video tutorial con la demostración en vivo de todo lo que os hemos comentado en este artículo. Añadiremos más pruebas para comprobar que los mensajes son procesados siempre por un solo job y algunas cosas más.

KEDA no se acopla a nuestro negocio

Una confusión que hemos tenido con respecto al Scaler Kafka de KEDA es pensar que iba a ser el propio scaler el encargado de leer el mensaje y usar ese mensaje como evento que produce un escalado en el sistema. Esto no es así, el evento externo es el cambio en la métrica del lag de un consumer group en un topic. Es nuestro job el encargado y el responsable de actuar sobre dicho evento, haciendo que el lag disminuya para que el escalado deje de producirse. Este concepto es fundamental en KEDA.

KEDA actúa reaccionando a métricas o eventos externos y su responsabilidad es escalar o desescalar en base a umbrales en métricas. KEDA no realiza acciones, sino que se centra en la recogida de ciertas métricas para reaccionar escalando o desescalando recursos dentro del cluster. KEDA permite la definición de umbrales y métricas complejas más dirigidas a negocio y usa HPA para escalar propiamente el cluster.

Conclusión

  1. Hemos analizado el reto que nos ha planteado nuestro compañero y nos hemos asegurado de que lo hemos entendido correctamente.
  2. Nos hemos imaginado una posible solución en la que necesitábamos un componente que pudiera crear objetos/pods dentro de kubernetes como reacción a la llegada de un evento vía Kafka.
  3. Hemos buscado dentro del ecosistema CNCF la herramienta más madura que cumple esta funcionalidad.
  4. Hemos encontrado una herramienta que cumple con nuestra idea de solución: KEDA.
  5. Hemos evaluado la herramienta haciendo una prueba de concepto constatando su madurez, calidad de su documentación, facilidad de uso y eficiencia en resolver los problemas que plantea nuestra solución.

A parte de este sencillo ejemplo, KEDA es una herramienta excelente para mantener el gasto de nuestro cluster kubernetes a raya. Según el framework FinOps, una de nuestras labores como ingenieros ajustar al máximo los recursos de nuestra plataforma con los objetivos del negocio. Con KEDA podemos automatizar escalado y desescalados (en muchos casos, partiendo desde cero) de nuestro cluster kubernetes en base a eventos referentes a las solicitudes de uso del mismo.

La cantidad de Scalers que tiene KEDA actualmente nos permite usar disparadores (triggers) procedentes de un gran número de herramientas. Por lo tanto, es una herramienta de obligado uso si queremos cumplir nuestra parte del framework FinOps, además de ser respetuosos con el medio ambiente y haciendo de nuestros diseños paradigmas de la sostenibilidad.

En este vídeo tutorial demostramos las cosas de las que hemos hablado en este artículo, vemos todo funcionando y hacemos algunas pruebas más que no hemos comentado aquí.

Cuéntanos qué te parece.

Los comentarios serán moderados. Serán visibles si aportan un argumento constructivo. Si no estás de acuerdo con algún punto, por favor, muestra tus opiniones de manera educada.

Suscríbete