Open In Colab

################ template to run PySpark on Colab #######################
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://www-us.apache.org/dist/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz
!tar xf spark-2.4.5-bin-hadoop2.7.tgz
!pip install -q findspark
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.5-bin-hadoop2.7"
import findspark
findspark.init()

from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
spark1 = SparkSession.builder.appName('basic').getOrCreate()
#Test must give no error
import pyspark
from pyspark import SparkConf, SparkContext
conf = SparkConf().setAppName("basic").setMaster("local")
#sc = SparkContext(conf=conf)  ## for jupyter and Databricks
sc = SparkContext.getOrCreate()   ## for Colab
from pyspark.sql.types import *

Analisi del prezzo delle Azioni di Apple

In questo notebook faremo un po’ di pratica con Spark, Dataframe e serie storiche (timeseries). Come lo faremo ? Andando ad analizzare il titolo azionario di Apple (AAPL)

!wget https://frenzy86.s3.eu-west-2.amazonaws.com/fav/tecno/AAPL.csv
--2020-06-18 09:31:44--  https://frenzy86.s3.eu-west-2.amazonaws.com/fav/tecno/AAPL.csv
Resolving frenzy86.s3.eu-west-2.amazonaws.com (frenzy86.s3.eu-west-2.amazonaws.com)... 52.95.149.42
Connecting to frenzy86.s3.eu-west-2.amazonaws.com (frenzy86.s3.eu-west-2.amazonaws.com)|52.95.149.42|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 646721 (632K) [application/vnd.ms-excel]
Saving to: ‘AAPL.csv’

AAPL.csv            100%[===================>] 631.56K  1.40MB/s    in 0.4s    

2020-06-18 09:31:45 (1.40 MB/s) - ‘AAPL.csv’ saved [646721/646721]

df = spark.read.csv("AAPL.csv", header=True, inferSchema=True)
df.show()
+-------------------+--------+--------+--------+--------+---------+---------+
|               Date|    Open|    High|     Low|   Close|Adj Close|   Volume|
+-------------------+--------+--------+--------+--------+---------+---------+
|1980-12-12 00:00:00|0.513393|0.515625|0.513393|0.513393| 0.410525|117258400|
|1980-12-15 00:00:00|0.488839|0.488839|0.486607|0.486607| 0.389106| 43971200|
|1980-12-16 00:00:00|0.453125|0.453125|0.450893|0.450893| 0.360548| 26432000|
|1980-12-17 00:00:00|0.462054|0.464286|0.462054|0.462054| 0.369472| 21610400|
|1980-12-18 00:00:00|0.475446|0.477679|0.475446|0.475446| 0.380182| 18362400|
|1980-12-19 00:00:00|0.504464|0.506696|0.504464|0.504464| 0.403385| 12157600|
|1980-12-22 00:00:00|0.529018|0.531250|0.529018|0.529018| 0.423019|  9340800|
|1980-12-23 00:00:00|0.551339|0.553571|0.551339|0.551339| 0.440868| 11737600|
|1980-12-24 00:00:00|0.580357|0.582589|0.580357|0.580357| 0.464072| 12000800|
|1980-12-26 00:00:00|0.633929|0.636161|0.633929|0.633929| 0.506909| 13893600|
|1980-12-29 00:00:00|0.642857|0.645089|0.642857|0.642857| 0.514049| 23290400|
|1980-12-30 00:00:00|0.629464|0.629464|0.627232|0.627232| 0.501554| 17220000|
|1980-12-31 00:00:00|0.611607|0.611607|0.609375|0.609375| 0.487276|  8937600|
|1981-01-02 00:00:00|0.616071|0.620536|0.616071|0.616071| 0.492630|  5415200|
|1981-01-05 00:00:00|0.604911|0.604911|0.602679|0.602679| 0.481921|  8932000|
|1981-01-06 00:00:00|0.578125|0.578125|0.575893|0.575893| 0.460502| 11289600|
|1981-01-07 00:00:00|0.553571|0.553571|0.551339|0.551339| 0.440868| 13921600|
|1981-01-08 00:00:00|0.542411|0.542411|0.540179|0.540179| 0.431944|  9956800|
|1981-01-09 00:00:00|0.569196|0.571429|0.569196|0.569196| 0.455147|  5376000|
|1981-01-12 00:00:00|0.569196|0.569196|0.564732|0.564732| 0.451577|  5924800|
+-------------------+--------+--------+--------+--------+---------+---------+
only showing top 20 rows

