{
"cells": [
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"colab": {},
"colab_type": "code",
"id": "fFqeiV3qxso7"
},
"outputs": [],
"source": [
"################ template to run PySpark on Colab #######################"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"colab": {},
"colab_type": "code",
"id": "m6uGlMRavWlb"
},
"outputs": [],
"source": [
"!apt-get install openjdk-8-jdk-headless -qq > /dev/null\n",
"!wget -q https://archive.apache.org/dist/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz\n",
"!tar xf spark-2.4.5-bin-hadoop2.7.tgz\n",
"!pip install -q findspark"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"colab": {},
"colab_type": "code",
"id": "f75mt5iLvaI3"
},
"outputs": [],
"source": [
"import os\n",
"os.environ[\"JAVA_HOME\"] = \"/usr/lib/jvm/java-8-openjdk-amd64\"\n",
"os.environ[\"SPARK_HOME\"] = \"/content/spark-2.4.5-bin-hadoop2.7\""
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"colab": {},
"colab_type": "code",
"id": "X1NyKr7qvaGP"
},
"outputs": [],
"source": [
"import findspark\n",
"findspark.init()\n",
"\n",
"from pyspark.sql import SparkSession\n",
"spark = SparkSession.builder.master(\"local[*]\").appName(\"basic\").getOrCreate()\n",
"# If this cell runs without errors, PySpark is set up correctly"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"colab": {},
"colab_type": "code",
"id": "MXYgwjn4vaAH"
},
"outputs": [],
"source": [
"import pyspark"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"colab": {},
"colab_type": "code",
"id": "08V9HAqMxm9D"
},
"outputs": [],
"source": [
"################ end template PySpark on Colab ##########################"
]
},
{
"cell_type": "markdown",
"metadata": {
"colab_type": "text",
"id": "PeicqwBMIm7O"
},
"source": [
"# RDD: Resilient Distributed Dataset"
]
},
{
"cell_type": "markdown",
"metadata": {
"colab_type": "text",
"id": "D-q5ru9fIuEw"
},
"source": [
"The Resilient Distributed Dataset (RDD) is Spark's main abstraction: a collection of elements partitioned across the nodes of the cluster that can be operated on in parallel. In this notebook we will look at the main operations we can perform on an RDD."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"colab": {},
"colab_type": "code",
"id": "KyXvQxwIISa9"
},
"outputs": [],
"source": [
"from pyspark import SparkConf, SparkContext"
]
},
{
"cell_type": "markdown",
"metadata": {
"colab_type": "text",
"id": "DWV7rK61IZDo"
},
"source": [
"The SparkContext tells Spark how to access the cluster. It needs a configuration, which we can create with the SparkConf class. In the configuration we must specify at least:\n",
"\n",
"- the application name: with the setAppName(string) method\n",
"- the cluster address: with the setMaster(string) method; when running on our local machine, we can pass 'local'.\n"
]
},
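{
"cell_type": "markdown",
"metadata": {},
"source": [
"As a small sketch (the `local[2]` master value is only illustrative), we can check which settings a `SparkConf` holds with its `get` method. `setAppName` and `setMaster` store the `spark.app.name` and `spark.master` properties. `local[2]` runs Spark locally with 2 worker threads, while `local[*]` uses as many threads as there are cores.\n",
"\n",
"```python\n",
"from pyspark import SparkConf\n",
"\n",
"# Build a configuration object; this alone does not start a SparkContext\n",
"conf = SparkConf().setAppName('basic').setMaster('local[2]')\n",
"\n",
"# setAppName/setMaster are shortcuts for these two properties\n",
"print(conf.get('spark.app.name'))  # basic\n",
"print(conf.get('spark.master'))    # local[2]\n",
"```\n"
]
},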
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"colab": {},
"colab_type": "code",
"id": "KJioxsB0ISYK"
},
"outputs": [],
"source": [
"conf = SparkConf().setAppName(\"basic\").setMaster(\"local\")\n",
"#sc = SparkContext(conf=conf) ## for jupyter and Databricks\n",
"sc = SparkContext.getOrCreate(conf) ## for Colab: returns the context already started by the SparkSession, if any"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"colab": {
"base_uri": "https://localhost:8080/",
"height": 33
},
"colab_type": "code",
"executionInfo": {
"elapsed": 24463,
"status": "ok",
"timestamp": 1592260802429,
"user": {
"displayName": "T3Lab Vision",
"photoUrl": "",
"userId": "14779383426442114373"
},
"user_tz": -120
},
"id": "tnzU3bSlISVW",
"outputId": "2fcd71c8-f28f-4aaa-f8e4-40582aa09a29"
},
"outputs": [
{
"data": {
"text/plain": [
"pyspark.rdd.RDD"
]
},
"execution_count": 9,
"metadata": {
"tags": []
},
"output_type": "execute_result"
}
],
"source": [
"data = [0,1,2,3,4,5,6,7,8,9]\n",
"dataDist = sc.parallelize(data)\n",
"type(dataDist)"
]
},
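{
"cell_type": "markdown",
"metadata": {},
"source": [
"The elements of an RDD are split into partitions. As a sketch, `parallelize` accepts an optional second argument (`numSlices`) with the number of partitions to create. `glom()` turns each partition into a list, which lets us see how the data was split. The choice of 3 partitions is only illustrative.\n",
"\n",
"```python\n",
"from pyspark import SparkContext\n",
"\n",
"sc = SparkContext.getOrCreate()\n",
"\n",
"# Split the same ten numbers explicitly into 3 partitions\n",
"rdd = sc.parallelize([0, 1, 2, 3, 4, 5, 6, 7, 8, 9], 3)\n",
"print(rdd.getNumPartitions())  # 3\n",
"\n",
"# glom() returns one list per partition, e.g. [[0, 1, 2], [3, 4, 5], [6, 7, 8, 9]]\n",
"print(rdd.glom().collect())\n",
"```\n"
]
},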
{
"cell_type": "markdown",
"metadata": {
"colab_type": "text",
"id": "Y36O4Xk7I5xq"
},
"source": [
"We can gather the distributed data of the RDD into a local Python list with the .collect() method."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"colab": {
"base_uri": "https://localhost:8080/",
"height": 50
},
"colab_type": "code",
"executionInfo": {
"elapsed": 1067,
"status": "ok",
"timestamp": 1592260807279,
"user": {
"displayName": "T3Lab Vision",
"photoUrl": "",
"userId": "14779383426442114373"
},
"user_tz": -120
},
"id": "5u1iarwmISTE",
"outputId": "c4385da7-a24c-4a18-8503-322984b9d302"
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"<class 'list'>\n",
"[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]\n"
]
}
],
"source": [
"dataList = dataDist.collect()\n",
"print(type(dataList))\n",
"print(dataList)"
]
},
{
"cell_type": "markdown",
"metadata": {
"colab_type": "text",
"id": "ORr4qb_OI-rN"
},
"source": [
"If instead we only want n elements, we can use the .take(n) method; for example, let's take just the first 3 elements."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"colab": {
"base_uri": "https://localhost:8080/",
"height": 50
},
"colab_type": "code",
"executionInfo": {
"elapsed": 1244,
"status": "ok",
"timestamp": 1592260828090,
"user": {
"displayName": "T3Lab Vision",
"photoUrl": "",
"userId": "14779383426442114373"
},
"user_tz": -120
},
"id": "5Gg2N5iTISQk",
"outputId": "dc6359cc-2a99-4aaa-f9a9-f09e100a2578"
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"<class 'list'>\n",
"[0, 1, 2]\n"
]
}
],
"source": [
"dataList = dataDist.take(3)\n",
"print(type(dataList))\n",
"print(dataList)"
]
},
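{
"cell_type": "markdown",
"metadata": {},
"source": [
"`collect()` copies the whole RDD into the driver's memory, so on large datasets it is safer to inspect just a few elements. This minimal sketch assumes an RDD of 100 million numbers created with `sc.range`; the size is arbitrary. `take(n)` and `first()` stop as soon as they have enough elements instead of materialising the whole RDD.\n",
"\n",
"```python\n",
"from pyspark import SparkContext\n",
"\n",
"sc = SparkContext.getOrCreate()\n",
"\n",
"# sc.range defines the RDD lazily: nothing is computed yet\n",
"big = sc.range(0, 10**8)\n",
"\n",
"print(big.take(3))   # [0, 1, 2]\n",
"print(big.first())   # 0\n",
"# big.collect() would instead try to bring all 100 million numbers into the driver\n",
"```\n"
]
},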
{
"cell_type": "markdown",
"metadata": {
"colab_type": "text",
"id": "vSG6xAl5JHfR"
},
"source": [
"To count the number of elements in an RDD we can use the .count() method."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"colab": {
"base_uri": "https://localhost:8080/",
"height": 33
},
"colab_type": "code",
"executionInfo": {
"elapsed": 885,
"status": "ok",
"timestamp": 1592260843179,
"user": {
"displayName": "T3Lab Vision",
"photoUrl": "",
"userId": "14779383426442114373"
},
"user_tz": -120
},
"id": "9JPgmX9XISN-",
"outputId": "73f60c06-92bc-4193-bc38-58ab91f8d558"
},
"outputs": [
{
"data": {
"text/plain": [
"10"
]
},
"execution_count": 12,
"metadata": {
"tags": []
},
"output_type": "execute_result"
}
],
"source": [
"dataDist.count()"
]
},
{
"cell_type": "markdown",
"metadata": {
"colab_type": "text",
"id": "Ml84YGpxJKL5"
},
"source": [
"To count how many times each distinct element occurs we can use the .countByValue() method. The result is a defaultdict that maps each element of the RDD to the number of times it appears in the RDD."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"colab": {
"base_uri": "https://localhost:8080/",
"height": 33
},
"colab_type": "code",
"executionInfo": {
"elapsed": 889,
"status": "ok",
"timestamp": 1592260877344,
"user": {
"displayName": "T3Lab Vision",
"photoUrl": "",
"userId": "14779383426442114373"
},
"user_tz": -120
},
"id": "NmYBgQf0ISLo",
"outputId": "b60c3fa3-226a-40f9-de41-294e639e787d"
},
"outputs": [
{
"data": {
"text/plain": [
"defaultdict(int, {0: 1, 1: 1, 2: 1, 3: 1, 4: 1, 5: 1, 6: 1, 7: 1, 8: 1, 9: 1})"
]
},
"execution_count": 13,
"metadata": {
"tags": []
},
"output_type": "execute_result"
}
],
"source": [
"dataDist.countByValue()"
]
},
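{
"cell_type": "markdown",
"metadata": {},
"source": [
"In the RDD above every value appears once, so all counts are 1. Here is a sketch with made-up repeated values. Since the result of `countByValue()` is a plain dictionary returned to the driver, it is best suited to RDDs with a small number of distinct values. Locally it matches what `collections.Counter` computes.\n",
"\n",
"```python\n",
"from collections import Counter\n",
"from pyspark import SparkContext\n",
"\n",
"sc = SparkContext.getOrCreate()\n",
"\n",
"values = [1, 2, 2, 3, 3, 3]\n",
"counts = sc.parallelize(values).countByValue()\n",
"\n",
"# Sort the items because the dictionary order may vary\n",
"print(sorted(counts.items()))             # [(1, 1), (2, 2), (3, 3)]\n",
"print(dict(counts) == Counter(values))    # True\n",
"```\n"
]
},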
{
"cell_type": "markdown",
"metadata": {
"colab_type": "text",
"id": "RGWZHOI3JRcZ"
},
"source": [
"We can get the n largest values in the RDD, in descending order, with the .top(n) method."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"colab": {
"base_uri": "https://localhost:8080/",
"height": 33
},
"colab_type": "code",
"executionInfo": {
"elapsed": 887,
"status": "ok",
"timestamp": 1592260900784,
"user": {
"displayName": "T3Lab Vision",
"photoUrl": "",
"userId": "14779383426442114373"
},
"user_tz": -120
},
"id": "TGusWF8ZISGS",
"outputId": "025f7ad6-436e-4f0e-a156-3730971abc86"
},
"outputs": [
{
"data": {
"text/plain": [
"[9, 8, 7, 6, 5]"
]
},
"execution_count": 15,
"metadata": {
"tags": []
},
"output_type": "execute_result"
}
],
"source": [
"dataDist.top(5)"
]
},
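{
"cell_type": "markdown",
"metadata": {},
"source": [
"`top(n)` has a counterpart, `takeOrdered(n)`, which returns the n smallest values in ascending order. Both accept an optional `key` function. A short sketch on the same ten numbers:\n",
"\n",
"```python\n",
"from pyspark import SparkContext\n",
"\n",
"sc = SparkContext.getOrCreate()\n",
"rdd = sc.parallelize([0, 1, 2, 3, 4, 5, 6, 7, 8, 9])\n",
"\n",
"print(rdd.top(3))                            # [9, 8, 7], largest first\n",
"print(rdd.takeOrdered(3))                    # [0, 1, 2], smallest first\n",
"print(rdd.takeOrdered(3, key=lambda x: -x))  # [9, 8, 7], same as top(3)\n",
"```\n"
]
},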
{
"cell_type": "markdown",
"metadata": {
"colab_type": "text",
"id": "f5fsPolwJXEp"
},
"source": [
"## Other RDD operations"
]
},
{
"cell_type": "markdown",
"metadata": {
"colab_type": "text",
"id": "9kgG5EjQJanm"
},
"source": [
"Let's look at more operations we can perform on RDDs. We define two new RDDs."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"colab": {},
"colab_type": "code",
"id": "zfmXM_NTJcuo"
},
"outputs": [],
"source": [
"dist1 = sc.parallelize([1,2,3,4,5])\n",
"dist2 = sc.parallelize([5,6,7,8,9])"
]
},
{
"cell_type": "markdown",
"metadata": {
"colab_type": "text",
"id": "jBVHnCl8JhDY"
},
"source": [
"**Union**\n",
"Merges two RDDs into a single RDD. Duplicates are kept: the value 5, present in both RDDs, appears twice in the result.\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"colab": {
"base_uri": "https://localhost:8080/",
"height": 33
},
"colab_type": "code",
"executionInfo": {
"elapsed": 880,
"status": "ok",
"timestamp": 1592260988409,
"user": {
"displayName": "T3Lab Vision",
"photoUrl": "",
"userId": "14779383426442114373"
},
"user_tz": -120
},
"id": "HNqmlSVrJcqZ",
"outputId": "4e9cbb98-aed1-4b6d-c661-4e7e49c371df"
},
"outputs": [
{
"data": {
"text/plain": [
"[1, 2, 3, 4, 5, 5, 6, 7, 8, 9]"
]
},
"execution_count": 17,
"metadata": {
"tags": []
},
"output_type": "execute_result"
}
],
"source": [
"dist3 = dist1.union(dist2)\n",
"dist3.collect()"
]
},
{
"cell_type": "markdown",
"metadata": {
"colab_type": "text",
"id": "z4vAXrNIJrck"
},
"source": [
"**Intersection**\n",
"Creates a new RDD containing only the elements present in both RDDs, without duplicates.\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"colab": {
"base_uri": "https://localhost:8080/",
"height": 33
},
"colab_type": "code",
"executionInfo": {
"elapsed": 1249,
"status": "ok",
"timestamp": 1592261037060,
"user": {
"displayName": "T3Lab Vision",
"photoUrl": "",
"userId": "14779383426442114373"
},
"user_tz": -120
},
"id": "wzLNvZxBJcnP",
"outputId": "9c820c8d-29f5-453a-8f5a-610b46805846"
},
"outputs": [
{
"data": {
"text/plain": [
"[5]"
]
},
"execution_count": 19,
"metadata": {
"tags": []
},
"output_type": "execute_result"
}
],
"source": [
"dist3 = dist1.intersection(dist2)\n",
"dist3.collect()"
]
},
{
"cell_type": "markdown",
"metadata": {
"colab_type": "text",
"id": "mQf1U8muJ3Kk"
},
"source": [
"**Subtract**\n",
"Creates a new RDD with the elements of the first RDD that are not also present in the second RDD. As the output shows, the order of the result is not guaranteed.\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"colab": {
"base_uri": "https://localhost:8080/",
"height": 33
},
"colab_type": "code",
"executionInfo": {
"elapsed": 1234,
"status": "ok",
"timestamp": 1592261064365,
"user": {
"displayName": "T3Lab Vision",
"photoUrl": "",
"userId": "14779383426442114373"
},
"user_tz": -120
},
"id": "kgBTQU2tJckD",
"outputId": "f5b70a2a-12a9-434f-8a36-59413f6b25e3"
},
"outputs": [
{
"data": {
"text/plain": [
"[4, 1, 2, 3]"
]
},
"execution_count": 20,
"metadata": {
"tags": []
},
"output_type": "execute_result"
}
],
"source": [
"dist3 = dist1.subtract(dist2)\n",
"dist3.collect()"
]
},
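{
"cell_type": "markdown",
"metadata": {},
"source": [
"The three operations above do not treat duplicates the same way. This illustration uses plain Python lists (not Spark code) with made-up values that contain repetitions:\n",
"\n",
"- union keeps every element;\n",
"- intersection returns each common value once;\n",
"- subtract keeps all copies of the elements of the first collection that do not appear in the second.\n",
"\n",
"```python\n",
"# Local (non-distributed) equivalents of union, intersection and subtract\n",
"a = [1, 1, 2, 3, 5]\n",
"b = [2, 5, 5, 6]\n",
"\n",
"union = a + b                                   # like union: duplicates kept\n",
"intersection = sorted(set(a) & set(b))          # like intersection: no duplicates\n",
"b_values = set(b)\n",
"subtract = [x for x in a if x not in b_values]  # like subtract: duplicates of a kept\n",
"\n",
"print(len(union))    # 9\n",
"print(intersection)  # [2, 5]\n",
"print(subtract)      # [1, 1, 3]\n",
"```\n"
]
},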
{
"cell_type": "markdown",
"metadata": {
"colab_type": "text",
"id": "Bha5wPCFJ-4K"
},
"source": [
"**Cartesian**\n",
"The result is a new RDD made up of all the pairs (a, b), with a taken from the first RDD and b from the second. With 5 elements each, that gives 5 × 5 = 25 pairs.\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"colab": {
"base_uri": "https://localhost:8080/",
"height": 436
},
"colab_type": "code",
"executionInfo": {
"elapsed": 1011,
"status": "ok",
"timestamp": 1592261093759,
"user": {
"displayName": "T3Lab Vision",
"photoUrl": "",
"userId": "14779383426442114373"
},
"user_tz": -120
},
"id": "LouIpvxDJ8SE",
"outputId": "3069e25a-0899-4d2d-850d-be6ec336c5a6"
},
"outputs": [
{
"data": {
"text/plain": [
"[(1, 5),\n",
" (1, 6),\n",
" (2, 5),\n",
" (2, 6),\n",
" (1, 7),\n",
" (1, 8),\n",
" (2, 7),\n",
" (2, 8),\n",
" (1, 9),\n",
" (2, 9),\n",
" (3, 5),\n",
" (3, 6),\n",
" (4, 5),\n",
" (4, 6),\n",
" (5, 5),\n",
" (5, 6),\n",
" (3, 7),\n",
" (3, 8),\n",
" (4, 7),\n",
" (4, 8),\n",
" (3, 9),\n",
" (4, 9),\n",
" (5, 7),\n",
" (5, 8),\n",
" (5, 9)]"
]
},
"execution_count": 21,
"metadata": {
"tags": []
},
"output_type": "execute_result"
}
],
"source": [
"dist3 = dist1.cartesian(dist2)\n",
"dist3.collect()"
]
},
{
"cell_type": "markdown",
"metadata": {
"colab_type": "text",
"id": "1yMCuQSpKFeA"
},
"source": [
"## Map and Reduce\n",
"The fundamental operations on an RDD, as on any other kind of distributed collection, are Map and Reduce.\n",
"Map applies an operation to every element of the RDD, using the function we pass to it. Reduce, conversely, aggregates all the elements into a single value by repeatedly applying a two-argument function.\n",
"Let's start with a Map example: a function that computes the square of each value in the RDD.\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"colab": {
"base_uri": "https://localhost:8080/",
"height": 33
},
"colab_type": "code",
"executionInfo": {
"elapsed": 975,
"status": "ok",
"timestamp": 1592261132852,
"user": {
"displayName": "T3Lab Vision",
"photoUrl": "",
"userId": "14779383426442114373"
},
"user_tz": -120
},
"id": "9I5GwmgOJ8Op",
"outputId": "371479eb-6c87-412f-9673-cf620a634b5f"
},
"outputs": [
{
"data": {
"text/plain": [
"[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]"
]
},
"execution_count": 22,
"metadata": {
"tags": []
},
"output_type": "execute_result"
}
],
"source": [
"def compute_pow(d):\n",
" return d*d\n",
"\n",
"powDist = dataDist.map(compute_pow)\n",
"powDist.collect()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"colab": {
"base_uri": "https://localhost:8080/",
"height": 33
},
"colab_type": "code",
"executionInfo": {
"elapsed": 952,
"status": "ok",
"timestamp": 1592261142800,
"user": {
"displayName": "T3Lab Vision",
"photoUrl": "",
"userId": "14779383426442114373"
},
"user_tz": -120
},
"id": "aGBMDke_J8MK",
"outputId": "6a04a76a-1130-4e0b-be2c-fa38bd3b4864"
},
"outputs": [
{
"data": {
"text/plain": [
"[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]"
]
},
"execution_count": 23,
"metadata": {
"tags": []
},
"output_type": "execute_result"
}
],
"source": [
"powDist = dataDist.map(lambda d: d*d)\n",
"powDist.collect()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"colab": {
"base_uri": "https://localhost:8080/",
"height": 33
},
"colab_type": "code",
"executionInfo": {
"elapsed": 688,
"status": "ok",
"timestamp": 1592261240928,
"user": {
"displayName": "T3Lab Vision",
"photoUrl": "",
"userId": "14779383426442114373"
},
"user_tz": -120
},
"id": "dwVCH5gjKmLY",
"outputId": "fb5c8e14-8224-487c-daca-b56ae62bb600"
},
"outputs": [
{
"data": {
"text/plain": [
"[['Questo', 'corso', 'FAV', 'è', 'stupendo!'], ['Li', 'seguirò', 'tutti!']]"
]
},
"execution_count": 25,
"metadata": {
"tags": []
},
"output_type": "execute_result"
}
],
"source": [
"s = [\"Questo corso FAV è stupendo!\",\"Li seguirò tutti!\"]\n",
"sDist = sc.parallelize(s)\n",
"\n",
"wordListsDist = sDist.map(lambda w: w.split())  # one list of words per sentence\n",
"wordListsDist.collect()"
]
},
{
"cell_type": "markdown",
"metadata": {
"colab_type": "text",
"id": "Sdtj1ZcJKadf"
},
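"source": [
"If we use .flatMap() instead of .map(), the lists of words returned by the function are flattened into a single RDD that contains one element per word."
]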
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"colab": {
"base_uri": "https://localhost:8080/",
"height": 33
},
"colab_type": "code",
"executionInfo": {
"elapsed": 735,
"status": "ok",
"timestamp": 1592261243830,
"user": {
"displayName": "T3Lab Vision",
"photoUrl": "",
"userId": "14779383426442114373"
},
"user_tz": -120
},
"id": "Nz9HJASFJ8Jq",
"outputId": "f6e565ac-a4b3-4c1e-c6bb-0de661c4754b"
},
"outputs": [
{
"data": {
"text/plain": [
"['Questo', 'corso', 'FAV', 'è', 'stupendo!', 'Li', 'seguirò', 'tutti!']"
]
},
"execution_count": 26,
"metadata": {
"tags": []
},
"output_type": "execute_result"
}
],
"source": [
"sDist = sc.parallelize(s)\n",
"\n",
"wordsDist = sDist.flatMap(lambda w: w.split())\n",
"wordsDist.collect()"
]
},
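{
"cell_type": "markdown",
"metadata": {},
"source": [
"This plain-Python sketch, not Spark code, shows the difference between the two operations on the same two sentences. `map` produces exactly one output element per input element. `flatMap` concatenates the sequences returned for each element.\n",
"\n",
"```python\n",
"from itertools import chain\n",
"\n",
"s = ['Questo corso FAV è stupendo!', 'Li seguirò tutti!']\n",
"\n",
"# map: one output element per input element (here, a list of words per sentence)\n",
"mapped = [sentence.split() for sentence in s]\n",
"# flatMap: the lists produced for each element are joined into one sequence\n",
"flat = list(chain.from_iterable(sentence.split() for sentence in s))\n",
"\n",
"print(len(mapped), len(flat))  # 2 8\n",
"```\n"
]
},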
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"colab": {
"base_uri": "https://localhost:8080/",
"height": 33
},
"colab_type": "code",
"executionInfo": {
"elapsed": 571,
"status": "ok",
"timestamp": 1592261270557,
"user": {
"displayName": "T3Lab Vision",
"photoUrl": "",
"userId": "14779383426442114373"
},
"user_tz": -120
},
"id": "vxwcmnt1J8HH",
"outputId": "15e9b7f5-ac56-42a1-a0b4-d5dd16c8313a"
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"45\n"
]
}
],
"source": [
"def add(a,b):\n",
" return a+b\n",
"\n",
"dataSum = dataDist.reduce(add)\n",
"print(dataSum)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"colab": {
"base_uri": "https://localhost:8080/",
"height": 33
},
"colab_type": "code",
"executionInfo": {
"elapsed": 811,
"status": "ok",
"timestamp": 1592261295866,
"user": {
"displayName": "T3Lab Vision",
"photoUrl": "",
"userId": "14779383426442114373"
},
"user_tz": -120
},
"id": "lk1QwDn8J8EP",
"outputId": "058e3f9d-f49c-470d-f163-a28b42fc4698"
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"45\n"
]
}
],
"source": [
"dataSum = dataDist.reduce(lambda a,b: a+b)\n",
"print(dataSum)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"colab": {
"base_uri": "https://localhost:8080/",
"height": 33
},
"colab_type": "code",
"executionInfo": {
"elapsed": 874,
"status": "ok",
"timestamp": 1592261382836,
"user": {
"displayName": "T3Lab Vision",
"photoUrl": "",
"userId": "14779383426442114373"
},
"user_tz": -120
},
"id": "DlcWd0bIJcez",
"outputId": "423fc7da-432e-436e-bd8d-49e1fd858e17"
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"45\n"
]
}
],
"source": [
"from operator import add\n",
"\n",
"dataSum = dataDist.reduce(add)\n",
"print(dataSum)"
]
},
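{
"cell_type": "markdown",
"metadata": {},
"source": [
"In PySpark, `reduce` works in two steps. It first reduces each partition on its own, then combines the partial results on the driver. For this reason the function we pass should be associative and commutative: the result must not depend on how the data is split.\n",
"\n",
"The pure-Python sketch below mimics this two-step scheme. The helper `partitioned_reduce` and the 3-way split are hypothetical, for illustration only. With addition we always get 45, while with subtraction the result changes with the partitioning.\n",
"\n",
"```python\n",
"from functools import reduce\n",
"\n",
"def partitioned_reduce(func, partitions):\n",
"    # Hypothetical helper: reduce every non-empty partition locally,\n",
"    # then reduce the partial results, as PySpark's RDD.reduce does\n",
"    partials = [reduce(func, part) for part in partitions if part]\n",
"    return reduce(func, partials)\n",
"\n",
"data = list(range(10))\n",
"one_partition = [data]\n",
"three_partitions = [data[0:3], data[3:6], data[6:10]]\n",
"\n",
"add = lambda a, b: a + b\n",
"sub = lambda a, b: a - b\n",
"\n",
"print(partitioned_reduce(add, one_partition), partitioned_reduce(add, three_partitions))  # 45 45\n",
"print(partitioned_reduce(sub, one_partition), partitioned_reduce(sub, three_partitions))  # -45 21\n",
"```\n"
]
},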
{
"cell_type": "markdown",
"metadata": {
"colab_type": "text",
"id": "F7NTEKNsLPTp"
},
"source": [
"## Filter\n",
"\n",
"The filter method keeps only the elements of the RDD for which a function we define returns True. For example, let's create a new RDD with 10 terms and keep only those longer than 16 characters.\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"colab": {
"base_uri": "https://localhost:8080/",
"height": 84
},
"colab_type": "code",
"executionInfo": {
"elapsed": 907,
"status": "ok",
"timestamp": 1592261427657,
"user": {
"displayName": "T3Lab Vision",
"photoUrl": "",
"userId": "14779383426442114373"
},
"user_tz": -120
},
"id": "nxEVisP-Jcb9",
"outputId": "1171571e-82e9-4b17-ca5d-fc935a9233de"
},
"outputs": [
{
"data": {
"text/plain": [
"['Artificial Intelligence',\n",
" 'Reinforcement Learning',\n",
" 'Natural Language Processing',\n",
" 'Augmented Reality']"
]
},
"execution_count": 32,
"metadata": {
"tags": []
},
"output_type": "execute_result"
}
],
"source": [
"words = [\"Artificial Intelligence\",\"Machine Learning\", \"Reinforcement Learning\",\n",
" \"Deep Learning\",\"Computer Vision\", \"Natural Language Processing\",\n",
" \"Augmented Reality\", \"Blockchain\", \"Robotic\", \"Cyber Security\"]\n",
"\n",
"wordsDist = sc.parallelize(words)\n",
"\n",
"filterDist = wordsDist.filter(lambda w: len(w)>16)\n",
"filterDist.collect()"
]
},
{
"cell_type": "markdown",
"metadata": {
"colab_type": "text",
"id": "AGC0HtCfLZM2"
},
"source": [
"Or we keep only those that start with a vowel.\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"colab": {
"base_uri": "https://localhost:8080/",
"height": 33
},
"colab_type": "code",
"executionInfo": {
"elapsed": 1042,
"status": "ok",
"timestamp": 1592261441562,
"user": {
"displayName": "T3Lab Vision",
"photoUrl": "",
"userId": "14779383426442114373"
},
"user_tz": -120
},
"id": "dcTEVz6uLR-J",
"outputId": "2490b1e2-9e0c-4e23-ec04-6ada4b360204"
},
"outputs": [
{
"data": {
"text/plain": [
"['Artificial Intelligence', 'Augmented Reality']"
]
},
"execution_count": 33,
"metadata": {
"tags": []
},
"output_type": "execute_result"
}
],
"source": [
"filterDist = wordsDist.filter(lambda w: (w[0].lower() in \"aeiou\"))\n",
"filterDist.collect()"
]
},
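{
"cell_type": "markdown",
"metadata": {},
"source": [
"The lambda above reads `w[0]`, which raises an `IndexError` if the RDD contains an empty string. A more defensive variant uses `str.startswith` with a tuple of prefixes, which simply returns False for empty strings. It is sketched here with a hypothetical named function that could also be passed to `wordsDist.filter`.\n",
"\n",
"```python\n",
"VOWELS = tuple('aeiou')\n",
"\n",
"def starts_with_vowel(word):\n",
"    # startswith accepts a tuple of prefixes; an empty string just returns False\n",
"    return word.lower().startswith(VOWELS)\n",
"\n",
"words = ['Artificial Intelligence', 'Machine Learning', '', 'Augmented Reality']\n",
"print([w for w in words if starts_with_vowel(w)])\n",
"# ['Artificial Intelligence', 'Augmented Reality']\n",
"```\n"
]
},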
{
"cell_type": "markdown",
"metadata": {
"colab_type": "text",
"id": "BLJas-icLdWY"
},
"source": [
"## Distinct\n",
"\n",
"The .distinct() method reduces the content of the RDD to unique elements, removing any duplicates.\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"colab": {
"base_uri": "https://localhost:8080/",
"height": 33
},
"colab_type": "code",
"executionInfo": {
"elapsed": 896,
"status": "ok",
"timestamp": 1592261506064,
"user": {
"displayName": "T3Lab Vision",
"photoUrl": "",
"userId": "14779383426442114373"
},
"user_tz": -120
},
"id": "PGKOOiaMLW0m",
"outputId": "bf0673d4-0094-4c3a-cc39-0cc1fa400590"
},
"outputs": [
{
"data": {
"text/plain": [
"['Andrea', 'Gabriele', 'Luca', 'Marco']"
]
},
"execution_count": 35,
"metadata": {
"tags": []
},
"output_type": "execute_result"
}
],
"source": [
"namesDist = sc.parallelize([\"Andrea\",\"Luca\",\"Marco\",\"Marco\",\"Gabriele\"])\n",
"\n",
"uniqueDist = namesDist.distinct()\n",
"uniqueDist.collect()"
]
},
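{
"cell_type": "markdown",
"metadata": {},
"source": [
"The order of the elements returned by `distinct()` is not guaranteed. Removing duplicates requires grouping equal elements together, which redistributes data across partitions (a shuffle). When a stable order matters, it is safer to sort the result. In the sketch below, the second part expresses the same deduplication with `reduceByKey`, to show the kind of key-based grouping involved.\n",
"\n",
"```python\n",
"from pyspark import SparkContext\n",
"\n",
"sc = SparkContext.getOrCreate()\n",
"namesDist = sc.parallelize(['Andrea', 'Luca', 'Marco', 'Marco', 'Gabriele'])\n",
"\n",
"# Sort explicitly if a deterministic order is needed\n",
"print(sorted(namesDist.distinct().collect()))\n",
"\n",
"# Same result built by hand: use each name as a key, keep one value per key, drop the values\n",
"dedup = namesDist.map(lambda x: (x, None)).reduceByKey(lambda a, _: a).keys()\n",
"print(sorted(dedup.collect()))\n",
"# Both print ['Andrea', 'Gabriele', 'Luca', 'Marco']\n",
"```\n"
]
},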
{
"cell_type": "markdown",
"metadata": {
"colab_type": "text",
"id": "BAIJFtunLq0h"
},
"source": [
"## Sample\n",
"\n",
"The .sample(withReplacement, fraction) method randomly selects elements from the RDD. It takes two parameters:\n",
"\n",
"- withReplacement: True if an element can be selected more than once, False otherwise.\n",
"- fraction: without replacement, the probability that each element is selected. A fraction of 0 returns an empty RDD, 0.5 gives each element a 50% chance of being selected, and 1 returns all the elements of the original RDD. With replacement, fraction is the expected number of times each element is chosen.\n",
"\n",
"Since each element is drawn independently, the size of the result is not exact and changes from run to run. An optional third parameter, seed, makes the sampling reproducible.\n",
"\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"colab": {
"base_uri": "https://localhost:8080/",
"height": 134
},
"colab_type": "code",
"executionInfo": {
"elapsed": 723,
"status": "ok",
"timestamp": 1592261537489,
"user": {
"displayName": "T3Lab Vision",
"photoUrl": "",
"userId": "14779383426442114373"
},
"user_tz": -120
},
"id": "d44U_3DwLWw-",
"outputId": "ddda214e-960b-44f6-f3b2-9139095ebdce"
},
"outputs": [
{
"data": {
"text/plain": [
"['Artificial Intelligence',\n",
" 'Machine Learning',\n",
" 'Computer Vision',\n",
" 'Natural Language Processing',\n",
" 'Augmented Reality',\n",
" 'Blockchain',\n",
" 'Robotic']"
]
},
"execution_count": 36,
"metadata": {
"tags": []
},
"output_type": "execute_result"
}
],
"source": [
"wordsDist.sample(withReplacement=False, fraction=0.5).collect()"
]
},
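{
"cell_type": "markdown",
"metadata": {},
"source": [
"Without replacement, sampling behaves like an independent coin toss for every element. The pure-Python sketch below reproduces that idea and shows the role of the seed; the helper `bernoulli_sample` is hypothetical. In PySpark the seed is the optional third argument, e.g. `wordsDist.sample(False, 0.5, seed=42)`.\n",
"\n",
"```python\n",
"import random\n",
"\n",
"def bernoulli_sample(items, fraction, seed=None):\n",
"    # Hypothetical helper: keep each element independently with probability `fraction`\n",
"    rng = random.Random(seed)\n",
"    return [x for x in items if rng.random() < fraction]\n",
"\n",
"words = ['Artificial Intelligence', 'Machine Learning', 'Computer Vision', 'Blockchain']\n",
"print(bernoulli_sample(words, 0.0))  # []\n",
"print(bernoulli_sample(words, 1.0))  # all four words\n",
"# The same seed gives the same sample\n",
"print(bernoulli_sample(words, 0.5, seed=42) == bernoulli_sample(words, 0.5, seed=42))  # True\n",
"```\n"
]
},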
{
"cell_type": "markdown",
"metadata": {
"colab_type": "text",
"id": "pkdgfiqVL4xU"
},
"source": [
"## Key-value RDDs"
]
},
{
"cell_type": "markdown",
"metadata": {
"colab_type": "text",
"id": "RMNTSxybL4r2"
},
"source": [
"Let's create an example RDD containing purchases made inside an app. Each element will be a tuple containing:\n",
"\n",
"- Username: which will act as the key.\n",
"- Item: a nested tuple containing the id of the purchased item and its price.\n",
"\n",
"When each element is a 2-element tuple, Spark's key-value operations (such as mapValues and reduceByKey) treat the first element as the key and the second as the value.\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"colab": {
"base_uri": "https://localhost:8080/",
"height": 168
},
"colab_type": "code",
"executionInfo": {
"elapsed": 925,
"status": "ok",
"timestamp": 1592261609783,
"user": {
"displayName": "T3Lab Vision",
"photoUrl": "",
"userId": "14779383426442114373"
},
"user_tz": -120
},
"id": "Q7gZR4CdLWvA",
"outputId": "9cd7a406-dc6f-4abc-e2f8-1d8ebf9f530c"
},
"outputs": [
{
"data": {
"text/plain": [
"[('guizard', ('pacchetto-crediti-1', '0.89 €')),\n",
" ('bitleader', ('pacchetto-crediti-1', '0.89 €')),\n",
" ('guizard', ('ads-remover', '4.99 €')),\n",
" ('guizard', ('pacchetto-crediti-3', '1.99 €')),\n",
" ('bitleader', ('potenziamento-1', '1.49 €')),\n",
" ('bitleader', ('potenziamento-2', '2.99 €')),\n",
" ('lightlord', ('ads-remover', '4.99 €')),\n",
" ('peanut', ('pacchett-crediti-1', '0.89 €')),\n",
" ('lightlord', ('pacchetto-crediti-3', '4.99 €'))]"
]
},
"execution_count": 37,
"metadata": {
"tags": []
},
"output_type": "execute_result"
}
],
"source": [
"purchases = [(\"guizard\", (\"pacchetto-crediti-1\", \"0.89 €\")),\n",
" (\"bitleader\", (\"pacchetto-crediti-1\", \"0.89 €\")),\n",
" (\"guizard\", (\"ads-remover\", \"4.99 €\")),\n",
" (\"guizard\", (\"pacchetto-crediti-3\", \"1.99 €\")),\n",
" (\"bitleader\", (\"potenziamento-1\", \"1.49 €\")),\n",
" (\"bitleader\", (\"potenziamento-2\", \"2.99 €\")),\n",
" (\"lightlord\", (\"ads-remover\", \"4.99 €\")),\n",
" (\"peanut\", (\"pacchett-crediti-1\", \"0.89 €\")),\n",
" (\"lightlord\", (\"pacchetto-crediti-3\", \"4.99 €\"))]\n",
"\n",
"purchasesRDD = sc.parallelize(purchases)\n",
"purchasesRDD.collect()"
]
},
{
"cell_type": "markdown",
"metadata": {
"colab_type": "text",
"id": "KgQ4sJVKMEMW"
},
"source": [
"## Map and Reduce by key\n",
"To transform the values of the RDD while leaving the keys untouched, we can use the .mapValues(func) method.\n",
"For example, let's convert the item id to uppercase.\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"colab": {
"base_uri": "https://localhost:8080/",
"height": 168
},
"colab_type": "code",
"executionInfo": {
"elapsed": 877,
"status": "ok",
"timestamp": 1592261636574,
"user": {
"displayName": "T3Lab Vision",
"photoUrl": "",
"userId": "14779383426442114373"
},
"user_tz": -120
},
"id": "Re2Lc_NzLWsB",
"outputId": "db2ab1b0-da69-4daf-d598-1f282c554783"
},
"outputs": [
{
"data": {
"text/plain": [
"[('guizard', ('PACCHETTO-CREDITI-1', '0.89 €')),\n",
" ('bitleader', ('PACCHETTO-CREDITI-1', '0.89 €')),\n",
" ('guizard', ('ADS-REMOVER', '4.99 €')),\n",
" ('guizard', ('PACCHETTO-CREDITI-3', '1.99 €')),\n",
" ('bitleader', ('POTENZIAMENTO-1', '1.49 €')),\n",
" ('bitleader', ('POTENZIAMENTO-2', '2.99 €')),\n",
" ('lightlord', ('ADS-REMOVER', '4.99 €')),\n",
" ('peanut', ('PACCHETT-CREDITI-1', '0.89 €')),\n",
" ('lightlord', ('PACCHETTO-CREDITI-3', '4.99 €'))]"
]
},
"execution_count": 38,
"metadata": {
"tags": []
},
"output_type": "execute_result"
}
],
"source": [
"purchasesRDD = purchasesRDD.mapValues(lambda x: (x[0].upper(), x[1]))\n",
"purchasesRDD.collect()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"colab": {
"base_uri": "https://localhost:8080/",
"height": 168
},
"colab_type": "code",
"executionInfo": {
"elapsed": 905,
"status": "ok",
"timestamp": 1592261644617,
"user": {
"displayName": "T3Lab Vision",
"photoUrl": "",
"userId": "14779383426442114373"
},
"user_tz": -120
},
"id": "XMK3ncQiMBqD",
"outputId": "3910a1cb-004b-4b3a-e823-f39dec6c83ae"
},
"outputs": [
{
"data": {
"text/plain": [
"[('guizard', ('PACCHETTO-CREDITI-1', 0.89)),\n",
" ('bitleader', ('PACCHETTO-CREDITI-1', 0.89)),\n",
" ('guizard', ('ADS-REMOVER', 4.99)),\n",
" ('guizard', ('PACCHETTO-CREDITI-3', 1.99)),\n",
" ('bitleader', ('POTENZIAMENTO-1', 1.49)),\n",
" ('bitleader', ('POTENZIAMENTO-2', 2.99)),\n",
" ('lightlord', ('ADS-REMOVER', 4.99)),\n",
" ('peanut', ('PACCHETT-CREDITI-1', 0.89)),\n",
" ('lightlord', ('PACCHETTO-CREDITI-3', 4.99))]"
]
},
"execution_count": 39,
"metadata": {
"tags": []
},
"output_type": "execute_result"
}
],
"source": [
"purchasesRDD = purchasesRDD.mapValues(lambda x: (x[0], float(x[1].split(\" €\")[0])))\n",
"purchasesRDD.collect()"
]
},
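{
"cell_type": "markdown",
"metadata": {},
"source": [
"Converting prices to `float` is fine for this example. However, binary floating point cannot represent amounts such as 0.89 exactly, so sums may show tiny rounding errors. As an alternative, prices can be parsed into `decimal.Decimal`, which keeps decimal arithmetic exact; the helper `parse_price` below is hypothetical. It could be applied to the original string prices with `mapValues(lambda x: (x[0], parse_price(x[1])))`.\n",
"\n",
"```python\n",
"from decimal import Decimal\n",
"\n",
"def parse_price(text):\n",
"    # Hypothetical helper: '0.89 €' -> Decimal('0.89')\n",
"    return Decimal(text.replace('€', '').strip())\n",
"\n",
"prices = ['0.89 €', '4.99 €', '1.99 €']\n",
"print(sum(parse_price(p) for p in prices))  # 7.87\n",
"```\n"
]
},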
{
"cell_type": "markdown",
"metadata": {
"colab_type": "text",
"id": "hmP7PjvGMOP3"
},
"source": [
"Another common operation when working with key-value data is grouping the data by key. With an RDD we can do this with the .reduceByKey(func) method, which combines all the values that share the same key using func. Let's use it to add up the purchases made by each user and get their total. First, we create a new RDD that contains only the username as key and the purchase price as value."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"colab": {
"base_uri": "https://localhost:8080/",
"height": 168
},
"colab_type": "code",
"executionInfo": {
"elapsed": 918,
"status": "ok",
"timestamp": 1592261673879,
"user": {
"displayName": "T3Lab Vision",
"photoUrl": "",
"userId": "14779383426442114373"
},
"user_tz": -120
},
"id": "jq_a0UHVMBnF",
"outputId": "488711b5-5d4a-4ee8-9601-6b35e1ff7251"
},
"outputs": [
{
"data": {
"text/plain": [
"[('guizard', 0.89),\n",
" ('bitleader', 0.89),\n",
" ('guizard', 4.99),\n",
" ('guizard', 1.99),\n",
" ('bitleader', 1.49),\n",
" ('bitleader', 2.99),\n",
" ('lightlord', 4.99),\n",
" ('peanut', 0.89),\n",
" ('lightlord', 4.99)]"
]
},
"execution_count": 40,
"metadata": {
"tags": []
},
"output_type": "execute_result"
}
],
"source": [
"totalByUserRDD = purchasesRDD.mapValues(lambda x: x[1])\n",
"totalByUserRDD.collect()"
]
},
{
"cell_type": "markdown",
"metadata": {
"colab_type": "text",
"id": "eGrjPmToMTK_"
},
"source": [
"Then we use the reduceByKey method to sum the values and obtain each user's total spending in the app."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"colab": {
"base_uri": "https://localhost:8080/",
"height": 33
},
"colab_type": "code",
"executionInfo": {
"elapsed": 916,
"status": "ok",
"timestamp": 1592261693550,
"user": {
"displayName": "T3Lab Vision",
"photoUrl": "",
"userId": "14779383426442114373"
},
"user_tz": -120
},
"id": "mj2zG1MWMBkK",
"outputId": "e1f02461-532a-4dcd-db34-1c3e1386a7de"
},
"outputs": [
{
"data": {
"text/plain": [
"[('peanut', 0.89), ('guizard', 7.87), ('bitleader', 5.37), ('lightlord', 9.98)]"
]
},
"execution_count": 41,
"metadata": {
"tags": []
},
"output_type": "execute_result"
}
],
"source": [
"totalByUserRDD = totalByUserRDD.reduceByKey(lambda x,y: x+y)\n",
"totalByUserRDD.collect()"
]
},
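{
"cell_type": "markdown",
"metadata": {},
"source": [
"Like `reduce`, `reduceByKey` works in two steps. It first combines the values of each key inside every partition, then merges the partial results across partitions, which limits the data moved over the network. The pure-Python sketch below illustrates this idea under a few assumptions:\n",
"\n",
"- the helper `reduce_by_key` is hypothetical;\n",
"- prices are expressed in integer cents to avoid floating-point rounding;\n",
"- the split into two partitions is arbitrary.\n",
"\n",
"```python\n",
"def reduce_by_key(func, partitions):\n",
"    # Hypothetical helper: combine values per key inside each partition,\n",
"    # then merge the per-partition results by key\n",
"    merged = {}\n",
"    for part in partitions:\n",
"        local = {}\n",
"        for key, value in part:\n",
"            local[key] = func(local[key], value) if key in local else value\n",
"        for key, value in local.items():\n",
"            merged[key] = func(merged[key], value) if key in merged else value\n",
"    return merged\n",
"\n",
"partitions = [\n",
"    [('guizard', 89), ('bitleader', 89), ('guizard', 499), ('guizard', 199)],\n",
"    [('bitleader', 149), ('bitleader', 299), ('lightlord', 499), ('peanut', 89), ('lightlord', 499)],\n",
"]\n",
"totals = reduce_by_key(lambda x, y: x + y, partitions)\n",
"print(sorted(totals.items()))\n",
"# [('bitleader', 537), ('guizard', 787), ('lightlord', 998), ('peanut', 89)]\n",
"```\n"
]
},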
{
"cell_type": "markdown",
"metadata": {
"colab_type": "text",
"id": "6oDvZKQcMZZZ"
},
"source": [
"What if we wanted to know the total revenue of the app? We would need to add up the spending of all users.\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"colab": {
"base_uri": "https://localhost:8080/",
"height": 33
},
"colab_type": "code",
"executionInfo": {
"elapsed": 921,
"status": "ok",
"timestamp": 1592261701487,
"user": {
"displayName": "T3Lab Vision",
"photoUrl": "",
"userId": "14779383426442114373"
},
"user_tz": -120
},
"id": "t6Qd4yzXMBiQ",
"outputId": "c390c752-adad-4b06-b7ce-104fcf156cd3"
},
"outputs": [
{
"data": {
"text/plain": [
"24.11"
]
},
"execution_count": 42,
"metadata": {
"tags": []
},
"output_type": "execute_result"
}
],
"source": [
"total = totalByUserRDD.map(lambda x: x[1]).reduce(lambda x,y: x+y)\n",
"total"
]
},
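{
"cell_type": "markdown",
"metadata": {},
"source": [
"The same total can be computed more directly: `values()` keeps only the values of a key-value RDD, and the `sum()` action adds them up. This sketch types in the per-user totals from the output above, converted by hand to integer cents.\n",
"\n",
"```python\n",
"from pyspark import SparkContext\n",
"\n",
"sc = SparkContext.getOrCreate()\n",
"# Per-user totals in integer cents (same figures as above)\n",
"totals = sc.parallelize([('peanut', 89), ('guizard', 787), ('bitleader', 537), ('lightlord', 998)])\n",
"\n",
"# values() drops the keys and sum() adds the remaining numbers\n",
"print(totals.values().sum())  # 2411, i.e. 24.11 €\n",
"```\n"
]
},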
{
"cell_type": "markdown",
"metadata": {
"colab_type": "text",
"id": "hxLUIkkKMcn1"
},
"source": [
"## Sorting an RDD\n",
"\n",
"The .sortBy(func) method sorts the elements of an RDD by the value that func returns for each element, for example one of its fields. This method does not require the data to be in key-value format, so it can always be used.\n",
"Let's use it to sort the RDD by the amount spent by each user.\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"colab": {
"base_uri": "https://localhost:8080/",
"height": 33
},
"colab_type": "code",
"executionInfo": {
"elapsed": 1101,
"status": "ok",
"timestamp": 1592261738288,
"user": {
"displayName": "T3Lab Vision",
"photoUrl": "",
"userId": "14779383426442114373"
},
"user_tz": -120
},
"id": "mfYL25mhMBfd",
"outputId": "6823083b-ee18-4552-f507-f8b642b0208b"
},
"outputs": [
{
"data": {
"text/plain": [
"[('peanut', 0.89), ('bitleader', 5.37), ('guizard', 7.87), ('lightlord', 9.98)]"
]
},
"execution_count": 43,
"metadata": {
"tags": []
},
"output_type": "execute_result"
}
],
"source": [
"totalByUserSortedRDD = totalByUserRDD.sortBy(lambda x: x[1])\n",
"totalByUserSortedRDD.collect()"
]
},
{
"cell_type": "markdown",
"metadata": {
"colab_type": "text",
"id": "i54QIpDGMifL"
},
"source": [
"As you can see, the list goes from the smallest value (0.89) to the largest (9.98); we can reverse the sort order by setting the ascending parameter to False."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"colab": {
"base_uri": "https://localhost:8080/",
"height": 33
},
"colab_type": "code",
"executionInfo": {
"elapsed": 850,
"status": "ok",
"timestamp": 1592261759801,
"user": {
"displayName": "T3Lab Vision",
"photoUrl": "",
"userId": "14779383426442114373"
},
"user_tz": -120
},
"id": "GwptqWL7MBdN",
"outputId": "df70fd20-d464-457b-c357-63880a97b9b4"
},
"outputs": [
{
"data": {
"text/plain": [
"[('lightlord', 9.98), ('guizard', 7.87), ('bitleader', 5.37), ('peanut', 0.89)]"
]
},
"execution_count": 44,
"metadata": {
"tags": []
},
"output_type": "execute_result"
}
],
"source": [
"totalByUserSortedRDD = totalByUserRDD.sortBy(lambda x: x[1], ascending=False)\n",
"totalByUserSortedRDD.collect()"
]
}
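,
{
"cell_type": "markdown",
"metadata": {},
"source": [
"When the order should depend on the key rather than on the value, key-value RDDs also offer `sortByKey()`. This short sketch types in the per-user totals from the outputs above. The first call sorts users alphabetically. The second keeps only the usernames from the ranking by amount spent.\n",
"\n",
"```python\n",
"from pyspark import SparkContext\n",
"\n",
"sc = SparkContext.getOrCreate()\n",
"totals = sc.parallelize([('peanut', 0.89), ('guizard', 7.87), ('bitleader', 5.37), ('lightlord', 9.98)])\n",
"\n",
"# Sort by key (username), in ascending alphabetical order\n",
"print(totals.sortByKey().collect())\n",
"# [('bitleader', 5.37), ('guizard', 7.87), ('lightlord', 9.98), ('peanut', 0.89)]\n",
"\n",
"# Sort by value in descending order and keep only the usernames\n",
"print(totals.sortBy(lambda x: x[1], ascending=False).keys().collect())\n",
"# ['lightlord', 'guizard', 'bitleader', 'peanut']\n",
"```\n"
]
}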
],
"metadata": {
"colab": {
"authorship_tag": "ABX9TyO2aU4kgBpXCR3p6WtohzAo",
"collapsed_sections": [],
"name": "Pyspark_RDD.ipynb",
"provenance": []
},
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.6.9"
}
},
"nbformat": 4,
"nbformat_minor": 1
}