Open In Colab

################ template to run PySpark on Colab #######################
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://www-us.apache.org/dist/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz
!tar xf spark-2.4.5-bin-hadoop2.7.tgz
!pip install -q findspark
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.5-bin-hadoop2.7"
import findspark
findspark.init()

from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
spark1 = SparkSession.builder.appName('basic').getOrCreate()
#Test must give no error
import pyspark
################ end template PySpark on Colab ##########################

Amazon Ratings Books

!wget http://snap.stanford.edu/data/amazon/productGraph/categoryFiles/ratings_Books.csv
--2020-06-18 08:10:16--  http://snap.stanford.edu/data/amazon/productGraph/categoryFiles/ratings_Books.csv
Resolving snap.stanford.edu (snap.stanford.edu)... 171.64.75.80
Connecting to snap.stanford.edu (snap.stanford.edu)|171.64.75.80|:80... connected.
HTTP request sent, awaiting response... 200 OK
Length: 916259348 (874M) [text/csv]
Saving to: ‘ratings_Books.csv’

ratings_Books.csv   100%[===================>] 873.81M  8.69MB/s    in 3m 7s   

2020-06-18 08:13:23 (4.68 MB/s) - ‘ratings_Books.csv’ saved [916259348/916259348]

from pyspark import SparkConf, SparkContext
conf = SparkConf().setAppName("basic").setMaster("local")
#sc = SparkContext(conf=conf)  ## for jupyter and Databricks
sc = SparkContext.getOrCreate()   ## for Colab
reviewsRDD = sc.textFile("ratings_Books.csv")

Vediamo un po’ cosa contiene l’RDD. Avendo 22.5 milioni di elementi, utilizzare il metodo .collect() è sconveniente per ovvie ragioni, al suo posto possiamo usare il metodo .take(n)

reviewsRDD.take(5)
['AH2L9G3DQHHAJ,0000000116,4.0,1019865600',
 'A2IIIDRK3PRRZY,0000000116,1.0,1395619200',
 'A1TADCM7YWPQ8M,0000000868,4.0,1031702400',
 'AWGH7V0BDOJKB,0000013714,4.0,1383177600',
 'A3UTQPQPM4TQO0,0000013714,5.0,1374883200']

Ogni elemento di ogni riga corrisponde a (in ordine):

Id dell'utente che ha lasciato la valutazione.
Id del libro recensito.
Valutazione da 1.0 a 5.0.
Timestamp di quando è stata lasciata la recensione.

Contiamo il numero totale di valutazioni

reviewsRDD.count()
22507155

Contiamo le valutazioni per ogni libro

Per contare le valutazioni che ogni libro ha ricevuto creiamo un nuovo RDD contenente soltanto gli ID dei libri per ogni riga.

productsRDD = reviewsRDD.map(lambda x: x.split(",")[1])
productsRDD.take(5)
['0000000116', '0000000116', '0000000868', '0000013714', '0000013714']

Poi usiamo semplicemente il metodo .countByValue() Stampiamo il numero di valutazioni ricevute per i primi 10 libri.

productsCount = productsRDD.countByValue()

i = 0
print("ID LIBRO\tCONTEGGIO")
for product_id, count in productsCount.items():
    print("%s\t%s" % (product_id, count))
    if(i>=10):
        break
    i+=1
ID LIBRO	CONTEGGIO
0000000116	2
0000000868	1
0000013714	14
0000015393	1
0000029831	5
0000038504	2
0000041696	4
0000095699	1
0000174076	1
0000202010	1
0000230022	10

Troviamo i 10 libri più valutati

Per trovare i 10 libri più valutati potremmo semplicemente utilizzare il defaultdict ottenuto sopra, però voglio farti vedere un’altro modo per farlo ! Mappiamo ogni elemento ad una lista, contenente l’elemento stesso ed un valore 1

productsCount = productsRDD.map(lambda x: (x, 1))
productsCount.take(5)
[('0000000116', 1),
 ('0000000116', 1),
 ('0000000868', 1),
 ('0000013714', 1),
 ('0000013714', 1)]

Utilizziamo il metodo reduceByKey per sommare i valori degli elementi aventi la stessa chiave.

