Intérprete de Spark SQL PySpark

Intérprete de Spark SQL PySpark

 

Introducción

En este tutorial se va a explicar cómo utilizar Spark SQL en PySpark (basado en Spark 2.x).

Antes de empezar, es necesario aclarar un par de conceptos de Spark SQL:

  • SparkSession: este es el punto de entrada de Spark SQL, y se necesita usar SparkSession para crear un DataFrame/Dataset, registrar UDF, consultar tablas y etc.

  • DataFrame: no existen Datasets como tal en PySpark, sólo DataFrames. El DataFrame de PySpark es muy similar al concepto DataFrame de Pandas, pero está distribuido.

Hay dos maneras de crear un DataFrame:

  • Usando SparkSession para crear el DataFrame directamente: a partir de RDD, objetos tipo List, etc.

  • Utilizando DataFrameReader para crear un DataFrame desde muchos tipos de almacenamiento soportados por Spark, como HDFS, JDBC, etc.

Dicho esto, a continuación se muestran algunos ejemplos de cómo trabajar con SparkSQL.

Se recomienda ejecutar en primer lugar un párrafo con «%spark.conf» para asegurarse de que se utiliza la configuración correcta.

Ejemplo de uso

Crear un Dataset/DataFrame a través de SparkSession

En el siguiente ejemplo se va a crear un DataFrame de Spark a partir de unos datos cualesquiera. Además de mostrarlos en forma de tabla, se mostrará el esquema de datos.

Esto mismo se podrá hacer utilizando Pandas.

%pyspark # create DataFrame from python list. It can infer schema for you. df1 = spark.createDataFrame([(1, "andy", 20, "USA"), (2, "jeff", 23, "China"), (3, "james", 18, "USA")]).toDF("id", "name", "age", "country") df1.printSchema() df1.show() # create DataFrame from pandas dataframe df2 = spark.createDataFrame(df1.toPandas()) df2.printSchema() df2.show()

Ejecutando el código, el resultado será el siguiente:

image-20250213-103010.png

Crear un DataFrame mediante DataFrameReader

En el siguiente ejemplo, los datos se van a leer desde diferentes archivos; un JSON y un CSV. Para ello se usará el DataFrameReader.

%pyspark SPARK_HOME = os.getenv('SPARK_HOME') # Read data from json file # link for this people.json (https://github.com/apache/spark/blob/master/examples/src/main/resources/people.json) # Use hdfs path if you are using hdfs df1 = spark.read.json("file://" + SPARK_HOME + "/examples/src/main/resources/people.json") df1.printSchema() df1.show() # Read data from csv file. You can customize it via spark.read.options. E.g. In the following example, we customize the sep and header df2 = spark.read.options(sep=";", header=True).csv("file://" + SPARK_HOME + "/examples/src/main/resources/people.csv") df2.printSchema() df2.show() # Specify schema for your csv file from pyspark.sql.types import StructType, StringType, IntegerType schema = StructType().add("name", StringType(), True) \ .add("age", IntegerType(), True) \ .add("job", StringType(), True) df3 = spark.read.options(sep=";", header=True) \ .schema(schema) \ .csv("file://" + SPARK_HOME + "/examples/src/main/resources/people.csv")

Ejecutando el párrafo, el resultado se mostrará tal que así:

image-20250213-111247.png

Añadir una nueva columna

Aunque se genere una tabla en el DataFrame, o se importe la tabla desde un archivo, éstas no están bloqueadas y están abiertas a añadir más datos en ella. Con el siguiente código, se genera una tabla y, posteriormente, se le añaden nuevas columnas.

%pyspark # withColumn could be used to add new Column df1 = spark.createDataFrame([(1, "andy", 20, "USA"), (2, "jeff", 23, "China"), (3, "james", 18, "USA")]).toDF("id", "name", "age", "country") df2 = df1.withColumn("age2", df1["age"] + 1) df2.show() # the new column could replace the existing the column if the new column name is the same as the old column df3 = df1.withColumn("age", df1["age"] + 1) df3.show() # Besides using expression to create new column, you could also use udf to create new column # Use F.upper instead of upper, because the builtin udf of spark may conclifct with that of python, such as max import pyspark.sql.functions as F df4 = df1.withColumn("name", F.upper(df1["name"])) df4.show()

El resultado de ejecutar el párrafo es el siguiente:

image-20250213-111701.png

Elimina una columna

De igual forma, partiendo del nombre de la cabecera de una columna, se puede eliminar sin mucho problema con el siguiente código:

%pyspark df1 = spark.createDataFrame([(1, "andy", 20, "USA"), (2, "jeff", 23, "China"), (3, "james", 18, "USA")]).toDF("id", "name", "age", "country") # drop could be used to remove Column df2 = df1.drop("id") df2.show()

