Open In Colab

################ template to run PySpark on Colab #######################
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://www-us.apache.org/dist/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz
!tar xf spark-2.4.5-bin-hadoop2.7.tgz
!pip install -q findspark
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.5-bin-hadoop2.7"
import findspark
findspark.init()

from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
spark1 = SparkSession.builder.appName('basic').getOrCreate()
#Test must give no error
import pyspark
from pyspark import SparkConf, SparkContext
conf = SparkConf().setAppName("basic").setMaster("local")
#sc = SparkContext(conf=conf)  ## for jupyter and Databricks
sc = SparkContext.getOrCreate()   ## for Colab
from pyspark.sql.types import *

SQL e Dataframe

Lo Structured Query Language (SQL) è un linguaggio standardizzato per poter interrogare ed operare su Database relazionai. SQL Supporta le seguenti operazioni:

  • creare e modificare schemi di database

  • inserire, modificare e gestire dati memorizzati

  • interrogare i dati memorizzati

  • creare e gestire strumenti di controllo e accesso ai dati

E’ possibile utilizzare query SQL su di un Dataframe Spark

data = [("Gianluca", "M", 23, 174, 70.5),
        ("Andrea", "M", 37, 179, 68.),
        ("Marco", "M", 33, 172, 88.5),
        ("Annalisa", "F", 38, 155, 50.2),
        ("Monica", "F", 25, 165, 54.3)]

df = spark.createDataFrame(data, ["name", "gender", "age", "height","weight"])

Creare una View

Una view è una tabella virtuale, solitamente frutto di un’operazione di SELECT tra più tabelle. Non viene salvata su disco ma risiede in memoria e al termine della sessione viene cancellata. Possiamo creare una View partendo da un Dataframe usando il metodo .createTempView(name).

df.createTempView("people")

Se riproviamo a ricreare la stessa view con lo stesso metodo, otterremo un’eccezione che ci informa del fatto che tale view esiste già.

df.createTempView("people")
---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
~/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
     62         try:
---> 63             return f(*a, **kw)
     64         except py4j.protocol.Py4JJavaError as e:

~/.local/lib/python3.6/site-packages/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    327                     "An error occurred while calling {0}{1}{2}.\n".
--> 328                     format(target_id, ".", name), value)
    329             else:

Py4JJavaError: An error occurred while calling o40.createTempView.
: org.apache.spark.sql.catalyst.analysis.TempTableAlreadyExistsException: Temporary view 'people' already exists;
	at org.apache.spark.sql.catalyst.catalog.SessionCatalog.createTempView(SessionCatalog.scala:495)
	at org.apache.spark.sql.execution.command.CreateViewCommand.run(views.scala:146)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:79)
	at org.apache.spark.sql.Dataset$$anonfun$6.apply(Dataset.scala:194)
	at org.apache.spark.sql.Dataset$$anonfun$6.apply(Dataset.scala:194)
	at org.apache.spark.sql.Dataset$$anonfun$53.apply(Dataset.scala:3364)
	at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3363)
	at org.apache.spark.sql.Dataset.<init>(Dataset.scala:194)
	at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:79)
	at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$withPlan(Dataset.scala:3406)
	at org.apache.spark.sql.Dataset.createTempView(Dataset.scala:3082)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)


During handling of the above exception, another exception occurred:

AnalysisException                         Traceback (most recent call last)
<ipython-input-5-df07ec3d2e4b> in <module>
----> 1 df.createTempView("people")

