{
 "cells": [
  {
   "cell_type": "code",
   "execution_count": 1,
   "id": "e7fed83d-4fd4-4370-a619-db07eb78df21",
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "Setting default log level to \"WARN\".\n",
      "To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).\n",
      "25/03/09 17:52:46 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable\n"
     ]
    }
   ],
   "source": [
    "from pyspark.sql import SparkSession\n",
    "spark = (SparkSession.builder.appName(\"cs544\")\n",
    "         .master(\"spark://boss:7077\")\n",
    "         .config(\"spark.executor.memory\", \"512M\")\n",
    "         .getOrCreate())"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "id": "192912cb-d5b3-494e-8b93-e8bc17562680",
   "metadata": {},
   "outputs": [],
   "source": [
    "sc = spark.sparkContext # provides direct RDD access"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "id": "3a4b48c6-5173-41ac-8640-dcd52b712108",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[0, 1, 2, 3, 4]"
      ]
     },
     "execution_count": 8,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "nums = list(range(0, 10_000_000))\n",
    "nums[:5]"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "id": "9b776732-09d8-4686-9ce4-15e83344118d",
   "metadata": {},
   "outputs": [],
   "source": [
    "rdd = sc.parallelize(nums)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "id": "8b04b8f1-2fe7-4ffd-801c-b69d5cbb0978",
   "metadata": {},
   "outputs": [],
   "source": [
    "inverses = rdd.map(lambda x: 1/x) # TRANSFORMATION (lazy)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 12,
   "id": "49f94295-d242-4b43-aeb0-61fe7319fcaf",
   "metadata": {},
   "outputs": [],
   "source": [
    "# head = inverses.take(10) # ACTION (actually does the work)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 14,
   "id": "28a6ec21-419c-4db6-9f88-51b481e12838",
   "metadata": {},
   "outputs": [],
   "source": [
    "# inverses.mean() # ACTION"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 15,
   "id": "f78e9613-9dd6-45a5-9bf2-e171c9e4b0d1",
   "metadata": {},
   "outputs": [],
   "source": [
    "inverses = rdd.filter(lambda x: x > 0).map(lambda x: 1/x) "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 17,
   "id": "ae670238-47cf-4d6f-a299-18c1d1b1a536",
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "25/03/09 17:59:52 WARN TaskSetManager: Stage 3 contains a task of very large size (12152 KiB). The maximum recommended task size is 1000 KiB.\n",
      "                                                                                "
     ]
    },
    {
     "data": {
      "text/plain": [
       "1.669531293539298e-06"
      ]
     },
     "execution_count": 17,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# 4 tasks, 0 are done, and 4 are in progress\n",
    "# [Stage 3:>                                                          (0 + 4) / 4]\n",
    "inverses.mean()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "627099bd-69f1-41f9-81ca-31f73eaae90c",
   "metadata": {},
   "outputs": [],
   "source": [
    "# inverses.collect() # ACTION: be careful, if it's too big, we could run out of memory!"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 18,
   "id": "f336fdc8-a273-49ad-b23c-73f14ccb48d4",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "4"
      ]
     },
     "execution_count": 18,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "rdd.getNumPartitions()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 20,
   "id": "3655dbe6-b8ee-4f02-863a-ebb3a48b6614",
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "                                                                                "
     ]
    },
    {
     "data": {
      "text/plain": [
       "1.6695312935391358e-06"
      ]
     },
     "execution_count": 20,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# [Stage 5:======================>                                  (20 + 4) / 50]\n",
    "rdd = sc.parallelize(nums, 50)\n",
    "inverses = rdd.filter(lambda x: x > 0).map(lambda x: 1/x)\n",
    "inverses.mean()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 22,
   "id": "c81343cd-83ef-4cae-b5ce-d2eba0b055f1",
   "metadata": {},
   "outputs": [],
   "source": [
    "import time"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 21,
   "id": "754b4356-8026-4e03-91be-82436a8759f3",
   "metadata": {},
   "outputs": [],
   "source": [
    "sample = rdd.sample(True, 0.01) # TRANSFORMATION"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 23,
   "id": "1735153d-6a9b-45f9-bac7-674853b9f274",
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "[Stage 6:=================================================>       (43 + 4) / 50]"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "4995621.997385424\n",
      "1.9789588451385498\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "                                                                                "
     ]
    }
   ],
   "source": [
    "# 1st without cache\n",
    "t0 = time.time()\n",
    "print(sample.mean())\n",
    "t1 = time.time()\n",
    "print(t1-t0)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 24,
   "id": "0496d02b-6dd5-4df7-97cf-60b286217af4",
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "[Stage 7:===================================================>     (45 + 4) / 50]"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "4995621.997385424\n",
      "1.9979238510131836\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "                                                                                "
     ]
    }
   ],
   "source": [
    "# 2nd without cache\n",
    "t0 = time.time()\n",
    "print(sample.mean())\n",
    "t1 = time.time()\n",
    "print(t1-t0)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 25,
   "id": "2051edd6-92f5-4c5c-a8a1-1547bfeeb0c8",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "PythonRDD[11] at RDD at PythonRDD.scala:53"
      ]
     },
     "execution_count": 25,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "sample.cache()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 26,
   "id": "c50ed49f-07f7-494d-885b-56de91a0c554",
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "[Stage 8:==================================================>      (44 + 4) / 50]"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "4995621.997385424\n",
      "2.9729838371276855\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "                                                                                "
     ]
    }
   ],
   "source": [
    "# 1st with cache\n",
    "t0 = time.time()\n",
    "print(sample.mean())\n",
    "t1 = time.time()\n",
    "print(t1-t0)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 27,
   "id": "3d602170-51b3-4dbf-a4ba-3a242d9b9ba2",
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "[Stage 9:===================================>                     (31 + 4) / 50]"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "4995621.997385424\n",
      "1.2610077857971191\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "                                                                                "
     ]
    }
   ],
   "source": [
    "# 2nd with cache\n",
    "t0 = time.time()\n",
    "print(sample.mean())\n",
    "t1 = time.time()\n",
    "print(t1-t0)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 29,
   "id": "3f0c6999-f48c-4895-876e-8c60eff5a74d",
   "metadata": {},
   "outputs": [],
   "source": [
    "sample = rdd.sample(True, 0.01).repartition(4)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 30,
   "id": "e3d61398-bb60-4f7f-8594-ab876b1aa51b",
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "[Stage 10:====================================================>   (47 + 3) / 50]"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "5005973.481531257\n",
      "3.3820056915283203\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "                                                                                "
     ]
    }
   ],
   "source": [
    "# 1st with cache, and fewer partitions\n",
    "t0 = time.time()\n",
    "print(sample.mean())\n",
    "t1 = time.time()\n",
    "print(t1-t0)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 31,
   "id": "201504ef-5bb4-4436-acfd-03e28c5bfd92",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "5005973.481531262\n",
      "0.2796463966369629\n"
     ]
    }
   ],
   "source": [
    "# 2nd with cache, and fewer partitions\n",
    "t0 = time.time()\n",
    "print(sample.mean())\n",
    "t1 = time.time()\n",
    "print(t1-t0)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "b47df2f0-98c8-46e5-8c88-2f3600188edb",
   "metadata": {},
   "outputs": [],
   "source": [
    "# note about determinism\n",
    "\n",
    "# ideally: given the same seed, sample the same way\n",
    "# in reality: even with seed, you can get different results if the number of input partitions changes\n",
    "# suggestion: be careful, and save your sampled results to a separate file so later runs reuse exactly the same sample!\n",
    "sample = rdd.sample(True, 0.01, 544)"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "93036a5e-2066-461e-ba12-8bc549294bde",
   "metadata": {},
   "source": [
    "# Spark DataFrames: reading a text file\n",
    "\n",
    "Spark DataFrames build on Spark SQL, which builds on Spark RDDs."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 37,
   "id": "9141f941-fdaa-4d40-a744-9b831cb84e2b",
   "metadata": {},
   "outputs": [],
   "source": [
    "# !wget https://pages.cs.wisc.edu/~harter/cs544/data/ghcnd-stations.txt"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "175cb54b-eb63-4a2e-9877-5a4b1d5d12bf",
   "metadata": {},
   "outputs": [],
   "source": [
    "# read/write:\n",
    "# spark.read.????.????.load() or .text()\n",
    "# df.write.????.????.save() or .saveAsTable()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 41,
   "id": "8e79a508-4dec-45d1-9dd3-58da5b83063c",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "/nb/ghcnd-stations.txt\n"
     ]
    }
   ],
   "source": [
    "! ls /nb/ghcnd-stations.txt"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 42,
   "id": "7d4516f2-9802-490a-9fca-22111624b63d",
   "metadata": {},
   "outputs": [],
   "source": [
    "# df = spark.read.text(\"ghcnd-stations.txt\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 43,
   "id": "970432fa-981b-4824-bced-95eeef96afb5",
   "metadata": {},
   "outputs": [],
   "source": [
    "# df.take(3)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 45,
   "id": "fa910718-36f0-41e1-9ff8-40754a9ae707",
   "metadata": {},
   "outputs": [],
   "source": [
    "# -put always reads from the local filesystem; -f overwrites so re-running this cell is safe\n",
    "!hdfs dfs -put -f ghcnd-stations.txt hdfs://nn:9000/ghcnd-stations.txt"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 46,
   "id": "df70d8a3-f302-4663-bcda-adee88155131",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Found 1 items\n",
      "-rw-r--r--   3 root supergroup   10607756 2025-03-09 18:51 hdfs://nn:9000/ghcnd-stations.txt\n"
     ]
    }
   ],
   "source": [
    "!hdfs dfs -ls hdfs://nn:9000/"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 47,
   "id": "12493c05-e5f6-4120-a104-8b5fedf1a623",
   "metadata": {},
   "outputs": [],
   "source": [
    "df = spark.read.text(\"hdfs://nn:9000/ghcnd-stations.txt\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 48,
   "id": "eefc522f-7e8d-4cb6-8785-8064a8938c9c",
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "                                                                                "
     ]
    },
    {
     "data": {
      "text/plain": [
       "[Row(value='ACW00011604  17.1167  -61.7833   10.1    ST JOHNS COOLIDGE FLD                       '),\n",
       " Row(value='ACW00011647  17.1333  -61.7833   19.2    ST JOHNS                                    '),\n",
       " Row(value='AE000041196  25.3330   55.5170   34.0    SHARJAH INTER. AIRP            GSN     41196')]"
      ]
     },
     "execution_count": 48,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "df.take(3)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 49,
   "id": "65842e71-e8d3-4196-a217-005e98af68ae",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "pyspark.sql.dataframe.DataFrame"
      ]
     },
     "execution_count": 49,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "type(df)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 51,
   "id": "ee8e9d09-6896-4257-899b-2e549e01337a",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "pyspark.rdd.RDD"
      ]
     },
     "execution_count": 51,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "type(df.rdd)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 54,
   "id": "252fa425-9d8c-4792-8e26-e4ffd384a3c5",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "'ACW00011604  17.1167  -61.7833   10.1    ST JOHNS COOLIDGE FLD                       '"
      ]
     },
     "execution_count": 54,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "df.take(1)[0].value"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 55,
   "id": "80d499d6-552e-4afa-ac38-111d31eb6d14",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "['ACW00011604',\n",
       " 'ACW00011647',\n",
       " 'AE000041196',\n",
       " 'AEM00041194',\n",
       " 'AEM00041217',\n",
       " 'AEM00041218',\n",
       " 'AF000040930',\n",
       " 'AFM00040938',\n",
       " 'AFM00040948',\n",
       " 'AFM00040990']"
      ]
     },
     "execution_count": 55,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# the first 11 characters are the station ID\n",
    "df.rdd.map(lambda row: row.value[:11]).take(10)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 56,
   "id": "10f9c364-5516-409d-8893-4e55bd031d98",
   "metadata": {},
   "outputs": [],
   "source": [
    "# how would we do this in Pandas? Extract the station IDs and add them as a new column"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 58,
   "id": "2ce8af7b-b4da-4e71-9c5a-a99487d2010e",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/html": [
       "<div>\n",
       "<style scoped>\n",
       "    .dataframe tbody tr th:only-of-type {\n",
       "        vertical-align: middle;\n",
       "    }\n",
       "\n",
       "    .dataframe tbody tr th {\n",
       "        vertical-align: top;\n",
       "    }\n",
       "\n",
       "    .dataframe thead th {\n",
       "        text-align: right;\n",
       "    }\n",
       "</style>\n",
       "<table border=\"1\" class=\"dataframe\">\n",
       "  <thead>\n",
       "    <tr style=\"text-align: right;\">\n",
       "      <th></th>\n",
       "      <th>value</th>\n",
       "    </tr>\n",
       "  </thead>\n",
       "  <tbody>\n",
       "    <tr>\n",
       "      <th>0</th>\n",
       "      <td>ACW00011604  17.1167  -61.7833   10.1    ST JO...</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>1</th>\n",
       "      <td>ACW00011647  17.1333  -61.7833   19.2    ST JO...</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>2</th>\n",
       "      <td>AE000041196  25.3330   55.5170   34.0    SHARJ...</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>3</th>\n",
       "      <td>AEM00041194  25.2550   55.3640   10.4    DUBAI...</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>4</th>\n",
       "      <td>AEM00041217  24.4330   54.6510   26.8    ABU D...</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>5</th>\n",
       "      <td>AEM00041218  24.2620   55.6090  264.9    AL AI...</td>\n",
       "    </tr>\n",
       "  </tbody>\n",
       "</table>\n",
       "</div>"
      ],
      "text/plain": [
       "                                               value\n",
       "0  ACW00011604  17.1167  -61.7833   10.1    ST JO...\n",
       "1  ACW00011647  17.1333  -61.7833   19.2    ST JO...\n",
       "2  AE000041196  25.3330   55.5170   34.0    SHARJ...\n",
       "3  AEM00041194  25.2550   55.3640   10.4    DUBAI...\n",
       "4  AEM00041217  24.4330   54.6510   26.8    ABU D...\n",
       "5  AEM00041218  24.2620   55.6090  264.9    AL AI..."
      ]
     },
     "execution_count": 58,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "pandas_df = df.limit(6).toPandas()\n",
    "pandas_df"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 63,
   "id": "29be9b4f-0a70-4b29-bd59-111098da36dd",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/html": [
       "<div>\n",
       "<style scoped>\n",
       "    .dataframe tbody tr th:only-of-type {\n",
       "        vertical-align: middle;\n",
       "    }\n",
       "\n",
       "    .dataframe tbody tr th {\n",
       "        vertical-align: top;\n",
       "    }\n",
       "\n",
       "    .dataframe thead th {\n",
       "        text-align: right;\n",
       "    }\n",
       "</style>\n",
       "<table border=\"1\" class=\"dataframe\">\n",
       "  <thead>\n",
       "    <tr style=\"text-align: right;\">\n",
       "      <th></th>\n",
       "      <th>value</th>\n",
       "      <th>station</th>\n",
       "    </tr>\n",
       "  </thead>\n",
       "  <tbody>\n",
       "    <tr>\n",
       "      <th>0</th>\n",
       "      <td>ACW00011604  17.1167  -61.7833   10.1    ST JO...</td>\n",
       "      <td>ACW00011604</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>1</th>\n",
       "      <td>ACW00011647  17.1333  -61.7833   19.2    ST JO...</td>\n",
       "      <td>ACW00011647</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>2</th>\n",
       "      <td>AE000041196  25.3330   55.5170   34.0    SHARJ...</td>\n",
       "      <td>AE000041196</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>3</th>\n",
       "      <td>AEM00041194  25.2550   55.3640   10.4    DUBAI...</td>\n",
       "      <td>AEM00041194</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>4</th>\n",
       "      <td>AEM00041217  24.4330   54.6510   26.8    ABU D...</td>\n",
       "      <td>AEM00041217</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>5</th>\n",
       "      <td>AEM00041218  24.2620   55.6090  264.9    AL AI...</td>\n",
       "      <td>AEM00041218</td>\n",
       "    </tr>\n",
       "  </tbody>\n",
       "</table>\n",
       "</div>"
      ],
      "text/plain": [
       "                                               value      station\n",
       "0  ACW00011604  17.1167  -61.7833   10.1    ST JO...  ACW00011604\n",
       "1  ACW00011647  17.1333  -61.7833   19.2    ST JO...  ACW00011647\n",
       "2  AE000041196  25.3330   55.5170   34.0    SHARJ...  AE000041196\n",
       "3  AEM00041194  25.2550   55.3640   10.4    DUBAI...  AEM00041194\n",
       "4  AEM00041217  24.4330   54.6510   26.8    ABU D...  AEM00041217\n",
       "5  AEM00041218  24.2620   55.6090  264.9    AL AI...  AEM00041218"
      ]
     },
     "execution_count": 63,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "pandas_df[\"station\"] = pandas_df[\"value\"].str[:11]\n",
    "pandas_df"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 64,
   "id": "664669e9-2624-4710-b58e-8303af3e1b07",
   "metadata": {},
   "outputs": [],
   "source": [
    "# not allowed: a Spark DataFrame (like the RDD it wraps) is immutable, so columns can't be assigned in place\n",
    "# df[\"station\"] = ????"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 65,
   "id": "465d5d75-4720-488d-b3fa-60e4246eca16",
   "metadata": {},
   "outputs": [],
   "source": [
    "from pyspark.sql.functions import col, expr"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 66,
   "id": "516597a7-fb56-456a-bef6-304ff9cc25f5",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "Column<'x'>"
      ]
     },
     "execution_count": 66,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "col(\"x\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 67,
   "id": "db232b87-d14f-43ba-9536-39c1aad1fac5",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "Column<'(x + 1)'>"
      ]
     },
     "execution_count": 67,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "col(\"x\") + 1"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 68,
   "id": "1ceae7a8-f355-4e5e-9857-ce451997e0f1",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "Column<'(x + 1)'>"
      ]
     },
     "execution_count": 68,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "expr(\"x + 1\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 69,
   "id": "08fc588a-97c5-4908-8eb0-2631b3c4bf8f",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "Column<'(x + 1) AS plusone'>"
      ]
     },
     "execution_count": 69,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "expr(\"x + 1\").alias(\"plusone\") # similar to SQL \"AS\""
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 70,
   "id": "93e8e7e9-c9fa-4f72-b086-3e31d3d3c31d",
   "metadata": {},
   "outputs": [],
   "source": [
    "df2 = df.withColumn(\"station\", expr(\"substring(value, 1, 11)\")) # transformation! (Spark SQL substring positions are 1-based)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 71,
   "id": "f4d812d8-f1fa-4090-bbf2-590463e69c6d",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "DataFrame[value: string]"
      ]
     },
     "execution_count": 71,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "df"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 72,
   "id": "f6ca7f4b-31ae-468c-adf8-0d1d6166820e",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "DataFrame[value: string, station: string]"
      ]
     },
     "execution_count": 72,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "df2"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 73,
   "id": "7f2a04c3-dc21-4dbd-8490-dff279a20df6",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+--------------------+-----------+\n",
      "|               value|    station|\n",
      "+--------------------+-----------+\n",
      "|ACW00011604  17.1...|ACW00011604|\n",
      "|ACW00011647  17.1...|ACW00011647|\n",
      "|AE000041196  25.3...|AE000041196|\n",
      "|AEM00041194  25.2...|AEM00041194|\n",
      "|AEM00041217  24.4...|AEM00041217|\n",
      "|AEM00041218  24.2...|AEM00041218|\n",
      "|AF000040930  35.3...|AF000040930|\n",
      "|AFM00040938  34.2...|AFM00040938|\n",
      "|AFM00040948  34.5...|AFM00040948|\n",
      "|AFM00040990  31.5...|AFM00040990|\n",
      "|AG000060390  36.7...|AG000060390|\n",
      "|AG000060590  30.5...|AG000060590|\n",
      "|AG000060611  28.0...|AG000060611|\n",
      "|AG000060680  22.8...|AG000060680|\n",
      "|AGE00135039  35.7...|AGE00135039|\n",
      "|AGE00147704  36.9...|AGE00147704|\n",
      "|AGE00147705  36.7...|AGE00147705|\n",
      "|AGE00147706  36.8...|AGE00147706|\n",
      "|AGE00147707  36.8...|AGE00147707|\n",
      "|AGE00147708  36.7...|AGE00147708|\n",
      "+--------------------+-----------+\n",
      "only showing top 20 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df2.show() # action"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 76,
   "id": "afaff554-089f-43cd-85c3-8f2af5384545",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/html": [
       "<div>\n",
       "<style scoped>\n",
       "    .dataframe tbody tr th:only-of-type {\n",
       "        vertical-align: middle;\n",
       "    }\n",
       "\n",
       "    .dataframe tbody tr th {\n",
       "        vertical-align: top;\n",
       "    }\n",
       "\n",
       "    .dataframe thead th {\n",
       "        text-align: right;\n",
       "    }\n",
       "</style>\n",
       "<table border=\"1\" class=\"dataframe\">\n",
       "  <thead>\n",
       "    <tr style=\"text-align: right;\">\n",
       "      <th></th>\n",
       "      <th>value</th>\n",
       "      <th>station</th>\n",
       "    </tr>\n",
       "  </thead>\n",
       "  <tbody>\n",
       "    <tr>\n",
       "      <th>0</th>\n",
       "      <td>ACW00011604  17.1167  -61.7833   10.1    ST JO...</td>\n",
       "      <td>ACW00011604</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>1</th>\n",
       "      <td>ACW00011647  17.1333  -61.7833   19.2    ST JO...</td>\n",
       "      <td>ACW00011647</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>2</th>\n",
       "      <td>AE000041196  25.3330   55.5170   34.0    SHARJ...</td>\n",
       "      <td>AE000041196</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>3</th>\n",
       "      <td>AEM00041194  25.2550   55.3640   10.4    DUBAI...</td>\n",
       "      <td>AEM00041194</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>4</th>\n",
       "      <td>AEM00041217  24.4330   54.6510   26.8    ABU D...</td>\n",
       "      <td>AEM00041217</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>5</th>\n",
       "      <td>AEM00041218  24.2620   55.6090  264.9    AL AI...</td>\n",
       "      <td>AEM00041218</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>6</th>\n",
       "      <td>AF000040930  35.3170   69.0170 3366.0    NORTH...</td>\n",
       "      <td>AF000040930</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>7</th>\n",
       "      <td>AFM00040938  34.2100   62.2280  977.2    HERAT...</td>\n",
       "      <td>AFM00040938</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>8</th>\n",
       "      <td>AFM00040948  34.5660   69.2120 1791.3    KABUL...</td>\n",
       "      <td>AFM00040948</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>9</th>\n",
       "      <td>AFM00040990  31.5000   65.8500 1010.0    KANDA...</td>\n",
       "      <td>AFM00040990</td>\n",
       "    </tr>\n",
       "  </tbody>\n",
       "</table>\n",
       "</div>"
      ],
      "text/plain": [
       "                                               value      station\n",
       "0  ACW00011604  17.1167  -61.7833   10.1    ST JO...  ACW00011604\n",
       "1  ACW00011647  17.1333  -61.7833   19.2    ST JO...  ACW00011647\n",
       "2  AE000041196  25.3330   55.5170   34.0    SHARJ...  AE000041196\n",
       "3  AEM00041194  25.2550   55.3640   10.4    DUBAI...  AEM00041194\n",
       "4  AEM00041217  24.4330   54.6510   26.8    ABU D...  AEM00041217\n",
       "5  AEM00041218  24.2620   55.6090  264.9    AL AI...  AEM00041218\n",
       "6  AF000040930  35.3170   69.0170 3366.0    NORTH...  AF000040930\n",
       "7  AFM00040938  34.2100   62.2280  977.2    HERAT...  AFM00040938\n",
       "8  AFM00040948  34.5660   69.2120 1791.3    KABUL...  AFM00040948\n",
       "9  AFM00040990  31.5000   65.8500 1010.0    KANDA...  AFM00040990"
      ]
     },
     "execution_count": 76,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "df2.limit(10).toPandas() # limit is a transformation, toPandas is the action!"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "f49d5318-f826-4fa0-832e-dc05be8c9426",
   "metadata": {},
   "source": [
    "# Spark: CSVs and Parquet"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 78,
   "id": "f2a328d7-982e-4e47-b145-2a2a79ef744e",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "--2025-03-10 01:40:01--  https://pages.cs.wisc.edu/~harter/cs544/data/sf.zip\n",
      "Resolving pages.cs.wisc.edu (pages.cs.wisc.edu)... 128.105.7.9\n",
      "Connecting to pages.cs.wisc.edu (pages.cs.wisc.edu)|128.105.7.9|:443... connected.\n",
      "200 OKequest sent, awaiting response... \n",
      "Length: 534803160 (510M) [application/zip]\n",
      "Saving to: ‘sf.zip’\n",
      "\n",
      "sf.zip              100%[===================>] 510.03M  21.5MB/s    in 26s     \n",
      "\n",
      "2025-03-10 01:40:27 (19.8 MB/s) - ‘sf.zip’ saved [534803160/534803160]\n",
      "\n"
     ]
    }
   ],
   "source": [
    "! wget https://pages.cs.wisc.edu/~harter/cs544/data/sf.zip"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 82,
   "id": "9b7b7bce-6d2e-4278-829a-a97c1b491195",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Archive:  sf.zip\n",
      "  inflating: sf.csv                  \n"
     ]
    }
   ],
   "source": [
    "! unzip sf.zip"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 83,
   "id": "cc7d6890-85d5-45f2-844e-644c9b6a21f2",
   "metadata": {},
   "outputs": [],
   "source": [
    "# -put always reads from the local filesystem; -f overwrites so re-running this cell is safe\n",
    "! hdfs dfs -put -f sf.csv hdfs://nn:9000/sf.csv"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 84,
   "id": "3d0d1512-1dcb-4b09-9ace-6348791ae0b5",
   "metadata": {},
   "outputs": [],
   "source": [
    "# read/write:\n",
    "# spark.read.????.????.load() or .text()\n",
    "# df.write.????.????.save() or .saveAsTable()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 90,
   "id": "494488f6-6186-4400-a830-89e7528e1357",
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "[Stage 32:>                                                         (0 + 1) / 1]"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "CPU times: user 2.54 ms, sys: 2.23 ms, total: 4.76 ms\n",
      "Wall time: 3.37 s\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "                                                                                "
     ]
    }
   ],
   "source": [
    "%%time\n",
    "df = spark.read.format(\"csv\").option(\"header\", True).load(\"hdfs://nn:9000/sf.csv\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 87,
   "id": "61599f7d-6b07-4ad1-baa3-ddea0b2091c7",
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "[Stage 27:=================================================>      (15 + 2) / 17]"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "CPU times: user 8.32 ms, sys: 1.02 ms, total: 9.34 ms\n",
      "Wall time: 4.31 s\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "                                                                                "
     ]
    },
    {
     "data": {
      "text/plain": [
       "6016057"
      ]
     },
     "execution_count": 87,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "%%time\n",
    "df.count()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 91,
   "id": "19af30b2-327a-49eb-bfe9-3777b2ae5120",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/html": [
       "<div>\n",
       "<style scoped>\n",
       "    .dataframe tbody tr th:only-of-type {\n",
       "        vertical-align: middle;\n",
       "    }\n",
       "\n",
       "    .dataframe tbody tr th {\n",
       "        vertical-align: top;\n",
       "    }\n",
       "\n",
       "    .dataframe thead th {\n",
       "        text-align: right;\n",
       "    }\n",
       "</style>\n",
       "<table border=\"1\" class=\"dataframe\">\n",
       "  <thead>\n",
       "    <tr style=\"text-align: right;\">\n",
       "      <th></th>\n",
       "      <th>Call Number</th>\n",
       "      <th>Unit ID</th>\n",
       "      <th>Incident Number</th>\n",
       "      <th>Call Type</th>\n",
       "      <th>Call Date</th>\n",
       "      <th>Watch Date</th>\n",
       "      <th>Received DtTm</th>\n",
       "      <th>Entry DtTm</th>\n",
       "      <th>Dispatch DtTm</th>\n",
       "      <th>Response DtTm</th>\n",
       "      <th>...</th>\n",
       "      <th>Call Type Group</th>\n",
       "      <th>Number of Alarms</th>\n",
       "      <th>Unit Type</th>\n",
       "      <th>Unit sequence in call dispatch</th>\n",
       "      <th>Fire Prevention District</th>\n",
       "      <th>Supervisor District</th>\n",
       "      <th>Neighborhooods - Analysis Boundaries</th>\n",
       "      <th>RowID</th>\n",
       "      <th>case_location</th>\n",
       "      <th>Analysis Neighborhoods</th>\n",
       "    </tr>\n",
       "  </thead>\n",
       "  <tbody>\n",
       "    <tr>\n",
       "      <th>0</th>\n",
       "      <td>221210313</td>\n",
       "      <td>E36</td>\n",
       "      <td>22054955</td>\n",
       "      <td>Outside Fire</td>\n",
       "      <td>05/01/2022</td>\n",
       "      <td>04/30/2022</td>\n",
       "      <td>05/01/2022 02:58:25 AM</td>\n",
       "      <td>05/01/2022 02:59:15 AM</td>\n",
       "      <td>05/01/2022 02:59:25 AM</td>\n",
       "      <td>05/01/2022 03:01:06 AM</td>\n",
       "      <td>...</td>\n",
       "      <td>Fire</td>\n",
       "      <td>1</td>\n",
       "      <td>ENGINE</td>\n",
       "      <td>1</td>\n",
       "      <td>2</td>\n",
       "      <td>5</td>\n",
       "      <td>Hayes Valley</td>\n",
       "      <td>221210313-E36</td>\n",
       "      <td>POINT (-122.42316555403964 37.77781524520032)</td>\n",
       "      <td>9</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>1</th>\n",
       "      <td>220190150</td>\n",
       "      <td>E29</td>\n",
       "      <td>22008871</td>\n",
       "      <td>Alarms</td>\n",
       "      <td>01/19/2022</td>\n",
       "      <td>01/18/2022</td>\n",
       "      <td>01/19/2022 01:42:12 AM</td>\n",
       "      <td>01/19/2022 01:44:13 AM</td>\n",
       "      <td>01/19/2022 01:44:28 AM</td>\n",
       "      <td>01/19/2022 01:46:47 AM</td>\n",
       "      <td>...</td>\n",
       "      <td>Alarm</td>\n",
       "      <td>1</td>\n",
       "      <td>ENGINE</td>\n",
       "      <td>1</td>\n",
       "      <td>3</td>\n",
       "      <td>10</td>\n",
       "      <td>Potrero Hill</td>\n",
       "      <td>220190150-E29</td>\n",
       "      <td>POINT (-122.39469970274361 37.76460987856451)</td>\n",
       "      <td>26</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>2</th>\n",
       "      <td>211233271</td>\n",
       "      <td>T07</td>\n",
       "      <td>21053032</td>\n",
       "      <td>Alarms</td>\n",
       "      <td>05/03/2021</td>\n",
       "      <td>05/03/2021</td>\n",
       "      <td>05/03/2021 09:28:12 PM</td>\n",
       "      <td>05/03/2021 09:28:12 PM</td>\n",
       "      <td>05/03/2021 09:28:17 PM</td>\n",
       "      <td>05/03/2021 09:29:10 PM</td>\n",
       "      <td>...</td>\n",
       "      <td>Alarm</td>\n",
       "      <td>1</td>\n",
       "      <td>TRUCK</td>\n",
       "      <td>2</td>\n",
       "      <td>2</td>\n",
       "      <td>9</td>\n",
       "      <td>Mission</td>\n",
       "      <td>211233271-T07</td>\n",
       "      <td>POINT (-122.42057572093252 37.76418194637148)</td>\n",
       "      <td>20</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>3</th>\n",
       "      <td>212933533</td>\n",
       "      <td>B02</td>\n",
       "      <td>21127914</td>\n",
       "      <td>Alarms</td>\n",
       "      <td>10/20/2021</td>\n",
       "      <td>10/20/2021</td>\n",
       "      <td>10/20/2021 10:08:47 PM</td>\n",
       "      <td>10/20/2021 10:09:53 PM</td>\n",
       "      <td>10/20/2021 10:10:07 PM</td>\n",
       "      <td>10/20/2021 10:11:55 PM</td>\n",
       "      <td>...</td>\n",
       "      <td>Alarm</td>\n",
       "      <td>1</td>\n",
       "      <td>CHIEF</td>\n",
       "      <td>3</td>\n",
       "      <td>3</td>\n",
       "      <td>6</td>\n",
       "      <td>Tenderloin</td>\n",
       "      <td>212933533-B02</td>\n",
       "      <td>POINT (-122.41243514072728 37.78347684038771)</td>\n",
       "      <td>36</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>4</th>\n",
       "      <td>221202543</td>\n",
       "      <td>E41</td>\n",
       "      <td>22054815</td>\n",
       "      <td>Alarms</td>\n",
       "      <td>04/30/2022</td>\n",
       "      <td>04/30/2022</td>\n",
       "      <td>04/30/2022 06:35:58 PM</td>\n",
       "      <td>04/30/2022 06:37:28 PM</td>\n",
       "      <td>04/30/2022 06:37:43 PM</td>\n",
       "      <td>04/30/2022 06:38:17 PM</td>\n",
       "      <td>...</td>\n",
       "      <td>Alarm</td>\n",
       "      <td>1</td>\n",
       "      <td>ENGINE</td>\n",
       "      <td>4</td>\n",
       "      <td>4</td>\n",
       "      <td>2</td>\n",
       "      <td>Russian Hill</td>\n",
       "      <td>221202543-E41</td>\n",
       "      <td>POINT (-122.4233369425531 37.799534868680034)</td>\n",
       "      <td>32</td>\n",
       "    </tr>\n",
       "  </tbody>\n",
       "</table>\n",
       "<p>5 rows × 35 columns</p>\n",
       "</div>"
      ],
      "text/plain": [
       "  Call Number Unit ID Incident Number     Call Type   Call Date  Watch Date  \\\n",
       "0   221210313     E36        22054955  Outside Fire  05/01/2022  04/30/2022   \n",
       "1   220190150     E29        22008871        Alarms  01/19/2022  01/18/2022   \n",
       "2   211233271     T07        21053032        Alarms  05/03/2021  05/03/2021   \n",
       "3   212933533     B02        21127914        Alarms  10/20/2021  10/20/2021   \n",
       "4   221202543     E41        22054815        Alarms  04/30/2022  04/30/2022   \n",
       "\n",
       "            Received DtTm              Entry DtTm           Dispatch DtTm  \\\n",
       "0  05/01/2022 02:58:25 AM  05/01/2022 02:59:15 AM  05/01/2022 02:59:25 AM   \n",
       "1  01/19/2022 01:42:12 AM  01/19/2022 01:44:13 AM  01/19/2022 01:44:28 AM   \n",
       "2  05/03/2021 09:28:12 PM  05/03/2021 09:28:12 PM  05/03/2021 09:28:17 PM   \n",
       "3  10/20/2021 10:08:47 PM  10/20/2021 10:09:53 PM  10/20/2021 10:10:07 PM   \n",
       "4  04/30/2022 06:35:58 PM  04/30/2022 06:37:28 PM  04/30/2022 06:37:43 PM   \n",
       "\n",
       "            Response DtTm  ... Call Type Group Number of Alarms Unit Type  \\\n",
       "0  05/01/2022 03:01:06 AM  ...            Fire                1    ENGINE   \n",
       "1  01/19/2022 01:46:47 AM  ...           Alarm                1    ENGINE   \n",
       "2  05/03/2021 09:29:10 PM  ...           Alarm                1     TRUCK   \n",
       "3  10/20/2021 10:11:55 PM  ...           Alarm                1     CHIEF   \n",
       "4  04/30/2022 06:38:17 PM  ...           Alarm                1    ENGINE   \n",
       "\n",
       "  Unit sequence in call dispatch Fire Prevention District Supervisor District  \\\n",
       "0                              1                        2                   5   \n",
       "1                              1                        3                  10   \n",
       "2                              2                        2                   9   \n",
       "3                              3                        3                   6   \n",
       "4                              4                        4                   2   \n",
       "\n",
       "  Neighborhooods - Analysis Boundaries          RowID  \\\n",
       "0                         Hayes Valley  221210313-E36   \n",
       "1                         Potrero Hill  220190150-E29   \n",
       "2                              Mission  211233271-T07   \n",
       "3                           Tenderloin  212933533-B02   \n",
       "4                         Russian Hill  221202543-E41   \n",
       "\n",
       "                                   case_location Analysis Neighborhoods  \n",
       "0  POINT (-122.42316555403964 37.77781524520032)                      9  \n",
       "1  POINT (-122.39469970274361 37.76460987856451)                     26  \n",
       "2  POINT (-122.42057572093252 37.76418194637148)                     20  \n",
       "3  POINT (-122.41243514072728 37.78347684038771)                     36  \n",
       "4  POINT (-122.4233369425531 37.799534868680034)                     32  \n",
       "\n",
       "[5 rows x 35 columns]"
      ]
     },
     "execution_count": 91,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "df.limit(5).toPandas()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 92,
   "id": "9685c540-f710-4c91-ae79-d3ea75f72201",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[('Call Number', 'string'),\n",
       " ('Unit ID', 'string'),\n",
       " ('Incident Number', 'string'),\n",
       " ('Call Type', 'string'),\n",
       " ('Call Date', 'string'),\n",
       " ('Watch Date', 'string'),\n",
       " ('Received DtTm', 'string'),\n",
       " ('Entry DtTm', 'string'),\n",
       " ('Dispatch DtTm', 'string'),\n",
       " ('Response DtTm', 'string'),\n",
       " ('On Scene DtTm', 'string'),\n",
       " ('Transport DtTm', 'string'),\n",
       " ('Hospital DtTm', 'string'),\n",
       " ('Call Final Disposition', 'string'),\n",
       " ('Available DtTm', 'string'),\n",
       " ('Address', 'string'),\n",
       " ('City', 'string'),\n",
       " ('Zipcode of Incident', 'string'),\n",
       " ('Battalion', 'string'),\n",
       " ('Station Area', 'string'),\n",
       " ('Box', 'string'),\n",
       " ('Original Priority', 'string'),\n",
       " ('Priority', 'string'),\n",
       " ('Final Priority', 'string'),\n",
       " ('ALS Unit', 'string'),\n",
       " ('Call Type Group', 'string'),\n",
       " ('Number of Alarms', 'string'),\n",
       " ('Unit Type', 'string'),\n",
       " ('Unit sequence in call dispatch', 'string'),\n",
       " ('Fire Prevention District', 'string'),\n",
       " ('Supervisor District', 'string'),\n",
       " ('Neighborhooods - Analysis Boundaries', 'string'),\n",
       " ('RowID', 'string'),\n",
       " ('case_location', 'string'),\n",
       " ('Analysis Neighborhoods', 'string')]"
      ]
     },
     "execution_count": 92,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "df.dtypes"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 93,
   "id": "50c40bca-2cde-4d40-abdb-398e4e1b50e3",
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "[Stage 35:====================================================>   (16 + 1) / 17]"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "CPU times: user 7.83 ms, sys: 3.52 ms, total: 11.4 ms\n",
      "Wall time: 10.9 s\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "                                                                                "
     ]
    }
   ],
   "source": [
    "%%time\n",
    "df = spark.read.format(\"csv\").option(\"header\", True).option(\"inferSchema\", True).load(\"hdfs://nn:9000/sf.csv\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 94,
   "id": "577e1773-4308-45b8-9ddc-c74f43f4c3f8",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[('Call Number', 'int'),\n",
       " ('Unit ID', 'string'),\n",
       " ('Incident Number', 'int'),\n",
       " ('Call Type', 'string'),\n",
       " ('Call Date', 'string'),\n",
       " ('Watch Date', 'string'),\n",
       " ('Received DtTm', 'string'),\n",
       " ('Entry DtTm', 'string'),\n",
       " ('Dispatch DtTm', 'string'),\n",
       " ('Response DtTm', 'string'),\n",
       " ('On Scene DtTm', 'string'),\n",
       " ('Transport DtTm', 'string'),\n",
       " ('Hospital DtTm', 'string'),\n",
       " ('Call Final Disposition', 'string'),\n",
       " ('Available DtTm', 'string'),\n",
       " ('Address', 'string'),\n",
       " ('City', 'string'),\n",
       " ('Zipcode of Incident', 'int'),\n",
       " ('Battalion', 'string'),\n",
       " ('Station Area', 'string'),\n",
       " ('Box', 'string'),\n",
       " ('Original Priority', 'string'),\n",
       " ('Priority', 'string'),\n",
       " ('Final Priority', 'int'),\n",
       " ('ALS Unit', 'boolean'),\n",
       " ('Call Type Group', 'string'),\n",
       " ('Number of Alarms', 'int'),\n",
       " ('Unit Type', 'string'),\n",
       " ('Unit sequence in call dispatch', 'int'),\n",
       " ('Fire Prevention District', 'string'),\n",
       " ('Supervisor District', 'string'),\n",
       " ('Neighborhooods - Analysis Boundaries', 'string'),\n",
       " ('RowID', 'string'),\n",
       " ('case_location', 'string'),\n",
       " ('Analysis Neighborhoods', 'int')]"
      ]
     },
     "execution_count": 94,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "df.dtypes"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 96,
   "id": "3f5725e7-2e67-4685-8780-b999c0f7aa17",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "17"
      ]
     },
     "execution_count": 96,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "df.rdd.getNumPartitions()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 97,
   "id": "5a599aa5-48f4-46ce-8ee2-84d57d4453e1",
   "metadata": {},
   "outputs": [],
   "source": [
    "# example 1: how can we clean up the strings (upper case) and get date types?"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 100,
   "id": "699bb766-6ba5-40a6-9a18-c292f37f8066",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/html": [
       "<div>\n",
       "<style scoped>\n",
       "    .dataframe tbody tr th:only-of-type {\n",
       "        vertical-align: middle;\n",
       "    }\n",
       "\n",
       "    .dataframe tbody tr th {\n",
       "        vertical-align: top;\n",
       "    }\n",
       "\n",
       "    .dataframe thead th {\n",
       "        text-align: right;\n",
       "    }\n",
       "</style>\n",
       "<table border=\"1\" class=\"dataframe\">\n",
       "  <thead>\n",
       "    <tr style=\"text-align: right;\">\n",
       "      <th></th>\n",
       "      <th>Call Type</th>\n",
       "      <th>Call Date</th>\n",
       "    </tr>\n",
       "  </thead>\n",
       "  <tbody>\n",
       "    <tr>\n",
       "      <th>0</th>\n",
       "      <td>Outside Fire</td>\n",
       "      <td>05/01/2022</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>1</th>\n",
       "      <td>Alarms</td>\n",
       "      <td>01/19/2022</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>2</th>\n",
       "      <td>Alarms</td>\n",
       "      <td>05/03/2021</td>\n",
       "    </tr>\n",
       "  </tbody>\n",
       "</table>\n",
       "</div>"
      ],
      "text/plain": [
       "      Call Type   Call Date\n",
       "0  Outside Fire  05/01/2022\n",
       "1        Alarms  01/19/2022\n",
       "2        Alarms  05/03/2021"
      ]
     },
     "execution_count": 100,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "df.select(\"Call Type\", \"Call Date\").limit(3).toPandas()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 107,
   "id": "b306d906-0bd7-4cbf-b0fa-856ded1f0725",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/html": [
       "<div>\n",
       "<style scoped>\n",
       "    .dataframe tbody tr th:only-of-type {\n",
       "        vertical-align: middle;\n",
       "    }\n",
       "\n",
       "    .dataframe tbody tr th {\n",
       "        vertical-align: top;\n",
       "    }\n",
       "\n",
       "    .dataframe thead th {\n",
       "        text-align: right;\n",
       "    }\n",
       "</style>\n",
       "<table border=\"1\" class=\"dataframe\">\n",
       "  <thead>\n",
       "    <tr style=\"text-align: right;\">\n",
       "      <th></th>\n",
       "      <th>Call_Type</th>\n",
       "      <th>Call_Date</th>\n",
       "    </tr>\n",
       "  </thead>\n",
       "  <tbody>\n",
       "    <tr>\n",
       "      <th>0</th>\n",
       "      <td>OUTSIDE FIRE</td>\n",
       "      <td>2022-05-01</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>1</th>\n",
       "      <td>ALARMS</td>\n",
       "      <td>2022-01-19</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>2</th>\n",
       "      <td>ALARMS</td>\n",
       "      <td>2021-05-03</td>\n",
       "    </tr>\n",
       "  </tbody>\n",
       "</table>\n",
       "</div>"
      ],
      "text/plain": [
       "      Call_Type   Call_Date\n",
       "0  OUTSIDE FIRE  2022-05-01\n",
       "1        ALARMS  2022-01-19\n",
       "2        ALARMS  2021-05-03"
      ]
     },
     "execution_count": 107,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "from pyspark.sql.functions import col, expr\n",
    "df.select(\n",
    "    expr(\"upper(`Call Type`)\").alias(\"Call_Type\"),\n",
    "    expr(\"to_date(`Call Date`, 'MM/dd/yyyy')\").alias(\"Call_Date\")\n",
    ").limit(3).toPandas()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 108,
   "id": "f6b0d597-ffcf-44e7-8817-81e850435104",
   "metadata": {},
   "outputs": [],
   "source": [
    "# example 2: convert the CSV to Parquet, with no spaces in the column names"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 111,
   "id": "ba66cbd3-8692-4dfd-b95c-c934f689660d",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "Column<'Call Number AS Call_Number'>"
      ]
     },
     "execution_count": 111,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "col(\"Call Number\").alias(\"Call_Number\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 122,
   "id": "c1503637-6e74-4211-91d0-24881b666ccb",
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "                                                                                "
     ]
    }
   ],
   "source": [
    "(\n",
    "    df\n",
    "    .select([col(c).alias(c.replace(\" \", \"_\")) for c in df.columns])\n",
    "    .write\n",
    "    .mode(\"overwrite\")\n",
    "    .format(\"parquet\")\n",
    "    .save(\"hdfs://nn:9000/sf.parquet\")\n",
    ")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 123,
   "id": "9842dfa1-b43e-4890-9873-fff11ce76b8a",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Found 18 items\n",
      "-rw-r--r--   3 root supergroup          0 2025-03-10 01:54 hdfs://nn:9000/sf.parquet/_SUCCESS\n",
      "-rw-r--r--   3 root supergroup   27806510 2025-03-10 01:54 hdfs://nn:9000/sf.parquet/part-00000-cade4df6-026c-40f3-9f8d-8394a0e146ef-c000.snappy.parquet\n",
      "-rw-r--r--   3 root supergroup   27789781 2025-03-10 01:54 hdfs://nn:9000/sf.parquet/part-00001-cade4df6-026c-40f3-9f8d-8394a0e146ef-c000.snappy.parquet\n",
      "-rw-r--r--   3 root supergroup   40478442 2025-03-10 01:54 hdfs://nn:9000/sf.parquet/part-00002-cade4df6-026c-40f3-9f8d-8394a0e146ef-c000.snappy.parquet\n",
      "-rw-r--r--   3 root supergroup   36017328 2025-03-10 01:54 hdfs://nn:9000/sf.parquet/part-00003-cade4df6-026c-40f3-9f8d-8394a0e146ef-c000.snappy.parquet\n",
      "-rw-r--r--   3 root supergroup   36033379 2025-03-10 01:54 hdfs://nn:9000/sf.parquet/part-00004-cade4df6-026c-40f3-9f8d-8394a0e146ef-c000.snappy.parquet\n",
      "-rw-r--r--   3 root supergroup   36082202 2025-03-10 01:54 hdfs://nn:9000/sf.parquet/part-00005-cade4df6-026c-40f3-9f8d-8394a0e146ef-c000.snappy.parquet\n",
      "-rw-r--r--   3 root supergroup   35944952 2025-03-10 01:54 hdfs://nn:9000/sf.parquet/part-00006-cade4df6-026c-40f3-9f8d-8394a0e146ef-c000.snappy.parquet\n",
      "-rw-r--r--   3 root supergroup   35912043 2025-03-10 01:54 hdfs://nn:9000/sf.parquet/part-00007-cade4df6-026c-40f3-9f8d-8394a0e146ef-c000.snappy.parquet\n",
      "-rw-r--r--   3 root supergroup   36436328 2025-03-10 01:54 hdfs://nn:9000/sf.parquet/part-00008-cade4df6-026c-40f3-9f8d-8394a0e146ef-c000.snappy.parquet\n",
      "-rw-r--r--   3 root supergroup   35368134 2025-03-10 01:54 hdfs://nn:9000/sf.parquet/part-00009-cade4df6-026c-40f3-9f8d-8394a0e146ef-c000.snappy.parquet\n",
      "-rw-r--r--   3 root supergroup   34238988 2025-03-10 01:54 hdfs://nn:9000/sf.parquet/part-00010-cade4df6-026c-40f3-9f8d-8394a0e146ef-c000.snappy.parquet\n",
      "-rw-r--r--   3 root supergroup   33948649 2025-03-10 01:54 hdfs://nn:9000/sf.parquet/part-00011-cade4df6-026c-40f3-9f8d-8394a0e146ef-c000.snappy.parquet\n",
      "-rw-r--r--   3 root supergroup   33488640 2025-03-10 01:54 hdfs://nn:9000/sf.parquet/part-00012-cade4df6-026c-40f3-9f8d-8394a0e146ef-c000.snappy.parquet\n",
      "-rw-r--r--   3 root supergroup   34900131 2025-03-10 01:54 hdfs://nn:9000/sf.parquet/part-00013-cade4df6-026c-40f3-9f8d-8394a0e146ef-c000.snappy.parquet\n",
      "-rw-r--r--   3 root supergroup   35715813 2025-03-10 01:54 hdfs://nn:9000/sf.parquet/part-00014-cade4df6-026c-40f3-9f8d-8394a0e146ef-c000.snappy.parquet\n",
      "-rw-r--r--   3 root supergroup   35769206 2025-03-10 01:54 hdfs://nn:9000/sf.parquet/part-00015-cade4df6-026c-40f3-9f8d-8394a0e146ef-c000.snappy.parquet\n",
      "-rw-r--r--   3 root supergroup   29363174 2025-03-10 01:54 hdfs://nn:9000/sf.parquet/part-00016-cade4df6-026c-40f3-9f8d-8394a0e146ef-c000.snappy.parquet\n"
     ]
    }
   ],
   "source": [
    "! hdfs dfs -ls hdfs://nn:9000/sf.parquet/"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 124,
   "id": "cef40087-bfd5-43c2-9c6d-9e6644a7079d",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "CPU times: user 2.69 ms, sys: 960 μs, total: 3.65 ms\n",
      "Wall time: 231 ms\n"
     ]
    }
   ],
   "source": [
    "%%time\n",
    "df = spark.read.format(\"parquet\").load(\"hdfs://nn:9000/sf.parquet\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 125,
   "id": "ccc39826-d6f1-433a-a005-269d8fa88e9f",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "CPU times: user 719 μs, sys: 993 μs, total: 1.71 ms\n",
      "Wall time: 443 ms\n"
     ]
    },
    {
     "data": {
      "text/plain": [
       "6016056"
      ]
     },
     "execution_count": 125,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "%%time\n",
    "df.count()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 126,
   "id": "574746ea-5e08-4585-be3c-e6d151262e20",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "DataFrame[Call_Number: int, Unit_ID: string, Incident_Number: int, Call_Type: string, Call_Date: string, Watch_Date: string, Received_DtTm: string, Entry_DtTm: string, Dispatch_DtTm: string, Response_DtTm: string, On_Scene_DtTm: string, Transport_DtTm: string, Hospital_DtTm: string, Call_Final_Disposition: string, Available_DtTm: string, Address: string, City: string, Zipcode_of_Incident: int, Battalion: string, Station_Area: string, Box: string, Original_Priority: string, Priority: string, Final_Priority: int, ALS_Unit: boolean, Call_Type_Group: string, Number_of_Alarms: int, Unit_Type: string, Unit_sequence_in_call_dispatch: int, Fire_Prevention_District: string, Supervisor_District: string, Neighborhooods_-_Analysis_Boundaries: string, RowID: string, case_location: string, Analysis_Neighborhoods: int]"
      ]
     },
     "execution_count": 126,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "df"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 127,
   "id": "67086ea3-b1e9-4e17-8cbe-acd155371d80",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "6"
      ]
     },
     "execution_count": 127,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "df.rdd.getNumPartitions()"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3 (ipykernel)",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.12.3"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 5
}