Partiendo de la tabla del ejemplo anterior, el resultado quedaría así:

image-20250213-112340.png

Seleccionar subconjunto de columnas

Siguiendo con las columnas, es posible seleccionar uno subconjunto de éstas. Esto se puede hacer tal como se muestra a continuación:

%pyspark df1 = spark.createDataFrame([(1, "andy", 20, "USA"), (2, "jeff", 23, "China"), (3, "james", 18, "USA")]).toDF("id", "name", "age", "country") # select can accept a list of string of the column names df2 = df1.select("id", "name") df2.show() # select can also accept a list of Column. You can create column via $ or udf import pyspark.sql.functions as F df3 = df1.select(df1["id"], F.upper(df1["name"]), df1["age"] + 1) df3.show()

El resultado sería el siguiente:

image-20250213-112649.png

Filtrado de columnas

Es posible también filtrar el contenido de las columnas según filtros sencillos, tanto numéricos como de texto. Se puede ver en el siguiente código:

%pyspark df1 = spark.createDataFrame([(1, "andy", 20, "USA"), (2, "jeff", 23, "China"), (3, "james", 18, "USA")]).toDF("id", "name", "age", "country") # filter accept a Column df2 = df1.filter(df1["age"] >= 20) df2.show() # To be noticed, you need to use "&" instead of "&&" or "AND" df3 = df1.filter((df1["age"] >= 20) & (df1["country"] == "China")) df3.show()

Ejecutando el código, la tabla generada con el DataFrame se filtra según los campos de edad o de país:

image-20250213-130923.png

Funciones definidas por el usuario

Hay ciertos momentos en los que es necesario interactuar con los datos de una manera algo más elaborada que meros filtros. Para ello, es posible usar funciones de Spark de Python (PySpark UDF).

En el siguiente ejemplo se aprecia cómo utilizar una lambda de Python junto a una función personalizada:

%pyspark df1 = spark.createDataFrame([(1, "andy", 20, "USA"), (2, "jeff", 23, "China"), (3, "james", 18, "USA")]) \ .toDF("id", "name", "age", "country") # Create udf create python lambda from pyspark.sql.functions import udf udf1 = udf(lambda e: e.upper()) df2 = df1.select(udf1(df1["name"])) df2.show() # UDF could also be used in filter, in this case the return type must be Boolean # We can also use annotation to create udf from pyspark.sql.types import * @udf(returnType=BooleanType()) def udf2(e): if e >= 20: return True; else: return False df3 = df1.filter(udf2(df1["age"])) df3.show() # UDF could also accept more than 1 argument. udf3 = udf(lambda e1, e2: e1 + "_" + e2) df4 = df1.select(udf3(df1["name"], df1["country"]).alias("name_country")) df4.show()

Lanzando el párrafo, el resultado obtenido tendría este aspecto:

image-20250213-140726.png

Agrupaciones

Al igual que con SQL, es posible llevar a cabo agrupaciones según los parámetros definidos en el código del párrafo. Un ejemplo sería el siguiente código que presenta varios tipos de agrupaciones:

  • Agrupa los datos por país y contando el número de registros para cada uno.

  • Agrupa los datos por país, calculando la edad promedio y contando el número de registros para cada país (sin y con alias)

  • Agrupa los datos por país, calculando la edad promedio la edad máxima para cada país.

%pyspark df1 = spark.createDataFrame([(1, "andy", 20, "USA"), (2, "jeff", 23, "China"), (3, "james", 18, "USA")]) \ .toDF("id", "name", "age", "country") # You can call agg function after groupBy directly, such as count/min/max/avg/sum df2 = df1.groupBy("country").count() df2.show() # Pass a Map if you want to do multiple aggregation df3 = df1.groupBy("country").agg({"age": "avg", "id": "count"}) df3.show() import pyspark.sql.functions as F # Or you can pass a list of agg function df4 = df1.groupBy("country").agg(F.avg(df1["age"]).alias("avg_age"), F.count(df1["id"]).alias("count")) df4.show() # You can not pass Map if you want to do multiple aggregation on the same column as the key of Map should be unique. So in this case # you have to pass a list of agg functions df5 = df1.groupBy("country").agg(F.avg(df1["age"]).alias("avg_age"), F.max(df1["age"]).alias("max_age")) df5.show()

Tras ejecutar el párrafo, el resultado sería:

image-20250213-142158.png

Uniones sencillas

Otro tipo de operación que se puede realizar es la unión sencilla a partir de un único campo. Así, dados dos DataFrames, se indicará a partir del campo por el que realizar la unión.