~/spark/python/pyspark/sql/dataframe.py in createTempView(self, name)
    159 
    160         """
--> 161         self._jdf.createTempView(name)
    162 
    163     @since(2.0)

~/.local/lib/python3.6/site-packages/py4j/java_gateway.py in __call__(self, *args)
   1284         answer = self.gateway_client.send_command(command)
   1285         return_value = get_return_value(
-> 1286             answer, self.gateway_client, self.target_id, self.name)
   1287 
   1288         for temp_arg in temp_args:

~/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
     69                 raise AnalysisException(s.split(': ', 1)[1], stackTrace)
     70             if s.startswith('org.apache.spark.sql.catalyst.analysis'):
---> 71                 raise AnalysisException(s.split(': ', 1)[1], stackTrace)
     72             if s.startswith('org.apache.spark.sql.catalyst.parser.ParseException: '):
     73                 raise ParseException(s.split(': ', 1)[1], stackTrace)

AnalysisException: "Temporary view 'people' already exists;"

In questi casi ci conviene usare il metodo .createOrReplaceTempView(name).

df.createOrReplaceTempView("People")

Comandi di selezione

I comandi SQL sono gli stessi sia per le tabelle che per le view, dato che quest’ultime non sono altro che tabelle virtuali.

Possiamo selezionare una colonna della view usando l’istruzione SELECT column FROM view.

df_sql = spark.sql("SELECT name FROM People")
type(df_sql)
pyspark.sql.dataframe.DataFrame

Il risultato della query sarà sempre un Dataframe, che quindi possiamo stampare usa

df_sql.show()
+--------+
|    name|
+--------+
|Gianluca|
|  Andrea|
|   Marco|
|Annalisa|
|  Monica|
+--------+

NOTA BENE SQL è case-insensitive, quindi non fa distinsione tra maiuscole e minuscole.

spark.sql("Select nAmE FRoM people").show()
+--------+
|    nAmE|
+--------+
|Gianluca|
|  Andrea|
|   Marco|
|Annalisa|
|  Monica|
+--------+

Possiamo utilizzare lo stesso comando anche per selezionare più colonne.

spark.sql("SELECT name, age FROM People").show()
+--------+---+
|    name|age|
+--------+---+
|Gianluca| 23|
|  Andrea| 37|
|   Marco| 33|
|Annalisa| 38|
|  Monica| 25|
+--------+---+

Per selezionarle tutte possiamo utilizzare il selettore *.

spark.sql("SELECT * FROM People").show()
+--------+------+---+------+------+
|    name|gender|age|height|weight|
+--------+------+---+------+------+
|Gianluca|     M| 23|   174|  70.5|
|  Andrea|     M| 37|   179|  68.0|
|   Marco|     M| 33|   172|  88.5|
|Annalisa|     F| 38|   155|  50.2|
|  Monica|     F| 25|   165|  54.3|
+--------+------+---+------+------+

Possiamo effettuare una selezione basata su una condizione utilizzando il comando WHERE, ad esempio selezioniamo nome e altezza solo per gli uomini.

spark.sql("SELECT name, height FROM People WHERE gender='M'").show()
+--------+------+
|    name|height|
+--------+------+
|Gianluca|   174|
|  Andrea|   179|
|   Marco|   172|
+--------+------+

Possiamo ordinare il risultato in base ai valori di una colonna utilizzando il comando ORDER BY, ad esempio ordiniamo le righe in base al peso.

spark.sql("SELECT * FROM People ORDER BY weight").show()
+--------+------+---+------+------+
|    name|gender|age|height|weight|
+--------+------+---+------+------+
|Annalisa|     F| 38|   155|  50.2|
|  Monica|     F| 25|   165|  54.3|
|  Andrea|     M| 37|   179|  68.0|
|Gianluca|     M| 23|   174|  70.5|
|   Marco|     M| 33|   172|  88.5|
+--------+------+---+------+------+

di default l’ordinamento viene effettuato in maniera ascendente (cioè dal valore più piccolo a quello più grande), se vogliamo effettuare un’ordinamento discendente ci basta aggiungere DESC.

spark.sql("SELECT * FROM People ORDER BY weight DESC").show()
+--------+------+---+------+------+
|    name|gender|age|height|weight|
+--------+------+---+------+------+
|   Marco|     M| 33|   172|  88.5|
|Gianluca|     M| 23|   174|  70.5|
|  Andrea|     M| 37|   179|  68.0|
|  Monica|     F| 25|   165|  54.3|
|Annalisa|     F| 38|   155|  50.2|
+--------+------+---+------+------+

Se vogliamo limitare il numero di righe restituite dalla nostra query possiamo utilizzare LIMIT, ad esempio basandoci sulla query eseguita appena sopra limitiamo il risultato alle sole prime 3 righe.

spark.sql("SELECT * FROM People ORDER BY weight DESC LIMIT 3").show()
+--------+------+---+------+------+
|    name|gender|age|height|weight|
+--------+------+---+------+------+
|   Marco|     M| 33|   172|  88.5|
|Gianluca|     M| 23|   174|  70.5|
|  Andrea|     M| 37|   179|  68.0|
+--------+------+---+------+------+

Per contare il numero di risultati possiamo utilizzare count, ad esempio contiamo il numero di persone di sesso maschile e più alte di 175 cm.

spark.sql("SELECT count(*) FROM People WHERE gender='M' and height>175").show()
+--------+
|count(1)|
+--------+
|       1|
+--------+

In questo caso il nome della colonna (count(1)) viene impostato automaticamente, se vogliamo modificarlo possiamo creare un alias con il comando AS, ad esempio modifichiamo in counter.

spark.sql("SELECT count(*) AS counter FROM People WHERE gender='M' and height>175").show()
+-------+
|counter|
+-------+
|      1|
+-------+

Per concludere vediamo una serie di funzioni statistiche, come la media (avg), usiamola per calolare il peso medio degli uomini.

spark.sql("SELECT avg(weight) AS avg_weight FROM People WHERE gender='M'").show()
+-----------------+
|       avg_weight|
+-----------------+
|75.66666666666667|
+-----------------+

oppure peso massimo (max) e minimo (min), sempre per i soli uomini

spark.sql("SELECT max(weight) AS max_weight, min(weight) AS min_weight FROM People WHERE gender='M'").show()
+----------+----------+
|max_weight|min_weight|
+----------+----------+
|      88.5|      68.0|
+----------+----------+

Utilizzando il comando GROUP BY possiamo raggruppare le righe in base al valore di una colonna, ad esempio raggruppiamo in base al sesso e poi calcoliamo peso medio, massimo e minimo.

spark.sql("SELECT gender, avg(weight) AS avg_weight, max(weight) AS max_weight, min(weight) AS min_weight FROM People GROUP BY gender").show()
+------+-----------------+----------+----------+
|gender|       avg_weight|max_weight|min_weight|
+------+-----------------+----------+----------+
|     F|            52.25|      54.3|      50.2|
|     M|75.66666666666667|      88.5|      68.0|
+------+-----------------+----------+----------+

Il peso medio per l’uomo ha troppe cifre dopo la virgola, arrotondiamole utilizzando la funzione round.

spark.sql("SELECT gender, round(avg(weight), 2) AS avg_weight, max(weight) AS max_weight, min(weight) AS min_weight FROM People GROUP BY gender").show()
+------+----------+----------+----------+
|gender|avg_weight|max_weight|min_weight|
+------+----------+----------+----------+
|     F|     52.25|      54.3|      50.2|
|     M|     75.67|      88.5|      68.0|
+------+----------+----------+----------+

Creare una tabella permanente

Come abbiamo già detto, la view viene salvata in RAM e al termine della sessione viene perduta, per creare una tabella permanente in memoria partendo da un Dataframe possiamo usare il metodo .saveAsTable(name)., per usare questo metodo dobbiamo importare la classe DataFrameWriter.

from pyspark.sql import DataFrameWriter

df.write.saveAsTable("People")

Adesso nella directory nella quale stiamo lavorando dovremmo trovarci una cartella spark-warehouse contentente un’altra cartella people

!ls spark-warehouse/
people
df.write.saveAsTable("People2")
!ls spark-warehouse/
people	people2

e qui si trova il nostro database, se proviamo a creare nuovamente la tabella otterremo un’errore, che ci informa che questa già esiste.

df.write.saveAsTable("People")
---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
/content/spark-2.4.5-bin-hadoop2.7/python/pyspark/sql/utils.py in deco(*a, **kw)
     62         try:
---> 63             return f(*a, **kw)
     64         except py4j.protocol.Py4JJavaError as e:

/content/spark-2.4.5-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    327                     "An error occurred while calling {0}{1}{2}.\n".
--> 328                     format(target_id, ".", name), value)
    329             else:

Py4JJavaError: An error occurred while calling o95.saveAsTable.
: org.apache.spark.sql.AnalysisException: Table `People` already exists.;
	at org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:424)
	at org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:409)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)


During handling of the above exception, another exception occurred:

AnalysisException                         Traceback (most recent call last)
<ipython-input-30-d381aadfbad4> in <module>()
----> 1 df.write.saveAsTable("People")

/content/spark-2.4.5-bin-hadoop2.7/python/pyspark/sql/readwriter.py in saveAsTable(self, name, format, mode, partitionBy, **options)
    776         if format is not None:
    777             self.format(format)
--> 778         self._jwrite.saveAsTable(name)
    779 
    780     @since(1.4)

/content/spark-2.4.5-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1255         answer = self.gateway_client.send_command(command)
   1256         return_value = get_return_value(
-> 1257             answer, self.gateway_client, self.target_id, self.name)
   1258 
   1259         for temp_arg in temp_args:

/content/spark-2.4.5-bin-hadoop2.7/python/pyspark/sql/utils.py in deco(*a, **kw)
     67                                              e.java_exception.getStackTrace()))
     68             if s.startswith('org.apache.spark.sql.AnalysisException: '):
---> 69                 raise AnalysisException(s.split(': ', 1)[1], stackTrace)
     70             if s.startswith('org.apache.spark.sql.catalyst.analysis'):
     71                 raise AnalysisException(s.split(': ', 1)[1], stackTrace)

AnalysisException: 'Table `People` already exists.;'

In questi casi dobbiamo utilizzare il metodo .mode(m), passando al suo interno:

  • ‘append’: per eseguire un join con la tabella già esistente, se possibile.

  • ‘overwrite’: per sovrascrivere la tabella già esistente.

df.write.mode('append').saveAsTable("People")

Le istruzioni di selezione per le tabelle sono esattamente le stesse di quelle che abbiamo visto su per le view.

spark.sql("SELECT * FROM People").show()
+--------+------+---+------+------+
|    name|gender|age|height|weight|
+--------+------+---+------+------+
|Gianluca|     M| 23|   174|  70.5|
|  Andrea|     M| 37|   179|  68.0|
|   Marco|     M| 33|   172|  88.5|
|Annalisa|     F| 38|   155|  50.2|
|  Monica|     F| 25|   165|  54.3|
+--------+------+---+------+------+