/
Intérprete de Spark SQL PySpark

Intérprete de Spark SQL PySpark

 

Introducción

En este tutorial se va a explicar cómo utilizar Spark SQL en PySpark (basado en Spark 2.x).

https://spark.apache.org/sql/

Antes de empezar, es necesario aclarar un par de conceptos de Spark SQL:

  • SparkSession: este es el punto de entrada de Spark SQL, y se necesita usar SparkSession para crear un DataFrame/Dataset, registrar UDF, consultar tablas y etc.

  • DataFrame: no existen Datasets como tal en PySpark, sólo DataFrames. El DataFrame de PySpark es muy similar al concepto DataFrame de Pandas, pero está distribuido.

Hay dos maneras de crear un DataFrame:

  • Usando SparkSession para crear el DataFrame directamente: a partir de RDD, objetos tipo List, etc.

  • Utilizando DataFrameReader para crear un DataFrame desde muchos tipos de almacenamiento soportados por Spark, como HDFS, JDBC, etc.

Dicho esto, a continuación se muestran algunos ejemplos de cómo trabajar con SparkSQL.

Se recomienda ejecutar en primer lugar un párrafo con «%spark.conf» para asegurarse de que se utiliza la configuración correcta.

Ejemplo de uso

Crear un Dataset/DataFrame a través de SparkSession

En el siguiente ejemplo se va a crear un DataFrame de Spark a partir de unos datos cualesquiera. Además de mostrarlos en forma de tabla, se mostrará el esquema de datos.

Esto mismo se podrá hacer utilizando Pandas.

%pyspark # create DataFrame from python list. It can infer schema for you. df1 = spark.createDataFrame([(1, "andy", 20, "USA"), (2, "jeff", 23, "China"), (3, "james", 18, "USA")]).toDF("id", "name", "age", "country") df1.printSchema() df1.show() # create DataFrame from pandas dataframe df2 = spark.createDataFrame(df1.toPandas()) df2.printSchema() df2.show()

Ejecutando el código, el resultado será el siguiente:

image-20250213-103010.png

Crear un DataFrame mediante DataFrameReader

En el siguiente ejemplo, los datos se van a leer desde diferentes archivos; un JSON y un CSV. Para ello se usará el DataFrameReader.

%pyspark SPARK_HOME = os.getenv('SPARK_HOME') # Read data from json file # link for this people.json (https://github.com/apache/spark/blob/master/examples/src/main/resources/people.json) # Use hdfs path if you are using hdfs df1 = spark.read.json("file://" + SPARK_HOME + "/examples/src/main/resources/people.json") df1.printSchema() df1.show() # Read data from csv file. You can customize it via spark.read.options. E.g. In the following example, we customize the sep and header df2 = spark.read.options(sep=";", header=True).csv("file://" + SPARK_HOME + "/examples/src/main/resources/people.csv") df2.printSchema() df2.show() # Specify schema for your csv file from pyspark.sql.types import StructType, StringType, IntegerType schema = StructType().add("name", StringType(), True) \ .add("age", IntegerType(), True) \ .add("job", StringType(), True) df3 = spark.read.options(sep=";", header=True) \ .schema(schema) \ .csv("file://" + SPARK_HOME + "/examples/src/main/resources/people.csv")

Ejecutando el párrafo, el resultado se mostrará tal que así:

image-20250213-111247.png

Añadir una nueva columna

Aunque se genere una tabla en el DataFrame, o se importe la tabla desde un archivo, éstas no están bloqueadas y están abiertas a añadir más datos en ella. Con el siguiente código, se genera una tabla y, posteriormente, se le añaden nuevas columnas.

%pyspark # withColumn could be used to add new Column df1 = spark.createDataFrame([(1, "andy", 20, "USA"), (2, "jeff", 23, "China"), (3, "james", 18, "USA")]).toDF("id", "name", "age", "country") df2 = df1.withColumn("age2", df1["age"] + 1) df2.show() # the new column could replace the existing the column if the new column name is the same as the old column df3 = df1.withColumn("age", df1["age"] + 1) df3.show() # Besides using expression to create new column, you could also use udf to create new column # Use F.upper instead of upper, because the builtin udf of spark may conclifct with that of python, such as max import pyspark.sql.functions as F df4 = df1.withColumn("name", F.upper(df1["name"])) df4.show()

El resultado de ejecutar el párrafo es el siguiente:

Elimina una columna

