%% Cell type:code id:c8dca847-54af-4284-97d8-0682e88a6e8d tags:
``` python
from pyspark.sql import SparkSession
spark = (SparkSession.builder.appName("cs544")
         .master("spark://boss:7077")
         .config("spark.executor.memory", "2G")
         .config("spark.sql.warehouse.dir", "hdfs://nn:9000/user/hive/warehouse")
         .enableHiveSupport()
         .getOrCreate())
```
%% Output
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/10/27 01:41:45 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
%% Cell type:code id:2294e4e0-ab19-496c-980f-31df757e7837 tags:
``` python
!hdfs dfs -cp sf.csv hdfs://nn:9000/sf.csv
```
%% Cell type:code id:cb54bacc-b52a-4c25-93d2-2ba0f61de9b0 tags:
``` python
df = (spark.read.format("csv")
.option("header", True)
.option("inferSchema", True)
.load("hdfs://nn:9000/sf.csv"))
```
%% Output
%% Cell type:code id:c1298818-83f6-444b-b8a0-4be5b16fd6fb tags:
``` python
from pyspark.sql.functions import col, expr
cols = [col(c).alias(c.replace(" ", "_")) for c in df.columns]
df.select(cols).write.format("parquet").save("hdfs://nn:9000/sf.parquet")
```
%% Output
23/10/27 01:43:57 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
%% Cell type:code id:37d1ded3-ed8a-4e39-94cb-dd3a3272af91 tags:
``` python
!hdfs dfs -rm hdfs://nn:9000/sf.csv
```
%% Cell type:code id:abea48b5-e012-4ae2-a53a-e40350f94e20 tags:
``` python
df = spark.read.format("parquet").load("hdfs://nn:9000/sf.parquet")
```
@@ -20,7 +20,10 @@ Before starting, please review the [general project directions](../projects.md).
## Clarifications/Corrections
* none yet
* Feb 24: feel free to use different tools to implement Part 2.
* Feb 24: clarify that `bigdata.py` will be used in tests.
* Feb 24: add link to lecture notes on parquet file operations.
* Feb 24: remove port forwarding for `docker run` since we test server with `docker exec`
## Part 1: Communication (gRPC)
@@ -79,7 +82,7 @@ server like this:
```
docker build . -t p3
docker run -d -m 512m -p 127.0.0.1:5440:5440 p3
docker run -d -m 512m p3
```
The client programs should then be able to communicate with the
@@ -97,8 +100,8 @@ clients need to run. When we test your code, we will run the clients
in the same container as the server, like this:
```
docker run --name=server -d -m 512m -p 127.0.0.1:5440:5440 -v ./inputs:/inputs p3 # server
docker exec server python3 upload.py /inputs/test1.csv # client
docker run --name=server -d -m 512m -v ./inputs:/inputs p3 # server
docker exec server python3 upload.py /inputs/test1.csv # client
```
Note that you don't need to have an `inputs/test1.csv` file, as the
@@ -114,7 +117,7 @@ to re-run your container with newer server.py code without rebuilding
first. Here's an example:
```
docker run --rm -m 512m -p 127.0.0.1:5440:5440 -v ./server.py:/server.py p3
docker run --rm -m 512m -v ./server.py:/server.py p3
```
## Part 2: Upload
@@ -126,7 +129,9 @@ file (for example, you could add the path to some data structure, like a
list or dictionary).
Your server should similarly write the same data to a parquet file
somewhere, using pyarrow.
somewhere, using `pyarrow`, `pandas`, or any available tools. Refer to
the [lecture notes](https://git.doit.wisc.edu/cdis/cs/courses/cs544/s25/main/-/tree/main/lec/14-file-formats?ref_type=heads)
for a few examples of reading/writing parquet files.
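The upload RPC's exact fields are part of your p3 design, so the snippet below is only a minimal sketch: it assumes the server receives the raw CSV bytes (`csv_bytes`) and a name, both hypothetical, and shows one way to turn them into a Parquet file with `pyarrow`.

```python
# Minimal sketch: convert uploaded CSV bytes into a Parquet file.
import io
import pyarrow.csv as pacsv
import pyarrow.parquet as pq

def save_upload(csv_bytes, name):
    table = pacsv.read_csv(io.BytesIO(csv_bytes))   # parse the CSV data from the RPC
    parquet_path = f"/uploads/{name}.parquet"       # hypothetical location
    pq.write_table(table, parquet_path)             # write the same data as Parquet
    return parquet_path
```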
## Part 3: Column Sum
@@ -174,22 +179,24 @@ be a performance depending on which format is used.
Parquet is a column-oriented format, so all the data in a single file
should be adjacent on disk. This means it should be possible to read
a column of data without reading the whole file. See the `columns`
parameter here:
https://arrow.apache.org/docs/python/generated/pyarrow.parquet.read_table.html
a column of data without reading the whole file. Check out the `columns`
parameter of [`pyarrow.parquet.read_table`](https://arrow.apache.org/docs/python/generated/pyarrow.parquet.read_table.html).
You can also find an example from the [lecture notes](https://git.doit.wisc.edu/cdis/cs/courses/cs544/s25/main/-/tree/main/lec/14-file-formats?ref_type=heads).
**Requirement:** when the server is asked to sum over the column of a
Parquet file, it should only read the data from that column, not other
columns.
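For example, a hedged sketch of a column-only read and sum (the file path is a placeholder; column `x` matches the example clients shown later):

```python
import pyarrow.parquet as pq
import pyarrow.compute as pc

# only the requested column is read from disk
tbl = pq.read_table("/uploads/test1.parquet", columns=["x"])
total = pc.sum(tbl["x"]).as_py()
```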
**Note:** we will run your server with a 512-MB limit on RAM. Any
**Note 1:** we will run your server with a 512-MB limit on RAM. Any
individual files we upload will fit within that limit, but the total
size of the files uploaded will exceed that limit. That's why your
server will have to do sums by reading the files (instead of just
keeping all table data in memory). If you want to manually test your
code with some bigger uploads, use the `bigdata.py` client. Instead
of uploading files, it randomly generates lots of CSV-formatted data
and directly uploads it via gRPC.
keeping all table data in memory).
**Note 2:** the `bigdata.py` client randomly generates a large volume of
CSV-formatted data and uploads it via gRPC. You are *required* to
test your upload implementation with this script, and it will be used
as part of our tests.
## Part 4: Locking
@@ -243,12 +250,13 @@ be able to run your client and server as follows:
docker build . -t p3
# run server in new container
docker run --name=yournetid -d -m 512m -p 127.0.0.1:5440:5440 -v ./inputs:/inputs p3
docker run --name=yournetid -d -m 512m -v ./inputs:/inputs p3
# run clients in same container
docker exec yournetid python3 upload.py /inputs/simple.csv
docker exec yournetid python3 csvsum.py x
docker exec yournetid python3 parquetsum.py x
docker exec yournetid python3 bigdata.py
```
Please do include the files built from the .proto (your Dockerfile
......
venv/
tmp/
*.csv
FROM p4-hdfs
CMD export CLASSPATH=`$HADOOP_HOME/bin/hdfs classpath --glob` && \
hdfs datanode -D dfs.datanode.data.dir=/var/datanode -fs hdfs://boss:9000
\ No newline at end of file
FROM ubuntu:24.04
RUN apt-get update; apt-get install -y wget curl openjdk-11-jdk python3-pip iproute2 nano
# HDFS
RUN wget https://dlcdn.apache.org/hadoop/common/hadoop-3.3.6/hadoop-3.3.6.tar.gz; tar -xf hadoop-3.3.6.tar.gz; rm hadoop-3.3.6.tar.gz
ENV JAVA_HOME=/usr/lib/jvm/java-11-openjdk-amd64
ENV PATH="${PATH}:/hadoop-3.3.6/bin"
ENV HADOOP_HOME=/hadoop-3.3.6
FROM mysql:8.4.0-oraclelinux8
WORKDIR /
#COPY init.sql /docker-entrypoint-initdb.d/
COPY hdma-wi-2021.sql.gz /docker-entrypoint-initdb.d/
RUN gzip -d /docker-entrypoint-initdb.d/hdma-wi-2021.sql.gz
CMD ["mysqld"]
FROM p4-hdfs
CMD export CLASSPATH=`$HADOOP_HOME/bin/hdfs classpath --glob` && \
hdfs namenode -format &&\
hdfs namenode -D dfs.namenode.stale.datanode.interval=10000 -D dfs.namenode.heartbeat.recheck-interval=30000 -fs hdfs://boss:9000
# P4 (4% of grade): SQL and HDFS
## Overview
In this project, you will deploy a data system consisting of an SQL server, an HDFS cluster, a gRPC server, and a client. You will need to read and filter data from a SQL database and persist it to HDFS. Additionally, you will write a fault-tolerant application that works even when an HDFS DataNode fails (we will test this scenario).
Learning objectives:
* communicate with the SQL Server using SQL queries
* use the WebHDFS API
* utilize PyArrow to interact with HDFS and process Parquet files
* handle data loss scenarios
Before starting, please review the [general project directions](../projects.md).
## Corrections/Clarifications
- Mar 5: A hint about HDFS environment variables added; a dataflow diagram added; some minor typos fixed.
- Mar 5: Fix the wrong expected file size in Part 1 and sum of blocks in Part 2.
- Mar 6: Released `autobadger` for `p4` (`0.1.6`)
- Mar 7:
  - Some minor updates on p4 `Readme.md`.
  - Update `autobadger` to version `0.1.7`:
    - Fixed exception handling, so Autobadger can correctly print error messages.
    - Expanded the expected file size range in test4 `test_Hdfs_size`.
    - Made the error messages clearer.
## Introduction
You'll need to deploy a system including 6 docker containers like this:
<img src="arch.png" width=600>
The data flow roughly follows this:
<img src="dataflow.png" width=600>
We have provided the other components; all you need to do is complete the work within the gRPC server and its Dockerfile.
### Client
This project will use `docker exec` to run the client on the gRPC server's container. Usage of `client.py` is as follows:
```
#Inside the server container
python3 client.py DbToHdfs
python3 client.py BlockLocations -f <file_path>
python3 client.py CalcAvgLoan -c <county_code>
```
### Docker Compose
Take a look at the provided Docker compose file. There are several services, including 3 `datanodes`, a `namenode`, a `SQL server`, and a `gRPC server`. The NameNode service will serve at the host `boss` within the Docker Compose network.
### gRPC
You are required to write a `server.py` and a `Dockerfile.server` based on the provided proto file (you may not modify the proto file).
The gRPC interfaces have already been defined (see `lender.proto` for details). There are no constraints on the return values of `DbToHdfs`, so you may return what you think is helpful.
### Docker image naming
You need to build the Docker images following this naming convention:
```
docker build . -f Dockerfile.hdfs -t p4-hdfs
docker build . -f Dockerfile.namenode -t p4-nn
docker build . -f Dockerfile.datanode -t p4-dn
docker build . -f Dockerfile.mysql -t p4-mysql
docker build . -f Dockerfile.server -t p4-server
```
### PROJECT environment variable
Note that the compose file assumes there is a "PROJECT" environment
variable. You can set it to p4 in your environment:
```
export PROJECT=p4
```
**Hint 1:** The command `docker logs <container-name> -f` might be very useful for troubleshooting. It allows you to view real-time output from a specific container.
**Hint 2:** Think about whether there is any .sh script that will help you quickly test code changes. For example, you may want it to rebuild your Dockerfiles, cleanup an old Compose cluster, and deploy a new cluster.
**Hint 3:** If you're low on disk space, consider running `docker system prune --volumes -f`
## Part 1: `DbToHdfs` gRPC Call
In this part, your task is to implement the `DbToHdfs` gRPC call (you can find the interface definition in the proto file).
**DbToHdfs:** To be more specific, you need to:
1. Connect to the SQL server, with database name `CS544` and password `abc`. There are two tables in the database: `loans` and `loan_types`. The former records all information related to loans, while the latter maps the numbers in the `loan_type` column of the `loans` table to their corresponding loan types. There should be **447367** rows in the `loans` table. It should look like this:
```mysql
mysql> show tables;
+-----------------+
| Tables_in_CS544 |
+-----------------+
| loan_types |
| loans |
+-----------------+
mysql> select count(*) from loans;
+----------+
| count(*) |
+----------+
| 447367 |
+----------+
```
2. What are the actual types for those loans?
Perform an inner join on these two tables so that a new column `loan_type_name` is added to the `loans` table, where its value is the corresponding `loan_type_name` from the `loan_types` table based on the matching `loan_type_id` in `loans`.
3. Filter all rows where `loan_amount` is **greater than 30,000** and **less than 800,000**. After filtering, this table should have only **426716** rows.
4. Upload the generated table to `/hdma-wi-2021.parquet` in the HDFS, with **2x** replication and a **1-MB** block size, using PyArrow (https://arrow.apache.org/docs/python/generated/pyarrow.fs.HadoopFileSystem.html).
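A hedged sketch of one possible `DbToHdfs` flow is shown below. The connection details (service `mysql`, user `root`, password `abc`, database `CS544`) and the NameNode address `boss:9000` come from the provided compose file and Dockerfiles, but the library choice (SQLAlchemy + pandas) and the column names in the JOIN are assumptions; check the real schema before copying anything.

```python
# Sketch only, not the required implementation.  Assumes SQLAlchemy and a
# MySQL driver are installed, and that the HDFS env vars (see Hint 4) are set.
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
from pyarrow import fs
from sqlalchemy import create_engine

engine = create_engine("mysql+mysqlconnector://root:abc@mysql:3306/CS544")
df = pd.read_sql("""
    SELECT l.*, lt.loan_type_name
    FROM loans l
    INNER JOIN loan_types lt ON l.loan_type_id = lt.id  -- guessed key names
    WHERE l.loan_amount > 30000 AND l.loan_amount < 800000
""", engine)

# 2x replication and a 1-MB block size, per the requirement above
hdfs = fs.HadoopFileSystem("boss", 9000, replication=2,
                           default_block_size=1024*1024)
pq.write_table(pa.Table.from_pandas(df, preserve_index=False),
               "/hdma-wi-2021.parquet", filesystem=hdfs)
```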
To check whether the upload was correct, you can use `docker exec -it <container_name> bash` to enter the gRPC server's container and use the HDFS command `hdfs dfs -du -h <path>` to see the file size. The expected result should look like this:
```
14.4 M 28.9 M hdfs://nn:9000/hdma-wi-2021.parquet
```
Note: Your file size might differ slightly from this.
>That's because when we join two tables, rows from one table get matched with rows in the other, but the order of output rows is not guaranteed. If we have the same rows in a different order, the compressibility of snappy (used by Parquet by default) will vary because it is based on compression windows, and there may be more or less redundancy in a window depending on row ordering.
**Hint 1:** We used similar tables in lecture: https://git.doit.wisc.edu/cdis/cs/courses/cs544/s25/main/-/tree/main/lec/15-sql
**Hint 2:** To get more familiar with these tables, you can use SQL queries to print the table schema or retrieve sample data. A convenient way to do this is to use `docker exec -it <container name> bash` to enter the SQL Server, then run mysql client `mysql -p CS544` to access the SQL Server and then perform queries.
**Hint 3:** After `docker compose up`, the SQL Server needs some time to load the data before it is ready. Therefore, you need to wait for a while, or preferably, add a retry mechanism for the SQL connection.
**Hint 4:** For PyArrow to be able to connect to HDFS, you'll need to configure some env variables carefully. Look at how Dockerfile.namenode does this for the start CMD, and do the same in your own Dockerfile for your server.
## Part 2: `BlockLocations` gRPC Call
In this part, your task is to implement the `BlockLocations` gRPC call (you can find the interface definition in the proto file).
**BlockLocations:** To be more specific, for a given file path, you need to return a Python dictionary (that is, a `map` in proto), recording how many blocks are stored by each DataNode (key is the **DataNode location** and value is **number** of blocks on that node).
For example, running `docker exec -it p4-server-1 python3 /client.py BlockLocations -f /hdma-wi-2021.parquet` should show something like this:
```
{'7eb74ce67e75': 15, 'f7747b42d254': 7, '39750756065d': 8}
```
Note: DataNode location is the randomly generated container ID for the
container running the DataNode, so yours will be different, and the
distribution of blocks across different nodes will also likely vary.
The documentation [here](https://hadoop.apache.org/docs/r3.3.6/hadoop-project-dist/hadoop-hdfs/WebHDFS.html) describes how we can interact with HDFS via web requests. Many [examples](https://requests.readthedocs.io/en/latest/user/quickstart/) show these web requests being made with the `curl` command, but you'll adapt those examples to use `requests.get`. By default, WebHDFS runs on port 9870, so use port 9870 instead of 9000 to access HDFS for this part.
Use a `GETFILEBLOCKLOCATIONS` operation to find the block locations.
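A hedged sketch of the WebHDFS request, assuming the NameNode answers on `boss:9870` and the documented `GETFILEBLOCKLOCATIONS` response layout (`BlockLocations` → `BlockLocation` → `hosts`):

```python
# Sketch: count how many blocks each DataNode stores for a given HDFS path.
import requests

def block_counts(path):
    r = requests.get(f"http://boss:9870/webhdfs/v1{path}",
                     params={"op": "GETFILEBLOCKLOCATIONS"})
    r.raise_for_status()
    counts = {}
    for block in r.json()["BlockLocations"]["BlockLocation"]:
        for host in block["hosts"]:              # DataNode container IDs
            counts[host] = counts.get(host, 0) + 1
    return counts
```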
**Hint:** You have to set appropriate environment variable `CLASSPATH` to access HDFS correctly. See example [here](https://git.doit.wisc.edu/cdis/cs/courses/cs544/s25/main/-/blob/main/lec/18-hdfs/notebook.Dockerfile?ref_type=heads).
## Part 3: `CalcAvgLoan` gRPC Call
In this part, your task is to implement the `CalcAvgLoan` gRPC call (you can find the interface definition in the proto file).
The call should read hdma-wi-2021.parquet, filtering to rows with the specified county code. One way to do this would be to pass a `("column", "=", ????)` tuple inside a `filters` list upon read: https://arrow.apache.org/docs/python/generated/pyarrow.parquet.read_table.html
The call should return the average loan amount from the filtered table as an integer (rounding down if necessary).
As an optimization, your code should also write the filtered data to a file named `partitions/<county_code>.parquet`. If there are later calls for the same county_code, your code should use the smaller, county-specific Parquet file (instead of filtering the big Parquet file with all loan applications). The county-specific Parquet file should have 1x replication. When `CalcAvgLoan` returns the average, it should also use the "source" field to indicate where the data came from: `source="create"` means a new county-specific file had to be created from the big Parquet file, and `source="reuse"` means a previously created county-specific file was used.
One easy way to check whether the county-specific file already exists is to just try reading it with PyArrow. You should get a `FileNotFoundError` exception if it doesn't exist.
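A sketch of the reuse-or-create logic is below. The column names (`county_code`, `loan_amount`) and the NameNode address are assumptions based on this README and the compose file; error handling is left out.

```python
import pyarrow.parquet as pq
import pyarrow.compute as pc
from pyarrow import fs

hdfs = fs.HadoopFileSystem("boss", 9000)

def calc_avg_loan(county_code):
    part = f"/partitions/{county_code}.parquet"
    try:
        tbl = pq.read_table(part, filesystem=hdfs)
        source = "reuse"                        # county-specific file already existed
    except FileNotFoundError:
        tbl = pq.read_table("/hdma-wi-2021.parquet", filesystem=hdfs,
                            filters=[("county_code", "=", county_code)])
        # save the filtered rows with 1x replication for later calls
        hdfs_1x = fs.HadoopFileSystem("boss", 9000, replication=1)
        pq.write_table(tbl, part, filesystem=hdfs_1x)
        source = "create"
    return int(pc.mean(tbl["loan_amount"]).as_py()), source
```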
<!--
Imagine a scenario where there could be many queries differentiated by `county`, and one of them is to get the average loan amount for a county. In this case, it might be much more efficient to generate a set of 1x Parquet files filtered by county, and then read data from these partitioned, relatively much smaller tables for computation.
**CalcAvgLoan:** To be more specific, for a given `county_id` , you need to return a int value, indicating the average `loan_amount` of that county. **Note:** You are required to perform this calculation based on the partitioned parquet files generated by `FilterByCounty`. `source` field in proto file can ignored in this part.
-->
After a `DbToHdfs` call and a few `CalcAvgLoan` calls, your HDFS directory structure will look something like this:
```
├── hdma-wi-2021.parquet
├── partitions/
│ ├── 55001.parquet
│ ├── 55003.parquet
│ └── ...
```
## Part 4: Fault Tolerance
A "fault" is something that goes wrong, like a hard disk failing or an entire DataNode crashing. Fault tolerant code continues functioning for some kinds of faults.
In this part, your task is to make `CalcAvgLoan` tolerant to a single DataNode failure (we will kill one during testing!).
Recall that `CalcAvgLoan` sometimes uses small, county-specific Parquet files that have 1x replication, and sometimes it uses the big Parquet file (hdma-wi-2021.parquet) of all loan applications that uses 2x replication. Your fault tolerance strategy should be as follows:
1. `hdma-wi-2021.parquet`: if you created this with 2x replication earlier, you don't need to do anything else here, because HDFS can automatically handle a single DataNode failure for you.
2. `partitions/<COUNTY_CODE>.parquet`: this data only has 1x replication, so HDFS might lose it when a DataNode fails. That's fine, because all the rows are still in the big Parquet file. You should write code to detect this scenario and recreate the lost/corrupted county-specific file by reading the big file again with the county filter. If you try to read an HDFS file with missing data using PyArrow, the client will retry for a while (perhaps 30 seconds or so) and then raise an `OSError` exception, which you should catch and handle.
CalcAvgLoan should now use the "source" field in the return value to indicate how the average was computed: "create" (from the big file, because a county-specific file didn't already exist), "recreate" (from the big file, because a county-specific file was corrupted/lost), or "reuse" (there was a valid county-specific file that was used).
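A sketch of how the Part 3 logic might be extended (reusing the imports and `hdfs` handle from the Part 3 sketch); `read_big_file_filtered` and `write_partition` are hypothetical helpers standing in for the create path described above:

```python
def calc_avg_loan(county_code):
    part = f"/partitions/{county_code}.parquet"
    try:
        tbl = pq.read_table(part, filesystem=hdfs)
        source = "reuse"
    except FileNotFoundError:
        tbl = read_big_file_filtered(county_code)   # county file never existed
        write_partition(tbl, part)
        source = "create"
    except OSError:
        tbl = read_big_file_filtered(county_code)   # blocks lost with the dead DataNode
        write_partition(tbl, part)
        source = "recreate"
    return int(pc.mean(tbl["loan_amount"]).as_py()), source
```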
**Hint:** to manually test DataNode failure, you should use `docker kill` to terminate a node and then wait until you confirm that the number of `live DataNodes` has decreased using the `hdfs dfsadmin -fs <hdfs_path> -report` command.
## Submission
Read the directions [here](../projects.md) about how to create the
repo.
You have some flexibility about how you write your code, but we must be able to run it like this:
```
docker build . -f Dockerfile.hdfs -t p4-hdfs
docker build . -f Dockerfile.namenode -t p4-nn
docker build . -f Dockerfile.datanode -t p4-dn
docker build . -f Dockerfile.mysql -t p4-mysql
docker build . -f Dockerfile.server -t p4-server
docker compose up -d
```
Then run the client like this:
```
docker exec p4-server-1 python3 /client.py DbToHdfs
docker exec p4-server-1 python3 /client.py BlockLocations -f /hdma-wi-2021.parquet
docker exec p4-server-1 python3 /client.py CalcAvgLoan -c 55001
```
Note that we will copy in the provided files (docker-compose.yml, client.py, lender.proto, hdma-wi-2021.sql.gz, etc.), overwriting anything you might have changed. Please do NOT push hdma-wi-2021.sql.gz to your repo because it is large, and we want to keep the repos small.
Please make sure you have `client.py` copied into the p4-server image. We will run client.py in the p4-server-1 container to test your code.
## Tester
Please be sure that your installed `autobadger` is on version `0.1.7`. You can print the version using
```bash
autobadger --info
```
See [projects.md](https://git.doit.wisc.edu/cdis/cs/courses/cs544/s25/main/-/blob/main/projects.md#testing) for more information.
p4/arch.png (binary image, 276 KiB)
import sys
import grpc
import argparse
from concurrent import futures
import lender_pb2, lender_pb2_grpc
import pandas as pd

parser = argparse.ArgumentParser(description="argument parser for p4 client")
parser.add_argument("mode", help="which action to take", choices=["DbToHdfs","BlockLocations","CalcAvgLoan"])
parser.add_argument("-c", "--code", type=int, default=0, help="county code to query average loan amount in CalcAvgLoan mode")
parser.add_argument("-f", "--file", type=str, default="", help="file path for BlockLocation")
args = parser.parse_args()

channel = grpc.insecure_channel("server:5000")
stub = lender_pb2_grpc.LenderStub(channel)

if args.mode == "DbToHdfs":
    resp = stub.DbToHdfs(lender_pb2.Empty())
    print(resp.status)
elif args.mode == "CalcAvgLoan":
    resp = stub.CalcAvgLoan(lender_pb2.CalcAvgLoanReq(county_code=args.code))
    if resp.error:
        print(f"error: {resp.error}")
    else:
        print(resp.avg_loan)
        print(resp.source)
elif args.mode == "BlockLocations":
    resp = stub.BlockLocations(lender_pb2.BlockLocationsReq(path=args.file))
    if resp.error:
        print(f"error: {resp.error}")
    else:
        print(resp.block_entries)
p4/dataflow.png (binary image, 290 KiB)
name: ${PROJECT}
services:
  server:
    image: ${PROJECT}-server
    deploy:
      resources:
        limits:
          memory: 3g
  mysql:
    image: ${PROJECT}-mysql
    hostname: mysql
    environment:
      MYSQL_ROOT_PASSWORD: abc
      MYSQL_DATABASE: CS544
    deploy:
      resources:
        limits:
          memory: 1g
  nn:
    image: ${PROJECT}-nn
    hostname: boss
    deploy:
      resources:
        limits:
          memory: 1g
  dn:
    image: ${PROJECT}-dn
    deploy:
      replicas: 3
      resources:
        limits:
          memory: 1g
File added
syntax = "proto3";

message Empty {}

message BlockLocationsReq {
    string path = 1;
}

message BlockLocationsResp {
    map<string, int32> block_entries = 1; // list of block locations
    string error = 2;
}

message CalcAvgLoanReq {
    int32 county_code = 1;
}

message CalcAvgLoanResp {
    int32 avg_loan = 1;
    string source = 2; // create, reuse, or recreate
    string error = 3;
}

message StatusString {
    string status = 1;
}

service Lender {
    // Load input.data from SQL server and upload it to HDFS
    rpc DbToHdfs (Empty) returns (StatusString);
    // Get the block locations of the Parquet file in HDFS
    rpc BlockLocations (BlockLocationsReq) returns (BlockLocationsResp);
    // Calculate the average loan amount for a given county_code
    rpc CalcAvgLoan (CalcAvgLoanReq) returns (CalcAvgLoanResp);
}
venv/*
nb/.ipynb_checkpoints/*
nb/data/*
nb/metastore_db/*
nb/derby.log
\ No newline at end of file
# P5 (4% of grade): Spark And Hive
## Overview
In P5, you'll use Spark to analyze competitive programming problems and their solutions. You'll load your data into Hive tables and views for easy querying. The main table contains numerous IDs in columns; you'll need to join these with other tables or views to determine what these IDs mean.
**Important:** You'll answer 10 questions in P5. Write each question number and text (e.g., "#q1: ...") as a comment in your notebook before each answer so we can easily find and grade your work.
Learning objectives:
- Use Spark's RDD, DataFrame, and SQL interfaces to answer questions about data
- Load data into Hive for querying with Spark
- Group and optimize queries
- Use PySpark's machine learning API to train a decision tree
Before starting, please review the [general project directions](../projects.md).
## Corrections/Clarifications
* none yet
## Setup
Copy these files from the project into your repository:
- `p5-base.Dockerfile`
- `namenode.Dockerfile`
- `notebook.Dockerfile`
- `datanode.Dockerfile`
- `docker-compose.yml`
- `build.sh`
- `get_data.py`
- `.gitignore`
Create a Python virtual environment and install the [datasets library](https://huggingface.co/docs/datasets/en/index):
```sh
pip3 install datasets==3.3.2
```
Create the directory structure with:
```sh
mkdir -p nb/data
```
Run the provided `get_data.py` script to download the [DeepMind CodeContests dataset](https://huggingface.co/datasets/deepmind/code_contests) and split it into `problems.jsonl` and `solutions.jsonl`.
### Docker Containers
```sh
docker build . -f p5-base.Dockerfile -t p5-base
docker build . -f notebook.Dockerfile -t p5-nb
docker build . -f namenode.Dockerfile -t p5-nn
docker build . -f datanode.Dockerfile -t p5-dn
docker build . -f boss.Dockerfile -t p5-boss
docker build . -f worker.Dockerfile -t p5-worker
```
Note that you need to write a `boss.Dockerfile` and `worker.Dockerfile`. A `build.sh` script is included for your convenience.
You can bring up your cluster like this:
```
export PROJECT=p5
docker compose up -d
```
### Jupyter Container
Connect to JupyterLab inside your container. Within the `nb` directory, create a notebook called `p5.ipynb`.
Run the following shell commands in a cell to upload the data:
```sh
hdfs dfs -D dfs.replication=1 -cp -f data/*.jsonl hdfs://nn:9000/
hdfs dfs -D dfs.replication=1 -cp -f data/*.csv hdfs://nn:9000/
```
### VS Code users
If you are using VS Code and remote SSH to work on your project, the ports will already be forwarded for you, and you only need to open `http://127.0.0.1:5000/lab` in your browser.
## Part 1: Filtering: RDDs, DataFrames, and Spark SQL
Inside your `p5.ipynb` notebook, create a Spark session (note we're enabling Hive on HDFS):
```python
from pyspark.sql import SparkSession
spark = (SparkSession.builder.appName("cs544")
         .master("spark://boss:7077")
         .config("spark.executor.memory", "1G")
         .config("spark.sql.warehouse.dir", "hdfs://nn:9000/user/hive/warehouse")
         .enableHiveSupport()
         .getOrCreate())
```
Load `hdfs://nn:9000/problems.jsonl` into a DataFrame. To verify success, run:
```python
problems_df.limit(5).show()
```
If loaded properly, you should see:
![image.png](image.png)
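If it helps, a minimal way to do that load is shown below (`spark.read.json` reads one JSON object per line, which matches the `.jsonl` format); the variable name matches the check above.

```python
problems_df = spark.read.json("hdfs://nn:9000/problems.jsonl")
```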
#### Q1: How many problems are there with a `cf_rating` of at least 1600, having `private_tests`, and a name containing "\_A." (Case Sensitive)? Answer by directly using the RDD API.
Remember that if you have a Spark DataFrame `df`, you can get the underlying RDD using `df.rdd`.
**REMEMBER TO INCLUDE `#q1` AT THE TOP OF THIS CELL**
#### Q2: How many problems are there with a `cf_rating` of at least 1600, having `private_tests`, and a name containing "\_A." (Case Sensitive)? Answer by using the DataFrame API.
This is the same question as Q1, and you should get the same answer. It is meant to give you practice interacting with Spark in different ways.
#### Q3: How many problems are there with a `cf_rating` of at least 1600, having `private_tests`, and a name containing "\_A." (Case Sensitive)? Answer by using Spark SQL.
Before you can use `spark.sql`, write the problem data to a Hive table so that you can refer to it by name.
Again, the result after calling `count` should match your answers for Q1 and Q2.
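As a sketch (not a complete Q3 answer), you might persist the DataFrame as a table and then query it by name; the remaining Q3 conditions are omitted here.

```python
# write the problems DataFrame to a Hive table, then query it with Spark SQL
problems_df.write.mode("overwrite").saveAsTable("problems")
spark.sql("SELECT COUNT(*) FROM problems WHERE cf_rating >= 1600").show()  # add the other Q3 filters
```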
## Part 2: Hive Data Warehouse
#### Q4: Does the query plan for a GROUP BY on solutions data need to shuffle/exchange rows if the data is pre-bucketed?
Write the data from `solutions.jsonl` to a Hive table named `solutions`, like you did for `problems`. This time, though, bucket the data by "language" and use 4 buckets when writing to the table.
Use Spark SQL to explain the query plan for this query:
```sql
SELECT language, COUNT(*)
FROM solutions
GROUP BY language
```
The `explain` output suffices for your answer. Take note (for your own sake) whether any `Exchange` appears in the output. Think about why an exchange/shuffle is or is not needed between the `partial_count` and `count` aggregates.
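A hedged sketch of the bucketed write and the `explain` call, assuming a `solutions_df` DataFrame has already been loaded from `solutions.jsonl`:

```python
# bucketBy only works together with saveAsTable
(solutions_df.write
    .bucketBy(4, "language")
    .mode("overwrite")
    .saveAsTable("solutions"))

spark.sql("""
    SELECT language, COUNT(*)
    FROM solutions
    GROUP BY language
""").explain()
```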
<!--
After bucketing the solutions, call `.explain` on a query that counts solutions per language. This should output `== Physical Plan ==` followed by the plan details. You've bucketed correctly if `Bucketed: true` appears in the output.
-->
#### Q5: What tables/views are in our warehouse?
You'll notice additional CSV files in `nb/data` that we haven't used yet. Create a Hive view for each using `createOrReplaceTempView`. Use these files:
```python
[
"languages", "problem_tests", "sources", "tags"
]
```
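A sketch of registering the views, assuming the CSV files sit at `hdfs://nn:9000/<name>.csv` as uploaded in the Setup section:

```python
for name in ["languages", "problem_tests", "sources", "tags"]:
    (spark.read.format("csv")
        .option("header", True)
        .option("inferSchema", True)
        .load(f"hdfs://nn:9000/{name}.csv")
        .createOrReplaceTempView(name))
```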
Answer with a Python dict like this:
```python
{'problems': False,
'solutions': False,
'languages': True,
'problem_tests': True,
'sources': True,
'tags': True}
```
The boolean indicates whether it is a temporary view (True) or table (False).
## Part 3: Caching and Transforming Data
#### Q6: How many correct PYTHON3 solutions are from CODEFORCES?
You may use any method for this question. Join the `solutions` table with the `problems` table using an inner join on the `problem_id` column. Note that the `source` column in `problems` is an integer. Join this column with the `source_id` column from the `sources` CSV. Find the number of correct `PYTHON3` solutions from `CODEFORCES`.
Answer Q6 with code and a single integer. **DO NOT HARDCODE THE CODEFORCES ID**.
#### Q7: How many problems are of easy/medium/hard difficulty?
The `problems_df` has a numeric `difficulty` column. For the purpose of categorizing the problems, interpret this number as follows:
- `<= 5` is `Easy`
- `<= 10` is `Medium`
- Otherwise `Hard`
Your answer should return this dictionary:
```python
{'Easy': 409, 'Medium': 5768, 'Hard': 2396}
```
Note (in case you use this dataset for something beyond the course): the actual meaning of the difficulty column depends on the problem source.
**Hint:** https://www.w3schools.com/sql/sql_case.asp
#### Q8: Does caching make it faster to compute averages over a subset of a bigger dataset?
To test the impact of caching, we are going to do the same calculations with and without caching. Implement a query that first filters rows of `problem_tests` to get rows where `is_generated` is `False` -- use a variable to refer to the resulting DataFrame.
Write some code to compute the average `input_chars` and `output_chars` over this DataFrame. Then, write code to do an experiment as follows:
1. compute the averages
2. make a call to cache the data
3. compute the averages
4. compute the averages
5. uncache the data
Measure the number of seconds it takes each of the three times we do the average calculations. Answer with a list of the three times, in order, as follows:
```
[0.9092552661895752, 1.412867546081543, 0.1958458423614502]
```
Your numbers may vary significantly, but the final run should usually be the fastest.
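A minimal timing sketch, assuming `filtered` is the DataFrame of `problem_tests` rows where `is_generated` is False:

```python
import time
from pyspark.sql.functions import avg

def compute_averages():
    return filtered.agg(avg("input_chars"), avg("output_chars")).collect()

times = []
for step in range(3):
    start = time.time()
    compute_averages()
    times.append(time.time() - start)
    if step == 0:
        filtered.cache()   # cache() is lazy; the data is materialized during the second run
filtered.unpersist()
times
```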
## Part 4: Machine Learning with Spark
The dataset documentation for the `difficulty` field says: "For Codeforces problems, cf_rating is a more reliable measure of difficulty when available".
For this part, you will attempt to estimate the cf_rating for Codeforces problems for which it is unknown. To prepare, filter the problems to `CODEFORCES` problems, then further divide into three DataFrames:
* train dataset: `cf_rating` is >0, and `problem_id` is an EVEN number
* test dataset: `cf_rating` is >0, and `problem_id` is an ODD number
* missing dataset: `cf_rating` is 0
#### Q9: How well can a decision tree predict `cf_rating` based on `difficulty`, `time_limit`, and `memory_limit_bytes`?
Create a Spark Pipeline model with VectorAssembler and DecisionTreeRegressor stages. The max tree depth should be 5. Train it on the training data, then compute an R^2 score (`r2_score`) for predictions on the test data. The R^2 score should be your answer for this question.
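A sketch of one possible pipeline, assuming `train_df`/`test_df` follow the split above and that the three feature columns are numeric; the R^2 below uses Spark's `RegressionEvaluator` with `metricName="r2"`, though `sklearn.metrics.r2_score` on collected predictions is another option.

```python
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.evaluation import RegressionEvaluator

assembler = VectorAssembler(
    inputCols=["difficulty", "time_limit", "memory_limit_bytes"],
    outputCol="features")
dt = DecisionTreeRegressor(featuresCol="features", labelCol="cf_rating", maxDepth=5)
model = Pipeline(stages=[assembler, dt]).fit(train_df)

predictions = model.transform(test_df)
r2 = RegressionEvaluator(labelCol="cf_rating", predictionCol="prediction",
                         metricName="r2").evaluate(predictions)
```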
#### Q10: Do the problems with a missing `cf_rating` appear more or less challenging than other problems?
Use your model to predict the `cf_rating` in the dataset where it is missing.
Answer with a tuple with 3 numbers:
* average `cf_rating` in the training dataset
* average `cf_rating` in the test dataset
* average **prediction** of `cf_rating` in the missing dataset
For example:
(1887.9377431906614, 1893.1106471816283, 1950.4728638818783)
## Submission
We should be able to run the following on your submission to directly create the mini cluster:
```
# data setup...
docker build . -f p5-base.Dockerfile -t p5-base
docker build . -f notebook.Dockerfile -t p5-nb
docker build . -f namenode.Dockerfile -t p5-nn
docker build . -f datanode.Dockerfile -t p5-dn
docker build . -f boss.Dockerfile -t p5-boss
docker build . -f worker.Dockerfile -t p5-worker
export PROJECT=p5
docker compose up -d
```
We should then be able to open `http://localhost:5000/lab`, find your
notebook, and run it.
## Tester
Coming soon...
#!/bin/bash
# Build the base image
docker build \
--rm \
-f p5-base.Dockerfile \
-t p5-base .
# Build the child containers
docker build --rm . -f notebook.Dockerfile -t p5-nb
docker build --rm . -f namenode.Dockerfile -t p5-nn
docker build --rm . -f datanode.Dockerfile -t p5-dn
docker build --rm . -f boss.Dockerfile -t p5-boss
docker build --rm . -f worker.Dockerfile -t p5-worker
FROM p5-base
CMD ["bash", "-c", "hdfs datanode -fs hdfs://nn:9000"]