Terminamos la serie de post "Replicando datos en tiempo real", donde hemos analizado la replicación de datos en tiempo real entre dos sistemas heterogéneos.

En la primera entrega, Replicando datos en tiempo real: ¿qué vas a hacer con tus datos, si no los usas?, expusimos la necesidad de encontrar una solución a esta problemática, viendo qué nos puede aportar desde un punto de vista de negocio y analizando los inconvenientes de las soluciones tradicionales.

En el segundo post, Replicando datos en tiempo real II: Diseñando nuestra solución, diseñamos la arquitectura de nuestro proceso de replicación de datos, analizando sus ventajas e inconvenientes y presentando algunas tecnologías que nos pueden servir a la hora de llevar a cabo la implementación.

Debido a que, como hemos visto, las tecnologías a utilizar pueden variar en función del sistema a replicar, en este último post vamos a centrarnos en analizar con más detalle la replicación de datos para dos bases de datos relacionales: Oracle y PostgreSQL.

Oracle (12c)

Oracle no es un sistema que nos ofrezca demasiadas facilidades a la hora de replicar sus bases de datos en tiempo real. Además, si echamos un vistazo al panorama tecnológico, las principales alternativas que nos aparecen son productos de pago, ya sean propias de Oracle u otras compañías de terceros.

En el mundo open source, solo Debezium parece ofrecer alguna alternativa interesante. Aunque, en el momento de escribir este post, el conector para Oracle aún se encuentra en desarrollo (y además parece que estará basado en la utilización del framework Oracle Golden Gate de todas formas).

Las tres vías que hemos explorado para la ingesta de logs de Oracle son:

Procesado manual (spoiler: descartado)

Los archivos de logs de Oracle en los que debemos poner el foco son los archivos “redo</em>.log”, que son los que contienen la información de las transacciones ejecutadas en nuestra base de datos_._

Esta información se encuentra cifrada mediante un sistema propietario de Oracle, por lo que su parseo no es trivial. Para poder hacernos una idea, la pinta que tienen es la siguiente (tras hacer un ALTER SYSTEM DUMP logfile del archivo redo a procesar):

REDO RECORD - Thread:1 RBA: 0x000016.00000002.0010 LEN: 0x0154 VLD: 0x05 CON_UID: 2324221331
SCN: 0x0000.001bc8cc SUBSCN: 1 01/12/2017 06:00:44
(LWN RBA: 0x000016.00000002.0010 LEN: 0006 NST: 0001 SCN: 0x0000.001bc8cc)
CHANGE #1 CON_ID:1 TYP:0 CLS:26 AFN:4 DBA:0x01000834 OBJ:4294967295 SCN:0x0000.001bc8c9 SEQ:15 OP:5.1 ENC:0 RBL:0 FLG:0x0000
ktudb redo: siz: 68 spc: 2358 flg: 0x0022 seq: 0x00ee rec: 0x35
 xid: 0x0005.016.00000657
ktubu redo: slt: 22 rci: 52 opc: 11.1 objn: 91814 objd: 91814 tsn: 1
Undo type: Regular undo Undo type: Last buffer split: No
… (contenido cortado debido a que es demasiado verboso y no aporta nada)
col 0: \[ 6\] c5 2b 5f 60 0a 48
col 1: \[ 2\] c1 07
col 2: \[13\] 78 75 01 0c 07 01 2d 31 14 b2 80 14 3c
col 3: \[ 1\] 80

Aunque existen guías sobre cómo procesar esta información, nosotros decidimos no hacer este procesado por nuestra cuenta debido a que:

  1. La implementación es demasiado compleja y costosa.
  2. Es difícil crear una solución reactiva o de tiempo real, ya que la generación y procesamiento de los archivos de logs tendría que ser ejecutado de manera periódica.

Oracle Golden Gate

Como dijimos en el post anterior, este producto de pago de Oracle nos ofrece la posibilidad de replicar dos sistemas heterogéneos en tiempo real. Para respetar nuestro diseño de arquitectura podemos utilizar este software para obtener la replicación en una cola de mensajería JMS, en concreto ActiveMQ (para integrar con Apache Kafka necesitaríamos utilizar Oracle Golden Gate Big Data).

Conceptos básicos

