# DRAFT! Don't start yet.
# P5 (4% of grade): Spark And Hive
## Overview
In P5, you'll use Spark to analyze competitive programming problems and their solutions. You'll load your data into Hive tables and views for easy querying. The main table contains numerous IDs in columns; you'll need to join these with other tables or views to determine what these IDs mean.
**Important:** You'll answer 10 questions in P5. Write each question number and text (e.g., "#q1: ...") as a comment in your notebook before each answer so we can easily find and grade your work.
Learning objectives:
- Use Spark's RDD, DataFrame, and SQL interfaces to answer questions about data
- Load data into Hive for querying with Spark
- Group and optimize queries
- Use PySpark's machine learning API to train a `DecisionTreeClassifier`
Before starting, please review the [general project directions](../projects.md).
## Corrections/Clarifications
- None yet.
## Setup
Copy these files from the project into your repository:
- `p5-base.Dockerfile`
- `namenode.Dockerfile`
- `notebook.Dockerfile`
- `datanode.Dockerfile`
- `boss.Dockerfile`
- `worker.Dockerfile`
- `docker-compose.yml`
- `build.sh`
- `requirements.txt`
- `get_data.py`
- `.gitignore`
Create a Python virtual environment and install the [datasets library](https://huggingface.co/docs/datasets/en/index):
```sh
pip3 install datasets==3.3.2
```
Create the directory structure with:
```sh
mkdir -p nb/data
```
Run the provided `get_data.py` script to download the [DeepMind CodeContests dataset](https://huggingface.co/datasets/deepmind/code_contests) and split it into `problems.jsonl` and `solutions.jsonl`.
### Docker Containers
```sh
docker build . -f p5-base.Dockerfile -t p5-base
docker build . -f notebook.Dockerfile -t p5-nb
docker build . -f namenode.Dockerfile -t p5-nn
docker build . -f datanode.Dockerfile -t p5-dn
docker build . -f boss.Dockerfile -t p5-boss
docker build . -f worker.Dockerfile -t p5-worker
```
A `build.sh` script is included for your convenience.
You can bring up your cluster like this:
```
export PROJECT=p5
docker compose up -d
```
### Jupyter Container
Connect to JupyterLab inside your container. Within the `nb` directory, create a notebook called `p5.ipynb`.
Run the following shell commands in a cell to upload the data:
```sh
hdfs dfs -D dfs.replication=1 -put -f data/*.jsonl hdfs://nn:9000/
hdfs dfs -D dfs.replication=1 -put -f data/*.csv hdfs://nn:9000/
```
### VS Code users
If you are using VS Code with Remote SSH to work on your project, the ports will already be forwarded for you; just open `http://127.0.0.1:5000/lab` in your browser.
## Part 1: Filtering: RDDs, DataFrames, and Spark SQL
Inside your `p5.ipynb` notebook, create a Spark session (note we're enabling Hive on HDFS):
```python
from pyspark.sql import SparkSession

spark = (SparkSession.builder.appName("cs544")
         .master("spark://boss:7077")
         .config("spark.executor.memory", "1G")
         .config("spark.sql.warehouse.dir", "hdfs://nn:9000/user/hive/warehouse")
         .enableHiveSupport()
         .getOrCreate())
```
Load `hdfs://nn:9000/problems.jsonl` into a DataFrame named `problems_df`.
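If you haven't read JSON with Spark before, a minimal sketch (assuming the file is line-delimited JSON, which the `.jsonl` extension implies) looks like this:
```python
# Read the line-delimited JSON from HDFS; Spark infers the schema.
problems_df = spark.read.json("hdfs://nn:9000/problems.jsonl")
```
To verify success, run: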
```python
problems_df.limit(5).show()
```
If loaded properly, `show` will print a table with the first five rows of the problems data.
#### Q1: How many problems are there with a `cf_rating` of at least 1600, having `private_tests`, and a name containing "_A." (Case Sensitive)? Answer by directly using the RDD API.
Remember that if you have a Spark DataFrame `df`, you can get the underlying RDD using `df.rdd`.
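For instance, a hedged sketch of the RDD approach (the exact predicates are up to you; `private_tests` is assumed here to be a count):
```python
# Filter Row objects with the RDD API, then count the survivors.
count = (problems_df.rdd
         .filter(lambda row: row.cf_rating >= 1600)   # rating threshold
         .filter(lambda row: row.private_tests > 0)   # has private tests
         .filter(lambda row: "_A." in row.name)       # case-sensitive match
         .count())
```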
#### Q2: How many problems are there with a `cf_rating` of at least 1600, having `private_tests`, and a name containing "_A." (Case Sensitive)? Answer by using the DataFrame API.
This is the same question as Q1, and you should get the same answer. It is here to give you experience interacting with Spark in different ways.
#### Q3: How many problems are there with a `cf_rating` of at least 1600, having `private_tests`, and a name containing "_A." (Case Sensitive)? Answer by using Spark SQL.
Before you can use `spark.sql`, write the problem data to a Hive table so that you can refer to it by name.
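A minimal sketch of writing the table (the `mode` choice is yours):
```python
# Save the DataFrame as a Hive-managed table named "problems".
problems_df.write.mode("overwrite").saveAsTable("problems")
```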
Again, the result after calling `count` should match your answers for Q1 and Q2.
## Part 2: Hive Data Warehouse
#### Q4: Does the query plan for a GROUP BY on the solutions data need to shuffle/exchange rows if the data is pre-bucketed?
Write the data from `solutions.jsonl` to a Hive table named `solutions`, like you did for `problems`. This time, though, bucket the data by "language" and use 4 buckets when writing to the table.
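A sketch, assuming you loaded `solutions.jsonl` into a DataFrame named `solutions_df`:
```python
# bucketBy only works with saveAsTable; 4 buckets keyed on "language".
(solutions_df.write
    .mode("overwrite")
    .bucketBy(4, "language")
    .saveAsTable("solutions"))
```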
Use Spark SQL to explain the query plan for this query:
```sql
SELECT language, COUNT(*)
FROM solutions
GROUP BY language
```
The `explain` output suffices for your answer; it should begin with `== Physical Plan ==`, and you've bucketed correctly if `Bucketed: true` appears in it. Take note (for your own sake) whether any `Exchange` appears, and think about why a shuffle is or is not needed between the `partial_count` and `count` aggregates.
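For example:
```python
# Print the physical plan; look for Exchange and "Bucketed: true".
spark.sql("""
    SELECT language, COUNT(*)
    FROM solutions
    GROUP BY language
""").explain()
```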
#### Q5: What tables/views are in our warehouse?
You'll notice additional CSV files in `nb/data` that we haven't used yet. Create a temporary view for each using `createOrReplaceTempView`. Use these files:
```python
["languages", "problem_tests", "sources", "tags"]
```
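A sketch for registering the views, assuming each CSV was uploaded to HDFS earlier and has a header row:
```python
# Register one temp view per CSV file, named after the file.
for name in ["languages", "problem_tests", "sources", "tags"]:
    df = spark.read.csv(f"hdfs://nn:9000/{name}.csv",
                        header=True, inferSchema=True)
    df.createOrReplaceTempView(name)
```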
Answer with a Python dict like this:
```python
{'problems': False,
 'solutions': False,
 'languages': True,
 'problem_tests': True,
 'sources': True,
 'tags': True}
```
The boolean indicates whether it is a temporary view (True) or table (False).
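`spark.catalog.listTables()` exposes an `isTemporary` flag on each entry, so one way to build the dict is:
```python
# Map every table/view in the current database to its isTemporary flag.
{t.name: t.isTemporary for t in spark.catalog.listTables()}
```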
## Part 3: Caching and Transforming Data
#### Q6: How many correct PYTHON3 solutions are from CODEFORCES?
You may use any method for this question. Join the `solutions` table with the `problems` table using an inner join on the `problem_id` column. Note that the `source` column in `problems` is an integer. Join this column with the `source_id` column from the `sources` CSV. Find the number of correct `PYTHON3` solutions from `CODEFORCES`.
Answer Q6 with code and a single integer. **DO NOT HARDCODE THE CODEFORCES ID**.
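One possible shape for the query, as a sketch only: the correctness flag (`is_correct` here) and the name column in `sources` are assumptions, so check the actual schemas:
```python
spark.sql("""
    SELECT COUNT(*)
    FROM solutions s
    JOIN problems p   ON s.problem_id = p.problem_id
    JOIN sources  src ON p.source = src.source_id
    WHERE s.language = 'PYTHON3'
      AND s.is_correct               -- assumed column name
      AND src.source = 'CODEFORCES'  -- assumed column name
""").show()
```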
#### Q7: How many problems are of easy/medium/hard difficulty?
The `problems_df` has a numeric `difficulty` column. For the purpose of categorizing the problems, interpret this number as follows:
- `<= 5` is `Easy`
- `<= 10` is `Medium`
- Otherwise `Hard`
Your answer should return this dictionary:
```python
{'Easy': 409, 'Medium': 5768, 'Hard': 2396}
```
**Hint:** https://www.w3schools.com/sql/sql_case.asp
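Following that hint, a sketch in Spark SQL (using the `problems` table created earlier):
```python
# Bucket difficulties into three labels, then count each bucket.
spark.sql("""
    SELECT CASE
               WHEN difficulty <= 5  THEN 'Easy'
               WHEN difficulty <= 10 THEN 'Medium'
               ELSE 'Hard'
           END AS category,
           COUNT(*) AS cnt
    FROM problems
    GROUP BY 1
""").show()
```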
#### Q8: Does caching make it faster to compute averages over a subset of a bigger dataset?
To test the impact of caching, we are going to do the same calculations with and without caching. Implement a query that first filters rows of `problem_tests` to get rows where `is_generated` is `False` -- use a variable to refer to the resulting DataFrame.
Write some code to compute the average `input_chars` and `output_chars` over this DataFrame. Then, write code to do an experiment as follows:
1. compute the averages
2. make a call to cache the data
3. compute the averages
4. compute the averages
5. uncache the data
Measure the number of seconds each of the three average computations (steps 1, 3, and 4) takes. Answer with a list of the three times, in order, like this:
```
[0.48511195182800293, 0.47667789459228516, 0.1396317481994629]
```
Your numbers may vary significantly, but the final run should usually be the fastest.
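A sketch of the experiment; `filtered` stands for your filtered DataFrame, and the `measure` helper is made up for illustration:
```python
import time

def measure():
    # Time one round of computing both averages (an action forces execution).
    start = time.time()
    filtered.agg({"input_chars": "avg", "output_chars": "avg"}).collect()
    return time.time() - start

times = [measure()]        # 1. uncached
filtered.cache()           # 2. mark for caching (populated on next action)
times += [measure(),       # 3. populates the cache
          measure()]       # 4. served from cache
filtered.unpersist()       # 5. uncache
times
```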
## Part 4: Machine Learning with Spark
#### Q9: Preparing Data For Machine Learning
Join the query you wrote in the previous question with `problems_df` on the unique `problem_id` key. You will then use Spark's [`VectorAssembler`](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.feature.VectorAssembler.html) to create the `features` vector. The input columns (**order matters**) should be:
```python
['source', 'cf_rating', 'input_chars', 'output_chars', 'public_tests', 'private_tests']
```
Then transform your joined dataframe with the vector assembler so that there is now a `features` column.
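A sketch, where `joined_df` stands for your join from above:
```python
from pyspark.ml.feature import VectorAssembler

# Assemble the six columns, in order, into a single 'features' vector.
assembler = VectorAssembler(
    inputCols=['source', 'cf_rating', 'input_chars', 'output_chars',
               'public_tests', 'private_tests'],
    outputCol='features')
df = assembler.transform(joined_df)
```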
Finally, use:
```python
train, test = df.sort('problem_id').randomSplit([0.8, 0.2], seed=2025)
```
to split the dataset. Return `[{**l.asDict(),'features':l.features.toArray().tolist()} for l in test.tail(3)]` as your answer. You should get:
```python
[{'source': 2,
  'difficulty': 21,
  'input_chars': 10.181818181818182,
  'output_chars': 25.545454545454547,
  'public_tests': 1,
  'private_tests': 10,
  'cf_rating': 1300,
  'features': [2.0,
   1300.0,
   10.181818181818182,
   25.545454545454547,
   1.0,
   10.0]},
 {'source': 2,
  'difficulty': 21,
  'input_chars': 12.0,
  'output_chars': 16.0,
  'public_tests': 2,
  'private_tests': 0,
  'cf_rating': 3500,
  'features': [2.0, 3500.0, 12.0, 16.0, 2.0, 0.0]},
 {'source': 2,
  'difficulty': 24,
  'input_chars': 9.8,
  'output_chars': 18.2,
  'public_tests': 1,
  'private_tests': 9,
  'cf_rating': 1700,
  'features': [2.0, 1700.0, 9.8, 18.2, 1.0, 9.0]}]
```
**IMPORTANT:** Sorting by the unique `problem_id` and setting the seed to `2025` are CRUCIAL to ensure determinism.
#### Q10: Training a model with depths `[1,5,10,15,20]`
With your transformed dataset, train 5 different `DecisionTreeClassifier` models on the `features` column, with `difficulty` as the label. The only parameter of the `DecisionTreeClassifier` you will change is the depth, iterating over `[1,5,10,15,20]`. Use each model to make predictions on the test data and report its *accuracy* (the fraction of predictions that are correct).
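A sketch of the loop; using `MulticlassClassificationEvaluator` for the accuracy computation is one reasonable choice, not the only one:
```python
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator(
    labelCol="difficulty", predictionCol="prediction", metricName="accuracy")

accuracies = {}
for depth in [1, 5, 10, 15, 20]:
    # Fit one tree per depth on the training split.
    model = DecisionTreeClassifier(labelCol="difficulty",
                                   featuresCol="features",
                                   maxDepth=depth).fit(train)
    # Accuracy on the held-out test split.
    accuracies[f"depth={depth}"] = evaluator.evaluate(model.transform(test))
accuracies
```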
Answer with the dict:
```python
{'depth=1': 0.2925207756232687,
 'depth=5': 0.41440443213296396,
 'depth=10': 0.3767313019390582,
 'depth=15': 0.32797783933518004,
 'depth=20': 0.2919667590027701}
```
## Submission
We should be able to run the following on your submission to directly create the mini cluster:
```
docker build . -f p5-base.Dockerfile -t p5-base
docker build . -f notebook.Dockerfile -t p5-nb
docker build . -f namenode.Dockerfile -t p5-nn
docker build . -f datanode.Dockerfile -t p5-dn
docker build . -f boss.Dockerfile -t p5-boss
docker build . -f worker.Dockerfile -t p5-worker
export PROJECT=p5
docker compose up -d
```
We should then be able to open `http://localhost:5000/lab`, find your
notebook, and run it.
## Tester
Please be sure that your installed `autobadger` is on version `0.1.8`. You can print the version with:
```bash
autobadger --info
```
See [projects.md](https://git.doit.wisc.edu/cdis/cs/courses/cs544/s25/main/-/blob/main/projects.md#testing) for more information.