En cada proyecto nos enfrentamos a desafíos que ponen a prueba nuestras habilidades técnicas, nuestra creatividad y nuestra capacidad para adaptarnos a situaciones diferentes. Llevo tantos años en el sector de la ingeniería y estoy tan acostumbrado a todo que ya nada me sorprende. Ya sea por las altas expectativas del cliente, los plazos ajustados o las limitaciones presupuestarias, nos obligan constantemente a diseñar soluciones que no solo satisfagan los requisitos establecidos, sino que también sean confiables y sostenibles a largo plazo.

La mayoría de las empresas en las que he trabajado buscaban ser los primeros en llevar una nueva funcionalidad a sus clientes actuales y poder captar nuevos clientes. Para ello, intentaban lanzar un Producto Mínimo Viable (MVP), una versión inicial funcional que cumpla con los requisitos esenciales que se va mejorando y expandiendo en las diferentes situaciones. A menudo, buscamos implementar la mejor solución posible para cada situación, pero esto no siempre significa optar por la opción más sofisticada o usar la tecnología de turno. Recuerdo diferentes casos en los que tomamos decisiones muy distintas, pero ambas relacionadas con menús que mostraban secciones con el número de ítems en cada una.

Hace mucho tiempo estuve trabajando en un portal de alojamientos rurales. Cada vez que un alojamiento se daba de alta o de baja, era necesario recalcular los totales de alojamientos a nivel de país, provincia y localidad. En ese momento, teníamos un proceso batch que se ejecutaba durante la noche, realizando todos los cálculos necesarios y "precocinado" los datos para que estuvieran listos para un acceso rápido.

Después de cambiar toda la plataforma en menos de un año, nos llegó el requisito de optimizar el proceso batch sin añadir nueva infraestructura y con un tiempo de implementación muy limitado, ya que la empresa era pequeña. No teníamos mucho tiempo para análisis porque podíamos quedarnos sin tiempo para la implementación. La solución que implementamos fue práctica y sencilla: creamos un endpoint que se llamaba desde el backoffice después de cada alta o baja de un alojamiento. Este actualizaba los totales sumando o restando uno en tiempo real. Al no tratarse de una funcionalidad crítica y, por ende, cualquier error sería corregido automáticamente por el batch nocturno, nos pareció suficiente. Se atacaba un problema, no hacíamos mucho más.

Años después, trabajando en un eCommerce con mucho tráfico, me enfrenté a una situación similar. Aunque teníamos mucha más libertad en cuanto a tecnologías, no era completa; no podías usar un lenguaje de programación no aceptado por la empresa. Sin embargo, los desafíos eran significativamente mayores: manejábamos un volumen de tráfico enorme y casos de uso increíblemente complejos. Algunos productos llegaban a agotarse en cuestión de horas, lo que nos obligaba a desarrollar una solución rápida y fiable.

Imagen donde se muestra un pantallazo de una web retail con varios productos en venta para añadir al carrito

En este caso, decidimos implementar un proceso ETL (Extract, Transform, and Load) que se ejecutaba cada 15 minutos. Este proceso "precocinaba" los datos de los menús, transformándolos y almacenándolos en formato JSON en MongoDB. Esto nos permitió mantener los menús actualizados casi en tiempo real y disponibles para ser consumidos rápidamente por el frontend.

Jenga

Los requisitos del reto

Jugando con mis hijos se me ocurrió una idea: podría resolver el problema de gestionar tareas pesadas (que requieren muchos recursos y/o tiempo para ser procesadas) en una arquitectura basada en eventos (EDA) usando Kafka.

Apunté algunos requisitos para una aplicación de ejemplo:

  1. El código de la tarea pesada es óptimo y no se puede mejorar.
  2. La tarea no se puede separar. Si se pudiera separar en subtareas, incluso si cada siguiente subtarea dependiera del estado de la anterior, usaría Cadence o Temporal para gestionarlas como workflows.
  3. Las tareas no tienen una duración máxima, pueden durar más de una hora.
  4. El input de la tarea se recibe de un topic.
  5. El estado de la tarea (FINISHED o FAILED) se publica en el topic de estados.
  6. El orden de las tareas con el mismo ID se debe respetar, es decir, que la tarea 3 debe terminar después de la tarea 2.

