¿Buscas nuestro logo?
Aquí te dejamos una copia, pero si necesitas más opciones o quieres conocer más, visita nuestra área de marca.
¿Buscas nuestro logo?
Aquí te dejamos una copia, pero si necesitas más opciones o quieres conocer más, visita nuestra área de marca.
dev
Ismail Ahmedov 19/02/2025 Cargando comentarios…
En cada proyecto nos enfrentamos a desafíos que ponen a prueba nuestras habilidades técnicas, nuestra creatividad y nuestra capacidad para adaptarnos a situaciones diferentes. Llevo tantos años en el sector de la ingeniería y estoy tan acostumbrado a todo que ya nada me sorprende. Ya sea por las altas expectativas del cliente, los plazos ajustados o las limitaciones presupuestarias, nos obligan constantemente a diseñar soluciones que no solo satisfagan los requisitos establecidos, sino que también sean confiables y sostenibles a largo plazo.
La mayoría de las empresas en las que he trabajado buscaban ser los primeros en llevar una nueva funcionalidad a sus clientes actuales y poder captar nuevos clientes. Para ello, intentaban lanzar un Producto Mínimo Viable (MVP), una versión inicial funcional que cumpla con los requisitos esenciales que se va mejorando y expandiendo en las diferentes situaciones. A menudo, buscamos implementar la mejor solución posible para cada situación, pero esto no siempre significa optar por la opción más sofisticada o usar la tecnología de turno. Recuerdo diferentes casos en los que tomamos decisiones muy distintas, pero ambas relacionadas con menús que mostraban secciones con el número de ítems en cada una.
Hace mucho tiempo estuve trabajando en un portal de alojamientos rurales. Cada vez que un alojamiento se daba de alta o de baja, era necesario recalcular los totales de alojamientos a nivel de país, provincia y localidad. En ese momento, teníamos un proceso batch que se ejecutaba durante la noche, realizando todos los cálculos necesarios y "precocinado" los datos para que estuvieran listos para un acceso rápido.
Después de cambiar toda la plataforma en menos de un año, nos llegó el requisito de optimizar el proceso batch sin añadir nueva infraestructura y con un tiempo de implementación muy limitado, ya que la empresa era pequeña. No teníamos mucho tiempo para análisis porque podíamos quedarnos sin tiempo para la implementación. La solución que implementamos fue práctica y sencilla: creamos un endpoint que se llamaba desde el backoffice después de cada alta o baja de un alojamiento. Este actualizaba los totales sumando o restando uno en tiempo real. Al no tratarse de una funcionalidad crítica y, por ende, cualquier error sería corregido automáticamente por el batch nocturno, nos pareció suficiente. Se atacaba un problema, no hacíamos mucho más.
Años después, trabajando en un eCommerce con mucho tráfico, me enfrenté a una situación similar. Aunque teníamos mucha más libertad en cuanto a tecnologías, no era completa; no podías usar un lenguaje de programación no aceptado por la empresa. Sin embargo, los desafíos eran significativamente mayores: manejábamos un volumen de tráfico enorme y casos de uso increíblemente complejos. Algunos productos llegaban a agotarse en cuestión de horas, lo que nos obligaba a desarrollar una solución rápida y fiable.
En este caso, decidimos implementar un proceso ETL (Extract, Transform, and Load) que se ejecutaba cada 15 minutos. Este proceso "precocinaba" los datos de los menús, transformándolos y almacenándolos en formato JSON en MongoDB. Esto nos permitió mantener los menús actualizados casi en tiempo real y disponibles para ser consumidos rápidamente por el frontend.
Jugando con mis hijos se me ocurrió una idea: podría resolver el problema de gestionar tareas pesadas (que requieren muchos recursos y/o tiempo para ser procesadas) en una arquitectura basada en eventos (EDA) usando Kafka.
Apunté algunos requisitos para una aplicación de ejemplo:
Empecé a buscar si otras personas se habían topado con esta casuística y me han gustado estos dos artículos: “Dealing with long-running jobs using Apache Kafka” y “How to avoid rebalances and disconnections in Kafka consumers”. Trataban sobre el mismo desafío que quería resolver, así que empecé analizar las sugerencias de los autores de los artículos:
He creado el proyecto con Spring Boot large-tasks-using-kafka. También he añadido docker-compose para poder ejecutar en mi portátil Kafka y Kafka UI y poder hacer las pruebas manuales.
A continuación, he creado también un consumidor básico que ejecuta una tarea pesada y, dependiendo de si ha terminado con éxito o no, pública el estado de la tarea en el topic de estados. Si salta una excepción, el estado de la tarea sería ERROR.
Este es el código inicial:
@Service
@RequiredArgsConstructor
public class TaskConsumer {
public static final String INPUT_TOPIC = "input-topic";
public static final String STATUS_TOPIC = "status-topic";
private final KafkaTemplate<String, String> kafkaTemplate;
private final LargeTaskProcessor taskProcessor;
@KafkaListener(topics = INPUT_TOPIC)
public void consume(ConsumerRecord<String, String> consumerRecord) {
try {
boolean isSuccess = taskProcessor.run();
if (isSuccess) {
this.publishStatusOf(consumerRecord.value(), "FINISHED");
} else {
this.publishStatusOf(consumerRecord.value(), "FAILED");
}
} catch (Exception e) {
this.publishStatusOf(consumerRecord.value(), "ERROR");
throw new RuntimeException(e);
}
}
private void publishStatusOf(String value, String status) {
String statusString = String.format("The status of task %s is %s", value, status);
kafkaTemplate.send(STATUS_TOPIC, statusString);
}
}
Para la prueba de integración usaría Testcontainers. La principal razón de esta elección es que Testcontainers ofrece un entorno realista, porque ejecuta un contenedor real de Docker con una instancia completa de Kafka. Por otro lado, EmbeddedKafka está incluido en spring-kafka-test y es una librería que proporciona un broker de Kafka en memoria, pero no tiene todas las funcionalidades de un Kafka real.
Este es el código de mi primer test:
@Import(TestcontainersConfiguration.class)
@SpringBootTest
class TaskConsumerTest {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@Autowired
private ConsumerFactory<String, String> consumerFactory;
@Test
void testLargeTaskProcessing() {
String task = "1";
String expectedMessage = String.format("The status of task %s is %s", task, "FINISHED");
kafkaTemplate.send(TaskConsumer.INPUT_TOPIC, task);
Consumer<String, String> testConsumer = consumerFactory.createConsumer();
testConsumer.subscribe(List.of(TaskConsumer.STATUS_TOPIC));
ConsumerRecord<String, String> received = KafkaTestUtils.getSingleRecord(testConsumer, TaskConsumer.STATUS_TOPIC, Duration.ofSeconds(30L));
assertThat(received.value()).isEqualTo(expectedMessage);
}
}
Basándome en mis requisitos, el código estaba optimizado y no se podía separar, así que las primeras dos sugerencias se descartaron.
Teniendo en cuenta en el requisito 3, tampoco podía “jugar” con la propiedad max.poll.interval.ms del consumidor de Kafka, así que también debemos descartar la sugerencia 3.
Como no quería estar esperando mucho tiempo para las pruebas del requisito 3, he bajado el valor max.poll.interval.ms a 2 segundos, ya que el valor por defecto es de 5 minutos y era demasiado alto para mis pruebas.
spring:
application:
name: large-tasks-using-kafka
kafka:
bootstrap-servers: localhost:9092
consumer:
group-id: group-id
auto-offset-reset: earliest
enable-auto-commit: false
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
properties:
max.poll.interval.ms: 2000
Si la tarea pesada se demoraba más de 2 segundos en completar, Kafka empezaba hacer rebalanceo de los consumidores porque no sabía si el consumidor seguía “vivo”.
Ejecutamos el test de integración y vemos que finaliza correctamente, pero también vemos los siguientes errores:
El error indica que hay un problema de sincronización entre el consumidor y el coordinador de grupo dentro de un grupo de consumidores.
El error ha sido provocado, ya que necesitábamos simular que el consumidor tarda en procesar mensajes más tiempo que el max.poll.interval.ms y como no llama a poll() dentro de este tiempo, se desconecta del grupo.
Vamos a verlo mejor ejecutando la aplicación. Primero arrancamos Kafka y Kafka UI con Docker con el siguiente comando:
docker compose up
Bajamos el nivel de log del consumidor de Kafka a DEBUG y ejecutamos la aplicación.
Abrimos Kafka UI en nuestro navegador favorito introduciendo http://localhost:8989/ en la barra de direcciones. Vemos que el input-topic ya está creado y publicamos un mensaje:
Revisamos el status-topic y vemos que el consumidor ha entrado en un bucle infinito y está procesando el mensaje sin parar:
Como nuestra aplicación y Kafka se ejecutan en diferentes servidores, ¿cómo puede saber Kafka si un consumidor sigue funcionando? Con el heartbeat, que hace peticiones a Kafka para que le vaya diciendo que está funcionando. Pero si una tarea tarda demasiado en procesarse (el tiempo de su procesamiento excede max.poll.interval.ms), en ese caso, tal vez el coordinador debería expulsar al consumidor del grupo para darle sus particiones al resto de los consumidores (hacer rebalanceo), de modo que el procesamiento pueda continuar. Esto es lo que pasa en nuestro caso.
El caso de usar hilos con enfoque "fire-and-forget" no nos vale, lo que pasamos a la solución 5: pausar al consumidor para que pueda seguir haciendo poll(), procesar nuestro proceso pesado en un hilo diferente y usar callbacks.
Hacemos los cambios:
@Service
@RequiredArgsConstructor
public class TaskConsumer {
public static final String INPUT_TOPIC = "input-topic";
public static final String STATUS_TOPIC = "status-topic";
public static final String CONTAINER_ID = "pausable-consumer";
private final KafkaTemplate<String, String> kafkaTemplate;
private final LargeTaskProcessor taskProcessor;
private final KafkaContainerService kafkaContainerService;
private final AsyncTaskExecutor executor;
@KafkaListener(id = CONTAINER_ID, topics = INPUT_TOPIC)
public void consume(ConsumerRecord<String, String> consumerRecord) {
kafkaContainerService.pauseConsume(CONTAINER_ID);
executor.submitCompletable(() -> taskProcessor.run(consumerRecord.value()))
.whenComplete((isSuccess, exception) -> {
if (Objects.isNull(exception)) {
if (Boolean.TRUE.equals(isSuccess)) {
this.publishStatusOf(consumerRecord.value(), "FINISHED");
} else {
this.publishStatusOf(consumerRecord.value(), "FAILED");
}
} else {
this.publishStatusOf(consumerRecord.value(), "ERROR");
}
kafkaContainerService.resumeConsumer(CONTAINER_ID);
});
}
private void publishStatusOf(String value, String status) {
String statusString = String.format("The status of task %s is %s", value, status);
kafkaTemplate.send(STATUS_TOPIC, statusString);
}
}
Ejecutamos la aplicación y publicamos un mensaje en el input-topic. Vemos que el mensaje se consume solo una vez. Con esto hemos cumplido los primeros cinco requisitos y vamos a probar el sexto: el orden de las tareas con el mismo ID se debe respetar, es decir, que la tarea 3 debe terminar después de tarea 2.
Paramos la ejecución de la aplicación y configuramos que la tarea 2 debe demorarse más que la tarea 3. Para ello, modificamos LargeTaskProcessor:
@Service
public class LargeTaskProcessor {
public boolean run(String task) throws InterruptedException {
int seconds = (task.equals("2")) ? 10 : 5;
Thread.sleep(Duration.ofSeconds(seconds));
return true;
}
}
Publicamos 5 mensajes en el input-topic con values: 1, 2, 3, 4 y 5.
Ejecutamos la aplicación y revisamos el status-topic:
Podemos ver que el orden de las tareas no se ha conservado. Esto significa que debemos procesar secuencialmente los mensajes. Esto no es mala idea, porque las tareas “pesadas” usan mucho recursos y si las procesamos en secuencia, haremos que terminen más rápido usando todos los recursos. Para conseguir esto, debemos decir al consumidor que debe leer los mensajes uno a uno. Esto se consigue con la propiedad de Kafka max.poll.records, que debemos establecer en 1.
kafka:
# ...
properties:
max.poll.interval.ms: 2000
max.poll.records: 1
Reiniciamos la aplicación después de los cambios y publicamos de nuevo los 5 mensajes en el input-topic con values: 1, 2, 3, 4 y 5 (como el consumer group es el mismo, no se consumen los mensajes anteriores).
Revisamos Kafka UI:
Et voilà! Ya tenemos implementados todos los requisitos.
Ahora vamos a revisar las propiedades del consumidor:
Vemos que el group-id tiene el valor "pausable-consumer", que es el valor que establecimos para CONTAINER_ID. Si revisamos las propiedades del consumidor, habíamos establecido que el group-id: group-id, que no es un valor muy recomendable… Vamos a cambiarlo a group-id: my-consumer-group
Si revisamos la documentación de @KafkaListener, veremos que a partir de la versión 2.0, la propiedad id (si está presente) se utiliza como la propiedad group.id del consumidor de Kafka, sobrescribiendo la propiedad configurada, si está presente. Para poder restaurar el comportamiento anterior, debemos establecer explícitamente groupId o configurar idIsGroup en false.
El groupId se utiliza para el valor de Consumer Group ID, que se puede utilizar en el Access Control Lists (ACL) de Kafka. Como tenemos el groupId configurado en nuestro fichero de configuración, vamos a usar el segundo caso, estableciendo idIsGroup en false.
@KafkaListener(id = CONTAINER_ID, topics = INPUT_TOPIC, idIsGroup = false)
public void consume(ConsumerRecord<String, String> consumerRecord) {
// ...
}
Vamos a ejecutar el test de integración y revisar los logs:
Vemos que se crean hilos para procesar cada una de las tareas. Como tenemos el requisito de que el orden de las tareas se debe respetar, debemos procesar secuencialmente los mensajes. Podemos usar solo un ejecutor con un hilo para poder ejecutar en él todas las tareas.
El cambio es muy fácil: en lugar de esperar que Spring nos inyecte el ThreadPoolTaskExecutor que implementa AsyncTaskExecutor, nosotros le crearemos una instancia de TaskExecutorAdapter con ejecutor con un hilo:
AsyncTaskExecutor executor = new TaskExecutorAdapter(Executors.newSingleThreadExecutor());
Ejecutamos de nuevo el test de integración y vemos que ahora todas las tareas se ejecutan en el mismo hilo:
Como estamos con Java 21, podemos usar los Virtual Threads. Para hacer esto, pasamos como parámetro el Executors.newVirtualThreadPerTaskExecutor() a TaskExecutorAdapter:
AsyncTaskExecutor executor = new TaskExecutorAdapter(Executors.newVirtualThreadPerTaskExecutor());
Si ejecutamos el test de integración, veremos que ya usamos los Virtual Threads:
Habitualmente, las tareas pesadas se implementan como procesos batch, es lo natural. Pero en este caso tenía la curiosidad si se podía hacer de otra manera, sin complicarme y usando todo lo que hay en una arquitectura orientada a eventos. La técnica de pausar el consumidor con hilos y callbacks ofrece una solución efectiva para poder evitar el rebalanceo. Podéis ver todo el código con los commits por pasos en el repositorio GitHub: large-tasks-using-kafka.
Los comentarios serán moderados. Serán visibles si aportan un argumento constructivo. Si no estás de acuerdo con algún punto, por favor, muestra tus opiniones de manera educada.
Cuéntanos qué te parece.