df.printSchema()
root
 |-- Date: timestamp (nullable = true)
 |-- Open: string (nullable = true)
 |-- High: string (nullable = true)
 |-- Low: string (nullable = true)
 |-- Close: string (nullable = true)
 |-- Adj Close: string (nullable = true)
 |-- Volume: string (nullable = true)

Lo schema è stato interpretato male, tutte le colonne, ad eccezione del timestamp, sono state lette come stringhe.

Correggiamo lo schema

Definiamo manualmente lo schema con i tipi corretti.

from pyspark.sql.types import *

data_schema = [ #StructField('Date', DateType(), True),
                StructField('Date', TimestampType(), True),
                StructField('Open', FloatType(), True),
                StructField('High', FloatType(), True),
                StructField('Low', FloatType(), True),
                StructField('Close', FloatType(), True),
                StructField('Adj Close', FloatType(), True),
                StructField('Volume', IntegerType(), True),]
            
schema = StructType(fields=data_schema)

df = spark.read.schema(schema).option("header","true").option("inferSchema","false").csv("AAPL.csv")
df.show()
+-------------------+--------+--------+--------+--------+---------+---------+
|               Date|    Open|    High|     Low|   Close|Adj Close|   Volume|
+-------------------+--------+--------+--------+--------+---------+---------+
|1980-12-12 00:00:00|0.513393|0.515625|0.513393|0.513393| 0.410525|117258400|
|1980-12-15 00:00:00|0.488839|0.488839|0.486607|0.486607| 0.389106| 43971200|
|1980-12-16 00:00:00|0.453125|0.453125|0.450893|0.450893| 0.360548| 26432000|
|1980-12-17 00:00:00|0.462054|0.464286|0.462054|0.462054| 0.369472| 21610400|
|1980-12-18 00:00:00|0.475446|0.477679|0.475446|0.475446| 0.380182| 18362400|
|1980-12-19 00:00:00|0.504464|0.506696|0.504464|0.504464| 0.403385| 12157600|
|1980-12-22 00:00:00|0.529018| 0.53125|0.529018|0.529018| 0.423019|  9340800|
|1980-12-23 00:00:00|0.551339|0.553571|0.551339|0.551339| 0.440868| 11737600|
|1980-12-24 00:00:00|0.580357|0.582589|0.580357|0.580357| 0.464072| 12000800|
|1980-12-26 00:00:00|0.633929|0.636161|0.633929|0.633929| 0.506909| 13893600|
|1980-12-29 00:00:00|0.642857|0.645089|0.642857|0.642857| 0.514049| 23290400|
|1980-12-30 00:00:00|0.629464|0.629464|0.627232|0.627232| 0.501554| 17220000|
|1980-12-31 00:00:00|0.611607|0.611607|0.609375|0.609375| 0.487276|  8937600|
|1981-01-02 00:00:00|0.616071|0.620536|0.616071|0.616071|  0.49263|  5415200|
|1981-01-05 00:00:00|0.604911|0.604911|0.602679|0.602679| 0.481921|  8932000|
|1981-01-06 00:00:00|0.578125|0.578125|0.575893|0.575893| 0.460502| 11289600|
|1981-01-07 00:00:00|0.553571|0.553571|0.551339|0.551339| 0.440868| 13921600|
|1981-01-08 00:00:00|0.542411|0.542411|0.540179|0.540179| 0.431944|  9956800|
|1981-01-09 00:00:00|0.569196|0.571429|0.569196|0.569196| 0.455147|  5376000|
|1981-01-12 00:00:00|0.569196|0.569196|0.564732|0.564732| 0.451577|  5924800|
+-------------------+--------+--------+--------+--------+---------+---------+
only showing top 20 rows

df.printSchema()
root
 |-- Date: timestamp (nullable = true)
 |-- Open: float (nullable = true)
 |-- High: float (nullable = true)
 |-- Low: float (nullable = true)
 |-- Close: float (nullable = true)
 |-- Adj Close: float (nullable = true)
 |-- Volume: integer (nullable = true)

