Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • cdis/cs/courses/cs544/s25/main
  • zzhang2478/main
  • spark667/main
  • vijayprabhak/main
  • vijayprabhak/544-main
  • wyang338/cs-544-s-25
  • jmin39/main
7 results
Show changes
Commits on Source (20)
This diff is collapsed.
FROM ubuntu:24.04
RUN apt-get update; apt-get install -y wget curl openjdk-11-jdk python3-pip nano
# SPARK
#RUN wget https://archive.apache.org/dist/spark/spark-3.5.5/spark-3.5.5-bin-hadoop3.tgz && tar -xf spark-3.5.5-bin-hadoop3.tgz && rm spark-3.5.5-bin-hadoop3.tgz
RUN wget https://dlcdn.apache.org/spark/spark-3.5.5/spark-3.5.5-bin-hadoop3.tgz && tar -xf spark-3.5.5-bin-hadoop3.tgz && rm spark-3.5.5-bin-hadoop3.tgz
# HDFS
RUN wget https://dlcdn.apache.org/hadoop/common/hadoop-3.3.6/hadoop-3.3.6.tar.gz && tar -xf hadoop-3.3.6.tar.gz && rm hadoop-3.3.6.tar.gz
# Jupyter
RUN pip3 install jupyterlab==4.3.5 pandas==2.2.3 pyspark==3.5.5 matplotlib==3.10.1 --break-system-packages
ENV JAVA_HOME=/usr/lib/jvm/java-11-openjdk-amd64
ENV PATH="${PATH}:/hadoop-3.3.6/bin"
ENV HADOOP_HOME=/hadoop-3.3.6
services:
nb:
image: spark-demo
ports:
- "127.0.0.1:5000:5000"
- "127.0.0.1:4040:4040"
volumes:
- "./nb:/nb"
command: python3 -m jupyterlab --no-browser --ip=0.0.0.0 --port=5000 --allow-root --NotebookApp.token=''
nn:
image: spark-demo
hostname: nn
command: sh -c "hdfs namenode -format -force && hdfs namenode -D dfs.replication=1 -fs hdfs://nn:9000"
dn:
image: spark-demo
command: hdfs datanode -fs hdfs://nn:9000
spark-boss:
image: spark-demo
hostname: boss
command: sh -c "/spark-3.5.5-bin-hadoop3/sbin/start-master.sh && sleep infinity"
spark-worker:
image: spark-demo
command: sh -c "/spark-3.5.5-bin-hadoop3/sbin/start-worker.sh spark://boss:7077 -c 2 -m 2g && sleep infinity"
deploy:
replicas: 2
date,holiday
01/01/2013,New Year's Day
01/01/2014,New Year's Day
01/01/2015,New Year's Day
01/01/2016,New Year's Day
01/01/2018,New Year's Day
01/01/2019,New Year's Day
01/01/2020,New Year's Day
01/01/2021,New Year's Day
01/02/2012,New Year's Day
01/02/2017,New Year's Day
01/15/2018,"Birthday of Martin Luther King, Jr."
01/16/2012,"Birthday of Martin Luther King, Jr."
01/16/2017,"Birthday of Martin Luther King, Jr."
01/17/2011,"Birthday of Martin Luther King, Jr."
01/17/2022,"Birthday of Martin Luther King, Jr."
01/18/2016,"Birthday of Martin Luther King, Jr."
01/18/2021,"Birthday of Martin Luther King, Jr."
01/19/2015,"Birthday of Martin Luther King, Jr."
01/20/2014,"Birthday of Martin Luther King, Jr."
01/20/2020,"Birthday of Martin Luther King, Jr."
01/20/2021,Inauguration Day
01/21/2013,"Birthday of Martin Luther King, Jr."
01/21/2019,"Birthday of Martin Luther King, Jr."
02/15/2016,Washington's Birthday
02/15/2021,Washington's Birthday
02/16/2015,Washington's Birthday
02/17/2014,Washington's Birthday
02/17/2020,Washington's Birthday
02/18/2013,Washington's Birthday
02/18/2019,Washington's Birthday
02/19/2018,Washington's Birthday
02/20/2012,Washington's Birthday
02/20/2017,Washington's Birthday
02/21/2011,Washington's Birthday
02/21/2022,Washington's Birthday
05/25/2015,Memorial Day
05/25/2020,Memorial Day
05/26/2014,Memorial Day
05/27/2013,Memorial Day
05/27/2019,Memorial Day
05/28/2012,Memorial Day
05/28/2018,Memorial Day
05/29/2017,Memorial Day
05/30/2011,Memorial Day
05/30/2016,Memorial Day
05/30/2022,Memorial Day
05/31/2021,Memorial Day
06/18/2021,Juneteenth National Independence Day
06/20/2022,Juneteenth National Independence Day
07/03/2015,Independence Day
07/03/2020,Independence Day
07/04/2011,Independence Day
07/04/2012,Independence Day
07/04/2013,Independence Day
07/04/2014,Independence Day
07/04/2016,Independence Day
07/04/2017,Independence Day
07/04/2018,Independence Day
07/04/2019,Independence Day
07/04/2022,Independence Day
07/05/2021,Independence Day
09/01/2014,Labor Day
09/02/2013,Labor Day
09/02/2019,Labor Day
09/03/2012,Labor Day
09/03/2018,Labor Day
09/04/2017,Labor Day
09/05/2011,Labor Day
09/05/2016,Labor Day
09/05/2022,Labor Day
09/06/2021,Labor Day
09/07/2015,Labor Day
09/07/2020,Labor Day
10/08/2012,Columbus Day
10/08/2018,Columbus Day
10/09/2017,Columbus Day
10/10/2011,Columbus Day
10/10/2016,Columbus Day
10/10/2022,Columbus Day
10/11/2021,Columbus Day
10/12/2015,Columbus Day
10/12/2020,Columbus Day
10/13/2014,Columbus Day
10/14/2013,Columbus Day
10/14/2019,Columbus Day
11/10/2017,Veterans Day
11/11/2011,Veterans Day
11/11/2013,Veterans Day
11/11/2014,Veterans Day
11/11/2015,Veterans Day
11/11/2016,Veterans Day
11/11/2019,Veterans Day
11/11/2020,Veterans Day
11/11/2021,Veterans Day
11/11/2022,Veterans Day
11/12/2012,Veterans Day
11/12/2018,Veterans Day
11/22/2012,Thanksgiving Day
11/22/2018,Thanksgiving Day
11/23/2017,Thanksgiving Day
11/24/2011,Thanksgiving Day
11/24/2016,Thanksgiving Day
11/24/2022,Thanksgiving Day
11/25/2021,Thanksgiving Day
11/26/2015,Thanksgiving Day
11/26/2020,Thanksgiving Day
11/27/2014,Thanksgiving Day
11/28/2013,Thanksgiving Day
11/28/2019,Thanksgiving Day
12/24/2021,Christmas Day
12/25/2012,Christmas Day
12/25/2013,Christmas Day
12/25/2014,Christmas Day
12/25/2015,Christmas Day
12/25/2017,Christmas Day
12/25/2018,Christmas Day
12/25/2019,Christmas Day
12/25/2020,Christmas Day
12/26/2011,Christmas Day
12/26/2016,Christmas Day
12/26/2022,Christmas Day
12/31/2022,New Year's Day
This diff is collapsed.
This diff is collapsed.
%% Cell type:code id:c8dca847-54af-4284-97d8-0682e88a6e8d tags:
``` python
from pyspark.sql import SparkSession
spark = (SparkSession.builder.appName("cs544")
.master("spark://boss:7077")
.config("spark.executor.memory", "2G")
.config("spark.sql.warehouse.dir", "hdfs://nn:9000/user/hive/warehouse")
.enableHiveSupport()
.getOrCreate())
```
%% Output
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/10/27 01:41:45 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
%% Cell type:code id:2294e4e0-ab19-496c-980f-31df757e7837 tags:
``` python
!hdfs dfs -cp sf.csv hdfs://nn:9000/sf.csv
```
%% Cell type:code id:cb54bacc-b52a-4c25-93d2-2ba0f61de9b0 tags:
``` python
df = (spark.read.format("csv")
.option("header", True)
.option("inferSchema", True)
.load("hdfs://nn:9000/sf.csv"))
```
%% Output
%% Cell type:code id:c1298818-83f6-444b-b8a0-4be5b16fd6fb tags:
``` python
from pyspark.sql.functions import col, expr
cols = [col(c).alias(c.replace(" ", "_")) for c in df.columns]
df.select(cols).write.format("parquet").save("hdfs://nn:9000/sf.parquet")
```
%% Output
23/10/27 01:43:57 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
%% Cell type:code id:37d1ded3-ed8a-4e39-94cb-dd3a3272af91 tags:
``` python
!hdfs dfs -rm hdfs://nn:9000/sf.csv
```
%% Cell type:code id:abea48b5-e012-4ae2-a53a-e40350f94e20 tags:
``` python
df = spark.read.format("parquet").load("hdfs://nn:9000/sf.parquet")
```
......@@ -18,6 +18,17 @@ Before starting, please review the [general project directions](../projects.md).
- Mar 5: Fix the wrong expected file size in Part 1 and sum of blocks in Part 2.
- Mar 6: Released `autobadger` for `p4` (`0.1.6`)
- Mar 7:
- Some minor updates on p4 `Readme.md`.
- Update `autobadgere` to version `0.1.7`
- Fixed exception handling, now Autobadger can correctly print error messages.
- Expanded the expected file size range in test4 `test_Hdfs_size`.
- Make the error messages clearer.
## Introduction
You'll need to deploy a system including 6 docker containers like this:
......@@ -30,7 +41,7 @@ The data flow roughly follows this:
We have provided the other components; what you only need is to complete the work within the gRPC server and its Dockerfile.
### Client
This project will use `docker exec -it` to run the client on the gRPC server's container. Usage of `client.py` is as follows:
This project will use `docker exec` to run the client on the gRPC server's container. Usage of `client.py` is as follows:
```
#Inside the server container
python3 client.py DbToHdfs
......@@ -69,7 +80,7 @@ export PROJECT=p4
**Hint 2:** Think about whether there is any .sh script that will help you quickly test code changes. For example, you may want it to rebuild your Dockerfiles, cleanup an old Compose cluster, and deploy a new cluster.
**Hint 3:** If you're low on disk space, consider running `docker system prune -a --volumes -f`
**Hint 3:** If you're low on disk space, consider running `docker system prune --volumes -f`
## Part 1: `DbToHdfs` gRPC Call
......@@ -97,15 +108,17 @@ In this part, your task is to implement the `DbToHdfs` gRPC call (you can find t
3. Filter all rows where `loan_amount` is **greater than 30,000** and **less than 800,000**. After filtering, this table should have only **426716** rows.
4. Upload the generated table to `/hdma-wi-2021.parquet` in the HDFS, with **2x** replication and a **1-MB** block size, using PyArrow (https://arrow.apache.org/docs/python/generated/pyarrow.fs.HadoopFileSystem.html).
To check whether the upload was correct, you can use `docker exec -it` to enter the gRPC server's container and use HDFS command `hdfs dfs -du -h <path>`to see the file size. The expected result is:
To check whether the upload was correct, you can use `docker exec -it <container_name> bash` to enter the gRPC server's container and use HDFS command `hdfs dfs -du -h <path>`to see the file size. The expected result should like:
```
14.4 M 28.9 M hdfs://nn:9000/hdma-wi-2021.parquet
```
Note: Your file size might have slight difference from this.
>That's because when we join two tables, rows from one table get matches with rows in the other, but the order of output rows is not guaranteed. If we have the same rows in a different order, the compressibility of snappy (used by Parquet by default) will vary because it is based on compression windows, and there may be more or less redundancy in a window depending on row ordering.
**Hint 1:** We used similar tables in lecture: https://git.doit.wisc.edu/cdis/cs/courses/cs544/s25/main/-/tree/main/lec/15-sql
**Hint 2:** To get more familiar with these tables, you can use SQL queries to print the table schema or retrieve sample data. A convenient way to do this is to use `docker exec -it` to enter the SQL Server, then run mysql client `mysql -p CS544` to access the SQL Server and then perform queries.
**Hint 2:** To get more familiar with these tables, you can use SQL queries to print the table schema or retrieve sample data. A convenient way to do this is to use `docker exec -it <container name> bash` to enter the SQL Server, then run mysql client `mysql -p CS544` to access the SQL Server and then perform queries.
**Hint 3:** After `docker compose up`, the SQL Server needs some time to load the data before it is ready. Therefore, you need to wait for a while, or preferably, add a retry mechanism for the SQL connection.
......@@ -195,9 +208,9 @@ docker compose up -d
Then run the client like this:
```
docker exec -it p4-server-1 python3 /client.py DbToHdfs
docker exec -it p4-server-1 python3 /client.py BlockLocations -f /hdma-wi-2021.parquet
docker exec -it p4-server-1 python3 /client.py CalcAvgLoan -c 55001
docker exec p4-server-1 python3 /client.py DbToHdfs
docker exec p4-server-1 python3 /client.py BlockLocations -f /hdma-wi-2021.parquet
docker exec p4-server-1 python3 /client.py CalcAvgLoan -c 55001
```
Note that we will copy in the the provided files (docker-compose.yml, client.py, lender.proto, hdma-wi-2021.sql.gz, etc.), overwriting anything you might have changed. Please do NOT push hdma-wi-2021.sql.gz to your repo because it is large, and we want to keep the repos small.
......@@ -206,4 +219,11 @@ Please make sure you have `client.py` copied into the p4-server image. We will r
## Tester
Not released yet.
Please be sure that your installed `autobadger` is on version `0.1.7`. You can print the version using
```bash
autobadger --info
```
See [projects.md](https://git.doit.wisc.edu/cdis/cs/courses/cs544/s25/main/-/blob/main/projects.md#testing) for more information.
venv/*
nb/.ipynb_checkpoints/*
nb/data/*
nb/metastore_db/*
nb/derby.log
\ No newline at end of file
# P5 (4% of grade): Spark And Hive
## Overview
In P5, you'll use Spark to analyze competitive programming problems and their solutions. You'll load your data into Hive tables and views for easy querying. The main table contains numerous IDs in columns; you'll need to join these with other tables or views to determine what these IDs mean.
**Important:** You'll answer 10 questions in P5. Write each question number and text (e.g., "#q1: ...") as a comment in your notebook before each answer so we can easily find and grade your work.
Learning objectives:
- Use Spark's RDD, DataFrame, and SQL interfaces to answer questions about data
- Load data into Hive for querying with Spark
- grouping and optimizing queries
- Use PySpark's machine learning API to train a decision tree
Before starting, please review the [general project directions](../projects.md).
## Corrections/Clarifications
* none yet
## Setup
Copy these files from the project into your repository:
- `p5-base.Dockerfile`
- `namenode.Dockerfile`
- `notebook.Dockerfile`
- `datanode.Dockerfile`
- `docker-compose.yml`
- `build.sh`
- `get_data.py`
- `.gitignore`
Create a Python virtual environment and install the [datasets library](https://huggingface.co/docs/datasets/en/index):
```sh
pip3 install datasets==3.3.2
```
Create the directory structure with:
```sh
mkdir -p nb/data
```
Run the provided `get_data.py` script to download the [DeepMind CodeContests dataset](https://huggingface.co/datasets/deepmind/code_contests) and split it into `problems.jsonl` and `solutions.jsonl`.
### Docker Containers
```sh
docker build . -f p5-base.Dockerfile -t p5-base
docker build . -f notebook.Dockerfile -t p5-nb
docker build . -f namenode.Dockerfile -t p5-nn
docker build . -f datanode.Dockerfile -t p5-dn
docker build . -f boss.Dockerfile -t p5-boss
docker build . -f worker.Dockerfile -t p5-worker
```
Note that you need to write a `boss.Dockerfile` and `worker.Dockerfile`. A `build.sh` script is included for your convenience.
You can bring up your cluster like this:
```
export PROJECT=p5
docker compose up -d
```
### Jupyter Container
Connect to JupyterLab inside your container. Within the `nb` directory, create a notebook called `p5.ipynb`.
Run the following shell commands in a cell to upload the data:
```sh
hdfs dfs -D dfs.replication=1 -cp -f data/*.jsonl hdfs://nn:9000/
hdfs dfs -D dfs.replication=1 -cp -f data/*.csv hdfs://nn:9000/
```
### VS Code users
If you are using VS Code and remote SSH to work on your project, then the ports will already be forwarded for you. And you only need to go to: `http://127.0.0.1:5000/lab` in your terminal.
## Part 1: Filtering: RDDs, DataFrames, and Spark
Inside your `p5.ipynb` notebook, create a Spark session (note we're enabling Hive on HDFS):
```python
from pyspark.sql import SparkSession
spark = (SparkSession.builder.appName("cs544")
.master("spark://boss:7077")
.config("spark.executor.memory", "1G")
.config("spark.sql.warehouse.dir", "hdfs://nn:9000/user/hive/warehouse")
.enableHiveSupport()
.getOrCreate())
```
Load `hdfs://nn:9000/problems.jsonl` into a DataFrame. To verify success, run:
```python
problems_df.limit(5).show()
```
If loaded properly, you should see:
![image.png](image.png)
#### Q1: How many problems are there with a `cf_rating` of at least 1600, having `private_tests`, and a name containing "\_A." (Case Sensitive)? Answer by directly using the RDD API.
Remember that if you have a Spark DataFrame `df`, you can get the underlying RDD using `df.rdd`.
**REMEMBER TO INCLUDE `#q1` AT THE TOP OF THIS CELL**
#### Q2: How many problems are there with a `cf_rating` of at least 1600, having `private_tests`, and a name containing "\_A." (Case Sensitive)? Answer by using the DataFrame API.
This is the same question as Q1, and you should get the same answer. This is to give you to interact with Spark different ways.
#### Q3: How many problems are there with a `cf_rating` of at least 1600, having `private_tests`, and a name containing "\_A." (Case Sensitive)? Answer by using Spark SQL.
Before you can use `spark.sql`, write the problem data to a Hive table so that you can refer to it by name.
Again, the result after calling `count` should match your answers for Q1 and Q2.
## Part 2: Hive Data Warehouse
#### Q4: Does the query plan for a GROUP BY on solutions data need to shuffle/exchange rows if the data is pre-bucketed?
Write the data from `solutions.jsonl` to a Hive table named `solutions`, like you did for `problems`. This time, though, bucket the data by "language" and use 4 buckets when writing to the table.
Use Spark SQL to explain the query plan for this query:
```sql
SELECT language, COUNT(*)
FROM solutions
GROUP BY language
```
The `explain` output suffices for your answer. Take note (for your own sake) whether any `Exchange` appears in the output. Think about why an exchange/shuffle is or is not needed between the `partial_count` and `count` aggregates.
<!--
After bucketing the solutions, call `.explain` on a query that counts solutions per language. This should output `== Physical Plan ==` followed by the plan details. You've bucketed correctly if `Bucketed: true` appears in the output.
-->
#### Q5: What tables/views are in our warehouse?
You'll notice additional CSV files in `nb/data` that we haven't used yet. Create a Hive view for each using `createOrReplaceTempView`. Use these files:
```python
[
"languages", "problem_tests", "sources", "tags"
]
```
Answer with a Python dict like this:
```python
{'problems': False,
'solutions': False,
'languages': True,
'problem_tests': True,
'sources': True,
'tags': True}
```
The boolean indicates whether it is a temporary view (True) or table (False).
## Part 3: Caching and Transforming Data
#### Q6: How many correct PYTHON3 solutions are from CODEFORCES?
You may use any method for this question. Join the `solutions` table with the `problems` table using an inner join on the `problem_id` column. Note that the `source` column in `problems` is an integer. Join this column with the `source_id` column from the `sources` CSV. Find the number of correct `PYTHON3` solutions from `CODEFORCES`.
Answer Q6 with code and a single integer. **DO NOT HARDCODE THE CODEFORCES ID**.
#### Q7: How many problems are of easy/medium/hard difficulty?
The `problems_df` has a numeric `difficulty` column. For the purpose of categorizing the problems, interpret this number as follows:
- `<= 5` is `Easy`
- `<= 10` is `Medium`
- Otherwise `Hard`
Your answer should return this dictionary:
```python
{'Easy': 409, 'Medium': 5768, 'Hard': 2396}
```
Note (in case you use this dataset for something beyond the course): the actual meaning of the difficulty column depends on the problem source.
**Hint:** https://www.w3schools.com/sql/sql_case.asp
#### Q8: Does caching make it faster to compute averages over a subset of a bigger dataset?
To test the impact of caching, we are going to do the same calculations with and without caching. Implement a query that first filters rows of `problem_tests` to get rows where `is_generated` is `False` -- use a variable to refer to the resulting DataFrame.
Write some code to compute the average `input_chars` and `output_chars` over this DataFrame. Then, write code to do an experiment as follows:
1. compute the averages
2. make a call to cache the data
3. compute the averages
4. compute the averages
5. uncache the data
Measure the number of seconds it takes each of the three times we do the average calculations.s_generated` filtering only. Answer with list of the three times, in order, as follows:
```
[0.9092552661895752, 1.412867546081543, 0.1958458423614502]
```
Your numbers may vary significantly, but the final run should usually be the fastest.
## Part 4: Machine Learning with Spark
The dataset documentation for the `difficulty` field says: "For Codeforces problems, cf_rating is a more reliable measure of difficulty when available".
For this part, you will attempt to estimate the cf_rating for Codeforces problems for which it is unknown. To prepare, filter the problems to `CODEFORCES` problems, then further divide into three DataFrames:
* train dataset: `cf_rating` is >0, and `problem_id` in an EVEN number
* test dataset: `cf_rating` is >0, and `problem_id` in an ODD number
* missing dataset: `cf_rating` is 0
#### Q9: How well can a decision tree predict `cf_rating` based on `difficulty`, `time_limit`, and `memory_limit_bytes`?
Create a Spark Pipeline model with VectorAssembler and DecisionTreeRegression stages. The max tree depth should be 5. Train it on the training data, then compute an R^2 score (`r2_score`) for predictions on the test data. The R^2 score should be your answer for this question.
#### Q10: Do the problems with a missing `cf_score` appear more or less challenging that other problems?
Use your model to predict the `cf_score` in the dataset where it is missing.
Answer with a tuple with 3 numbers:
* average `cf_rating` in the training dataset
* average `cf_rating` in the test dataset
* average **prediction** of `cf_rating in the missing dataset
For example:
(1887.9377431906614, 1893.1106471816283, 1950.4728638818783)
## Submission
We should be able to run the following on your submission to directly create the mini cluster:
```
# data setup...
docker build . -f p5-base.Dockerfile -t p5-base
docker build . -f notebook.Dockerfile -t p5-nb
docker build . -f namenode.Dockerfile -t p5-nn
docker build . -f datanode.Dockerfile -t p5-dn
docker build . -f boss.Dockerfile -t p5-boss
docker build . -f worker.Dockerfile -t p5-worker
export PROJECT=p5
docker compose up -d
```
We should then be able to open `http://localhost:5000/lab`, find your
notebook, and run it.
## Tester
Coming soon...
#!/bin/bash
# Create the host volume directory if it doesn't exist
docker build \
--rm \
-f p5-base.Dockerfile \
-t p5-base .
# Build the child containers
docker build --rm . -f notebook.Dockerfile -t p5-nb
docker build --rm . -f namenode.Dockerfile -t p5-nn
docker build --rm . -f datanode.Dockerfile -t p5-dn
docker build --rm . -f boss.Dockerfile -t p5-boss
docker build --rm . -f worker.Dockerfile -t p5-worker
FROM p5-base
CMD ["bash", "-c", "hdfs datanode -fs hdfs://nn:9000"]
name: ${PROJECT}
services:
nb:
image: ${PROJECT}-nb
ports:
- "127.0.0.1:5000:5000"
- "127.0.0.1:4040:4040"
volumes:
- "./nb:/nb"
deploy:
resources:
limits:
memory: 1.5G
nn:
image: ${PROJECT}-nn
hostname: nn
deploy:
resources:
limits:
memory: 1G
dn:
image: ${PROJECT}-dn
depends_on:
- nn
deploy:
resources:
limits:
memory: 1G
spark-boss:
image: ${PROJECT}-boss
hostname: boss
deploy:
resources:
limits:
memory: 0.5G
spark-worker:
image: ${PROJECT}-worker
deploy:
replicas: 2
resources:
limits:
memory: 2G
from datasets import load_dataset # Import Hugging Face's datasets library
from pathlib import Path # For handling file paths in a cross-platform way
import tempfile # For creating temporary directories
import random # For random sampling and shuffling
import json # For reading/writing CSV files
from collections import defaultdict # For grouping solutions by language
from tqdm import tqdm
# Get the directory where the script is located
current_file = Path(__file__).parent
# Check if the script is being run from the correct directory
if not current_file.name.startswith("p5"):
print("Please run this script from the p5 directory!!")
print(f"Current directory: {current_file.absolute()}")
exit(1)
# Check if the required 'nb' directory exists
if not Path("nb").exists():
print("No 'nb' directory found. Refer to the README.md and make it.")
exit(1)
# Check if the required 'nb/data' directory exists
if not Path("nb/data").exists():
print("No 'nb/data' directory found. Refer to the README.md and make it.")
exit(1)
print("Splitting the CodeContests dataset into 'problems.jsonl' and 'solutions.jsonl'")
SEED = 42 # Set a random seed for reproducibility
random.seed(SEED)
# Define a mapping from numerical language IDs to human-readable language names
LANGUAGE_MAP = {
0: "UNKNOWN_LANGUAGE",
1: "PYTHON2",
2: "CPP",
3: "PYTHON3",
4: "JAVA",
}
with open("nb/data/languages.csv", "w") as f:
f.write("language,language_name\n")
for k, v in LANGUAGE_MAP.items():
f.write(f"{k},{v}\n")
SOURCE_MAP = {
0: "UNKNOWN_SOURCE",
1: "CODECHEF",
2: "CODEFORCES",
3: "HACKEREARTH",
4: "CODEJAM",
5: "ATCODER",
6: "AIZU",
}
with open("nb/data/sources.csv", "w") as f:
f.write("source,source_name\n")
for k, v in SOURCE_MAP.items():
f.write(f"{k},{v}\n")
# Define the set of keys that will be extracted for problem data
problem_keys = {
"name",
"source",
"difficulty",
"cf_contest_id",
"cf_index",
"cf_points",
"cf_rating",
"is_description_translated",
"memory_limit_bytes",
}
TAG_MAPS = {}
PROB_ID_MAP = {}
# Define output file paths
problems_path = Path("nb/data/problems.jsonl")
solutions_path = Path("nb/data/solutions.jsonl")
num_removed = 0
prob_id_counter = 0
# Create a temporary directory to download and cache the dataset
with tempfile.TemporaryDirectory() as tmpdirname:
# Load the DeepMind Code Contests dataset
dataset = load_dataset(
"deepmind/code_contests",
split="train", # Use the training split
streaming=True, # Stream the dataset to handle its large size
cache_dir=tmpdirname, # Store the cache in the temporary directory
)
dataset = dataset.shuffle(SEED)
with Path("nb/data/problem_tests.csv").open("w") as test_file:
test_file.write(
"problem_id,input_chars,output_chars,is_public,is_generated,is_private,output_is_number\n"
)
# Open both output files for writing
with problems_path.open("w") as problems_fd:
with solutions_path.open("w") as solutions_fd:
problems_saved = 0 # Counter for saved problems
# Process each problem in the dataset
for task in tqdm(dataset, total=10_000, desc="Processing problems"):
# Extract problem data for the relevant keys
problem_id = prob_id_counter
prob_id_counter += 1
prob_dict = {
"problem_id": problem_id,
**{k: task[k] for k in problem_keys},
}
if prob_dict["difficulty"] == 0:
num_removed += 1
continue
total_tests = 0
# Check if test data is available for each test type
for t_name in ["public", "private", "generated"]:
num_save = 0
for ti, to in zip(
task[f"{t_name}_tests"]["input"],
task[f"{t_name}_tests"]["output"],
):
test_file.write(
",".join(
(
str(problem_id),
f"{len(ti)}",
f"{len(to)}",
f"{t_name == 'public'}",
f"{t_name == 'generated'}",
f"{t_name == 'private'}",
f"{to.isnumeric()}",
)
)
+ "\n"
)
num_save += 1
if t_name in {"public", "private"}:
total_tests += 1
if num_save >= 30:
break
prob_dict[f"{t_name}_tests"] = len(
task.get(f"{t_name}_tests", {"input": []})["input"]
)
if total_tests == 0:
num_removed += 1
continue
prob_dict["cf_tags"] = []
for t in task["cf_tags"]:
if t not in TAG_MAPS:
TAG_MAPS[t] = len(TAG_MAPS)
prob_dict["cf_tags"].append(TAG_MAPS[t])
# Extract time limit (if available)
prob_dict["time_limit"] = (
-1
if task["time_limit"] is None
else task["time_limit"]["seconds"]
)
sols = [] # Initialize solutions list (note: not used later)
# Process both correct and incorrect solutions
for p, sol_dict in [
(True, task["solutions"]), # Correct solutions
(False, task["incorrect_solutions"]), # Incorrect solutions
]:
# Group solutions by programming language
language_sols = defaultdict(list)
for i, sol in enumerate(sol_dict["solution"]):
language_sols[sol_dict["language"][i]].append(sol)
has_printed = False
# For each language, randomly sample a small number of solutions
# (to save space, we're not keeping all solutions)
for lang, sols in language_sols.items():
# Take between 1-3 random solutions per language
to_save = random.sample(
sols, k=min(len(sols), random.randint(1, 3))
)
for sol in to_save:
# Truncate solutions that are too long
if len(sol) > 4096:
sol = sol[:4096] + "...TRUNCATED"
save_sol_dict = {
"problem_id": problem_id,
"language": LANGUAGE_MAP[
lang
], # Convert language ID to name
"is_correct": p, # Whether this is a correct solution
"solution": sol, # The code solution
}
# Write the solution to the CSV
solutions_fd.write(json.dumps(save_sol_dict) + "\n")
# Write the problem data to the CSV
problems_fd.write(json.dumps(prob_dict) + "\n")
problems_saved += 1
if problems_saved >= 10_000:
break
with Path("nb/data/tags.csv").open("w") as f:
f.write("tag_id,tag\n")
for k, v in TAG_MAPS.items():
if not k:
continue
f.write(f"{v},{k}\n")
print(f"Removed {num_removed:,} problems")
p5/image.png

119 KiB

FROM p5-base
RUN hdfs namenode -format -force
CMD ["bash", "-c", "hdfs namenode -format -force && hdfs namenode -fs hdfs://nn:9000"]
FROM p5-base
RUN apt-get update; apt-get install -y unzip
CMD ["python3", "-m", "jupyterlab", "--no-browser", "--ip=0.0.0.0", "--port=5000", "--allow-root", "--NotebookApp.token=''"]
FROM ubuntu:22.04
# Install required packages
RUN apt-get update && apt-get install -y \
wget \
curl \
openjdk-11-jdk \
python3-pip \
net-tools \
lsof \
nano \
sudo
# Copy requirements file
COPY requirements.txt /requirements.txt
# Install Python dependencies
RUN pip3 install -r /requirements.txt
# Download and extract Hadoop
RUN wget https://dlcdn.apache.org/hadoop/common/hadoop-3.4.1/hadoop-3.4.1.tar.gz && \
tar -xf hadoop-3.4.1.tar.gz && \
rm hadoop-3.4.1.tar.gz
# Download and extract Spark
RUN wget https://dlcdn.apache.org/spark/spark-3.5.5/spark-3.5.5-bin-hadoop3.tgz && \
tar -xf spark-3.5.5-bin-hadoop3.tgz && \
rm spark-3.5.5-bin-hadoop3.tgz
# Set environment variables
ENV JAVA_HOME=/usr/lib/jvm/java-11-openjdk-amd64
ENV PATH="${PATH}:/hadoop-3.4.1/bin"
ENV HADOOP_HOME=/hadoop-3.4.1