Empecé a buscar si otras personas se habían topado con esta casuística y me han gustado estos dos artículos: “Dealing with long-running jobs using Apache Kafka” y “How to avoid rebalances and disconnections in Kafka consumers”. Trataban sobre el mismo desafío que quería resolver, así que empecé analizar las sugerencias de los autores de los artículos:

  1. Optimizar el código.
  2. Dividir la tarea en microprocesos.
  3. Aumentar el tiempo de espera.
  4. Hilos con enfoque "fire-and-forget".
  5. Pausar el consumidor con hilos y callbacks.

Manos a la obra

He creado el proyecto con Spring Boot large-tasks-using-kafka. También he añadido docker-compose para poder ejecutar en mi portátil Kafka y Kafka UI y poder hacer las pruebas manuales.

A continuación, he creado también un consumidor básico que ejecuta una tarea pesada y, dependiendo de si ha terminado con éxito o no, pública el estado de la tarea en el topic de estados. Si salta una excepción, el estado de la tarea sería ERROR.

Este es el código inicial:

@Service
@RequiredArgsConstructor
public class TaskConsumer {

   public static final String INPUT_TOPIC = "input-topic";
   public static final String STATUS_TOPIC = "status-topic";

   private final KafkaTemplate<String, String> kafkaTemplate;

   private final LargeTaskProcessor taskProcessor;

   @KafkaListener(topics = INPUT_TOPIC)
   public void consume(ConsumerRecord<String, String> consumerRecord) {
       try {
           boolean isSuccess = taskProcessor.run();
           if (isSuccess) {
               this.publishStatusOf(consumerRecord.value(), "FINISHED");
           } else {
               this.publishStatusOf(consumerRecord.value(), "FAILED");
           }
       } catch (Exception e) {
           this.publishStatusOf(consumerRecord.value(), "ERROR");
           throw new RuntimeException(e);
       }
   }

   private void publishStatusOf(String value, String status) {
       String statusString = String.format("The status of task %s is %s", value, status);
       kafkaTemplate.send(STATUS_TOPIC, statusString);
   }

}

La prueba de integración

Para la prueba de integración usaría Testcontainers. La principal razón de esta elección es que Testcontainers ofrece un entorno realista, porque ejecuta un contenedor real de Docker con una instancia completa de Kafka. Por otro lado, EmbeddedKafka está incluido en spring-kafka-test y es una librería que proporciona un broker de Kafka en memoria, pero no tiene todas las funcionalidades de un Kafka real.

Este es el código de mi primer test:

@Import(TestcontainersConfiguration.class)
@SpringBootTest
class TaskConsumerTest {


   @Autowired
   private KafkaTemplate<String, String> kafkaTemplate;


   @Autowired
   private ConsumerFactory<String, String> consumerFactory;


   @Test
   void testLargeTaskProcessing() {
       String task = "1";
       String expectedMessage = String.format("The status of task %s is %s", task, "FINISHED");


       kafkaTemplate.send(TaskConsumer.INPUT_TOPIC, task);


       Consumer<String, String> testConsumer = consumerFactory.createConsumer();
       testConsumer.subscribe(List.of(TaskConsumer.STATUS_TOPIC));
       ConsumerRecord<String, String> received = KafkaTestUtils.getSingleRecord(testConsumer, TaskConsumer.STATUS_TOPIC, Duration.ofSeconds(30L));


       assertThat(received.value()).isEqualTo(expectedMessage);
   }
}

Implementando los requisitos

Basándome en mis requisitos, el código estaba optimizado y no se podía separar, así que las primeras dos sugerencias se descartaron.

Teniendo en cuenta en el requisito 3, tampoco podía “jugar” con la propiedad max.poll.interval.ms del consumidor de Kafka, así que también debemos descartar la sugerencia 3.

Como no quería estar esperando mucho tiempo para las pruebas del requisito 3, he bajado el valor max.poll.interval.ms a 2 segundos, ya que el valor por defecto es de 5 minutos y era demasiado alto para mis pruebas.

spring:
 application:
   name: large-tasks-using-kafka
 kafka:
   bootstrap-servers: localhost:9092
   consumer:
     group-id: group-id
     auto-offset-reset: earliest
     enable-auto-commit: false
     key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
     value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
   properties:
     max.poll.interval.ms: 2000

Si la tarea pesada se demoraba más de 2 segundos en completar, Kafka empezaba hacer rebalanceo de los consumidores porque no sabía si el consumidor seguía “vivo”.
Ejecutamos el test de integración y vemos que finaliza correctamente, pero también vemos los siguientes errores:

