Open In Colab

################ template to run PySpark on Colab #######################
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://www-us.apache.org/dist/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz
!tar xf spark-2.4.5-bin-hadoop2.7.tgz
!pip install -q findspark
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.5-bin-hadoop2.7"
import findspark
findspark.init()

from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
spark1 = SparkSession.builder.appName('basic').getOrCreate()
#Test must give no error
import pyspark
################ end template PySpark on Colab ##########################

RDD: Resilient Distributed Dataset

Il Resilient Distributed Dataset (RDD) è l’astrazione principale di Spark, una collezione di elementi partizionati tra i nodi del cluster che possono essere operati in parallelo. In questo notebook vederemo le operazioni principali che possiamo eseguire su un RDD.

from pyspark import SparkConf, SparkContext

SparkContext, che indicherà a spark come accedere al cluster, l’oggetto ha bisogno di una configurazione, che possiamo creare con la classe SparkConf. All’interno della configurazione dovremo specificare almeno:

  • nome dell’applicazione: tramite il metodo setAppName(string)

  • indirizzo del cluster: tramite il metodo setMaster(string), nel caso in cui usiamo la nostra macchina locale, possiamo specificare ‘local’.

conf = SparkConf().setAppName("basic").setMaster("local")
#sc = SparkContext(conf=conf)  ## for jupyter and Databricks
sc = SparkContext.getOrCreate()   ## for Colab
data = [0,1,2,3,4,5,6,7,8,9]
dataDist = sc.parallelize(data)
type(dataDist)
pyspark.rdd.RDD

Possiamo raccogliere i dati distribuiti dal RDD in una lista utilizzando il metodo .collect().

dataList = dataDist.collect()
print(type(dataList))
print(dataList)
<class 'list'>
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

Se invece volessimo ottenere soltanto n elementi, possiamo utilizzare il metodo .take(n), ad esempio selezioniamo soltato 3 elementi.

dataList = dataDist.take(3)
print(type(dataList))
print(dataList)
<class 'list'>
[0, 1, 2]

Per contare il numero di elementi di un RDD possiamo usare il metodo .count().

dataDist.count()
10

Per contare il numero di elementi unici possiamo usare il metodo .countByValue(), il risultato sarà un oggetto defaultdict che mappa ogni elemento del RDD al numero delle volte che questo elemento viene trovato all’interno del RDD.

dataDist.countByValue()
defaultdict(int, {0: 1, 1: 1, 2: 1, 3: 1, 4: 1, 5: 1, 6: 1, 7: 1, 8: 1, 9: 1})

Possiamo ottenere gli n valori maggiori all’interno del RDD usando il metodo top(n)

dataDist.top(5)
[9, 8, 7, 6, 5]

Altre operazioni RDD

Vediamo altre azioni che possiamo eseguire sugli RDD. Definiamo due nuovi RDD.

dist1 = sc.parallelize([1,2,3,4,5])
dist2 = sc.parallelize([5,6,7,8,9])

Union Ci permette di unire due RDD in un unico RDD.

dist3 = dist1.union(dist2)
dist3.collect()
[1, 2, 3, 4, 5, 5, 6, 7, 8, 9]

Intersection Ci permette di creare un nuovo RDD contenente solo gli elementi presenti in entrambi gli RDD.

dist3 = dist1.intersection(dist2)
dist3.collect()
[5]

Subtract Ci permette di creare un nuovo RDD con gli elementi del primo RDD non presenti anche nel secondo RDD.

dist3 = dist1.subtract(dist2)
dist3.collect()
[4, 1, 2, 3]

Cartesian Il risultato è un nuovo RDD composto da tutte le combinazioni di 2 coppie di elementi presi dai due RDD.

dist3 = dist1.cartesian(dist2)
dist3.collect()
[(1, 5),
 (1, 6),
 (2, 5),
 (2, 6),
 (1, 7),
 (1, 8),
 (2, 7),
 (2, 8),
 (1, 9),
 (2, 9),
 (3, 5),
 (3, 6),
 (4, 5),
 (4, 6),
 (5, 5),
 (5, 6),
 (3, 7),
 (3, 8),
 (4, 7),
 (4, 8),
 (3, 9),
 (4, 9),
 (5, 7),
 (5, 8),
 (5, 9)]

Map e Reduce

Le applicazioni principali del RDD, come per qualsiasi altro tipo di oggetto distribuito, sono Map e Reduce. Map ci permette di applicare un’operazione ad ogni elemento del RDD, passando al suo interno la funzione da applicare, facciamo un’esempio con una funzione che calcola il quadrato di ogni valore all’interno del RDD.

def compute_pow(d):
    return d*d