Adesso lo schema è corretto, l’ora presente nel timestamp è un’informazione superflua, dato che coincide sempre con la mezzanotte, rimuoviamola.

Convertiamo il timestamp in una data

Per rimuovere l’ora ci basta convertire il timestamp in una data, per farlo utilizziamo la funzione .to_date(column, format) di spark.

from pyspark.sql.functions import to_date

df = df.withColumn('Date', to_date(df["Date"], "yyyy-MM-dd"))
df.show()
+----------+--------+--------+--------+--------+---------+---------+
|      Date|    Open|    High|     Low|   Close|Adj Close|   Volume|
+----------+--------+--------+--------+--------+---------+---------+
|1980-12-12|0.513393|0.515625|0.513393|0.513393| 0.410525|117258400|
|1980-12-15|0.488839|0.488839|0.486607|0.486607| 0.389106| 43971200|
|1980-12-16|0.453125|0.453125|0.450893|0.450893| 0.360548| 26432000|
|1980-12-17|0.462054|0.464286|0.462054|0.462054| 0.369472| 21610400|
|1980-12-18|0.475446|0.477679|0.475446|0.475446| 0.380182| 18362400|
|1980-12-19|0.504464|0.506696|0.504464|0.504464| 0.403385| 12157600|
|1980-12-22|0.529018| 0.53125|0.529018|0.529018| 0.423019|  9340800|
|1980-12-23|0.551339|0.553571|0.551339|0.551339| 0.440868| 11737600|
|1980-12-24|0.580357|0.582589|0.580357|0.580357| 0.464072| 12000800|
|1980-12-26|0.633929|0.636161|0.633929|0.633929| 0.506909| 13893600|
|1980-12-29|0.642857|0.645089|0.642857|0.642857| 0.514049| 23290400|
|1980-12-30|0.629464|0.629464|0.627232|0.627232| 0.501554| 17220000|
|1980-12-31|0.611607|0.611607|0.609375|0.609375| 0.487276|  8937600|
|1981-01-02|0.616071|0.620536|0.616071|0.616071|  0.49263|  5415200|
|1981-01-05|0.604911|0.604911|0.602679|0.602679| 0.481921|  8932000|
|1981-01-06|0.578125|0.578125|0.575893|0.575893| 0.460502| 11289600|
|1981-01-07|0.553571|0.553571|0.551339|0.551339| 0.440868| 13921600|
|1981-01-08|0.542411|0.542411|0.540179|0.540179| 0.431944|  9956800|
|1981-01-09|0.569196|0.571429|0.569196|0.569196| 0.455147|  5376000|
|1981-01-12|0.569196|0.569196|0.564732|0.564732| 0.451577|  5924800|
+----------+--------+--------+--------+--------+---------+---------+
only showing top 20 rows

df.printSchema()
root
 |-- Date: date (nullable = true)
 |-- Open: float (nullable = true)
 |-- High: float (nullable = true)
 |-- Low: float (nullable = true)
 |-- Close: float (nullable = true)
 |-- Adj Close: float (nullable = true)
 |-- Volume: integer (nullable = true)

Cerchiamo il giorno con il valore maggiore

Per trovare il giorno in cui il prezzo del titolo ha raggiunto il valore maggiore dobbiamo semplicemente ordinare il dataset in base alla colonna High, in ordine discendente, poi stampiamo la prima riga.

from pyspark.sql.functions import max

df.orderBy("High", ascending=False).show(1)
+----------+------+------+------+------+---------+--------+
|      Date|  Open|  High|   Low| Close|Adj Close|  Volume|
+----------+------+------+------+------+---------+--------+
|2018-10-03|230.05|233.47|229.78|232.07|229.39209|28654800|
+----------+------+------+------+------+---------+--------+
only showing top 1 row

Cerchiamo il giorno con il valore minore dopo il 2000

Anche in questo caso l’operazione è semplice, filtriamo i giorni successivi al primo Gennaio del 2000 e ordiniamoli in base alla colonna Low, in ordine ascendente, quindi stampiamo la prima riga.

