
################ template to run PySpark on Colab #######################
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://www-us.apache.org/dist/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz
!tar xf spark-2.4.5-bin-hadoop2.7.tgz
!pip install -q findspark
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.5-bin-hadoop2.7"
import findspark
findspark.init()

from pyspark.sql import SparkSession
# builder.getOrCreate() returns the already-running session if one exists,
# so a single session carrying both options is enough
spark = SparkSession.builder.master("local[*]").appName("basic").getOrCreate()
# Test: the import below must run without errors
import pyspark
################ end template PySpark on Colab ##########################
from pyspark import SparkConf, SparkContext
conf = SparkConf().setAppName("basic").setMaster("local")
#sc = SparkContext(conf=conf)  ## for jupyter and Databricks
sc = SparkContext.getOrCreate()   ## for Colab
from pyspark.sql.types import *

DataFrame

Load the dataset

!wget https://frenzy86.s3.eu-west-2.amazonaws.com/fav/tecno/shirts.csv
df0 = spark.read.load("shirts.csv", format="csv", sep=",", inferSchema="true", header="true")
df0.show()
--2020-06-18 08:31:49--  https://frenzy86.s3.eu-west-2.amazonaws.com/fav/tecno/shirts.csv
Resolving frenzy86.s3.eu-west-2.amazonaws.com (frenzy86.s3.eu-west-2.amazonaws.com)... 52.95.150.54
Connecting to frenzy86.s3.eu-west-2.amazonaws.com (frenzy86.s3.eu-west-2.amazonaws.com)|52.95.150.54|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1741 (1.7K) [application/vnd.ms-excel]
Saving to: ‘shirts.csv’

2020-06-18 08:31:49 (79.7 MB/s) - ‘shirts.csv’ saved [1741/1741]

+---+------+------+------+
|_c0|taglia|colore|prezzo|
+---+------+------+------+
|  0|     S|bianco|  4.99|
|  1|     M|bianco| 19.99|
|  2|    XL|bianco| 12.49|
|  3|    XL|bianco| 14.99|
|  4|     S|bianco| 14.99|
|  5|     S| verde|  7.99|
|  6|     M| verde|  4.99|
|  7|     L| verde| 12.49|
|  8|    XL|bianco| 12.49|
|  9|     M| verde| 19.99|
| 10|     L|bianco| 14.99|
| 11|    XL|bianco| 19.99|
| 12|     M|bianco|  4.99|
| 13|     L|bianco|  7.99|
| 14|     M|bianco| 14.99|
| 15|    XL| rosso|  9.99|
| 16|     S| rosso| 12.49|
| 17|     L|bianco|  7.99|
| 18|    XL|bianco|  4.99|
| 19|     M| verde| 14.99|
+---+------+------+------+
only showing top 20 rows
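
As an aside, spark.read.csv is a shorthand for the same loader, and the _c0 column is just the row index that was saved into the file; a minimal sketch, assuming shirts.csv has already been downloaded as above:

df0 = spark.read.csv("shirts.csv", sep=",", inferSchema=True, header=True)
df0 = df0.drop("_c0")   # drop the row index stored in the CSV
df0.show(5)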

Creating a DataFrame

We can create a new DataFrame using the .createDataFrame(data, names) method of the SparkSession object. This method needs two parameters:

A list of tuples, where each tuple corresponds to one row of the DataFrame.
A list with the names for the columns.
data = [("Gianluca", "M", 23, 174, 70.5),
        ("Andrea", "M", 37, 179, 68.),
        ("Marco", "M", 33, 172, 88.5),
        ("Annalisa", "F", 38, 155, 50.2),
        ("Monica", "F", 25, 165, 54.3)]

df = spark.createDataFrame(data, ["name", "gender", "age", "height","weight"])
df.show()
+--------+------+---+------+------+
|    name|gender|age|height|weight|
+--------+------+---+------+------+
|Gianluca|     M| 23|   174|  70.5|
|  Andrea|     M| 37|   179|  68.0|
|   Marco|     M| 33|   172|  88.5|
|Annalisa|     F| 38|   155|  50.2|
|  Monica|     F| 25|   165|  54.3|
+--------+------+---+------+------+
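
The rows can also be built as Row objects rather than plain tuples; each Row carries its own field names, so the separate list of column names is no longer needed. A minimal sketch:

from pyspark.sql import Row

rows = [Row(name="Gianluca", gender="M", age=23, height=174, weight=70.5),
        Row(name="Monica", gender="F", age=25, height=165, weight=54.3)]
spark.createDataFrame(rows).show()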

df.show(3)
+--------+------+---+------+------+
|    name|gender|age|height|weight|
+--------+------+---+------+------+
|Gianluca|     M| 23|   174|  70.5|
|  Andrea|     M| 37|   179|  68.0|
|   Marco|     M| 33|   172|  88.5|
+--------+------+---+------+------+
only showing top 3 rows

df.columns
['name', 'gender', 'age', 'height', 'weight']

To print the schema of the DataFrame, i.e. the information attached to each attribute (name, type, whether it can be null), we can use the .printSchema() method.

df.printSchema()
root
 |-- name: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- age: long (nullable = true)
 |-- height: long (nullable = true)
 |-- weight: double (nullable = true)

We can display a set of summary statistics (count, mean, standard deviation, minimum and maximum value) using the .describe() method.

df.describe().show()
+-------+------+------+-----------------+-----------------+-----------------+
|summary|  name|gender|              age|           height|           weight|
+-------+------+------+-----------------+-----------------+-----------------+
|  count|     5|     5|                5|                5|                5|
|   mean|  null|  null|             31.2|            169.0|             66.3|
| stddev|  null|  null|6.870225614927067|9.300537618869138|15.13753612712452|
|    min|Andrea|     F|               23|              155|             50.2|
|    max|Monica|     M|               38|              179|             88.5|
+-------+------+------+-----------------+-----------------+-----------------+
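
.describe() also accepts specific column names, which is convenient on DataFrames with many columns:

df.describe("age", "weight").show()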

Modifying the schema

The schema of the DataFrame was inferred directly from the data, but what if we wanted to define it ourselves? Say we want:

age: from long to integer.
height: from long to integer.
weight: from double to float.

We can do this by building a schema and then passing it to the .createDataFrame(data, schema) method.

data_schema = [StructField('name', StringType(), True),
                StructField('gender', StringType(), True),
                StructField('age', IntegerType(), True),
                StructField('height', IntegerType(), True),
                StructField('weight', FloatType(), True)]
            
schema = StructType(fields=data_schema)

Now let's create the DataFrame, passing the data and the schema via the schema parameter.

df = spark.createDataFrame(data, schema=schema)
df.show()
+--------+------+---+------+------+
|    name|gender|age|height|weight|
+--------+------+---+------+------+
|Gianluca|     M| 23|   174|  70.5|
|  Andrea|     M| 37|   179|  68.0|
|   Marco|     M| 33|   172|  88.5|
|Annalisa|     F| 38|   155|  50.2|
|  Monica|     F| 25|   165|  54.3|
+--------+------+---+------+------+

df.printSchema()
root
 |-- name: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- height: integer (nullable = true)
 |-- weight: float (nullable = true)
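
As an alternative to assembling a StructType by hand, createDataFrame should also accept the schema as a DDL-formatted string (supported since Spark 2.3); a minimal sketch:

# equivalent to the StructType above, expressed as a DDL string
df = spark.createDataFrame(data, schema="name string, gender string, age int, height int, weight float")
df.printSchema()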

Rows and columns

df.head(5)
[Row(name='Gianluca', gender='M', age=23, height=174, weight=70.5),
 Row(name='Andrea', gender='M', age=37, height=179, weight=68.0),
 Row(name='Marco', gender='M', age=33, height=172, weight=88.5),
 Row(name='Annalisa', gender='F', age=38, height=155, weight=50.20000076293945),
 Row(name='Monica', gender='F', age=25, height=165, weight=54.29999923706055)]

To select a single column of the DataFrame we can use either its index or its name:

df[0]
Column<b'name'>
df["name"]
Column<b'name'>
dfName = df.select("name")
dfName.show()
+--------+
|    name|
+--------+
|Gianluca|
|  Andrea|
|   Marco|
|Annalisa|
|  Monica|
+--------+

We can use the same method to select several columns, passing a list of names as the parameter.

df.select(["name","age"]).show()
+--------+---+
|    name|age|
+--------+---+
|Gianluca| 23|
|  Andrea| 37|
|   Marco| 33|
|Annalisa| 38|
|  Monica| 25|
+--------+---+
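
select also has a SQL-flavored variant, selectExpr, which evaluates expressions while selecting; for example (the alias age_next_year is just illustrative):

df.selectExpr("name", "age + 1 as age_next_year").show()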

Creating and modifying columns

We can modify a given column using the .withColumn(name, column) method, which takes the name of the column to modify and a Column object.

df = df.withColumn("height", df["height"]/100)
df.show()
+--------+------+---+------+------+
|    name|gender|age|height|weight|
+--------+------+---+------+------+
|Gianluca|     M| 23|  1.74|  70.5|
|  Andrea|     M| 37|  1.79|  68.0|
|   Marco|     M| 33|  1.72|  88.5|
|Annalisa|     F| 38|  1.55|  50.2|
|  Monica|     F| 25|  1.65|  54.3|
+--------+------+---+------+------+

df.printSchema()
root
 |-- name: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- height: double (nullable = true)
 |-- weight: float (nullable = true)

The schema was updated automatically, since we converted the height from integer values to decimal ones.
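
If instead we want to choose a type explicitly, a column can be cast by hand; a minimal, non-destructive sketch (we print the schema without reassigning df):

# .cast accepts a DataType instance or a type-name string such as "double"
df.withColumn("weight", df["weight"].cast("double")).printSchema()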

Let's compute each person's body mass index (bmi): $bmi = \frac{weight}{height^2}$

bmi = df["weight"]/(df["height"]**2)
df = df.withColumn("bmi", bmi)
df.show()
+--------+------+---+------+------+------------------+
|    name|gender|age|height|weight|               bmi|
+--------+------+---+------+------+------------------+
|Gianluca|     M| 23|  1.74|  70.5|23.285770907649624|
|  Andrea|     M| 37|  1.79|  68.0| 21.22280827689523|
|   Marco|     M| 33|  1.72|  88.5| 29.91481882098432|
|Annalisa|     F| 38|  1.55|  50.2| 20.89490146220164|
|  Monica|     F| 25|  1.65|  54.3|19.944903301032344|
+--------+------+---+------+------+------------------+
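
The bmi values carry more decimals than we need, so let's round them to two decimal places with the round function from pyspark.sql.functions.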

from pyspark.sql.functions import round

df = df.withColumn("bmi", round(df["bmi"], 2))
df.show()
+--------+------+---+------+------+-----+
|    name|gender|age|height|weight|  bmi|
+--------+------+---+------+------+-----+
|Gianluca|     M| 23|  1.74|  70.5|23.29|
|  Andrea|     M| 37|  1.79|  68.0|21.22|
|   Marco|     M| 33|  1.72|  88.5|29.91|
|Annalisa|     F| 38|  1.55|  50.2|20.89|
|  Monica|     F| 25|  1.65|  54.3|19.94|
+--------+------+---+------+------+-----+
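
We can also derive a boolean column conditionally with when/otherwise: rows whose bmi exceeds 25 are flagged True, all the others False.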

from pyspark.sql.functions import col, when

df = df.withColumn("is_fat", when(col("bmi")>25, True).otherwise(False))
df.show()
+--------+------+---+------+------+-----+------+
|    name|gender|age|height|weight|  bmi|is_fat|
+--------+------+---+------+------+-----+------+
|Gianluca|     M| 23|  1.74|  70.5|23.29| false|
|  Andrea|     M| 37|  1.79|  68.0|21.22| false|
|   Marco|     M| 33|  1.72|  88.5|29.91|  true|
|Annalisa|     F| 38|  1.55|  50.2|20.89| false|
|  Monica|     F| 25|  1.65|  54.3|19.94| false|
+--------+------+---+------+------+-----+------+

Finally, let's rename the gender column to sex; we can do this with the .withColumnRenamed(old_name, new_name) method.

df = df.withColumnRenamed("gender","sex")
df.show()
+--------+---+---+------+------+-----+------+
|    name|sex|age|height|weight|  bmi|is_fat|
+--------+---+---+------+------+-----+------+
|Gianluca|  M| 23|  1.74|  70.5|23.29| false|
|  Andrea|  M| 37|  1.79|  68.0|21.22| false|
|   Marco|  M| 33|  1.72|  88.5|29.91|  true|
|Annalisa|  F| 38|  1.55|  50.2|20.89| false|
|  Monica|  F| 25|  1.65|  54.3|19.94| false|
+--------+---+---+------+------+-----+------+

Filters

df_male = df.filter("sex == 'M'")
df_male.show()
+--------+---+---+------+------+-----+------+
|    name|sex|age|height|weight|  bmi|is_fat|
+--------+---+---+------+------+-----+------+
|Gianluca|  M| 23|  1.74|  70.5|23.29| false|
|  Andrea|  M| 37|  1.79|  68.0|21.22| false|
|   Marco|  M| 33|  1.72|  88.5|29.91|  true|
+--------+---+---+------+------+-----+------+

or, equivalently…

df_male = df.filter(df["sex"] == 'M')
df_male.show()
+--------+---+---+------+------+-----+------+
|    name|sex|age|height|weight|  bmi|is_fat|
+--------+---+---+------+------+-----+------+
|Gianluca|  M| 23|  1.74|  70.5|23.29| false|
|  Andrea|  M| 37|  1.79|  68.0|21.22| false|
|   Marco|  M| 33|  1.72|  88.5|29.91|  true|
+--------+---+---+------+------+-----+------+
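
Conditions can be combined with the column operators & (and), | (or) and ~ (not), keeping each condition inside parentheses; for example:

df.filter((df["sex"] == "M") & (df["age"] > 30)).show()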

Aggregation

df_group = df.groupBy('sex')
type(df_group)
pyspark.sql.group.GroupedData

The result is a GroupedData object, on which we can perform various arithmetic and statistical operations, such as counting:

df_group.count().show()
+---+-----+
|sex|count|
+---+-----+
|  F|    2|
|  M|    3|
+---+-----+

df_group.mean().show()
+---+--------+-----------+-----------------+-----------------+
|sex|avg(age)|avg(height)|      avg(weight)|         avg(bmi)|
+---+--------+-----------+-----------------+-----------------+
|  F|    31.5|        1.6|            52.25|           20.415|
|  M|    31.0|       1.75|75.66666666666667|24.80666666666667|
+---+--------+-----------+-----------------+-----------------+

df_group.sum().show()
+---+--------+-----------+-----------+--------+
|sex|sum(age)|sum(height)|sum(weight)|sum(bmi)|
+---+--------+-----------+-----------+--------+
|  F|      63|        3.2|      104.5|   40.83|
|  M|      93|       5.25|      227.0|   74.42|
+---+--------+-----------+-----------+--------+

df_group.max().show()
+---+--------+-----------+-----------+--------+
|sex|max(age)|max(height)|max(weight)|max(bmi)|
+---+--------+-----------+-----------+--------+
|  F|      38|       1.65|       54.3|   20.89|
|  M|      37|       1.79|       88.5|   29.91|
+---+--------+-----------+-----------+--------+

df_group.min().show()
+---+--------+-----------+-----------+--------+
|sex|min(age)|min(height)|min(weight)|min(bmi)|
+---+--------+-----------+-----------+--------+
|  F|      25|       1.55|       50.2|   19.94|
|  M|      23|       1.72|       68.0|   21.22|
+---+--------+-----------+-----------+--------+

We can also operate on individual columns using the .agg(op) method (available on DataFrames and on GroupedData alike), which takes as its parameter a dictionary mapping column names to the operations to perform.

df.agg({'weight':'sum'}).show()
+-----------+
|sum(weight)|
+-----------+
|      331.5|
+-----------+

df_group.agg({'weight':'sum', 'height':'max', 'sex':'count'}).show()
+---+----------+-----------+-----------+
|sex|count(sex)|sum(weight)|max(height)|
+---+----------+-----------+-----------+
|  F|         2|      104.5|       1.65|
|  M|         3|      227.0|       1.79|
+---+----------+-----------+-----------+

The names of the new columns are assigned automatically, based on the function and the column used; we can change them with the .withColumnRenamed(old_name, new_name) method.

df_group.agg({'weight':'sum', 'height':'max', 'sex':'count'})\
            .withColumnRenamed("count(sex)","count_sex")\
            .withColumnRenamed("sum(weight)","sum_weight")\
            .withColumnRenamed("max(height)","max_height").show()
+---+---------+----------+----------+
|sex|count_sex|sum_weight|max_height|
+---+---------+----------+----------+
|  F|        2|     104.5|      1.65|
|  M|        3|     227.0|      1.79|
+---+---------+----------+----------+

Instead of a dictionary, we can also use functions.

from pyspark.sql.functions import sum, max, count   # note: these shadow Python's built-in sum/max

df_group.agg(sum("weight"), max('height'), count('sex')).show()
+---+-----------+-----------+----------+
|sex|sum(weight)|max(height)|count(sex)|
+---+-----------+-----------+----------+
|  F|      104.5|       1.65|         2|
|  M|      227.0|       1.79|         3|
+---+-----------+-----------+----------+

In this case, to set the column names explicitly, we can create an alias:

from pyspark.sql.functions import sum, max, count

df_group.agg(sum("weight").alias("sum_weight"), max('height').alias("max_height"), count('sex').alias("count_sex")).show()
+---+----------+----------+---------+
|sex|sum_weight|max_height|count_sex|
+---+----------+----------+---------+
|  F|     104.5|      1.65|        2|
|  M|     227.0|      1.79|        3|
+---+----------+----------+---------+

Sorting

To sort a DataFrame we can use the .orderBy(col) method; for example, let's sort by weight.

df.orderBy("weight").show()
+--------+---+---+------+------+-----+------+
|    name|sex|age|height|weight|  bmi|is_fat|
+--------+---+---+------+------+-----+------+
|Annalisa|  F| 38|  1.55|  50.2|20.89| false|
|  Monica|  F| 25|  1.65|  54.3|19.94| false|
|  Andrea|  M| 37|  1.79|  68.0|21.22| false|
|Gianluca|  M| 23|  1.74|  70.5|23.29| false|
|   Marco|  M| 33|  1.72|  88.5|29.91|  true|
+--------+---+---+------+------+-----+------+

By default the sort is ascending (from the smallest value to the largest); to sort in descending order we just set the ascending parameter to False.

df.orderBy("weight", ascending=False).show()
+--------+---+---+------+------+-----+------+
|    name|sex|age|height|weight|  bmi|is_fat|
+--------+---+---+------+------+-----+------+
|   Marco|  M| 33|  1.72|  88.5|29.91|  true|
|Gianluca|  M| 23|  1.74|  70.5|23.29| false|
|  Andrea|  M| 37|  1.79|  68.0|21.22| false|
|  Monica|  F| 25|  1.65|  54.3|19.94| false|
|Annalisa|  F| 38|  1.55|  50.2|20.89| false|
+--------+---+---+------+------+-----+------+
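
orderBy also accepts several columns at once, and each column can set its own direction via .asc() and .desc(); a minimal sketch:

df.orderBy(df["sex"].asc(), df["weight"].desc()).show()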