2024-11-18T05:57:58.159+01:00 INFO 1060293 --- [large-tasks-using-kafka] [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-group-id-1, groupId-group-id] Failing OffsetCommit request since the consumer is not part of an active group 2024-11-18T05:57:58.159+01:00 ERROR 1060293 -- [large-tasks-using-kafka] [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer ‹ clientId=consumer-group-id-1, groupId=group-id] LeaveGroup request with Generation{generationId=1, 2 < member Id='consumer-group-id-1-bec5067a-76b6-4f6b-ba59-bff1b89672c3', protocol='range'} failed with error: The coordinator is not aware of this › member.

El error indica que hay un problema de sincronización entre el consumidor y el coordinador de grupo dentro de un grupo de consumidores.

El error ha sido provocado, ya que necesitábamos simular que el consumidor tarda en procesar mensajes más tiempo que el max.poll.interval.ms y como no llama a poll() dentro de este tiempo, se desconecta del grupo.

Vamos a verlo mejor ejecutando la aplicación. Primero arrancamos Kafka y Kafka UI con Docker con el siguiente comando:

docker compose up

Bajamos el nivel de log del consumidor de Kafka a DEBUG y ejecutamos la aplicación.
Abrimos Kafka UI en nuestro navegador favorito introduciendo http://localhost:8989/ en la barra de direcciones. Vemos que el input-topic ya está creado y publicamos un mensaje:

produce message

Revisamos el status-topic y vemos que el consumidor ha entrado en un bucle infinito y está procesando el mensaje sin parar:

pantallazo donde se muestra el status topic en la herramienta

¿Por qué pasa esto?

Como nuestra aplicación y Kafka se ejecutan en diferentes servidores, ¿cómo puede saber Kafka si un consumidor sigue funcionando? Con el heartbeat, que hace peticiones a Kafka para que le vaya diciendo que está funcionando. Pero si una tarea tarda demasiado en procesarse (el tiempo de su procesamiento excede max.poll.interval.ms), en ese caso, tal vez el coordinador debería expulsar al consumidor del grupo para darle sus particiones al resto de los consumidores (hacer rebalanceo), de modo que el procesamiento pueda continuar. Esto es lo que pasa en nuestro caso.

esquema de comunicación del heartbeat

El caso de usar hilos con enfoque "fire-and-forget" no nos vale, lo que pasamos a la solución 5: pausar al consumidor para que pueda seguir haciendo poll(), procesar nuestro proceso pesado en un hilo diferente y usar callbacks.

Hacemos los cambios:

@Service
@RequiredArgsConstructor
public class TaskConsumer {


   public static final String INPUT_TOPIC = "input-topic";
   public static final String STATUS_TOPIC = "status-topic";
   public static final String CONTAINER_ID = "pausable-consumer";


   private final KafkaTemplate<String, String> kafkaTemplate;
   private final LargeTaskProcessor taskProcessor;
   private final KafkaContainerService kafkaContainerService;
   private final AsyncTaskExecutor executor;


   @KafkaListener(id = CONTAINER_ID, topics = INPUT_TOPIC)
   public void consume(ConsumerRecord<String, String> consumerRecord) {
       kafkaContainerService.pauseConsume(CONTAINER_ID);


       executor.submitCompletable(() -> taskProcessor.run(consumerRecord.value()))
           .whenComplete((isSuccess, exception) -> {
               if (Objects.isNull(exception)) {
                   if (Boolean.TRUE.equals(isSuccess)) {
                       this.publishStatusOf(consumerRecord.value(), "FINISHED");
                   } else {
                       this.publishStatusOf(consumerRecord.value(), "FAILED");
                   }
               } else {
                   this.publishStatusOf(consumerRecord.value(), "ERROR");
               }
               kafkaContainerService.resumeConsumer(CONTAINER_ID);
           });
   }


   private void publishStatusOf(String value, String status) {
       String statusString = String.format("The status of task %s is %s", value, status);
       kafkaTemplate.send(STATUS_TOPIC, statusString);
   }


}

Ejecutamos la aplicación y publicamos un mensaje en el input-topic. Vemos que el mensaje se consume solo una vez. Con esto hemos cumplido los primeros cinco requisitos y vamos a probar el sexto: el orden de las tareas con el mismo ID se debe respetar, es decir, que la tarea 3 debe terminar después de tarea 2.

El orden de las tareas con el mismo ID se debe respetar

Paramos la ejecución de la aplicación y configuramos que la tarea 2 debe demorarse más que la tarea 3. Para ello, modificamos LargeTaskProcessor:

@Service
public class LargeTaskProcessor {


   public boolean run(String task) throws InterruptedException {
       int seconds = (task.equals("2")) ? 10 : 5;
       Thread.sleep(Duration.ofSeconds(seconds));


       return true;
   }
}

