"Open

In [None]:
################ template to run PySpark on Colab #######################

In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://www-us.apache.org/dist/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz
!tar xf spark-2.4.5-bin-hadoop2.7.tgz
!pip install -q findspark

In [2]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.5-bin-hadoop2.7"

In [3]:
import findspark
findspark.init()

from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
spark1 = SparkSession.builder.appName('basic').getOrCreate()
#Test must give no error

In [4]:
import pyspark

In [5]:
from pyspark import SparkConf, SparkContext
conf = SparkConf().setAppName("basic").setMaster("local")
#sc = SparkContext(conf=conf) ## for jupyter and Databricks
sc = SparkContext.getOrCreate() ## for Colab

In [6]:
from pyspark.sql.types import *

# SQL e Dataframe 
Lo Structured Query Language (SQL) è un linguaggio standardizzato per poter interrogare ed operare su Database relazionai. SQL Supporta le seguenti operazioni:

* creare e modificare schemi di database 
* inserire, modificare e gestire dati memorizzati
* interrogare i dati memorizzati
* creare e gestire strumenti di controllo e accesso ai dati

E' possibile utilizzare query SQL su di un Dataframe Spark

In [7]:
data = [("Gianluca", "M", 23, 174, 70.5),
 ("Andrea", "M", 37, 179, 68.),
 ("Marco", "M", 33, 172, 88.5),
 ("Annalisa", "F", 38, 155, 50.2),
 ("Monica", "F", 25, 165, 54.3)]

df = spark.createDataFrame(data, ["name", "gender", "age", "height","weight"])

## Creare una View
Una view è una tabella virtuale, solitamente frutto di un'operazione di SELECT tra più tabelle. Non viene salvata su disco ma risiede in memoria e al termine della sessione viene cancellata. Possiamo creare una View partendo da un Dataframe usando il metodo *.createTempView(name)*.

In [None]:
df.createTempView("people")

Se riproviamo a ricreare la stessa view con lo stesso metodo, otterremo un'eccezione che ci informa del fatto che tale view esiste già.

In [None]:
df.createTempView("people")

AnalysisException: "Temporary view 'people' already exists;"

In questi casi ci conviene usare il metodo *.createOrReplaceTempView(name)*.

In [None]:
df.createOrReplaceTempView("People")

## Comandi di selezione
I comandi SQL sono gli stessi sia per le tabelle che per le view, dato che quest'ultime non sono altro che tabelle virtuali.

Possiamo selezionare una colonna della view usando l'istruzione *SELECT column FROM view*.

In [10]:
df_sql = spark.sql("SELECT name FROM People")
type(df_sql)

pyspark.sql.dataframe.DataFrame

Il risultato della query sarà sempre un Dataframe, che quindi possiamo stampare usa

In [11]:
df_sql.show()

+--------+
| name|
+--------+
|Gianluca|
| Andrea|
| Marco|
|Annalisa|
| Monica|
+--------+



**NOTA BENE** SQL è case-insensitive, quindi non fa distinsione tra maiuscole e minuscole.

In [12]:
spark.sql("Select nAmE FRoM people").show()

+--------+
| nAmE|
+--------+
|Gianluca|
| Andrea|
| Marco|
|Annalisa|
| Monica|
+--------+



Possiamo utilizzare lo stesso comando anche per selezionare più colonne.

In [13]:
spark.sql("SELECT name, age FROM People").show()

+--------+---+
| name|age|
+--------+---+
|Gianluca| 23|
| Andrea| 37|
| Marco| 33|
|Annalisa| 38|
| Monica| 25|
+--------+---+



Per selezionarle tutte possiamo utilizzare il selettore *.

In [14]:
spark.sql("SELECT * FROM People").show()

+--------+------+---+------+------+
| name|gender|age|height|weight|
+--------+------+---+------+------+
|Gianluca| M| 23| 174| 70.5|
| Andrea| M| 37| 179| 68.0|
| Marco| M| 33| 172| 88.5|
|Annalisa| F| 38| 155| 50.2|
| Monica| F| 25| 165| 54.3|
+--------+------+---+------+------+



Possiamo effettuare una selezione basata su una condizione utilizzando il comando WHERE, ad esempio selezioniamo nome e altezza solo per gli uomini.

In [15]:
spark.sql("SELECT name, height FROM People WHERE gender='M'").show()

+--------+------+
| name|height|
+--------+------+
|Gianluca| 174|
| Andrea| 179|
| Marco| 172|
+--------+------+



Possiamo ordinare il risultato in base ai valori di una colonna utilizzando il comando ORDER BY, ad esempio ordiniamo le righe in base al peso.

In [16]:
spark.sql("SELECT * FROM People ORDER BY weight").show()

+--------+------+---+------+------+
| name|gender|age|height|weight|
+--------+------+---+------+------+
|Annalisa| F| 38| 155| 50.2|
| Monica| F| 25| 165| 54.3|
| Andrea| M| 37| 179| 68.0|
|Gianluca| M| 23| 174| 70.5|
| Marco| M| 33| 172| 88.5|
+--------+------+---+------+------+



di default l'ordinamento viene effettuato in maniera ascendente (cioè dal valore più piccolo a quello più grande), se vogliamo effettuare un'ordinamento discendente ci basta aggiungere DESC.

In [17]:
spark.sql("SELECT * FROM People ORDER BY weight DESC").show()