powDist = dataDist.map(compute_pow)
powDist.collect()
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
powDist = dataDist.map(lambda d: d*d)
powDist.collect()
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
s = ["Questo corso FAV è stupendo!","Li seguirò tutti!"]
sDist = sc.parallelize(s)

lensDist = sDist.map(lambda w: w.split())
lensDist.collect()
[['Questo', 'corso', 'FAV', 'è', 'stupendo!'], ['Li', 'seguirò', 'tutti!']]
sDist = sc.parallelize(s)

wordsDist = sDist.flatMap(lambda w: w.split())
wordsDist.collect()
['Questo', 'corso', 'FAV', 'è', 'stupendo!', 'Li', 'seguirò', 'tutti!']
def add(a,b):
    return a+b

dataSum = dataDist.reduce(add)
print(dataSum)
45
dataSum = dataDist.reduce(lambda a,b: a+b)
print(dataSum)
45
from operator import add

dataSum = dataDist.reduce(add)
print(dataSum)
45

Filter

Il metodo filter ci permette di filtrare gli elementi del RDD in base ad una funzione definita da noi, ad esempio creiamo un nuovo RDD con 10 parole e filtriamo quelle che hanno una lunghezza superiore a 15 caratteri.

words = ["Artificial Intelligence","Machine Learning", "Reinforcement Learning"
         "Deep Learning","Computer Vision", "Natural Language Processing",
        "Augmented Reality", "Blockchain", "Robotic", "Cyber Security"]

wordsDist = sc.parallelize(words)

filterDist = wordsDist.filter(lambda w: len(w)>16)
filterDist.collect()
['Artificial Intelligence',
 'Reinforcement LearningDeep Learning',
 'Natural Language Processing',
 'Augmented Reality']

Oppure filtriamo solo quelle che cominciamo per una vocale

filterDist = wordsDist.filter(lambda w: (w[0].lower() in "aeiou"))
filterDist.collect()
['Artificial Intelligence', 'Augmented Reality']

###Distinct

Il metodo .dinstrinct() ci permette di ridurre il contenuto del RDD ad elementi unici, rimuovendo eventuali doppi.

namesDist = sc.parallelize(["Andrea","Luca","Marco","Marco","Gabriele"])

uniqueDist = namesDist.distinct()
uniqueDist.collect()
['Andrea', 'Gabriele', 'Luca', 'Marco']

Sample

Il metodo .sample(withReplacement, fraction) ci permette di selezionare casualmente dal RDD degli elementi, questo metodo ha bisogno di due parametri:

withReplacement: va settato a True se un elemento può essere selezionato più di una volta, a False altrimenti.
fraction: probabilità che un elemento ha di essere selezionato, una probabilità di 0 ci ritornerà un rdd vuoto, una probabilità di 0.5 indica che ogni elemento ha il 50% di possibilità di essere selezionato, una probabilità di 1 ritornerà l'RDD originale.
wordsDist.sample(withReplacement=False, fraction=0.5).collect()
['Artificial Intelligence',
 'Machine Learning',
 'Computer Vision',
 'Natural Language Processing',
 'Augmented Reality',
 'Blockchain',
 'Robotic']

RDD chiave-valore

Creiamo un RDD di esempio, contenente degli acquisti effettuati all’interno di un app, ogni elemento sarà caratterizzato da una lista contenente:

Username: che fungerà da chiave.
Item: una nuova lista che contiene id dell'item acquistato ed il prezzo.

Quando utilizziamo una rappresentazione a lista, il primo elemento viene sempre interpretato da spark come la chiave ed il secondo come il valore.

purchases = [("guizard", ("pacchetto-crediti-1", "0.89 €")),
       ("bitleader", ("pacchetto-crediti-1", "0.89 €")),
       ("guizard",  ("ads-remover", "4.99 €")),
       ("guizard", ("pacchetto-crediti-3", "1.99 €")),
       ("bitleader", ("potenziamento-1", "1.49 €")),
       ("bitleader", ("potenziamento-2", "2.99 €")),
       ("lightlord", ("ads-remover", "4.99 €")),
       ("peanut", ("pacchett-crediti-1", "0.89 €")),
       ("lightlord", ("pacchetto-crediti-3", "4.99 €"))]

purchasesRDD = sc.parallelize(purchases)
purchasesRDD.collect()
[('guizard', ('pacchetto-crediti-1', '0.89 €')),
 ('bitleader', ('pacchetto-crediti-1', '0.89 €')),
 ('guizard', ('ads-remover', '4.99 €')),
 ('guizard', ('pacchetto-crediti-3', '1.99 €')),
 ('bitleader', ('potenziamento-1', '1.49 €')),
 ('bitleader', ('potenziamento-2', '2.99 €')),
 ('lightlord', ('ads-remover', '4.99 €')),
 ('peanut', ('pacchett-crediti-1', '0.89 €')),
 ('lightlord', ('pacchetto-crediti-3', '4.99 €'))]

