¿Buscas nuestro logo?
Aquí te dejamos una copia, pero si necesitas más opciones o quieres conocer más, visita nuestra área de marca.
Conoce nuestra marca.¿Buscas nuestro logo?
Aquí te dejamos una copia, pero si necesitas más opciones o quieres conocer más, visita nuestra área de marca.
Conoce nuestra marca.dev
Andrés Navidad 15/11/2023 Cargando comentarios…
De sobra es conocido Apache Spark como motor de procesamiento de datos masivo. El objetivo de este post no es hacer una introducción a Spark, sino, más bien, ver qué capacidades ofrece el servicio Spark Serverless de Google Cloud, cuáles son sus ventajas, inconvenientes y limitaciones, así como un ejemplo de productivización de un proceso en la vida real.
Cuando hablamos de Spark Serverless en Google Cloud realmente nos referimos al servicio Dataproc Serverless. Nos permite ejecutar procesos batch de Spark sin tener que preocuparnos por la infraestructura subyacente ni por la gestión de recursos.
Spark Serverless utiliza un modelo de precios basado en uso. Los usuarios solo pagan por los recursos que utilizan, lo que puede ayudar a ahorrar dinero en cargas de trabajo de Spark que son de corta duración o que varían en tamaño.
A diferencia de Dataproc on Compute Engine, donde Google se encarga de la gestión y configuración de las máquinas y de los framework del ecosistema Hadoop y tienes que especificar parámetros de las máquinas, tipo de discos, etc., en Dataproc Serverless, lo único que tienes que especificar es la capacidad de cómputo que quieres para tu proceso.
Pasamos entonces de una visión genérica de configuración de cluster (donde se podrían ejecutar muchos procesos) a una visión de proceso. Aquí una tabla con las principales características del servicio Dataproc en sus diferentes sabores:
Función | Dataproc Serverless | Dataproc en Compute Engine |
---|---|---|
Frameworks de procesamiento | Batch: Spark 3.4 y versiones anteriores Interactivo: Kernel de PySpark para Spark 3.4 y versiones anteriores | Spark 3.3 y versiones anteriores Otros frameworks de código abierto, como Hive, Flink, Trino y Kafka |
Serverless | Sí | No |
Tiempo de arranque | 60 segundos | 90 segundos |
Control de la infraestructura | No | Sí |
Administración de recursos | Basado en Spark | Basada en YARN |
Asistencia de GPU | Planificado | Sí |
Sesiones interactivas | Sí | No |
Contenedores personalizados | Sí | No. |
Acceso a VM (por ejemplo, SSH) | No | Sí |
Versiones de Java | Java 17 y 11 | Versiones anteriores compatibles |
Asistencia de OS Login | No | Sí |
Los procesos Batch que podemos ejecutar en Dataproc Serverless son:
De momento está en preview, pero también se puede escribir y ejecutar código de manera interactiva en notebooks de Jupyter teniendo disponible una Spark Session.
Vamos a ver un ejemplo real de cómo ejecutar un proceso de Spark sobre Dataproc Serveless y llevarlo a producción. Los ejemplos de la documentación oficial, on-hands, labs, etc., son ideales para probar la tecnología, pero a veces quedan lejos de lo que es poner un proceso en producción.
Nuestro proceso leerá 131 millones de tickets que están en formato CSV almacenado en Cloud Storage, los procesará y los insertará en BigQuery.
Definición de variables globales que utilizaremos:
# GLOBAL VARS
export PROJECT_ID=spark-serverless-bigquery
export REGION_GCP=europe-west1
export ARTIFACT_REPOSITORY=spark-serverless
export IMAGE=spark-serverless-bigquery-goodly
Buckets necesarios:
#BUCKETS
gcloud storage buckets create gs://metadata-dataproc-goodly \
--project=$PROJECT_ID \
--default-storage-class=STANDARD \
--location=$REGION_GCP \
--uniform-bucket-level-access
gcloud storage buckets create gs://staging-dataproc-goodly \
--project=$PROJECT_ID \
--default-storage-class=STANDARD \
--location=$REGION_GCP \
--uniform-bucket-level-access
gcloud storage buckets create gs://temp_dataproc_bigquery_indirect \
--project=$PROJECT_ID \
--default-storage-class=STANDARD \
--location=$REGION_GCP \
--uniform-bucket-level-access
Configuración de networking. En este caso, vamos a crear una VPC desde 0, pero en un entorno productivo habrá que integrarse dentro de la VPC que esté definida y habilitar el private ip google access.
# NETWORKS
gcloud compute networks create spark-serverless-vpc \
--project=$PROJECT_ID \
--subnet-mode=auto \
--mtu=1460 \
--bgp-routing-mode=regional
gcloud compute firewall-rules create allow-internal-ingress \
--project=$PROJECT_ID \
--network="spark-serverless-vpc" \
--source-ranges="10.132.0.0/20" \
--direction="ingress" \
--action="allow" \
--rules="all"
gcloud compute networks subnets update spark-serverless-vpc \
--region=$REGION_GCP \
--enable-private-ip-google-access
Vamos a crear un repositorio de artefactos, en nuestro caso en GCP, pero puede ser cualquier otro que permita la persistencia de imágenes de docker.
# ARTIFACTORY
gcloud artifacts repositories create $ARTIFACT_REPOSITORY --repository-format=docker \
--location=$REGION_GCP --description="Docker repository"
En este punto, tenemos la infraestructura lista para lanzar por primera vez un proceso de CI sobre nuestro repositorio de código. Nosotros utilizaremos Cloud Build, pero puede utilizarse cualquier herramienta de Integración Continua.
gcloud builds submit --project=$PROJECT_ID \
--region=$REGION_GCP \
--substitutions=_REGION_GCP=$REGION_GCP,_REPOSITORY=$ARTIFACT_REPOSITORY,_IMAGE=$IMAGE \
--config=cloudbuild.yaml .
El fichero cloudbuild.yaml (donde definimos los pasos de CI y la construcción de la imagen) es tal que así:
steps:
- id: 'mvn test'
name: maven:3-jdk-11-slim
entrypoint: mvn
args: [ 'test' ]
- id: 'mvn package'
name: maven:3-jdk-11-slim
entrypoint: 'bash'
args:
- '-c'
- |
mvn package -DskipTests
echo $(mvn help:evaluate -Dexpression=project.version -q -DforceStdout) > version.txt
- id: 'Prepare provided jar'
name: 'gcr.io/cloud-builders/gsutil'
args: [ 'cp', 'gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.28.0.jar','.' ]
- id: 'Build image'
name: 'gcr.io/cloud-builders/docker'
entrypoint: 'bash'
args:
- '-c'
- |
VERSION=$(cat version.txt)
docker build -t europe-west1-docker.pkg.dev/$PROJECT_ID/${_REPOSITORY}/${_IMAGE}:$VERSION \
--build-arg JAR_FILE=target/spark-serverless-bigquery-$VERSION-jar-with-dependencies.jar .
- id: 'Push image'
name: 'gcr.io/cloud-builders/docker'
entrypoint: 'bash'
args:
- '-c'
- |
VERSION=$(cat version.txt)
docker push europe-west1-docker.pkg.dev/$PROJECT_ID/${_REPOSITORY}/${_IMAGE}:$VERSION
substitutions:
_REPOSITORY: spark-serverless # default value
_IMAGE: spark-serverless-bigquery-goodly # default value
_REGION_GCP: europe-west1
options:
substitution_option: 'ALLOW_LOOSE'
logging: CLOUD_LOGGING_ONLY
Una vez construida la imagen, deberíamos tener en nuestro artifact registry (o equivalente) una imagen de docker con nuestro código dentro:
Tip: recomendamos seguir este artículo para el fichero Dockerfile que servirá para crear el custom container que lanzaremos en el siguiente punto.
Procedemos a lanzar nuestro proceso:
gcloud dataproc batches submit spark \
--batch `echo "csv-to-bq-dataproc-serverless-$(date "+%Y-%m-%d-%H%M%S")"` \
--properties spark.driver.cores=4\
,spark.driver.memory=8g\
,spark.dataproc.driver.disk.size=250g\
,spark.executor.cores=8\
,spark.executor.memory=16g\
,spark.dataproc.executor.disk.size=500g\
,spark.dynamicAllocation.enabled=true\
,spark.dynamicAllocation.initialExecutors=8\
,spark.dynamicAllocation.minExecutors=8\
,spark.dynamicAllocation.maxExecutors=8 \
--project=$PROJECT_ID \
--container-image=$REGION_GCP-docker.pkg.dev/spark-serverless-bigquery/$ARTIFACT_REPOSITORY/$IMAGE:1.0.1 \
--version=1.1 \
--region=$REGION_GCP \
--subnet=https://www.googleapis.com/compute/v1/projects/$PROJECT_ID/regions/$REGION_GCP/subnetworks/spark-serverless-vpc \
--class=com.paradigma.tech.goodly.sparkserverless.gcp.commitconf.CsvToBqDataprocServerless
Con esto hemos lanzado nuestro primer proceso de Spark y habríamos terminado aquí este post si el mundo fuese ideal, pero no lo es.
Cuando lanzas un proceso, puede fallar o, lo que es peor, puede no tener el performance que esperas. Es en ese punto donde la UI de Spark es fundamental, ya que nos permitirá ver nuestros Jobs, Stages, Tasks así como la distribución de los datos por las diferentes particiones, etc.
Por defecto, Dataproc Serverless no lo da, así que necesitamos un history server para poder tener toda la información visual que nos proporciona Spark por defecto.
# Creación History Server
gcloud dataproc clusters create phs-cluster \
--enable-component-gateway \
--bucket staging-dataproc-goodly --region $REGION_GCP \
--subnet spark-serverless-vpc --no-address \
--single-node --master-machine-type n2-standard-4 \
--master-boot-disk-size 200 --image-version 2.1-debian11 \
--properties mapred:mapreduce.jobhistory.read-only.dir-pattern=gs://metadata-dataproc-goodly/*/mapreduce-job-history/done,yarn:yarn.nodemanager.remote-app-log-dir=gs://metadata-dataproc-goodly/*/yarn-logs,spark:spark.eventLog.dir=gs://metadata-dataproc-goodly/spark-event-log/spark-job-history,spark:spark.history.fs.logDirectory=gs://metadata-dataproc-goodly/*/spark-job-history,spark:spark.history.custom.executor.log.url.applyIncompleteApplication=false,spark:spark.history.custom.executor.log.url={{YARN_LOG_SERVER_URL}}/{{NM_HOST}}:{{NM_PORT}}/{{CONTAINER_ID}}/{{CONTAINER_ID}}/{{USER}}/{{FILE_NAME}} \
--project $PROJECT_ID
Si volvemos a lanzar el proceso, pero esta vez especificando que utilice el history server que hemos definido como parámetro (“--history-server-cluster=projects/$PROJECT_ID/regions/$REGION_GCP/clusters/phs-cluster \”), ya podremos ver habilitado el botón “View Spark History Server” en la interfaz de Dataproc > Serverless > Batch y nos llevará a la UI de Spark con información sobre nuestra ejecución.
Podemos ver, entre otros resultados, el tiempo de duración total del proceso (2,8 minutos), el tiempo que cada job, número de stages por job, etc.
Spark Serverless ofrece una serie de ventajas sobre los enfoques que requieren de infraestructura dedicada, como por ejemplo:
Pero también tiene algunos puntos de mejora:
Dataproc sobre Compute Engine. Cluster permanente 24x7
Hay que tener en cuenta que, en este caso, vamos a tener que pagar el cluster, independientemente de que se esté usando o no.
Dataproc Serveless
En cargas batch recurrentes, diarias, semanales, mensuales, etc. Debería de ser la opción por defecto y si con el tiempo, se dispone de tantas cargas que siempre hay alguna corriendo la consola, puede tener sentido crear un cluster con Dataproc sobre Compute Engine.
Dataproc sobre Compute Engine. Cluster efimeros
Esta opción es un punto intermedio entre las dos anteriores y probablemente deje de tener sentido teniendo ya la posibilidad de utilizar Dataproc Serverless. Consiste básicamente en crear un cluster, lanzar tus cargas y eliminar el cluster cuando hayan terminado.
Su uso puede acotarse a cuando no podamos lanzar una carga en Dataproc Serverless por alguna limitación técnica, como por ejemplo procesamiento con soporte para GPUs o procesos batch que duran más de 24 horas.
Como conclusión, podemos decir que Dataproc Serverless se convierte en la opción por defecto si quieres lanzar procesos de Spark teniendo en cuenta siempre las limitaciones técnicas que hemos comentado anteriormente.
Los comentarios serán moderados. Serán visibles si aportan un argumento constructivo. Si no estás de acuerdo con algún punto, por favor, muestra tus opiniones de manera educada.
Cuéntanos qué te parece.