+--------+------+---+------+------+
| name|gender|age|height|weight|
+--------+------+---+------+------+
| Marco| M| 33| 172| 88.5|
|Gianluca| M| 23| 174| 70.5|
| Andrea| M| 37| 179| 68.0|
| Monica| F| 25| 165| 54.3|
|Annalisa| F| 38| 155| 50.2|
+--------+------+---+------+------+



Se vogliamo limitare il numero di righe restituite dalla nostra query possiamo utilizzare LIMIT, ad esempio basandoci sulla query eseguita appena sopra limitiamo il risultato alle sole prime 3 righe.

In [18]:
spark.sql("SELECT * FROM People ORDER BY weight DESC LIMIT 3").show()

+--------+------+---+------+------+
| name|gender|age|height|weight|
+--------+------+---+------+------+
| Marco| M| 33| 172| 88.5|
|Gianluca| M| 23| 174| 70.5|
| Andrea| M| 37| 179| 68.0|
+--------+------+---+------+------+



Per contare il numero di risultati possiamo utilizzare count, ad esempio contiamo il numero di persone di sesso maschile e più alte di 175 cm.

In [19]:
spark.sql("SELECT count(*) FROM People WHERE gender='M' and height>175").show()

+--------+
|count(1)|
+--------+
| 1|
+--------+



In questo caso il nome della colonna (count(1)) viene impostato automaticamente, se vogliamo modificarlo possiamo creare un alias con il comando AS, ad esempio modifichiamo in counter.

In [20]:
spark.sql("SELECT count(*) AS counter FROM People WHERE gender='M' and height>175").show()

+-------+
|counter|
+-------+
| 1|
+-------+



Per concludere vediamo una serie di funzioni statistiche, come la media (avg), usiamola per calolare il peso medio degli uomini.

In [21]:
spark.sql("SELECT avg(weight) AS avg_weight FROM People WHERE gender='M'").show()

+-----------------+
| avg_weight|
+-----------------+
|75.66666666666667|
+-----------------+



oppure peso massimo (max) e minimo (min), sempre per i soli uomini

In [22]:
spark.sql("SELECT max(weight) AS max_weight, min(weight) AS min_weight FROM People WHERE gender='M'").show()

+----------+----------+
|max_weight|min_weight|
+----------+----------+
| 88.5| 68.0|
+----------+----------+



Utilizzando il comando GROUP BY possiamo raggruppare le righe in base al valore di una colonna, ad esempio raggruppiamo in base al sesso e poi calcoliamo peso medio, massimo e minimo.

In [23]:
spark.sql("SELECT gender, avg(weight) AS avg_weight, max(weight) AS max_weight, min(weight) AS min_weight FROM People GROUP BY gender").show()

+------+-----------------+----------+----------+
|gender| avg_weight|max_weight|min_weight|
+------+-----------------+----------+----------+
| F| 52.25| 54.3| 50.2|
| M|75.66666666666667| 88.5| 68.0|
+------+-----------------+----------+----------+



Il peso medio per l'uomo ha troppe cifre dopo la virgola, arrotondiamole utilizzando la funzione round.

In [24]:
spark.sql("SELECT gender, round(avg(weight), 2) AS avg_weight, max(weight) AS max_weight, min(weight) AS min_weight FROM People GROUP BY gender").show()

+------+----------+----------+----------+
|gender|avg_weight|max_weight|min_weight|
+------+----------+----------+----------+
| F| 52.25| 54.3| 50.2|
| M| 75.67| 88.5| 68.0|
+------+----------+----------+----------+



## Creare una tabella permanente
Come abbiamo già detto, la view viene salvata in RAM e al termine della sessione viene perduta, per creare una tabella permanente in memoria partendo da un Dataframe possiamo usare il metodo *.saveAsTable(name).*, per usare questo metodo dobbiamo importare la classe *DataFrameWriter*.

In [25]:
from pyspark.sql import DataFrameWriter

df.write.saveAsTable("People")

Adesso nella directory nella quale stiamo lavorando dovremmo trovarci una cartella spark-warehouse contentente un'altra cartella people

In [27]:
!ls spark-warehouse/

people


In [28]:
df.write.saveAsTable("People2")

In [29]:
!ls spark-warehouse/

people	people2


 e qui si trova il nostro database, se proviamo a creare nuovamente la tabella otterremo un'errore, che ci informa che questa già esiste.

In [30]:
df.write.saveAsTable("People")

AnalysisException: ignored

In questi casi dobbiamo utilizzare il metodo *.mode(m)*, passando al suo interno:
* 'append': per eseguire un join con la tabella già esistente, se possibile.
* 'overwrite': per sovrascrivere la tabella già esistente.

In [31]:
df.write.mode('append').saveAsTable("People")

Le istruzioni di selezione per le tabelle sono esattamente le stesse di quelle che abbiamo visto su per le view.

In [32]:
spark.sql("SELECT * FROM People").show()

+--------+------+---+------+------+
| name|gender|age|height|weight|
+--------+------+---+------+------+
|Gianluca| M| 23| 174| 70.5|
| Andrea| M| 37| 179| 68.0|
| Marco| M| 33| 172| 88.5|
|Annalisa| F| 38| 155| 50.2|
| Monica| F| 25| 165| 54.3|
+--------+------+---+------+------+