HOST1: Máquina donde tenemos la base de datos origen. En esta máquina debemos hacer la primera instalación de Oracle Golden Gate para extraer la información a partir del sistema de logs de la base de datos para, a continuación, emitirlos a nuestra máquina destino (HOST2). No obstante, es importante puntualizar que Oracle Golden Gate distingue dos procesos distintos en la extracción:

Extract: proceso de extracción en sí mismo. Se encarga de procesar las transacciones y enviarlas a la bandeja de salida.

Data Pump (opcional): se encarga de procesar lo realizado en el paso anterior y llevarlo al host destino (remoto). Sirve para aislar al proceso de extracción de problemas relacionados con el envío de datos al host remoto. Además, puede realizar operaciones de filtrado y/o transformación. Si omitimos esta pieza, el proceso Extract tendría que hacer el envío de datos al host remoto por sí mismo.

HOST2: Máquina donde tenemos la cola de mensajería JMS (ActiveMQ, por ejemplo). En esta máquina debemos realizar una segunda instalación de Oracle Golden para que se encargue de recibir la información del HOST1 y volcarla a la cola de mensajería JMS.

Instalación y configuración

Los productos que vamos a necesitar descargar son Oracle GoldenGate 12.2.0.1.1 for Oracle, Oracle GoldenGate Application Adapters for JMS and Flat File, y, lógicamente, la cola de mensajería ActiveMQ.

El proceso de instalación es algo tedioso, aunque está bien documentado por parte de Oracle. Los dos documentos principales que seguimos para la instalación de estos productos son los siguientes: Intalling OGG y OGG to JMS. Además, en esta documentación también encontraremos información acerca de cómo configurar los distintos procesos que concurren en la extracción de los datos, los cuales hemos explicado anteriormente.

Además, también podemos hacer uso de la herramienta gráfica Oracle Golden Gate Studio, aunque esto no es obligatorio, ya que toda la configuración se puede hacer vía archivos de configuración y ejecución de comandos sobre las consolas de Oracle Golden Gate (GGSCI).

Configuración BBDD origen

Para poder realizar el proceso de replicación sobre la base de datos Oracle, tenemos que cumplir ciertos requisitos en la base de datos que va a ser replicada:

  • Tener configurado en el modo archivelog.
  • Tener habilitado el parámetro enable_goldengate_replication.
  • Tener habilitado el supplemental log data.
  • Tener configurada la instancia de la base datos en modo FORCE LOGGING.

Como en el paso anterior, estas modificaciones están bien documentadas en la siguiente página de Oracle.

Ejemplo mensaje

Una vez tengamos todo funcionando, el tipo de mensaje que obtendremos será algo parecido a esto:

xml

<operation table='TABLE_NAME' type='INSERT' ts='2017-03-02 17:27:41.005167' pos='00000000080000008954' numCols='3'>
<col name='ENTITY_ID' index='0'>
   <after><![CDATA[28]]></after>
 </col>
<col name='GDP_YEAR' index='1'>
   <after><![CDATA[2008]]></after>
 </col>
<col name='GDP_VALUE' index='2'>
   <after><![CDATA[0.84704]]></after>
 </col>
</operation>

Conclusiones acerca del producto

Finalmente, tras las pruebas realizadas con este producto, las principales conclusiones que he sacado son las siguientes:

  • Se necesita realizar sobre la base de datos a replicar (origen) una configuración específica para que el proceso de replicación pueda funcionar.
  • Se necesitan dos instancias de Golden Gate funcionando: una para el proceso de extracción y otra para el proceso de volcado.
  • Se necesita hacer un mapeo de la estructura de las tablas a replicar. Los archivos que contienen la información de estas estructuras deben estar en ambas instancias de Oracle Golden Gate.
  • La cola de mensajería sólo contiene las transacciones ejecutadas una vez iniciado el proceso, no tiene en cuenta el estado inicial de la base de datos. Esto implica que es necesario un proceso de migración inicial adicional si queremos obtener una solución completa de replicación.
  • Realiza un buen manejo de errores en caso de caída de algunos de los elementos de la solución.

CR8

Como comentamos en el post anterior, este producto de pago de la empresa DBS-H soporta la replicación de datos en tiempo real entre bases de datos relacionales y diversos sistemas de mensajería, Big Data y bases de datos no relacionales (MongoDB).

