¿Buscas nuestro logo?
Aquí te dejamos una copia, pero si necesitas más opciones o quieres conocer más, visita nuestra área de marca.
Conoce nuestra marca.¿Buscas nuestro logo?
Aquí te dejamos una copia, pero si necesitas más opciones o quieres conocer más, visita nuestra área de marca.
Conoce nuestra marca.dev
Jesús Pau de la Cruz 01/12/2022 Cargando comentarios…
En la serie de artículos donde hablamos de soluciones de streaming sobre Kafka, en un artículo nos centramos en ksqlDB: vimos su historia, arquitectura y sus principales características, destacando su valor como una herramienta sencilla y cómoda para simplificar la construcción de aplicaciones para el procesamiento de flujos de datos dentro del ecosistema Confluent.
Por otro lado, también hemos hablado sobre tecnologías CDC, como Debezium, Golden Gate o los conectores de Confluent, que permiten implementar soluciones reactivas a partir de distintos sistemas y definir casos de uso en tiempo real.
¿Y si pudiéramos tener una herramienta que, pensada inicialmente para implementar soluciones de streaming, también fuera capaz de implementar soluciones de CDC? Con ksqlDB todo es posible.
Ya hemos puesto de relieve lo especial que es ksqlDB. A medida que fue evolucionando, sus objetivos también evolucionaron, siendo más ambiciosos que solo convertirse en una herramienta para la creación de soluciones para el procesamiento de flujos de datos.
Por ello, otro de los objetivos de ksqlDB es el de facilitar la forma de integrar el core de Kafka con otros sistemas externos al ecosistema, y todo ello a través de la misma interfaz SQL que se usa para el procesamiento de flujos de datos.
Pero, si ksqlDB fue concebida como una herramienta de procesamiento, ¿cómo puede ser posible que tenga la utilidad de integración? Para ello, vamos a contextualizar brevemente el origen de ksqlDB.
ksqlDB se lanzó por Confluent un año después de que Kafka Streams se introdujera en el ecosistema Kafka. Ambas tecnologías fueron diseñadas para el mismo fin, simplificar el proceso de creación de soluciones para el procesamiento de flujos de datos, por lo que no sorprende que ksqlDB y Kafka Streams compartieran muchas cosas, ya que una está basada en la otra.
Las aplicaciones implementadas con Kafka Streams leen y escriben de topics de Kafka, por lo tanto, si los datos que se desean procesar son externos a Kafka o quiere enviar los datos de salida de la aplicación a un sistema externo, se debe construir una “canalización” que permite mover datos hacia/desde los sistemas apropiados. Normalmente, estos procesos son manejados por Kafka Connect, por lo que cuando se usa el estándar de Kafka Streams, se debe implementar Kafka Connect y los conectores apropiados para realizar dichas tareas.
En un primer momento, ksqlDB imponía las mismas limitaciones que el estándar de Kafka Streams respecto a la integración de fuentes de datos que no son de Kafka. Sin embargo, a medida que fue evolucionando, trajo consigo algunas capacidades nuevas, como la integración con Kafka Connect.
De esta forma, con ksqlDB podemos crear las “canalizaciones” necesarias a través de su interfaz SQL para simplificar el proceso.
CREATE SOURCE CONNECTOR jdbc_source WITH (
'connector.class' = 'io.confluent.connect.jdbc.JdbcSourceConnector',
'connection.url' = 'jdbc:postgresql://postgres:5432/myDB’,
'topic.prefix' = 'jdbc_',
'table.whitelist' = 'table',
'mode' = 'bulk',
'key' = 'key';
Esta posibilidad de integración permite a ksqlDB seguir el ciclo de vida completo de un flujo de eventos, en lugar de estar únicamente en la parte del procesamiento, con todos los beneficios que eso puede implicar.
La integración con Kafka Streams que permite potenciar las capacidades de procesamiento y consulta de ksqlDB, y la integración con Kafka Connect que ayuda a hacer que fuentes de datos externas estén disponibles para consultas, es indicativo de lo poderosa que es esta herramienta.
No hay mejor manera de entender el funcionamiento de una tecnología que verla en acción. Para ello, vamos a basarnos en el caso de uso de detección de movimientos fraudulentos en tarjetas que hemos visto en el artículo destinado a ksqlDB de la serie de soluciones de streaming en Kafka.
En dicho artículo nos centramos únicamente en la parte del procesamiento una vez los datos ya están en el ecosistema de Kafka, pero no entramos en cómo llevar la información de los almacenes de datos externos a Kafka, aunque sí dejamos planteada la posibilidad de implementar una solución de CDC para poder hacerlo.
Aprovechando que ya se ha usado ksqlDB como tecnología para el procesamiento de datos, el objetivo es claro. Vamos a emplear ksqlDB también para poder llevar a cabo la integración de los orígenes de datos externos con la información de los movimientos realizados por las tarjetas según vayan ocurriendo, lo más cercano posible al tiempo real.
La realidad es que definir el conector para realizar dicha tarea con la interfaz SQL que proporciona ksqlDB resulta muy sencillo.
CREATE SOURCE CONNECTOR movements_connector WITH (
'connector.class' = 'io.confluent.connect.jdbc.JdbcSourceConnector',
'connection.url' = 'jdbc:postgresql://postgres:5432/cdc',
'connection.user' = 'admin',
'connection.password' = 'admin1',
'topic.prefix' = 'jdbc_',
'table.whitelist' = 'movements_online, movements_merchant, movements_atm',
'mode' = 'incrementing',
'incrementing.column.name' = '_id',
'key' = 'card',
'key.converter' = 'org.apache.kafka.connect.storage.StringConverter',
'value.converter' = 'org.apache.kafka.connect.json.JsonConverter');
La configuración realizada para el caso de uso es una configuración simple:
'connector.class':
es el tipo de conector con el origen de datos. En este caso, como los orígenes de datos son una base de datos postgresql, se usa un conector genérico JDBC.'connection.url', 'connection.user', 'connection.password':
definen los datos necesarios para acceder a la base de datos y a las tablas correspondientes.'table.whitelist':
indica las tablas sobre las que necesitamos detectar los cambios y actuar en consecuencia.'topic.prefix':
define un prefijo al nombre del topic destino (por ejemplo, para la tabla movements_atm el topic resultante sería jdbc_movements_atm).'mode', 'incrementing.column.name':
estos campos permiten detectar únicamente las filas nuevas de la tabla, ya que es lo único que nos interesa en este caso, sin detectar modificaciones o eliminaciones.'key':
define la clave del evento enviado a Kafka.'key.converter', 'value.converter':
define la serialización tanto de la clave como el contenido del evento.Como se puede apreciar, la configuración del conector con ksqlDB es una configuración básica que tampoco dista mucho de cualquier otra configuración de un conector, aunque sí lo hace más accesible para todo tipo de usuarios.
En la propia documentación de Confluent se puede acceder a todos las propiedades del conector utilizado, así como a todos los conectores disponibles.
Simplemente, ejecutando esa sentencia SQL de ksqlDB ya estamos preparados para detectar nuevos movimientos producidos en la base de datos y enviarlos a Kafka en tiempo real.
Es importante recalcar que para esta configuración de conector, es necesario un topic distinto por tabla que “monitorizamos”, por lo que es necesario modificar ligeramente el proyecto original para que se adecúe a los nuevos topics.
Y poca cosa más. Sencillo, ¿verdad?
Si estáis interesados, todo el código para iniciar el caso de uso y sus instrucciones de ejecución están disponibles en el siguiente repositorio.
Cuando hablamos de procesos en tiempo real, estamos hablando de que somos capaces de desencadenar alguna tarea justo en el momento en el que ocurre una acción, situación muy importante para evolucionar y sacar más rendimiento a nuestras fuentes de información.
Con ksqlDB podemos crear un flujo end2end que nos permita cumplir con todo lo deseado sin la necesidad de crear arquitecturas complejas gracias a su existencia en el ecosistema Confluent.
Entonces, ¿es la mejor opción? Pues como casi todo, depende de diversos factores.
Si nuestra elección se basa en la búsqueda de la simplicidad, con una arquitectura más “limpia”, un menor mantenimiento y bajo el paraguas de una empresa como Confluent, ksqlDB puede representar una opción muy atractiva tanto por su capacidad de integración como de procesamiento.
Por el contrario, si lo que buscamos es un rendimiento óptimo, hay que tener en cuenta la forma en la que ksqlDB permite hacer la integración con los almacenes de datos externos. Dependiendo del conector utilizado, puede impactar en el rendimiento del sistema debido a que algunos conectores no hacen CDC a través de los logs transaccionales como pueden hacer otras herramientas (como Debezium), sino que hace pooling a la base de datos, teniendo pérdida de rendimiento y alejándonos del procesamiento en tiempo real. De hecho, en el caso de uso presentado, el conector JDBC usado hace esto, por lo que no cumplimos estrictamente con la definición de ‘tiempo real’ (simplemente podemos dejarlo en ‘cerca de tiempo real’).
Además, hay otros factores que pueden decantar la balanza entre unas tecnologías u otras, como puede ser el precio de licencia, si se tiene un stack ya implantado o el conocimiento que se tiene de una tecnología u otra.
Los comentarios serán moderados. Serán visibles si aportan un argumento constructivo. Si no estás de acuerdo con algún punto, por favor, muestra tus opiniones de manera educada.
Cuéntanos qué te parece.