Commit b7f2059b authored by tylerharter
P5 draft

parent 935a941b
venv/*
nb/.ipynb_checkpoints/*
nb/data/*
nb/metastore_db/*
nb/derby.log
# DRAFT! Don't start yet.
# P5 (4% of grade): Spark and Hive
## Overview
In P5, you'll use Spark to analyze competitive programming problems and their solutions. You'll load your data into Hive tables and views for easy querying. The main table contains numerous IDs in columns; you'll need to join these with other tables or views to determine what these IDs mean.
**Important:** You'll answer 10 questions in P5. Write each question number and text (e.g., "#q1: ...") as a comment in your notebook before each answer so we can easily find and grade your work.
Learning objectives:
- Use Spark's RDD, DataFrame, and SQL interfaces to answer questions about data
- Load data into Hive for querying with Spark
- Group data and optimize queries with bucketing and caching
- Use PySpark's machine learning API to train a `DecisionTreeClassifier`
Before starting, please review the [general project directions](../projects.md).
## Corrections/Clarifications
## Setup
Copy these files from the project into your repository:
- `p5-base.Dockerfile`
- `namenode.Dockerfile`
- `notebook.Dockerfile`
- `datanode.Dockerfile`
- `boss.Dockerfile`
- `worker.Dockerfile`
- `docker-compose.yml`
- `build.sh`
- `requirements.txt`
- `get_data.py`
- `.gitignore`
Create a Python virtual environment and install the [datasets library](https://huggingface.co/docs/datasets/en/index).
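One way to create and activate the environment (a sketch; the `venv/` name matches the provided `.gitignore`):
```sh
python3 -m venv venv
source venv/bin/activate
```
Then install the library: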
```sh
pip3 install datasets==3.3.2
```
Create the directory structure with:
```sh
mkdir -p nb/data
```
Run the provided `get_data.py` script to download the [DeepMind CodeContests dataset](https://huggingface.co/datasets/deepmind/code_contests) and split it into `problems.jsonl`, `solutions.jsonl`, and several supporting CSV files under `nb/data`.
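For example (a sketch; run it from the directory that contains the script, since `get_data.py` checks that the directory name starts with `p5`, and make sure the virtual environment is active):
```sh
python3 get_data.py
```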
### Docker Containers
Build the images for the mini cluster:
```sh
docker build . -f p5-base.Dockerfile -t p5-base
docker build . -f notebook.Dockerfile -t p5-nb
docker build . -f namenode.Dockerfile -t p5-nn
docker build . -f datanode.Dockerfile -t p5-dn
docker build . -f boss.Dockerfile -t p5-boss
docker build . -f worker.Dockerfile -t p5-worker
```
A `build.sh` script is included for your convenience.
You can bring up your cluster like this:
```sh
export PROJECT=p5
docker compose up -d
```
### Jupyter Container
Connect to JupyterLab inside your container. Within the `nb` directory, create a notebook called `p5.ipynb`.
Run the following shell commands in a notebook cell (for example, using the `%%bash` cell magic) to upload the data to HDFS:
```sh
hdfs dfs -D dfs.replication=1 -cp -f data/*.jsonl hdfs://nn:9000/
hdfs dfs -D dfs.replication=1 -cp -f data/*.csv hdfs://nn:9000/
```
### VS Code users
If you are using VS Code with Remote SSH to work on your project, the ports will already be forwarded for you; you only need to open `http://127.0.0.1:5000/lab` in your browser.
## Part 1: Filtering: RDDs, DataFrames, and Spark SQL
Inside your `p5.ipynb` notebook, create a Spark session (note we're enabling Hive on HDFS):
```python
from pyspark.sql import SparkSession
spark = (SparkSession.builder.appName("cs544")
         .master("spark://boss:7077")
         .config("spark.executor.memory", "1G")
         .config("spark.sql.warehouse.dir", "hdfs://nn:9000/user/hive/warehouse")
         .enableHiveSupport()
         .getOrCreate())
```
Load `hdfs://nn:9000/problems.jsonl` into a DataFrame named `problems_df`.
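One way to do this (a minimal sketch using Spark's JSON Lines reader):
```python
problems_df = spark.read.json("hdfs://nn:9000/problems.jsonl")
```
To verify success, run: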
```python
problems_df.limit(5).show()
```
If loaded properly, you should see:
![image.png](image.png)
#### Q1: Filtering with **RDD**
Find the number of problems that meet all these criteria:
- `cf_rating` of at least 1600
- Has `private_tests`
- The problem `name` contains `"_A."` (**Case Sensitive**)
You must use `problems_df.rdd` to answer this question. After calling `filter`, call `count` to get the final answer.
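The general shape is something like this (an illustrative sketch; the lambda below checks only one of the three criteria):
```python
# sketch of the RDD filtering pattern; the predicate is deliberately incomplete
problems_df.rdd.filter(lambda row: row["cf_rating"] >= 1600).count()
```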
**REMEMBER TO INCLUDE `#q1` AT THE TOP OF THIS CELL**
#### Q2: Filtering with **DataFrames** and **expr**
Solve the same problem as Q1, but you **must** use `problems_df.filter` and `expr`. Again, call `count` to get the final answer. If done correctly, you should get the same answer as in Q1.
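The DataFrame version follows the same shape (again, only part of the predicate is shown):
```python
from pyspark.sql.functions import expr

# sketch of filter + expr; extend the expression to cover all of Q1's criteria
problems_df.filter(expr("cf_rating >= 1600")).count()
```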
#### Q3: Filtering with **Spark SQL**
Solve the same problem as the previous two questions, but this time use `spark.sql`. First, write the problems to a table:
```python
problems_df.write.saveAsTable("problems", mode="overwrite")
```
Again, the result after calling `count` should match your answers for Q1 and Q2.
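A sketch of the SQL route (the `WHERE` clause below is deliberately incomplete):
```python
# query the Hive table registered above; extend the WHERE clause to all of Q1's criteria
spark.sql("SELECT * FROM problems WHERE cf_rating >= 1600").count()
```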
## Part 2: Hive Data Warehouse
#### Q4: Bucketing `solutions` table
The `problems` table was already added to Hive in Q3. Now, create the `solutions` table from `solutions.jsonl`.
Unlike the `problems` table, use [`bucketBy`](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameWriter.bucketBy.html) with your `saveAsTable` call to create 4 buckets on the column `"language"`. This divides your data into 4 buckets/groups, with all rows having the same language in the same bucket. This can make some queries faster (for example, when you `GROUP BY` on `language`, Spark may avoid shuffling data across partitions/machines).
After bucketing the solutions, call `.explain` on a query that counts solutions per language. This should output `== Physical Plan ==` followed by the plan details. You've bucketed correctly if `Bucketed: true` appears in the output.
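A sketch of the bucketing pattern, assuming you have read `solutions.jsonl` into a DataFrame called `solutions_df` (a name of our choosing):
```python
# write the Hive table with 4 buckets on the language column
(solutions_df.write
    .bucketBy(4, "language")
    .saveAsTable("solutions", mode="overwrite"))

# a per-language count whose physical plan should show "Bucketed: true"
spark.sql("SELECT language, COUNT(*) FROM solutions GROUP BY language").explain()
```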
#### Q5: What tables are in our warehouse?
You'll notice additional CSV files in `nb/data` that we haven't used yet. Create a temporary view for each using `createOrReplaceTempView`. Use these files:
```python
[
"languages", "problem_tests", "sources", "tags"
]
```
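A sketch of the pattern, assuming the CSVs were uploaded to the HDFS root as in the earlier `hdfs dfs` step (the paths and read options here are assumptions):
```python
# register one temporary view per CSV file
for name in ["languages", "problem_tests", "sources", "tags"]:
    df = spark.read.csv(f"hdfs://nn:9000/{name}.csv", header=True, inferSchema=True)
    df.createOrReplaceTempView(name)

# spark.catalog.listTables() reports each table's name and whether it is temporary
{t.name: t.isTemporary for t in spark.catalog.listTables()}
```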
Answer with a Python dict like this:
```python
{'problems': False,
'solutions': False,
'languages': True,
'problem_tests': True,
'sources': True,
'tags': True}
```
The boolean indicates whether it is a temporary view (True) or table (False).
## Part 3: Caching and Transforming Data
#### Q6: How many correct PYTHON3 solutions are from CODEFORCES?
You may use any method for this question. Join the `solutions` table with the `problems` table using an inner join on the `problem_id` column. Note that the `source` column in `problems` is an integer; join it with the `source` column from the `sources` CSV to look up the source name. Find the number of correct `PYTHON3` solutions from `CODEFORCES`.
Answer Q6 with code and a single integer. **DO NOT HARDCODE THE CODEFORCES ID**.
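If you take the SQL route, the join skeleton might look like this (a sketch only; the conditions on correctness, language, and source name are left to you, and the column names assume the views from Q5):
```python
# join skeleton only; add the WHERE conditions and return a single integer
spark.sql("""
    SELECT COUNT(*)
    FROM solutions s
    JOIN problems p ON s.problem_id = p.problem_id
    JOIN sources src ON p.source = src.source
""").show()
```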
#### Q7: Converting `difficulty` to a human-readable string
The `problems_df` has a `difficulty` column. Create a new column using the [`withColumn` function](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.withColumn.html) called `difficulty_str` with these rules:
- `<= 5` is `Easy`
- `<= 10` is `Medium`
- Otherwise `Hard`
Your answer should return this dictionary:
```python
{'Easy': 409, 'Medium': 5768, 'Hard': 2396}
```
**Hint:** https://www.w3schools.com/sql/sql_case.asp
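A sketch of the `withColumn` + SQL `CASE` pattern (turning the result into the answer dict is left to you):
```python
from pyspark.sql.functions import expr

# add a human-readable difficulty_str column based on the rules above
problems_with_str = problems_df.withColumn(
    "difficulty_str",
    expr("CASE WHEN difficulty <= 5 THEN 'Easy' "
         "WHEN difficulty <= 10 THEN 'Medium' "
         "ELSE 'Hard' END"),
)
```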
#### Q8: Spark Caching
To test the impact of caching, we are going to run the same query with and without cached data. Implement a query that first filters the `problem_tests` rows where `is_generated` is `False`, then computes the average `input_chars` and `output_chars` per `problem_id`. Cache only the result of the `is_generated` filter.
TODO: 3 runs
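A sketch of the caching pattern, assuming `problem_tests` is the temporary view from Q5 and its numeric columns were inferred correctly (how you time the repeated runs is up to you):
```python
# cache only the result of the is_generated filter, as required
filtered = spark.table("problem_tests").filter("is_generated = false").cache()

# the aggregation to run with and without the cache populated
avg_chars = filtered.groupBy("problem_id").avg("input_chars", "output_chars")
avg_chars.count()  # force evaluation
```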
## Part 4: Machine Learning with Spark
#### Q9: Preparing Data For Machine Learning
Join the query you wrote in the previous question with `problems_df` on the unique `problem_id` key. You will then use Spark's [`VectorAssembler`](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.feature.VectorAssembler.html) to create the `features` vector. The input columns (**order matters**) should be:
```python
['source','input_chars','output_chars','public_tests','private_tests','cf_rating']
```
Then transform your joined dataframe with the vector assembler so that there is now a `features` column.
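A sketch of the assembler step, assuming the joined DataFrame is called `joined_df` (a hypothetical name):
```python
from pyspark.ml.feature import VectorAssembler

# assemble the six input columns (in the order listed above) into a single vector column
assembler = VectorAssembler(
    inputCols=["source", "input_chars", "output_chars",
               "public_tests", "private_tests", "cf_rating"],
    outputCol="features",
)
df = assembler.transform(joined_df)  # joined_df is a hypothetical name
```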
Finally, use:
```python
train, test = df.sort('problem_id').randomSplit([0.8, 0.2], seed=2025)
```
to split the dataset. Return `[{**l.asDict(),'features':l.features.toArray().tolist()} for l in test.tail(3)]` as your answer. You should get:
```python
[{'source': 2,
  'difficulty': 21,
  'input_chars': 10.181818181818182,
  'output_chars': 25.545454545454547,
  'public_tests': 1,
  'private_tests': 10,
  'cf_rating': 1300,
  'features': [2.0,
   1300.0,
   10.181818181818182,
   25.545454545454547,
   1.0,
   10.0]},
 {'source': 2,
  'difficulty': 21,
  'input_chars': 12.0,
  'output_chars': 16.0,
  'public_tests': 2,
  'private_tests': 0,
  'cf_rating': 3500,
  'features': [2.0, 3500.0, 12.0, 16.0, 2.0, 0.0]},
 {'source': 2,
  'difficulty': 24,
  'input_chars': 9.8,
  'output_chars': 18.2,
  'public_tests': 1,
  'private_tests': 9,
  'cf_rating': 1700,
  'features': [2.0, 1700.0, 9.8, 18.2, 1.0, 9.0]}]
```
**IMPORTANT:** Sorting by the unique `problem_id` and setting the seed to `2025` is CRUCIAL to ensure determinism.
#### Q10: Training a model with depths `[1,5,10,15,20]`
With your transformed dataset, train 5 different `DecisionTreeClassifier` models on the `features` column. The task is to predict `difficulty`. The only `DecisionTreeClassifier` parameter you will change is the maximum depth, iterating over `[1,5,10,15,20]`.
Use the models to make predictions on the test data. What is each model's *accuracy* (the fraction of predictions that are correct)?
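A sketch of the training loop, assuming the `train`/`test` split from Q9 (the raw fraction of matching predictions is used here; `MulticlassClassificationEvaluator` would be another way to compute accuracy):
```python
from pyspark.ml.classification import DecisionTreeClassifier

# train one tree per depth and record its test accuracy under a "depth=<d>" key
accuracies = {}
for depth in [1, 5, 10, 15, 20]:
    dt = DecisionTreeClassifier(featuresCol="features", labelCol="difficulty", maxDepth=depth)
    model = dt.fit(train)
    preds = model.transform(test)
    accuracies[f"depth={depth}"] = preds.filter("prediction = difficulty").count() / preds.count()
accuracies
```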
Answer with the dict:
```python
{'depth=1': 0.2925207756232687,
'depth=5': 0.41440443213296396,
'depth=10': 0.3767313019390582,
'depth=15': 0.32797783933518004,
'depth=20': 0.2919667590027701}
```
## Submission
We should be able to run the following on your submission to directly create the mini cluster:
```sh
docker build . -f p5-base.Dockerfile -t p5-base
docker build . -f notebook.Dockerfile -t p5-nb
docker build . -f namenode.Dockerfile -t p5-nn
docker build . -f datanode.Dockerfile -t p5-dn
docker build . -f boss.Dockerfile -t p5-boss
docker build . -f worker.Dockerfile -t p5-worker
export PROJECT=p5
docker compose up -d
```
We should then be able to open `http://localhost:5000/lab`, find your
notebook, and run it.
## Tester
Please be sure that your installed `autobadger` is on version `0.1.8`. You can print the version with:
```bash
autobadger --info
```
See [projects.md](https://git.doit.wisc.edu/cdis/cs/courses/cs544/s25/main/-/blob/main/projects.md#testing) for more information.
FROM p5-base
CMD sh -c "/spark-3.5.5-bin-hadoop3/sbin/start-master.sh && tail -f /spark-3.5.5-bin-hadoop3/logs/*.out"
#!/bin/bash
# Build the base image
docker build \
    --rm \
    -f p5-base.Dockerfile \
    -t p5-base .
# Build the child containers
docker build --rm . -f notebook.Dockerfile -t p5-nb
docker build --rm . -f namenode.Dockerfile -t p5-nn
docker build --rm . -f datanode.Dockerfile -t p5-dn
docker build --rm . -f boss.Dockerfile -t p5-boss
docker build --rm . -f worker.Dockerfile -t p5-worker
FROM p5-base
CMD ["bash", "-c", "hdfs datanode -fs hdfs://nn:9000"]
name: ${PROJECT}
services:
  nb:
    image: ${PROJECT}-nb
    ports:
      - "127.0.0.1:5000:5000"
      - "127.0.0.1:4040:4040"
    volumes:
      - "./nb:/nb"
    deploy:
      resources:
        limits:
          memory: 1.5G
  nn:
    image: ${PROJECT}-nn
    hostname: nn
    deploy:
      resources:
        limits:
          memory: 1G
  dn:
    image: ${PROJECT}-dn
    depends_on:
      - nn
    deploy:
      resources:
        limits:
          memory: 1G
  spark-boss:
    image: ${PROJECT}-boss
    hostname: boss
    deploy:
      resources:
        limits:
          memory: 0.5G
  spark-worker:
    image: ${PROJECT}-worker
    deploy:
      replicas: 2
      resources:
        limits:
          memory: 2G
from datasets import load_dataset  # Import Hugging Face's datasets library
from pathlib import Path  # For handling file paths in a cross-platform way
import tempfile  # For creating temporary directories
import random  # For random sampling and shuffling
import json  # For writing JSON lines
from collections import defaultdict  # For grouping solutions by language
from tqdm import tqdm

# Get the directory where the script is located
current_file = Path(__file__).parent

# Check if the script is being run from the correct directory
if not current_file.name.startswith("p5"):
    print("Please run this script from the p5 directory!!")
    print(f"Current directory: {current_file.absolute()}")
    exit(1)

# Check if the required 'nb' directory exists
if not Path("nb").exists():
    print("No 'nb' directory found. Refer to the README.md and make it.")
    exit(1)

# Check if the required 'nb/data' directory exists
if not Path("nb/data").exists():
    print("No 'nb/data' directory found. Refer to the README.md and make it.")
    exit(1)

print("Splitting the CodeContests dataset into 'problems.jsonl' and 'solutions.jsonl'")

SEED = 42  # Set a random seed for reproducibility
random.seed(SEED)

# Define a mapping from numerical language IDs to human-readable language names
LANGUAGE_MAP = {
    0: "UNKNOWN_LANGUAGE",
    1: "PYTHON2",
    2: "CPP",
    3: "PYTHON3",
    4: "JAVA",
}

with open("nb/data/languages.csv", "w") as f:
    f.write("language,language_name\n")
    for k, v in LANGUAGE_MAP.items():
        f.write(f"{k},{v}\n")

SOURCE_MAP = {
    0: "UNKNOWN_SOURCE",
    1: "CODECHEF",
    2: "CODEFORCES",
    3: "HACKEREARTH",
    4: "CODEJAM",
    5: "ATCODER",
    6: "AIZU",
}

with open("nb/data/sources.csv", "w") as f:
    f.write("source,source_name\n")
    for k, v in SOURCE_MAP.items():
        f.write(f"{k},{v}\n")

# Define the set of keys that will be extracted for problem data
problem_keys = {
    "name",
    "source",
    "difficulty",
    "cf_contest_id",
    "cf_index",
    "cf_points",
    "cf_rating",
    "is_description_translated",
    "memory_limit_bytes",
}

TAG_MAPS = {}
PROB_ID_MAP = {}

# Define output file paths
problems_path = Path("nb/data/problems.jsonl")
solutions_path = Path("nb/data/solutions.jsonl")

num_removed = 0
prob_id_counter = 0

# Create a temporary directory to download and cache the dataset
with tempfile.TemporaryDirectory() as tmpdirname:
    # Load the DeepMind Code Contests dataset
    dataset = load_dataset(
        "deepmind/code_contests",
        split="train",  # Use the training split
        streaming=True,  # Stream the dataset to handle its large size
        cache_dir=tmpdirname,  # Store the cache in the temporary directory
    )
    dataset = dataset.shuffle(SEED)

    with Path("nb/data/problem_tests.csv").open("w") as test_file:
        test_file.write(
            "problem_id,input_chars,output_chars,is_public,is_generated,is_private,output_is_number\n"
        )
        # Open both output files for writing
        with problems_path.open("w") as problems_fd:
            with solutions_path.open("w") as solutions_fd:
                problems_saved = 0  # Counter for saved problems
                # Process each problem in the dataset
                for task in tqdm(dataset, total=10_000, desc="Processing problems"):
                    # Extract problem data for the relevant keys
                    problem_id = prob_id_counter
                    prob_id_counter += 1
                    prob_dict = {
                        "problem_id": problem_id,
                        **{k: task[k] for k in problem_keys},
                    }
                    if prob_dict["difficulty"] == 0:
                        num_removed += 1
                        continue

                    total_tests = 0
                    # Record test data for each test type
                    for t_name in ["public", "private", "generated"]:
                        num_save = 0
                        for ti, to in zip(
                            task[f"{t_name}_tests"]["input"],
                            task[f"{t_name}_tests"]["output"],
                        ):
                            test_file.write(
                                ",".join(
                                    (
                                        str(problem_id),
                                        f"{len(ti)}",
                                        f"{len(to)}",
                                        f"{t_name == 'public'}",
                                        f"{t_name == 'generated'}",
                                        f"{t_name == 'private'}",
                                        f"{to.isnumeric()}",
                                    )
                                )
                                + "\n"
                            )
                            num_save += 1
                            if t_name in {"public", "private"}:
                                total_tests += 1
                            if num_save >= 30:
                                break
                        prob_dict[f"{t_name}_tests"] = len(
                            task.get(f"{t_name}_tests", {"input": []})["input"]
                        )
                    if total_tests == 0:
                        num_removed += 1
                        continue

                    prob_dict["cf_tags"] = []
                    for t in task["cf_tags"]:
                        if t not in TAG_MAPS:
                            TAG_MAPS[t] = len(TAG_MAPS)
                        prob_dict["cf_tags"].append(TAG_MAPS[t])

                    # Extract time limit (if available)
                    prob_dict["time_limit"] = (
                        -1
                        if task["time_limit"] is None
                        else task["time_limit"]["seconds"]
                    )

                    sols = []  # Initialize solutions list (note: not used later)
                    # Process both correct and incorrect solutions
                    for p, sol_dict in [
                        (True, task["solutions"]),  # Correct solutions
                        (False, task["incorrect_solutions"]),  # Incorrect solutions
                    ]:
                        # Group solutions by programming language
                        language_sols = defaultdict(list)
                        for i, sol in enumerate(sol_dict["solution"]):
                            language_sols[sol_dict["language"][i]].append(sol)
                        has_printed = False
                        # For each language, randomly sample a small number of solutions
                        # (to save space, we're not keeping all solutions)
                        for lang, sols in language_sols.items():
                            # Take between 1-3 random solutions per language
                            to_save = random.sample(
                                sols, k=min(len(sols), random.randint(1, 3))
                            )
                            for sol in to_save:
                                # Truncate solutions that are too long
                                if len(sol) > 4096:
                                    sol = sol[:4096] + "...TRUNCATED"
                                save_sol_dict = {
                                    "problem_id": problem_id,
                                    "language": LANGUAGE_MAP[lang],  # Convert language ID to name
                                    "is_correct": p,  # Whether this is a correct solution
                                    "solution": sol,  # The code solution
                                }
                                # Write the solution as a JSON line
                                solutions_fd.write(json.dumps(save_sol_dict) + "\n")
                    # Write the problem data as a JSON line
                    problems_fd.write(json.dumps(prob_dict) + "\n")
                    problems_saved += 1
                    if problems_saved >= 10_000:
                        break

with Path("nb/data/tags.csv").open("w") as f:
    f.write("tag_id,tag\n")
    for k, v in TAG_MAPS.items():
        if not k:
            continue
        f.write(f"{v},{k}\n")

print(f"Removed {num_removed:,} problems")
FROM p5-base
RUN hdfs namenode -format -force
CMD ["bash", "-c", "hdfs namenode -format -force && hdfs namenode -fs hdfs://nn:9000"]
FROM p5-base
RUN apt-get update; apt-get install -y unzip
CMD ["python3", "-m", "jupyterlab", "--no-browser", "--ip=0.0.0.0", "--port=5000", "--allow-root", "--NotebookApp.token=''"]
FROM ubuntu:22.04
# Install required packages
RUN apt-get update && apt-get install -y \
    wget \
    curl \
    openjdk-11-jdk \
    python3-pip \
    net-tools \
    lsof \
    nano \
    sudo
# Copy requirements file
COPY requirements.txt /requirements.txt
# Install Python dependencies
RUN pip3 install -r /requirements.txt
# Download and extract Hadoop
RUN wget https://dlcdn.apache.org/hadoop/common/hadoop-3.4.1/hadoop-3.4.1.tar.gz && \
    tar -xf hadoop-3.4.1.tar.gz && \
    rm hadoop-3.4.1.tar.gz
# Download and extract Spark
RUN wget https://dlcdn.apache.org/spark/spark-3.5.5/spark-3.5.5-bin-hadoop3.tgz && \
    tar -xf spark-3.5.5-bin-hadoop3.tgz && \
    rm spark-3.5.5-bin-hadoop3.tgz
# Set environment variables
ENV JAVA_HOME=/usr/lib/jvm/java-11-openjdk-amd64
ENV PATH="${PATH}:/hadoop-3.4.1/bin"
ENV HADOOP_HOME=/hadoop-3.4.1
FROM p5-base
CMD sh -c "/spark-3.5.5-bin-hadoop3/sbin/start-worker.sh spark://boss:7077 -c 2 -m 2G && tail -f /spark-3.5.5-bin-hadoop3/logs/*.out"