# DRAFT! Don't start yet.
# P5 (4% of grade): Spark And Hive
Copy these files from the project into your repository:
- `datanode.Dockerfile`
- `docker-compose.yml`
- `build.sh`
- `requirements.txt`
- `get_data.py`
- `.gitignore`
Build the Docker images:

```
docker build . -f boss.Dockerfile -t p5-boss
docker build . -f worker.Dockerfile -t p5-worker
```
Note that you need to write a `boss.Dockerfile` and `worker.Dockerfile`. A `build.sh` script is included for your convenience.
You can bring up your cluster with Docker Compose using the provided `docker-compose.yml` (for example, `docker compose up -d`).
Once the cluster is up, copy the CSV data into HDFS:

```
hdfs dfs -D dfs.replication=1 -cp -f data/*.csv hdfs://nn:9000/
```
If you are using VS Code and remote SSH to work on your project, the ports will already be forwarded for you, and you only need to go to `http://127.0.0.1:5000/lab` in your browser.
## Part 1: Filtering: RDDs, DataFrames, and Spark SQL
Inside your `p5.ipynb` notebook, create a Spark session (note we're enabling Hive on HDFS):
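
For reference, a minimal sketch of such a session, plus loading the problems CSV from HDFS, might look like the following (the app name, warehouse directory, and read options are assumptions, not requirements):

```python
from pyspark.sql import SparkSession

# Minimal sketch: enable Hive support and point Spark at the boss.
# The app name and warehouse directory below are assumptions.
spark = (SparkSession.builder
         .appName("p5")
         .master("spark://boss:7077")
         .config("spark.sql.warehouse.dir", "hdfs://nn:9000/user/hive/warehouse")
         .enableHiveSupport()
         .getOrCreate())

# Load the problems data copied to HDFS earlier (exact path/options assumed).
problems_df = (spark.read
               .option("header", True)
               .option("inferSchema", True)
               .csv("hdfs://nn:9000/problems.csv"))
```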
```python
problems_df.limit(5).show()
```
If loaded properly, you should see:
![image.png](image.png)
#### Q1: How many problems are there with a `cf_rating` of at least 1600, having `private_tests`, and a name containing "\_A." (Case Sensitive)? Answer by directly using the RDD API.
Remember that if you have a Spark DataFrame `df`, you can get the underlying RDD using `df.rdd`.
**REMEMBER TO INCLUDE `#q1` AT THE TOP OF THIS CELL**
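
For illustration, a sketch of the RDD-style filter might look like this (treating "having `private_tests`" as `private_tests > 0` is an assumption; null handling is omitted):

```python
# Sketch only: filter Row objects from the underlying RDD, then count them.
q1_count = (problems_df.rdd
            .filter(lambda row: row["cf_rating"] >= 1600
                    and row["private_tests"] > 0
                    and "_A." in row["name"])
            .count())
```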
#### Q2: How many problems are there with a `cf_rating` of at least 1600, having `private_tests`, and a name containing "\_A." (Case Sensitive)? Answer by using the DataFrame API.
This is the same question as Q1, and you should get the same answer. The point is to give you practice interacting with Spark in different ways.
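
A sketch of the same predicate with the DataFrame API (same assumptions as the Q1 sketch):

```python
from pyspark.sql import functions as F

# Sketch only: express the predicate with Column expressions and count.
q2_count = (problems_df
            .filter((F.col("cf_rating") >= 1600)
                    & (F.col("private_tests") > 0)
                    & F.col("name").contains("_A."))
            .count())
```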
#### Q3: How many problems are there with a `cf_rating` of at least 1600, having `private_tests`, and a name containing "\_A." (Case Sensitive)? Answer by using Spark SQL.
Before you can use `spark.sql`, write the problem data to a Hive table so that you can refer to it by name.
Again, the result after calling `count` should match your answers for Q1 and Q2.
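
A sketch of the Spark SQL route, assuming the Hive table is named `problems` and using `instr` for the substring check (both assumptions):

```python
# Sketch only: save the DataFrame as a Hive table, then query it by name.
problems_df.write.mode("overwrite").saveAsTable("problems")

q3_count = spark.sql("""
    SELECT COUNT(*) AS cnt
    FROM problems
    WHERE cf_rating >= 1600
      AND private_tests > 0
      AND instr(name, '_A.') > 0
""").collect()[0]["cnt"]
```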
#### Q4: Does the query plan for a GROUP BY on solutions data need to shuffle/exchange rows if the data is pre-bucketed?
Write the data from `solutions.jsonl` to a Hive table named `solutions`, like you did for `problems`. This time, though, bucket the data by "language" and use 4 buckets when writing to the table.
Use Spark SQL to explain the query plan for this query:
```
SELECT language, COUNT(*)
FROM solutions
GROUP BY language
```
The `explain` output suffices for your answer. Take note (for your own sake) whether any `Exchange` appears in the output. Think about why an exchange/shuffle is or is not needed between the `partial_count` and `count` aggregates.
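
A sketch of the bucketed write and the `explain` call (the JSONL path is an assumption):

```python
# Sketch only: write solutions bucketed by language into 4 buckets,
# then explain the per-language aggregation.
solutions_df = spark.read.json("data/solutions.jsonl")  # path is an assumption

(solutions_df.write
     .mode("overwrite")
     .bucketBy(4, "language")
     .saveAsTable("solutions"))

spark.sql("""
    SELECT language, COUNT(*)
    FROM solutions
    GROUP BY language
""").explain()
```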
<!--
After bucketing the solutions, call `.explain` on a query that counts solutions per language. This should output `== Physical Plan ==` followed by the plan details. You've bucketed correctly if `Bucketed: true` appears in the output.
-->

Answer Q6 with code and a single integer. **DO NOT HARDCODE THE CODEFORCES ID**.
#### Q7: How many problems are of easy/medium/hard difficulty?
The `problems_df` has a numeric `difficulty` column. For the purpose of categorizing the problems, interpret this number as follows:
- `<= 5` is `Easy`
- `<= 10` is `Medium`
- anything larger is `Hard`

Your answer should return a dictionary with the count of problems in each category (`Easy`, `Medium`, `Hard`).
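
One possible shape for this computation, sketched with `when`/`otherwise` (the exact answer format is yours to confirm):

```python
from pyspark.sql import functions as F

# Sketch only: bucket difficulties into categories and count each one.
rows = (problems_df
        .withColumn("category",
                    F.when(F.col("difficulty") <= 5, "Easy")
                     .when(F.col("difficulty") <= 10, "Medium")
                     .otherwise("Hard"))
        .groupBy("category")
        .count()
        .collect())
q7_answer = {row["category"]: row["count"] for row in rows}
```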
To test the impact of caching, we are going to do the same calculations with and without caching. Implement a query that first filters rows of `problem_tests` to get rows where `is_generated` is `False` -- use a variable to refer to the resulting DataFrame.
Write some code to compute the average `input_chars` and `output_chars` over this DataFrame. Then, write code to do an experiment as follows:
1. compute the averages
2. make a call to cache the data
3. compute the averages
4. compute the averages
5. uncache the data
Measure the number of seconds each of the three average computations takes. Answer with a list of the three times, in order, as follows:
```
[0.48511195182800293, 0.47667789459228516, 0.1396317481994629]
```

Your numbers may vary significantly, but the final run should usually be the fastest.
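
A sketch of the experiment loop (the name `problem_tests_df` and the exact averaging code are placeholders/assumptions):

```python
import time
from pyspark.sql import functions as F

# Sketch only: problem_tests_df is a placeholder for the problem_tests data.
filtered = problem_tests_df.filter(F.col("is_generated") == False)

def compute_averages(df):
    return df.agg(F.avg("input_chars"), F.avg("output_chars")).collect()

times = []
for step in range(3):
    start = time.time()
    compute_averages(filtered)             # steps 1, 3, and 4 of the experiment
    times.append(time.time() - start)
    if step == 0:
        filtered.cache()                   # step 2: cache after the first run
filtered.unpersist()                       # step 5: uncache the data
print(times)
```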
## Part 4: Machine Learning with Spark
#### Q9: How do we turn columns into `DenseVector` features?

We have a large set of numerical columns in our dataset, but we cannot currently use them with a machine learning algorithm because they are not in a [`DenseVector`](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.linalg.DenseVector.html). Thankfully, Spark provides the [`VectorAssembler`](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.feature.VectorAssembler.html) class for exactly this. Write a simple `join` that combines the result from the caching question with `problems_df` on the unique `problem_id` key. The input columns of the `VectorAssembler` (**order matters**) should be:
```python
['source','input_chars','output_chars','public_tests','private_tests','cf_rating']
```
Then transform your joined DataFrame with the `VectorAssembler` so that there is now a `features` column.

What are the rows for the problems that have only a single public test, at least 6 private tests, and a cf rating of 1500?
Return your answer using `[{**l.asDict(),'features':l.features.toArray().tolist()} for l in df.<Your filter query>.tail(3)]`. You should get a list of 3 dicts with the schema:
```python
{
    'source': int,
    'difficulty': int,
    'input_chars': float,
    'output_chars': float,
    'public_tests': int,
    'private_tests': int,
    'cf_rating': float,
    'features': List[Float]
}
```
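
A sketch of the join/assemble/filter shape, where `per_problem_tests_df` is a hypothetical placeholder for whatever per-problem DataFrame you derive from the caching question (how you build it is up to you; the filter thresholds follow the wording above):

```python
from pyspark.ml.feature import VectorAssembler
from pyspark.sql import functions as F

# Sketch only: per_problem_tests_df is a hypothetical placeholder.
joined = per_problem_tests_df.join(problems_df, on="problem_id")

assembler = VectorAssembler(
    inputCols=['source', 'input_chars', 'output_chars',
               'public_tests', 'private_tests', 'cf_rating'],
    outputCol='features')
df = assembler.transform(joined)

subset = df.filter((F.col('public_tests') == 1)
                   & (F.col('private_tests') >= 6)
                   & (F.col('cf_rating') == 1500))
q9_answer = [{**l.asDict(), 'features': l.features.toArray().tolist()}
             for l in subset.tail(3)]
```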
#### Q10: How does accuracy change when we ablate with depths `[1,5,10,15,20]`?
First split your dataset into training and test with:
```python
train, test = df.sort('problem_id').randomSplit([0.8, 0.2], seed=2025)
```
**IMPORTANT:** Sorting by the unique `problem_id` and setting the seed to `2025` is CRUCIAL to ensure determinism.
With your transformed dataset, you will train 5 different `DecisionTreeClassifier` models on the `features` column. The task is to predict the `difficulty`. The only parameter of the `DecisionTreeClassifier` you will change is the depth by iterating over `[1,5,10,15,20]`.
Use the models to make predictions on the test data. What is each model's _accuracy_ (the fraction of predictions it gets correct)?
Answer with the dict whose keys are in the format `depth=<DEPTH>` and value is the accuracy of that model.
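
A sketch of the ablation loop (using `MulticlassClassificationEvaluator` for accuracy is one option, not a requirement):

```python
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Sketch only: train one tree per depth and record its test accuracy.
evaluator = MulticlassClassificationEvaluator(
    labelCol="difficulty", predictionCol="prediction", metricName="accuracy")

q10_answer = {}
for depth in [1, 5, 10, 15, 20]:
    dt = DecisionTreeClassifier(featuresCol="features",
                                labelCol="difficulty",
                                maxDepth=depth)
    model = dt.fit(train)
    preds = model.transform(test)
    q10_answer[f"depth={depth}"] = evaluator.evaluate(preds)
```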
## Submission
For reference, the `boss.Dockerfile` and `worker.Dockerfile` included in this commit are:

`boss.Dockerfile`:

```
FROM p5-base
CMD sh -c "/spark-3.5.5-bin-hadoop3/sbin/start-master.sh && tail -f /spark-3.5.5-bin-hadoop3/logs/*.out"
```

`worker.Dockerfile`:

```
FROM p5-base
CMD sh -c "/spark-3.5.5-bin-hadoop3/sbin/start-worker.sh spark://boss:7077 -c 2 -m 2G && tail -f /spark-3.5.5-bin-hadoop3/logs/*.out"
```