################ template to run PySpark on Colab #######################
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://www-us.apache.org/dist/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz
!tar xf spark-2.4.5-bin-hadoop2.7.tgz
!pip install -q findspark
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.5-bin-hadoop2.7"
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
spark1 = SparkSession.builder.appName('basic').getOrCreate()  # getOrCreate() returns the session created above
# Test: this import must succeed without errors
import pyspark
################ end template PySpark on Colab ##########################
from pyspark import SparkConf, SparkContext
conf = SparkConf().setAppName("basic").setMaster("local")
#sc = SparkContext(conf=conf) ## for Jupyter and Databricks
sc = SparkContext.getOrCreate() ## for Colab
from pyspark.sql.types import *
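As an optional smoke test (not part of the original template), the cell below should print a tiny DataFrame with a single id column if the setup succeeded:
# Smoke test: spark.range builds a DataFrame of consecutive ids (0, 1, 2)
spark.range(3).show()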
Dataframe¶
Load dataset¶
!wget https://frenzy86.s3.eu-west-2.amazonaws.com/fav/tecno/shirts.csv
df0 = spark.read.load("shirts.csv", format="csv", sep=",", inferSchema="true", header="true")
df0.show()
--2020-06-18 08:31:49-- https://frenzy86.s3.eu-west-2.amazonaws.com/fav/tecno/shirts.csv
Resolving frenzy86.s3.eu-west-2.amazonaws.com (frenzy86.s3.eu-west-2.amazonaws.com)... 52.95.150.54
Connecting to frenzy86.s3.eu-west-2.amazonaws.com (frenzy86.s3.eu-west-2.amazonaws.com)|52.95.150.54|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1741 (1.7K) [application/vnd.ms-excel]
Saving to: ‘shirts.csv’
shirts.csv 100%[===================>] 1.70K --.-KB/s in 0s
2020-06-18 08:31:49 (79.7 MB/s) - ‘shirts.csv’ saved [1741/1741]
+---+------+------+------+
|_c0|taglia|colore|prezzo|
+---+------+------+------+
| 0| S|bianco| 4.99|
| 1| M|bianco| 19.99|
| 2| XL|bianco| 12.49|
| 3| XL|bianco| 14.99|
| 4| S|bianco| 14.99|
| 5| S| verde| 7.99|
| 6| M| verde| 4.99|
| 7| L| verde| 12.49|
| 8| XL|bianco| 12.49|
| 9| M| verde| 19.99|
| 10| L|bianco| 14.99|
| 11| XL|bianco| 19.99|
| 12| M|bianco| 4.99|
| 13| L|bianco| 7.99|
| 14| M|bianco| 14.99|
| 15| XL| rosso| 9.99|
| 16| S| rosso| 12.49|
| 17| L|bianco| 7.99|
| 18| XL|bianco| 4.99|
| 19| M| verde| 14.99|
+---+------+------+------+
only showing top 20 rows
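Note that the CSV's unnamed row index was imported as the _c0 column; if it is not needed, one way to discard it is with .drop():
# Drop the "_c0" index column read in from the CSV (optional)
df0 = df0.drop("_c0")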
Creating a DataFrame¶
We can create a new DataFrame using the SparkSession's .createDataFrame(data, names) method, which takes two parameters:
A list of tuples, where each tuple corresponds to one row of the DataFrame.
A list of names for the columns.
data = [("Gianluca", "M", 23, 174, 70.5),
("Andrea", "M", 37, 179, 68.),
("Marco", "M", 33, 172, 88.5),
("Annalisa", "F", 38, 155, 50.2),
("Monica", "F", 25, 165, 54.3)]
df = spark.createDataFrame(data, ["name", "gender", "age", "height","weight"])
df.show()
+--------+------+---+------+------+
| name|gender|age|height|weight|
+--------+------+---+------+------+
|Gianluca| M| 23| 174| 70.5|
| Andrea| M| 37| 179| 68.0|
| Marco| M| 33| 172| 88.5|
|Annalisa| F| 38| 155| 50.2|
| Monica| F| 25| 165| 54.3|
+--------+------+---+------+------+
df.show(3)
+--------+------+---+------+------+
| name|gender|age|height|weight|
+--------+------+---+------+------+
|Gianluca| M| 23| 174| 70.5|
| Andrea| M| 37| 179| 68.0|
| Marco| M| 33| 172| 88.5|
+--------+------+---+------+------+
only showing top 3 rows
df.columns
['name', 'gender', 'age', 'height', 'weight']
To print the DataFrame's schema, i.e. the information attached to each attribute (name, type, whether it is nullable), we can use the .printSchema() method.
df.printSchema()
root
|-- name: string (nullable = true)
|-- gender: string (nullable = true)
|-- age: long (nullable = true)
|-- height: long (nullable = true)
|-- weight: double (nullable = true)
We can display a set of summary statistics (count, mean, standard deviation, minimum and maximum) using the .describe() method.
df.describe().show()
+-------+------+------+-----------------+-----------------+-----------------+
|summary| name|gender| age| height| weight|
+-------+------+------+-----------------+-----------------+-----------------+
| count| 5| 5| 5| 5| 5|
| mean| null| null| 31.2| 169.0| 66.3|
| stddev| null| null|6.870225614927067|9.300537618869138|15.13753612712452|
| min|Andrea| F| 23| 155| 50.2|
| max|Monica| M| 38| 179| 88.5|
+-------+------+------+-----------------+-----------------+-----------------+
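.describe() also accepts specific column names, which is handy when we only care about a subset; a minimal example:
# Summary statistics for selected columns only
df.describe("age", "weight").show()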
Modifying the schema¶
The DataFrame's schema was inferred directly from the data, but what if we wanted to define it ourselves?
age: from long to integer.
height: from long to integer.
weight: from double to float.
We can do this by building a schema and passing it to the .createDataFrame(data, schema) method.
data_schema = [StructField('name', StringType(), True),
StructField('gender', StringType(), True),
StructField('age', IntegerType(), True),
StructField('height', IntegerType(), True),
StructField('weight', FloatType(), True)]
schema = StructType(fields=data_schema)
Now let's create the DataFrame, passing the data and the schema via the schema parameter.
df = spark.createDataFrame(data, schema=schema)
df.show()
+--------+------+---+------+------+
| name|gender|age|height|weight|
+--------+------+---+------+------+
|Gianluca| M| 23| 174| 70.5|
| Andrea| M| 37| 179| 68.0|
| Marco| M| 33| 172| 88.5|
|Annalisa| F| 38| 155| 50.2|
| Monica| F| 25| 165| 54.3|
+--------+------+---+------+------+
df.printSchema()
root
|-- name: string (nullable = true)
|-- gender: string (nullable = true)
|-- age: integer (nullable = true)
|-- height: integer (nullable = true)
|-- weight: float (nullable = true)
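As an alternative sketch (df_cast is just an illustrative name), the same conversion can be done by casting the inferred columns with .cast() instead of supplying a schema up front:
from pyspark.sql.functions import col
# Cast the inferred long/double columns to the desired narrower types
df_cast = (spark.createDataFrame(data, ["name", "gender", "age", "height", "weight"])
           .withColumn("age", col("age").cast(IntegerType()))
           .withColumn("height", col("height").cast(IntegerType()))
           .withColumn("weight", col("weight").cast(FloatType())))
df_cast.printSchema()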
Rows and columns¶
df.head(5)
[Row(name='Gianluca', gender='M', age=23, height=174, weight=70.5),
Row(name='Andrea', gender='M', age=37, height=179, weight=68.0),
Row(name='Marco', gender='M', age=33, height=172, weight=88.5),
Row(name='Annalisa', gender='F', age=38, height=155, weight=50.20000076293945),
Row(name='Monica', gender='F', age=25, height=165, weight=54.29999923706055)]
Note that weight is now stored as a 32-bit float, which explains the precision artifacts (e.g. 50.20000076293945) in the Row output above.
To select a single column of the DataFrame we can use either its index or its name.
df[0]
Column<b'name'>
df["name"]
Column<b'name'>
dfName = df.select("name")
dfName.show()
+--------+
| name|
+--------+
|Gianluca|
| Andrea|
| Marco|
|Annalisa|
| Monica|
+--------+
We can use the same method to select multiple columns, passing a list of names as the parameter.
df.select(["name","age"]).show()
+--------+---+
| name|age|
+--------+---+
|Gianluca| 23|
| Andrea| 37|
| Marco| 33|
|Annalisa| 38|
| Monica| 25|
+--------+---+
Creating and modifying columns¶
We can modify a given column using the .withColumn(name, column) method, which takes the name of the column to modify and a Column object.
df = df.withColumn("height", df["height"]/100)
df.show()
+--------+------+---+------+------+
| name|gender|age|height|weight|
+--------+------+---+------+------+
|Gianluca| M| 23| 1.74| 70.5|
| Andrea| M| 37| 1.79| 68.0|
| Marco| M| 33| 1.72| 88.5|
|Annalisa| F| 38| 1.55| 50.2|
| Monica| F| 25| 1.65| 54.3|
+--------+------+---+------+------+
df.printSchema()
root
|-- name: string (nullable = true)
|-- gender: string (nullable = true)
|-- age: integer (nullable = true)
|-- height: double (nullable = true)
|-- weight: float (nullable = true)
The schema was updated automatically, since we converted the height from integers to decimal numbers.
Let's compute each person's body mass index (BMI): $bmi = \frac{weight}{height^2}$
bmi = df["weight"]/(df["height"]**2)
df = df.withColumn("bmi", bmi)
df.show()
+--------+------+---+------+------+------------------+
| name|gender|age|height|weight| bmi|
+--------+------+---+------+------+------------------+
|Gianluca| M| 23| 1.74| 70.5|23.285770907649624|
| Andrea| M| 37| 1.79| 68.0| 21.22280827689523|
| Marco| M| 33| 1.72| 88.5| 29.91481882098432|
|Annalisa| F| 38| 1.55| 50.2| 20.89490146220164|
| Monica| F| 25| 1.65| 54.3|19.944903301032344|
+--------+------+---+------+------+------------------+
from pyspark.sql.functions import round
df = df.withColumn("bmi", round(df["bmi"], 2))
df.show()
+--------+------+---+------+------+-----+
| name|gender|age|height|weight| bmi|
+--------+------+---+------+------+-----+
|Gianluca| M| 23| 1.74| 70.5|23.29|
| Andrea| M| 37| 1.79| 68.0|21.22|
| Marco| M| 33| 1.72| 88.5|29.91|
|Annalisa| F| 38| 1.55| 50.2|20.89|
| Monica| F| 25| 1.65| 54.3|19.94|
+--------+------+---+------+------+-----+
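We can also derive a new column whose value depends on a condition, using the when and otherwise functions; here we add a boolean is_fat flag based on the bmi column.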
from pyspark.sql.functions import col, when
df = df.withColumn("is_fat", when(col("bmi")>25, True).otherwise(False))
df.show()
+--------+------+---+------+------+-----+------+
| name|gender|age|height|weight| bmi|is_fat|
+--------+------+---+------+------+-----+------+
|Gianluca| M| 23| 1.74| 70.5|23.29| false|
| Andrea| M| 37| 1.79| 68.0|21.22| false|
| Marco| M| 33| 1.72| 88.5|29.91| true|
|Annalisa| F| 38| 1.55| 50.2|20.89| false|
| Monica| F| 25| 1.65| 54.3|19.94| false|
+--------+------+---+------+------+-----+------+
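when() calls can also be chained for more than two outcomes; branches are evaluated top-down and the first match wins. A small sketch with illustrative thresholds (df_cat and bmi_class are our own names, not from the original):
# Chained conditions: evaluated top-down, first match wins
df_cat = df.withColumn("bmi_class",
                       when(col("bmi") >= 30, "obese")
                       .when(col("bmi") >= 25, "overweight")
                       .otherwise("normal"))
df_cat.show()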
Finally, let's rename the gender column to sex; we can do this with the .withColumnRenamed(old_name, new_name) method.
df = df.withColumnRenamed("gender","sex")
df.show()
+--------+---+---+------+------+-----+------+
| name|sex|age|height|weight| bmi|is_fat|
+--------+---+---+------+------+-----+------+
|Gianluca| M| 23| 1.74| 70.5|23.29| false|
| Andrea| M| 37| 1.79| 68.0|21.22| false|
| Marco| M| 33| 1.72| 88.5|29.91| true|
|Annalisa| F| 38| 1.55| 50.2|20.89| false|
| Monica| F| 25| 1.65| 54.3|19.94| false|
+--------+---+---+------+------+-----+------+
Filters¶
We can filter the rows of a DataFrame with the .filter() method, for example by passing a SQL-style condition string:
df_male = df.filter("sex == 'M'")
df_male.show()
+--------+---+---+------+------+-----+------+
| name|sex|age|height|weight| bmi|is_fat|
+--------+---+---+------+------+-----+------+
|Gianluca| M| 23| 1.74| 70.5|23.29| false|
| Andrea| M| 37| 1.79| 68.0|21.22| false|
| Marco| M| 33| 1.72| 88.5|29.91| true|
+--------+---+---+------+------+-----+------+
…or, equivalently, by passing a column expression:
df_male = df.filter(df["sex"] == 'M')
df_male.show()
+--------+---+---+------+------+-----+------+
| name|sex|age|height|weight| bmi|is_fat|
+--------+---+---+------+------+-----+------+
|Gianluca| M| 23| 1.74| 70.5|23.29| false|
| Andrea| M| 37| 1.79| 68.0|21.22| false|
| Marco| M| 33| 1.72| 88.5|29.91| true|
+--------+---+---+------+------+-----+------+
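Column conditions can also be combined with & (and), | (or) and ~ (not); each condition must be wrapped in parentheses because of operator precedence. A minimal example:
# Males older than 30: parentheses around each condition are required
df.filter((df["sex"] == 'M') & (df["age"] > 30)).show()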
Aggregation¶
To aggregate, we first group the rows with .groupBy(col):
df_group = df.groupBy('sex')
type(df_group)
pyspark.sql.group.GroupedData
The result is a GroupedData object, on which we can perform various arithmetic and statistical operations, such as counting:
df_group.count().show()
+---+-----+
|sex|count|
+---+-----+
| F| 2|
| M| 3|
+---+-----+
df_group.mean().show()
+---+--------+-----------+-----------------+-----------------+
|sex|avg(age)|avg(height)| avg(weight)| avg(bmi)|
+---+--------+-----------+-----------------+-----------------+
| F| 31.5| 1.6| 52.25| 20.415|
| M| 31.0| 1.75|75.66666666666667|24.80666666666667|
+---+--------+-----------+-----------------+-----------------+
df_group.sum().show()
+---+--------+-----------+-----------+--------+
|sex|sum(age)|sum(height)|sum(weight)|sum(bmi)|
+---+--------+-----------+-----------+--------+
| F| 63| 3.2| 104.5| 40.83|
| M| 93| 5.25| 227.0| 74.42|
+---+--------+-----------+-----------+--------+
df_group.max().show()
+---+--------+-----------+-----------+--------+
|sex|max(age)|max(height)|max(weight)|max(bmi)|
+---+--------+-----------+-----------+--------+
| F| 38| 1.65| 54.3| 20.89|
| M| 37| 1.79| 88.5| 29.91|
+---+--------+-----------+-----------+--------+
df_group.min().show()
+---+--------+-----------+-----------+--------+
|sex|min(age)|min(height)|min(weight)|min(bmi)|
+---+--------+-----------+-----------+--------+
| F| 25| 1.55| 50.2| 19.94|
| M| 23| 1.72| 68.0| 21.22|
+---+--------+-----------+-----------+--------+
We can also operate on individual columns using the DataFrame's .agg(op) method, which takes a dictionary mapping each column name to the operation to apply.
df.agg({'weight':'sum'}).show()
+-----------+
|sum(weight)|
+-----------+
| 331.5|
+-----------+
df_group.agg({'weight':'sum', 'height':'max', 'sex':'count'}).show()
+---+----------+-----------+-----------+
|sex|count(sex)|sum(weight)|max(height)|
+---+----------+-----------+-----------+
| F| 2| 104.5| 1.65|
| M| 3| 227.0| 1.79|
+---+----------+-----------+-----------+
The names of the new columns are assigned automatically, based on the function and the column used; we can change them with the .withColumnRenamed(old_name, new_name) method.
df_group.agg({'weight':'sum', 'height':'max', 'sex':'count'})\
.withColumnRenamed("count(sex)","count_sex")\
.withColumnRenamed("sum(weight)","sum_weight")\
.withColumnRenamed("max(height)","max_height").show()
+---+---------+----------+----------+
|sex|count_sex|sum_weight|max_height|
+---+---------+----------+----------+
| F| 2| 104.5| 1.65|
| M| 3| 227.0| 1.79|
+---+---------+----------+----------+
Instead of a dictionary, we can also use aggregate functions.
from pyspark.sql.functions import sum, max, count  # note: these shadow Python's built-in sum/max/count
df_group.agg(sum("weight"), max('height'), count('sex')).show()
+---+-----------+-----------+----------+
|sex|sum(weight)|max(height)|count(sex)|
+---+-----------+-----------+----------+
| F| 104.5| 1.65| 2|
| M| 227.0| 1.79| 3|
+---+-----------+-----------+----------+
In this case, to set the column names ourselves, we can attach an alias to each aggregate.
from pyspark.sql.functions import sum, max, count
df_group.agg(sum("weight").alias("sum_weight"), max('height').alias("max_height"), count('sex').alias("count_sex")).show()
+---+----------+----------+---------+
|sex|sum_weight|max_height|count_sex|
+---+----------+----------+---------+
| F| 104.5| 1.65| 2|
| M| 227.0| 1.79| 3|
+---+----------+----------+---------+
Sorting¶
To sort a DataFrame we can use the .orderBy(col) method; for example, let's sort by weight.
df.orderBy("weight").show()
+--------+---+---+------+------+-----+------+
| name|sex|age|height|weight| bmi|is_fat|
+--------+---+---+------+------+-----+------+
|Annalisa| F| 38| 1.55| 50.2|20.89| false|
| Monica| F| 25| 1.65| 54.3|19.94| false|
| Andrea| M| 37| 1.79| 68.0|21.22| false|
|Gianluca| M| 23| 1.74| 70.5|23.29| false|
| Marco| M| 33| 1.72| 88.5|29.91| true|
+--------+---+---+------+------+-----+------+
By default the sort is ascending (from the smallest value to the largest); to sort in descending order we simply set the ascending parameter to False.
df.orderBy("weight", ascending=False).show()
+--------+---+---+------+------+-----+------+
| name|sex|age|height|weight| bmi|is_fat|
+--------+---+---+------+------+-----+------+
| Marco| M| 33| 1.72| 88.5|29.91| true|
|Gianluca| M| 23| 1.74| 70.5|23.29| false|
| Andrea| M| 37| 1.79| 68.0|21.22| false|
| Monica| F| 25| 1.65| 54.3|19.94| false|
|Annalisa| F| 38| 1.55| 50.2|20.89| false|
+--------+---+---+------+------+-----+------+
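.orderBy() also accepts multiple columns, and the direction can be set per column via .asc() and .desc(); a small sketch:
# Sort by sex ascending, then by weight descending within each sex
df.orderBy(df["sex"].asc(), df["weight"].desc()).show()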