Al igual que en el caso anterior, y para respetar el diseño de arquitectura planteado en el anterior post, vamos a utilizar este software para integrar nuestro proceso de replicación con Apache Kafka.

Conceptos básicos

Este producto se compone de tres módulos básicos:

  1. Fetcher (sólo necesario en caso de Oracle): Este módulo es el encargado de transferir los archivos de logs transaccionales desde nuestra base de datos Oracle a la máquina donde tengamos el proceso de replicación de CR8.
  2. Parser: Este módulo es el encargado de procesar los archivos de logs transaccionales (recuperados por el módulo anterior) y persistir los cambios en archivos independientes con formato JSON, aplicando las transformaciones que hayamos especificado en la configuración de nuestro proceso de replicación.
  3. Applier: Es el módulo encargado de aplicar los cambios procesados por el módulo anterior, en orden, a la base de datos destino de nuestro proceso de replicación.

Instalación y configuración

La propia empresa DBS-H nos facilitó una máquina virtual con la instalación del producto. Cuando evaluamos CR8, éste no tenía ninguna herramienta gráfica para realizar la configuración de nuestro proceso de replicación, por lo que la hicimos directamente sobre los archivos JSON de configuración. Veamos cómo:

  • Partimos de la plantilla CR8Config.json.template que tenemos disponible dentro de la carpeta /home/oracle/dbsh/coreplic8/etc, y especificamos nuestros valores de conexión:
xml

{
"source": {
       "dBType": "ORACLE",
       "host": "localhost",
       "port": "1539",
       "serviceName": "ogg12"
},
"target": {
       "dBType": "KAFKA",
       "host": "10.70.1.58",
       "port": "9092",
       "zooKeeperHost" : "10.70.1.58",
       "clientPort": "2181",
       "kafkaTopic": "topicCR8_1”
},
"applier": {
        "start": "true",
        "mode": "JOURNAL",
        "journalType": "SINGLE",
        "sleepTime": "1500"
}
}

  • Cargamos de la configuración con el siguiente comando:

updateCMDB.sh <ruta_archivo_configuración_anterior>

Si el resultado de este proceso es satisfactorio (true), es que hemos podido hacer una conexión inicial con los dos sistemas a comunicar.

  • Arrancamos el proceso de streaming:

CR8_Stream_ctl.sh start <Stream name>

Configuración BBDD origen

Al igual que en el caso anterior, la documentación de CR8 nos indica que, para poder realizar el proceso de replicación sobre la base de datos Oracle, tenemos que realizar ciertas configuraciones sobre la base de datos origen. En concreto:

  • Estar configurado en el modo archivelog.
  • Ejecutar el script /home/oracle/dbsh/coreplic8/sql/grant_flashback_on_dict_tables.sql para conceder los permisos necesarios a los usuarios administradores sobre las data dictionary tables.

Ejemplo mensaje

La siguiente sentencia en la base de datos Oracle:


insert into economic_entity(entity_id,economic_entity, continent)
values(551, 't1', 't2'); commit;

Provocaría el siguiente mensaje en nuestro topic Kafka:

xml

{
 "commandScn": "1926516",
 "commandCommitScn": "1927635",
 "commandSequence": "235",
 "commandType": "INSERT",
 "commandTimestamp": "2017-02-17 10:10:46+00:00",
 "objectDBName": "ogg12",
 "objectSchemaName": "WEST",
 "objectId": "ECONOMIC_ENTITY",
 "changedFieldsList": [
  {
     "fieldId": "ENTITY_ID",
     "fieldType": "NUMBER",
     "fieldValue": "551"
}, {
    "fieldId": "ECONOMIC_ENTITY",
     "fieldType": "VARCHAR2",
     "fieldValue": "t1"
}, {
    "fieldId": "CONTINENT",
     "fieldType": "VARCHAR2",
     "fieldValue": "t2"
} ],
"conditionFieldsList": []
}

Conclusiones acerca del producto

