¿Buscas nuestro logo?
Aquí te dejamos una copia, pero si necesitas más opciones o quieres conocer más, visita nuestra área de marca.
Conoce nuestra marca.¿Buscas nuestro logo?
Aquí te dejamos una copia, pero si necesitas más opciones o quieres conocer más, visita nuestra área de marca.
Conoce nuestra marca.dev
Santiago García-Bonacho 12/06/2024 Cargando comentarios…
Gracias a un reto propuesto por un compañero de Paradigma Digital descubrimos KEDA, una herramienta tan ligera como práctica y poderosa. Con KEDA podemos escalar recursos desde “cero” en respuesta a eventos procedentes de dentro y fuera de nuestro cluster Kubernetes. Es una herramienta en estado Graduated por la CNCF y, como hemos comprobado, posee una excelente documentación, facilidad de uso y un nivel de soporte suficiente como para ser considerada una tecnología a adoptar en nuestros entornos productivos.
Pertenezco a un grupo habilitador de la tecnología dentro de Paradigma Digital, y vamos a empezar una serie de artículos que nos permitan ir demostrando las soluciones aportadas por este grupo a los distintos proyectos reales de la compañía.
Hoy veremos cómo encontrar la mejor solución a un reto propuesto por uno de nuestros compañeros. Para ello usaremos Me-hab (Método Habilitador: es sentido común, pero si le pones un nombre es más comercial 😋) que consiste en un proceso iterativo de 5 pasos:
Para un proyecto actual de Paradigma Digital, un compañero (Arquitecto de Soluciones) se puso en contacto con nosotros para proponernos un reto:
“Necesitamos levantar procesos batch en un entorno Kubernetes a partir de mensajes presentes en un topic de kafka”.
Partiendo de un cluster Kubernetes levantado, pero con el uso de recursos en el mínimo posible:
Empezamos investigando dentro de las herramientas serverless compatibles con Kubernetes, puesto que lo que necesitamos es crear recursos al vuelo para la ejecución de trabajos en respuesta a eventos externos a la plataforma, a ser posible con estados latentes con el menor consumo posible.
Buscando en CNCF Landscape encontramos varias herramientas que se ajustan a nuestra solución imaginada.
Estas son las más destacadas:
De su página web obtenemos la definición de KEDA, que parece un buen candidato para solucionar nuestro reto:
KEDA es un Autoscaler Dirigido por Eventos basado en Kubernetes. Con KEDA, puedes gestionar el escalado de cualquier contenedor en Kubernetes basándote en el número de eventos que necesitan ser procesados.
KEDA es un componente ligero y de un solo propósito que se puede añadir a cualquier clúster de Kubernetes. KEDA funciona junto con componentes estándares de Kubernetes como el Horizontal Pod Autoscaler y puede extender la funcionalidad sin sobrescribir ni duplicar. Con KEDA puedes mapear explícitamente las aplicaciones que quieres usar para escalado basado en eventos, mientras que otras aplicaciones continúan funcionando. Esto hace de KEDA una opción flexible y segura para funcionar al lado de cualquier número de otras aplicaciones o marcos de Kubernetes.
De momento nos vamos a centrar en KEDA, puesto que en una evaluación preliminar encontramos que es el candidato más fuerte:
Ahora sólo nos queda hacer una prueba y constatar los puntos antes mencionados.
En mi caso estoy usando un cluster Kubernetes en local, pero cualquier conexión nos valdría.
Necesitamos tener instalado:
$ kubectl cluster-info
Kubernetes control plane is running at https://127.0.0.1:6443
CoreDNS is running at https://127.0.0.1:6443/api/v1/namespaces/kube-system/services/kube-dns:dns/proxy
Metrics-server is running at https://127.0.0.1:6443/api/v1/namespaces/kube-system/services/https:metrics-server:https/proxy
$ kubectl version
Client Version: v1.29.2
Kustomize Version: v5.0.4-0.20230601165947-6ce0bf390ce3
Server Version: v1.28.3+k3s2
y helm
$ helm version
version.BuildInfo{Version:"v3.14.1", GitCommit:"e8858f8696b144ee7c533bd9d49a353ee6c4b98d", GitTreeState:"clean", GoVersion:"go1.21.7"}
Para tener cierta visibilidad sobre los Resources Kubernetes, vamos a usar una herramienta muy sencilla pero muy potente que nos muestra los recursos desplegados y el estado de salud que tienen.
git clone https://github.com/benc-uk/kubeview.git
cd kubeview/charts
helm install kubeview ./kubeview -f example-values.yaml -n kubeview --create-namespace
Para probar que está correctamente instalado, levantamos un port-forward:
kubectl port-forward svc/kubeview -n kubeview 8080:80
Y probamos en http://localhost:8080. Si nos vamos al namespace kubeview, podremos ver todos los resources de la propia herramienta.
Antes de nada, tenemos que instalar un cluster de Kafka para poder hacer la prueba. Usando esta documentación con helm:
helm repo add confluentinc https://packages.confluent.io/helm
helm repo update
helm upgrade --install confluent-operator \
confluentinc/confluent-for-kubernetes \
--namespace confluent --create-namespace
Esto despliega el operador:
Ahora tenemos que configurar nuestro cluster, lo mínimo para hacer la prueba que nos ocupa. Para ello, seguimos el quickstart deploy de este git
Una vez instalado el operador, el siguiente paso es instalar el cluster. Pero necesitamos uno muy sencillo, sin ksqlDB, connect y restProxy.
Creamos el fichero confluent-platform-simple.yaml:
apiVersion: platform.confluent.io/v1beta1
kind: Zookeeper
metadata:
name: zookeeper
namespace: confluent
spec:
replicas: 1
image:
application: confluentinc/cp-zookeeper:7.6.0
init: confluentinc/confluent-init-container:2.8.0
dataVolumeCapacity: 10Gi
logVolumeCapacity: 10Gi
podTemplate:
resources:
requests:
cpu: 100m
memory: 256Mi
podSecurityContext:
fsGroup: 1000
runAsUser: 1000
runAsNonRoot: true
apiVersion: platform.confluent.io/v1beta1
kind: Kafka
metadata:
name: kafka
namespace: confluent
spec:
replicas: 1
image:
application: confluentinc/cp-server:7.6.0
init: confluentinc/confluent-init-container:2.8.0
dataVolumeCapacity: 10Gi
configOverrides:
server:
- "confluent.license.topic.replication.factor=1"
- "confluent.metrics.reporter.topic.replicas=1"
- "confluent.tier.metadata.replication.factor=1"
- "confluent.metadata.topic.replication.factor=1"
- "confluent.balancer.topic.replication.factor=1"
- "confluent.security.event.logger.exporter.kafka.topic.replicas=1"
- "event.logger.exporter.kafka.topic.replicas=1"
- "offsets.topic.replication.factor=1"
- "confluent.cluster.link.enable=true"
- "password.encoder.secret=secret"
podTemplate:
resources:
requests:
cpu: 200m
memory: 512Mi
podSecurityContext:
fsGroup: 1000
runAsUser: 1000
runAsNonRoot: true
metricReporter:
enabled: true
apiVersion: platform.confluent.io/v1beta1
kind: ControlCenter
metadata:
name: controlcenter
namespace: confluent
spec:
replicas: 1
image:
application: confluentinc/cp-enterprise-control-center:7.6.0
init: confluentinc/confluent-init-container:2.8.0
dataVolumeCapacity: 10Gi
configOverrides:
server:
- confluent.controlcenter.command.topic.replication=1
- confluent.controlcenter.replication.factor=1
- confluent.metrics.reporter.topic.replicas=1
- confluent.metrics.topic.replication=1
- confluent.monitoring.interceptor.topic.replication=1
- confluent.controlcenter.internal.topics.replication=1
externalAccess:
type: loadBalancer
loadBalancer:
domain: minikube.domain
podTemplate:
resources:
requests:
cpu: 500m
memory: 512Mi
probe:
liveness:
periodSeconds: 10
failureThreshold: 5
timeoutSeconds: 500
podSecurityContext:
fsGroup: 1000
runAsUser: 1000
runAsNonRoot: true
dependencies:
schemaRegistry:
url: http://schemaregistry.confluent.svc.cluster.local:8081
apiVersion: platform.confluent.io/v1beta1
kind: SchemaRegistry
metadata:
name: schemaregistry
namespace: confluent
spec:
replicas: 1
image:
application: confluentinc/cp-schema-registry:7.6.0
init: confluentinc/confluent-init-container:2.8.0
podTemplate:
resources:
requests:
cpu: 100m
memory: 256Mi
podSecurityContext:
fsGroup: 1000
runAsUser: 1000
runAsNonRoot: true
Instalamos los elementos del cluster:
kubectl apply -f confluent-platform-simple.yaml
Al cabo de un rato, comprobamos mediante kubeview (o usando kubectl get pods) que todo está creado y funcionando correctamente.
k get pods
NAME READY STATUS RESTARTS AGE
confluent-operator-d8b8d5b65-tdndj 1/1 Running 0 33m
zookeeper-0 1/1 Running 0 4m55s
kafka-0 1/1 Running 0 3m40s
controlcenter-0 1/1 Running 0 2m25s
schemaregistry-0 1/1 Running 0 2m25s
Comprobamos que podemos acceder al controlcenter:
kubectl port-forward svc/controlcenter -n confluent 9021:9021
Comprobamos en http://localhost:9021
Las características que tiene que cumplir KEDA para poder ser usado en la solución de nuestro reto son las siguientes:
Parece que es un buen candidato para resolver nuestro reto.
Pre-condiciones
Para instalar KEDA, vamos a seguir sus indicaciones de la página de deploy. Hay múltiples formas de instalar KEDA, pero nosotros vamos a utilizar Helm:
helm repo add kedacore https://kedacore.github.io/charts
helm repo update
helm install keda kedacore/keda --namespace keda --create-namespace
En la consola de kubeview, en el namespace keda, podemos ver cuando esté todo levantado:
Ya tenemos instalado KEDA. Ahora vamos a ver su funcionalidad.
Según el reto, los eventos que van a provocar la ejecución de un job son eventos provenientes de un topic de Kafka. Según la documentación, existe un conector que nos permite realizar está acción, veamos cómo hacerlo.
Lo primero es crear un topic que nos sirva como ejemplo de trigger. En el control center, pulsamos en el cluster > topics > + Add Topic
Comprobamos que el topic está correctamente creado:
Una vez creado nuestro topic, procedemos a crear un ScaledJob cuyo trigger sea un topic Kafka llamado test-topic en un consumer group llamado my-group:
Fichero scaledjob.yaml:
apiVersion: keda.sh/v1alpha1
kind: ScaledJob
metadata:
name: kafka-scaledjob
namespace: default
spec:
jobTargetRef:
template:
spec:
containers:
- name: kafka
image: confluentinc/cp-kafka:7.6.1
command: [ "/bin/sh", "-c" ]
args:
- echo start;
kafka-console-consumer
--bootstrap-server kafka.confluent.svc.cluster.local:9092
--topic test-topic
--group my-group
--max-messages 1
--property "print.key=true";
echo end!;
restartPolicy: Never
backoffLimit: 4
pollingInterval: 10 # Optional. Default: 30 seconds
maxReplicaCount: 30 # Optional. Default: 100
successfulJobsHistoryLimit: 3 # Optional. Default: 100. How many completed jobs should be kept.
failedJobsHistoryLimit: 2 # Optional. Default: 100. How many failed jobs should be kept.
triggers:
- type: kafka
metadata:
bootstrapServers: kafka.confluent.svc.cluster.local:9092
consumerGroup: my-group # Make sure that this consumer group name is the same one as the one that is consuming topics
topic: test-topic
# Optional
lagThreshold: "50"
offsetResetPolicy: latest
Creamos el objeto en kubernetes:
kubectl apply -f scaledjob.yaml
Comprobamos que se ha creado correctamente. El CRD ScaledJob no aparecerá en kubeview, pero podemos listarlo usando kubectl:
$ kubectl get ScaledJob
NAME MIN MAX TRIGGERS AUTHENTICATION READY ACTIVE PAUSED AGE
kafka-scaledjob 30 kafka True True Unknown 63s
Una vez creado el objeto ScaledJob, tenemos que probar que funciona para nuestro reto. Para ello tenemos que entender su funcionamiento.
Es decir, nuestro job tiene la responsabilidad de consumir un mensaje y el scaledJob se centra en la métrica del lag del consumer group para el topic configurado. Por lo tanto:
Se puede usar cualquier consumer de Kafka para leer un mensaje y procesarlo, por ejemplo Spring Kafka o cualquier cliente. Para la prueba, voy a usar una imagen de Confluent con el script kafka-console-consumer presente y ejecutaré dicho script para que consuma un mensaje cada vez que se ejecute.
echo start;
kafka-console-consumer
--bootstrap-server kafka.confluent.svc.cluster.local:9092
--topic test-topic
--group my-group
--max-messages 1
--property "print.key=true";
echo end!;
Ahora solo nos queda lanzar un mensaje por el topic test-topic, comprobar que se levanta el job por cada mensaje recibido y ejecuta el trabajo que tiene programado.
Comprobamos que el scaledJob está listo y preparado para escuchar mensajes por el topic adecuado:
$ kubectl describe scaledJob kafka-scaledJob
Name: kafka-scaledjob
Namespace: default
Labels: <none>
Annotations: <none>
API Version: keda.sh/v1alpha1
Kind: ScaledJob
Metadata:
Status:
Conditions:
Message: ScaledJob is defined correctly and is ready to scaling
Reason: ScaledJobReady
Status: True
Type: Ready
Message: Scaling is performed because triggers are active
Reason: ScalerActive
Status: True
Type: Active
Status: Unknown
Type: Fallback
Status: Unknown
Type: Paused
Events:
Type Reason Age From Message
... ... ... ... ...
Normal KEDAScalersStarted 2m34s scale-handler Started scalers watch
Normal ScaledJobReady 2m34s keda-operator ScaledJob is ready for scaling
Normal KEDAScalersStarted 2m34s (x3 over 2m34s) scale-handler Scaler kafka is built.
Normal KEDAJobsCreated 4s (x17 over 2m34s) scale-handler Created 0 jobs
Nota: si el topic no existe, encontrarás estos eventos. Al crear el topic, los jobs comenzarán a crearse correctamente.
Normal KEDAScalersStarted 12m scale-handler Started scalers watch
Normal ScaledJobReady 12m keda-operator ScaledJob is ready for scaling
Warning KEDAScalerFailed 12m scale-handler scaler with id 0 not found, len = 0, cache has been probably already invalidated
Normal KEDAScalersStarted 12m (x2 over 12m) scale-handler Scaler kafka is built.
Warning KEDAScalerFailed 7m20s (x31 over 12m) scale-handler expected at least one active partition within the topic 'test-topic'
Normal KEDAJobsCreated 2m20s (x15 over 4m40s) scale-handler Created 0 jobs
Creamos un mensaje cualquiera para comprobar si KEDA detecta el cambio del lag y ejecuta el job. Para ello, pulsamos en el topic > Messages > + Produce a new message to this topic.
Mantenemos el mensaje por defecto y pulsamos produce. Comprobamos que se ha enviado y que el mensaje está en el topic:
Comprobamos en los eventos del ScaledJob que se ha ejecutado un job (ejecutando el comando antes mencionado kubectl describe scaledJob kafka-scaledJob).
Events:
Type Reason Age From Message
... ... ... ... ...
Normal KEDAScalersStarted 6s scale-handler Started scalers watch
Normal ScaledJobReady 6s keda-operator ScaledJob is ready for scaling
Normal KEDAScalersStarted 6s (x3 over 6s) scale-handler Scaler kafka is built.
Normal KEDAJobsCreated 6s (x2 over 6s) scale-handler Created 1 jobs
Comprobamos en kubernetes que, efectivamente, el job se ha creado y se ha ejecutado:
➜ ~ kubectl get jobs
NAME COMPLETIONS DURATION AGE
kafka-scaledjob-tvgtv 1/1 8s 33m
Comprobamos que se ha levantado un pod asociado al job:
➜ ~ kubectl get pods
NAME READY STATUS RESTARTS AGE
kafka-scaledjob-tvgtv-rd5t4 0/1 Completed 0 33m
Y, revisando el log del pod, comprobamos que nuestro mensaje ha sido procesado por el job:
$ kubectl get pods|tail -1|awk '{print $1}'|xargs kubectl logs
start
18 {"ordertime":1497014222380,"orderid":18,"itemid":"Item_184","address":{"city":"Mountain View","state":"CA","zipcode":94041}}
Processed a total of 1 messages
end!
En breve publicaremos un video tutorial con la demostración en vivo de todo lo que os hemos comentado en este artículo. Añadiremos más pruebas para comprobar que los mensajes son procesados siempre por un solo job y algunas cosas más.
Una confusión que hemos tenido con respecto al Scaler Kafka de KEDA es pensar que iba a ser el propio scaler el encargado de leer el mensaje y usar ese mensaje como evento que produce un escalado en el sistema. Esto no es así, el evento externo es el cambio en la métrica del lag de un consumer group en un topic. Es nuestro job el encargado y el responsable de actuar sobre dicho evento, haciendo que el lag disminuya para que el escalado deje de producirse. Este concepto es fundamental en KEDA.
KEDA actúa reaccionando a métricas o eventos externos y su responsabilidad es escalar o desescalar en base a umbrales en métricas. KEDA no realiza acciones, sino que se centra en la recogida de ciertas métricas para reaccionar escalando o desescalando recursos dentro del cluster. KEDA permite la definición de umbrales y métricas complejas más dirigidas a negocio y usa HPA para escalar propiamente el cluster.
A parte de este sencillo ejemplo, KEDA es una herramienta excelente para mantener el gasto de nuestro cluster kubernetes a raya. Según el framework FinOps, una de nuestras labores como ingenieros ajustar al máximo los recursos de nuestra plataforma con los objetivos del negocio. Con KEDA podemos automatizar escalado y desescalados (en muchos casos, partiendo desde cero) de nuestro cluster kubernetes en base a eventos referentes a las solicitudes de uso del mismo.
La cantidad de Scalers que tiene KEDA actualmente nos permite usar disparadores (triggers) procedentes de un gran número de herramientas. Por lo tanto, es una herramienta de obligado uso si queremos cumplir nuestra parte del framework FinOps, además de ser respetuosos con el medio ambiente y haciendo de nuestros diseños paradigmas de la sostenibilidad.
En este vídeo tutorial demostramos las cosas de las que hemos hablado en este artículo, vemos todo funcionando y hacemos algunas pruebas más que no hemos comentado aquí.
Los comentarios serán moderados. Serán visibles si aportan un argumento constructivo. Si no estás de acuerdo con algún punto, por favor, muestra tus opiniones de manera educada.
Cuéntanos qué te parece.