Commit 564b9103 authored by TYLER CARAZA-HARTER

part 3

parent 1eb2ee88
%% Cell type:code id:df568295-31af-4fde-b402-8adecdf57f13 tags:
``` python
from sqlalchemy import create_engine, text
engine = create_engine("mysql+mysqlconnector://root:abc@127.0.0.1:3306/cs544")
conn = engine.connect()
```
%% Cell type:code id:8cabbb35-3d75-44ee-886c-6aa871a01d68 tags:
``` python
list(conn.execute(text("""
show tables
""")))
```
%% Output
[]
%% Cell type:code id:b68ba375-c9c9-4677-9d9d-310d1927a276 tags:
``` python
# table: users
# columns: id, name, phone
# name is required
# id uniquely identifies row
conn.execute(text("""
create table users (
    id int,
    name text NOT NULL,
    phone text,
    PRIMARY KEY (id)
)
"""))
```
%% Output
<sqlalchemy.engine.cursor.CursorResult at 0x7c45d0474de0>
%% Cell type:code id:00a5b5b9-8e91-4d90-99ad-e85a1756ea88 tags:
``` python
conn.execute(text("""
create table accounts (
    user_id int,
    name text NOT NULL,
    amount int NOT NULL,
    FOREIGN KEY (user_id) references users(id)
)
"""))
```
%% Output
<sqlalchemy.engine.cursor.CursorResult at 0x7c45c9711e80>
%% Cell type:code id:9739d58c-8004-4844-a609-7ed95bdbf9aa tags:
``` python
list(conn.execute(text("""
show tables
""")))
```
%% Output
[('accounts',), ('users',)]
%% Cell type:code id:1fc2171c-9f09-4b40-b5e0-fcb84870ec7d tags:
``` python
conn.execute(text("""
INSERT INTO users (id, name) VALUES (1, "tyler")
"""))
```
%% Output
<sqlalchemy.engine.cursor.CursorResult at 0x7c45c97122e0>
%% Cell type:code id:a96a9978-0927-4886-bd72-225150f9a5a2 tags:
``` python
# conn.execute(text("""
# INSERT INTO users (id, name) VALUES (1, "tyler")
# """))
```
%% Cell type:code id:0dbc816b-4f66-4b19-bcb6-e1b5212ef469 tags:
``` python
list(conn.execute(text("""
SELECT *
FROM users
""")))
```
%% Output
[(1, 'tyler', None)]
%% Cell type:code id:45c24702-0285-43fe-ae0e-b9d01adc2a37 tags:
``` python
conn.commit()
```
%% Cell type:code id:2a18a93d-20c1-4f02-875a-e6154ad7aef5 tags:
``` python
conn.execute(text("""
INSERT INTO accounts (user_id, name, amount)
VALUES (1, "A", 10), (1, "B", 20)
"""))
```
%% Output
<sqlalchemy.engine.cursor.CursorResult at 0x7c45c9712d60>
%% Cell type:code id:753e807c-51d8-4190-9e8b-92e95bc8030b tags:
``` python
conn.commit()
```
%% Cell type:code id:ca9c9379-ba53-42f0-9e11-eed9921adc01 tags:
``` python
# this would break an invariant, so it's not allowed!
# foreign keys are still referencing user id 1
#
# conn.execute(text("""
# DELETE FROM users WHERE id = 1;
# """))
```
%% Cell type:code id:75ae1cff-684a-4c04-9ce3-f619edea898c tags:
``` python
import pandas as pd
```
%% Cell type:code id:de914434-5eb7-465b-aec4-8bec6953b623 tags:
``` python
url = "https://raw.githubusercontent.com/cfpb/api/master/resources/datasets/hmda/code_sheets/"
df = pd.read_csv(url + "action_taken.csv")
df.to_sql("actions", conn, index=False, if_exists="replace")
df = pd.read_csv(url + "loan_type.csv")
df.to_sql("loan_types", conn, index=False, if_exists="replace")
df = pd.read_csv(url + "loan_purpose.csv")
df.to_sql("purposes", conn, index=False, if_exists="replace")
conn.commit()
```
%% Cell type:code id:4c24c47e-8f03-4b84-9647-6fd1559a4b0b tags:
``` python
import pyarrow as pa
import pyarrow.csv, pyarrow.parquet
t = pa.parquet.read_table(
    "loans.parquet",
    columns=["lei", "action_taken", "loan_type", "loan_amount",
             "interest_rate", "loan_purpose", "income"],
)
```
%% Cell type:code id:a0f419e7-b0d7-4bf1-94ad-4c4ca961ada6 tags:
``` python
t.to_pandas().to_sql("loans", conn, index=False,
if_exists="replace", chunksize=10_000)
```
%% Output
447367
%% Cell type:code id:96846d26-3aa8-414a-a925-eafa9fe60f50 tags:
``` python
conn.commit()
```
%% Cell type:markdown id:24e3be3e-5296-44fa-a850-c1e0df34cd38 tags:
# Transactions
%% Cell type:code id:791ad3aa-0a41-4c5c-b1ee-6d9b5b95e023 tags:
``` python
conn.execute(text("""
update accounts set amount = amount + 5 where name = 'B'
"""))
conn.execute(text("""
update accounts set amount = amount - 5 where name = 'A'
"""))
# invariant: account cannot go negative
remaining_amount = list(conn.execute(text(
    "select amount from accounts where name = 'A'"
)))[0][0]
print("remaining:", remaining_amount)
if remaining_amount >= 0:
    print("commit!")
    conn.commit()
else:
    print("rollback!")
    conn.rollback()
```
%% Output
remaining: 5
commit!
%% Cell type:code id:10f7d983-69fc-4800-b529-b7f19e0cdb73 tags:
``` python
# conn.rollback() or conn.commit()
```
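%% Cell type:markdown tags:
An alternative to calling `commit`/`rollback` by hand is a transaction block: `engine.begin()` commits on normal exit and rolls back if the block raises. A minimal sketch, assuming the same `engine` as above:
%% Cell type:code tags:
``` python
from sqlalchemy import text

# engine.begin() opens a transaction that commits when the block exits
# normally and rolls back if an exception escapes it
with engine.begin() as tx:
    tx.execute(text("update accounts set amount = amount + 5 where name = 'B'"))
    tx.execute(text("update accounts set amount = amount - 5 where name = 'A'"))
    remaining = tx.execute(text(
        "select amount from accounts where name = 'A'")).scalar()
    if remaining < 0:
        raise ValueError("account A cannot go negative")  # forces rollback
```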
%% Cell type:markdown id:49392cee-e7b9-4ce0-9ce3-961965443b3d tags:
# Analyze/Query the Data
%% Cell type:code id:ed5f92fc-ac76-4307-bd0c-6b714ed5a699 tags:
``` python
# what are all the possible actions? Practice SELECT/FROM.
pd.read_sql("""
SELECT *
FROM actions
""", conn)
```
%% Output
id action_taken
0 1 Loan originated
1 2 Application approved but not accepted
2 3 Application denied by financial institution
3 4 Application withdrawn by applicant
4 5 File closed for incompleteness
5 6 Loan purchased by the institution
6 7 Preapproval request denied by financial instit...
7 8 Preapproval request approved but not accepted
%% Cell type:code id:8349be60-02f4-43bb-9cf2-568bc70d2c75 tags:
``` python
# what are the first 10 loans? Practice LIMIT.
pd.read_sql("""
SELECT *
FROM loans
LIMIT 10
""", conn)
```
%% Output
lei action_taken loan_type loan_amount interest_rate \
0 54930034MNPILHP25H80 6 1 305000.0 3.875
1 54930034MNPILHP25H80 4 1 65000.0 NA
2 54930034MNPILHP25H80 6 1 75000.0 3.25
3 54930034MNPILHP25H80 1 1 155000.0 4.0
4 54930034MNPILHP25H80 1 1 305000.0 3.25
5 54930034MNPILHP25H80 1 1 175000.0 3.375
6 54930034MNPILHP25H80 1 1 575000.0 4.5
7 54930034MNPILHP25H80 1 1 105000.0 5.375
8 54930034MNPILHP25H80 1 1 85000.0 3.375
9 549300FQ2SN6TRRGB032 1 1 405000.0 Exempt
loan_purpose income
0 1 108.0
1 1 103.0
2 1 146.0
3 32 70.0
4 1 71.0
5 1 117.0
6 1 180.0
7 1 180.0
8 1 136.0
9 1 NaN
%% Cell type:code id:f99ed83c-d50c-43f7-bf3c-7307bb30801b tags:
``` python
# projection: choosing what columns (SELECT)
```
%% Cell type:code id:d2fe1388-b308-4f8c-8a63-c426c0f1b787 tags:
``` python
# selection: filtering rows (WHERE)
```
%% Cell type:code id:0da82d35-5e7f-481b-a3fe-9d828b541794 tags:
``` python
# what are the first 10 interest rates and loan amounts (in thousands)? Practice SELECT.
pd.read_sql("""
SELECT interest_rate, loan_amount / 1000 AS loan_thousands
FROM loans
LIMIT 10
""", conn)
```
%% Output
interest_rate loan_thousands
0 3.875 305.0
1 NA 65.0
2 3.25 75.0
3 4.0 155.0
4 3.25 305.0
5 3.375 175.0
6 4.5 575.0
7 5.375 105.0
8 3.375 85.0
9 Exempt 405.0
%% Cell type:code id:4ee4b0cb-7671-4570-9e18-fcd4b8c0394c tags:
``` python
# what are the loans for individuals with income over $1 million? Practice WHERE.
pd.read_sql("""
SELECT *
FROM loans
WHERE income > 1000000
""", conn)
```
%% Output
lei action_taken loan_type loan_amount interest_rate \
0 254900IER2H3R8YLBW04 1 1 105000.0 2.875
1 3Y4U8VZURTYWI1W2K376 3 1 7455000.0 NA
2 549300CS1XP28EERR469 1 1 75000.0 4.99
3 549300CS1XP28EERR469 1 1 205000.0 3.75
loan_purpose income
0 31 1530000.0
1 4 94657029.0
2 4 2030000.0
3 1 7291000.0
%% Cell type:code id:d64a2a5e-e2d4-42a3-a894-9283160d2636 tags:
``` python
# what are the five biggest loans in terms of dollar amount? Practice ORDER BY.
pd.read_sql("""
SELECT *
FROM loans
ORDER BY loan_amount DESC
LIMIT 5
""", conn)
```
%% Output
lei action_taken loan_type loan_amount interest_rate \
0 549300XWUSRVVOHPRY47 6 1 264185000.0 NA
1 AD6GFRVSDT01YPT1CS68 1 1 74755000.0 1.454
2 AD6GFRVSDT01YPT1CS68 4 2 66005000.0 NA
3 YQI2CPR3Z44KAR0HG822 1 1 65005000.0 3.0
4 254900YA1AQXNM8QVZ06 1 2 63735000.0 2.99
loan_purpose income
0 1 None
1 1 None
2 1 None
3 1 None
4 2 None
%% Cell type:code id:c01b796c-ded3-4e11-81e7-69d7660da9cb tags:
``` python
# what are the actions taken and types for those loans (show the text, not numbers)? Practice INNER JOIN.
pd.read_sql("""
SELECT actions.action_taken, loan_types.loan_type, loans.lei, loans.loan_amount, loans.interest_rate
FROM loans
INNER JOIN actions ON loans.action_taken = actions.id
INNER JOIN loan_types ON loans.loan_type = loan_types.id
ORDER BY loan_amount DESC
LIMIT 5
""", conn)
```
%% Output
action_taken loan_type lei \
0 Loan purchased by the institution Conventional 549300XWUSRVVOHPRY47
1 Loan originated Conventional AD6GFRVSDT01YPT1CS68
2 Application withdrawn by applicant FHA-insured AD6GFRVSDT01YPT1CS68
3 Loan originated Conventional YQI2CPR3Z44KAR0HG822
4 Loan originated FHA-insured 254900YA1AQXNM8QVZ06
loan_amount interest_rate
0 264185000.0 NA
1 74755000.0 1.454
2 66005000.0 NA
3 65005000.0 3.0
4 63735000.0 2.99
%% Cell type:code id:b7d4a687-70bf-46e7-a715-013977358d05 tags:
``` python
# what is a loan_purpose that doesn't appear in the loans table? Practice LEFT/RIGHT JOIN.
pd.read_sql("""
SELECT *
FROM loans
RIGHT JOIN purposes ON loans.loan_purpose = purposes.id
WHERE loans.loan_purpose IS NULL
""", conn)
```
%% Output
lei action_taken loan_type loan_amount interest_rate loan_purpose income \
0 None None None None None None None
id loan_purpose
0 3 Refinancing
%% Cell type:code id:fc73517c-cf57-4a91-bb9d-2fbfc76544d5 tags:
``` python
# how many rows are in the table? Practice COUNT(*).
pd.read_sql("""
SELECT COUNT(*)
FROM loans
""", conn)
```
%% Output
COUNT(*)
0 447367
%% Cell type:code id:e91feeee-8689-4f57-a991-f6ca3fee2a6d tags:
``` python
# how many non-null values are in the income column? Practice COUNT(column).
pd.read_sql("""
SELECT COUNT(income)
FROM loans
""", conn)
```
%% Output
COUNT(income)
0 399948
%% Cell type:code id:d688a1a3-3740-4dcf-8de5-b8f9f3e928b3 tags:
``` python
pd.read_sql("""
SELECT *
FROM loan_types
""", conn)
```
%% Output
id loan_type
0 1 Conventional
1 2 FHA-insured
2 3 VA-guaranteed
3 4 FSA/RHS-guaranteed
%% Cell type:code id:12333532-0618-4ed4-b71b-260a4f35e581 tags:
``` python
# what is the average interest rate for loans of type "Conventional"? Practice AVG.
# caveat: interest_rate is stored as text, and MySQL coerces non-numeric
# values like 'NA' and 'Exempt' to 0, which drags this average down
pd.read_sql("""
SELECT AVG(interest_rate)
FROM loans
INNER JOIN loan_types ON loans.loan_type = loan_types.id
WHERE loan_types.loan_type = 'Conventional'
""", conn)
```
%% Output
AVG(interest_rate)
0 2.21657
%% Cell type:code id:23c00af3-e385-435a-8bf6-f5cfe64f02db tags:
``` python
# how many loans are there of each type? Practice GROUP BY.
pd.read_sql("""
SELECT loan_types.loan_type, AVG(interest_rate), COUNT(*)
FROM loans
INNER JOIN loan_types ON loans.loan_type = loan_types.id
GROUP BY loan_types.loan_type
""", conn)
```
%% Output
loan_type AVG(interest_rate) COUNT(*)
0 Conventional 2.216570 389217
1 VA-guaranteed 1.919140 24551
2 FHA-insured 2.211670 30496
3 FSA/RHS-guaranteed 2.523942 3103
%% Cell type:code id:2400a6a4-7056-47e0-b202-3bbb85a77b2f tags:
``` python
# which loan types appear at least 10,000 times? Practice HAVING.
pd.read_sql("""
SELECT loan_types.loan_type, AVG(interest_rate), COUNT(*) as count
FROM loans
INNER JOIN loan_types ON loans.loan_type = loan_types.id
GROUP BY loan_types.loan_type
HAVING count >= 10000
""", conn)
```
%% Output
loan_type AVG(interest_rate) count
0 Conventional 2.21657 389217
1 VA-guaranteed 1.91914 24551
2 FHA-insured 2.21167 30496
### Docker Compose
Take a look at the provided Docker compose file. There are several services, including 3 `datanodes`, a `namenode`, a `SQL server`, and a `gRPC server`. The NameNode service will serve at the host `boss` within the Docker compose network.
### gRPC
The compose file assumes there is a `PROJECT` environment variable. You can set it to p4 in your environment:
```
export PROJECT=p4
```
**Hint 1:** The command `docker logs <container-name> -f` might be very useful for troubleshooting. It allows you to view real-time output from a specific container.
**Hint 2:** Consider writing a .sh script to help you quickly test code changes. For example, it might rebuild your Dockerfiles, clean up an old Compose cluster, and deploy a new cluster.
**Hint 3:** If you're low on disk space, consider running `docker system prune -a --volumes -f`
## Part 1: `DbToHdfs` gRPC Call
In this part, your task is to implement the `DbToHdfs` gRPC call (you can find the interface definition in the proto file).
2. What are the actual types for those loans?
Perform an inner join on these two tables so that a new column `loan_type_name` is added to the `loans` table, where its value is the corresponding `loan_type_name` from the `loan_types` table based on the matching `loan_type_id` in `loans`.
3. Filter all rows where `loan_amount` is **greater than 30,000** and **less than 800,000**. After filtering, this table should have only **426716** rows.
4. Upload the generated table to `/hdma-wi-2021.parquet` in the HDFS, with **2x** replication and a **1-MB** block size, using PyArrow (https://arrow.apache.org/docs/python/generated/pyarrow.fs.HadoopFileSystem.html); a sketch follows this list.
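A minimal sketch of this upload step, assuming the NameNode is reachable at `boss:9000` and the joined/filtered result is a pandas DataFrame named `df` (both assumptions; adapt to your setup):

```python
import pyarrow as pa
import pyarrow.fs
import pyarrow.parquet as pq

# replication and block size are properties of the filesystem handle,
# so they apply to files created through it
hdfs = pyarrow.fs.HadoopFileSystem("boss", 9000, replication=2,
                                   default_block_size=1024*1024)
pq.write_table(pa.Table.from_pandas(df), "/hdma-wi-2021.parquet",
               filesystem=hdfs)
```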
To check whether the upload was correct, you can use `docker exec -it` to enter the gRPC server's container and use the HDFS command `hdfs dfs -du -h <path>` to see the file size. The first column is the file size; the second is the space consumed across all replicas (15.3 M × 2 ≈ 30.5 M here). The expected result is:
```
15.3 M 30.5 M hdfs://nn:9000/hdma-wi-2021.parquet
```
**Hint 1:** We used similar tables in lecture: https://git.doit.wisc.edu/cdis/cs/courses/cs544/s25/main/-/tree/main/lec/15-sql
## Part 2: `BlockLocations` gRPC Call

In this part, your task is to implement the `BlockLocations` gRPC call (you can find the interface definition in the proto file).
For example, running `docker exec -it p4-server-1 python3 /client.py BlockLocations -f /hdma-wi-2021.parquet` should show something like this:
```
{'7eb74ce67e75': 15, 'f7747b42d254': 6, '39750756065d': 11}
```
Note: DataNode location is the randomly generated container ID for the container running the DataNode, so yours will be different, and the distribution of blocks across different nodes will also likely vary.
The documentation [here](https://hadoop.apache.org/docs/r3.3.6/hadoop-project-dist/hadoop-hdfs/WebHDFS.html) describes how we can interact with HDFS via web requests. Many [examples](https://requests.readthedocs.io/en/latest/user/quickstart/) show these web requests being made with the curl command, but you'll adapt those examples to use `requests.get`. By default, WebHDFS runs on port 9870, so use port 9870 instead of 9000 to access HDFS for this part.
Use a `GETFILEBLOCKLOCATIONS` operation to find the block locations.
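For example, the operation might be used like this (a sketch only; the exact response structure is described in the WebHDFS documentation linked above):

```python
import requests

# WebHDFS runs on the NameNode's HTTP port (9870 by default)
resp = requests.get("http://boss:9870/webhdfs/v1/hdma-wi-2021.parquet",
                    params={"op": "GETFILEBLOCKLOCATIONS"})
resp.raise_for_status()
blocks = resp.json()["BlockLocations"]["BlockLocation"]

# count how many blocks each DataNode hosts
counts = {}
for block in blocks:
    for host in block["hosts"]:
        counts[host] = counts.get(host, 0) + 1
print(counts)
```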
## Part 3: `CalcAvgLoan` gRPC Call

In this part, your task is to implement the `CalcAvgLoan` gRPC call (you can find the interface definition in the proto file).
Imagine a scenario where there could be many queries differentiated by `county`, and one of them is to get the average loan amount for a county. In this case, it can be much more efficient to generate a set of 1x-replicated Parquet files, one per county, and then read from these much smaller partitioned tables for computation.
The call should read hdma-wi-2021.parquet, filtering to rows with the specified county code. One way to do this would be to pass a `("column", "=", ????)` tuple inside a `filters` list upon read: https://arrow.apache.org/docs/python/generated/pyarrow.parquet.read_table.html
The call should return the average loan amount from the filtered table.
As an optimization, your code should also write the filtered data to a file named `partitioned/<county_code>.parquet`. If there are later calls for the same county_code, your code should use the smaller, county-specific Parquet file (instead of filtering the big Parquet file with all loan applications). The county-specific Parquet file should have 1x replication. When `CalcAvgLoan` returns the average, it should also use the "source" field to indicate whether the data came from the big Parquet file (`source="create"`, because a new county-specific file had to be created) or from a county-specific file that was previously created (`source="reuse"`).
One easy way to check if the county-specific file already exists is to just try reading it with PyArrow. You should get a `FileNotFoundError` exception if it doesn't exist.
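Putting this together, a minimal sketch of the read-or-create logic (assuming a `county_code` column in the Parquet file and filesystem handles as in Part 1; the helper name is illustrative):

```python
import pyarrow.fs
import pyarrow.parquet as pq
import pyarrow.compute as pc

hdfs = pyarrow.fs.HadoopFileSystem("boss", 9000)
hdfs_1x = pyarrow.fs.HadoopFileSystem("boss", 9000, replication=1)

def calc_avg_loan(county_code):
    path = f"/partitioned/{county_code}.parquet"
    try:
        # fast path: a previous call already created this county's file
        t = pq.read_table(path, filesystem=hdfs)
        source = "reuse"
    except FileNotFoundError:
        # slow path: filter the big file, then cache the result at 1x replication
        t = pq.read_table("/hdma-wi-2021.parquet", filesystem=hdfs,
                          filters=[("county_code", "=", county_code)])
        pq.write_table(t, path, filesystem=hdfs_1x)
        source = "create"
    return int(pc.mean(t["loan_amount"]).as_py()), source
```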
After a `DbToHdfs` call and a few `CalcAvgLoan` calls, your HDFS directory structure will look something like this:
```
├── hdma-wi-2021.parquet
├── partitioned/
│ ├── 55001.parquet
│ ├── 55003.parquet
│ └── ...
```
## Part 4: Fault Tolerance
In this part, your task is to modify the `CalcAvgLoan` gRPC call you implemented in Part 3.
The Docker compose file changes (excerpt; `...` marks elided lines):

```
services:
  ...
    deploy:
      resources:
        limits:
          memory: 3g
  mysql:
    image: ${PROJECT}-mysql
    ...
    deploy:
      resources:
        limits:
          memory: 1g
  nn:
    image: ${PROJECT}-nn
    hostname: boss
```
The proto file (excerpt):

```
message CalcAvgLoanReq {
  ...
}

message CalcAvgLoanResp {
  int32 avg_loan = 1;
  string source = 2; // create, reuse, or recreate
  string error = 3;
}

message StatusString {
  string status = 1;
}

service Lender {
  // Load input.data from SQL server and upload it to HDFS
  rpc DbToHdfs (Empty) returns (StatusString);
  // Get the block locations of the Parquet file in HDFS
  rpc BlockLocations (BlockLocationsReq) returns (BlockLocationsResp);
  // Calculate the average loan amount for a given county_code
  rpc CalcAvgLoan (CalcAvgLoanReq) returns (CalcAvgLoanResp);
}
```
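For reference, calling the service from Python might look like this. A sketch only: the generated module names depend on your proto filename (assumed `lender.proto` here), and the server port and the `CalcAvgLoanReq` field name are assumptions, since the request message body is elided above:

```python
import grpc
import lender_pb2, lender_pb2_grpc  # generated by grpc_tools.protoc (assumed names)

channel = grpc.insecure_channel("localhost:5440")  # assumed server address/port
stub = lender_pb2_grpc.LenderStub(channel)

# county_code is an assumed field of CalcAvgLoanReq (its body is elided above)
resp = stub.CalcAvgLoan(lender_pb2.CalcAvgLoanReq(county_code=55001))
print(resp.avg_loan, resp.source)
```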