¿Buscas nuestro logo?
Aquí te dejamos una copia, pero si necesitas más opciones o quieres conocer más, visita nuestra área de marca.
Conoce nuestra marca.¿Buscas nuestro logo?
Aquí te dejamos una copia, pero si necesitas más opciones o quieres conocer más, visita nuestra área de marca.
Conoce nuestra marca.dev
Carlos Navarro 09/03/2022 Cargando comentarios…
En un artículo reciente hicimos un repaso de las características y capacidades principales de Cloud Data Fusion, una de las herramientas para la integración de datos ofrecida por Google. Data Fusion es una herramienta muy potente, pero con ciertas particularidades y puntos por pulir que hacen que la curva de aprendizaje sea más elevada de lo que debería.
Por eso, con este artículo os quiero compartir algunos trucos y consejos (que ojalá hubiera sabido antes) para ayudaros a que los comienzos sean más fáciles y que podáis explotar antes las capacidades reales de esta herramienta.
Cloud Data Fusion es la versión gestionada por GCP de la plataforma CDAP, cuya empresa madre fue comprada por Google. Por esto, la documentación está dividida entre la documentación de GCP, que está más orientada a la gestión de la instancia y tareas básicas, y la documentación de CDAP en la que se puede encontrar más detalles de las diferentes capacidades de Data Fusion. No caigáis en el error de evitar la documentación de CDAP. Es tan oficial como la de GCP y, aunque quizá podría estar más completa y mejor organizada, contiene la gran mayoría de elementos que se utilizan cuando se desarrollan pipelines en Data Fusion.
Al crear la instancia de Data Fusion es buena práctica habilitar, al menos, la capacidad de logging para poder crear alertas automáticas más adelante según el comportamiento de los pipelines. Para ello, hay que desplegar el menú de opciones avanzadas:
La función principal de Data Fusion es crear y gestionar pipelines para la integración de datos de forma centralizada y visual. Data Fusion es capaz de integrar muchísimas fuentes de datos y una vez los pipelines están corriendo son robustos y repetibles. El problema es que a veces nos encontramos con muchas piedras en el camino. Con estos consejos esperamos poder ayudaros a sortear algunas de ellas.
Como ya comentamos, Data Fusion es un servicio gestionado, pero no un servicio serverless. Todo lo que se hace en Data Fusion se hace dentro de una instancia, la cual si se borra puede hacer perder mucho trabajo. Así que lo mejor es curarse en salud y guardar frecuentemente los pipelines que se vayan desarrollando. Para ello no hay más que pulsar “Export” en el menú de los pipelines. De la documentación:
Una vez descargado en local, lo mejor es tener un repositorio de código al que subirlo para compartirlo con el equipo y por mayor seguridad. Dicho esto, es un poco inconveniente el modo de versionado de pipelines de Data Fusion. Cada vez que hay que hacer cambios es necesario duplicar el pipeline y volverlo a desplegar. No puede haber desplegados dos pipelines con el mismo nombre y automáticamente se le añade un número de versión al pipeline al duplicarlo. Esto hace que sea fácil que en el repositorio acaben varias versiones del mismo pipeline con nombres ligeramente diferentes. Por ello, es importante tener una convención en el equipo de si se va a mantener el nombre de los pipelines o no y de si se van a permitir diferentes versiones del mismo pipeline en el repositorio. Pero esto debe decidirlo cada equipo. Por todo esto, es muy recomendable utilizar herramientas de DevOps para recrear y reprogramar los pipelines usando el API de Data Fusion.
Cuando se empieza a usar Data Fusion parece que es imposible depurar los errores en los pipelines. Es algo que no suele ser fácil en este tipo de sistemas, pero lo primero es saber dónde mirar. Casi siempre que falla algo que no es muy obvio los logs no aparecen directamente en la vista de logs de la GUI de pipelines. Por ejemplo, un dato mal formateado, quizá un error de tipo de columna al cargar en BigQuery o algún permiso. En esos casos (que al final son la mayoría) hay que ver los logs en crudo. Para ello hay que dar a “Logs” > “Download All” > “View Raw Logs”. De la documentación:
Data Fusion y en general todos los sistemas que utilizan procesamiento Big Data tardan en ejecutarse porque tienen mucha infraestructura que levantar. En el caso de Data Fusion, cuando se ejecuta un pipeline desplegado, el primer paso es levantar un cluster efímero de Dataproc, lo cual puede llevar 4 o 5 minutos. Afortunadamente, Data Fusion tiene la opción de ejecutar los pipelines en desarrollo en preview. No es algo instantáneo pero reduce mucho el tiempo necesario para corregir errores, sirviendo para cazar bugs típicos como pueden ser errores en los formatos de salida. Además de evitar tener que desplegar y duplicar los pipelines para corregir errores. De la documentación:
El problema es que hay veces que es fácil caer en la tentación de desplegar directamente para probar porque la ejecución en Preview tiene ciertas limitaciones. Primero, no lee todos los datos de entrada, sólo una muestra. Segundo, no ejecuta “acciones”. Las “acciones” son comandos que se ejecutan antes o después del resto de etapas. Estos comandos pueden ser cosas como copiar, mover o eliminar archivos, tareas de compresión y descompresión de archivos o comandos que se ejecutan, por ejemplo un comando en BigQuery. También la funcionalidad de asignar variables desde GCS o BigQuery.
Estas limitaciones pueden hacer que parezca que ejecutar en Preview no merece la pena, pero esto no es así. Merece la pena dedicarle tiempo a asignar las variables a mano y hacer una primera prueba en Preview para detectar problemas de tipos o comandos erróneos. En algunos casos, como si hay joins entre tablas muy grandes puede que la Preview no sea suficiente para saber si la Pipeline está bien hecha, pero siempre es recomendable.
Data Fusion permite utilizar variables en lo que denominan “Macros”. Son expresiones con el formato ${expresión} que permiten utilizar variables y métodos en las etapas del pipeline. Estas variables se pueden definir de varias formas:
Se puede encontrar más información en la documentación, pero lo más importante es que permiten hacer los pipelines reutilizables. Por ejemplo, si se cuenta con un entorno de producción y otro de desarrollo se pueden tener variables con distinto valor en los entornos. En este ejemplo, se podría definir un ${bigquery_project} que apuntase al proyecto de BigQuery de desarrollo o producción según corresponda. También se puede utilizar la acción de Argument Setter con ficheros de configuración con variables para que un mismo pipeline sirva para procesar distintos ficheros si todas las variables están en ese fichero.
Dicho esto, hay que tener cuidado al usar argumentos de entrada para asignar variables, ya que una de las cosas que hacen robusto a Data Fusion es que los pipelines una vez desplegados son repetibles y esto puede hacer que un mismo pipeline cambie su comportamiento según los valores de entrada.
Hay algunos casos raros en los que la instancia de Dataproc usada para ejecutar el pipeline se queda corriendo infinitamente. Esto puede pasar si hay un pipeline en ejecución y se borra la instancia de Data Fusion. Por ello, es importante tener alarmas de uso de Dataproc en caso de que un cluster esté levantado demasiado tiempo.
Por otro lado, Data Fusion a veces usa buckets temporales, por ejemplo para cargar o leer datos de BigQuery. A veces, dependiendo del error, estos buckets pueden no borrarse, así que es recomendable tener siempre un bucket temporal con un TTL definido para los ficheros temporales o, por lo menos, revisar los buckets temporales de vez en cuando.
Por lo tanto, es muy recomendable intentar definir alarmas en base a los consumos provocados por estas instancias o, en su defecto, definir cuotas de consumo que nos eviten algún tipo de consumo no deseados.
Una de las fortalezas de Data Fusion es que tiene una gran cantidad de plugins para la integración con otros sistemas, aunque la mayoría de ellas no están disponibles entre los componentes cargados por defecto. Además de los plugins para leer y escribir datos, también hay plugins para otro tipo de acciones, como puede ser hacer Look Up en tablas, comprimir/descomprimir ficheros o ejecutar comandos en BigQuery. Así que antes de buscar soluciones manuales a los retos con los que te encuentres intenta buscar en el “HUB” de plugins para ver si hay alguna opción disponible.
Aún así, hay veces que los plugins no funcionan como deben. En esos casos puede que haya que volver a los componentes por defecto. Por ejemplo, puede que el componente “projection” no funcione y haya que simularlo con Wrangler o que Look Up cuelgue el pipeline y haya que usar un Group By aunque sea más costoso.
BigQuery es probablemente el destino que más se usa en Data Fusion, siendo uno de los componentes estrella de las herramientas que ofrece CGP. El “Sink” de BigQuery tiene bastantes capacidades, de las cuales puede destacarse la posibilidad de hacer “Upsert” con los registros siempre que se tenga un campo id y un campo por el que ordenar. Esto por debajo ejecuta una instancia “MERGE” de BigQuery. Esta capacidad puede evitarte tener que mantener una capa de tablas delta, dependiendo del caso de uso. Para encontrar esto hay que bucear un poco en el código porque la documentación no lo deja muy claro.
Por otro lado, puede ser complicado crear tablas particionadas automáticamente en el Sink de BigQuery. A veces no funciona todo lo bien que debería y de momento (hasta la versión 6.5.1) parece que sólo se pueden hacer tablas que estén particionadas por día. Si se necesita que las tablas tengan particiones de mes o de año, o simplemente por asegurarse de que la partición se hace correctamente, normalmente es mejor crear primero la tabla en vez de dejar que Data Fusion la cree automáticamente.
Cuando se está creando un pipeline, cada vez que se conecta una etapa nueva a la anterior, Data Fusion le asigna a esa nueva etapa automáticamente el esquema de la etapa o etapas anteriores como esquema de entrada. Pero, aunque no es intuitivo, si editamos una etapa este esquema no se actualiza automáticamente en las etapas siguientes. Si editamos un Wrangler o quizá una etapa de tipo Source (que lee de BigQuery o GCS por ejemplo) es necesario entrar en esa etapa y pulsar, en la columna de la derecha, en el esquema de salida: “Action > propagate” y hacer esto para todas las etapas sucesivas.
La interfaz gráfica es muy cómoda y rápida, pero a veces no es suficiente. Un ejemplo, la función “Transpose” sirve para normalizar columnas. Esto permite que una tabla que tiene columnas con medidas diferentes se normalice en una tabla con las columnas “tipo” y “valor”, donde “tipo” es el nombre de la columna origen y “valor” el contenido del registro. Hacer eso para unos cuantos campos no es costoso. Pero si nos encontramos que hay que hacer eso cuarenta o cincuenta veces entonces ya empieza a ser un problema. En esos casos puede ser más rápido exportar el pipeline, encontrar esas instrucciones en el json descargado y utilizar funciones de tipo “encontrar y reemplazar” para después volver a importarlo.
Wrangler es la herramienta para realizar transformaciones sobre ficheros de forma sencilla. Permite parsear ficheros de tipos comunes como json, xml o csv, unir columnas, cambiar los tipos de dato de columnas y otras muchas cosas. Si se lo compara con otras soluciones, como el Wrangler de Dataprep, otra solución ofertada por Google, se le nota menos refinado, menos intuitivo. Pero tiene muchas funcionalidades ocultas, es el componente al que más partido se le puede sacar si se investiga un poco.
Data Wrangler de Data Fusion admite instrucciones por línea de comandos (CLI) que en la documentación denominan “Power Mode”. Con ellos se puede sacar mucho más partido a Wrangler. En GCP no hay nada de documentación al respecto, la mayoría de la documentación está en la página de CDAP, aunque puede ser difícil encontrar lo que necesitas en cada momento. Hay algún artículo que también puede ayudar, pero no hay muchos disponibles.
Estos son algunos ejemplos de lo que se puede conseguir con Wrangler usando este “Power Mode”:
Hay muchas funciones que se pueden hacer en la GUI y en la CLI. La ventaja de la GUI es que no hay que memorizar los comandos. Pero una vez los conoces, usar la CLI te da algo de feedback si falla (normalmente una sola línea, pero menos es nada). Además, puedes copiar y pegar el comando hasta que des con la sintaxis correcta o editar en un editor de texto directivas de otros Wranglers que sean parecidos pero no exactamente iguales.
Esto se puede hacer también en la GUI (opción Custom Transform), pero se recomienda hacerlo en Power Mode por las razones arriba indicadas. Por ejemplo, queremos que la columna A tenga el valor de la columna B si la columna B es mayor que la columna C y si no que valga lo que la columna C. Esto se haría de la siguiente manera:
set-column column_A if(column_B>column_C){column_B} else {column_C}
Estas expresiones usan JEXL como lenguaje. Admiten espacios pero no saltos de línea.
Por ejemplo, una fecha con el siguiente formato: yyyy.MM.dd. Si se parsea a tipo Date utilizando la GUI (parse > date), puede fallar. Si se parsea utilizando parse > simpledate el Wrangler las convierte a timestamp con zona horaria (2021-12-25T00:00:00[UTC]) y puede que se necesiten como tipo Date, sin hora. Entonces, se puede convertir Timestamp con zona a Date sin zona con con las siguientes instrucciones:
parse-as-simple-date :fecha dd.MM.yyyy
set-column fecha_de_precio fecha.toLocalDate()
Por ejemplo, añadir la fechahora actual en la columna ingestion_time:
set-column ingestion_time datetime:CurrentDateTime()
A veces es difícil manejarse con los tipos de dato que tiene Wrangler, pero con un poco de práctica te acabas acostumbrando. Por ejemplo, muchas veces el tipo de hora por defecto es “timestamp_micros” que incluye zona horaria.
set-column ingestion_time datetime:CurrentDateTime()
format-datetime :ingestion_time "yyyy-MM-dd HH:mm:ss"
parse-as-datetime :ingestion_time "yyyy-MM-dd HH:mm:ss"
Hablando de fechas, los “módulos” que admite la CLI se corresponden con clases java. Por ejemplo el tipo Datetime corresponde con ZonedDateTime.
La GUI no tiene una opción de parsear números decimales con coma como separador decimal (por ejemplo 1.003,14) así que hay que usar expresiones regulares para convertirlos:
find-and-replace valor_decimal s/\.//g
find-and-replace valor_decimal s/,/./g
send-to-error !dq:isNumber(valor_decimal)
set-type :valor_decimal double
Es útil utilizar la instrucción sendToError para que el registro se mande al colector de errores en vez de que falle el pipeline entero. Esto también se puede hacer con la GUI, pero es más rápido usar la CLI. Un ejemplo:
send-to-error !dq:isDate(fecha, "dd-MM-yyyy")
Enlazando con el punto anterior, en la configuración de la etapa Wrangler hay opciones de gestión de errores. Por defecto la opción es “Fail pipeline”, lo cual hace que todo el pipeline falle si hay un registro que falla. Las otras opciones son “Skip error” y “Send to Error port”.
Todo el que haya manejado datos sabe que los errores de tipo son mucho más comunes de lo que parece y que haya un registro erróneo no quiere decir que haya que parar todo un pipeline, especialmente si el pipeline tarda mucho en ejecutar. Por tanto, la opción Fail pipeline es un poco drástica en la mayoría de los casos. La opción “Skip Pipeline” se pasa por el otro lado, puede ser peligroso ignorar completamente los errores. Lo mejor es usar la opción “Send to error port” que manda los errores al componente colector de errores y desde ahí se pueden guardar, en un bucket GCS por ejemplo.
Para que esto funcione hay que utilizar la directiva “Send to error port” que se puede encontrar en la GUI o en la CLI. La recomendación es que se utilice esta instrucción justo antes de cada vez que se va a parsear un dato (una fecha por ejemplo) o cada vez que se va a cambiar un tipo de dato (pasar de string a double). De esta manera el pipeline será resistente a registros sueltos erróneos. Una vez hecho esto ya sólo quedaría tratar los errores. De primeras se puede emplear inspección manual, pero para producción suele ser necesario implementar un sistema de alarmado cuando aparecen registros erróneos.
Ejemplo de pipeline con Error Collector:
Muchas veces los pipelines que se crean para distintos ficheros son iguales excepto la etapa de parseo de datos. Para esos casos puede ser útil duplicar un pipeline ya existente y solamente editar la etapa del Wrangler. Pero esto a veces da problemas y al pinchar sobre “Wrangle” en el componente para manejar el fichero con la GUI no sale nada o da error. Cuando esto pase, lo mejor es abrir un Wrangler nuevo, parsear el fichero y cuando hayamos acabado pasarlo a nuestro pipeline. Para ello hay que:
Para la última de las capacidades de Data Fusion, enfatizar que hay que ponerle mucha atención a los valores de “Reference” de los componentes, pues los pipelines no se pueden editar.
Estos consejos y trucos no son más que la punta del Iceberg. Data Fusion es una herramienta con mucho potencial, ampliable incluso con nuestros propios plugins. Esperamos haber ayudado con este artículo a que aprovechéis antes las capacidades de Data Fusion. ¿Tienes alguna duda o pregunta? ¡Déjanos un comentario!
Los comentarios serán moderados. Serán visibles si aportan un argumento constructivo. Si no estás de acuerdo con algún punto, por favor, muestra tus opiniones de manera educada.
Cuéntanos qué te parece.