Intérprete de Spark SQL PySpark
- 1 Introducción
- 2 Ejemplo de uso
- 2.1 Crear un Dataset/DataFrame a través de SparkSession
- 2.2 Crear un DataFrame mediante DataFrameReader
- 2.3 Añadir una nueva columna
- 2.4 Elimina una columna
- 2.5 Seleccionar subconjunto de columnas
- 2.6 Filtrado de columnas
- 2.7 Funciones definidas por el usuario
- 2.8 Agrupaciones
- 2.9 Uniones sencillas
- 2.10 Uniones complejas
- 2.11 Usar SQL directamente
- 2.12 Mostrar tablas
- 2.13 Visualizar un DataFrame/Dataset mediante z.show
- 2.14 Otros tipos de visualizaciones
- 3 Conclusiones
- 4 Descargar ejemplo
Introducción
En este tutorial se va a explicar cómo utilizar Spark SQL en PySpark (basado en Spark 2.x).
Antes de empezar, es necesario aclarar un par de conceptos de Spark SQL:
SparkSession: este es el punto de entrada de Spark SQL, y se necesita usar SparkSession para crear un DataFrame/Dataset, registrar UDF, consultar tablas y etc.
DataFrame: no existen Datasets como tal en PySpark, sólo DataFrames. El DataFrame de PySpark es muy similar al concepto DataFrame de Pandas, pero está distribuido.
Hay dos maneras de crear un DataFrame:
Usando SparkSession para crear el DataFrame directamente: a partir de RDD, objetos tipo List, etc.
Utilizando DataFrameReader para crear un DataFrame desde muchos tipos de almacenamiento soportados por Spark, como HDFS, JDBC, etc.
Dicho esto, a continuación se muestran algunos ejemplos de cómo trabajar con SparkSQL.
Se recomienda ejecutar en primer lugar un párrafo con «%spark.conf» para asegurarse de que se utiliza la configuración correcta.
Ejemplo de uso
Crear un Dataset/DataFrame a través de SparkSession
En el siguiente ejemplo se va a crear un DataFrame de Spark a partir de unos datos cualesquiera. Además de mostrarlos en forma de tabla, se mostrará el esquema de datos.
Esto mismo se podrá hacer utilizando Pandas.
%pyspark
# create DataFrame from python list. It can infer schema for you.
df1 = spark.createDataFrame([(1, "andy", 20, "USA"), (2, "jeff", 23, "China"), (3, "james", 18, "USA")]).toDF("id", "name", "age", "country")
df1.printSchema()
df1.show()
# create DataFrame from pandas dataframe
df2 = spark.createDataFrame(df1.toPandas())
df2.printSchema()
df2.show()
Ejecutando el código, el resultado será el siguiente:
Crear un DataFrame mediante DataFrameReader
En el siguiente ejemplo, los datos se van a leer desde diferentes archivos; un JSON y un CSV. Para ello se usará el DataFrameReader.
%pyspark
SPARK_HOME = os.getenv('SPARK_HOME')
# Read data from json file
# link for this people.json (https://github.com/apache/spark/blob/master/examples/src/main/resources/people.json)
# Use hdfs path if you are using hdfs
df1 = spark.read.json("file://" + SPARK_HOME + "/examples/src/main/resources/people.json")
df1.printSchema()
df1.show()
# Read data from csv file. You can customize it via spark.read.options. E.g. In the following example, we customize the sep and header
df2 = spark.read.options(sep=";", header=True).csv("file://" + SPARK_HOME + "/examples/src/main/resources/people.csv")
df2.printSchema()
df2.show()
# Specify schema for your csv file
from pyspark.sql.types import StructType, StringType, IntegerType
schema = StructType().add("name", StringType(), True) \
.add("age", IntegerType(), True) \
.add("job", StringType(), True)
df3 = spark.read.options(sep=";", header=True) \
.schema(schema) \
.csv("file://" + SPARK_HOME + "/examples/src/main/resources/people.csv")
Ejecutando el párrafo, el resultado se mostrará tal que así:
Añadir una nueva columna
Aunque se genere una tabla en el DataFrame, o se importe la tabla desde un archivo, éstas no están bloqueadas y están abiertas a añadir más datos en ella. Con el siguiente código, se genera una tabla y, posteriormente, se le añaden nuevas columnas.
%pyspark
# withColumn could be used to add new Column
df1 = spark.createDataFrame([(1, "andy", 20, "USA"), (2, "jeff", 23, "China"), (3, "james", 18, "USA")]).toDF("id", "name", "age", "country")
df2 = df1.withColumn("age2", df1["age"] + 1)
df2.show()
# the new column could replace the existing the column if the new column name is the same as the old column
df3 = df1.withColumn("age", df1["age"] + 1)
df3.show()
# Besides using expression to create new column, you could also use udf to create new column
# Use F.upper instead of upper, because the builtin udf of spark may conclifct with that of python, such as max
import pyspark.sql.functions as F
df4 = df1.withColumn("name", F.upper(df1["name"]))
df4.show()
El resultado de ejecutar el párrafo es el siguiente:
Elimina una columna
De igual forma, partiendo del nombre de la cabecera de una columna, se puede eliminar sin mucho problema con el siguiente código:
Partiendo de la tabla del ejemplo anterior, el resultado quedaría así:
Seleccionar subconjunto de columnas
Siguiendo con las columnas, es posible seleccionar uno subconjunto de éstas. Esto se puede hacer tal como se muestra a continuación:
El resultado sería el siguiente:
Filtrado de columnas
Es posible también filtrar el contenido de las columnas según filtros sencillos, tanto numéricos como de texto. Se puede ver en el siguiente código:
Ejecutando el código, la tabla generada con el DataFrame se filtra según los campos de edad o de país:
Funciones definidas por el usuario
Hay ciertos momentos en los que es necesario interactuar con los datos de una manera algo más elaborada que meros filtros. Para ello, es posible usar funciones de Spark de Python (PySpark UDF).
En el siguiente ejemplo se aprecia cómo utilizar una lambda de Python junto a una función personalizada:
Lanzando el párrafo, el resultado obtenido tendría este aspecto:
Agrupaciones
Al igual que con SQL, es posible llevar a cabo agrupaciones según los parámetros definidos en el código del párrafo. Un ejemplo sería el siguiente código que presenta varios tipos de agrupaciones:
Agrupa los datos por país y contando el número de registros para cada uno.
Agrupa los datos por país, calculando la edad promedio y contando el número de registros para cada país (sin y con alias)
Agrupa los datos por país, calculando la edad promedio la edad máxima para cada país.
Tras ejecutar el párrafo, el resultado sería:
Uniones sencillas
Otro tipo de operación que se puede realizar es la unión sencilla a partir de un único campo. Así, dados dos DataFrames, se indicará a partir del campo por el que realizar la unión.
El resultado de ejecutar el párrafo mostraría las siguientes uniones:
Uniones complejas
También va a ser posible realizar uniones más complejas a partir de varios campos. En el siguiente ejemplo se van a crear dos DataFrames, uno con información personal y otro con información de países, y luego realiza una unión entre ambos conjuntos de datos usando dos campos comunes para combinar los datos.
Tras ejecutar el párrafo, el resultado que se mostrará es el siguiente:
Usar SQL directamente
También va a resultar posible lanzar consultas de SQL. Como ejemplo, en el siguiente párrafo se creará un DataFrame con datos de personas, registrándose en una vista temporal para llevar a cabo consultas SQL, para seleccionar columnas y aplicar una función personalizada (UDF).
Tras la ejecución se visualizará las siguientes tablas:
Mostrar tablas
Aunque ya se ha visualizado datos en forma tabulada, es posible representar la información en tablas más avanzadas, con opciones y una interfaz gráfica más agradable a la vista.
Con el siguiente código se podrá visualizar la tabla:
Ejecutando el párrafo, se obtiene este resultado:
Visualizar un DataFrame/Dataset mediante z.show
Además de representar los datos de forma tabulada, también se puede representar en diferentes tipos de gráficas. Un ejemplo es el siguiente, en el que se puede representar los datos de manera
Tras lanzar el párrafo, se visualizarían los datos en forma gráfica:
Otros tipos de visualizaciones
Existen otras formas de visualizar los datos, por lo que se anima a ir probando las opciones disponibles.
En el siguiente ejemplo mediante spark.sql
se puede ver una visualización mediante gráfico circular.
Ejecutando el párrafo se visualizará el siguiente gráfico:
Conclusiones
Gracias a PySpark, es posible representar y trabajar con datos tabulares de manera sencilla y ágil. Resulta por tanto un complemento muy útil a la hora de trabajar con los Notebooks, permitiendo al analista de datos contar con un amplio abanico de posibilidades tanto para el análisis como la representación de los datos.
Descargar ejemplo
A continuación se encuentra disponible para descargar el ejemplo utilizado en este tutorial: