In [1]:
from pyspark.sql import SparkSession
spark = (SparkSession.builder.appName("cs544")
 .master("spark://boss:7077")
 .config("spark.executor.memory", "512M")
 .getOrCreate())

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/03/09 17:52:46 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [6]:
sc = spark.sparkContext # provides direct RDD access

In [8]:
nums = list(range(0, 10_000_000))
nums[:5]

[0, 1, 2, 3, 4]

In [9]:
rdd = sc.parallelize(nums)

In [10]:
inverses = rdd.map(lambda x: 1/x) # TRANSFORMATION (lazy)

In [12]:
# head = inverses.take(10) # ACTION (actually does the work)

In [14]:
# inverses.mean() # ACTION

In [15]:
inverses = rdd.filter(lambda x: x > 0).map(lambda x: 1/x) 

In [17]:
# 4 tasks, 0 are done, and 4 are in progress
# [Stage 3:> (0 + 4) / 4]
inverses.mean()

25/03/09 17:59:52 WARN TaskSetManager: Stage 3 contains a task of very large size (12152 KiB). The maximum recommended task size is 1000 KiB.
 

1.669531293539298e-06

In [None]:
# inverses.collect() # ACTION: be careful, if it's too big, we could run out of memory!

In [18]:
rdd.getNumPartitions()

4

In [20]:
# [Stage 5:======================> (20 + 4) / 50]
rdd = sc.parallelize(nums, 50)
inverses = rdd.filter(lambda x: x > 0).map(lambda x: 1/x)
inverses.mean()

 

1.6695312935391358e-06

In [22]:
import time

In [21]:
sample = rdd.sample(True, 0.01) # TRANSFORMATION

In [23]:
# 1st without cache
t0 = time.time()
print(sample.mean())
t1 = time.time()
print(t1-t0)



4995621.997385424
1.9789588451385498


 

In [24]:
# 2nd without cache
t0 = time.time()
print(sample.mean())
t1 = time.time()
print(t1-t0)



4995621.997385424
1.9979238510131836


 

In [25]:
sample.cache()

PythonRDD[11] at RDD at PythonRDD.scala:53

In [26]:
# 1st with cache
t0 = time.time()
print(sample.mean())
t1 = time.time()
print(t1-t0)



4995621.997385424
2.9729838371276855


 

In [27]:
# 2st with cache
t0 = time.time()
print(sample.mean())
t1 = time.time()
print(t1-t0)



4995621.997385424
1.2610077857971191


 

In [29]:
sample = rdd.sample(True, 0.01).repartition(4)

In [30]:
# 1st with cache, and fewer partitions
t0 = time.time()
print(sample.mean())
t1 = time.time()
print(t1-t0)



5005973.481531257
3.3820056915283203


 

In [31]:
# 2nd with cache, and fewer partitions
t0 = time.time()
print(sample.mean())
t1 = time.time()
print(t1-t0)

5005973.481531262
0.2796463966369629


In [None]:
# note about determinism

# ideally: given the same seed, sample the same way
# in reality: even with seed, you can get different results if the number of input partitions changes
# suggestion: be careful, and save your sampled results in a diff file to ensure determinism!
sample = rdd.sample(True, 0.01, 544)

# Spark DataFrames: reading a text file

Spark DataFrames build on Spark SQL, which builds on Spark RDDs.

In [37]:
# !wget https://pages.cs.wisc.edu/~harter/cs544/data/ghcnd-stations.txt

In [None]:
# read/write:
# spark.read.????.????.load() or .text()
# df.write.????.????.saveTable()

In [41]:
! ls /nb/ghcnd-stations.txt

/nb/ghcnd-stations.txt


In [42]:
# df = spark.read.text("ghcnd-stations.txt")

In [43]:
# df.take(3)

In [45]:
!hdfs dfs -cp ghcnd-stations.txt hdfs://nn:9000/ghcnd-stations.txt

In [46]:
!hdfs dfs -ls hdfs://nn:9000/

Found 1 items
-rw-r--r-- 3 root supergroup 10607756 2025-03-09 18:51 hdfs://nn:9000/ghcnd-stations.txt