Tras haber estado probando este producto, mis principales conclusiones son:

  • Al igual que con Oracle Golden Gate, se necesita realizar sobre la base de datos a replicar (origen) una configuración específica.
  • Las transformaciones de datos permitidas son muy básicas, ya que sólo permite crear nuevos campos a partir de la copia de valores de otros campos y el renombrado de campos.
  • El topic de Kafka sólo contiene las transacciones ejecutadas una vez iniciado el proceso, no tiene en cuenta el estado inicial de la base de datos. Esto implica que es necesario un proceso de migración inicial adicional si queremos obtener una solución completa de replicación. No obstante, también es importante señalar que CR8 sí permite la replicación completa de datos, aunque para ello la configuración se debe realizar entre dos bases de datos (ejemplo: MongoDB en lugar de Kafka) y, además, se necesita realizar acciones adicionales sobre la base de datos Oracle.
  • Realiza un buen manejo de errores en caso de caída de algunos de los elementos de la solución.
  • A pesar de la aparente facilidad de uso, la puesta en marcha de este proceso nos causó algunos problemas, por lo que la sensación final que nos dejó CR8 fue la de ser un producto menos maduro que Golden Gate de Oracle.

PostgreSQL

PostgreSQL, al igual que Oracle, dispone de logs transaccionales en formato binario que pueden ser consultados y procesados para realizar la replicación. No obstante, a diferencia de Oracle, esta serialización no tiene formato propietario, y además, a partir de la versión 9.4, disponemos de una funcionalidad llamada logical decoding, que facilita la replicación de datos.

Todo lo comentado en este post hace referencia a esta “nueva” funcionalidad, por lo que sólo aplica a la versión 9.4 de PostgreSQL y posteriores.

Procesado de logs (Conceptos básicos)

Si echamos un vistazo a la documentación original de PostgreSQL, unos cuantos conceptos clave que vamos a descubrir, y con los que nos conviene estar familiarizado, son:

  • Logical Decoding: Proceso de extraer todos los cambios persistentes a otras tablas de la base de datos, en un formato coherente y fácil de entender, para que pueda ser interpretado sin necesidad de poseer un conocimiento profundo de la base de datos en cuestión.
  • Write-ahead-log (WAL): Estos archivos describen los cambios en un sistema persistente en forma de sentencias SQL. El concepto principal es que los cambios a los ficheros de datos (donde residen las tablas e índices) sólo se escriben una vez que los cambios hayan sido almacenados en los archivos WAL (de esta forma no necesitamos comprobar el resultado de cada transacción para mantener la coherencia de nuestros datos). Es más, estos archivos también nos valen como mecanismo de seguridad en caso de caída de la base de datos, ya que podríamos recuperar de estos archivos de logs las transacciones que aún no se hubieran escrito en las tablas e índices correspondientes (esto también se conoce como roll-forward recovery, REDO).
  • Replication slot: Representa un flujo de cambios, que puede ser replicado por un cliente, para obtener el estado actual de la base de datos origen. Cada slot tiene un flujo de cambios de una base de datos, actúa de manera independiente y tiene su propio estado. Es posible que un slot pueda ser consultado por varios consumidores (lógicamente, el slot no sabe nada acerca de sus consumidores).
  • Output plugin: Estos plugins transforman los datos de los WAL’s a los formatos que los consumidores de los slots requieren. Tienes más info aquí.
  • Exported snapshot: Cada vez que un replication slot se crea, un snapshot que representa el estado de la base de datos en el momento de creación del slot.

Gracias a todos estos conceptos y mecanismos es posible crear una solución de replicación de datos en tiempo real para PostgreSQL. Y esto es precisamente lo que hace Debezium, por lo que nosotros decidimos utilizarlo para implementar nuestro proceso de replicación. Lo vemos a continuación.

Debezium

Este producto open source permite la replicación en tiempo real entre diversas bases de datos relacionales y Apache Kafka.

Conceptos básicos

Para la replicación de PostgreSQL, Debezium se basa en el mecanismo de logical-decoding.

El conector de Debezium para PostgreSQL se compone de dos elementos:

Instalación y configuración

Para realizar la instalación podemos utilizar las imágenes de Docker que Debezium nos proporciona de Kafka, Zookeeper y KafkaConnect en este repositorio.

Otra opción, si ya tenemos nuestra instalación de los anteriores elementos, es la de registrar el jar del conector de Debezium en el classpath de KafkaConnector, y reiniciar este proceso para que lo pueda cargar (más info sobre este proceso lo tenemos en la propia documentación de Debezium).

