Open In Colab

################ template to run PySpark on Colab #######################
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://www-us.apache.org/dist/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz
!tar xf spark-2.4.5-bin-hadoop2.7.tgz
!pip install -q findspark
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.5-bin-hadoop2.7"
import findspark
findspark.init()

from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
spark1 = SparkSession.builder.appName('basic').getOrCreate()
#Test must give no error
import pyspark
from pyspark import SparkConf, SparkContext
conf = SparkConf().setAppName("basic").setMaster("local")
#sc = SparkContext(conf=conf)  ## for jupyter and Databricks
sc = SparkContext.getOrCreate()   ## for Colab
from pyspark.sql.types import *

Analisi recensioni film

In questo notebook utilizzeremo Spark con il modulo SparkSQL ed un Dataframe per analizzare oltre 28 milioni di recensioni di film. Nello specifico le domande alla quale cerchermo di dare una risposta sono le seguenti:

!wget http://files.grouplens.org/datasets/movielens/ml-latest.zip
--2020-06-18 09:04:19--  http://files.grouplens.org/datasets/movielens/ml-latest.zip
Resolving files.grouplens.org (files.grouplens.org)... 128.101.65.152
Connecting to files.grouplens.org (files.grouplens.org)|128.101.65.152|:80... connected.
HTTP request sent, awaiting response... 200 OK
Length: 277113433 (264M) [application/zip]
Saving to: ‘ml-latest.zip’

ml-latest.zip       100%[===================>] 264.28M  54.8MB/s    in 5.1s    

2020-06-18 09:04:24 (51.6 MB/s) - ‘ml-latest.zip’ saved [277113433/277113433]

!unzip ml-latest.zip
Archive:  ml-latest.zip
   creating: ml-latest/
  inflating: ml-latest/links.csv     
  inflating: ml-latest/tags.csv      
  inflating: ml-latest/genome-tags.csv  
  inflating: ml-latest/ratings.csv   
  inflating: ml-latest/README.txt    
  inflating: ml-latest/genome-scores.csv  
  inflating: ml-latest/movies.csv    
!ls ml-latest
genome-scores.csv  links.csv   ratings.csv  tags.csv
genome-tags.csv    movies.csv  README.txt

I file che ci interessano sono:

  • ratings.csv: che contiene, per ogni riga, id dell’utente, id del film, valutazione da 1.0 a 5.0 e timestamp.

  • movies.csv: che contiene nome e genere dei film associati agli id.

Importiamo il dataset in un Dataframe

Per caricare un csv all’interno di un Dataframe possiamo utilizzare il metodo .load(filepath, type), i principali formati supportati sono csv, json e orc.

df = spark.read.load("ml-latest/ratings.csv", format="csv")
df.show(5)
+------+-------+------+----------+
|   _c0|    _c1|   _c2|       _c3|
+------+-------+------+----------+
|userId|movieId|rating| timestamp|
|     1|    307|   3.5|1256677221|
|     1|    481|   3.5|1256677456|
|     1|   1091|   1.5|1256677471|
|     1|   1257|   4.5|1256677460|
+------+-------+------+----------+
only showing top 5 rows

df = spark.read.csv("ml-latest/ratings.csv")
df.show(5)
+------+-------+------+----------+
|   _c0|    _c1|   _c2|       _c3|
+------+-------+------+----------+
|userId|movieId|rating| timestamp|
|     1|    307|   3.5|1256677221|
|     1|    481|   3.5|1256677456|
|     1|   1091|   1.5|1256677471|
|     1|   1257|   4.5|1256677460|
+------+-------+------+----------+
only showing top 5 rows

df.printSchema()
root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)

I nomi delle colonne, presenti alla prima riga del file, non sono stati riconosciuti, inoltre anche il tipo delle colonne è totalmente sbagliato, per risolvere questi due problemi ci basta utilizzare due parametri:

  • header: se impostato a True indica al metodo che la prima riga del file contiene i nomi delle colonne.

  • inferSchema: impostandolo a True il tipo delle colonne verrà rilevato automaticamente.

df = spark.read.csv("ml-latest/ratings.csv", header=True, inferSchema=True)
df.head(5)
[Row(userId=1, movieId=307, rating=3.5, timestamp=1256677221),
 Row(userId=1, movieId=481, rating=3.5, timestamp=1256677456),
 Row(userId=1, movieId=1091, rating=1.5, timestamp=1256677471),
 Row(userId=1, movieId=1257, rating=4.5, timestamp=1256677460),
 Row(userId=1, movieId=1449, rating=4.5, timestamp=1256677264)]

Correggiamo lo schema

Piuttosto che utilizzare numeri per gli id, usiamo delle stringhe, quindi definiamo uno schema manualmente.

data_schema = [StructField('userID', StringType(), True),
                StructField('movieID', StringType(), True),
                StructField('rating', FloatType(), True),
                StructField('timestamp', IntegerType(), True)]
            
schema = StructType(fields=data_schema)

df = spark.read.schema(schema).option("header","true").option("inferSchema","false").csv("ml-latest/ratings.csv")
df.show(5)
+------+-------+------+----------+
|userID|movieID|rating| timestamp|
+------+-------+------+----------+
|     1|    307|   3.5|1256677221|
|     1|    481|   3.5|1256677456|
|     1|   1091|   1.5|1256677471|
|     1|   1257|   4.5|1256677460|
|     1|   1449|   4.5|1256677264|
+------+-------+------+----------+
only showing top 5 rows

Il timestamp è in formato UNIX, convertiamo in una data utilizzando le funzioni from_unix_time(unix_time) e to_date(time) di spark.

from pyspark.sql.functions import  from_unixtime , to_date

df.withColumn('timestamp', to_date(from_unixtime(df["timestamp"]))).show()
+------+-------+------+----------+
|userID|movieID|rating| timestamp|
+------+-------+------+----------+
|     1|    307|   3.5|2009-10-27|
|     1|    481|   3.5|2009-10-27|
|     1|   1091|   1.5|2009-10-27|
|     1|   1257|   4.5|2009-10-27|
|     1|   1449|   4.5|2009-10-27|
|     1|   1590|   2.5|2009-10-27|
|     1|   1591|   1.5|2009-10-27|
|     1|   2134|   4.5|2009-10-27|
|     1|   2478|   4.0|2009-10-27|
|     1|   2840|   3.0|2009-10-27|
|     1|   2986|   2.5|2009-10-27|
|     1|   3020|   4.0|2009-10-27|
|     1|   3424|   4.5|2009-10-27|
|     1|   3698|   3.5|2009-10-27|
|     1|   3826|   2.0|2009-10-27|
|     1|   3893|   3.5|2009-10-27|
|     2|    170|   3.5|2007-10-20|
|     2|    849|   3.5|2007-10-20|
|     2|   1186|   3.5|2007-10-20|
|     2|   1235|   3.0|2007-10-20|
+------+-------+------+----------+
only showing top 20 rows

from pyspark.sql.functions import to_utc_timestamp

df = df.withColumn('timestamp', to_utc_timestamp(from_unixtime(df["timestamp"]), "yyyy-MM-dd hh:mm:ss"))
df.show(5)
+------+-------+------+-------------------+
|userID|movieID|rating|          timestamp|
+------+-------+------+-------------------+
|     1|    307|   3.5|2009-10-27 21:00:21|
|     1|    481|   3.5|2009-10-27 21:04:16|
|     1|   1091|   1.5|2009-10-27 21:04:31|
|     1|   1257|   4.5|2009-10-27 21:04:20|
|     1|   1449|   4.5|2009-10-27 21:01:04|
+------+-------+------+-------------------+
only showing top 5 rows

Ora abbiamo sia data che ora, diamo uno sguardo allo schema.

df.printSchema()
root
 |-- userID: string (nullable = true)
 |-- movieID: string (nullable = true)
 |-- rating: float (nullable = true)
 |-- timestamp: timestamp (nullable = true)

Contiamo le recensioni nel dataset

Per contare il numero totale di recenioni possiamo semplicemente utilizzare il metodo .count().

total_reviews = df.count()
print(total_reviews)
27753444

Contare il numero medio di recensioni per utente

Quante recensioni ha scritto in media un’utente ? Per saperlo dobbiamo innanziatutto conoscere il numero di recensori unici all’interno del dataset, possiamo farlo utilizzando la funzione countDisctinct(col)

from pyspark.sql.functions import col, countDistinct

total_unique_reviewers = df.agg(countDistinct("userID").alias("count_reviewers"))
total_unique_reviewers.show()
+---------------+
|count_reviewers|
+---------------+
|         283228|
+---------------+

total_unique_reviewers = total_unique_reviewers.head()["count_reviewers"]
print(total_unique_reviewers)
283228

Per calcolare il numero di recensioni per recensore dividiamo il numero totale di recensioni (calcolato per la domanda precedente) per il numero totale di recensori

mean_reviews = total_reviews/total_unique_reviewers
print(mean_reviews)
97.98976089934611

Trovare l’utente che ha scritto più recensioni

Per trovare l’utente che ha scritto più recensioni ci basta raggruppare il Dataframe per gli user id, contare il numero di recensioni per utente e poi ordinare in base a questo valore

df.groupBy("userID").count().orderBy("count", ascending=False).show(5)
+------+-----+
|userID|count|
+------+-----+
|123100|23715|
|117490| 9279|
|134596| 8381|
|212343| 7884|
|242683| 7515|
+------+-----+
only showing top 5 rows

#L'utente 123100 è quello che ha scritto più recenioni in assoluto, con ben 23715 recensioni
df.filter("userID == '123100'").agg({"rating":"mean"}).show()
+------------------+
|       avg(rating)|
+------------------+
|3.1306346194391734|
+------------------+

Trovare i 10 film che hanno ricevuto più recensioni

Questo è facile, per ottenere i film che hanno avuto più recensioni ci raggruppare per film.

dfMovies = df.groupBy("movieID")
dfMovies.count().orderBy("count", ascending=False).show(10)
+-------+-----+
|movieID|count|
+-------+-----+
|    318|97999|
|    356|97040|
|    296|92406|
|    593|87899|
|   2571|84545|
|    260|81815|
|    480|76451|
|    527|71516|
|    110|68803|
|      1|68469|
+-------+-----+
only showing top 10 rows

Trovare i 10 film con la valutazione più alta

Per prima cosa dobbiamo calcolare sul Datagroup ottenuto appena sopra, la valutazione media e il numero di volte che il film è stato valutato, possiamo farlo in 2 modi.

dfMoviesAvg = dfMovies.agg({"rating":"mean", "movieID":"count"}).withColumnRenamed("avg(rating)","avg_rating").withColumnRenamed("count(movieID)","count_rating")
dfMoviesAvg.show(5)
+-------+------------------+------------+
|movieID|        avg_rating|count_rating|
+-------+------------------+------------+
|    296| 4.173971387139363|       92406|
|   1090|3.9017529880478086|       18825|
|   2294|3.2357021735779252|       12974|
|   3210| 3.636775639067115|        9819|
|  48738| 3.849010703859877|        6166|
+-------+------------------+------------+
only showing top 5 rows

oppure

from pyspark.sql.functions import avg, count

dfMoviesAvg = dfMovies.agg(avg("rating").alias("avg_rating"), count("movieID").alias("count_rating"))
dfMoviesAvg.show(5)
+-------+------------------+------------+
|movieID|        avg_rating|count_rating|
+-------+------------------+------------+
|    296| 4.173971387139363|       92406|
|   1090|3.9017529880478086|       18825|
|   2294|3.2357021735779252|       12974|
|   3210| 3.636775639067115|        9819|
|  48738| 3.849010703859877|        6166|
+-------+------------------+------------+
only showing top 5 rows

Per evitare di trovare in cima film che sono stati recensiti una sola volta con 5 stelle, filtriamo solo i film che hanno ricevuto più di 100 recensioni.

dfMoviesMostRated = dfMoviesAvg.filter("count_rating > 100")

Ora ci basta ordinare in base alla valutazione media.

dfMoviesTopRated = dfMoviesMostRated.orderBy("avg_rating", ascending=False)
dfMoviesTopRated.show(10)
+-------+------------------+------------+
|movieID|        avg_rating|count_rating|
+-------+------------------+------------+
| 171011|4.4865181711606095|         853|
| 159817| 4.458092485549133|        1384|
|    318| 4.424188001918387|       97999|
| 170705| 4.399898373983739|         984|
| 174053| 4.350558659217877|        1074|
| 171495| 4.343949044585988|         157|
| 172591| 4.339667458432304|         421|
|    858| 4.332892749244713|       60904|
|     50| 4.291958829205532|       62180|
| 176601| 4.263888888888889|         180|
+-------+------------------+------------+
only showing top 10 rows