In [47]:
df = spark.read.text("hdfs://nn:9000/ghcnd-stations.txt")

In [48]:
df.take(3)

 

[Row(value='ACW00011604 17.1167 -61.7833 10.1 ST JOHNS COOLIDGE FLD '),
 Row(value='ACW00011647 17.1333 -61.7833 19.2 ST JOHNS '),
 Row(value='AE000041196 25.3330 55.5170 34.0 SHARJAH INTER. AIRP GSN 41196')]

In [49]:
type(df)

pyspark.sql.dataframe.DataFrame

In [51]:
type(df.rdd)

pyspark.rdd.RDD

In [54]:
df.take(1)[0].value

'ACW00011604 17.1167 -61.7833 10.1 ST JOHNS COOLIDGE FLD '

In [55]:
# first 11 characters is the station name
df.rdd.map(lambda row: row.value[:11]).take(10)

['ACW00011604',
 'ACW00011647',
 'AE000041196',
 'AEM00041194',
 'AEM00041217',
 'AEM00041218',
 'AF000040930',
 'AFM00040938',
 'AFM00040948',
 'AFM00040990']

In [56]:
# how would we do this in Pandas? Extract the station names, add that as a column

In [58]:
pandas_df = df.limit(6).toPandas()
pandas_df

Unnamed: 0,value
0,ACW00011604 17.1167 -61.7833 10.1 ST JO...
1,ACW00011647 17.1333 -61.7833 19.2 ST JO...
2,AE000041196 25.3330 55.5170 34.0 SHARJ...
3,AEM00041194 25.2550 55.3640 10.4 DUBAI...
4,AEM00041217 24.4330 54.6510 26.8 ABU D...
5,AEM00041218 24.2620 55.6090 264.9 AL AI...


In [63]:
pandas_df["station"] = pandas_df["value"].str[:11]
pandas_df

Unnamed: 0,value,station
0,ACW00011604 17.1167 -61.7833 10.1 ST JO...,ACW00011604
1,ACW00011647 17.1333 -61.7833 19.2 ST JO...,ACW00011647
2,AE000041196 25.3330 55.5170 34.0 SHARJ...,AE000041196
3,AEM00041194 25.2550 55.3640 10.4 DUBAI...,AEM00041194
4,AEM00041217 24.4330 54.6510 26.8 ABU D...,AEM00041217
5,AEM00041218 24.2620 55.6090 264.9 AL AI...,AEM00041218


In [64]:
# not allowed, because df wraps rdd, which is immutable!
# df["station"] = ????

In [65]:
from pyspark.sql.functions import col, expr

In [66]:
col("x")

Column<'x'>

In [67]:
col("x") + 1

Column<'(x + 1)'>

In [68]:
expr("x + 1")

Column<'(x + 1)'>

In [69]:
expr("x + 1").alias("plusone") # similar to SQL "AS"

Column<'(x + 1) AS plusone'>

In [70]:
df2 = df.withColumn("station", expr("substring(value, 0, 11)")) # transformation!

In [71]:
df

DataFrame[value: string]

In [72]:
df2

DataFrame[value: string, station: string]

In [73]:
df2.show() # action

+--------------------+-----------+
| value| station|
+--------------------+-----------+
|ACW00011604 17.1...|ACW00011604|
|ACW00011647 17.1...|ACW00011647|
|AE000041196 25.3...|AE000041196|
|AEM00041194 25.2...|AEM00041194|
|AEM00041217 24.4...|AEM00041217|
|AEM00041218 24.2...|AEM00041218|
|AF000040930 35.3...|AF000040930|
|AFM00040938 34.2...|AFM00040938|
|AFM00040948 34.5...|AFM00040948|
|AFM00040990 31.5...|AFM00040990|
|AG000060390 36.7...|AG000060390|
|AG000060590 30.5...|AG000060590|
|AG000060611 28.0...|AG000060611|
|AG000060680 22.8...|AG000060680|
|AGE00135039 35.7...|AGE00135039|
|AGE00147704 36.9...|AGE00147704|
|AGE00147705 36.7...|AGE00147705|
|AGE00147706 36.8...|AGE00147706|
|AGE00147707 36.8...|AGE00147707|
|AGE00147708 36.7...|AGE00147708|
+--------------------+-----------+
only showing top 20 rows



