¿Buscas nuestro logo?
Aquí te dejamos una copia, pero si necesitas más opciones o quieres conocer más, visita nuestra área de marca.
Conoce nuestra marca.¿Buscas nuestro logo?
Aquí te dejamos una copia, pero si necesitas más opciones o quieres conocer más, visita nuestra área de marca.
Conoce nuestra marca.dev
Óscar Javier Fúquene Ramos 15/06/2023 Cargando comentarios…
Hace unas semanas ya os contamos qué es Great Expectations, la librería para maximizar la calidad de los datos. Continuamos con esta serie de post y en este nuevo artículo vamos a explicar cómo usar el Data Assistant que nos ayudará a obtener Expectation automáticamente. Esta herramienta nos viene bien para cuando no sabemos por dónde empezar al entrar en contacto con esta librería o, simplemente, queremos que GX realice una primera exploración sobre un conjunto de datos cuyas características desconocemos.
Además, vamos a describir cómo GX puede conectarse a diversas fuentes de datos como S3, Athena y Snowflake.
GX ofrece una utilidad denominada Data Assistant, que permite realizar una introspección sobre uno o varios Batch de datos con el objetivo de identificar Expectation candidatas a ser usadas, basándose en la información recolectada por dicha utilidad. Mediante el Data Assistant podemos acelerar la exploración de datos y la creación de Expectation.
Un Data Assistant consta de un profiler basado en reglas que son el medio para “interrogar a los datos” mediante la formulación de “preguntas” sobre las características de un Batch de datos como por ejemplo:
En función del tipo de dato de las columnas que conforman el Batch, el Data Assistant sigue realizando preguntas. En caso de que la columna sea de tipo numérico, pregunta ¿cómo están distribuidos los datos? o ¿cuáles son las estadísticas? Si la columna es de tipo fecha, ¿cuál es el rango de fechas existente? Si es de tipo texto, ¿El valor debe ajustarse a alguna expresión regular?, ¿Que valores categóricos existen?, etc.
Para dar respuesta a cada una de las preguntas planteadas, el Data Assistant aplica una serie de Expectation predefinidas sobre un Batch de datos.
Esta es la relación entre las diferentes preguntas citadas previamente y las Expectation correspondientes que le dan respuesta:
¿Qué columnas tiene el Batch?
¿Cuántos registros tiene el Batch?
¿Existen valores nulos en el Batch?
¿Existen valores únicos en el Batch?
¿Cuáles son las estadísticas de los datos?
¿Cuáles son los rangos de fechas existentes?
¿Los datos de tipo texto deben ajustarse a una expresión regular, patrón o tener cierta longitud?
¿Qué valores categóricos existen?
Como resultado final, el Data Assistant genera una Expectation Suite que contiene las Expectation listadas previamente, quedando a disposición del usuario eliminar, modificar o añadir Expectation diferentes, usando aquellas que ha creado la comunidad que puedes ver en este enlace o creando Expectation propias, en función de las validaciones que se quieran aplicar sobre los datos.
Teniendo claro lo que es un Data Assistant, vamos a describir los pasos para empezar a validar los datos usando el Data Assistant de Great Expectations.
Tal como explicamos en el primer post, para poder empezar a usar Great Expectations es necesario instalar la librería e inicializar el Data Context. Si tienes dudas de cómo hacerlo te recomiendo que leas el primer post donde se explica detalladamente.
GX se puede conectar a diversas fuentes de datos tanto en On-premise como en Cloud, como lo son: S3, Athena, Snowflake, MySQL, MSSQL, Azure Blob, Google Cloud Storage, BigQuery, entre otros.
Asumiendo que ya hemos inicializado el Data Context, podemos seguir usando el notebook creado en el 1º post o crear uno nuevo en el directorio donde se haya inicializado el Data Context.
En los siguientes apartados vamos a detallar cómo conectarnos a las siguientes fuentes de datos: S3, Athena y Snowflake.
Queremos usar el Data Assistant para generar una Expectation Suite a partir de los datos almacenados en formato CSV en un bucket de S3, tal como se aprecia en la siguiente imagen.
En el bucket hay 2 ficheros que contienen datos alusivos a características y precios de coches. Puedes descargar los datos en este enlace.
Antes de conectarnos con S3 es necesario verificar que tengamos configurada la CLI y las credenciales de AWS para que GX pueda conectarse con los servicios de AWS.
Una vez tengamos la CLI y credenciales listas, procedemos a configurar la conexión con S3. Para ello abrimos el notebook generado en el primer post, y ejecutamos el siguiente código cuya función es crear un Data Source que permita a GX conectarse a un bucket de S3 usando el Data Connector InferredAssetS3DataConnector.
import great_expectations as gx
from great_expectations.cli.datasource import sanitize_yaml_and_save_datasource
context = gx.get_context()
datasource_name = "s3_datasource"
""" Definimos el YAML con configuración del Data Source de S3. """
s3_datasource_yaml = f"""
name: {datasource_name}
class_name: Datasource
execution_engine:
class_name: PandasExecutionEngine
data_connectors:
default_inferred_data_connector_name:
class_name: InferredAssetS3DataConnector
bucket: 'gx-experiments-data'
prefix: 'data'
default_regex:
group_names:
- data_asset_name
pattern: (.*)\.csv
default_runtime_data_connector_name:
class_name: RuntimeDataConnector
assets:
my_runtime_asset_name:
batch_identifiers:
- runtime_batch_identifier_name
"""
"""
Comprobamos que el YAML es correcto realizando un test, el cual se conecta a S3
y lista los ficheros que se encuentran dentro del bucket y prefix configurados en el YAML.
"""
context.test_yaml_config(yaml_config=s3_datasource_yaml )
""" Persistimos el YAML del Data Source en el contexto de GX. """
sanitize_yaml_and_save_datasource(context, s3_datasource_yaml , overwrite_existing=False)
Después de ejecutar el código anterior, se persiste la configuración del nuevo Data Source en el fichero great_expectations.yml ubicado dentro del Data Context de GX, donde podemos ver los ítems de configuración relevantes que habilitan la conexión con S3 y que se señalan en la siguiente imagen.
Siguiendo las pautas anteriores tenemos el Data Source listo para conectarnos a S3. Para obtener más detalles sobre la configuración de la conexión con S3 puedes ir a este enlace.
Aquí partimos del hecho de que tenemos una tabla en Athena, también con los mismos datos del apartado anterior, los cuales han sido cargados previamente en la tabla car_features_prices.
Para conectarnos con Athena, GX necesita que tengamos instaladas las librerías SQLAlchemy y PyAthena para interactuar con dicho servicio de AWS.
Podemos seguir usando el notebook del aparato anterior y ejecutar el siguiente código para configurar el Data Source que se conecte con Athena.
import great_expectations as gx
from great_expectations.cli.datasource import sanitize_yaml_and_save_datasource
context = gx.get_context()
datasource_name = "athena_datasource"
""" Región donde tenemos desplegada la base de datos de Athena. """
aws_region = 'eu-west-1'
""" Nombre de la base de datos de Athena. """
athena_db = 'gx_db'
""" Nombre del directorio de staging de Athena. """
s3_staging_dir = 's3://gx-experiments-data/output/'
""" Nombre de la tabla de Athena con los datos que queremos validar. """
table_name = "car_features_prices"
""" Cadena de conexión para conectarnos con Athena. """
connection_string = f"awsathena+rest://@athena.{aws_region}.amazonaws.com/{athena_db}?s3_staging_dir={s3_staging_dir}"
""" Definimos el YAML con configuración del Data Source de Athena. """
athena_datasource_yaml = f"""
name: {datasource_name}
class_name: Datasource
execution_engine:
class_name: SqlAlchemyExecutionEngine
connection_string: {connection_string}
data_connectors:
default_runtime_data_connector_name:
class_name: RuntimeDataConnector
batch_identifiers:
- default_identifier_name
default_inferred_data_connector_name:
class_name: InferredAssetSqlDataConnector
include_schema_name: True
introspection_directives:
schema_name: {athena_db}
default_configured_data_connector_name:
class_name: ConfiguredAssetSqlDataConnector
assets:
{table_name}:
class_name: Asset
schema_name: {athena_db}
"""
"""
Comprobamos que el YAML es correcto realizando un test, el cual se conecta a Athena
y comprueba si existe la tabla configurada en el YAML.
"""
context.test_yaml_config(yaml_config=athena_datasource_yaml)
""" Persistimos el YAML del Data Source en el contexto de GX. """
sanitize_yaml_and_save_datasource(context, athena_datasource_yaml, overwrite_existing=False)
Después de ejecutar el código anterior, accedemos al fichero great_expectations.yml y vemos los ítems de configuración usados para conectarse con Athena.
Eso es todo, ya tenemos habilitada la conexión entre GX y Athena a través del Data Source que hemos configurado. Para obtener más detalles sobre la configuración de la conexión con Athena puedes ir a este enlace.
Para este caso, contamos con una tabla de Snowflake llamada car_features en la cual también hay datos de la misma naturaleza que los apartados previos y que se han precargado usando la utilidad SnowSQL.
Para conectarnos con Snowflake necesitamos las librerías: SQLAlchemy, snowflake-connector-python y snowflake-sqlalchemy.
Ejecutando el siguiente código configuramos un nuevo Data Source que se conecta con la tabla de Snowflake que contiene los datos.
import great_expectations as gx
from great_expectations.cli.datasource import sanitize_yaml_and_save_datasource
context = gx.get_context()
datasource_name = "snowflake_datasource"
""" Parámetros para establecer la conexión con Snowflake. """
account_id = "replace_by_your_account_id"
region = "eu-west-1"
host = f"{account_id}.{region}"
username = "replace_by_your_user"
password = "replace_by_your_password"
database = "GX_DB"
schema_name = "PUBLIC"
warehouse = "COMPUTE_WH"
role = "ACCOUNTADMIN"
"""
Nombre de la tabla donde residen los datos que queremos validar con el Data Assistant.
"""
table_name = "car_features"
""" Definimos el YAML con configuración del Data Source de Snowflake. """
snowflake_datasource_yaml = f"""
name: {datasource_name}
class_name: Datasource
execution_engine:
class_name: SqlAlchemyExecutionEngine
credentials:
host: {host}
username: {username}
database: {database}
query:
schema: {schema_name}
warehouse: {warehouse}
role: {role}
password: {password}
drivername: snowflake
data_connectors:
default_runtime_data_connector_name:
class_name: RuntimeDataConnector
batch_identifiers:
- default_identifier_name
default_inferred_data_connector_name:
class_name: InferredAssetSqlDataConnector
include_schema_name: True
introspection_directives:
schema_name: {schema_name}
default_configured_data_connector_name:
class_name: ConfiguredAssetSqlDataConnector
assets:
{table_name}:
class_name: Asset
schema_name: {schema_name}
"""
"""
Comprobamos que el YAML es correcto realizando un test, el cual se conecta a Snowflake
y comprueba si existe la tabla configurada en el YAML.
"""
context.test_yaml_config(yaml_config=snowflake_datasource_yaml)
""" Persistimos el YAML del Data Source en el contexto de GX. """
sanitize_yaml_and_save_datasource(context, snowflake_datasource_yaml, overwrite_existing=False)
En la siguiente imagen podemos ver los atributos del Data Source necesarios para poder conectarse con Snowflake.
Para obtener más detalles sobre la configuración de la conexión con Snowflake, puedes ir a este enlace.
Hasta aquí hemos configurado tres Data Sources, cada uno asociado a una fuente de datos diferente: S3, Athena y Snowflake.
Ahora bien, para usar el Data Assistant, podemos hacerlo mediante el notebook o mediante la CLI de GX. Como hasta el momento no hemos usado la CLI en demasía, vamos a emplearla.
Fundamentalmente, el Data Assistant se encarga de generar métricas y Expectation automáticamente haciendo introspección de los datos mediante el uso de un Profiler. Para usarlo ejecutamos el siguiente comando en la consola.
great_expectations suite new
Una vez ejecutado el comando, nos indica cómo queremos crear la Expectation Suite ofreciendo 3 opciones. Seleccionamos la opción 3, correspondiente al Data Assistant.
A continuación, el Data Assistant nos solicita con qué Data Source, de los que hemos configurado anteriormente, queremos trabajar.
Por ejemplo, si seleccionamos el Data Source de Athena, debemos seleccionar el Data Connector: default_configured_data_connector_name y luego seleccionar la tabla de Athena donde están almacenados los datos.
Por otro lado, si seleccionamos el Data Source de Snowflake, también debemos seleccionar el Data Connector: default_configured_data_connector_name y acto seguido seleccionar la tabla de Snowflake donde están los datos.
Cabe destacar, que independientemente del Data Source que hayamos seleccionado, lo que se explica de aquí en adelante, aplica tanto para S3, como para Athena y Snowflake.
Como no queremos que este post sea infinito, vamos a centrarnos en el Data Source restante que se conecta a S3, para ello cuando el Data Assistant nos da a elegir el Data Source, seleccionamos la opción correspondiente al s3_datasource.
Acto seguido, el Data Assistant nos muestra los ficheros disponibles en S3 y seleccionamos el primer fichero “data_car_pricing_01”, seleccionado la opción 1. Por último, nos pide que le asignemos un nombre a la Expectation Suite que para nuestro caso es: “s3_data_assistant_expectation_suite”.
Después de seleccionar el data asset o fichero con los datos sobre el cual vamos a crear la Expectation Suite, GX crea un notebook que contiene código “boilerplate” que explicamos posteriormente. Para acceder al notebook GX nos proporciona una URL que podemos extraer de la terminal desde donde estemos lanzando los comandos.
Al ir a la URL vemos que hay varios notebooks pudiendo acceder al que corresponda con el Data Assistant.
Si le echamos un vistazo al código autogenerado que contiene el notebook, vemos que en el primer bloque de código se configura un Batch Request que básicamente genera varios Batch (máximo 1000) a partir del fichero “data_car_pricing_01.csv” almacenado en el Data Source “s3_datasource” y haciendo uso del Data Connector “default_inferred_data_connector_name”, ambos configurados en el fichero great_expectations.yml.
import datetime
import pandas as pd
import great_expectations as gx
import great_expectations.jupyter_ux
from great_expectations.core.batch import BatchRequest
from great_expectations.checkpoint import SimpleCheckpoint
from great_expectations.exceptions import DataContextError
context = gx.get_context()
batch_request = {
"datasource_name": "s3_datasource",
"data_connector_name": "default_inferred_data_connector_name",
"data_asset_name": "data/car_prices/data_car_pricing_01",
"limit": 1000,
}
expectation_suite_name = "s3_edata_assistant_expectation_suite"
validator = context.get_validator(
batch_request=BatchRequest(**batch_request),
expectation_suite_name=expectation_suite_name,
)
column_names = [f'"{column_name}"' for column_name in validator.columns()]
print(f"Columns: {', '.join(column_names)}.")
validator.head(n_rows=5, fetch_all=False)
Posteriormente, se crea un array indicando las columnas del fichero CSV que se quieren excluir durante la ejecución del Data Assistant. Por defecto, excluye todas las columnas, pero podemos comentar algunas de ellas para que se generen mayor cantidad de métricas y por consiguiente más Expectation.
exclude_column_names = [
"make",
"model",
"year",
#"engine_fuel_type",
#"engine_hp",
#"engine_cylinders",
#"transmission_type",
#"driven_wheels",
"number_of_doors",
"market_category",
#"vehicle_size",
#"vehicle_style",
#"highway_mpg",
#"city_mpg",
"popularity",
"msrp",
]
En el siguiente fragmento de código se ejecuta el Data Assistant encargado de crear la Expectation Suite que contiene las Expectations candidatas, en base al Batch de registros creado previamente, todo esto mediante el método context.assistants.onboarding.run(). Luego, mediante el método get_expectation_suite() se obtiene la Expectation Suite subyacente generada por el Data Assistant.
result = context.assistants.onboarding.run(
batch_request=batch_request,
exclude_column_names=exclude_column_names,
)
validator.expectation_suite = result.get_expectation_suite(
expectation_suite_name=expectation_suite_name
)
Este último fragmento de código persiste la Expectation Suite en disco y, además, configura el Checkpoint encargado de ejecutar el Validator que como sabemos ejecuta la Expectation Suite sobre el Batch de datos. Finalmente, los resultados de la ejecución del Checkpoint son transformados en los Data Docs, usando un Renderer, que permiten identificar de manera legible las Expectations fallidas y las que han terminado satisfactoriamente.
print(validator.get_expectation_suite(discard_failed_expectations=False))
validator.save_expectation_suite(discard_failed_expectations=False)
checkpoint_config = {
"class_name": "SimpleCheckpoint",
"validations": [
{
"batch_request": batch_request,
"expectation_suite_name": expectation_suite_name,
}
],
}
checkpoint = SimpleCheckpoint(
f"{validator.active_batch_definition.data_asset_name}_{expectation_suite_name}",
context,
**checkpoint_config,
)
checkpoint_result = checkpoint.run()
context.build_data_docs()
validation_result_identifier = checkpoint_result.list_validation_result_identifiers()[0]
context.open_data_docs(resource_identifier=validation_result_identifier)
Para acceder a los Data Docs generados vamos al directorio del Data Context y abrimos el fichero index.html ubicado en la siguiente ruta:
<Path del Data Context>/uncommited/datadocs/local_site/
Al abrir el fichero HTML veremos un Dashboard con 2 apartados: Validation Results y Expectation Suites. En el primero, vemos las distintas ejecuciones de validaciones que han tenido lugar, pudiendo acceder a cada ejecución para ver los resultados.
Si accedemos al detalle de los resultados de la validación podemos ver las Expectation a nivel de tabla y a nivel de columnas.
En el segundo apartado del Dashboard veremos la Expectation Suite que se ha creado pudiendo acceder igualmente al detalle de la misma.
Al acceder a la Expectation Suite podemos ver las Expectation que se han generado automáticamente a nivel de tabla y a nivel de cada una de las columnas especificadas en el Data Assistant ejecutado previamente. Como podemos apreciar en las siguientes imágenes, para cada columna se definen una serie de validaciones que comprueban que el fichero en formato tabular tenga las columnas indicadas o que tenga cierta cantidad de registros.
También podemos observar validaciones que comprueban que las columnas no tengan valores nulos, que los valores estén dentro de un rango numérico, que cumplan un patrón definido en una expresión regular, etc.
Incluso tiene en cuenta validaciones sobre los valores que podemos extraer de las estadísticas de los datos, como pueden ser el valor mínimo, valor máximo, cuartiles, media, desviación estándar, etc.
Hasta el momento lo que hemos realizado es crear una Expectation Suite a partir de un subconjunto de datos guardado en S3. Ahora vamos a realizar una validación sobre el segundo fichero de datos que tenemos en S3, “data_car_pricing_02.csv”, usando la Expectation Suite que hemos creado.
Para ello vamos a crear un Checkpoint mediante la ejecución del siguiente comando:
great_expectations checkpoint new gx_s3_checkpoint
Usamos la URL para acceder al nuevo notebook que se ha generado con el Checkpoint cuyo nombre es “gx_s3_checkpoint”.
Analizando el código autogenerado del notebook, podemos observar que en el primer bloque de código se obtiene el contexto de GX y se genera un fichero YAML con la configuración del Checkpoint en donde se define un Batch Request, que es una petición al Data Source para obtener los registros del fichero especificado en el campo data_asset_name.
from ruamel.yaml import YAML
import great_expectations as gx
from pprint import pprint
yaml = YAML()
context = gx.get_context()
my_checkpoint_name = "gx_s3_checkpoint"
yaml_config = f"""
name: {my_checkpoint_name}
config_version: 1.0
class_name: SimpleCheckpoint
run_name_template: "%Y%m%d-%H%M%S-run-my-checkpoint-on-s3"
validations:
- batch_request:
datasource_name: s3_datasource
data_connector_name: default_inferred_data_connector_name
data_asset_name: data/car_prices/data_car_pricing_02
data_connector_query:
index: -1
expectation_suite_name: s3_edata_assistant_expectation_suite
"""
El siguiente código ejecuta el método test_yaml_config() que se encarga de verificar que el YAML sea correcto y además se conecta a S3. Acto seguido, se añade el Checkpoint al contexto y se ejecuta mediante el método run_checkpoint(). Finalmente, los resultados del Checkpoint se convierten en Data Docs y accedemos a estos a través del método open_data_docs(). Todos los métodos anteriores pertenecen al contexto de GX, de ahí la importancia de su uso.
my_checkpoint = context.test_yaml_config(yaml_config=yaml_config)
context.add_checkpoint(**yaml.load(yaml_config))
context.run_checkpoint(checkpoint_name=my_checkpoint_name)
context.open_data_docs()
Al abrir los Data Doc vemos que la ejecución del Checkpoint que hemos ejecutado anteriormente presenta errores.
Si accedemos al detalle de la ejecución vemos de manera precisa los errores generados. Por ejemplo en esta primera imagen, nos indica que se esperaba que el fichero “data_car_pricing_02.csv” tuviese 5898 registros, que corresponde al número de registros del fichero “data_car_pricing_01.csv” y que se usó para generar la Expectation Suite.
Para nuestro caso práctico la validación anterior no tiene mucho sentido, con lo cual podríamos eliminarla o modificarla accediendo al fichero JSON donde se ha persistido la Expectation Suite.
<Path al Data Context>/expectations/s3_data_assistant_expectation_suite.json
En el siguiente JSON podemos ver como se define una Expectation de manera declarativa y que el Data Assistant genera automáticamente.
{
"expectation_type": "expect_table_row_count_to_be_between",
"kwargs": {
"max_value": 5898,
"min_value": 5898
},
"meta": {
"profiler_details": {
"metric_configuration": {
"domain_kwargs": {},
"metric_name": "table.row_count",
"metric_value_kwargs": null
},
"num_batches": 1
}
}
}
En la siguiente Expectation se valida que la columna engine_fuel_type no tenga nulos, pero falla porque se detectan 3 filas con valor nulo. Además, se valida que los valores estén dentro de un conjunto finito de valores, que tengan cierta longitud o que la fracción de valores únicos estén dentro de baremo.
En el siguiente JSON podemos ver la definición de la Expectation que valida que la columna engine_fuel_type no tenga valores nulos:
{
"expectation_type": "expect_column_values_to_not_be_null",
"kwargs": {
"column": "engine_fuel_type"
},
"meta": {
"profiler_details": {
"metric_configuration": {
"domain_kwargs": {
"column": "engine_fuel_type"
},
"metric_name": "column_values.nonnull.unexpected_count",
"metric_value_kwargs": null
},
"num_batches": 1
}
}
}
En esta Expectation sobre la columna engine_hp fallan todas las validaciones, por tanto, el usuario tendrá que ajustarlas modificando el JSON donde están definidas.
En el siguiente JSON vemos la definición de la Expectation que valida que los valores de la columna engine hp están dentro de un mínimo y un máximo:
{
"expectation_type": "expect_column_min_to_be_between",
"kwargs": {
"column": "engine_hp",
"max_value": 62.0,
"min_value": 62.0,
"strict_max": false,
"strict_min": false
},
"meta": {
"profiler_details": {
"metric_configuration": {
"domain_kwargs": {
"column": "engine_hp"
},
"metric_name": "column.min",
"metric_value_kwargs": null
},
"num_batches": 1
}
}
}
A lo largo de este post hemos explicado cómo usar el Data Assistant, el cual es muy fácil de utilizar. Independientemente de la fuente de datos que elijas, el modo de usar la librería no varía. Te animo a que uses el Data Assistant cuando no estás familiarizado con Great Expectations o si quieres tener una primera toma de contacto explorando conjuntos de datos que desconoces.
Los comentarios serán moderados. Serán visibles si aportan un argumento constructivo. Si no estás de acuerdo con algún punto, por favor, muestra tus opiniones de manera educada.
Cuéntanos qué te parece.