Pyspark¶
################ template to run PySpark on Colab #######################
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://www-us.apache.org/dist/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz
!tar xf spark-2.4.5-bin-hadoop2.7.tgz
!pip install -q findspark
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.5-bin-hadoop2.7"
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
spark1 = SparkSession.builder.appName('basic').getOrCreate()
#Test must give no error
import pyspark
from pyspark import SparkConf, SparkContext
conf = SparkConf().setAppName("basic").setMaster("local")
#sc = SparkContext(conf=conf) ## for jupyter and Databricks
sc = SparkContext.getOrCreate() ## for Colab
from pyspark.sql.types import *
!wget https://frenzy86.s3.eu-west-2.amazonaws.com/fav/tecno/BostonHousing.csv
--2020-06-18 10:00:25-- https://frenzy86.s3.eu-west-2.amazonaws.com/fav/tecno/BostonHousing.csv
Resolving frenzy86.s3.eu-west-2.amazonaws.com (frenzy86.s3.eu-west-2.amazonaws.com)... 52.95.149.38
Connecting to frenzy86.s3.eu-west-2.amazonaws.com (frenzy86.s3.eu-west-2.amazonaws.com)|52.95.149.38|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 35735 (35K) [application/vnd.ms-excel]
Saving to: ‘BostonHousing.csv.1’
BostonHousing.csv.1 100%[===================>] 34.90K --.-KB/s in 0.1s
2020-06-18 10:00:26 (273 KB/s) - ‘BostonHousing.csv.1’ saved [35735/35735]
housing_df = spark.read.csv("BostonHousing.csv", inferSchema=True, header=True)
housing_df.show(5)
+-------+----+-----+----+-----+-----+----+------+---+---+-------+------+-----+----+
| crim| zn|indus|chas| nox| rm| age| dis|rad|tax|ptratio| b|lstat|medv|
+-------+----+-----+----+-----+-----+----+------+---+---+-------+------+-----+----+
|0.00632|18.0| 2.31| 0|0.538|6.575|65.2| 4.09| 1|296| 15.3| 396.9| 4.98|24.0|
|0.02731| 0.0| 7.07| 0|0.469|6.421|78.9|4.9671| 2|242| 17.8| 396.9| 9.14|21.6|
|0.02729| 0.0| 7.07| 0|0.469|7.185|61.1|4.9671| 2|242| 17.8|392.83| 4.03|34.7|
|0.03237| 0.0| 2.18| 0|0.458|6.998|45.8|6.0622| 3|222| 18.7|394.63| 2.94|33.4|
|0.06905| 0.0| 2.18| 0|0.458|7.147|54.2|6.0622| 3|222| 18.7| 396.9| 5.33|36.2|
+-------+----+-----+----+-----+-----+----+------+---+---+-------+------+-----+----+
only showing top 5 rows
Boston Dataset¶
CRIM Tasso di criminalità per capita
ZN Percentuale di terreni residenziali suddivisi in zone per lotti superiori a 25.000 sq.ft.
INDUS Percentuale di ettari di attività non al dettaglio per città.
CHAS Variabile dummy che indica la prossimità al fiume Charles.
NOX Concentrazione di ossido d’azoto (parti per 10 milioni).
RM Numero medio di stanze per abitazione
AGE Percentuale di abitazione occupate costruite dopo il 1940
DIS Media pesata delle distanze da 5 centri lavorativi di Boston.
RAD Indice di accessibilità ad autostrade
TAX Aliquota dell’imposta sulla proprietà a valore pieno in 10.000 USD.
PRATIO Rapporto studente-insegnante per città.
BLACK 1000(Bk - 0.63)^2 dove Bk è la percentuale di abitanti di colore per città
LSTAT Percentuale della popolazione povera
MEDV Mediana del valore di abitazioni occupate in 1.000 USD.
##Preprocessing dei dati Creiamo una lista con i nomi delle colonne che saranno le features del nostro modello, cioè tutte le colonne meno l’ID e il target (MEDV).
features_cols = housing_df.columns[1:-1]
La classe MLlib richiede che le features si trovino tutte all’interno di un unico vettore su di una colonna, possiamo creare questa rappresentazione utilizzando la classe VectorAssemlber di MLlib.
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols=features_cols, outputCol="features")
data_df = assembler.transform(housing_df)
data_df.show(5)
+-------+----+-----+----+-----+-----+----+------+---+---+-------+------+-----+----+--------------------+
| crim| zn|indus|chas| nox| rm| age| dis|rad|tax|ptratio| b|lstat|medv| features|
+-------+----+-----+----+-----+-----+----+------+---+---+-------+------+-----+----+--------------------+
|0.00632|18.0| 2.31| 0|0.538|6.575|65.2| 4.09| 1|296| 15.3| 396.9| 4.98|24.0|[18.0,2.31,0.0,0....|
|0.02731| 0.0| 7.07| 0|0.469|6.421|78.9|4.9671| 2|242| 17.8| 396.9| 9.14|21.6|[0.0,7.07,0.0,0.4...|
|0.02729| 0.0| 7.07| 0|0.469|7.185|61.1|4.9671| 2|242| 17.8|392.83| 4.03|34.7|[0.0,7.07,0.0,0.4...|
|0.03237| 0.0| 2.18| 0|0.458|6.998|45.8|6.0622| 3|222| 18.7|394.63| 2.94|33.4|[0.0,2.18,0.0,0.4...|
|0.06905| 0.0| 2.18| 0|0.458|7.147|54.2|6.0622| 3|222| 18.7| 396.9| 5.33|36.2|[0.0,2.18,0.0,0.4...|
+-------+----+-----+----+-----+-----+----+------+---+---+-------+------+-----+----+--------------------+
only showing top 5 rows
E’ buona norma portare le features in un range di valori comuni, questo processo può velocizzare anche di molto la fase di addestramento. Facciamolo utilizzando la normalizzazione che si esegue sottraendo il valore minimo e poi dividendo per la differenza tra valore massimo e valore minimo. Possiamo eseguire la normalizzazione con MLlib usando la classe MinMaxScaler.
from pyspark.ml.feature import MinMaxScaler
scaler = MinMaxScaler(inputCol="features", outputCol="scaled_features")
scaler_model = scaler.fit(data_df)
data_df = scaler_model.transform(data_df)
data_df.show(5)
+-------+----+-----+----+-----+-----+----+------+---+---+-------+------+-----+----+--------------------+--------------------+
| crim| zn|indus|chas| nox| rm| age| dis|rad|tax|ptratio| b|lstat|medv| features| scaled_features|
+-------+----+-----+----+-----+-----+----+------+---+---+-------+------+-----+----+--------------------+--------------------+
|0.00632|18.0| 2.31| 0|0.538|6.575|65.2| 4.09| 1|296| 15.3| 396.9| 4.98|24.0|[18.0,2.31,0.0,0....|[0.18,0.067815249...|
|0.02731| 0.0| 7.07| 0|0.469|6.421|78.9|4.9671| 2|242| 17.8| 396.9| 9.14|21.6|[0.0,7.07,0.0,0.4...|[0.0,0.2423020527...|
|0.02729| 0.0| 7.07| 0|0.469|7.185|61.1|4.9671| 2|242| 17.8|392.83| 4.03|34.7|[0.0,7.07,0.0,0.4...|[0.0,0.2423020527...|
|0.03237| 0.0| 2.18| 0|0.458|6.998|45.8|6.0622| 3|222| 18.7|394.63| 2.94|33.4|[0.0,2.18,0.0,0.4...|[0.0,0.0630498533...|
|0.06905| 0.0| 2.18| 0|0.458|7.147|54.2|6.0622| 3|222| 18.7| 396.9| 5.33|36.2|[0.0,2.18,0.0,0.4...|[0.0,0.0630498533...|
+-------+----+-----+----+-----+-----+----+------+---+---+-------+------+-----+----+--------------------+--------------------+
only showing top 5 rows
Prossimo passo, dividere il DataFrame con le features preprocessate in due DataFrame, uno per l’addestramento e uno per il testing del modello, possiamo farlo utilizzando il metodo randomSplit all’interno della quale dobbiamo passare una lista con la percentuale di osservazioni da assegnare ad ognuno dei DataFrame.
Nel nostro caso assegnamo il 70% degli esempi al set di addestramento e il 30% al set di test.
train_df, test_df = data_df.randomSplit([0.7, 0.3])
print("%d esempi nel train set" % train_df.count())
print("%d esempi nel test set" % test_df.count())
384 esempi nel train set
122 esempi nel test set
Ottimo ! Possiamo creare il modello di Regressione Lineare, usiamo la classe *LinearRegression, all’interno del costruttore dovremo passare due parametri:
featuresCol: il nome della colonna con le features
labelCol: il nome della colonna con il target
from pyspark.ml.regression import LinearRegression
lr = LinearRegression(featuresCol="scaled_features", labelCol="medv")
Avviamo l’addestramento con il metodo fit, passando al suo interno il set di addetramento
model = lr.fit(train_df)
Abbiamo creato il nostro modello ! Ora verifichiamone la qualità testandolo su dati che non ha visto durante l’addestramento, possiamo farlo usando il test set e il metodo evalualte.
evaluation = model.evaluate(test_df)
Il metodo evaluate calcolerà diverse metriche che ci possono aiutare a comprendere la qualità del modello, vediamone alcune.
MAE - Mean Absolute Error (Errore medio assoluto)¶
L’errore medio assoluto consiste nella media della somma del valore assoluto degli errori.
evaluation.meanAbsoluteError
3.674155517764444
MSE - Mean Squared Error (Errore quadratico assoluto)¶
L’errore quadratico medio consiste nella media della somma degli errori al quadrato.
evaluation.meanSquaredError
28.045312901767556
RMSE - Root Mean Squared Error (Radice dell’errore quadratico medio)¶
Il RMSE è la radice dell’errore quadratico medio, questa metrica indica mediamente di quanto il nostro modello si è sbagliato.
evaluation.rootMeanSquaredError
5.295782558014213
R2 - Coefficient of determination (Coefficiente di Determinazione)¶
In pratica R2 (pronuciato R Squared) è una versione standardizzata del MSE che torna un punteggio compreso tra 0 e 1 per il train set, mentre per il test set può assumere anche valori negativi. Essendo una funzione ma di scoring, un suo valore maggiore indica una qualità migliore del modello, il suo valore può essere così interpretato:
R2_score < 0.3 il modello è inutile.
0.3 < R2_score < 0.5 il modello è scarso.
0.5 < R2_score < 0.7 il modello è discreto.
0.7 < R2_score < 0.9 il modello è buono.
0.9 < R2_score < 1 il modello è ottimo.
R2_score = 1 molto probabilmente c’è un errore nel modello.
dove RSS è la somma dei quadrati residui: \($ RSS = \sum_{i=1}^{N}(Y_i-\hat{Y}_i)^2 $\)
ed SST è la somma dei quadrati totali: \($ SST = \sum_{i=1}^{N}(Y_i-\bar{Y})^2 $\)
evaluation.r2
0.7028886094316555
Testiamo il Modello con Dataset Reale¶
!wget https://frenzy86.s3.eu-west-2.amazonaws.com/fav/tecno/houses.csv
--2020-06-18 10:00:46-- https://frenzy86.s3.eu-west-2.amazonaws.com/fav/tecno/houses.csv
Resolving frenzy86.s3.eu-west-2.amazonaws.com (frenzy86.s3.eu-west-2.amazonaws.com)... 52.95.148.2
Connecting to frenzy86.s3.eu-west-2.amazonaws.com (frenzy86.s3.eu-west-2.amazonaws.com)|52.95.148.2|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 700 [application/vnd.ms-excel]
Saving to: ‘houses.csv.3’
houses.csv.3 0%[ ] 0 --.-KB/s
houses.csv.3 100%[===================>] 700 --.-KB/s in 0s
2020-06-18 10:00:47 (11.3 MB/s) - ‘houses.csv.3’ saved [700/700]
houses_df = spark.read.csv("houses.csv", inferSchema=True, header=True)
houses_df.show(10)
+-------+----+-----+----+-----+-----+----+------+---+---+-------+------+-----+
| crim| zn|indus|chas| nox| rm| age| dis|rad|tax|ptratio| black|lstat|
+-------+----+-----+----+-----+-----+----+------+---+---+-------+------+-----+
|0.05789|12.5| 6.07| 0|0.409|5.878|21.4| 6.498| 4|345| 18.9|396.21| 8.1|
|0.13554|12.5| 6.07| 0|0.409|5.594|36.8| 6.498| 4|345| 18.9| 396.9|13.09|
|0.08826| 0.0|10.81| 0|0.413|6.417| 6.6|5.2873| 4|305| 19.2|383.73| 6.72|
|0.09164| 0.0|10.81| 0|0.413|6.065| 7.8|5.2873| 4|305| 19.2|390.91| 5.52|
|0.19539| 0.0|10.81| 0|0.413|6.245| 6.2|5.2873| 4|305| 19.2|377.17| 7.54|
|0.07896| 0.0|12.83| 0|0.437|6.273| 6.0|4.2515| 5|398| 18.7|394.92| 6.78|
|0.09512| 0.0|12.83| 0|0.437|6.286|45.0|4.5026| 5|398| 18.7|383.23| 8.94|
|0.10153| 0.0|12.83| 0|0.437|6.279|74.5|4.0522| 5|398| 18.7|373.66|11.97|
|0.08707| 0.0|12.83| 0|0.437| 6.14|45.8|4.0905| 5|398| 18.7|386.96|10.27|
|0.04741| 0.0|11.93| 0|0.573| 6.03|80.8| 2.505| 1|273| 21.0| 396.9| 7.88|
+-------+----+-----+----+-----+-----+----+------+---+---+-------+------+-----+
Creiamo la colonna con le features.
features_cols = housing_df.columns[1:-1]
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols=features_cols, outputCol="features")
input_df = assembler.transform(housing_df)
input_df.show(5)
+-------+----+-----+----+-----+-----+----+------+---+---+-------+------+-----+----+--------------------+
| crim| zn|indus|chas| nox| rm| age| dis|rad|tax|ptratio| b|lstat|medv| features|
+-------+----+-----+----+-----+-----+----+------+---+---+-------+------+-----+----+--------------------+
|0.00632|18.0| 2.31| 0|0.538|6.575|65.2| 4.09| 1|296| 15.3| 396.9| 4.98|24.0|[18.0,2.31,0.0,0....|
|0.02731| 0.0| 7.07| 0|0.469|6.421|78.9|4.9671| 2|242| 17.8| 396.9| 9.14|21.6|[0.0,7.07,0.0,0.4...|
|0.02729| 0.0| 7.07| 0|0.469|7.185|61.1|4.9671| 2|242| 17.8|392.83| 4.03|34.7|[0.0,7.07,0.0,0.4...|
|0.03237| 0.0| 2.18| 0|0.458|6.998|45.8|6.0622| 3|222| 18.7|394.63| 2.94|33.4|[0.0,2.18,0.0,0.4...|
|0.06905| 0.0| 2.18| 0|0.458|7.147|54.2|6.0622| 3|222| 18.7| 396.9| 5.33|36.2|[0.0,2.18,0.0,0.4...|
+-------+----+-----+----+-----+-----+----+------+---+---+-------+------+-----+----+--------------------+
only showing top 5 rows
Applichiamo la normalizzazione, assicurandoci di applicare la stessa trasformazione che abbiamo applicato agli esempi di addestramento. In che modo ? Utilizzando solamente il meotodo transform dello stesso oggetto sulla quale abbiamo già eseguito fit sui dati di addestramento.
input_df = scaler_model.transform(input_df)
input_df.show(5)
+-------+----+-----+----+-----+-----+----+------+---+---+-------+------+-----+----+--------------------+--------------------+
| crim| zn|indus|chas| nox| rm| age| dis|rad|tax|ptratio| b|lstat|medv| features| scaled_features|
+-------+----+-----+----+-----+-----+----+------+---+---+-------+------+-----+----+--------------------+--------------------+
|0.00632|18.0| 2.31| 0|0.538|6.575|65.2| 4.09| 1|296| 15.3| 396.9| 4.98|24.0|[18.0,2.31,0.0,0....|[0.18,0.067815249...|
|0.02731| 0.0| 7.07| 0|0.469|6.421|78.9|4.9671| 2|242| 17.8| 396.9| 9.14|21.6|[0.0,7.07,0.0,0.4...|[0.0,0.2423020527...|
|0.02729| 0.0| 7.07| 0|0.469|7.185|61.1|4.9671| 2|242| 17.8|392.83| 4.03|34.7|[0.0,7.07,0.0,0.4...|[0.0,0.2423020527...|
|0.03237| 0.0| 2.18| 0|0.458|6.998|45.8|6.0622| 3|222| 18.7|394.63| 2.94|33.4|[0.0,2.18,0.0,0.4...|[0.0,0.0630498533...|
|0.06905| 0.0| 2.18| 0|0.458|7.147|54.2|6.0622| 3|222| 18.7| 396.9| 5.33|36.2|[0.0,2.18,0.0,0.4...|[0.0,0.0630498533...|
+-------+----+-----+----+-----+-----+----+------+---+---+-------+------+-----+----+--------------------+--------------------+
only showing top 5 rows
Adesso utilizziamo il meotod predict del modello per ottenere la sua predizione, che verrà inserita all’interno di una colonna ‘prediction’.
pred_df = model.transform(input_df)
pred_df.show(10)
+-------+----+-----+----+-----+-----+-----+------+---+---+-------+------+-----+----+--------------------+--------------------+------------------+
| crim| zn|indus|chas| nox| rm| age| dis|rad|tax|ptratio| b|lstat|medv| features| scaled_features| prediction|
+-------+----+-----+----+-----+-----+-----+------+---+---+-------+------+-----+----+--------------------+--------------------+------------------+
|0.00632|18.0| 2.31| 0|0.538|6.575| 65.2| 4.09| 1|296| 15.3| 396.9| 4.98|24.0|[18.0,2.31,0.0,0....|[0.18,0.067815249...|30.391022737957485|
|0.02731| 0.0| 7.07| 0|0.469|6.421| 78.9|4.9671| 2|242| 17.8| 396.9| 9.14|21.6|[0.0,7.07,0.0,0.4...|[0.0,0.2423020527...|25.123059832928902|
|0.02729| 0.0| 7.07| 0|0.469|7.185| 61.1|4.9671| 2|242| 17.8|392.83| 4.03|34.7|[0.0,7.07,0.0,0.4...|[0.0,0.2423020527...|31.072444051044197|
|0.03237| 0.0| 2.18| 0|0.458|6.998| 45.8|6.0622| 3|222| 18.7|394.63| 2.94|33.4|[0.0,2.18,0.0,0.4...|[0.0,0.0630498533...| 29.22011369728967|
|0.06905| 0.0| 2.18| 0|0.458|7.147| 54.2|6.0622| 3|222| 18.7| 396.9| 5.33|36.2|[0.0,2.18,0.0,0.4...|[0.0,0.0630498533...|28.387542014865595|
|0.02985| 0.0| 2.18| 0|0.458| 6.43| 58.7|6.0622| 3|222| 18.7|394.12| 5.21|28.7|[0.0,2.18,0.0,0.4...|[0.0,0.0630498533...|25.661083048377893|
|0.08829|12.5| 7.87| 0|0.524|6.012| 66.6|5.5605| 5|311| 15.2| 395.6|12.43|22.9|[12.5,7.87,0.0,0....|[0.125,0.27162756...|22.785168258862452|
|0.14455|12.5| 7.87| 0|0.524|6.172| 96.1|5.9505| 5|311| 15.2| 396.9|19.15|27.1|[12.5,7.87,0.0,0....|[0.125,0.27162756...| 18.88146478111009|
|0.21124|12.5| 7.87| 0|0.524|5.631|100.0|6.0821| 5|311| 15.2|386.63|29.93|16.5|[12.5,7.87,0.0,0....|[0.125,0.27162756...|10.166879188464911|
|0.17004|12.5| 7.87| 0|0.524|6.004| 85.9|6.5921| 5|311| 15.2|386.71| 17.1|18.9|[12.5,7.87,0.0,0....|[0.125,0.27162756...| 18.54179621600624|
+-------+----+-----+----+-----+-----+-----+------+---+---+-------+------+-----+----+--------------------+--------------------+------------------+
only showing top 10 rows
Ora rimuoviamo le colonne col le features, il prezzo è rappresentato in $10.000, quindi moltiplichiamo per questa cifra per ottenere il prezzo reale e rinominiamo la colonna ‘prediction’ in ‘estimanted_price’.
from pyspark.sql.functions import round
pred_df = pred_df.drop("features") \
.drop("scaled_features") \
.withColumn("estimated_price", round(pred_df["prediction"]*10000, 2)) \
.drop("prediction")
pred_df.show(10)
+-------+----+-----+----+-----+-----+-----+------+---+---+-------+------+-----+----+---------------+
| crim| zn|indus|chas| nox| rm| age| dis|rad|tax|ptratio| b|lstat|medv|estimated_price|
+-------+----+-----+----+-----+-----+-----+------+---+---+-------+------+-----+----+---------------+
|0.00632|18.0| 2.31| 0|0.538|6.575| 65.2| 4.09| 1|296| 15.3| 396.9| 4.98|24.0| 303910.23|
|0.02731| 0.0| 7.07| 0|0.469|6.421| 78.9|4.9671| 2|242| 17.8| 396.9| 9.14|21.6| 251230.6|
|0.02729| 0.0| 7.07| 0|0.469|7.185| 61.1|4.9671| 2|242| 17.8|392.83| 4.03|34.7| 310724.44|
|0.03237| 0.0| 2.18| 0|0.458|6.998| 45.8|6.0622| 3|222| 18.7|394.63| 2.94|33.4| 292201.14|
|0.06905| 0.0| 2.18| 0|0.458|7.147| 54.2|6.0622| 3|222| 18.7| 396.9| 5.33|36.2| 283875.42|
|0.02985| 0.0| 2.18| 0|0.458| 6.43| 58.7|6.0622| 3|222| 18.7|394.12| 5.21|28.7| 256610.83|
|0.08829|12.5| 7.87| 0|0.524|6.012| 66.6|5.5605| 5|311| 15.2| 395.6|12.43|22.9| 227851.68|
|0.14455|12.5| 7.87| 0|0.524|6.172| 96.1|5.9505| 5|311| 15.2| 396.9|19.15|27.1| 188814.65|
|0.21124|12.5| 7.87| 0|0.524|5.631|100.0|6.0821| 5|311| 15.2|386.63|29.93|16.5| 101668.79|
|0.17004|12.5| 7.87| 0|0.524|6.004| 85.9|6.5921| 5|311| 15.2|386.71| 17.1|18.9| 185417.96|
+-------+----+-----+----+-----+-----+-----+------+---+---+-------+------+-----+----+---------------+
only showing top 10 rows