productsCount = productsCount.reduceByKey(lambda x, y: x+y)
productsCount.take(5)
[('0001006657', 2),
 ('0001922408', 2),
 ('0002000601', 6),
 ('0002006650', 2),
 ('0002007770', 6001)]

Riducendo l’RDD tramite una somma dei valori 1 che abbiamo aggiunto prima abbiamo ottenuto la somma totale delle valutazioni per ogni libro. Ora ci basta ordinarli in senso decrescente e tenere stampare i primi 10 risultati.

productsCountSorted = productsCount.sortBy(lambda x: x[1], ascending=False)
productsCountSorted.take(10)
[('0439023483', 21398),
 ('030758836X', 19867),
 ('0439023513', 14114),
 ('0385537859', 12973),
 ('0007444117', 12629),
 ('0375831002', 12571),
 ('038536315X', 12564),
 ('0345803485', 12290),
 ('0316055433', 11746),
 ('0849922070', 10424)]

Ecco qui i 10 libri più recensiti, qui possiamo vedere il primo:

https://www.amazon.com/dp/0439023483

Calcoliamo la valutazione media

Per calcolare la valutazione media creiamo un nuovo RDD contenete soltanto ID del libro e valutazione.

def parseProductRating(row):
    columns = row.split(",")
    product = columns[1]
    rating = float(columns[2])
    
    return (product, rating)

productsRDD = reviewsRDD.map(parseProductRating)
productsRDD.take(5)
[('0000000116', 4.0),
 ('0000000116', 1.0),
 ('0000000868', 4.0),
 ('0000013714', 4.0),
 ('0000013714', 5.0)]

Proviamo a somamre il totale delle valutazioni usando il metodo reduceByKey.

ratingSumRDD = productsRDD.reduceByKey(lambda x,y: x+y)
ratingSumRDD.take(5)
[('0001006657', 10.0),
 ('0001922408', 10.0),
 ('0002000601', 23.0),
 ('0002006650', 8.0),
 ('0002007770', 26398.0)]

Ora dovremmo dividere per il numero di valutazioni che ogni libro ha ricevuto, ma eseguendo la riduzione abbiamo perso questa informazione, quindi non è la cosa giusta da fare.

soluzione: mappiamo ogni elemento ad una lista, contenente l’elemento stesso ed un valore 1 che ci servirà come contatore, esattamente come fatto in precedenza, poi eseguiamo la riduzione per chiave sommando sia i contatori che le valutazione come fatto appena sopra

ratingSumRDD = productsRDD.mapValues(lambda x: (x,1)).reduceByKey(lambda x, y: (x[0]+y[0], x[1]+y[1]))
ratingSumRDD.take(5)
[('0001006657', (10.0, 2)),
 ('0001922408', (10.0, 2)),
 ('0002000601', (23.0, 6)),
 ('0002006650', (8.0, 2)),
 ('0002007770', (26398.0, 6001))]

Perfetto ! Ora abbiamo sia la somma che il conteggio, quindi possiamo eseguire un map di nuovo, dividendo il secondo per il primo.

ratingMeanRDD = ratingSumRDD.mapValues(lambda x: x[0]/x[1])
ratingMeanRDD.take(5)
[('0001006657', 5.0),
 ('0001922408', 5.0),
 ('0002000601', 3.8333333333333335),
 ('0002006650', 4.0),
 ('0002007770', 4.398933511081486)]

Troviamo i 10 libri con la valutazione più alta

Per trovare i libri con la valutazione più alta potremmo semplicemente ordinare l’RDD calcolato appena sopra, però otterremo dei risulati falsati, dato che libri che hanno ottenuto un’unica valutazione a 5 stelle saranno alle prime posizioni.

## consideriamo solo i libri che sono stati valutati almeno 100 volte
ratingMeanRDD = ratingSumRDD.mapValues(lambda x: (x[0]/x[1], x[1]))
ratingMeanRDD.take(5)
[('0001006657', (5.0, 2)),
 ('0001922408', (5.0, 2)),
 ('0002000601', (3.8333333333333335, 6)),
 ('0002006650', (4.0, 2)),
 ('0002007770', (4.398933511081486, 6001))]