En cualquier caso, una vez hayamos realizado esta instalación, la configuración de nuestro proceso de replicación se realiza mediante la invocación al API REST de Kafka Connect de la siguiente forma:


curl -X POST -H "Content-Type: application/json" -H
"Cache-Control: no-cache" -H "Postman-Token:
c0ed1f9e-33c0-87af-d729-d26cdd75b592" -d '{
  "name": "origindb-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "192.168.42.122",
    "database.port": "5432",
    "database.user": "replicationuser",
    "database.password": "xxxx",
    "database.dbname" : "vpDB",
    "database.server.name": "vp"
  }
}' "http://connect-postgresql-replication-poc.osapps.paradigma.local/connectors/"

Configuración BBDD origen

Como hemos comentado antes, esta solución se basa en la implementación un logical decoding output plugin para poder realizar el proceso de replicación sobre la base de datos PostgreSQL. Este plugin está escrito en C, se instala en la máquina del servidor de PostgreSQL, y codifica los cambios en Protobuf format.

Además de esta instalación, también debemos tener en cuenta que hemos de habilitar la replicación en la base de datos origen, la cual no viene habilitada por defecto en las instalaciones de PostgreSQL.

En la documentación oficial encontraremos más detalle, y un ejemplo de mensaje JSON que genera este producto, junto con la especificación de cada uno de sus campos.

Conclusiones acerca del producto

Mis conclusiones acerca de Debezium son:

  • Crea un topic por cada tabla a replicar y por cada operación crea un evento dentro de ese topic.
  • En el tratamiento de errores se garantiza que no se pierde ningún mensaje, aunque se puede obtener mensajes duplicados (mismo comportamiento estándar que Kafka).
  • Tiene la licencia de Apache 2.0.
  • Proporciona la capacidad de realizar un snapshot inicial de manera opcional. Este snapshot inicial lo realiza a partir de una lectura de todos los registros de la base de datos, generando así eventos de lectura en los archivos WAL de todos los registros existentes (con este mecanismo nos evitaríamos el tener que realizar una migración inicial de la base de datos por nuestra cuenta).
  • Sólo funciona en el nodo master de un cluster (limitación de postgreSQL en su versión actual).
  • Se necesita un usuario con un rol con permisos de login y replicación.
  • Son necesarios permisos de administrador para instalar el plugin. Además, en la máquina donde reside la base de datos hemos de instalar otros paquetes adicionales (protobuf, postgis y paquetes de desarrollo de postgresql), en el caso de que no estuvieran ya instalados.
  • Requerimos la versión mínima 9.4 de postgreSQL.

Conclusiones

Llegados a este punto, ya tendríamos cubiertos buena parte de nuestra solución de replicación, tal y como se ve en la siguiente figura:

A continuación, tendríamos que implementar el elemento ETL, que es el encargado de recoger los eventos de la plataforma de streaming (o cola de mensajería), transformarlos según nos convenga y volcarlos a nuestro sistema de persistencia destino.

Para implementar este proceso tenemos bastantes posibilidades. Por ejemplo, en el caso de Apache Kafka, podríamos utilizar Kafka Connector y delegar nuestro proceso de volcado a un conector tipo Sink.

Otra opción podría ser implementar nuestro propio microservicio ETL utilizando nuestra tecnología favorita: por ejemplo, Spring (Java) podría ser una opción interesante, tanto si nos tenemos que integrar con Apache Kafka (Debezium/CR8), como si nos tenemos que integrar con una cola JMS (Oracle Golden Gate).

En cualquier caso, este tipo de implementaciones no son algo exclusivo de la problemática de la replicación de datos, por lo que no vamos a profundizar en ellas en este artículo.

Con este post damos por finalizado la serie de posts dedicados al análisis de la replicación de datos en tiempo real, en los que hemos visto las motivaciones de negocio para cubrir esta necesidad, el diseño técnico de nuestra solución y algunas posibilidades de implementación para los casos de Oracle y PostgreSQL.

Nuevo llamado a la acción

Cuéntanos qué te parece.

Los comentarios serán moderados. Serán visibles si aportan un argumento constructivo. Si no estás de acuerdo con algún punto, por favor, muestra tus opiniones de manera educada.