diff --git a/p4/.gitignore b/p4/.gitignore
new file mode 100644
index 0000000000000000000000000000000000000000..5c1e56c427e8fc30fadd1a6a99a5f20cb6145685
--- /dev/null
+++ b/p4/.gitignore
@@ -0,0 +1,3 @@
+venv/
+tmp/
+*.csv
diff --git a/p4/Dockerfile.datanode b/p4/Dockerfile.datanode
new file mode 100644
index 0000000000000000000000000000000000000000..09fc6df9f0bf33164c44a7a54d984afde8093cfe
--- /dev/null
+++ b/p4/Dockerfile.datanode
@@ -0,0 +1,3 @@
+FROM p4-hdfs
+CMD export CLASSPATH=`$HADOOP_HOME/bin/hdfs classpath --glob` && \
+    hdfs datanode -D dfs.datanode.data.dir=/var/datanode -fs hdfs://boss:9000
\ No newline at end of file
diff --git a/p4/Dockerfile.hdfs b/p4/Dockerfile.hdfs
new file mode 100644
index 0000000000000000000000000000000000000000..256cf98a5102797e9eefb1fda302964bdcdda7df
--- /dev/null
+++ b/p4/Dockerfile.hdfs
@@ -0,0 +1,9 @@
+FROM ubuntu:24.04
+RUN apt-get update; apt-get install -y wget curl openjdk-11-jdk python3-pip iproute2 nano
+
+# HDFS
+RUN wget https://dlcdn.apache.org/hadoop/common/hadoop-3.3.6/hadoop-3.3.6.tar.gz; tar -xf hadoop-3.3.6.tar.gz; rm hadoop-3.3.6.tar.gz
+
+ENV JAVA_HOME=/usr/lib/jvm/java-11-openjdk-amd64
+ENV PATH="${PATH}:/hadoop-3.3.6/bin"
+ENV HADOOP_HOME=/hadoop-3.3.6
diff --git a/p4/Dockerfile.mysql b/p4/Dockerfile.mysql
new file mode 100644
index 0000000000000000000000000000000000000000..f7014cd19b155f8fea9c8d0974f3420a2342eab5
--- /dev/null
+++ b/p4/Dockerfile.mysql
@@ -0,0 +1,8 @@
+FROM mysql:8.4.0-oraclelinux8
+
+WORKDIR /
+#COPY init.sql /docker-entrypoint-initdb.d/
+COPY hdma-wi-2021.sql.gz /docker-entrypoint-initdb.d/
+RUN gzip -d /docker-entrypoint-initdb.d/hdma-wi-2021.sql.gz
+CMD ["mysqld"]
+
diff --git a/p4/Dockerfile.namenode b/p4/Dockerfile.namenode
new file mode 100644
index 0000000000000000000000000000000000000000..bc39ef7c2e105405ddea41b2805aaa99217fd632
--- /dev/null
+++ b/p4/Dockerfile.namenode
@@ -0,0 +1,4 @@
+FROM p4-hdfs
+CMD export CLASSPATH=`$HADOOP_HOME/bin/hdfs classpath --glob` && \
+    hdfs namenode -format &&\
+    hdfs namenode -D dfs.namenode.stale.datanode.interval=10000 -D dfs.namenode.heartbeat.recheck-interval=30000 -fs hdfs://boss:9000
diff --git a/p4/Outline.md b/p4/Outline.md
deleted file mode 100644
index 9bd677e94da85e8767db7963ff8803cb80ec5af4..0000000000000000000000000000000000000000
--- a/p4/Outline.md
+++ /dev/null
@@ -1,89 +0,0 @@
-<!--
-I think both approaches are fine for this project. Using Jupyter will be more detailed and fundamental, while the application approach will be more engaging.
-
-
-
-# Jupyter Way
-## Part1: Setup and SQL Query
-
-Offer them `docker-compose.yml` , `Dockerfile.sql` , `Dockerfile.hdfs` and `Dockerfile.notebook`, while they are required to complete `Dockerfile.namenode`, `Dockerfile.datanode` .
-
-In `Dockerfile.sql`, we download data, deploy SQL server, get it ready to be queried
-
-Then the whole system can be established by running `docker compose up` .
-
-**Q1: Connect to SQL server and query**
-
-In jupyter, use `mysqlconnector` to connect to SQL server, then do specific queries, then print the result.
-
-**Q2: Persist a table from SQL**
-
-Read a table from SQL server and save it separately `input.parquet`.
-
-## Part2 Data Upload and HDFS status
-
-**Q3: Check the number of living datanodes**
-
-run `hdfs dfsadmin -fs -report` command to get the status of HDFS.
-
-<br>
-Then upload and `input.parquet` to HDFS with 2x replication.
-
-**Q4: what are the logical and physical sizes of the parquet files?**
-
-Run `hdfs dfs -du -h hdfs://boss:9000/`
-
-## Part3 PyArrow
-
-**Q5: What is the average of `XXX` (something like this)**
-
-Use PyArrow to read from HDFS and do some calculation.
-<br>
-
-
-Ask them to do some more complex calculations and store results as a `output.parquet` back to HDFS with 1 replication.
-
-**Q6: blocks distribution across the two DataNode for** `output.parquet` (2x)
-Use the WebHDFS `OPEN` operation with `offset` 0 and `noredirect=true` to get it.
-
-output is like:`{'755329887c2a': 9, 'c181cd6fd6fe': 7}`
-
-**Q7: blocks distribution across the two DataNode for** `output.parquet` (1x)
-
-Use the WebHDFS `GETFILEBLOCKLOCATIONS` and iterate every block for counting.
-
-## Part 4: Disaster Strikes
-
-Kill one datanode manually.
-
-**Q8: how many live DataNodes are in the cluster?**
-Run `hdfs dfsadmin -fs -report` again, but expecting `Live datanodes (1)`
-
-
-<br>
-Ask students to access `result.parquet` , which expected to fail.
-
-**Q9: how many blocks of single.parquet were lost?**
-
-Use `OPEN` or `GETFILEBLOCKLOCATIONS` to get that.
-
-**Q10: return specific line of output by recalculate with replicated** `input.parquet`
-
-# Application Way
-
--->
-
-Offer them `docker-compose.yml` , `Dockerfile.sql` , `Dockerfile.hdfs` and `Dockerfile.notebook`, while they are required to complete `Dockerfile.namenode`, `Dockerfile.datanode`.
-
-Then the main system can be established by running `docker compose up`.
-
-Students need to:
-
-1. Define interfaces, `grpc` or `flask`
-2. Write a `server.py`: read data from SQL, save them as `input.parquet`, store `input.parquet` in HDFS with 1x rep, do calculation, store `output.parquet` in HDFS with 1x rep, then start serving(`grpc` or `flask`).
-3. Manually kill one datanode.
-4. Add logic for data disaster recovery:
-<blockquote>
-
- * If the output data is incomplete, read from the input and compute the result directly.
- * If a data node has restarted, recompute and store the output.</blockquote>
\ No newline at end of file
diff --git a/p4/README.md b/p4/README.md
new file mode 100644
index 0000000000000000000000000000000000000000..d8eedc2ee797fe4c2e427ebcda09054a7a1a01ef
--- /dev/null
+++ b/p4/README.md
@@ -0,0 +1,216 @@
+# P4 (4% of grade): SQL and HDFS
+
+## Overview
+
+In this project, you will deploy a data system consisting of an SQL server, an HDFS cluster, a gRPC server, and a client. You will need to read and filter data from an SQL database and persist it to HDFS. Then, you will partition the data to enable efficient request processing. Additionally, you will need to utilize the fault-tolerance mechanisms of HDFS to handle requests even in scenarios where HDFS DataNodes fail, which you will simulate as part of this project.
+
+Learning objectives:
+* to communicate with the SQL server using SQL queries.
+* to use the WebHDFS API.
+* to utilize PyArrow to interact with HDFS and process Parquet files.
+* to handle data loss scenarios.
+
+Before starting, please review the [general project directions](../projects.md).
+
+## Corrections/Clarifications
+
+None yet.
+
+## Introduction
+
+You'll need to deploy a system including 5 Docker containers, like this:
+
+<img src="arch.png" width=600>
+
+
+We have provided the other components; you only need to complete the work within the gRPC server and its Dockerfile.
+### Client
+This project will use `docker exec -it` to run the client in the gRPC server's container.
+Usage of `client.py` is as follows:
+```
+# Inside the server container
+python3 client.py DbToHdfs
+python3 client.py BlockLocations -f <file_path>
+python3 client.py PartitionByCounty
+python3 client.py CalcAvgLoan -c <county_code>
+```
+
+### Docker Compose
+
+Take a look at the provided Docker Compose file. There are several services, including `datanodes` with 3 replicas, a `namenode`, a `SQL server`, and a `gRPC server`. The namenode service will serve at the hostname `boss` within the Docker Compose network.
+
+### gRPC
+You are required to write a `server.py` and a `Dockerfile.server` based on the provided proto file (you may not modify the proto file).
+
+The gRPC interfaces have already been defined; see `lender.proto` for details. There are no constraints on the return values of `DbToHdfs` and `PartitionByCounty`, so you may define them freely as needed.
+
+
+
+
+### Docker image naming
+You need to build the Docker images following this naming convention:
+```
+docker build . -f Dockerfile.hdfs -t p4-hdfs
+docker build . -f Dockerfile.namenode -t p4-nn
+docker build . -f Dockerfile.datanode -t p4-dn
+docker build . -f Dockerfile.mysql -t p4-mysql
+docker build . -f Dockerfile.server -t p4-server
+```
+### PROJECT environment variable
+Note that the compose file assumes there is a "PROJECT" environment
+variable. You can set it to p4 in your environment:
+
+```
+export PROJECT=p4
+```
+
+
+**Hint 1:** The command `docker logs <container-name> -f` might be very useful for locating issues and fixing bugs. It allows you to view real-time output from a specific container. If you don’t want the output to occupy your terminal, you can remove `-f` to view the logs only up to the current point.
+
+
+**Hint 2:** Think about whether a .sh script could help you quickly test code changes. For example, you may want it to rebuild your Dockerfiles, clean up an old Compose cluster, and deploy a new cluster.
+
+**Hint 3:** You might find the commands below really helpful for cleaning up the disk space occupied by Docker images/containers/networks/volumes during the development of this project.
+```bash
+docker image prune -a -f
+docker container prune -f
+docker network prune -f
+docker volume prune -f
+docker system prune -a --volumes -f # equivalent to the combination of all the commands above
+```
+
+
+## Part 1: Load data from SQL Database to HDFS
+
+In this part, your task is to implement the `DbToHdfs` gRPC call (you can find the interface definition in the proto file).
+
+**DbToHdfs:** To be more specific, you need to:
+1. Connect to the SQL server, with `CS544` as the database name and `abc` as the password. There are two tables in the database: `loans` and `loan_types`. The former records all information related to loans, while the latter maps the numbers in the loan_type column of the `loans` table to their corresponding loan type names. There should be **447367** rows in table `loans`:
+    ```mysql
+    mysql> show tables;
+    +-----------------+
+    | Tables_in_CS544 |
+    +-----------------+
+    | loan_types      |
+    | loans           |
+    +-----------------+
+    mysql> select count(*) from loans;
+    +----------+
+    | count(*) |
+    +----------+
+    |   447367 |
+    +----------+
+    ```
+2. What are the actual types of those loans? Perform an inner join on these two tables so that a new column `loan_type_name` is added to the `loans` table, whose value is the corresponding `loan_type_name` from the `loan_types` table, matched on the `loan_type_id` column in `loans`.
+3. Filter all rows where `loan_amount` is **greater than 30,000** and **less than 800,000**. After filtering, this table should have only **426716** rows.
+4. Upload the generated table to `/hdma-wi-2021.parquet` in HDFS, with **3x** replication.
+
+To check whether the upload was correct, you can use `docker exec -it` to enter the gRPC server's container and use the HDFS command `hdfs dfs -du -h <path>` to see the file size. The expected result is:
+```
+14.4 M  43.2 M  hdfs://boss:9000/hdma-wi-2021.parquet
+```
+**Hint 1:** If you are not familiar with those tables, you can use SQL queries to print the table schema or retrieve sample data. A convenient way to do this is to use `docker exec -it` to enter the SQL container, run the mysql client (`mysql -p CS544`), and then perform queries.
+
+**Hint 2:** After `docker compose up`, the SQL server needs some time to load the data before it is ready. Therefore, you need to wait for a while, or preferably, add a retry mechanism for the SQL connection.
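+
+Below is a minimal, non-normative sketch of one way to implement `DbToHdfs`. It assumes `mysql-connector-python`, `pandas`, and `pyarrow` are installed in your server image, and the join/column names shown are guesses — check the real schema (e.g., with `SHOW COLUMNS`) first. Any approach that produces the expected `/hdma-wi-2021.parquet` is fine.
+
+```python
+import mysql.connector            # assumption: any MySQL client library works
+import pandas as pd
+import pyarrow as pa
+import pyarrow.parquet as pq
+from pyarrow import fs
+
+# host "mysql" and the CS544/abc credentials come from docker-compose.yml
+conn = mysql.connector.connect(host="mysql", database="CS544",
+                               user="root", password="abc")
+# pandas warns about non-SQLAlchemy connections; that is OK here (or use SQLAlchemy)
+df = pd.read_sql("""
+    SELECT l.*, lt.loan_type_name
+    FROM loans l
+    INNER JOIN loan_types lt ON l.loan_type_id = lt.id   -- verify the real join columns
+    WHERE l.loan_amount > 30000 AND l.loan_amount < 800000
+""", conn)
+
+# pyarrow's HDFS client needs the Hadoop CLASSPATH set in the environment
+# (see how Dockerfile.namenode/Dockerfile.datanode export it).
+# replication=3 asks HDFS to keep 3 copies of every block of this file.
+hdfs = fs.HadoopFileSystem("boss", 9000, replication=3)
+with hdfs.open_output_stream("/hdma-wi-2021.parquet") as f:
+    pq.write_table(pa.Table.from_pandas(df), f)
+```
+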
+## Part 2: BlockLocations
+
+In this part, your task is to implement the `BlockLocations` gRPC call (you can find the interface definition in the proto file).
+
+**BlockLocations:** To be more specific, for a given file path, you need to return a Python dictionary (i.e., a `map` in the proto), recording how many blocks there are on each DataNode (the key is the **DataNode location** and the value is the **number** of blocks on that node).
+
+The return value of the gRPC call `BlockLocations` should be something like:
+```
+{'dd8706a32f34': 6, '21a88993bb15': 4, '47c17821001f': 5}
+```
+Note: the DataNode location is the randomly generated container ID for the container running the DataNode, so yours will be different, and the distribution of blocks across different nodes may also vary.
+
+The documents [here](https://hadoop.apache.org/docs/r3.3.6/hadoop-project-dist/hadoop-hdfs/WebHDFS.html) describe how we can interact with HDFS via web requests. Many [examples](https://requests.readthedocs.io/en/latest/user/quickstart/) show these web requests being made with the curl command, but you'll adapt those examples to use `requests.get`. By default, WebHDFS runs on port 9870, so use port 9870 instead of 9000 to access HDFS for this part.
+
+**Hint 1:** Note that if `r` is a response object, then `r.content` will contain some bytes, which you could convert to a dictionary; alternatively, `r.json()` does this for you.
+
+For example:
+```python
+r = requests.get("http://boss:9870/webhdfs/v1/<filepath>?op=GETFILESTATUS")
+r.raise_for_status()
+print(r.json())
+```
+You might get a result like:
+```
+{'FileStatus': {...
+  'blockSize': 1048576,
+  ...
+  'length': 16642976,
+  ...
+  'replication': 1,
+  'storagePolicy': 0,
+  'type': 'FILE'}}
+```
+
+
+**Hint 2:** You can use the `GETFILESTATUS` operation to get the file `length` and `blockSize`, and use the `OPEN` operation with an appropriate offset to retrieve the node where each block is located.
+
+## Part 3: Calculate the average loan for each county
+
+In this part, your task is to implement the `PartitionByCounty` and `CalcAvgLoan` gRPC calls (you can find the interface definitions in the proto file).
+
+Imagine a scenario where there could be many queries differentiated by `county`, and one of them is to get the average loan amount for a county. In this case, it might be much more efficient to generate a set of 1x-replicated Parquet files partitioned by county, and then read data from these partitioned, much smaller tables for computation.
+
+**PartitionByCounty:** To be more specific, you need to categorize the contents of the Parquet file just stored in HDFS using `county_id` as the key. For each `county_id`, create a new Parquet file that records all entries under that county, and save these files with **1x replication**. Files should be written into the folder `/partitioned/`, and each file should be named after its `county_id`.
+
+**CalcAvgLoan:** To be more specific, for a given `county_id`, you need to return an int value indicating the average `loan_amount` of that county. **Note:** You are required to perform this calculation based on the partitioned Parquet files generated by `PartitionByCounty`. The `source` field in the proto file can be ignored in this part.
+
+The inside of the partitioned directory should look like this:
+
+```
+├── partitioned/
+│   ├── 55001.parquet
+│   ├── 55003.parquet
+│   └── ...
+```
+
+The root directory on HDFS should now look like this:
+```
+14.4 M  43.2 M  hdfs://boss:9000/hdma-wi-2021.parquet
+19.3 M  19.3 M  hdfs://boss:9000/partitioned
+```
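+
+The sketch below shows one possible way to do the partitioning and the per-county average with PyArrow. It is illustrative only: the helper names are made up, and it assumes the `county_id` and `loan_amount` column names used above. Structure your server however you like.
+
+```python
+import pyarrow.parquet as pq
+import pyarrow.compute as pc
+from pyarrow import fs
+
+hdfs_3x = fs.HadoopFileSystem("boss", 9000)                  # for reading the big file
+hdfs_1x = fs.HadoopFileSystem("boss", 9000, replication=1)   # for writing 1x partitions
+
+def partition_by_county():
+    table = pq.read_table("/hdma-wi-2021.parquet", filesystem=hdfs_3x)
+    hdfs_1x.create_dir("/partitioned")
+    for county_id in pc.unique(table["county_id"]).to_pylist():
+        subset = table.filter(pc.equal(table["county_id"], county_id))
+        with hdfs_1x.open_output_stream(f"/partitioned/{county_id}.parquet") as f:
+            pq.write_table(subset, f)
+
+def calc_avg_loan(county_id):
+    # only one small per-county file needs to be read back
+    t = pq.read_table(f"/partitioned/{county_id}.parquet",
+                      filesystem=hdfs_1x, columns=["loan_amount"])
+    return int(pc.mean(t["loan_amount"]).as_py())
+```
+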
+
+## Part 4: Fault Tolerance
+
+In this part, your task is to modify the `CalcAvgLoan` gRPC call you implemented in Part 3.
+
+Imagine a scenario where one (or even two) of the three DataNodes fail and go offline. In this case, can the logic implemented in Part 3 still function correctly? If we still want our gRPC server to correctly respond to requests for calculating the average loan, what should we do?
+
+Keep in mind that `/hdma-wi-2021.parquet` has a replication factor of 3, while `/partitioned` only has a replication factor of 1.
+
+**CalcAvgLoan:** To be more specific, modify this gRPC call so that the server keeps handling requests correctly even when one (or even two) DataNodes are offline. You may try reading the partitioned Parquet file first; if that fails, fall back to the large `hdma-wi-2021.parquet` file and complete the computation from it. In addition, you have to return the response with the `source` field filled in:
+1. "partitioned": the calculation was performed on the partitioned Parquet file generated before
+2. "unpartitioned": the partitioned Parquet file generated before has been lost, so the calculation was performed on the initial unpartitioned table
+
+To simulate a DataNode failure, you should use `docker kill` to terminate a node and then wait until you confirm that the number of `Live datanodes` has decreased, using the `hdfs dfsadmin -fs <hdfs_path> -report` command.
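+
+As a rough reference (not a required structure), the fallback described above can be a simple try/except around the partitioned read; the function and variable names here are only illustrative:
+
+```python
+import pyarrow.parquet as pq
+import pyarrow.compute as pc
+from pyarrow import fs
+
+def calc_avg_loan(county_id):
+    hdfs = fs.HadoopFileSystem("boss", 9000)
+    try:
+        # fast path: the small 1x-replicated per-county file
+        t = pq.read_table(f"/partitioned/{county_id}.parquet",
+                          filesystem=hdfs, columns=["loan_amount"])
+        source = "partitioned"
+    except OSError:
+        # some blocks died with a DataNode; fall back to the 3x-replicated file
+        t = pq.read_table("/hdma-wi-2021.parquet", filesystem=hdfs)
+        t = t.filter(pc.equal(t["county_id"], county_id))
+        source = "unpartitioned"
+    return int(pc.mean(t["loan_amount"]).as_py()), source
+```
+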
+
+## Submission
+
+Read the directions [here](../projects.md) about how to create the repo.
+
+You have some flexibility about how you write your code, but we must be able to run it like this:
+
+```
+docker build . -f Dockerfile.hdfs -t p4-hdfs
+docker build . -f Dockerfile.namenode -t p4-nn
+docker build . -f Dockerfile.datanode -t p4-dn
+docker build . -f Dockerfile.mysql -t p4-mysql
+docker build . -f Dockerfile.server -t p4-server
+docker compose up -d
+```
+
+We will copy in all the Dockerfiles except "Dockerfile.server", as well as "docker-compose.yml", "client.py", "lender.proto", and "hdma-wi-2021.sql.gz", overwriting anything you might have changed.
+
+Please make sure you have `client.py` copied into p4-server-1. We will run client.py in p4-server-1 to test your code.
+
+## Tester
+Not released yet.
diff --git a/p4/arch.png b/p4/arch.png
new file mode 100644
index 0000000000000000000000000000000000000000..0e02b767819b7723622ffa03e1f21d31a3291340
Binary files /dev/null and b/p4/arch.png differ
diff --git a/p4/client.py b/p4/client.py
new file mode 100644
index 0000000000000000000000000000000000000000..5996b28244bb342f2d08814db7de57dcd5fb2ae1
--- /dev/null
+++ b/p4/client.py
@@ -0,0 +1,41 @@
+import sys
+import grpc
+import argparse
+from concurrent import futures
+import lender_pb2, lender_pb2_grpc
+import pandas as pd
+
+
+parser = argparse.ArgumentParser(description="argument parser for p4 client")
+
+
+parser.add_argument("mode", help="which action to take", choices=["SqlQuery", "DbToHdfs", "PartitionByCounty", "BlockLocations", "CalcAvgLoan"])
+
+parser.add_argument("-c", "--code", type=int, default=0, help="county code to query average loan amount in CalcAvgLoan mode")
+parser.add_argument("-f", "--file", type=str, default="", help="file path for BlockLocations")
+args = parser.parse_args()
+
+channel = grpc.insecure_channel("server:5000")
+stub = lender_pb2_grpc.LenderStub(channel)
+if args.mode == "DbToHdfs":
+    resp = stub.DbToHdfs(lender_pb2.Empty())
+    print(resp.status)
+elif args.mode == "PartitionByCounty":
+    resp = stub.PartitionByCounty(lender_pb2.Empty())
+    print(resp.status)
+elif args.mode == "CalcAvgLoan":
+    resp = stub.CalcAvgLoan(lender_pb2.CalcAvgLoanReq(county_code=args.code))
+    if resp.error:
+        print(f"error: {resp.error}")
+    else:
+        print(resp.avg_loan)
+        print(resp.source)
+elif args.mode == "BlockLocations":
+    resp = stub.BlockLocations(lender_pb2.BlockLocationsReq(path=args.file))
+    if resp.error:
+        print(f"error: {resp.error}")
+    else:
+        print(resp.block_entries)
+
+
+
diff --git a/p4/docker-compose.yml b/p4/docker-compose.yml
new file mode 100644
index 0000000000000000000000000000000000000000..f65fb32dc5cb430d25aee7a86f5dcf60067660a2
--- /dev/null
+++ b/p4/docker-compose.yml
@@ -0,0 +1,31 @@
+name: ${PROJECT}
+
+services:
+  server:
+    image: ${PROJECT}-server
+
+
+  mysql:
+    image: ${PROJECT}-mysql
+    hostname: mysql
+    #ports:
+    #  - "127.0.0.1:3306:3306"
+
+    environment:
+      MYSQL_ROOT_PASSWORD: abc
+      MYSQL_DATABASE: CS544
+  nn:
+    image: ${PROJECT}-nn
+    hostname: boss
+    deploy:
+      resources:
+        limits:
+          memory: 0.75g
+
+  dn:
+    image: ${PROJECT}-dn
+    deploy:
+      replicas: 3
+      resources:
+        limits:
+          memory: 0.75g
diff --git a/p4/hdma-wi-2021.sql.gz b/p4/hdma-wi-2021.sql.gz
new file mode 100644
index 0000000000000000000000000000000000000000..5f7977af6d1a60063224caf47454206ef3aafd68
Binary files /dev/null and b/p4/hdma-wi-2021.sql.gz differ
diff --git a/p4/lender.proto b/p4/lender.proto
new file mode 100644
index 0000000000000000000000000000000000000000..b2cb2b96beb8dab7620f0a6e5f8a3c9aa30160b8
--- /dev/null
+++ b/p4/lender.proto
@@ -0,0 +1,38 @@
+syntax="proto3";
+
+message Empty{}
+
+message BlockLocationsReq {
+    string path = 1;
+}
+
+message BlockLocationsResp {
+    map<string, int32> block_entries = 1; // number of blocks per DataNode
+    string error = 2;
+}
+
+message CalcAvgLoanReq {
+    int32 county_code = 1;
+}
+
+message CalcAvgLoanResp {
+    int32 avg_loan = 1;
+    string source = 2; // partitioned or unpartitioned?
+    string error = 3;
+}
+
+
+message StatusString{
+    string status = 1;
+}
+
+service Lender {
+    // Load the data from the SQL server and upload it to HDFS as a Parquet file
+    rpc DbToHdfs (Empty) returns (StatusString);
+    // Get the block locations of a Parquet file in HDFS
+    rpc BlockLocations (BlockLocationsReq) returns (BlockLocationsResp);
+    // Partition the uploaded data by county_code and save each county as a separate Parquet file
+    rpc PartitionByCounty (Empty) returns (StatusString);
+    // Calculate the average loan amount for a given county_code
+    rpc CalcAvgLoan (CalcAvgLoanReq) returns (CalcAvgLoanResp);
+}
\ No newline at end of file