In [76]:
df2.limit(10).toPandas() # limit is a transformation, toPandas is the action!

Unnamed: 0,value,station
0,ACW00011604 17.1167 -61.7833 10.1 ST JO...,ACW00011604
1,ACW00011647 17.1333 -61.7833 19.2 ST JO...,ACW00011647
2,AE000041196 25.3330 55.5170 34.0 SHARJ...,AE000041196
3,AEM00041194 25.2550 55.3640 10.4 DUBAI...,AEM00041194
4,AEM00041217 24.4330 54.6510 26.8 ABU D...,AEM00041217
5,AEM00041218 24.2620 55.6090 264.9 AL AI...,AEM00041218
6,AF000040930 35.3170 69.0170 3366.0 NORTH...,AF000040930
7,AFM00040938 34.2100 62.2280 977.2 HERAT...,AFM00040938
8,AFM00040948 34.5660 69.2120 1791.3 KABUL...,AFM00040948
9,AFM00040990 31.5000 65.8500 1010.0 KANDA...,AFM00040990


# Spark: CSVs and Parquet

In [78]:
! wget https://pages.cs.wisc.edu/~harter/cs544/data/sf.zip

--2025-03-10 01:40:01-- https://pages.cs.wisc.edu/~harter/cs544/data/sf.zip
Resolving pages.cs.wisc.edu (pages.cs.wisc.edu)... 128.105.7.9
Connecting to pages.cs.wisc.edu (pages.cs.wisc.edu)|128.105.7.9|:443... connected.
200 OKequest sent, awaiting response... 
Length: 534803160 (510M) [application/zip]
Saving to: ‘sf.zip’


2025-03-10 01:40:27 (19.8 MB/s) - ‘sf.zip’ saved [534803160/534803160]



In [82]:
! unzip sf.zip

Archive: sf.zip
 inflating: sf.csv 


In [83]:
! hdfs dfs -cp sf.csv hdfs://nn:9000/sf.csv

In [84]:
# read/write:
# spark.read.????.????.load() or .text()
# df.write.????.????.saveTable()

In [90]:
%%time
df = spark.read.format("csv").option("header", True).load("hdfs://nn:9000/sf.csv")

[Stage 32:> (0 + 1) / 1]

CPU times: user 2.54 ms, sys: 2.23 ms, total: 4.76 ms
Wall time: 3.37 s


 

In [87]:
%%time
df.count()



CPU times: user 8.32 ms, sys: 1.02 ms, total: 9.34 ms
Wall time: 4.31 s


 

6016057

In [91]:
df.limit(5).toPandas()

Unnamed: 0,Call Number,Unit ID,Incident Number,Call Type,Call Date,Watch Date,Received DtTm,Entry DtTm,Dispatch DtTm,Response DtTm,...,Call Type Group,Number of Alarms,Unit Type,Unit sequence in call dispatch,Fire Prevention District,Supervisor District,Neighborhooods - Analysis Boundaries,RowID,case_location,Analysis Neighborhoods
0,221210313,E36,22054955,Outside Fire,05/01/2022,04/30/2022,05/01/2022 02:58:25 AM,05/01/2022 02:59:15 AM,05/01/2022 02:59:25 AM,05/01/2022 03:01:06 AM,...,Fire,1,ENGINE,1,2,5,Hayes Valley,221210313-E36,POINT (-122.42316555403964 37.77781524520032),9
1,220190150,E29,22008871,Alarms,01/19/2022,01/18/2022,01/19/2022 01:42:12 AM,01/19/2022 01:44:13 AM,01/19/2022 01:44:28 AM,01/19/2022 01:46:47 AM,...,Alarm,1,ENGINE,1,3,10,Potrero Hill,220190150-E29,POINT (-122.39469970274361 37.76460987856451),26
2,211233271,T07,21053032,Alarms,05/03/2021,05/03/2021,05/03/2021 09:28:12 PM,05/03/2021 09:28:12 PM,05/03/2021 09:28:17 PM,05/03/2021 09:29:10 PM,...,Alarm,1,TRUCK,2,2,9,Mission,211233271-T07,POINT (-122.42057572093252 37.76418194637148),20
3,212933533,B02,21127914,Alarms,10/20/2021,10/20/2021,10/20/2021 10:08:47 PM,10/20/2021 10:09:53 PM,10/20/2021 10:10:07 PM,10/20/2021 10:11:55 PM,...,Alarm,1,CHIEF,3,3,6,Tenderloin,212933533-B02,POINT (-122.41243514072728 37.78347684038771),36
4,221202543,E41,22054815,Alarms,04/30/2022,04/30/2022,04/30/2022 06:35:58 PM,04/30/2022 06:37:28 PM,04/30/2022 06:37:43 PM,04/30/2022 06:38:17 PM,...,Alarm,1,ENGINE,4,4,2,Russian Hill,221202543-E41,POINT (-122.4233369425531 37.799534868680034),32