De igual forma, partiendo del nombre de la cabecera de una columna, se puede eliminar sin mucho problema con el siguiente código:

Partiendo de la tabla del ejemplo anterior, el resultado quedaría así:

Seleccionar subconjunto de columnas

Siguiendo con las columnas, es posible seleccionar uno subconjunto de éstas. Esto se puede hacer tal como se muestra a continuación:

El resultado sería el siguiente:

Filtrado de columnas

Es posible también filtrar el contenido de las columnas según filtros sencillos, tanto numéricos como de texto. Se puede ver en el siguiente código:

Ejecutando el código, la tabla generada con el DataFrame se filtra según los campos de edad o de país:

Funciones definidas por el usuario

Hay ciertos momentos en los que es necesario interactuar con los datos de una manera algo más elaborada que meros filtros. Para ello, es posible usar funciones de Spark de Python (PySpark UDF).

En el siguiente ejemplo se aprecia cómo utilizar una lambda de Python junto a una función personalizada:

Lanzando el párrafo, el resultado obtenido tendría este aspecto:

Agrupaciones

Al igual que con SQL, es posible llevar a cabo agrupaciones según los parámetros definidos en el código del párrafo. Un ejemplo sería el siguiente código que presenta varios tipos de agrupaciones:

  • Agrupa los datos por país y contando el número de registros para cada uno.

  • Agrupa los datos por país, calculando la edad promedio y contando el número de registros para cada país (sin y con alias)

  • Agrupa los datos por país, calculando la edad promedio la edad máxima para cada país.

Tras ejecutar el párrafo, el resultado sería:

Uniones sencillas

Otro tipo de operación que se puede realizar es la unión sencilla a partir de un único campo. Así, dados dos DataFrames, se indicará a partir del campo por el que realizar la unión.

El resultado de ejecutar el párrafo mostraría las siguientes uniones:

Uniones complejas

También va a ser posible realizar uniones más complejas a partir de varios campos. En el siguiente ejemplo se van a crear dos DataFrames, uno con información personal y otro con información de países, y luego realiza una unión entre ambos conjuntos de datos usando dos campos comunes para combinar los datos.

Tras ejecutar el párrafo, el resultado que se mostrará es el siguiente:

Usar SQL directamente

También va a resultar posible lanzar consultas de SQL. Como ejemplo, en el siguiente párrafo se creará un DataFrame con datos de personas, registrándose en una vista temporal para llevar a cabo consultas SQL, para seleccionar columnas y aplicar una función personalizada (UDF).

Tras la ejecución se visualizará las siguientes tablas:

Mostrar tablas

Aunque ya se ha visualizado datos en forma tabulada, es posible representar la información en tablas más avanzadas, con opciones y una interfaz gráfica más agradable a la vista.

Con el siguiente código se podrá visualizar la tabla:

Ejecutando el párrafo, se obtiene este resultado:

Visualizar un DataFrame/Dataset mediante z.show

Además de representar los datos de forma tabulada, también se puede representar en diferentes tipos de gráficas. Un ejemplo es el siguiente, en el que se puede representar los datos de manera

Tras lanzar el párrafo, se visualizarían los datos en forma gráfica:

Otros tipos de visualizaciones

Existen otras formas de visualizar los datos, por lo que se anima a ir probando las opciones disponibles.

En el siguiente ejemplo mediante spark.sql se puede ver una visualización mediante gráfico circular.

Ejecutando el párrafo se visualizará el siguiente gráfico:

Conclusiones

Gracias a PySpark, es posible representar y trabajar con datos tabulares de manera sencilla y ágil. Resulta por tanto un complemento muy útil a la hora de trabajar con los Notebooks, permitiendo al analista de datos contar con un amplio abanico de posibilidades tanto para el análisis como la representación de los datos.

Descargar ejemplo

A continuación se encuentra disponible para descargar el ejemplo utilizado en este tutorial:

 

Related content

Cómo instalar nuevas librerías con el intérprete de Spark
Cómo instalar nuevas librerías con el intérprete de Spark
More like this
¿Cómo ejecutar una aplicación Spark con StreamSets?
¿Cómo ejecutar una aplicación Spark con StreamSets?
More like this
Data Exploitation Notebook with Spark
Data Exploitation Notebook with Spark
More like this
Using Data Across Interpreters
Using Data Across Interpreters
More like this
How to run a Spark application with StreamSets
How to run a Spark application with StreamSets
More like this
A look at the Notebooks
A look at the Notebooks
More like this