Trovare i 10 film con la valutazione più bassa

Per trovare i 10 film con la valutazione più bassa, ci basta ottenere il dataframe ottenuto sopra, questa volta in maniera ascendente.

dfMoviesWorstRated = dfMoviesMostRated.orderBy("avg_rating", ascending=True)
dfMoviesWorstRated.show(10)
+-------+------------------+------------+
|movieID|        avg_rating|count_rating|
+-------+------------------+------------+
|   8859|0.8739495798319328|         238|
|   6483|1.0138592750533049|         469|
|   4775| 1.141025641025641|         741|
|   1826|1.2038288288288288|         444|
|   6587|1.2055555555555555|         810|
|  31698|1.2441176470588236|         680|
|   5739|1.2612359550561798|         178|
|  61348|1.2672849915682969|         593|
|   5738|1.3549382716049383|         162|
|   3574|1.3580645161290323|         155|
+-------+------------------+------------+
only showing top 10 rows

Trovare le 10 valutazioni più recenti

Questo è semplice, ci basta eseguire l’ordinamento in base al timestamp, spark è in grado di ordinare anche delle date.

df.orderBy("timestamp", ascending=False).show(10)
+------+-------+------+-------------------+
|userID|movieID|rating|          timestamp|
+------+-------+------+-------------------+
| 82922| 167780|   4.0|2018-09-26 06:59:09|
| 82922|  53519|   4.0|2018-09-26 06:58:50|
|280481|    494|   3.0|2018-09-26 06:58:47|
|280481|   2355|   3.0|2018-09-26 06:58:43|
|280481|   2294|   2.0|2018-09-26 06:58:41|
|280481| 176101|   3.5|2018-09-26 06:58:30|
|280481|  64614|   3.0|2018-09-26 06:58:22|
| 82922| 165831|   4.0|2018-09-26 06:58:09|
|280481|   1079|   2.5|2018-09-26 06:58:06|
| 82922|  52281|   4.0|2018-09-26 06:58:05|
+------+-------+------+-------------------+
only showing top 10 rows

Trovare i film più visti per anno

Assumption = solo 1% di chi vede il film recensisce

Questa domanda è invece più complessa delle altre. Per semplificare le operazioni creiamo una nuova colonna che contiene soltanto l’anno, possiamo estrarre l’anno dalla data usando la funzione year(timestamp) di spark.

from pyspark.sql.functions import year

dfWithYear = df.withColumn("year", year(df['timestamp'])).drop("timestamp")
dfWithYear.show(5)
+------+-------+------+----+
|userID|movieID|rating|year|
+------+-------+------+----+
|     1|    307|   3.5|2009|
|     1|    481|   3.5|2009|
|     1|   1091|   1.5|2009|
|     1|   1257|   4.5|2009|
|     1|   1449|   4.5|2009|
+------+-------+------+----+
only showing top 5 rows

Ora raggruppiamo il dataframe sia per anno che per id del film, così facendo avremo dei gruppi caratterizzati dalle combinazioni di queste due colonne, e su questo nuovo dataframe calcoliamo il numero di valutazioni.

from pyspark.sql.functions import avg, count

dfMovieYear = dfWithYear.groupBy("year","movieID").agg(count("rating").alias("count_rating"))
dfMovieYear.show(5)
+----+-------+------------+
|year|movieID|count_rating|
+----+-------+------------+
|2005|    255|          43|
|2005|   1917|        3460|
|2005|   3793|        4089|
|2005|   5064|         788|
|2005|   6966|         257|
+----+-------+------------+
only showing top 5 rows

Adesso ci dovrebbe bastare raggruppare per anno per poi calcolare le valutazioni massime ricevute e ordinare in base a questo valore.

from pyspark.sql.functions import max, col

dfMostRatedYear = dfMovieYear.groupBy("year").agg(max("count_rating").alias("count_rating")).orderBy("count_rating", ascending=False)
dfMostRatedYear.show(5)
+----+------------+
|year|count_rating|
+----+------------+
|1996|       25760|
|2015|       12776|
|1997|       11350|
|2016|        8976|
|2017|        7873|
+----+------------+
only showing top 5 rows

ERRORE!!! così facendo abbiamo perso la colonna movie id e, pur conoscendo quante volte è stato valutato il film più valutato dell’anno, non sappiamo quale questo effettivamente sia. Dobbiamo trovare un’altra strada. Una soluzione consiste nel sfruttare una Window insieme alla funzione where.