Publicamos 5 mensajes en el input-topic con values: 1, 2, 3, 4 y 5.
Ejecutamos la aplicación y revisamos el status-topic:

pantallazo del status topic

Podemos ver que el orden de las tareas no se ha conservado. Esto significa que debemos procesar secuencialmente los mensajes. Esto no es mala idea, porque las tareas “pesadas” usan mucho recursos y si las procesamos en secuencia, haremos que terminen más rápido usando todos los recursos. Para conseguir esto, debemos decir al consumidor que debe leer los mensajes uno a uno. Esto se consigue con la propiedad de Kafka max.poll.records, que debemos establecer en 1.

kafka:
  # ...
 properties:
   max.poll.interval.ms: 2000
   max.poll.records: 1

Reiniciamos la aplicación después de los cambios y publicamos de nuevo los 5 mensajes en el input-topic con values: 1, 2, 3, 4 y 5 (como el consumer group es el mismo, no se consumen los mensajes anteriores).

Revisamos Kafka UI:

Pantallazo del status topic en kafka ui

Et voilà! Ya tenemos implementados todos los requisitos.

Establecer el group ID

Ahora vamos a revisar las propiedades del consumidor:

pantallazo de input topic

Vemos que el group-id tiene el valor "pausable-consumer", que es el valor que establecimos para CONTAINER_ID. Si revisamos las propiedades del consumidor, habíamos establecido que el group-id: group-id, que no es un valor muy recomendable… Vamos a cambiarlo a group-id: my-consumer-group

¿Cómo es posible esto?

Si revisamos la documentación de @KafkaListener, veremos que a partir de la versión 2.0, la propiedad id (si está presente) se utiliza como la propiedad group.id del consumidor de Kafka, sobrescribiendo la propiedad configurada, si está presente. Para poder restaurar el comportamiento anterior, debemos establecer explícitamente groupId o configurar idIsGroup en false.

¿En que puede afectar esto?

El groupId se utiliza para el valor de Consumer Group ID, que se puede utilizar en el Access Control Lists (ACL) de Kafka. Como tenemos el groupId configurado en nuestro fichero de configuración, vamos a usar el segundo caso, estableciendo idIsGroup en false.

@KafkaListener(id = CONTAINER_ID, topics = INPUT_TOPIC, idIsGroup = false)
public void consume(ConsumerRecord<String, String> consumerRecord) {
   // ...
}

Ejecutar las tareas pesadas en un único hilo

Vamos a ejecutar el test de integración y revisar los logs:

logs después de ejecutar el test de integración

Vemos que se crean hilos para procesar cada una de las tareas. Como tenemos el requisito de que el orden de las tareas se debe respetar, debemos procesar secuencialmente los mensajes. Podemos usar solo un ejecutor con un hilo para poder ejecutar en él todas las tareas.

El cambio es muy fácil: en lugar de esperar que Spring nos inyecte el ThreadPoolTaskExecutor que implementa AsyncTaskExecutor, nosotros le crearemos una instancia de TaskExecutorAdapter con ejecutor con un hilo:

AsyncTaskExecutor executor = new TaskExecutorAdapter(Executors.newSingleThreadExecutor());

Ejecutamos de nuevo el test de integración y vemos que ahora todas las tareas se ejecutan en el mismo hilo:

pantallazo del segundo test de integración

Virtual Threads

Como estamos con Java 21, podemos usar los Virtual Threads. Para hacer esto, pasamos como parámetro el Executors.newVirtualThreadPerTaskExecutor() a TaskExecutorAdapter:

AsyncTaskExecutor executor = new TaskExecutorAdapter(Executors.newVirtualThreadPerTaskExecutor());

Si ejecutamos el test de integración, veremos que ya usamos los Virtual Threads:

resultados del test de integración con virtual threads

Conclusión

Habitualmente, las tareas pesadas se implementan como procesos batch, es lo natural. Pero en este caso tenía la curiosidad si se podía hacer de otra manera, sin complicarme y usando todo lo que hay en una arquitectura orientada a eventos. La técnica de pausar el consumidor con hilos y callbacks ofrece una solución efectiva para poder evitar el rebalanceo. Podéis ver todo el código con los commits por pasos en el repositorio GitHub: large-tasks-using-kafka.

Enlaces de interés

Cuéntanos qué te parece.

Los comentarios serán moderados. Serán visibles si aportan un argumento constructivo. Si no estás de acuerdo con algún punto, por favor, muestra tus opiniones de manera educada.

Suscríbete