¿Buscas nuestro logo?
Aquí te dejamos una copia, pero si necesitas más opciones o quieres conocer más, visita nuestra área de marca.
Conoce nuestra marca.¿Buscas nuestro logo?
Aquí te dejamos una copia, pero si necesitas más opciones o quieres conocer más, visita nuestra área de marca.
Conoce nuestra marca.dev
Iñaki García de Blas 30/01/2023 Cargando comentarios…
En esta ocasión me gustaría ampliar el conocimiento de un post anterior de mi compañero Raúl Martínez, sobre Testcontainers, entornos de testing efímeros, en el que nos presenta una bonita introducción a Testcontainers, lo que me permitirá ser más directo.
Una vez hemos construido las pruebas unitarias de nuestro sistema, llega el momento de trabajar en el siguiente piso de la pirámide de los test, los test de integración.
A la hora de realizar test de integración necesitamos piezas y herramientas para observar cómo se comporta nuestro desarrollo en un entorno real. Dicho entorno ha de estar siempre disponible y las pruebas no han de interferir entre sí, lo que supone un reto. Dichos entornos pueden resultar en máquinas virtuales o físicas con determinados servicios en versiones y configuraciones concretas.
Para ello, y en el caso concreto de Kafka podemos contar con la herramienta EmbeddedKafka, que nos ayuda a crear test potentes con un Kafka autocontenido que nos independiza de un servicio externo.
Sin embargo, nos vamos a decantar por Testcontainers, principalmente por los siguientes motivos:
Vamos a proponer un escenario con dos microservicios que interactúan con un Kafka. Uno de ellos, va a producir un mensaje en un topic de Kafka; y el otro, se va a suscribir y procesar el mismo.
Para mantener el escenario lo más sencillo posible, el mensaje va a ser una simple cadena de texto. De esta manera, nos centraremos en aprender a probar cada uno de los sistemas en un entorno real.
Crear test de integración para microservicios que trabajan con Kafka con las siguientes características:
En resumen, queremos combinar las ventajas que nos aporta trabajar con una herramienta como lo es jUnit y las que nos aporta trabajar con contenedores.
Vamos a continuar con la premisa de hacer las cosas lo más fácil posible. Para ello vamos a crear dos microservicios utilizando Spring Boot con Spring for Kafka y las extensiones de Testcontainers.
Añadiremos las dependencias de Testcontainers. De esta manera podemos generar contenedores con las herramientas que necesitamos en nuestros escenarios, en este caso Kafka.
Por otro lado, integramos el ciclo de vida de dichos contenedores a nuestras pruebas junto a jUnit para poder valernos de sus capacidades de validación.
EventProducer.java
@Component
@Slf4j
public class EventProducer {
@Autowired
private KafkaTemplate<Integer, String> kafkaTemplate;
public void sendEvent(String event) {
int key = (int)System.currentTimeMillis();
try {
kafkaTemplate.sendDefault(key, event).get();
} catch (...) {
…
}
}
}
application.yaml
spring:
profiles:
active: dev
---
spring:
profiles: dev
kafka:
producer:
bootstrap-servers: localhost:29092
key-serializer: org.apache.kafka.common.serialization.IntegerSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
template:
default-topic: example-topic
En el paquete de test (src/test) vamos a dar de alta nuestros casos de prueba, configuración de los contenedores y un consumidor de prueba.
TestBeansConfigurer.java
Aquí preparamos un consumidor que utilizaremos únicamente para los casos de prueba. De esta manera trabajamos de forma desacoplada, dicho consumidor va a hacer las veces de otro microservicio en nuestros test.
@Configuration
public class TestBeansConfigurer {
@Autowired
KafkaProperties properties;
private static final String TOPIC_NAME= "example-topic";
@Bean
Consumer<String, String> testConsumer() {
final Consumer<String, String> consumer =
new DefaultKafkaConsumerFactory<>(properties.buildConsumerProperties(),
StringDeserializer::new, StringDeserializer::new).createConsumer();
consumer.subscribe(Collections.singletonList(TOPIC_NAME));
return consumer;
}
}
AbstractIntegrationTest.java
Donde vamos a configurar y lanzar los contenedores que necesitemos.
...
import org.junit.jupiter.api.TestInstance;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.ActiveProfiles;
...
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
...
@SpringBootTest
@Testcontainers
@TestInstance(TestInstance.Lifecycle.PER_METHOD)
@ActiveProfiles("test")
public abstract class AbstractIntegrationTest {
static KafkaContainer kafkaContainer =
new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:latest"));
@DynamicPropertySource
static void kafkaProperties(DynamicPropertyRegistry registry) {
kafkaContainer.start();
registry.add("spring.kafka.properties.bootstrap.servers", kafkaContainer::getBootstrapServers);
registry.add("spring.kafka.consumer.properties.auto.offset.reset", () -> "earliest");
}
}
EventProducerApplicationConsumerIt.java
Donde definimos un caso de prueba muy simple. En él vamos a hacer un POST a nuestro endpoint, que a su vez publicará la cadena de texto en Kafka. Finalmente, mediante el “test consumer” podemos verificar que aquel que escuche recibe el mensaje esperado.
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
class EventProducerApplicationProducerIt extends AbstractIntegrationTest {
private String topicName = "example-topic";
@Autowired
TestRestTemplate restTemplate;
@Autowired
Consumer<String, String> testConsumer;
@Test
void produce() {
// GIVEN
String bodyContent = "body";
HttpEntity<String> request = new HttpEntity<String>(bodyContent);
// WHEN
ResponseEntity<String> res = restTemplate.exchange("/generate", HttpMethod.POST, request, String.class);
// THEN
assertEquals(HttpStatus.CREATED, res.getStatusCode());
ConsumerRecord<String, String> resultRecord = KafkaTestUtils.getSingleRecord(testConsumer, topicName, 10000);
assertEquals(bodyContent, resultRecord.value());
}
}
application.yaml
En la configuración de nuestros casos de prueba añadimos el consumidor que nos ayuda a verificar el código productivo.
spring:
profiles:
active: test
---
spring:
profiles: test
kafka:
producer:
key-serializer: org.apache.kafka.common.serialization.IntegerSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
template:
default-topic: example-topic
consumer:
key-deserializer: org.apache.kafka.common.serialization.IntegerDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
group-id: example-group
El consumidor espera un mensaje en el topic de Kafka para procesarlo.
EventConsumer.java
En el consumidor tenemos una clase “EventRecorderService”,
lógica de negocio aplicada a cada uno de los eventos que recibimos. En este caso algo tan simple como escribirlo en el log.
@Component
@RequiredArgsConstructor
public class EventConsumer {
private final EventRecorderService eventRecorderService;
@KafkaListener(topics = {"example-topic"})
public void onMessage(ConsumerRecord<Integer,String> record) {
eventRecorderService.save(record.value());
}
}
application.yaml
spring:
profiles:
active: dev
---
spring:
profiles: dev
kafka:
consumer:
bootstrap-servers: localhost:29092
key-deserializer: org.apache.kafka.common.serialization.IntegerDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
group-id: example-group
template:
default-topic: example-topic
EventProducerApplicationConsumerIt.java
En el caso de prueba vamos a generar un producto, cuya implementación podemos clonar del caso anterior. Con él vamos a publicar un evento en el contenedor de Kafka.
El consumidor va a recibir el mensaje y va a delegar su procesado al servicio “EventRecorderService
”. Podemos hacer un “Spy” sobre dicho servicio y verificar que lo estamos invocando con el mismo contenido que hemos publicado.
@SpringBootTest
class EventProducerApplicationConsumerIt extends AbstractIntegrationTest {
@Autowired
EventProducer eventProducer;
@SpyBean
EventRecorderService eventRecorderService;
@Test
void consume() throws InterruptedException {
// GIVEN
String bodyContent = "body";
// WHEN
eventProducer.sendEvent(bodyContent);
// THEN
await().atMost(FIVE_SECONDS).untilAsserted(() -> {
verify(eventRecorderService, times(1)).save(bodyContent);
});
}
}
application.yaml
spring:
profiles:
active: test
---
spring:
profiles: test
kafka:
consumer:
key-deserializer: org.apache.kafka.common.serialization.IntegerDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
group-id: example-group
producer:
key-serializer: org.apache.kafka.common.serialization.IntegerSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
template:
default-topic: example-topic
Hemos sido capaces de probar microservicios que trabajen con Kafka. Además, podemos realizar las mismas pruebas en cualquier implementación de Kafka (publish/subscriber, stream de datos…). Mediante contenedores de Kafka podemos probar cómo interactuamos o reaccionamos a los eventos que en él se publican.
El entorno creado para los casos de prueba se está realizando con la misma pieza de software y con la misma versión que en los entornos productivos, la misma ventaja que nos proporciona trabajar con contenedores. Algo que no podríamos lograr al 100% con una versión embebida.
Es posible replicar el mismo esquema de pruebas con otras herramientas que puedan trabajar como pub/sub de eventos, tales como AMQ, Google Pubsub, Redis… inicializando y configurando sus contenedores.
Como sabemos que los mensajes que vamos a intercambiar, siempre van a ser estructuras complejas, normalmente cuando trabajamos con Kafka definimos un esquema que nuestros datos deben cumplir. Nuevamente, quiero mencionar el siguiente post de mi compañera Noelia Martín en el que se nos presenta Schema Registry y cómo gobernar eventos con Schema Registry y Avro.
Volviendo a nuestro tema principal, el de crear pruebas de integración, es posible configurar un Schema Registry mediante Testcontainers como si el de producción se tratase. No obstante, podemos simplemente utilizar un endpoint mock si no necesitamos mucho más.
Solo tenemos que configurar la propiedad “schema.registry.url” de la siguiente manera:
spring:
kafka:
....
client-id: ${Client_id}
....
properties:
....
schema.registry.url: mock://testUrl
auto.register.schemas: true
specific.avro.reader: true
....
Los comentarios serán moderados. Serán visibles si aportan un argumento constructivo. Si no estás de acuerdo con algún punto, por favor, muestra tus opiniones de manera educada.
Cuéntanos qué te parece.