from pyspark.sql import Window
from pyspark.sql.functions import max, col

window = Window.partitionBy("year")

dfMostRatedYear = dfMovieYear.withColumn("max", max("count_rating").over(window)).where(col("count_rating")==col("max")).drop("max")
dfMostRatedYear.show()
+----+-------+------------+
|year|movieID|count_rating|
+----+-------+------------+
|2003|   5952|        3684|
|2007|   2571|        3409|
|2018|    318|        4311|
|2015|   2571|       12776|
|2006|   7153|        4001|
|2013|    318|        2714|
|1997|    780|       11350|
|2014|    318|        2672|
|2004|   7153|        3697|
|1996|    592|       25760|
|1998|   1721|        2399|
|2012|  79132|        2422|
|2009|  58559|        3720|
|2016|    318|        8976|
|1995|     47|           1|
|1995|     21|           1|
|1995|   1176|           1|
|1995|   1079|           1|
|2001|   1210|        4517|
|2005|   5952|        6228|
+----+-------+------------+
only showing top 20 rows

Rimuoviamo il 1995 dal dataframe, dato che non contiene nessuna informazione utile.

dfMostRatedYear = dfMostRatedYear.where("year != 1995")

Ora moltiplichiamo il count_rating per 100 e, siccome abbiamo affermato che solo l’1% di chi vede un film lo recensisce, otteremo una stima del numero totale di spettatori per film.

dfMostViewedYear = dfMostRatedYear.withColumn("total_viewers",dfMostRatedYear["count_rating"]*100).drop("count_rating")
dfMostViewedYear.show(27)
+----+-------+-------------+
|year|movieID|total_viewers|
+----+-------+-------------+
|2003|   5952|       368400|
|2007|   2571|       340900|
|2018|    318|       431100|
|2015|   2571|      1277600|
|2006|   7153|       400100|
|2013|    318|       271400|
|1997|    780|      1135000|
|2014|    318|       267200|
|2004|   7153|       369700|
|1996|    592|      2576000|
|1998|   1721|       239900|
|2012|  79132|       242200|
|2009|  58559|       372000|
|2016|    318|       897600|
|2001|   1210|       451700|
|2005|   5952|       622800|
|2000|   1210|       745200|
|2010|  72998|       388200|
|2011|  79132|       328000|
|2008|   2571|       420700|
|2017|    318|       787300|
|1999|   2396|       462100|
|2002|   4993|       355100|
+----+-------+-------------+

Aggiungiamo titolo e genere alla lista dei film più visti per anno

Abbiamo già detto che all’interno del file movies.csv si trovano titolo e genere dei film. Carichiamo tale film all’interno di un nuovo Dataframe

df_desc = spark.read.csv("ml-latest/movies.csv", header=True, inferSchema=True)
df_desc.show(10)
+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
|      5|Father of the Bri...|              Comedy|
|      6|         Heat (1995)|Action|Crime|Thri...|
|      7|      Sabrina (1995)|      Comedy|Romance|
|      8| Tom and Huck (1995)|  Adventure|Children|
|      9| Sudden Death (1995)|              Action|
|     10|    GoldenEye (1995)|Action|Adventure|...|
+-------+--------------------+--------------------+
only showing top 10 rows

Per aggiungere quelle informazioni al dataframe con i film più visti per anno ci basta eseguire un join lunga la colonna movieID, presente in entrambi i Dataframe.

dfMostViewedYear = dfMostViewedYear.join(df_desc, ["movieId"])

# impostando il secondo parametro del metodo .show() a False
# possiamo visualizzare tutto il Dataframe in ampiezza

