From 7a4521b809744591e3632d0e32ee180b8283e6fc Mon Sep 17 00:00:00 2001
From: TYLER CARAZA-HARTER <tharter@cs544-tharter.cs.wisc.edu>
Date: Tue, 18 Mar 2025 10:04:24 -0500
Subject: [PATCH] lec 24 demos

---
 lec/22-spark/nb/lec1.ipynb | 538 +++++++++++++++++++++++++++++++++-
 lec/22-spark/nb/lec2.ipynb | 574 ++++++++++++++++++++++++++++++++++++-
 2 files changed, 1097 insertions(+), 15 deletions(-)

diff --git a/lec/22-spark/nb/lec1.ipynb b/lec/22-spark/nb/lec1.ipynb
index 59c8968..68a7739 100644
--- a/lec/22-spark/nb/lec1.ipynb
+++ b/lec/22-spark/nb/lec1.ipynb
@@ -2,7 +2,7 @@
  "cells": [
   {
    "cell_type": "code",
-   "execution_count": 1,
+   "execution_count": 11,
    "id": "c8dca847-54af-4284-97d8-0682e88a6e8d",
    "metadata": {},
    "outputs": [
@@ -12,7 +12,7 @@
      "text": [
       "Setting default log level to \"WARN\".\n",
       "To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).\n",
-      "25/03/14 13:51:37 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable\n"
+      "25/03/17 14:23:00 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable\n"
      ]
     }
    ],
@@ -3245,9 +3245,17 @@
     "\"\"\").explain(\"formatted\")"
    ]
   },
+  {
+   "cell_type": "markdown",
+   "id": "c5d2a9e4-4dfb-461d-ae82-84b672b3f060",
+   "metadata": {},
+   "source": [
+    "# Local Joins"
+   ]
+  },
   {
    "cell_type": "code",
-   "execution_count": 88,
+   "execution_count": 1,
    "id": "09288abb-f691-4135-9c98-3dcbd35d2aa0",
    "metadata": {},
    "outputs": [],
@@ -3257,7 +3265,7 @@
   },
   {
    "cell_type": "code",
-   "execution_count": 91,
+   "execution_count": 2,
    "id": "597e459d-7d1d-4e50-899f-a8bf5ffbca1c",
    "metadata": {},
    "outputs": [],
@@ -3282,7 +3290,7 @@
   },
   {
    "cell_type": "code",
-   "execution_count": 93,
+   "execution_count": 3,
    "id": "389c4231-f58d-45ec-ab04-125df0461714",
    "metadata": {},
    "outputs": [
@@ -3292,7 +3300,7 @@
        "{'A': 'Apple', 'B': 'Banana', 'C': 'Carrot'}"
       ]
      },
-     "execution_count": 93,
+     "execution_count": 3,
      "metadata": {},
      "output_type": "execute_result"
     }
@@ -3304,7 +3312,7 @@
   },
   {
    "cell_type": "code",
-   "execution_count": 96,
+   "execution_count": 4,
    "id": "81bc095c-e9da-4f58-84f3-ed3a4852e389",
    "metadata": {},
    "outputs": [
@@ -3328,7 +3336,7 @@
   },
   {
    "cell_type": "code",
-   "execution_count": 89,
+   "execution_count": 5,
    "id": "dbda9312-21b1-42f0-9039-dfdde0528bf7",
    "metadata": {},
    "outputs": [],
@@ -3338,7 +3346,7 @@
   },
   {
    "cell_type": "code",
-   "execution_count": 97,
+   "execution_count": 6,
    "id": "8ff96927-bf93-4fc7-a030-0e97a2c92c27",
    "metadata": {},
    "outputs": [
@@ -3353,7 +3361,7 @@
        " ('C', 'Purple')]"
       ]
      },
-     "execution_count": 97,
+     "execution_count": 6,
      "metadata": {},
      "output_type": "execute_result"
     }
@@ -3366,7 +3374,28 @@
   },
   {
    "cell_type": "code",
-   "execution_count": 101,
+   "execution_count": 8,
+   "id": "471d5273-c54e-4bf5-aa0a-aeaf6504481b",
+   "metadata": {},
+   "outputs": [
+    {
+     "data": {
+      "text/plain": [
+       "[('A', 'Apple'), ('B', 'Banana'), ('C', 'Carrot')]"
+      ]
+     },
+     "execution_count": 8,
+     "metadata": {},
+     "output_type": "execute_result"
+    }
+   ],
+   "source": [
+    "kinds"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 9,
    "id": "ba147116-c980-45ca-bc16-8bb05abacbde",
    "metadata": {},
    "outputs": [
@@ -3384,11 +3413,492 @@
     }
    ],
    "source": [
-    "# TODO: don't start the fruit loop at the beginning each time\n",
+    "fruit_idx = 0\n",
     "for kind in kinds:\n",
-    "    for fruit in fruits:\n",
+    "    while fruit_idx < len(fruits):\n",
+    "        fruit = fruits[fruit_idx]\n",
     "        if kind[0] == fruit[0]:\n",
-    "            print(fruit[1], kind[1])"
+    "            print(fruit[1], kind[1])\n",
+    "        elif fruit[0] > kind[0]:\n",
+    "            break\n",
+    "        fruit_idx += 1"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "7187463b-bd97-4fe6-afcb-70c46a75e6c7",
+   "metadata": {},
+   "source": [
+    "# Machine Learning"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 12,
+   "id": "7dbe60b4-9a2b-4f9c-8e74-88eccaf2bcc6",
+   "metadata": {},
+   "outputs": [
+    {
+     "data": {
+      "text/plain": [
+       "DataFrame[x1: double, x2: double, y: double]"
+      ]
+     },
+     "execution_count": 12,
+     "metadata": {},
+     "output_type": "execute_result"
+    }
+   ],
+   "source": [
+    "import pandas as pd\n",
+    "import numpy as np\n",
+    "df = pd.DataFrame({\"x1\": np.random.randint(0, 10, 100).astype(float), \n",
+    "                   \"x2\": np.random.randint(0, 3, 100).astype(float)})\n",
+    "df[\"y\"] = df[\"x1\"] + df[\"x2\"] + np.random.rand(len(df))\n",
+    "df = spark.createDataFrame(df)\n",
+    "df"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 23,
+   "id": "4c628bc4-072d-4988-bb21-f4c36ce22883",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "train, test = df.randomSplit([0.75, 0.25], seed=500)\n",
+    "#train.show()"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 24,
+   "id": "3e387f9f-fb9c-4c03-9ba1-c8345ed13126",
+   "metadata": {},
+   "outputs": [
+    {
+     "data": {
+      "text/plain": [
+       "(76, 24)"
+      ]
+     },
+     "execution_count": 24,
+     "metadata": {},
+     "output_type": "execute_result"
+    }
+   ],
+   "source": [
+    "train.count(), test.count()"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 25,
+   "id": "4c65f3f3-b279-416f-b720-db6c29ab296e",
+   "metadata": {},
+   "outputs": [
+    {
+     "name": "stderr",
+     "output_type": "stream",
+     "text": [
+      "                                                                                "
+     ]
+    }
+   ],
+   "source": [
+    "train.write.format(\"parquet\").mode(\"ignore\").save(\"hdfs://nn:9000/train.parquet\")\n",
+    "test.write.format(\"parquet\").mode(\"ignore\").save(\"hdfs://nn:9000/test.parquet\")"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 26,
+   "id": "2dcc55e4-81d8-4ad3-b7cc-f6830c09888b",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "train = spark.read.format(\"parquet\").load(\"hdfs://nn:9000/train.parquet\")\n",
+    "test = spark.read.format(\"parquet\").load(\"hdfs://nn:9000/test.parquet\")"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 28,
+   "id": "e5ff46a6-3bdf-4218-af62-2c34ca00a725",
+   "metadata": {},
+   "outputs": [
+    {
+     "name": "stdout",
+     "output_type": "stream",
+     "text": [
+      "+---+---+------------------+\n",
+      "| x1| x2|                 y|\n",
+      "+---+---+------------------+\n",
+      "|0.0|1.0|1.4040414970076673|\n",
+      "|1.0|0.0|1.4184373417804137|\n",
+      "|2.0|0.0|2.0571564174643573|\n",
+      "|2.0|0.0| 2.738817606905206|\n",
+      "|4.0|0.0| 4.351893618224956|\n",
+      "+---+---+------------------+\n",
+      "\n"
+     ]
+    }
+   ],
+   "source": [
+    "train.limit(5).show()"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 29,
+   "id": "d4d8dfa7-3bad-42ce-a1c1-351c91fc3b06",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "from pyspark.ml.regression import DecisionTreeRegressor       # unfit model\n",
+    "from pyspark.ml.regression import DecisionTreeRegressorModel  # fit model"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 32,
+   "id": "567b440b-5fcd-426b-80a7-2929105ff145",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "from pyspark.ml.feature import VectorAssembler"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 36,
+   "id": "20f6a9a8-ae64-4275-8839-d06aed76ead0",
+   "metadata": {},
+   "outputs": [
+    {
+     "name": "stdout",
+     "output_type": "stream",
+     "text": [
+      "+---+---+------------------+---------+\n",
+      "| x1| x2|                 y| features|\n",
+      "+---+---+------------------+---------+\n",
+      "|0.0|1.0|1.4040414970076673|[0.0,1.0]|\n",
+      "|1.0|0.0|1.4184373417804137|[1.0,0.0]|\n",
+      "|2.0|0.0|2.0571564174643573|[2.0,0.0]|\n",
+      "|2.0|0.0| 2.738817606905206|[2.0,0.0]|\n",
+      "|4.0|0.0| 4.351893618224956|[4.0,0.0]|\n",
+      "+---+---+------------------+---------+\n",
+      "\n"
+     ]
+    }
+   ],
+   "source": [
+    "va = VectorAssembler(inputCols=[\"x1\", \"x2\"], outputCol=\"features\")\n",
+    "va.transform(train).limit(5).show()"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 37,
+   "id": "c1f3af58-0781-4686-82c8-c537257595d2",
+   "metadata": {},
+   "outputs": [
+    {
+     "name": "stderr",
+     "output_type": "stream",
+     "text": [
+      "                                                                                "
+     ]
+    }
+   ],
+   "source": [
+    "dt = DecisionTreeRegressor(featuresCol=\"features\", labelCol=\"y\")\n",
+    "model = dt.fit(va.transform(train))"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 38,
+   "id": "2cb9d1e9-27dc-41f7-8d27-9f73d87cd805",
+   "metadata": {},
+   "outputs": [
+    {
+     "data": {
+      "text/plain": [
+       "(pyspark.ml.regression.DecisionTreeRegressor,\n",
+       " pyspark.ml.regression.DecisionTreeRegressionModel)"
+      ]
+     },
+     "execution_count": 38,
+     "metadata": {},
+     "output_type": "execute_result"
+    }
+   ],
+   "source": [
+    "type(dt), type(model)"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 42,
+   "id": "df176bed-374f-4663-b0d3-68fe549afd35",
+   "metadata": {},
+   "outputs": [
+    {
+     "name": "stdout",
+     "output_type": "stream",
+     "text": [
+      "+---+---+------------------+---------+------------------+\n",
+      "| x1| x2|                 y| features|        prediction|\n",
+      "+---+---+------------------+---------+------------------+\n",
+      "|0.0|1.0| 1.825532164689176|[0.0,1.0]|1.4040414970076673|\n",
+      "|0.0|2.0|2.3378982019998977|[0.0,2.0]|2.5233870027882084|\n",
+      "|1.0|0.0|1.6481475984177445|[1.0,0.0]|1.6492029178176435|\n",
+      "|1.0|0.0|1.7915461186682566|[1.0,0.0]|1.6492029178176435|\n",
+      "|1.0|1.0|2.6028920463022995|[1.0,1.0]|2.7683675845769957|\n",
+      "+---+---+------------------+---------+------------------+\n",
+      "\n"
+     ]
+    }
+   ],
+   "source": [
+    "# model.transform(test)   # need the same formatted data as when we trained\n",
+    "model.transform(va.transform(test)).limit(5).show()"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 44,
+   "id": "275627c5-ccd1-4b8d-9c7d-7961f2bda773",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "from pyspark.ml.pipeline import Pipeline      # unfit\n",
+    "from pyspark.ml.pipeline import PipelineModel # fitted"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 46,
+   "id": "2ac72a6d-9928-48a0-88b2-90b61f13bafb",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "pipe = Pipeline(stages=[va, dt])\n",
+    "model = pipe.fit(train)"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 47,
+   "id": "1c98e894-a0ea-4919-b664-93fe2079ef4c",
+   "metadata": {},
+   "outputs": [
+    {
+     "name": "stdout",
+     "output_type": "stream",
+     "text": [
+      "+---+---+------------------+---------+------------------+\n",
+      "| x1| x2|                 y| features|        prediction|\n",
+      "+---+---+------------------+---------+------------------+\n",
+      "|0.0|1.0| 1.825532164689176|[0.0,1.0]|1.4040414970076673|\n",
+      "|0.0|2.0|2.3378982019998977|[0.0,2.0]|2.5233870027882084|\n",
+      "|1.0|0.0|1.6481475984177445|[1.0,0.0]|1.6492029178176435|\n",
+      "|1.0|0.0|1.7915461186682566|[1.0,0.0]|1.6492029178176435|\n",
+      "|1.0|1.0|2.6028920463022995|[1.0,1.0]|2.7683675845769957|\n",
+      "+---+---+------------------+---------+------------------+\n",
+      "\n"
+     ]
+    }
+   ],
+   "source": [
+    "model.transform(test).limit(5).show()"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 52,
+   "id": "eae7eb14-fe65-43d8-9896-980aa63dd7e2",
+   "metadata": {},
+   "outputs": [
+    {
+     "name": "stdout",
+     "output_type": "stream",
+     "text": [
+      "DecisionTreeRegressionModel: uid=DecisionTreeRegressor_a71faf54b217, depth=5, numNodes=51, numFeatures=2\n",
+      "  If (feature 0 <= 4.5)\n",
+      "   If (feature 0 <= 1.5)\n",
+      "    If (feature 1 <= 0.5)\n",
+      "     If (feature 0 <= 0.5)\n",
+      "      Predict: 0.45968856929425694\n",
+      "     Else (feature 0 > 0.5)\n",
+      "      Predict: 1.6492029178176435\n",
+      "    Else (feature 1 > 0.5)\n",
+      "     If (feature 0 <= 0.5)\n",
+      "      If (feature 1 <= 1.5)\n",
+      "       Predict: 1.4040414970076673\n",
+      "      Else (feature 1 > 1.5)\n",
+      "       Predict: 2.5233870027882084\n",
+      "     Else (feature 0 > 0.5)\n",
+      "      If (feature 1 <= 1.5)\n",
+      "       Predict: 2.7683675845769957\n",
+      "      Else (feature 1 > 1.5)\n",
+      "       Predict: 3.5316623010447774\n",
+      "   Else (feature 0 > 1.5)\n",
+      "    If (feature 1 <= 0.5)\n",
+      "     If (feature 0 <= 2.5)\n",
+      "      Predict: 2.3979870121847817\n",
+      "     Else (feature 0 > 2.5)\n",
+      "      If (feature 0 <= 3.5)\n",
+      "       Predict: 3.5133904895373607\n",
+      "      Else (feature 0 > 3.5)\n",
+      "       Predict: 4.317093345475164\n",
+      "    Else (feature 1 > 0.5)\n",
+      "     If (feature 0 <= 2.5)\n",
+      "      If (feature 1 <= 1.5)\n",
+      "       Predict: 3.6707752377825416\n",
+      "      Else (feature 1 > 1.5)\n",
+      "       Predict: 4.6983808565300755\n",
+      "     Else (feature 0 > 2.5)\n",
+      "      If (feature 1 <= 1.5)\n",
+      "       Predict: 5.3954773862665135\n",
+      "      Else (feature 1 > 1.5)\n",
+      "       Predict: 6.2509689582932975\n",
+      "  Else (feature 0 > 4.5)\n",
+      "   If (feature 0 <= 6.5)\n",
+      "    If (feature 1 <= 1.5)\n",
+      "     If (feature 0 <= 5.5)\n",
+      "      If (feature 1 <= 0.5)\n",
+      "       Predict: 5.527235382083779\n",
+      "      Else (feature 1 > 0.5)\n",
+      "       Predict: 6.315046161076428\n",
+      "     Else (feature 0 > 5.5)\n",
+      "      If (feature 1 <= 0.5)\n",
+      "       Predict: 6.376190776816699\n",
+      "      Else (feature 1 > 0.5)\n",
+      "       Predict: 7.576965189743188\n",
+      "    Else (feature 1 > 1.5)\n",
+      "     If (feature 0 <= 5.5)\n",
+      "      Predict: 7.812773028725937\n",
+      "     Else (feature 0 > 5.5)\n",
+      "      Predict: 8.233809052748645\n",
+      "   Else (feature 0 > 6.5)\n",
+      "    If (feature 1 <= 0.5)\n",
+      "     If (feature 0 <= 8.5)\n",
+      "      If (feature 0 <= 7.5)\n",
+      "       Predict: 7.885401379590637\n",
+      "      Else (feature 0 > 7.5)\n",
+      "       Predict: 8.525778987202642\n",
+      "     Else (feature 0 > 8.5)\n",
+      "      Predict: 9.604192552646033\n",
+      "    Else (feature 1 > 0.5)\n",
+      "     If (feature 0 <= 7.5)\n",
+      "      If (feature 1 <= 1.5)\n",
+      "       Predict: 8.533554015788187\n",
+      "      Else (feature 1 > 1.5)\n",
+      "       Predict: 9.55312286519982\n",
+      "     Else (feature 0 > 7.5)\n",
+      "      If (feature 1 <= 1.5)\n",
+      "       Predict: 10.000188475606585\n",
+      "      Else (feature 1 > 1.5)\n",
+      "       Predict: 10.409642251373787\n",
+      "\n"
+     ]
+    }
+   ],
+   "source": [
+    "print(model.stages[1].toDebugString)"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 55,
+   "id": "72930355-914f-4890-b037-b3d9bc63d9a2",
+   "metadata": {},
+   "outputs": [
+    {
+     "name": "stderr",
+     "output_type": "stream",
+     "text": [
+      "                                                                                "
+     ]
+    }
+   ],
+   "source": [
+    "model.write().overwrite().save(\"hdfs://nn:9000/model\")"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 58,
+   "id": "52de063d-3d97-4510-9fc0-3de20bb87e71",
+   "metadata": {},
+   "outputs": [
+    {
+     "name": "stdout",
+     "output_type": "stream",
+     "text": [
+      "Found 2 items\n",
+      "drwxr-xr-x   - root supergroup          0 2025-03-17 14:38 hdfs://nn:9000/model/stages/0_VectorAssembler_f70504ebcb7d\n",
+      "drwxr-xr-x   - root supergroup          0 2025-03-17 14:38 hdfs://nn:9000/model/stages/1_DecisionTreeRegressor_a71faf54b217\n"
+     ]
+    }
+   ],
+   "source": [
+    "!hdfs dfs -ls hdfs://nn:9000/model/stages"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 60,
+   "id": "ada9e676-df19-4b32-a153-5885d6ca7b98",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "model = PipelineModel.load(\"hdfs://nn:9000/model\")"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 61,
+   "id": "eb0039d5-0c8b-4bf6-b3b2-1c055f375448",
+   "metadata": {},
+   "outputs": [
+    {
+     "name": "stdout",
+     "output_type": "stream",
+     "text": [
+      "+---+---+------------------+---------+-------------------+\n",
+      "| x1| x2|                 y| features|         prediction|\n",
+      "+---+---+------------------+---------+-------------------+\n",
+      "|0.0|1.0| 1.825532164689176|[0.0,1.0]| 1.4040414970076673|\n",
+      "|0.0|2.0|2.3378982019998977|[0.0,2.0]| 2.5233870027882084|\n",
+      "|1.0|0.0|1.6481475984177445|[1.0,0.0]| 1.6492029178176435|\n",
+      "|1.0|0.0|1.7915461186682566|[1.0,0.0]| 1.6492029178176435|\n",
+      "|1.0|1.0|2.6028920463022995|[1.0,1.0]| 2.7683675845769957|\n",
+      "|3.0|2.0| 5.350290084086203|[3.0,2.0]| 6.2509689582932975|\n",
+      "|6.0|1.0| 7.520628154018157|[6.0,1.0]|  7.576965189743188|\n",
+      "|8.0|2.0|10.690777404180885|[8.0,2.0]| 10.409642251373787|\n",
+      "|9.0|1.0|10.406420567186862|[9.0,1.0]| 10.000188475606585|\n",
+      "|0.0|0.0|0.6282897829715238|(2,[],[])|0.45968856929425694|\n",
+      "|2.0|1.0| 3.961294224851377|[2.0,1.0]| 3.6707752377825416|\n",
+      "|4.0|0.0|  4.83443596090177|[4.0,0.0]|  4.317093345475164|\n",
+      "|5.0|0.0| 5.982564448517017|[5.0,0.0]|  5.527235382083779|\n",
+      "|5.0|1.0| 6.984174014959795|[5.0,1.0]|  6.315046161076428|\n",
+      "|6.0|1.0| 7.935274195582237|[6.0,1.0]|  7.576965189743188|\n",
+      "|8.0|1.0| 9.990683484052727|[8.0,1.0]| 10.000188475606585|\n",
+      "|2.0|1.0|3.8856373417168104|[2.0,1.0]| 3.6707752377825416|\n",
+      "|2.0|2.0| 4.823994914064544|[2.0,2.0]| 4.6983808565300755|\n",
+      "|6.0|2.0| 8.555804264666113|[6.0,2.0]|  8.233809052748645|\n",
+      "|8.0|0.0|  8.59804463859586|[8.0,0.0]|  8.525778987202642|\n",
+      "+---+---+------------------+---------+-------------------+\n",
+      "only showing top 20 rows\n",
+      "\n"
+     ]
+    }
+   ],
+   "source": [
+    "model.transform(test).show()"
    ]
   }
  ],
diff --git a/lec/22-spark/nb/lec2.ipynb b/lec/22-spark/nb/lec2.ipynb
index 7c7a234..09af800 100644
--- a/lec/22-spark/nb/lec2.ipynb
+++ b/lec/22-spark/nb/lec2.ipynb
@@ -12,7 +12,7 @@
      "text": [
       "Setting default log level to \"WARN\".\n",
       "To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).\n",
-      "25/03/14 15:30:10 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable\n"
+      "25/03/17 15:53:24 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable\n"
      ]
     }
    ],
@@ -3273,6 +3273,578 @@
     "GROUP BY Call_Date\n",
     "\"\"\").explain(\"formatted\")"
    ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "92613c0f-b4de-47d2-9438-d050dd44cd9a",
+   "metadata": {},
+   "source": [
+    "# Join"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 3,
+   "id": "64e5b7bd-34b3-4f6a-a44e-353fd1b2dfc4",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# kind_id, color\n",
+    "fruits = [\n",
+    "    (\"B\", \"Yellow\"),\n",
+    "    (\"A\", \"Green\"),\n",
+    "    (\"C\", \"Orange\"),\n",
+    "    (\"A\", \"Red\"),\n",
+    "    (\"C\", \"Purple\"),\n",
+    "    (\"B\", \"Green\")\n",
+    "]\n",
+    "\n",
+    "# kind_id, name (assume no duplicate kind_id's)\n",
+    "kinds = [\n",
+    "    (\"A\", \"Apple\"),\n",
+    "    (\"B\", \"Banana\"),\n",
+    "    (\"C\", \"Carrot\")\n",
+    "]"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 4,
+   "id": "74d9f86c-38ba-4f7c-903f-daa3e31b351a",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# example 1: hash join (hash table is the data struct behind a Python dict)"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 9,
+   "id": "9ec93d2a-fc2d-4f93-b995-73ec5aaaf34a",
+   "metadata": {},
+   "outputs": [
+    {
+     "name": "stdout",
+     "output_type": "stream",
+     "text": [
+      "Yellow Banana\n",
+      "Green Apple\n",
+      "Orange Carrot\n",
+      "Red Apple\n",
+      "Purple Carrot\n",
+      "Green Banana\n"
+     ]
+    }
+   ],
+   "source": [
+    "kind_lookup = dict(kinds)\n",
+    "for fruit in fruits:\n",
+    "    print(fruit[1], kind_lookup[fruit[0]])"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 5,
+   "id": "f9592686-27ca-47a2-9ed6-d0ac8b648e8b",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# example 2: sort merge join"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 11,
+   "id": "22c7a6f3-16be-4a9a-ba29-fe04fd4274d7",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "fruits.sort()\n",
+    "kinds.sort()"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 12,
+   "id": "5bccd1e0-9c62-48cc-b2bc-2b68dcd67b34",
+   "metadata": {},
+   "outputs": [
+    {
+     "data": {
+      "text/plain": [
+       "[('A', 'Green'),\n",
+       " ('A', 'Red'),\n",
+       " ('B', 'Green'),\n",
+       " ('B', 'Yellow'),\n",
+       " ('C', 'Orange'),\n",
+       " ('C', 'Purple')]"
+      ]
+     },
+     "execution_count": 12,
+     "metadata": {},
+     "output_type": "execute_result"
+    }
+   ],
+   "source": [
+    "fruits"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 13,
+   "id": "c10df79d-4ef4-4abb-b1c7-0ce95a296412",
+   "metadata": {},
+   "outputs": [
+    {
+     "data": {
+      "text/plain": [
+       "[('A', 'Apple'), ('B', 'Banana'), ('C', 'Carrot')]"
+      ]
+     },
+     "execution_count": 13,
+     "metadata": {},
+     "output_type": "execute_result"
+    }
+   ],
+   "source": [
+    "kinds"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 18,
+   "id": "d4df28cc-4399-4758-afe6-84e599103d06",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "#fruit_idx = 0\n",
+    "for kind in kinds:\n",
+    "    while fruit_idx < len(fruits):\n",
+    "        fruit = fruits[fruit_idx]\n",
+    "        if kind[0] == fruit[0]:\n",
+    "            print(fruit[1], kind[1])\n",
+    "        if fruit[0] > kind[0]:\n",
+    "            break\n",
+    "        fruit_idx += 1"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "f54af56e-1d81-429c-b1dc-e1fa22d5f634",
+   "metadata": {},
+   "source": [
+    "# Machine Learning"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 19,
+   "id": "5e1fa832-5b29-4013-a14a-5e4f1a559007",
+   "metadata": {},
+   "outputs": [
+    {
+     "data": {
+      "text/plain": [
+       "DataFrame[x1: double, x2: double, y: double]"
+      ]
+     },
+     "execution_count": 19,
+     "metadata": {},
+     "output_type": "execute_result"
+    }
+   ],
+   "source": [
+    "import pandas as pd\n",
+    "import numpy as np\n",
+    "df = pd.DataFrame({\"x1\": np.random.randint(0, 10, 100).astype(float), \n",
+    "                   \"x2\": np.random.randint(0, 3, 100).astype(float)})\n",
+    "df[\"y\"] = df[\"x1\"] + df[\"x2\"] + np.random.rand(len(df))\n",
+    "df = spark.createDataFrame(df)\n",
+    "df"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 20,
+   "id": "82c444af-2fcb-4fd8-8a8a-b5aa1be2f97e",
+   "metadata": {},
+   "outputs": [
+    {
+     "name": "stderr",
+     "output_type": "stream",
+     "text": [
+      "                                                                                "
+     ]
+    },
+    {
+     "name": "stdout",
+     "output_type": "stream",
+     "text": [
+      "+---+---+------------------+\n",
+      "| x1| x2|                 y|\n",
+      "+---+---+------------------+\n",
+      "|8.0|2.0|10.459743603898675|\n",
+      "|3.0|2.0| 5.858137063702627|\n",
+      "|5.0|2.0| 7.165570076269624|\n",
+      "+---+---+------------------+\n",
+      "\n"
+     ]
+    }
+   ],
+   "source": [
+    "df.limit(3).show()"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 32,
+   "id": "1aefdb03-ac66-47ec-8801-6e36155fae0c",
+   "metadata": {},
+   "outputs": [
+    {
+     "name": "stdout",
+     "output_type": "stream",
+     "text": [
+      "+---+---+------------------+\n",
+      "| x1| x2|                 y|\n",
+      "+---+---+------------------+\n",
+      "|0.0|2.0|2.4986684050549606|\n",
+      "|1.0|0.0|1.2313666339378964|\n",
+      "|1.0|1.0| 2.112597498714278|\n",
+      "+---+---+------------------+\n",
+      "\n"
+     ]
+    }
+   ],
+   "source": [
+    "train, test = df.randomSplit([0.75, 0.25], seed=500)\n",
+    "train.limit(3).show()"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 33,
+   "id": "94d0a75d-1d16-43fe-b52e-b2218ff0f139",
+   "metadata": {},
+   "outputs": [
+    {
+     "name": "stderr",
+     "output_type": "stream",
+     "text": [
+      "                                                                                "
+     ]
+    },
+    {
+     "data": {
+      "text/plain": [
+       "(76, 24)"
+      ]
+     },
+     "execution_count": 33,
+     "metadata": {},
+     "output_type": "execute_result"
+    }
+   ],
+   "source": [
+    "train.count(), test.count()"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 36,
+   "id": "e9ce8383-78cf-4d7b-bbca-56ef6b92f711",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "train.write.format(\"parquet\").mode(\"ignore\").save(\"hdfs://nn:9000/train.parquet\")\n",
+    "test.write.format(\"parquet\").mode(\"ignore\").save(\"hdfs://nn:9000/test.parquet\")"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 38,
+   "id": "f4663892-586a-4840-8bb6-dba83f1530e1",
+   "metadata": {},
+   "outputs": [
+    {
+     "name": "stderr",
+     "output_type": "stream",
+     "text": [
+      "                                                                                "
+     ]
+    }
+   ],
+   "source": [
+    "train = spark.read.format(\"parquet\").load(\"hdfs://nn:9000/train.parquet\")\n",
+    "test = spark.read.format(\"parquet\").load(\"hdfs://nn:9000/test.parquet\")"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 43,
+   "id": "d33d4f0d-784c-48fe-8c79-adc3e5a4e8ce",
+   "metadata": {},
+   "outputs": [
+    {
+     "data": {
+      "text/plain": [
+       "DataFrame[x1: double, x2: double, y: double]"
+      ]
+     },
+     "execution_count": 43,
+     "metadata": {},
+     "output_type": "execute_result"
+    }
+   ],
+   "source": [
+    "train"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 45,
+   "id": "a5e8cd78-2184-44f9-b6bd-81a85a72e2fc",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "from pyspark.ml.feature import VectorAssembler"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 50,
+   "id": "704606aa-2459-4530-a4c8-e5ec77154cf9",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "va = VectorAssembler(inputCols=[\"x1\", \"x2\"], outputCol=\"features\")\n",
+    "#va.transform(test).limit(15).show()"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 41,
+   "id": "a907fbea-3520-459a-8feb-4c7c4c826141",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "from pyspark.ml.regression import DecisionTreeRegressor       # unfit model\n",
+    "from pyspark.ml.regression import DecisionTreeRegressionModel # fitted model"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 52,
+   "id": "41385841-0e10-4609-8902-818f8160b1c6",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "dt = DecisionTreeRegressor(featuresCol=\"features\", labelCol=\"y\")\n",
+    "model = dt.fit(va.transform(train))"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 53,
+   "id": "f5f4bc3c-e256-4a61-a902-3a105ef4f214",
+   "metadata": {},
+   "outputs": [
+    {
+     "data": {
+      "text/plain": [
+       "(pyspark.ml.regression.DecisionTreeRegressor,\n",
+       " pyspark.ml.regression.DecisionTreeRegressionModel)"
+      ]
+     },
+     "execution_count": 53,
+     "metadata": {},
+     "output_type": "execute_result"
+    }
+   ],
+   "source": [
+    "type(dt), type(model)"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 55,
+   "id": "ceb989a5-b33c-415c-b738-e6907d1e5da7",
+   "metadata": {},
+   "outputs": [
+    {
+     "name": "stdout",
+     "output_type": "stream",
+     "text": [
+      "+---+---+------------------+---------+-------------------+\n",
+      "| x1| x2|                 y| features|         prediction|\n",
+      "+---+---+------------------+---------+-------------------+\n",
+      "|0.0|1.0| 1.825532164689176|[0.0,1.0]| 1.4040414970076673|\n",
+      "|0.0|2.0|2.3378982019998977|[0.0,2.0]| 2.5233870027882084|\n",
+      "|1.0|0.0|1.6481475984177445|[1.0,0.0]| 1.6492029178176435|\n",
+      "|1.0|0.0|1.7915461186682566|[1.0,0.0]| 1.6492029178176435|\n",
+      "|1.0|1.0|2.6028920463022995|[1.0,1.0]| 2.7683675845769957|\n",
+      "|3.0|2.0| 5.350290084086203|[3.0,2.0]| 6.2509689582932975|\n",
+      "|6.0|1.0| 7.520628154018157|[6.0,1.0]|  7.576965189743189|\n",
+      "|8.0|2.0|10.690777404180885|[8.0,2.0]| 10.409642251373787|\n",
+      "|9.0|1.0|10.406420567186862|[9.0,1.0]| 10.000188475606585|\n",
+      "|0.0|0.0|0.6282897829715238|(2,[],[])|0.45968856929425694|\n",
+      "+---+---+------------------+---------+-------------------+\n",
+      "\n"
+     ]
+    }
+   ],
+   "source": [
+    "model.transform(va.transform(test)).limit(10).show()"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 56,
+   "id": "13b7ccf4-6744-4985-b950-67c731e3d99f",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "from pyspark.ml.pipeline import Pipeline      # unfit\n",
+    "from pyspark.ml.pipeline import PipelineModel # fitted"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 59,
+   "id": "30bd9a61-ecd6-40b2-b7e5-03b7b527cc50",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "pipe = Pipeline(stages=[va, dt])\n",
+    "model = pipe.fit(train)"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 61,
+   "id": "6bdf936b-a133-466b-b438-fdde79635c3c",
+   "metadata": {},
+   "outputs": [
+    {
+     "name": "stdout",
+     "output_type": "stream",
+     "text": [
+      "+---+---+------------------+---------+-------------------+\n",
+      "| x1| x2|                 y| features|         prediction|\n",
+      "+---+---+------------------+---------+-------------------+\n",
+      "|0.0|1.0| 1.825532164689176|[0.0,1.0]| 1.4040414970076673|\n",
+      "|0.0|2.0|2.3378982019998977|[0.0,2.0]| 2.5233870027882084|\n",
+      "|1.0|0.0|1.6481475984177445|[1.0,0.0]| 1.6492029178176435|\n",
+      "|1.0|0.0|1.7915461186682566|[1.0,0.0]| 1.6492029178176435|\n",
+      "|1.0|1.0|2.6028920463022995|[1.0,1.0]| 2.7683675845769957|\n",
+      "|3.0|2.0| 5.350290084086203|[3.0,2.0]| 6.2509689582932975|\n",
+      "|6.0|1.0| 7.520628154018157|[6.0,1.0]|  7.576965189743188|\n",
+      "|8.0|2.0|10.690777404180885|[8.0,2.0]| 10.409642251373787|\n",
+      "|9.0|1.0|10.406420567186862|[9.0,1.0]| 10.000188475606585|\n",
+      "|0.0|0.0|0.6282897829715238|(2,[],[])|0.45968856929425694|\n",
+      "+---+---+------------------+---------+-------------------+\n",
+      "\n"
+     ]
+    }
+   ],
+   "source": [
+    "model.transform(test).limit(10).show()"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 65,
+   "id": "f0a89e75-9aae-4784-9dab-67fec0e8e152",
+   "metadata": {},
+   "outputs": [
+    {
+     "name": "stdout",
+     "output_type": "stream",
+     "text": [
+      "DecisionTreeRegressionModel: uid=DecisionTreeRegressor_e9be75a2602a, depth=5, numNodes=51, numFeatures=2\n",
+      "  If (feature 0 <= 4.5)\n",
+      "   If (feature 0 <= 1.5)\n",
+      "    If (feature 1 <= 0.5)\n",
+      "     If (feature 0 <= 0.5)\n",
+      "      Predict: 0.45968856929425694\n",
+      "     Else (feature 0 > 0.5)\n",
+      "      Predict: 1.6492029178176435\n",
+      "    Else (feature 1 > 0.5)\n",
+      "     If (feature 0 <= 0.5)\n",
+      "      If (feature 1 <= 1.5)\n",
+      "       Predict: 1.4040414970076673\n",
+      "      Else (feature 1 > 1.5)\n",
+      "       Predict: 2.5233870027882084\n",
+      "     Else (feature 0 > 0.5)\n",
+      "      If (feature 1 <= 1.5)\n",
+      "       Predict: 2.7683675845769957\n",
+      "      Else (feature 1 > 1.5)\n",
+      "       Predict: 3.5316623010447774\n",
+      "   Else (feature 0 > 1.5)\n",
+      "    If (feature 1 <= 0.5)\n",
+      "     If (feature 0 <= 2.5)\n",
+      "      Predict: 2.3979870121847817\n",
+      "     Else (feature 0 > 2.5)\n",
+      "      If (feature 0 <= 3.5)\n",
+      "       Predict: 3.5133904895373607\n",
+      "      Else (feature 0 > 3.5)\n",
+      "       Predict: 4.317093345475164\n",
+      "    Else (feature 1 > 0.5)\n",
+      "     If (feature 0 <= 2.5)\n",
+      "      If (feature 1 <= 1.5)\n",
+      "       Predict: 3.6707752377825416\n",
+      "      Else (feature 1 > 1.5)\n",
+      "       Predict: 4.6983808565300755\n",
+      "     Else (feature 0 > 2.5)\n",
+      "      If (feature 1 <= 1.5)\n",
+      "       Predict: 5.3954773862665135\n",
+      "      Else (feature 1 > 1.5)\n",
+      "       Predict: 6.2509689582932975\n",
+      "  Else (feature 0 > 4.5)\n",
+      "   If (feature 0 <= 6.5)\n",
+      "    If (feature 1 <= 1.5)\n",
+      "     If (feature 0 <= 5.5)\n",
+      "      If (feature 1 <= 0.5)\n",
+      "       Predict: 5.527235382083779\n",
+      "      Else (feature 1 > 0.5)\n",
+      "       Predict: 6.315046161076428\n",
+      "     Else (feature 0 > 5.5)\n",
+      "      If (feature 1 <= 0.5)\n",
+      "       Predict: 6.376190776816699\n",
+      "      Else (feature 1 > 0.5)\n",
+      "       Predict: 7.576965189743188\n",
+      "    Else (feature 1 > 1.5)\n",
+      "     If (feature 0 <= 5.5)\n",
+      "      Predict: 7.812773028725937\n",
+      "     Else (feature 0 > 5.5)\n",
+      "      Predict: 8.233809052748645\n",
+      "   Else (feature 0 > 6.5)\n",
+      "    If (feature 1 <= 0.5)\n",
+      "     If (feature 0 <= 8.5)\n",
+      "      If (feature 0 <= 7.5)\n",
+      "       Predict: 7.885401379590637\n",
+      "      Else (feature 0 > 7.5)\n",
+      "       Predict: 8.525778987202642\n",
+      "     Else (feature 0 > 8.5)\n",
+      "      Predict: 9.604192552646033\n",
+      "    Else (feature 1 > 0.5)\n",
+      "     If (feature 0 <= 7.5)\n",
+      "      If (feature 1 <= 1.5)\n",
+      "       Predict: 8.533554015788187\n",
+      "      Else (feature 1 > 1.5)\n",
+      "       Predict: 9.55312286519982\n",
+      "     Else (feature 0 > 7.5)\n",
+      "      If (feature 1 <= 1.5)\n",
+      "       Predict: 10.000188475606585\n",
+      "      Else (feature 1 > 1.5)\n",
+      "       Predict: 10.409642251373787\n",
+      "\n"
+     ]
+    }
+   ],
+   "source": [
+    "print(model.stages[1].toDebugString)"
+   ]
   }
  ],
  "metadata": {
-- 
GitLab