################ template to run PySpark on Colab #######################
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://www-us.apache.org/dist/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz
!tar xf spark-2.4.5-bin-hadoop2.7.tgz
!pip install -q findspark
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.5-bin-hadoop2.7"
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
spark1 = SparkSession.builder.appName('basic').getOrCreate()
#Test must give no error
import pyspark
from pyspark import SparkConf, SparkContext
conf = SparkConf().setAppName("basic").setMaster("local")
#sc = SparkContext(conf=conf) ## for jupyter and Databricks
sc = SparkContext.getOrCreate() ## for Colab
from pyspark.sql.types import *
SQL e Dataframe¶
Lo Structured Query Language (SQL) è un linguaggio standardizzato per poter interrogare ed operare su Database relazionai. SQL Supporta le seguenti operazioni:
creare e modificare schemi di database
inserire, modificare e gestire dati memorizzati
interrogare i dati memorizzati
creare e gestire strumenti di controllo e accesso ai dati
E’ possibile utilizzare query SQL su di un Dataframe Spark
data = [("Gianluca", "M", 23, 174, 70.5),
("Andrea", "M", 37, 179, 68.),
("Marco", "M", 33, 172, 88.5),
("Annalisa", "F", 38, 155, 50.2),
("Monica", "F", 25, 165, 54.3)]
df = spark.createDataFrame(data, ["name", "gender", "age", "height","weight"])
Creare una View¶
Una view è una tabella virtuale, solitamente frutto di un’operazione di SELECT tra più tabelle. Non viene salvata su disco ma risiede in memoria e al termine della sessione viene cancellata. Possiamo creare una View partendo da un Dataframe usando il metodo .createTempView(name).
df.createTempView("people")
Se riproviamo a ricreare la stessa view con lo stesso metodo, otterremo un’eccezione che ci informa del fatto che tale view esiste già.
df.createTempView("people")
---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
~/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
62 try:
---> 63 return f(*a, **kw)
64 except py4j.protocol.Py4JJavaError as e:
~/.local/lib/python3.6/site-packages/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
327 "An error occurred while calling {0}{1}{2}.\n".
--> 328 format(target_id, ".", name), value)
329 else:
Py4JJavaError: An error occurred while calling o40.createTempView.
: org.apache.spark.sql.catalyst.analysis.TempTableAlreadyExistsException: Temporary view 'people' already exists;
at org.apache.spark.sql.catalyst.catalog.SessionCatalog.createTempView(SessionCatalog.scala:495)
at org.apache.spark.sql.execution.command.CreateViewCommand.run(views.scala:146)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:79)
at org.apache.spark.sql.Dataset$$anonfun$6.apply(Dataset.scala:194)
at org.apache.spark.sql.Dataset$$anonfun$6.apply(Dataset.scala:194)
at org.apache.spark.sql.Dataset$$anonfun$53.apply(Dataset.scala:3364)
at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3363)
at org.apache.spark.sql.Dataset.<init>(Dataset.scala:194)
at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:79)
at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$withPlan(Dataset.scala:3406)
at org.apache.spark.sql.Dataset.createTempView(Dataset.scala:3082)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
During handling of the above exception, another exception occurred:
AnalysisException Traceback (most recent call last)
<ipython-input-5-df07ec3d2e4b> in <module>
----> 1 df.createTempView("people")
~/spark/python/pyspark/sql/dataframe.py in createTempView(self, name)
159
160 """
--> 161 self._jdf.createTempView(name)
162
163 @since(2.0)
~/.local/lib/python3.6/site-packages/py4j/java_gateway.py in __call__(self, *args)
1284 answer = self.gateway_client.send_command(command)
1285 return_value = get_return_value(
-> 1286 answer, self.gateway_client, self.target_id, self.name)
1287
1288 for temp_arg in temp_args:
~/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
69 raise AnalysisException(s.split(': ', 1)[1], stackTrace)
70 if s.startswith('org.apache.spark.sql.catalyst.analysis'):
---> 71 raise AnalysisException(s.split(': ', 1)[1], stackTrace)
72 if s.startswith('org.apache.spark.sql.catalyst.parser.ParseException: '):
73 raise ParseException(s.split(': ', 1)[1], stackTrace)
AnalysisException: "Temporary view 'people' already exists;"
In questi casi ci conviene usare il metodo .createOrReplaceTempView(name).
df.createOrReplaceTempView("People")
Comandi di selezione¶
I comandi SQL sono gli stessi sia per le tabelle che per le view, dato che quest’ultime non sono altro che tabelle virtuali.
Possiamo selezionare una colonna della view usando l’istruzione SELECT column FROM view.
df_sql = spark.sql("SELECT name FROM People")
type(df_sql)
pyspark.sql.dataframe.DataFrame
Il risultato della query sarà sempre un Dataframe, che quindi possiamo stampare usa
df_sql.show()
+--------+
| name|
+--------+
|Gianluca|
| Andrea|
| Marco|
|Annalisa|
| Monica|
+--------+
NOTA BENE SQL è case-insensitive, quindi non fa distinsione tra maiuscole e minuscole.
spark.sql("Select nAmE FRoM people").show()
+--------+
| nAmE|
+--------+
|Gianluca|
| Andrea|
| Marco|
|Annalisa|
| Monica|
+--------+
Possiamo utilizzare lo stesso comando anche per selezionare più colonne.
spark.sql("SELECT name, age FROM People").show()
+--------+---+
| name|age|
+--------+---+
|Gianluca| 23|
| Andrea| 37|
| Marco| 33|
|Annalisa| 38|
| Monica| 25|
+--------+---+
Per selezionarle tutte possiamo utilizzare il selettore *.
spark.sql("SELECT * FROM People").show()
+--------+------+---+------+------+
| name|gender|age|height|weight|
+--------+------+---+------+------+
|Gianluca| M| 23| 174| 70.5|
| Andrea| M| 37| 179| 68.0|
| Marco| M| 33| 172| 88.5|
|Annalisa| F| 38| 155| 50.2|
| Monica| F| 25| 165| 54.3|
+--------+------+---+------+------+
Possiamo effettuare una selezione basata su una condizione utilizzando il comando WHERE, ad esempio selezioniamo nome e altezza solo per gli uomini.
spark.sql("SELECT name, height FROM People WHERE gender='M'").show()
+--------+------+
| name|height|
+--------+------+
|Gianluca| 174|
| Andrea| 179|
| Marco| 172|
+--------+------+
Possiamo ordinare il risultato in base ai valori di una colonna utilizzando il comando ORDER BY, ad esempio ordiniamo le righe in base al peso.
spark.sql("SELECT * FROM People ORDER BY weight").show()
+--------+------+---+------+------+
| name|gender|age|height|weight|
+--------+------+---+------+------+
|Annalisa| F| 38| 155| 50.2|
| Monica| F| 25| 165| 54.3|
| Andrea| M| 37| 179| 68.0|
|Gianluca| M| 23| 174| 70.5|
| Marco| M| 33| 172| 88.5|
+--------+------+---+------+------+
di default l’ordinamento viene effettuato in maniera ascendente (cioè dal valore più piccolo a quello più grande), se vogliamo effettuare un’ordinamento discendente ci basta aggiungere DESC.
spark.sql("SELECT * FROM People ORDER BY weight DESC").show()
+--------+------+---+------+------+
| name|gender|age|height|weight|
+--------+------+---+------+------+
| Marco| M| 33| 172| 88.5|
|Gianluca| M| 23| 174| 70.5|
| Andrea| M| 37| 179| 68.0|
| Monica| F| 25| 165| 54.3|
|Annalisa| F| 38| 155| 50.2|
+--------+------+---+------+------+
Se vogliamo limitare il numero di righe restituite dalla nostra query possiamo utilizzare LIMIT, ad esempio basandoci sulla query eseguita appena sopra limitiamo il risultato alle sole prime 3 righe.
spark.sql("SELECT * FROM People ORDER BY weight DESC LIMIT 3").show()
+--------+------+---+------+------+
| name|gender|age|height|weight|
+--------+------+---+------+------+
| Marco| M| 33| 172| 88.5|
|Gianluca| M| 23| 174| 70.5|
| Andrea| M| 37| 179| 68.0|
+--------+------+---+------+------+
Per contare il numero di risultati possiamo utilizzare count, ad esempio contiamo il numero di persone di sesso maschile e più alte di 175 cm.
spark.sql("SELECT count(*) FROM People WHERE gender='M' and height>175").show()
+--------+
|count(1)|
+--------+
| 1|
+--------+
In questo caso il nome della colonna (count(1)) viene impostato automaticamente, se vogliamo modificarlo possiamo creare un alias con il comando AS, ad esempio modifichiamo in counter.
spark.sql("SELECT count(*) AS counter FROM People WHERE gender='M' and height>175").show()
+-------+
|counter|
+-------+
| 1|
+-------+
Per concludere vediamo una serie di funzioni statistiche, come la media (avg), usiamola per calolare il peso medio degli uomini.
spark.sql("SELECT avg(weight) AS avg_weight FROM People WHERE gender='M'").show()
+-----------------+
| avg_weight|
+-----------------+
|75.66666666666667|
+-----------------+
oppure peso massimo (max) e minimo (min), sempre per i soli uomini
spark.sql("SELECT max(weight) AS max_weight, min(weight) AS min_weight FROM People WHERE gender='M'").show()
+----------+----------+
|max_weight|min_weight|
+----------+----------+
| 88.5| 68.0|
+----------+----------+
Utilizzando il comando GROUP BY possiamo raggruppare le righe in base al valore di una colonna, ad esempio raggruppiamo in base al sesso e poi calcoliamo peso medio, massimo e minimo.
spark.sql("SELECT gender, avg(weight) AS avg_weight, max(weight) AS max_weight, min(weight) AS min_weight FROM People GROUP BY gender").show()
+------+-----------------+----------+----------+
|gender| avg_weight|max_weight|min_weight|
+------+-----------------+----------+----------+
| F| 52.25| 54.3| 50.2|
| M|75.66666666666667| 88.5| 68.0|
+------+-----------------+----------+----------+
Il peso medio per l’uomo ha troppe cifre dopo la virgola, arrotondiamole utilizzando la funzione round.
spark.sql("SELECT gender, round(avg(weight), 2) AS avg_weight, max(weight) AS max_weight, min(weight) AS min_weight FROM People GROUP BY gender").show()
+------+----------+----------+----------+
|gender|avg_weight|max_weight|min_weight|
+------+----------+----------+----------+
| F| 52.25| 54.3| 50.2|
| M| 75.67| 88.5| 68.0|
+------+----------+----------+----------+
Creare una tabella permanente¶
Come abbiamo già detto, la view viene salvata in RAM e al termine della sessione viene perduta, per creare una tabella permanente in memoria partendo da un Dataframe possiamo usare il metodo .saveAsTable(name)., per usare questo metodo dobbiamo importare la classe DataFrameWriter.
from pyspark.sql import DataFrameWriter
df.write.saveAsTable("People")
Adesso nella directory nella quale stiamo lavorando dovremmo trovarci una cartella spark-warehouse contentente un’altra cartella people
!ls spark-warehouse/
people
df.write.saveAsTable("People2")
!ls spark-warehouse/
people people2
e qui si trova il nostro database, se proviamo a creare nuovamente la tabella otterremo un’errore, che ci informa che questa già esiste.
df.write.saveAsTable("People")
---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
/content/spark-2.4.5-bin-hadoop2.7/python/pyspark/sql/utils.py in deco(*a, **kw)
62 try:
---> 63 return f(*a, **kw)
64 except py4j.protocol.Py4JJavaError as e:
/content/spark-2.4.5-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
327 "An error occurred while calling {0}{1}{2}.\n".
--> 328 format(target_id, ".", name), value)
329 else:
Py4JJavaError: An error occurred while calling o95.saveAsTable.
: org.apache.spark.sql.AnalysisException: Table `People` already exists.;
at org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:424)
at org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:409)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
During handling of the above exception, another exception occurred:
AnalysisException Traceback (most recent call last)
<ipython-input-30-d381aadfbad4> in <module>()
----> 1 df.write.saveAsTable("People")
/content/spark-2.4.5-bin-hadoop2.7/python/pyspark/sql/readwriter.py in saveAsTable(self, name, format, mode, partitionBy, **options)
776 if format is not None:
777 self.format(format)
--> 778 self._jwrite.saveAsTable(name)
779
780 @since(1.4)
/content/spark-2.4.5-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in __call__(self, *args)
1255 answer = self.gateway_client.send_command(command)
1256 return_value = get_return_value(
-> 1257 answer, self.gateway_client, self.target_id, self.name)
1258
1259 for temp_arg in temp_args:
/content/spark-2.4.5-bin-hadoop2.7/python/pyspark/sql/utils.py in deco(*a, **kw)
67 e.java_exception.getStackTrace()))
68 if s.startswith('org.apache.spark.sql.AnalysisException: '):
---> 69 raise AnalysisException(s.split(': ', 1)[1], stackTrace)
70 if s.startswith('org.apache.spark.sql.catalyst.analysis'):
71 raise AnalysisException(s.split(': ', 1)[1], stackTrace)
AnalysisException: 'Table `People` already exists.;'
In questi casi dobbiamo utilizzare il metodo .mode(m), passando al suo interno:
‘append’: per eseguire un join con la tabella già esistente, se possibile.
‘overwrite’: per sovrascrivere la tabella già esistente.
df.write.mode('append').saveAsTable("People")
Le istruzioni di selezione per le tabelle sono esattamente le stesse di quelle che abbiamo visto su per le view.
spark.sql("SELECT * FROM People").show()
+--------+------+---+------+------+
| name|gender|age|height|weight|
+--------+------+---+------+------+
|Gianluca| M| 23| 174| 70.5|
| Andrea| M| 37| 179| 68.0|
| Marco| M| 33| 172| 88.5|
|Annalisa| F| 38| 155| 50.2|
| Monica| F| 25| 165| 54.3|
+--------+------+---+------+------+