dfMostViewedYear.show(27, False)
+-------+----+-------------+---------------------------------------------------------+-----------------------------------------------+
|movieID|year|total_viewers|title                                                    |genres                                         |
+-------+----+-------------+---------------------------------------------------------+-----------------------------------------------+
|5952   |2003|368400       |Lord of the Rings: The Two Towers, The (2002)            |Adventure|Fantasy                              |
|2571   |2007|340900       |Matrix, The (1999)                                       |Action|Sci-Fi|Thriller                         |
|318    |2018|431100       |Shawshank Redemption, The (1994)                         |Crime|Drama                                    |
|2571   |2015|1277600      |Matrix, The (1999)                                       |Action|Sci-Fi|Thriller                         |
|7153   |2006|400100       |Lord of the Rings: The Return of the King, The (2003)    |Action|Adventure|Drama|Fantasy                 |
|318    |2013|271400       |Shawshank Redemption, The (1994)                         |Crime|Drama                                    |
|780    |1997|1135000      |Independence Day (a.k.a. ID4) (1996)                     |Action|Adventure|Sci-Fi|Thriller               |
|318    |2014|267200       |Shawshank Redemption, The (1994)                         |Crime|Drama                                    |
|7153   |2004|369700       |Lord of the Rings: The Return of the King, The (2003)    |Action|Adventure|Drama|Fantasy                 |
|592    |1996|2576000      |Batman (1989)                                            |Action|Crime|Thriller                          |
|1721   |1998|239900       |Titanic (1997)                                           |Drama|Romance                                  |
|79132  |2012|242200       |Inception (2010)                                         |Action|Crime|Drama|Mystery|Sci-Fi|Thriller|IMAX|
|58559  |2009|372000       |Dark Knight, The (2008)                                  |Action|Crime|Drama|IMAX                        |
|318    |2016|897600       |Shawshank Redemption, The (1994)                         |Crime|Drama                                    |
|1210   |2001|451700       |Star Wars: Episode VI - Return of the Jedi (1983)        |Action|Adventure|Sci-Fi                        |
|5952   |2005|622800       |Lord of the Rings: The Two Towers, The (2002)            |Adventure|Fantasy                              |
|1210   |2000|745200       |Star Wars: Episode VI - Return of the Jedi (1983)        |Action|Adventure|Sci-Fi                        |
|72998  |2010|388200       |Avatar (2009)                                            |Action|Adventure|Sci-Fi|IMAX                   |
|79132  |2011|328000       |Inception (2010)                                         |Action|Crime|Drama|Mystery|Sci-Fi|Thriller|IMAX|
|2571   |2008|420700       |Matrix, The (1999)                                       |Action|Sci-Fi|Thriller                         |
|318    |2017|787300       |Shawshank Redemption, The (1994)                         |Crime|Drama                                    |
|2396   |1999|462100       |Shakespeare in Love (1998)                               |Comedy|Drama|Romance                           |
|4993   |2002|355100       |Lord of the Rings: The Fellowship of the Ring, The (2001)|Adventure|Fantasy                              |
+-------+----+-------------+---------------------------------------------------------+-----------------------------------------------+

# dfMostViewedYear.toPandas().to_csv("top_movie_by_year.csv", header=True)
dfMostViewedYear.write.csv("top_movie_by_year")

Data la natura parallela di spark, le righe del Dataframe non verranno salvate all’interno di un unico file, ma verrà creato un file per ogni riga. Possiamo unire i file in un singolo file csv utilizzando il comando cat da terminale, infine utilizziamo il comando rm per rimuovere la cartella con i vari file creata da spark.

!cat top_movie_by_year/part* > top_movie_by_year.csv
!rm -r top_movie_by_year

SQL

Se sei appassionato di SQL, questa è la query che ci permette di trovare i film più valutati per anno.

dfWithYear.registerTempTable('movies')
new_df = spark.sql("""
SELECT movieID, YEAR, VALUT FROM (
SELECT T.*, RANK() OVER(PARTITION BY YEAR ORDER BY VALUT DESC) RNK FROM (
SELECT movieID, YEAR, COUNT(*) VALUT FROM movies
GROUP BY movieID, YEAR) T)
WHERE RNK = 1
""")
new_df.show(20)
+-------+----+-----+
|movieID|YEAR|VALUT|
+-------+----+-----+
|   5952|2003| 3684|
|   2571|2007| 3409|
|    318|2018| 4311|
|   2571|2015|12776|
|   7153|2006| 4001|
|    318|2013| 2714|
|    780|1997|11350|
|    318|2014| 2672|
|   7153|2004| 3697|
|    592|1996|25760|
|   1721|1998| 2399|
|  79132|2012| 2422|
|  58559|2009| 3720|
|    318|2016| 8976|
|   1079|1995|    1|
|     21|1995|    1|
|     47|1995|    1|
|   1176|1995|    1|
|   1210|2001| 4517|
|   5952|2005| 6228|
+-------+----+-----+
only showing top 20 rows