In [92]:
df.dtypes

[('Call Number', 'string'),
 ('Unit ID', 'string'),
 ('Incident Number', 'string'),
 ('Call Type', 'string'),
 ('Call Date', 'string'),
 ('Watch Date', 'string'),
 ('Received DtTm', 'string'),
 ('Entry DtTm', 'string'),
 ('Dispatch DtTm', 'string'),
 ('Response DtTm', 'string'),
 ('On Scene DtTm', 'string'),
 ('Transport DtTm', 'string'),
 ('Hospital DtTm', 'string'),
 ('Call Final Disposition', 'string'),
 ('Available DtTm', 'string'),
 ('Address', 'string'),
 ('City', 'string'),
 ('Zipcode of Incident', 'string'),
 ('Battalion', 'string'),
 ('Station Area', 'string'),
 ('Box', 'string'),
 ('Original Priority', 'string'),
 ('Priority', 'string'),
 ('Final Priority', 'string'),
 ('ALS Unit', 'string'),
 ('Call Type Group', 'string'),
 ('Number of Alarms', 'string'),
 ('Unit Type', 'string'),
 ('Unit sequence in call dispatch', 'string'),
 ('Fire Prevention District', 'string'),
 ('Supervisor District', 'string'),
 ('Neighborhooods - Analysis Boundaries', 'string'),
 ('RowID', 'string')

In [93]:
%%time
df = spark.read.format("csv").option("header", True).option("inferSchema", True).load("hdfs://nn:9000/sf.csv")



CPU times: user 7.83 ms, sys: 3.52 ms, total: 11.4 ms
Wall time: 10.9 s


 

In [94]:
df.dtypes

[('Call Number', 'int'),
 ('Unit ID', 'string'),
 ('Incident Number', 'int'),
 ('Call Type', 'string'),
 ('Call Date', 'string'),
 ('Watch Date', 'string'),
 ('Received DtTm', 'string'),
 ('Entry DtTm', 'string'),
 ('Dispatch DtTm', 'string'),
 ('Response DtTm', 'string'),
 ('On Scene DtTm', 'string'),
 ('Transport DtTm', 'string'),
 ('Hospital DtTm', 'string'),
 ('Call Final Disposition', 'string'),
 ('Available DtTm', 'string'),
 ('Address', 'string'),
 ('City', 'string'),
 ('Zipcode of Incident', 'int'),
 ('Battalion', 'string'),
 ('Station Area', 'string'),
 ('Box', 'string'),
 ('Original Priority', 'string'),
 ('Priority', 'string'),
 ('Final Priority', 'int'),
 ('ALS Unit', 'boolean'),
 ('Call Type Group', 'string'),
 ('Number of Alarms', 'int'),
 ('Unit Type', 'string'),
 ('Unit sequence in call dispatch', 'int'),
 ('Fire Prevention District', 'string'),
 ('Supervisor District', 'string'),
 ('Neighborhooods - Analysis Boundaries', 'string'),
 ('RowID', 'string'),
 ('case_locatio

In [96]:
df.rdd.getNumPartitions()

17

In [97]:
# example 1: how can we cleanup the strings (upper case), and get date types

In [100]:
df.select("Call Type", "Call Date").limit(3).toPandas()

Unnamed: 0,Call Type,Call Date
0,Outside Fire,05/01/2022
1,Alarms,01/19/2022
2,Alarms,05/03/2021


In [107]:
from pyspark.sql.functions import col, expr
df.select(
 expr("upper(`Call Type`)").alias("Call_Type"),
 expr("to_date(`Call Date`, 'MM/dd/yyyy')").alias("Call_Date")
).limit(3).toPandas()

Unnamed: 0,Call_Type,Call_Date
0,OUTSIDE FIRE,2022-05-01
1,ALARMS,2022-01-19
2,ALARMS,2021-05-03


In [108]:
# example 2: convert the CSV to Parquet, with no spaces in the column names

In [111]:
col("Call Number").alias("Call_Number")

Column<'Call Number AS Call_Number'>

In [122]:
(
 df
 .select([col(c).alias(c.replace(" ", "_")) for c in df.columns])
 .write
 .mode("overwrite")
 .format("parquet")
 .save("hdfs://nn:9000/sf.parquet")
)

 

In [123]:
! hdfs dfs -ls hdfs://nn:9000/sf.parquet/

Found 18 items
-rw-r--r-- 3 root supergroup 0 2025-03-10 01:54 hdfs://nn:9000/sf.parquet/_SUCCESS
-rw-r--r-- 3 root supergroup 27806510 2025-03-10 01:54 hdfs://nn:9000/sf.parquet/part-00000-cade4df6-026c-40f3-9f8d-8394a0e146ef-c000.snappy.parquet
-rw-r--r-- 3 root supergroup 27789781 2025-03-10 01:54 hdfs://nn:9000/sf.parquet/part-00001-cade4df6-026c-40f3-9f8d-8394a0e146ef-c000.snappy.parquet
-rw-r--r-- 3 root supergroup 40478442 2025-03-10 01:54 hdfs://nn:9000/sf.parquet/part-00002-cade4df6-026c-40f3-9f8d-8394a0e146ef-c000.snappy.parquet
-rw-r--r-- 3 root supergroup 36017328 2025-03-10 01:54 hdfs://nn:9000/sf.parquet/part-00003-cade4df6-026c-40f3-9f8d-8394a0e146ef-c000.snappy.parquet
-rw-r--r-- 3 root supergroup 36033379 2025-03-10 01:54 hdfs://nn:9000/sf.parquet/part-00004-cade4df6-026c-40f3-9f8d-8394a0e146ef-c000.snappy.parquet
-rw-r--r-- 3 root supergroup 36082202 2025-03-10 01:54 hdfs://nn:9000/sf.parquet/part-00005-cade4df6-026c-40f3-9f8d-8394a0e146ef-c000.snappy.parquet
-rw-r--r

In [124]:
%%time
df = spark.read.format("parquet").load("hdfs://nn:9000/sf.parquet")

CPU times: user 2.69 ms, sys: 960 μs, total: 3.65 ms
Wall time: 231 ms


In [125]:
%%time
df.count()

CPU times: user 719 μs, sys: 993 μs, total: 1.71 ms
Wall time: 443 ms


6016056

In [126]:
df

DataFrame[Call_Number: int, Unit_ID: string, Incident_Number: int, Call_Type: string, Call_Date: string, Watch_Date: string, Received_DtTm: string, Entry_DtTm: string, Dispatch_DtTm: string, Response_DtTm: string, On_Scene_DtTm: string, Transport_DtTm: string, Hospital_DtTm: string, Call_Final_Disposition: string, Available_DtTm: string, Address: string, City: string, Zipcode_of_Incident: int, Battalion: string, Station_Area: string, Box: string, Original_Priority: string, Priority: string, Final_Priority: int, ALS_Unit: boolean, Call_Type_Group: string, Number_of_Alarms: int, Unit_Type: string, Unit_sequence_in_call_dispatch: int, Fire_Prevention_District: string, Supervisor_District: string, Neighborhooods_-_Analysis_Boundaries: string, RowID: string, case_location: string, Analysis_Neighborhoods: int]

In [127]:
df.rdd.getNumPartitions()

6