df.filter("Date > '2000-01-01'").orderBy("Low", ascending=True).show(1)
+----------+--------+--------+--------+--------+---------+---------+
|      Date|    Open|    High|     Low|   Close|Adj Close|   Volume|
+----------+--------+--------+--------+--------+---------+---------+
|2003-04-17|0.942857|0.946429|0.908571|0.937143| 0.820964|154064400|
+----------+--------+--------+--------+--------+---------+---------+
only showing top 1 row

Calcoliamo la percentuale di giorni con un valore in chiusura minore di 100 USD

Per calcolare la percentuale dobbiamo dividere il numero di giorni in cui il valore ha chiuso sotto i 100$ per il numero di gironi totali e moltiplicare per 100, dato che il numero avrà molte cifre dopo la virogola lo arrotondiamo utilizzando la funzio round di python.

round((df.filter(df["Close"]<100).count() / df.count())*100, 2)
89.67

I giorni sono quasi il 90% ed è comune dato che stiamo considerando fin da quando la Apple è stata quotata in borsa negli anni 80.

Contiamo i giorni successivi al 2014 con un valore in chiusura minore di 100 USD

Ripetiamo lo stesso processo di sopra, ma questa volta filtriamo soltanto i giorno successivi al primo Gennaio 2014.

df2014 = df.filter("Date >= '2014-01-01'")
df2014.show()
+----------+---------+---------+---------+---------+---------+---------+
|      Date|     Open|     High|      Low|    Close|Adj Close|   Volume|
+----------+---------+---------+---------+---------+---------+---------+
|2014-01-02| 79.38286|79.575714|    78.86| 79.01857| 71.59167| 58671200|
|2014-01-03|    78.98|     79.1|77.204285| 77.28286|  70.0191| 98116900|
|2014-01-06| 76.77857| 78.11429| 76.22857|77.704285|  70.4009|103152700|
|2014-01-07|    77.76|77.994286| 76.84571|77.148575| 69.89742| 79302300|
|2014-01-08|76.972855| 77.93714| 76.95571|77.637146|70.340096| 64632400|
|2014-01-09| 78.11429|78.122856| 76.47857| 76.64571| 69.44184| 69787200|
|2014-01-10| 77.11857| 77.25714|75.872856|76.134285| 68.97846| 76244000|
|2014-01-13| 75.70143|     77.5| 75.69714| 76.53286|69.339584| 94623200|
|2014-01-14| 76.88857| 78.10429| 76.80857| 78.05572| 70.71932| 83140400|
|2014-01-15| 79.07429| 80.02857| 78.80857|79.622856| 72.13917| 97909700|
|2014-01-16| 79.27143|    79.55| 78.81143| 79.17857| 71.73665| 57319500|
|2014-01-17| 78.78286| 78.86714| 77.12857| 77.23857| 69.97897|106684900|
|2014-01-21| 77.28429| 78.58143| 77.20286| 78.43857|71.066185| 82131700|
|2014-01-22| 78.70143|79.612854| 78.25857| 78.78714|   71.382| 94996300|
|2014-01-23| 78.56286|     79.5|    77.83|79.454285| 71.98645|100809800|
|2014-01-24| 79.14286| 79.37428| 77.82143|    78.01|70.677895|107338700|
|2014-01-27| 78.58143| 79.25714| 77.96429| 78.64286| 71.25127|138719700|
|2014-01-28|    72.68| 73.57143| 71.72429| 72.35714| 65.55634|266380800|
|2014-01-29| 71.99286| 72.48143| 71.23143| 71.53571|64.812126|125702500|
|2014-01-30| 71.79143| 72.35714|70.957146| 71.39714|64.686554|169625400|
+----------+---------+---------+---------+---------+---------+---------+
only showing top 20 rows

round((df2014.filter(df["Close"]<100).count() / df2010.count())*100, 2)
21.54

In questo caso la Apple ha chiuso sotto i 100$ per soltanto il 21.5% delle volte

Visualizziamo il massimo per anno

Raggruppiamo il dataframe per anno, per estrarre solo l’anno dalla data possiamo usare la funzione year(col) di spark.

from pyspark.sql.functions import year

dfGroupYear = df.groupBy(year("Date").alias("Year"))

Ora aggreghiamo in base al valore bassimo della colonna High e ordiniamo in base all’anno.