ratingMeanRDD = ratingMeanRDD.filter(lambda x: x[1][1]>=100)
ratingMeanRDD.count()
29296

Ora ordiniamo quest’ultimo RDD in base alla valutazione media e stampiamo i primi 10 risultati.

ratingSortedRDD = ratingMeanRDD.sortBy(lambda x: x[1][0], ascending=False)
ratingSortedRDD.take(10)
[('0983408904', (5.0, 128)),
 ('0830766316', (5.0, 103)),
 ('0972394648', (4.992647058823529, 136)),
 ('1499390165', (4.991803278688525, 122)),
 ('0849381185', (4.990566037735849, 106)),
 ('0757317723', (4.9862068965517246, 145)),
 ('1939629071', (4.983193277310924, 119)),
 ('1499381921', (4.982857142857143, 350)),
 ('1616387165', (4.981308411214953, 107)),
 ('0814416993', (4.980769230769231, 104))]

Troviamo i 10 recensori più critici

Cerchiamo i 10 recensori più critici, cioè quelli che sono soliti lasciare le recensioni più basse, per farlo calcoliamo la valutazione media lasciata da ogni recensore e ordiniamo l’RDD così ottenuto in maniera ascendente

def parseReviewerRating(row):
    columns = row.split(",")
    reviewer = columns[0]
    rating = float(columns[2])
    
    return (reviewer, (rating, 1))

reviewerRDD = reviewsRDD.map(parseReviewerRating)
reviewerRDD.take(5)
[('AH2L9G3DQHHAJ', (4.0, 1)),
 ('A2IIIDRK3PRRZY', (1.0, 1)),
 ('A1TADCM7YWPQ8M', (4.0, 1)),
 ('AWGH7V0BDOJKB', (4.0, 1)),
 ('A3UTQPQPM4TQO0', (5.0, 1))]

E sommiamo tutte le valutazioni e il contatore.

reviewerRDD = reviewerRDD.reduceByKey(lambda x, y: (x[0]+y[0], x[1]+y[1]))
reviewerRDD.take(5)
[('A2742OG8PK8KU6', (10.0, 2)),
 ('A2GKR2Q7MD8DG4', (12.0, 3)),
 ('A1MC4E00RO5E9T', (17.0, 4)),
 ('A3IKTM9D8RVWKU', (5.0, 1)),
 ('A3UZSIDE90JWW1', (5.0, 1))]
## consideriamo solo reviewer che hanno lasciato almeno 100 feedback
reviewerRDD = reviewerRDD.filter(lambda x: x[1][1]>100)
reviewerRDD.count()
11244

Ne abbiamo oltre 11mila, vediamo tra questi chi sono i più cattivi, calcoliamo la loro valutazione media.

criticalReviewerRDD = reviewerRDD.mapValues(lambda x: x[0]/x[1])
criticalReviewerRDD.take(5)
[('A8IPQ1Q1O7YX5', 4.227048371174728),
 ('A2PN65B6BSTIYZ', 3.953271028037383),
 ('AX724J32HPG1J', 4.184738955823293),
 ('AFFGYGNO989PD', 4.2785714285714285),
 ('A1WCJEZS66D224', 3.5789473684210527)]
criticalReviewerSortedRDD = criticalReviewerRDD.sortBy(lambda x: x[1])
criticalReviewerSortedRDD.take(10)
[('AH62BQTCMR3BR', 1.0534188034188035),
 ('A186OSXC7LHJDB', 1.2014925373134329),
 ('A2HESNQJZ9OB7H', 1.2543859649122806),
 ('A36IQRD3B5MK8G', 1.505050505050505),
 ('A3JF63XRSLLH0P', 1.5648148148148149),
 ('A344N0X5LIV43M', 1.646551724137931),
 ('A1SS16UHYW77D4', 1.855421686746988),
 ('A19UFCMSFGOZ2K', 2.076923076923077),
 ('A1NJHOGKZZRAX8', 2.1588785046728973),
 ('A1ZY08GYVIKZFM', 2.2446043165467624)]

L’utente AH62BQTCMR3BR ha il ph molto basso, è un pò acidello!