%pyspark df1 = spark.createDataFrame([(1, "andy", 20, 1), (2, "jeff", 23, 2), (3, "james", 18, 3)]).toDF("id", "name", "age", "c_id") df1.show() df2 = spark.createDataFrame([(1, "USA"), (2, "China")]).toDF("c_id", "c_name") df2.show() # You can just specify the key name if join on the same key df3 = df1.join(df2, "c_id") df3.show() # Or you can specify the join condition expclitly in case the key is different between tables df4 = df1.join(df2, df1["c_id"] == df2["c_id"]) df4.show() # You can specify the join type afte the join condition, by default it is inner join df5 = df1.join(df2, df1["c_id"] == df2["c_id"], "left_outer") df5.show()

El resultado de ejecutar el párrafo mostraría las siguientes uniones:

image-20250213-142516.png

Uniones complejas

También va a ser posible realizar uniones más complejas a partir de varios campos. En el siguiente ejemplo se van a crear dos DataFrames, uno con información personal y otro con información de países, y luego realiza una unión entre ambos conjuntos de datos usando dos campos comunes para combinar los datos.

%pyspark df1 = spark.createDataFrame([("andy", 20, 1, 1), ("jeff", 23, 1, 2), ("james", 12, 2, 2)]).toDF("name", "age", "key_1", "key_2") df1.show() df2 = spark.createDataFrame([(1, 1, "USA"), (2, 2, "China")]).toDF("key_1", "key_2", "country") df2.show() # Join on 2 fields: key_1, key_2 # You can pass a list of field name if the join field names are the same in both tables df3 = df1.join(df2, ["key_1", "key_2"]) df3.show() # Or you can specify the join condition expclitly in case when the join fields name is differetnt in the two tables df4 = df1.join(df2, (df1["key_1"] == df2["key_1"]) & (df1["key_2"] == df2["key_2"])) df4.show()

Tras ejecutar el párrafo, el resultado que se mostrará es el siguiente:

image-20250213-143506.png

Usar SQL directamente

También va a resultar posible lanzar consultas de SQL. Como ejemplo, en el siguiente párrafo se creará un DataFrame con datos de personas, registrándose en una vista temporal para llevar a cabo consultas SQL, para seleccionar columnas y aplicar una función personalizada (UDF).

%pyspark df1 = spark.createDataFrame([(1, "andy", 20, "USA"), (2, "jeff", 23, "China"), (3, "james", 18, "USA")]) \ .toDF("id", "name", "age", "country") # call createOrReplaceTempView first if you want to query this DataFrame via sql df1.createOrReplaceTempView("people") # SparkSession.sql return DataFrame df2 = spark.sql("select name, age from people") df2.show() # You need to register udf if you want to use it in sql spark.udf.register("udf1", lambda e : e.upper()) df3 = spark.sql("select udf1(name), age from people") df3.show()

Tras la ejecución se visualizará las siguientes tablas:

image-20250213-144018.png

Mostrar tablas

Aunque ya se ha visualizado datos en forma tabulada, es posible representar la información en tablas más avanzadas, con opciones y una interfaz gráfica más agradable a la vista.

Con el siguiente código se podrá visualizar la tabla:

%spark.sql show tables

Ejecutando el párrafo, se obtiene este resultado:

image-20250213-144807.png

Visualizar un DataFrame/Dataset mediante z.show

Además de representar los datos de forma tabulada, también se puede representar en diferentes tipos de gráficas. Un ejemplo es el siguiente, en el que se puede representar los datos de manera.

%pyspark df1 = spark.createDataFrame([(1, "andy", 20, "USA"), (2, "jeff", 23, "China"), (3, "james", 18, "USA")]).toDF("id", "name", "age", "country") df2 = df1.groupBy("country").count() z.show(df2)

Tras lanzar el párrafo, se visualizarían los datos en forma gráfica:

image-20250213-153619.png

Otros tipos de visualizaciones

Existen otras formas de visualizar los datos, por lo que se anima a ir probando las opciones disponibles.

En el siguiente ejemplo mediante spark.sql se puede ver una visualización mediante gráfico circular.

%spark.sql select country, count(1) as count from people group by country

Ejecutando el párrafo se visualizará el siguiente gráfico:

image-20250213-153952.png

Conclusiones

Gracias a PySpark, es posible representar y trabajar con datos tabulares de manera sencilla y ágil. Resulta por tanto un complemento muy útil a la hora de trabajar con los Notebooks, permitiendo al analista de datos contar con un amplio abanico de posibilidades tanto para el análisis como la representación de los datos.

Descargar ejemplo

A continuación se encuentra disponible para descargar el ejemplo utilizado en este tutorial: