{ "cells": [ { "cell_type": "code", "execution_count": 1, "id": "e7fed83d-4fd4-4370-a619-db07eb78df21", "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "Setting default log level to \"WARN\".\n", "To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).\n", "25/03/09 17:52:46 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable\n" ] } ], "source": [ "from pyspark.sql import SparkSession\n", "spark = (SparkSession.builder.appName(\"cs544\")\n", " .master(\"spark://boss:7077\")\n", " .config(\"spark.executor.memory\", \"512M\")\n", " .getOrCreate())" ] }, { "cell_type": "code", "execution_count": 6, "id": "192912cb-d5b3-494e-8b93-e8bc17562680", "metadata": {}, "outputs": [], "source": [ "sc = spark.sparkContext # provides direct RDD access" ] }, { "cell_type": "code", "execution_count": 8, "id": "3a4b48c6-5173-41ac-8640-dcd52b712108", "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[0, 1, 2, 3, 4]" ] }, "execution_count": 8, "metadata": {}, "output_type": "execute_result" } ], "source": [ "nums = list(range(0, 10_000_000))\n", "nums[:5]" ] }, { "cell_type": "code", "execution_count": 9, "id": "9b776732-09d8-4686-9ce4-15e83344118d", "metadata": {}, "outputs": [], "source": [ "rdd = sc.parallelize(nums)" ] }, { "cell_type": "code", "execution_count": 10, "id": "8b04b8f1-2fe7-4ffd-801c-b69d5cbb0978", "metadata": {}, "outputs": [], "source": [ "inverses = rdd.map(lambda x: 1/x) # TRANSFORMATION (lazy)" ] }, { "cell_type": "code", "execution_count": 12, "id": "49f94295-d242-4b43-aeb0-61fe7319fcaf", "metadata": {}, "outputs": [], "source": [ "# head = inverses.take(10) # ACTION (actually does the work)" ] }, { "cell_type": "code", "execution_count": 14, "id": "28a6ec21-419c-4db6-9f88-51b481e12838", "metadata": {}, "outputs": [], "source": [ "# inverses.mean() # ACTION" ] }, { "cell_type": "code", "execution_count": 15, 
"id": "f78e9613-9dd6-45a5-9bf2-e171c9e4b0d1", "metadata": {}, "outputs": [], "source": [ "inverses = rdd.filter(lambda x: x > 0).map(lambda x: 1/x) " ] }, { "cell_type": "code", "execution_count": 17, "id": "ae670238-47cf-4d6f-a299-18c1d1b1a536", "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "25/03/09 17:59:52 WARN TaskSetManager: Stage 3 contains a task of very large size (12152 KiB). The maximum recommended task size is 1000 KiB.\n", " " ] }, { "data": { "text/plain": [ "1.669531293539298e-06" ] }, "execution_count": 17, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# 4 tasks, 0 are done, and 4 are in progress\n", "# [Stage 3:> (0 + 4) / 4]\n", "inverses.mean()" ] }, { "cell_type": "code", "execution_count": null, "id": "627099bd-69f1-41f9-81ca-31f73eaae90c", "metadata": {}, "outputs": [], "source": [ "# inverses.collect() # ACTION: be careful, if it's too big, we could run out of memory!" ] }, { "cell_type": "code", "execution_count": 18, "id": "f336fdc8-a273-49ad-b23c-73f14ccb48d4", "metadata": {}, "outputs": [ { "data": { "text/plain": [ "4" ] }, "execution_count": 18, "metadata": {}, "output_type": "execute_result" } ], "source": [ "rdd.getNumPartitions()" ] }, { "cell_type": "code", "execution_count": 20, "id": "3655dbe6-b8ee-4f02-863a-ebb3a48b6614", "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ " " ] }, { "data": { "text/plain": [ "1.6695312935391358e-06" ] }, "execution_count": 20, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# [Stage 5:======================> (20 + 4) / 50]\n", "rdd = sc.parallelize(nums, 50)\n", "inverses = rdd.filter(lambda x: x > 0).map(lambda x: 1/x)\n", "inverses.mean()" ] }, { "cell_type": "code", "execution_count": 22, "id": "c81343cd-83ef-4cae-b5ce-d2eba0b055f1", "metadata": {}, "outputs": [], "source": [ "import time" ] }, { "cell_type": "code", "execution_count": 21, "id": "754b4356-8026-4e03-91be-82436a8759f3", 
"metadata": {}, "outputs": [], "source": [ "sample = rdd.sample(True, 0.01) # TRANSFORMATION" ] }, { "cell_type": "code", "execution_count": 23, "id": "1735153d-6a9b-45f9-bac7-674853b9f274", "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "[Stage 6:=================================================> (43 + 4) / 50]" ] }, { "name": "stdout", "output_type": "stream", "text": [ "4995621.997385424\n", "1.9789588451385498\n" ] }, { "name": "stderr", "output_type": "stream", "text": [ " " ] } ], "source": [ "# 1st without cache\n", "t0 = time.time()\n", "print(sample.mean())\n", "t1 = time.time()\n", "print(t1-t0)" ] }, { "cell_type": "code", "execution_count": 24, "id": "0496d02b-6dd5-4df7-97cf-60b286217af4", "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "[Stage 7:===================================================> (45 + 4) / 50]" ] }, { "name": "stdout", "output_type": "stream", "text": [ "4995621.997385424\n", "1.9979238510131836\n" ] }, { "name": "stderr", "output_type": "stream", "text": [ " " ] } ], "source": [ "# 2nd without cache\n", "t0 = time.time()\n", "print(sample.mean())\n", "t1 = time.time()\n", "print(t1-t0)" ] }, { "cell_type": "code", "execution_count": 25, "id": "2051edd6-92f5-4c5c-a8a1-1547bfeeb0c8", "metadata": {}, "outputs": [ { "data": { "text/plain": [ "PythonRDD[11] at RDD at PythonRDD.scala:53" ] }, "execution_count": 25, "metadata": {}, "output_type": "execute_result" } ], "source": [ "sample.cache()" ] }, { "cell_type": "code", "execution_count": 26, "id": "c50ed49f-07f7-494d-885b-56de91a0c554", "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "[Stage 8:==================================================> (44 + 4) / 50]" ] }, { "name": "stdout", "output_type": "stream", "text": [ "4995621.997385424\n", "2.9729838371276855\n" ] }, { "name": "stderr", "output_type": "stream", "text": [ " " ] } ], "source": [ "# 1st with cache\n", "t0 = 
time.time()\n", "print(sample.mean())\n", "t1 = time.time()\n", "print(t1-t0)" ] }, { "cell_type": "code", "execution_count": 27, "id": "3d602170-51b3-4dbf-a4ba-3a242d9b9ba2", "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "[Stage 9:===================================> (31 + 4) / 50]" ] }, { "name": "stdout", "output_type": "stream", "text": [ "4995621.997385424\n", "1.2610077857971191\n" ] }, { "name": "stderr", "output_type": "stream", "text": [ " " ] } ], "source": [ "# 2nd with cache\n", "t0 = time.time()\n", "print(sample.mean())\n", "t1 = time.time()\n", "print(t1-t0)" ] }, { "cell_type": "code", "execution_count": 29, "id": "3f0c6999-f48c-4895-876e-8c60eff5a74d", "metadata": {}, "outputs": [], "source": [ "sample = rdd.sample(True, 0.01).repartition(4)" ] }, { "cell_type": "code", "execution_count": 30, "id": "e3d61398-bb60-4f7f-8594-ab876b1aa51b", "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "[Stage 10:====================================================> (47 + 3) / 50]" ] }, { "name": "stdout", "output_type": "stream", "text": [ "5005973.481531257\n", "3.3820056915283203\n" ] }, { "name": "stderr", "output_type": "stream", "text": [ " " ] } ], "source": [ "# 1st run, fewer partitions (NOTE: this new sample was never cached -- no .cache() call)\n", "t0 = time.time()\n", "print(sample.mean())\n", "t1 = time.time()\n", "print(t1-t0)" ] }, { "cell_type": "code", "execution_count": 31, "id": "201504ef-5bb4-4436-acfd-03e28c5bfd92", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "5005973.481531262\n", "0.2796463966369629\n" ] } ], "source": [ "# 2nd run, fewer partitions (still not cached; the speedup likely comes from Spark reusing the shuffle output of repartition)\n", "t0 = time.time()\n", "print(sample.mean())\n", "t1 = time.time()\n", "print(t1-t0)" ] }, { "cell_type": "code", "execution_count": null, "id": "b47df2f0-98c8-46e5-8c88-2f3600188edb", "metadata": {}, "outputs": [], "source": [ "# note about determinism\n", "\n", "# ideally: given the same seed, sample the same way\n", 
"# in reality: even with seed, you can get different results if the number of input partitions changes\n", "# suggestion: be careful, and save your sampled results in a diff file to ensure determinism!\n", "sample = rdd.sample(True, 0.01, 544)" ] }, { "cell_type": "markdown", "id": "93036a5e-2066-461e-ba12-8bc549294bde", "metadata": {}, "source": [ "# Spark DataFrames: reading a text file\n", "\n", "Spark DataFrames build on Spark SQL, which builds on Spark RDDs." ] }, { "cell_type": "code", "execution_count": 37, "id": "9141f941-fdaa-4d40-a744-9b831cb84e2b", "metadata": {}, "outputs": [], "source": [ "# !wget https://pages.cs.wisc.edu/~harter/cs544/data/ghcnd-stations.txt" ] }, { "cell_type": "code", "execution_count": null, "id": "175cb54b-eb63-4a2e-9877-5a4b1d5d12bf", "metadata": {}, "outputs": [], "source": [ "# read/write:\n", "# spark.read.????.????.load() or .text()\n", "# df.write.????.????.saveTable()" ] }, { "cell_type": "code", "execution_count": 41, "id": "8e79a508-4dec-45d1-9dd3-58da5b83063c", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "/nb/ghcnd-stations.txt\n" ] } ], "source": [ "! 
ls /nb/ghcnd-stations.txt" ] }, { "cell_type": "code", "execution_count": 42, "id": "7d4516f2-9802-490a-9fca-22111624b63d", "metadata": {}, "outputs": [], "source": [ "# df = spark.read.text(\"ghcnd-stations.txt\")" ] }, { "cell_type": "code", "execution_count": 43, "id": "970432fa-981b-4824-bced-95eeef96afb5", "metadata": {}, "outputs": [], "source": [ "# df.take(3)" ] }, { "cell_type": "code", "execution_count": 45, "id": "fa910718-36f0-41e1-9ff8-40754a9ae707", "metadata": {}, "outputs": [], "source": [ "!hdfs dfs -cp ghcnd-stations.txt hdfs://nn:9000/ghcnd-stations.txt" ] }, { "cell_type": "code", "execution_count": 46, "id": "df70d8a3-f302-4663-bcda-adee88155131", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Found 1 items\n", "-rw-r--r-- 3 root supergroup 10607756 2025-03-09 18:51 hdfs://nn:9000/ghcnd-stations.txt\n" ] } ], "source": [ "!hdfs dfs -ls hdfs://nn:9000/" ] }, { "cell_type": "code", "execution_count": 47, "id": "12493c05-e5f6-4120-a104-8b5fedf1a623", "metadata": {}, "outputs": [], "source": [ "df = spark.read.text(\"hdfs://nn:9000/ghcnd-stations.txt\")" ] }, { "cell_type": "code", "execution_count": 48, "id": "eefc522f-7e8d-4cb6-8785-8064a8938c9c", "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ " " ] }, { "data": { "text/plain": [ "[Row(value='ACW00011604 17.1167 -61.7833 10.1 ST JOHNS COOLIDGE FLD '),\n", " Row(value='ACW00011647 17.1333 -61.7833 19.2 ST JOHNS '),\n", " Row(value='AE000041196 25.3330 55.5170 34.0 SHARJAH INTER. 
AIRP GSN 41196')]" ] }, "execution_count": 48, "metadata": {}, "output_type": "execute_result" } ], "source": [ "df.take(3)" ] }, { "cell_type": "code", "execution_count": 49, "id": "65842e71-e8d3-4196-a217-005e98af68ae", "metadata": {}, "outputs": [ { "data": { "text/plain": [ "pyspark.sql.dataframe.DataFrame" ] }, "execution_count": 49, "metadata": {}, "output_type": "execute_result" } ], "source": [ "type(df)" ] }, { "cell_type": "code", "execution_count": 51, "id": "ee8e9d09-6896-4257-899b-2e549e01337a", "metadata": {}, "outputs": [ { "data": { "text/plain": [ "pyspark.rdd.RDD" ] }, "execution_count": 51, "metadata": {}, "output_type": "execute_result" } ], "source": [ "type(df.rdd)" ] }, { "cell_type": "code", "execution_count": 54, "id": "252fa425-9d8c-4792-8e26-e4ffd384a3c5", "metadata": {}, "outputs": [ { "data": { "text/plain": [ "'ACW00011604 17.1167 -61.7833 10.1 ST JOHNS COOLIDGE FLD '" ] }, "execution_count": 54, "metadata": {}, "output_type": "execute_result" } ], "source": [ "df.take(1)[0].value" ] }, { "cell_type": "code", "execution_count": 55, "id": "80d499d6-552e-4afa-ac38-111d31eb6d14", "metadata": {}, "outputs": [ { "data": { "text/plain": [ "['ACW00011604',\n", " 'ACW00011647',\n", " 'AE000041196',\n", " 'AEM00041194',\n", " 'AEM00041217',\n", " 'AEM00041218',\n", " 'AF000040930',\n", " 'AFM00040938',\n", " 'AFM00040948',\n", " 'AFM00040990']" ] }, "execution_count": 55, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# first 11 characters is the station name\n", "df.rdd.map(lambda row: row.value[:11]).take(10)" ] }, { "cell_type": "code", "execution_count": 56, "id": "10f9c364-5516-409d-8893-4e55bd031d98", "metadata": {}, "outputs": [], "source": [ "# how would we do this in Pandas? 
Extract the station names, add that as a column" ] }, { "cell_type": "code", "execution_count": 58, "id": "2ce8af7b-b4da-4e71-9c5a-a99487d2010e", "metadata": {}, "outputs": [ { "data": { "text/html": [ "<div>\n", "<style scoped>\n", " .dataframe tbody tr th:only-of-type {\n", " vertical-align: middle;\n", " }\n", "\n", " .dataframe tbody tr th {\n", " vertical-align: top;\n", " }\n", "\n", " .dataframe thead th {\n", " text-align: right;\n", " }\n", "</style>\n", "<table border=\"1\" class=\"dataframe\">\n", " <thead>\n", " <tr style=\"text-align: right;\">\n", " <th></th>\n", " <th>value</th>\n", " </tr>\n", " </thead>\n", " <tbody>\n", " <tr>\n", " <th>0</th>\n", " <td>ACW00011604 17.1167 -61.7833 10.1 ST JO...</td>\n", " </tr>\n", " <tr>\n", " <th>1</th>\n", " <td>ACW00011647 17.1333 -61.7833 19.2 ST JO...</td>\n", " </tr>\n", " <tr>\n", " <th>2</th>\n", " <td>AE000041196 25.3330 55.5170 34.0 SHARJ...</td>\n", " </tr>\n", " <tr>\n", " <th>3</th>\n", " <td>AEM00041194 25.2550 55.3640 10.4 DUBAI...</td>\n", " </tr>\n", " <tr>\n", " <th>4</th>\n", " <td>AEM00041217 24.4330 54.6510 26.8 ABU D...</td>\n", " </tr>\n", " <tr>\n", " <th>5</th>\n", " <td>AEM00041218 24.2620 55.6090 264.9 AL AI...</td>\n", " </tr>\n", " </tbody>\n", "</table>\n", "</div>" ], "text/plain": [ " value\n", "0 ACW00011604 17.1167 -61.7833 10.1 ST JO...\n", "1 ACW00011647 17.1333 -61.7833 19.2 ST JO...\n", "2 AE000041196 25.3330 55.5170 34.0 SHARJ...\n", "3 AEM00041194 25.2550 55.3640 10.4 DUBAI...\n", "4 AEM00041217 24.4330 54.6510 26.8 ABU D...\n", "5 AEM00041218 24.2620 55.6090 264.9 AL AI..." 
] }, "execution_count": 58, "metadata": {}, "output_type": "execute_result" } ], "source": [ "pandas_df = df.limit(6).toPandas()\n", "pandas_df" ] }, { "cell_type": "code", "execution_count": 63, "id": "29be9b4f-0a70-4b29-bd59-111098da36dd", "metadata": {}, "outputs": [ { "data": { "text/html": [ "<div>\n", "<style scoped>\n", " .dataframe tbody tr th:only-of-type {\n", " vertical-align: middle;\n", " }\n", "\n", " .dataframe tbody tr th {\n", " vertical-align: top;\n", " }\n", "\n", " .dataframe thead th {\n", " text-align: right;\n", " }\n", "</style>\n", "<table border=\"1\" class=\"dataframe\">\n", " <thead>\n", " <tr style=\"text-align: right;\">\n", " <th></th>\n", " <th>value</th>\n", " <th>station</th>\n", " </tr>\n", " </thead>\n", " <tbody>\n", " <tr>\n", " <th>0</th>\n", " <td>ACW00011604 17.1167 -61.7833 10.1 ST JO...</td>\n", " <td>ACW00011604</td>\n", " </tr>\n", " <tr>\n", " <th>1</th>\n", " <td>ACW00011647 17.1333 -61.7833 19.2 ST JO...</td>\n", " <td>ACW00011647</td>\n", " </tr>\n", " <tr>\n", " <th>2</th>\n", " <td>AE000041196 25.3330 55.5170 34.0 SHARJ...</td>\n", " <td>AE000041196</td>\n", " </tr>\n", " <tr>\n", " <th>3</th>\n", " <td>AEM00041194 25.2550 55.3640 10.4 DUBAI...</td>\n", " <td>AEM00041194</td>\n", " </tr>\n", " <tr>\n", " <th>4</th>\n", " <td>AEM00041217 24.4330 54.6510 26.8 ABU D...</td>\n", " <td>AEM00041217</td>\n", " </tr>\n", " <tr>\n", " <th>5</th>\n", " <td>AEM00041218 24.2620 55.6090 264.9 AL AI...</td>\n", " <td>AEM00041218</td>\n", " </tr>\n", " </tbody>\n", "</table>\n", "</div>" ], "text/plain": [ " value station\n", "0 ACW00011604 17.1167 -61.7833 10.1 ST JO... ACW00011604\n", "1 ACW00011647 17.1333 -61.7833 19.2 ST JO... ACW00011647\n", "2 AE000041196 25.3330 55.5170 34.0 SHARJ... AE000041196\n", "3 AEM00041194 25.2550 55.3640 10.4 DUBAI... AEM00041194\n", "4 AEM00041217 24.4330 54.6510 26.8 ABU D... AEM00041217\n", "5 AEM00041218 24.2620 55.6090 264.9 AL AI... 
AEM00041218" ] }, "execution_count": 63, "metadata": {}, "output_type": "execute_result" } ], "source": [ "pandas_df[\"station\"] = pandas_df[\"value\"].str[:11]\n", "pandas_df" ] }, { "cell_type": "code", "execution_count": 64, "id": "664669e9-2624-4710-b58e-8303af3e1b07", "metadata": {}, "outputs": [], "source": [ "# not allowed, because df wraps rdd, which is immutable!\n", "# df[\"station\"] = ????" ] }, { "cell_type": "code", "execution_count": 65, "id": "465d5d75-4720-488d-b3fa-60e4246eca16", "metadata": {}, "outputs": [], "source": [ "from pyspark.sql.functions import col, expr" ] }, { "cell_type": "code", "execution_count": 66, "id": "516597a7-fb56-456a-bef6-304ff9cc25f5", "metadata": {}, "outputs": [ { "data": { "text/plain": [ "Column<'x'>" ] }, "execution_count": 66, "metadata": {}, "output_type": "execute_result" } ], "source": [ "col(\"x\")" ] }, { "cell_type": "code", "execution_count": 67, "id": "db232b87-d14f-43ba-9536-39c1aad1fac5", "metadata": {}, "outputs": [ { "data": { "text/plain": [ "Column<'(x + 1)'>" ] }, "execution_count": 67, "metadata": {}, "output_type": "execute_result" } ], "source": [ "col(\"x\") + 1" ] }, { "cell_type": "code", "execution_count": 68, "id": "1ceae7a8-f355-4e5e-9857-ce451997e0f1", "metadata": {}, "outputs": [ { "data": { "text/plain": [ "Column<'(x + 1)'>" ] }, "execution_count": 68, "metadata": {}, "output_type": "execute_result" } ], "source": [ "expr(\"x + 1\")" ] }, { "cell_type": "code", "execution_count": 69, "id": "08fc588a-97c5-4908-8eb0-2631b3c4bf8f", "metadata": {}, "outputs": [ { "data": { "text/plain": [ "Column<'(x + 1) AS plusone'>" ] }, "execution_count": 69, "metadata": {}, "output_type": "execute_result" } ], "source": [ "expr(\"x + 1\").alias(\"plusone\") # similar to SQL \"AS\"" ] }, { "cell_type": "code", "execution_count": 70, "id": "93e8e7e9-c9fa-4f72-b086-3e31d3d3c31d", "metadata": {}, "outputs": [], "source": [ "df2 = df.withColumn(\"station\", expr(\"substring(value, 0, 11)\")) # 
transformation!" ] }, { "cell_type": "code", "execution_count": 71, "id": "f4d812d8-f1fa-4090-bbf2-590463e69c6d", "metadata": {}, "outputs": [ { "data": { "text/plain": [ "DataFrame[value: string]" ] }, "execution_count": 71, "metadata": {}, "output_type": "execute_result" } ], "source": [ "df" ] }, { "cell_type": "code", "execution_count": 72, "id": "f6ca7f4b-31ae-468c-adf8-0d1d6166820e", "metadata": {}, "outputs": [ { "data": { "text/plain": [ "DataFrame[value: string, station: string]" ] }, "execution_count": 72, "metadata": {}, "output_type": "execute_result" } ], "source": [ "df2" ] }, { "cell_type": "code", "execution_count": 73, "id": "7f2a04c3-dc21-4dbd-8490-dff279a20df6", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+--------------------+-----------+\n", "| value| station|\n", "+--------------------+-----------+\n", "|ACW00011604 17.1...|ACW00011604|\n", "|ACW00011647 17.1...|ACW00011647|\n", "|AE000041196 25.3...|AE000041196|\n", "|AEM00041194 25.2...|AEM00041194|\n", "|AEM00041217 24.4...|AEM00041217|\n", "|AEM00041218 24.2...|AEM00041218|\n", "|AF000040930 35.3...|AF000040930|\n", "|AFM00040938 34.2...|AFM00040938|\n", "|AFM00040948 34.5...|AFM00040948|\n", "|AFM00040990 31.5...|AFM00040990|\n", "|AG000060390 36.7...|AG000060390|\n", "|AG000060590 30.5...|AG000060590|\n", "|AG000060611 28.0...|AG000060611|\n", "|AG000060680 22.8...|AG000060680|\n", "|AGE00135039 35.7...|AGE00135039|\n", "|AGE00147704 36.9...|AGE00147704|\n", "|AGE00147705 36.7...|AGE00147705|\n", "|AGE00147706 36.8...|AGE00147706|\n", "|AGE00147707 36.8...|AGE00147707|\n", "|AGE00147708 36.7...|AGE00147708|\n", "+--------------------+-----------+\n", "only showing top 20 rows\n", "\n" ] } ], "source": [ "df2.show() # action" ] }, { "cell_type": "code", "execution_count": 76, "id": "afaff554-089f-43cd-85c3-8f2af5384545", "metadata": {}, "outputs": [ { "data": { "text/html": [ "<div>\n", "<style scoped>\n", " .dataframe tbody tr th:only-of-type 
{\n", " vertical-align: middle;\n", " }\n", "\n", " .dataframe tbody tr th {\n", " vertical-align: top;\n", " }\n", "\n", " .dataframe thead th {\n", " text-align: right;\n", " }\n", "</style>\n", "<table border=\"1\" class=\"dataframe\">\n", " <thead>\n", " <tr style=\"text-align: right;\">\n", " <th></th>\n", " <th>value</th>\n", " <th>station</th>\n", " </tr>\n", " </thead>\n", " <tbody>\n", " <tr>\n", " <th>0</th>\n", " <td>ACW00011604 17.1167 -61.7833 10.1 ST JO...</td>\n", " <td>ACW00011604</td>\n", " </tr>\n", " <tr>\n", " <th>1</th>\n", " <td>ACW00011647 17.1333 -61.7833 19.2 ST JO...</td>\n", " <td>ACW00011647</td>\n", " </tr>\n", " <tr>\n", " <th>2</th>\n", " <td>AE000041196 25.3330 55.5170 34.0 SHARJ...</td>\n", " <td>AE000041196</td>\n", " </tr>\n", " <tr>\n", " <th>3</th>\n", " <td>AEM00041194 25.2550 55.3640 10.4 DUBAI...</td>\n", " <td>AEM00041194</td>\n", " </tr>\n", " <tr>\n", " <th>4</th>\n", " <td>AEM00041217 24.4330 54.6510 26.8 ABU D...</td>\n", " <td>AEM00041217</td>\n", " </tr>\n", " <tr>\n", " <th>5</th>\n", " <td>AEM00041218 24.2620 55.6090 264.9 AL AI...</td>\n", " <td>AEM00041218</td>\n", " </tr>\n", " <tr>\n", " <th>6</th>\n", " <td>AF000040930 35.3170 69.0170 3366.0 NORTH...</td>\n", " <td>AF000040930</td>\n", " </tr>\n", " <tr>\n", " <th>7</th>\n", " <td>AFM00040938 34.2100 62.2280 977.2 HERAT...</td>\n", " <td>AFM00040938</td>\n", " </tr>\n", " <tr>\n", " <th>8</th>\n", " <td>AFM00040948 34.5660 69.2120 1791.3 KABUL...</td>\n", " <td>AFM00040948</td>\n", " </tr>\n", " <tr>\n", " <th>9</th>\n", " <td>AFM00040990 31.5000 65.8500 1010.0 KANDA...</td>\n", " <td>AFM00040990</td>\n", " </tr>\n", " </tbody>\n", "</table>\n", "</div>" ], "text/plain": [ " value station\n", "0 ACW00011604 17.1167 -61.7833 10.1 ST JO... ACW00011604\n", "1 ACW00011647 17.1333 -61.7833 19.2 ST JO... ACW00011647\n", "2 AE000041196 25.3330 55.5170 34.0 SHARJ... AE000041196\n", "3 AEM00041194 25.2550 55.3640 10.4 DUBAI... 
AEM00041194\n", "4 AEM00041217 24.4330 54.6510 26.8 ABU D... AEM00041217\n", "5 AEM00041218 24.2620 55.6090 264.9 AL AI... AEM00041218\n", "6 AF000040930 35.3170 69.0170 3366.0 NORTH... AF000040930\n", "7 AFM00040938 34.2100 62.2280 977.2 HERAT... AFM00040938\n", "8 AFM00040948 34.5660 69.2120 1791.3 KABUL... AFM00040948\n", "9 AFM00040990 31.5000 65.8500 1010.0 KANDA... AFM00040990" ] }, "execution_count": 76, "metadata": {}, "output_type": "execute_result" } ], "source": [ "df2.limit(10).toPandas() # limit is a transformation, toPandas is the action!" ] }, { "cell_type": "markdown", "id": "f49d5318-f826-4fa0-832e-dc05be8c9426", "metadata": {}, "source": [ "# Spark: CSVs and Parquet" ] }, { "cell_type": "code", "execution_count": 78, "id": "f2a328d7-982e-4e47-b145-2a2a79ef744e", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "--2025-03-10 01:40:01-- https://pages.cs.wisc.edu/~harter/cs544/data/sf.zip\n", "Resolving pages.cs.wisc.edu (pages.cs.wisc.edu)... 128.105.7.9\n", "Connecting to pages.cs.wisc.edu (pages.cs.wisc.edu)|128.105.7.9|:443... connected.\n", "200 OKequest sent, awaiting response... \n", "Length: 534803160 (510M) [application/zip]\n", "Saving to: ‘sf.zip’\n", "\n", "sf.zip 100%[===================>] 510.03M 21.5MB/s in 26s \n", "\n", "2025-03-10 01:40:27 (19.8 MB/s) - ‘sf.zip’ saved [534803160/534803160]\n", "\n" ] } ], "source": [ "! wget https://pages.cs.wisc.edu/~harter/cs544/data/sf.zip" ] }, { "cell_type": "code", "execution_count": 82, "id": "9b7b7bce-6d2e-4278-829a-a97c1b491195", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Archive: sf.zip\n", " inflating: sf.csv \n" ] } ], "source": [ "! unzip sf.zip" ] }, { "cell_type": "code", "execution_count": 83, "id": "cc7d6890-85d5-45f2-844e-644c9b6a21f2", "metadata": {}, "outputs": [], "source": [ "! 
hdfs dfs -cp sf.csv hdfs://nn:9000/sf.csv" ] }, { "cell_type": "code", "execution_count": 84, "id": "3d0d1512-1dcb-4b09-9ace-6348791ae0b5", "metadata": {}, "outputs": [], "source": [ "# read/write:\n", "# spark.read.????.????.load() or .text()\n", "# df.write.????.????.save() or .saveAsTable()" ] }, { "cell_type": "code", "execution_count": 90, "id": "494488f6-6186-4400-a830-89e7528e1357", "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "[Stage 32:> (0 + 1) / 1]" ] }, { "name": "stdout", "output_type": "stream", "text": [ "CPU times: user 2.54 ms, sys: 2.23 ms, total: 4.76 ms\n", "Wall time: 3.37 s\n" ] }, { "name": "stderr", "output_type": "stream", "text": [ " " ] } ], "source": [ "%%time\n", "df = spark.read.format(\"csv\").option(\"header\", True).load(\"hdfs://nn:9000/sf.csv\")" ] }, { "cell_type": "code", "execution_count": 87, "id": "61599f7d-6b07-4ad1-baa3-ddea0b2091c7", "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "[Stage 27:=================================================> (15 + 2) / 17]" ] }, { "name": "stdout", "output_type": "stream", "text": [ "CPU times: user 8.32 ms, sys: 1.02 ms, total: 9.34 ms\n", "Wall time: 4.31 s\n" ] }, { "name": "stderr", "output_type": "stream", "text": [ " " ] }, { "data": { "text/plain": [ "6016057" ] }, "execution_count": 87, "metadata": {}, "output_type": "execute_result" } ], "source": [ "%%time\n", "df.count()" ] }, { "cell_type": "code", "execution_count": 91, "id": "19af30b2-327a-49eb-bfe9-3777b2ae5120", "metadata": {}, "outputs": [ { "data": { "text/html": [ "<div>\n", "<style scoped>\n", " .dataframe tbody tr th:only-of-type {\n", " vertical-align: middle;\n", " }\n", "\n", " .dataframe tbody tr th {\n", " vertical-align: top;\n", " }\n", "\n", " .dataframe thead th {\n", " text-align: right;\n", " }\n", "</style>\n", "<table border=\"1\" class=\"dataframe\">\n", " <thead>\n", " <tr style=\"text-align: right;\">\n", " <th></th>\n", " <th>Call 
Number</th>\n", " <th>Unit ID</th>\n", " <th>Incident Number</th>\n", " <th>Call Type</th>\n", " <th>Call Date</th>\n", " <th>Watch Date</th>\n", " <th>Received DtTm</th>\n", " <th>Entry DtTm</th>\n", " <th>Dispatch DtTm</th>\n", " <th>Response DtTm</th>\n", " <th>...</th>\n", " <th>Call Type Group</th>\n", " <th>Number of Alarms</th>\n", " <th>Unit Type</th>\n", " <th>Unit sequence in call dispatch</th>\n", " <th>Fire Prevention District</th>\n", " <th>Supervisor District</th>\n", " <th>Neighborhooods - Analysis Boundaries</th>\n", " <th>RowID</th>\n", " <th>case_location</th>\n", " <th>Analysis Neighborhoods</th>\n", " </tr>\n", " </thead>\n", " <tbody>\n", " <tr>\n", " <th>0</th>\n", " <td>221210313</td>\n", " <td>E36</td>\n", " <td>22054955</td>\n", " <td>Outside Fire</td>\n", " <td>05/01/2022</td>\n", " <td>04/30/2022</td>\n", " <td>05/01/2022 02:58:25 AM</td>\n", " <td>05/01/2022 02:59:15 AM</td>\n", " <td>05/01/2022 02:59:25 AM</td>\n", " <td>05/01/2022 03:01:06 AM</td>\n", " <td>...</td>\n", " <td>Fire</td>\n", " <td>1</td>\n", " <td>ENGINE</td>\n", " <td>1</td>\n", " <td>2</td>\n", " <td>5</td>\n", " <td>Hayes Valley</td>\n", " <td>221210313-E36</td>\n", " <td>POINT (-122.42316555403964 37.77781524520032)</td>\n", " <td>9</td>\n", " </tr>\n", " <tr>\n", " <th>1</th>\n", " <td>220190150</td>\n", " <td>E29</td>\n", " <td>22008871</td>\n", " <td>Alarms</td>\n", " <td>01/19/2022</td>\n", " <td>01/18/2022</td>\n", " <td>01/19/2022 01:42:12 AM</td>\n", " <td>01/19/2022 01:44:13 AM</td>\n", " <td>01/19/2022 01:44:28 AM</td>\n", " <td>01/19/2022 01:46:47 AM</td>\n", " <td>...</td>\n", " <td>Alarm</td>\n", " <td>1</td>\n", " <td>ENGINE</td>\n", " <td>1</td>\n", " <td>3</td>\n", " <td>10</td>\n", " <td>Potrero Hill</td>\n", " <td>220190150-E29</td>\n", " <td>POINT (-122.39469970274361 37.76460987856451)</td>\n", " <td>26</td>\n", " </tr>\n", " <tr>\n", " <th>2</th>\n", " <td>211233271</td>\n", " <td>T07</td>\n", " <td>21053032</td>\n", " <td>Alarms</td>\n", " 
<td>05/03/2021</td>\n", " <td>05/03/2021</td>\n", " <td>05/03/2021 09:28:12 PM</td>\n", " <td>05/03/2021 09:28:12 PM</td>\n", " <td>05/03/2021 09:28:17 PM</td>\n", " <td>05/03/2021 09:29:10 PM</td>\n", " <td>...</td>\n", " <td>Alarm</td>\n", " <td>1</td>\n", " <td>TRUCK</td>\n", " <td>2</td>\n", " <td>2</td>\n", " <td>9</td>\n", " <td>Mission</td>\n", " <td>211233271-T07</td>\n", " <td>POINT (-122.42057572093252 37.76418194637148)</td>\n", " <td>20</td>\n", " </tr>\n", " <tr>\n", " <th>3</th>\n", " <td>212933533</td>\n", " <td>B02</td>\n", " <td>21127914</td>\n", " <td>Alarms</td>\n", " <td>10/20/2021</td>\n", " <td>10/20/2021</td>\n", " <td>10/20/2021 10:08:47 PM</td>\n", " <td>10/20/2021 10:09:53 PM</td>\n", " <td>10/20/2021 10:10:07 PM</td>\n", " <td>10/20/2021 10:11:55 PM</td>\n", " <td>...</td>\n", " <td>Alarm</td>\n", " <td>1</td>\n", " <td>CHIEF</td>\n", " <td>3</td>\n", " <td>3</td>\n", " <td>6</td>\n", " <td>Tenderloin</td>\n", " <td>212933533-B02</td>\n", " <td>POINT (-122.41243514072728 37.78347684038771)</td>\n", " <td>36</td>\n", " </tr>\n", " <tr>\n", " <th>4</th>\n", " <td>221202543</td>\n", " <td>E41</td>\n", " <td>22054815</td>\n", " <td>Alarms</td>\n", " <td>04/30/2022</td>\n", " <td>04/30/2022</td>\n", " <td>04/30/2022 06:35:58 PM</td>\n", " <td>04/30/2022 06:37:28 PM</td>\n", " <td>04/30/2022 06:37:43 PM</td>\n", " <td>04/30/2022 06:38:17 PM</td>\n", " <td>...</td>\n", " <td>Alarm</td>\n", " <td>1</td>\n", " <td>ENGINE</td>\n", " <td>4</td>\n", " <td>4</td>\n", " <td>2</td>\n", " <td>Russian Hill</td>\n", " <td>221202543-E41</td>\n", " <td>POINT (-122.4233369425531 37.799534868680034)</td>\n", " <td>32</td>\n", " </tr>\n", " </tbody>\n", "</table>\n", "<p>5 rows × 35 columns</p>\n", "</div>" ], "text/plain": [ " Call Number Unit ID Incident Number Call Type Call Date Watch Date \\\n", "0 221210313 E36 22054955 Outside Fire 05/01/2022 04/30/2022 \n", "1 220190150 E29 22008871 Alarms 01/19/2022 01/18/2022 \n", "2 211233271 T07 21053032 Alarms 
05/03/2021 05/03/2021 \n", "3 212933533 B02 21127914 Alarms 10/20/2021 10/20/2021 \n", "4 221202543 E41 22054815 Alarms 04/30/2022 04/30/2022 \n", "\n", " Received DtTm Entry DtTm Dispatch DtTm \\\n", "0 05/01/2022 02:58:25 AM 05/01/2022 02:59:15 AM 05/01/2022 02:59:25 AM \n", "1 01/19/2022 01:42:12 AM 01/19/2022 01:44:13 AM 01/19/2022 01:44:28 AM \n", "2 05/03/2021 09:28:12 PM 05/03/2021 09:28:12 PM 05/03/2021 09:28:17 PM \n", "3 10/20/2021 10:08:47 PM 10/20/2021 10:09:53 PM 10/20/2021 10:10:07 PM \n", "4 04/30/2022 06:35:58 PM 04/30/2022 06:37:28 PM 04/30/2022 06:37:43 PM \n", "\n", " Response DtTm ... Call Type Group Number of Alarms Unit Type \\\n", "0 05/01/2022 03:01:06 AM ... Fire 1 ENGINE \n", "1 01/19/2022 01:46:47 AM ... Alarm 1 ENGINE \n", "2 05/03/2021 09:29:10 PM ... Alarm 1 TRUCK \n", "3 10/20/2021 10:11:55 PM ... Alarm 1 CHIEF \n", "4 04/30/2022 06:38:17 PM ... Alarm 1 ENGINE \n", "\n", " Unit sequence in call dispatch Fire Prevention District Supervisor District \\\n", "0 1 2 5 \n", "1 1 3 10 \n", "2 2 2 9 \n", "3 3 3 6 \n", "4 4 4 2 \n", "\n", " Neighborhooods - Analysis Boundaries RowID \\\n", "0 Hayes Valley 221210313-E36 \n", "1 Potrero Hill 220190150-E29 \n", "2 Mission 211233271-T07 \n", "3 Tenderloin 212933533-B02 \n", "4 Russian Hill 221202543-E41 \n", "\n", " case_location Analysis Neighborhoods \n", "0 POINT (-122.42316555403964 37.77781524520032) 9 \n", "1 POINT (-122.39469970274361 37.76460987856451) 26 \n", "2 POINT (-122.42057572093252 37.76418194637148) 20 \n", "3 POINT (-122.41243514072728 37.78347684038771) 36 \n", "4 POINT (-122.4233369425531 37.799534868680034) 32 \n", "\n", "[5 rows x 35 columns]" ] }, "execution_count": 91, "metadata": {}, "output_type": "execute_result" } ], "source": [ "df.limit(5).toPandas()" ] }, { "cell_type": "code", "execution_count": 92, "id": "9685c540-f710-4c91-ae79-d3ea75f72201", "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[('Call Number', 'string'),\n", " ('Unit ID', 'string'),\n", " 
('Incident Number', 'string'),\n", " ('Call Type', 'string'),\n", " ('Call Date', 'string'),\n", " ('Watch Date', 'string'),\n", " ('Received DtTm', 'string'),\n", " ('Entry DtTm', 'string'),\n", " ('Dispatch DtTm', 'string'),\n", " ('Response DtTm', 'string'),\n", " ('On Scene DtTm', 'string'),\n", " ('Transport DtTm', 'string'),\n", " ('Hospital DtTm', 'string'),\n", " ('Call Final Disposition', 'string'),\n", " ('Available DtTm', 'string'),\n", " ('Address', 'string'),\n", " ('City', 'string'),\n", " ('Zipcode of Incident', 'string'),\n", " ('Battalion', 'string'),\n", " ('Station Area', 'string'),\n", " ('Box', 'string'),\n", " ('Original Priority', 'string'),\n", " ('Priority', 'string'),\n", " ('Final Priority', 'string'),\n", " ('ALS Unit', 'string'),\n", " ('Call Type Group', 'string'),\n", " ('Number of Alarms', 'string'),\n", " ('Unit Type', 'string'),\n", " ('Unit sequence in call dispatch', 'string'),\n", " ('Fire Prevention District', 'string'),\n", " ('Supervisor District', 'string'),\n", " ('Neighborhooods - Analysis Boundaries', 'string'),\n", " ('RowID', 'string'),\n", " ('case_location', 'string'),\n", " ('Analysis Neighborhoods', 'string')]" ] }, "execution_count": 92, "metadata": {}, "output_type": "execute_result" } ], "source": [ "df.dtypes" ] }, { "cell_type": "code", "execution_count": 93, "id": "50c40bca-2cde-4d40-abdb-398e4e1b50e3", "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "[Stage 35:====================================================> (16 + 1) / 17]" ] }, { "name": "stdout", "output_type": "stream", "text": [ "CPU times: user 7.83 ms, sys: 3.52 ms, total: 11.4 ms\n", "Wall time: 10.9 s\n" ] }, { "name": "stderr", "output_type": "stream", "text": [ " " ] } ], "source": [ "%%time\n", "df = spark.read.format(\"csv\").option(\"header\", True).option(\"inferSchema\", True).load(\"hdfs://nn:9000/sf.csv\")" ] }, { "cell_type": "code", "execution_count": 94, "id": "577e1773-4308-45b8-9ddc-c74f43f4c3f8", 
"metadata": {}, "outputs": [ { "data": { "text/plain": [ "[('Call Number', 'int'),\n", " ('Unit ID', 'string'),\n", " ('Incident Number', 'int'),\n", " ('Call Type', 'string'),\n", " ('Call Date', 'string'),\n", " ('Watch Date', 'string'),\n", " ('Received DtTm', 'string'),\n", " ('Entry DtTm', 'string'),\n", " ('Dispatch DtTm', 'string'),\n", " ('Response DtTm', 'string'),\n", " ('On Scene DtTm', 'string'),\n", " ('Transport DtTm', 'string'),\n", " ('Hospital DtTm', 'string'),\n", " ('Call Final Disposition', 'string'),\n", " ('Available DtTm', 'string'),\n", " ('Address', 'string'),\n", " ('City', 'string'),\n", " ('Zipcode of Incident', 'int'),\n", " ('Battalion', 'string'),\n", " ('Station Area', 'string'),\n", " ('Box', 'string'),\n", " ('Original Priority', 'string'),\n", " ('Priority', 'string'),\n", " ('Final Priority', 'int'),\n", " ('ALS Unit', 'boolean'),\n", " ('Call Type Group', 'string'),\n", " ('Number of Alarms', 'int'),\n", " ('Unit Type', 'string'),\n", " ('Unit sequence in call dispatch', 'int'),\n", " ('Fire Prevention District', 'string'),\n", " ('Supervisor District', 'string'),\n", " ('Neighborhooods - Analysis Boundaries', 'string'),\n", " ('RowID', 'string'),\n", " ('case_location', 'string'),\n", " ('Analysis Neighborhoods', 'int')]" ] }, "execution_count": 94, "metadata": {}, "output_type": "execute_result" } ], "source": [ "df.dtypes" ] }, { "cell_type": "code", "execution_count": 96, "id": "3f5725e7-2e67-4685-8780-b999c0f7aa17", "metadata": {}, "outputs": [ { "data": { "text/plain": [ "17" ] }, "execution_count": 96, "metadata": {}, "output_type": "execute_result" } ], "source": [ "df.rdd.getNumPartitions()" ] }, { "cell_type": "code", "execution_count": 97, "id": "5a599aa5-48f4-46ce-8ee2-84d57d4453e1", "metadata": {}, "outputs": [], "source": [ "# example 1: how can we cleanup the strings (upper case), and get date types" ] }, { "cell_type": "code", "execution_count": 100, "id": "699bb766-6ba5-40a6-9a18-c292f37f8066", "metadata": {}, 
"outputs": [ { "data": { "text/html": [ "<div>\n", "<style scoped>\n", " .dataframe tbody tr th:only-of-type {\n", " vertical-align: middle;\n", " }\n", "\n", " .dataframe tbody tr th {\n", " vertical-align: top;\n", " }\n", "\n", " .dataframe thead th {\n", " text-align: right;\n", " }\n", "</style>\n", "<table border=\"1\" class=\"dataframe\">\n", " <thead>\n", " <tr style=\"text-align: right;\">\n", " <th></th>\n", " <th>Call Type</th>\n", " <th>Call Date</th>\n", " </tr>\n", " </thead>\n", " <tbody>\n", " <tr>\n", " <th>0</th>\n", " <td>Outside Fire</td>\n", " <td>05/01/2022</td>\n", " </tr>\n", " <tr>\n", " <th>1</th>\n", " <td>Alarms</td>\n", " <td>01/19/2022</td>\n", " </tr>\n", " <tr>\n", " <th>2</th>\n", " <td>Alarms</td>\n", " <td>05/03/2021</td>\n", " </tr>\n", " </tbody>\n", "</table>\n", "</div>" ], "text/plain": [ " Call Type Call Date\n", "0 Outside Fire 05/01/2022\n", "1 Alarms 01/19/2022\n", "2 Alarms 05/03/2021" ] }, "execution_count": 100, "metadata": {}, "output_type": "execute_result" } ], "source": [ "df.select(\"Call Type\", \"Call Date\").limit(3).toPandas()" ] }, { "cell_type": "code", "execution_count": 107, "id": "b306d906-0bd7-4cbf-b0fa-856ded1f0725", "metadata": {}, "outputs": [ { "data": { "text/html": [ "<div>\n", "<style scoped>\n", " .dataframe tbody tr th:only-of-type {\n", " vertical-align: middle;\n", " }\n", "\n", " .dataframe tbody tr th {\n", " vertical-align: top;\n", " }\n", "\n", " .dataframe thead th {\n", " text-align: right;\n", " }\n", "</style>\n", "<table border=\"1\" class=\"dataframe\">\n", " <thead>\n", " <tr style=\"text-align: right;\">\n", " <th></th>\n", " <th>Call_Type</th>\n", " <th>Call_Date</th>\n", " </tr>\n", " </thead>\n", " <tbody>\n", " <tr>\n", " <th>0</th>\n", " <td>OUTSIDE FIRE</td>\n", " <td>2022-05-01</td>\n", " </tr>\n", " <tr>\n", " <th>1</th>\n", " <td>ALARMS</td>\n", " <td>2022-01-19</td>\n", " </tr>\n", " <tr>\n", " <th>2</th>\n", " <td>ALARMS</td>\n", " <td>2021-05-03</td>\n", " </tr>\n", " 
</tbody>\n", "</table>\n", "</div>" ], "text/plain": [ " Call_Type Call_Date\n", "0 OUTSIDE FIRE 2022-05-01\n", "1 ALARMS 2022-01-19\n", "2 ALARMS 2021-05-03" ] }, "execution_count": 107, "metadata": {}, "output_type": "execute_result" } ], "source": [ "from pyspark.sql.functions import col, expr\n", "df.select(\n", " expr(\"upper(`Call Type`)\").alias(\"Call_Type\"),\n", " expr(\"to_date(`Call Date`, 'MM/dd/yyyy')\").alias(\"Call_Date\")\n", ").limit(3).toPandas()" ] }, { "cell_type": "code", "execution_count": 108, "id": "f6b0d597-ffcf-44e7-8817-81e850435104", "metadata": {}, "outputs": [], "source": [ "# example 2: convert the CSV to Parquet, with no spaces in the column names" ] }, { "cell_type": "code", "execution_count": 111, "id": "ba66cbd3-8692-4dfd-b95c-c934f689660d", "metadata": {}, "outputs": [ { "data": { "text/plain": [ "Column<'Call Number AS Call_Number'>" ] }, "execution_count": 111, "metadata": {}, "output_type": "execute_result" } ], "source": [ "col(\"Call Number\").alias(\"Call_Number\")" ] }, { "cell_type": "code", "execution_count": 122, "id": "c1503637-6e74-4211-91d0-24881b666ccb", "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ " " ] } ], "source": [ "(\n", " df\n", " .select([col(c).alias(c.replace(\" \", \"_\")) for c in df.columns])\n", " .write\n", " .mode(\"overwrite\")\n", " .format(\"parquet\")\n", " .save(\"hdfs://nn:9000/sf.parquet\")\n", ")" ] }, { "cell_type": "code", "execution_count": 123, "id": "9842dfa1-b43e-4890-9873-fff11ce76b8a", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Found 18 items\n", "-rw-r--r-- 3 root supergroup 0 2025-03-10 01:54 hdfs://nn:9000/sf.parquet/_SUCCESS\n", "-rw-r--r-- 3 root supergroup 27806510 2025-03-10 01:54 hdfs://nn:9000/sf.parquet/part-00000-cade4df6-026c-40f3-9f8d-8394a0e146ef-c000.snappy.parquet\n", "-rw-r--r-- 3 root supergroup 27789781 2025-03-10 01:54 
hdfs://nn:9000/sf.parquet/part-00001-cade4df6-026c-40f3-9f8d-8394a0e146ef-c000.snappy.parquet\n", "-rw-r--r-- 3 root supergroup 40478442 2025-03-10 01:54 hdfs://nn:9000/sf.parquet/part-00002-cade4df6-026c-40f3-9f8d-8394a0e146ef-c000.snappy.parquet\n", "-rw-r--r-- 3 root supergroup 36017328 2025-03-10 01:54 hdfs://nn:9000/sf.parquet/part-00003-cade4df6-026c-40f3-9f8d-8394a0e146ef-c000.snappy.parquet\n", "-rw-r--r-- 3 root supergroup 36033379 2025-03-10 01:54 hdfs://nn:9000/sf.parquet/part-00004-cade4df6-026c-40f3-9f8d-8394a0e146ef-c000.snappy.parquet\n", "-rw-r--r-- 3 root supergroup 36082202 2025-03-10 01:54 hdfs://nn:9000/sf.parquet/part-00005-cade4df6-026c-40f3-9f8d-8394a0e146ef-c000.snappy.parquet\n", "-rw-r--r-- 3 root supergroup 35944952 2025-03-10 01:54 hdfs://nn:9000/sf.parquet/part-00006-cade4df6-026c-40f3-9f8d-8394a0e146ef-c000.snappy.parquet\n", "-rw-r--r-- 3 root supergroup 35912043 2025-03-10 01:54 hdfs://nn:9000/sf.parquet/part-00007-cade4df6-026c-40f3-9f8d-8394a0e146ef-c000.snappy.parquet\n", "-rw-r--r-- 3 root supergroup 36436328 2025-03-10 01:54 hdfs://nn:9000/sf.parquet/part-00008-cade4df6-026c-40f3-9f8d-8394a0e146ef-c000.snappy.parquet\n", "-rw-r--r-- 3 root supergroup 35368134 2025-03-10 01:54 hdfs://nn:9000/sf.parquet/part-00009-cade4df6-026c-40f3-9f8d-8394a0e146ef-c000.snappy.parquet\n", "-rw-r--r-- 3 root supergroup 34238988 2025-03-10 01:54 hdfs://nn:9000/sf.parquet/part-00010-cade4df6-026c-40f3-9f8d-8394a0e146ef-c000.snappy.parquet\n", "-rw-r--r-- 3 root supergroup 33948649 2025-03-10 01:54 hdfs://nn:9000/sf.parquet/part-00011-cade4df6-026c-40f3-9f8d-8394a0e146ef-c000.snappy.parquet\n", "-rw-r--r-- 3 root supergroup 33488640 2025-03-10 01:54 hdfs://nn:9000/sf.parquet/part-00012-cade4df6-026c-40f3-9f8d-8394a0e146ef-c000.snappy.parquet\n", "-rw-r--r-- 3 root supergroup 34900131 2025-03-10 01:54 hdfs://nn:9000/sf.parquet/part-00013-cade4df6-026c-40f3-9f8d-8394a0e146ef-c000.snappy.parquet\n", "-rw-r--r-- 3 root supergroup 35715813 2025-03-10 
01:54 hdfs://nn:9000/sf.parquet/part-00014-cade4df6-026c-40f3-9f8d-8394a0e146ef-c000.snappy.parquet\n", "-rw-r--r-- 3 root supergroup 35769206 2025-03-10 01:54 hdfs://nn:9000/sf.parquet/part-00015-cade4df6-026c-40f3-9f8d-8394a0e146ef-c000.snappy.parquet\n", "-rw-r--r-- 3 root supergroup 29363174 2025-03-10 01:54 hdfs://nn:9000/sf.parquet/part-00016-cade4df6-026c-40f3-9f8d-8394a0e146ef-c000.snappy.parquet\n" ] } ], "source": [ "! hdfs dfs -ls hdfs://nn:9000/sf.parquet/" ] }, { "cell_type": "code", "execution_count": 124, "id": "cef40087-bfd5-43c2-9c6d-9e6644a7079d", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "CPU times: user 2.69 ms, sys: 960 μs, total: 3.65 ms\n", "Wall time: 231 ms\n" ] } ], "source": [ "%%time\n", "df = spark.read.format(\"parquet\").load(\"hdfs://nn:9000/sf.parquet\")" ] }, { "cell_type": "code", "execution_count": 125, "id": "ccc39826-d6f1-433a-a005-269d8fa88e9f", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "CPU times: user 719 μs, sys: 993 μs, total: 1.71 ms\n", "Wall time: 443 ms\n" ] }, { "data": { "text/plain": [ "6016056" ] }, "execution_count": 125, "metadata": {}, "output_type": "execute_result" } ], "source": [ "%%time\n", "df.count()" ] }, { "cell_type": "code", "execution_count": 126, "id": "574746ea-5e08-4585-be3c-e6d151262e20", "metadata": {}, "outputs": [ { "data": { "text/plain": [ "DataFrame[Call_Number: int, Unit_ID: string, Incident_Number: int, Call_Type: string, Call_Date: string, Watch_Date: string, Received_DtTm: string, Entry_DtTm: string, Dispatch_DtTm: string, Response_DtTm: string, On_Scene_DtTm: string, Transport_DtTm: string, Hospital_DtTm: string, Call_Final_Disposition: string, Available_DtTm: string, Address: string, City: string, Zipcode_of_Incident: int, Battalion: string, Station_Area: string, Box: string, Original_Priority: string, Priority: string, Final_Priority: int, ALS_Unit: boolean, Call_Type_Group: string, Number_of_Alarms: 
int, Unit_Type: string, Unit_sequence_in_call_dispatch: int, Fire_Prevention_District: string, Supervisor_District: string, Neighborhooods_-_Analysis_Boundaries: string, RowID: string, case_location: string, Analysis_Neighborhoods: int]" ] }, "execution_count": 126, "metadata": {}, "output_type": "execute_result" } ], "source": [ "df" ] }, { "cell_type": "code", "execution_count": 127, "id": "67086ea3-b1e9-4e17-8cbe-acd155371d80", "metadata": {}, "outputs": [ { "data": { "text/plain": [ "6" ] }, "execution_count": 127, "metadata": {}, "output_type": "execute_result" } ], "source": [ "df.rdd.getNumPartitions()" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3 (ipykernel)", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.12.3" } }, "nbformat": 4, "nbformat_minor": 5 }