from pyspark.sql.functions import year

dfHigh = dfGroupYear.agg(max("High"))
dfHigh.orderBy("Year", ascending=False).show(39)
+----+---------+
|Year|max(High)|
+----+---------+
|2018|   233.47|
|2017|    177.2|
|2016|   118.69|
|2015|   134.54|
|2014|   119.75|
|2013| 82.16286|
|2012|100.72429|
|2011| 60.95714|
|2010|46.665714|
|2009|30.564285|
|2008|28.608572|
|2007|28.994286|
|2006|13.308572|
|2005|    10.78|
|2004| 4.969285|
|2003| 1.786429|
|2002| 1.869286|
|2001| 1.937143|
|2000| 5.370536|
|1999| 4.214286|
|1998|   1.5625|
|1997| 1.055804|
|1996| 1.267857|
|1995| 1.790179|
|1994|   1.5625|
|1993| 2.330357|
|1992|      2.5|
|1991| 2.616071|
|1990| 1.705357|
|1989| 1.799107|
|1988| 1.705357|
|1987| 2.133929|
|1986| 0.783482|
|1985| 0.555804|
|1984| 0.613839|
|1983| 1.129464|
|1982| 0.622768|
|1981| 0.620536|
|1980| 0.645089|
+----+---------+
only showing top 39 rows

Troviamo l’anno con il volume maggiore

Utiliziamo il Dataframe raggruppato per anno che abbiamo creato sopra e sommiamo i volumi di ogni anno utilizzando il metodo .agg, poi stampiamo i 10 risultati ordinati per volumi totali.

from pyspark.sql.functions import sum

dfVolume = dfGroupYear.agg(sum("Volume").alias("Volume"))
dfVolume.orderBy("Volume", ascending=False).show(10)
+----+-----------+
|Year|     Volume|
+----+-----------+
|2008|71495301500|
|2007|61748996400|
|2006|53924741500|
|2005|45600245600|
|2010|37756231800|
|2009|35813421700|
|1999|34275676400|
|2012|32991051100|
|2011|31014834900|
|2004|30450417200|
+----+-----------+
only showing top 10 rows

Verifichiamo la variazione del titolo di Apple in seguito alla commercializzazione dell’iPhone.

Dobbiamo considerare il valore del titolo tra il 2007-06-29 e 180 giorni dopo, per calcolare la seconda data utilizziamo il modulo datetime di python.

from datetime import datetime, timedelta

start_date = '2007-06-29'
end_date = datetime.strptime(start_date, "%Y-%m-%d")+timedelta(days=180) # convertiamo la stringa in data e aggiungiamo 180 giorni
end_date = datetime.strftime(end_date, "%Y-%m-%d") # convertiamo la data in stringa

print(end_date)
2007-12-26

Ottimo, ora estraiamo i valori per la data di inizio all’interno di un dizionario python, possiamo farlo usando un semplice filtro e poi il metodo .asDict() sull’oggetto Row.

rowFirst = df.filter("Date == '"+start_date+"'").take(1)
dictFirst = rowFirst[0].asDict()

dictFirst
{'Adj Close': 15.272941589355469,
 'Close': 17.43428611755371,
 'Date': datetime.date(2007, 6, 29),
 'High': 17.714284896850586,
 'Low': 17.29857063293457,
 'Open': 17.424285888671875,
 'Volume': 284460400}

Ripetiamo lo stesso procedimento per la data di fine.

rowLast = df.filter("Date == '"+end_date+"'").take(1)
dictLast = rowLast[0].asDict()

dictLast
{'Adj Close': 24.89799690246582,
 'Close': 28.421428680419922,
 'Date': datetime.date(2007, 12, 26),
 'High': 28.70857048034668,
 'Low': 28.117143630981445,
 'Open': 28.43000030517578,
 'Volume': 175933100}

Ora per ottenere la variazione in percentuale ci basta applicare la seguente formula:

\($\frac{close-open}{close}*100\)$ e arrotondiamo il risultato a 2 cifre dopo la virgola.

round((dictLast["Close"]-dictFirst["Open"])/dictLast["Close"]*100,2)
38.69

Un +38.7% per la gioia degli investitori :D