Map e Reduce con chiave

Per eseguire una trasformazione al contenuto del RDD, ma non alle chiavi, possiamo utilizzare il metodo .mapValues(func). Ad esempio convertiamo in maiuscolo l’item id.

purchasesRDD = purchasesRDD.mapValues(lambda x: (x[0].upper(), x[1]))
purchasesRDD.collect()
[('guizard', ('PACCHETTO-CREDITI-1', '0.89 €')),
 ('bitleader', ('PACCHETTO-CREDITI-1', '0.89 €')),
 ('guizard', ('ADS-REMOVER', '4.99 €')),
 ('guizard', ('PACCHETTO-CREDITI-3', '1.99 €')),
 ('bitleader', ('POTENZIAMENTO-1', '1.49 €')),
 ('bitleader', ('POTENZIAMENTO-2', '2.99 €')),
 ('lightlord', ('ADS-REMOVER', '4.99 €')),
 ('peanut', ('PACCHETT-CREDITI-1', '0.89 €')),
 ('lightlord', ('PACCHETTO-CREDITI-3', '4.99 €'))]
purchasesRDD = purchasesRDD.mapValues(lambda x: (x[0], float(x[1].split(" €")[0])))
purchasesRDD.collect()
[('guizard', ('PACCHETTO-CREDITI-1', 0.89)),
 ('bitleader', ('PACCHETTO-CREDITI-1', 0.89)),
 ('guizard', ('ADS-REMOVER', 4.99)),
 ('guizard', ('PACCHETTO-CREDITI-3', 1.99)),
 ('bitleader', ('POTENZIAMENTO-1', 1.49)),
 ('bitleader', ('POTENZIAMENTO-2', 2.99)),
 ('lightlord', ('ADS-REMOVER', 4.99)),
 ('peanut', ('PACCHETT-CREDITI-1', 0.89)),
 ('lightlord', ('PACCHETTO-CREDITI-3', 4.99))]

Un’altra operazione comune quando si lavora con una struttura dati in formato chiave-valore è il voler raggruppare i dati in base alla chiave, con un RDD possiamo farlo usando il metodo .reduceByKey(func), usiamolo per sommare gli acquisti effettuati da ogni utente ed ottenere il valore totale. Per prima cosa creiamo una nuovo RDD che contiene soltanto il nome utente come chiave ed il costo dell’acquisto come valore.

totalByUserRDD = purchasesRDD.mapValues(lambda x: x[1])
totalByUserRDD.collect()
[('guizard', 0.89),
 ('bitleader', 0.89),
 ('guizard', 4.99),
 ('guizard', 1.99),
 ('bitleader', 1.49),
 ('bitleader', 2.99),
 ('lightlord', 4.99),
 ('peanut', 0.89),
 ('lightlord', 4.99)]

Poi usiamo il metodo reduceByKey per sommare i valori ed ottenere la spesa totale dell’utente all’interno dell’app.

totalByUserRDD = totalByUserRDD.reduceByKey(lambda x,y: x+y)
totalByUserRDD.collect()
[('peanut', 0.89), ('guizard', 7.87), ('bitleader', 5.37), ('lightlord', 9.98)]

E se volessimo sapere l’entrate totali delle app ? Dovremmo sommare la spesa di tutti gli utenti

total = totalByUserRDD.map(lambda x: x[1]).reduce(lambda x,y: x+y)
total
24.11

Ordinamento di un RDD

Il metodo .sortBy(func) ci permette di ordinare gli elementi di un RDD in base ad una delle sue proprietà o ad una funzione specificata da noi. Questo metodo non richiede di avere i dati in formato chiave valore, quindi è sempre utilizzabile. Utilizziamolo per ordinare l’RDD in base all’importo speso da ogni utente.

totalByUserSortedRDD = totalByUserRDD.sortBy(lambda x: x[1])
totalByUserSortedRDD.collect()
[('peanut', 0.89), ('bitleader', 5.37), ('guizard', 7.87), ('lightlord', 9.98)]

Come vedi la lista va dal minore (0.89) al maggiore (9.98), possiamo invertire l’ordine di ordinamento impostando il parametro ascending a False.

totalByUserSortedRDD = totalByUserRDD.sortBy(lambda x: x[1], ascending=False)
totalByUserSortedRDD.collect()
[('lightlord', 9.98), ('guizard', 7.87), ('bitleader', 5.37), ('peanut', 0.89)]