Skip to content
Snippets Groups Projects
Commit 7ecabf8a authored by TYLER CARAZA-HARTER's avatar TYLER CARAZA-HARTER
Browse files

rollback p3 spec to s25

parent 84d3f843
No related branches found
No related tags found
No related merge requests found
# P3 (4% of grade): Large, Thread-Safe Tables
# DRAFT! Don't start yet.
# DRAFT: DO NOT START
# P3 (4% of grade): Large, Thread-Safe Tables
## Overview
In this project, you'll build a server that handles the uploading of CSV files, storing their contents, and performing query operations on the data. The server maintains **only ONE** logical table. You should think of each uploaded CSV as containing a portion of that larger table, which grows with each upload.
The server will write two files for each uploaded CSV file: one in CSV format and another in Parquet (i.e., they are two copies of the table in different formats). Clients that we provide will communicate with your server via RPC calls.
### Workflow and Format Walkthrough
In P3, our `client.py` takes in a batch of operation commands stored in file `workload.txt` and executes them line by line. There are two types of commands you can put into `workload.txt` to control the client behavior. First, each *upload* command:
```
u file.csv
```
will instruct the client to read a CSV data file named `file.csv` as binary bytes, and use the corresponding RPC call to upload it to the server. Next, you can use a subsequent *sum* command to perform summation to the table over one specified column. For example:
```
s p x
```
asks the client to send an RPC request to the server and instruct the server to return the total sum value of column `x`. As there are two copies of the same table in `CSV` and `Parquet` format, `p` in the command asks the server to read column data only from `Parquet` files. Below is a minimal example. Assume that the `server` has uploaded two files `file1.csv` and `fil12.csv`, which contain these records respectively:
```
x,y,z
1,2,3
4,5,6
```
And:
In this project, you'll build a server that handles the uploading of
CSV files, storing their contents, and performing operations on the
data. You should think of each CSV upload as containing a portion of
a larger table that grows with each upload.
```
x,y
5,10
0,20
10,15
```
You can assume columns contain only integers. You should be able to upload the files and do sums with the following `workload.txt` description:
```
u file1.csv
u file2.csv
s p x
s p z
s c w
```
Expected ouptut would be:
```
20
9
0
```
Inspect both the `workload.txt` file content and client code (i.e., `read_workload_file()`) to understand how each text command leads to one `gRPC` call. A separate `purge.txt` workload file is provided and *should not be modified*. The client can use a RPC call `Purge()` to reset the server and remove all files stored by the remote peer.
The server will write two files for each uploaded CSV file: one in CSV
format and another in Parquet. Clients that we provide will
communicate with your server via RPC calls.
Learning objectives:
* Implement logic for uploading and processing CSV and Parquet files.
* Perform computations like summing values from specific columns.
* Manage concurrency with locking in multi-threading server/client.
* Benchmark a server/client system and visualize the results.
* Implement logic for uploading and processing CSV and Parquet files
* Perform computations like summing values from specific columns
* Manage concurrency with locking in a multi-threaded server
Before starting, please review the [general project directions](../projects.md).
## Clarifications/Corrections
* None yet
* none yet
## Part 1: Communication (gRPC)
## Part 1: Communication
In this project, the client program `client.py` will communicate with a server, `server.py`, via gRPC. We provide starter code for the client program. Your job is to write a `.proto` file to generate a gRPC stub (used by our client) and servicer class that you will inherit from in server.py.
In this project, three client programs (upload.py, csvsum.py, and
parquetsum.py) will communicate with a server, server.py, via gRPC.
We provide the client programs. Your job is to write a .proto to
generate a gRPC stub (used by our clients) and servicer class (that
you will inherit from in server.py).
Take a moment to look at code for the client code and answer the following questions:
Take a moment to look at code for the three client programs and answer
the following:
* what are the names of the imported gRPC modules? This will determine what you name your `.proto` file.
* what methods are called on the stubs? This will determine the RPC definitions in your `.proto` file.
* what arguments are passed to the methods, and what values are extracted from the return values? This will determine the fields in the messages in your `.proto` file.
* what port number does the client use? This will determine the port that the gRPC server should expose.
* what are the names of the imported gRPC modules? This will determine what you name your .proto file.
* what methods are called on the stubs? This will determine the RPC calls in your .proto
* what arguments are passed to the methods, and what values are extracted from the return values? This will determine the fields in the messages in your .proto
* what port number do the clients use? This will determine the port that the server should use.
Write a `.proto` file based on your above observations and run the `grpc_tools.protoc` compiler to generate stub code for our client and servicer code for your server. All field types will be strings, except `total` and `csv_data`,which should be `int64` and `bytes` respectively.
Write a .proto file based on your above observations and run the
grpc_tools.protoc tool to generate stub code for our clients and
servicer code for your server. All field types will be strings,
except `total` and `csv_data`, which should be `int64` and
`bytes` respectively.
Now build the .proto on your VM. Install the tools like this:
Now build the .proto on your VM. Install the tools like this:
```bash
```sh
python3 -m venv venv
source venv/bin/activate
pip3 install grpcio==1.70.0 grpcio-tools==1.70.0 protobuf==5.29.3
pip3 install grpcio==1.66.1 grpcio-tools==1.66.1 numpy==2.1.1 protobuf==5.27.2 pyarrow==17.0.0 setuptools==75.1.0
```
Then use `grpc_tools.protoc` to build your `.proto` file.
Then use `grpc_tools.protoc` to build your .proto file.
In your server, override the *three* RPC methods for the generated servicer. For now, the methods do nothing but returning messages with the error field set to "TODO", leaving any other field unspecified.
In your server, override the two RPC methods for the generated
servicer. For now, you can just return messages with the error field
set to "TODO", leaving any other field unspecified.
If communication is working correctly so far, you should be able to start a server and used a client to get back a "TODO" error message via gRPC:
If communication is working correctly so far, you should be able to
start a server and used a client to get back a "TODO" error message
via gRPC:
```bash
python3 -u server.py &> log.txt &
python3 client.py workload.txt
# should see multiple "TODO"s
```
Create a `Dockerfile.server` to build an image that will also let you run your server in a container. It should be possible to build and run your server like this:
```bash
docker build . -f Dockerfile.server -t ${PROJECT}-server
docker run -d -m 512m -p 127.0.0.1:5440:5440
python3 -u server.py &> log.txt &
python3 upload.py simple.csv
# should see "TODO"
```
Like P2, the compose file assumes a "PROJECT" environment variable. You can set it to p3 in your environment with (the autograder may use another prefix for testing):
Create a Dockerfile to build an image that will also let you run your
server in a container. It should be possible to build and run your
server like this:
```bash
export PROJECT=p3
```
The client program should then be able to communicate with the server program the same way it communicated with that outside of a container. Once your client program successfully interacts with the dockerized server, you should similarly draft a `Dockerfile.client` to build a container for `client.py`. Finally, test your setup with `docker compose`:
```bash
docker compose up -d
docker ps
# should see:
CONTAINER ID IMAGE COMMAND CREATED ...
fa8de65e0e7c p3-client "python3 -u /client.…" 2 seconds ago ...
4c899de6e43f p3-server "python3 -u /server.…" 2 seconds ago ...
docker build . -t p3
docker run -d -m 512m -p 127.0.0.1:5440:5440 p3
```
**HINT:** consider writing a .sh script that helps you redeploy code changes. Everytime you modify the source code `client.py/server.py/benchmark.py`, you may want to rebuild the images, bring down the previous docker cluster, and re-instantiate a new cluster.
The client programs should then be able to communicate with the
Dockerized programs the same way they communicated with the server
outside of a container.
## Part 2: Server Implementation
If you want to make code changes without rebuilding the image each
time, consider using a volume mount to make the latest version of
server.py on your VM replace the server.py in the file (if the server
is not at /server.py inside the container, modify accordingly):
You will need to implement three RPC calls on the server side:
### Upload
This method should:
1. Recover the uploaded CSV table from *binary* bytes carried by the RPC request message.
2. Write the table to a CSV file and write the same table to another file in Parquet format.
**Requirement:** Write two files to disk per upload. We will test your server with a 512MB memory limit. Do *NOT* keep the table data in memory.
**HINT 1:** You are free to decide the names and locations of the stored files. However, the server must keep these records to process future queries (for instance, you can add paths to a data structure like a list or dictionary).
**HINT 2:** Both `pandas` and `pyarrow` provide interfaces to write a table to file.
### ColSum
Whenever your server receives a column summation request, it should loop over all the data files that have been uploaded, compute a local sum for each such file, and finally return a total sum for the whole table.
The table does not have a fixed schema (i.e., it is not guaranteed that a column appears in any uploaded file). You should skip a file if it lacks the target column (e.g., z and w in the above example). The server should sum over either Parquet or CSV files according to the input `format` (not both). For a given column, the query results for format="parquet" should be the same as for format="csv", while performance may differ.
### Purge
```
docker run --rm -m 512m -p 127.0.0.1:5440:5440 -v ./server.py:/server.py p3
```
This method facilitates testing and subsequent benchmarking. The method should:
1. Remove all local file previously uploaded by method `Upload()`
2. Reset all associated server state (e.g., names, paths, etc.)
## Part 2: Upload
## Part 3: Multi-threading Server/Client
When your server receives an upload request with some CSV data, your
program should write the CSV to a new file somewhere. You can decide
the name and location, but the server must remember the path to the
file (for example, you could add the path to some data structure, like a
list or dictionary).
With the Global Interpreter Lock (GIL), commonly-used CPython does not support parallel multi-threading execution. However, multi-threading can still boost the performance of our small system (why?). In Part 3, you are required to add threading support to `client.py`, then `server.py`.
Your server should similarly write the same data to a parquet file
somewhere, using pyarrow.
### Client
## Part 3: Column Sum
More specifically, you will need to manually create *N* threads for `client.py` (with thread management primitives come with the `threading` module) to concurrently process the provided `workload.txt`. For example, each worker thread may repeatedly fetch one command line from `workload.txt` and process it. You can load all command strings to a list, then provide thread-safe access to all launched threads (how?).
When your server receives a column summation request, it should loop
over all the data that has been uploaded, computing a sum for each
file, and returning a total sum.
**HINT:** Before moving to work on the `server`, test your multi-threading client by running it with a single thread:
For example, assume file1.csv and file2.csv contain this:
```bash
python3 client.py workload.txt 1 # set to use only 1 thread
```
### Server
Now with concurrent requests sent from `client.py`, you must correspondingly protect your server from data race with `threading.Lock()`. Make sure only one thread can modify the server state (e.g., lists of names or paths). Note that you don't need to explicitly create threads for `server.py` as gRPC can do that for you. The following example code creates a thread pool with 8 threads:
```python
grpc.server(
futures.ThreadPoolExecutor(max_workers=8),
options=[("grpc.so_reuseport", 0)]
)
x,y,z
1,2,3
4,5,6
```
**Requirement 1:** The server should properly acquire then release the lock. A single global lock is sufficient. Lock release should also work with any potential exceptions.
**Requirement 2:** The server *MUST NOT* hold the lock when reading or writing files. A thread should release the lock right after it has done accessing the shared data structure. How could this behavior affect the performance?
And this:
## Part 4: Benchmarking the System
Congratulations, you have implemented a minimal multi-threading data system! Let's write a small script to finally benchmark it with different scales (i.e., number of worker threads). Overall, the script is expected to perform the following tasks:
1. Run `client.py` multiple times with different therading parameters, record their execution time.
2. plot the data to visualize the performance trend.
### Driving the Client
```
x,y
5,10
0,20
```
Each time `benchmark.py` should collect 4 pairs of data by running `client.py` with 1, 2, 4, and 8 thread(s). Wrap each `client.py` execution with a pair of timestamp collection. Then calculate the execution time. Make sure you always reset the server before sending the `workload.txt`, by issuing a `Purge()` command through `client.py`:
You should be able to upload the files and do sums as follows:
```bash
python3 client.py purge
# let sometime for the reset to complete
time.sleep(3)
# test follows...
```
python3 upload.py file1.csv
python3 upload.py file2.csv
python3 csvsum.py x # should print 10
python3 csvsum.py z # should print 9
python3 csvsum.py w # should print 0
```
You may also want `benchmark.py` to wait a few seconds for the `server` to get ready for any client RPC requests.
**HINT 1:** You can get a timestamp with `time.time()`.
You can assume any column you sum over contains only integers, but
some files may lack certain columns (e.g., it is OK to sum over z
above, even though file2.csv doesn't have that column).
**HINT 2:** There are multiple tools to launch a Python program from within another. Examples are [`os.system()`](https://docs.python.org/3/library/os.html#os.system) and [`subprocess.run`](https://docs.python.org/3/library/subprocess.html#subprocess.run).
The only difference between `csvsum.py` and `parquetsum.py` is that
they will pass the format string to your gRPC method as "csv" or
"parquet", respectively. Your server is expected to do the summing
over either the CSV or parquet files accordingly (not both). Given
the CSVs and parquets contain the same data, running `csvsum.py x`
should produce the same number as `parquetsum.py x`, though there may
be a performance depending on which format is used.
### Visualizing the Results
Parquet is a column-oriented format, so all the data in a single file
should be adjacent on disk. This means it should be possible to read
a column of data without reading the whole file. See the `columns`
parameter here:
https://arrow.apache.org/docs/python/generated/pyarrow.parquet.read_table.html
Plot a simple line graph with the execution time acquired by the previous step. Save the figure to a file called `plot.png`. Your figure must include at least 4 data points as mentioned above. You will find an example plot `plot.png` in the project repo.
**Requirement:** when the server is asked to sum over the column of a
Parquet file, it should only read the data from that column, not other
columns.
**HINT 1:** Feel free to any tools to plot the figure. Below is a minimal example to plot a dictionary with 2 data points:
**Note:** we will run your server with a 512-MB limit on RAM. Any
individual files we upload will fit within that limit, but the total
size of the files uploaded will exceed that limit. That's why your
server will have to do sums by reading the files (instead of just
keeping all table data in memory).
```python
import pandas as pd
data = {1: 100, 2: 200}
series = pd.Series(data)
ax = series.plot.line()
ax.get_figure().savefig("plot.png")
```
## Part 4: Locking
**HINT 2:** You can copy out a file `plot.png` from container named `test` to the current folder of host using:
You don't need to explicitly create threads using Python calls because
gRPC will do it for you. Set `max_workers` to 8 so that gRPC will
create 8 threads:
```bash
docker cp test:/plot.png plot.png
```
grpc.server(
futures.ThreadPoolExecutor(max_workers=????),
options=[("grpc.so_reuseport", 0)]
)
```
## Submission
Delirable should work with `docker-compose.yaml` we provide:
1. `Dockerfile.client` must launch `benchmark.py` **(NOT `client.py`)**. To achieve this, you need to copy both `client.py` and the driver `benchmark.py` to the image, as well as `workload.txt`, `purge.txt`, and the two sample input CSV files `file1.csv` `file2.csv`.
2. `Dockerfile.server` must launch `server.py`.
Now that your server has multiple threads, your code should hold a
lock (https://docs.python.org/3/library/threading.html#threading.Lock)
whenever accessing any shared data structures, including the list(s)
of files (or whatever data structure you used). Use a single global
lock for everything. Ensure the lock is released properly, even when
there is an exception. Even if your chosen data structures provide any
guarantees related to thread-safe access, you must still hold the lock
when accessing them to gain practice protecting shared data.
**Requirement:** Do **NOT** submit the `venv` directory (e.g., use `.gitignore`).
**Requirement:** reading and writing files is a slow operation, so
your code must NOT hold the lock when doing file I/O.
## Tester
* Autobadger for p3 not availble for use yet.
Use the **autobadger** tool on your machine to run tests against your code:
```bash
autobadger --project=p3 --verbose
```
The --verbose flag will print more information to the console as your tests are running. Pushing to main will submit your project and we will grade your code on main from a remote VM. A GitLab issue should be pushed to your repository shortly after you submit.
\ No newline at end of file
Not released yet. Details coming soon!
\ No newline at end of file
import grpc, sys
import table_pb2_grpc, table_pb2
SERVER = "localhost:5440"
BATCH_COUNT = 400
BATCH_SIZE = 250_000
def main():
if len(sys.argv) != 1:
print("Usage: python3 bigdata.py")
sys.exit(1)
channel = grpc.insecure_channel(SERVER)
stub = table_pb2_grpc.TableStub(channel)
for batch in range(BATCH_COUNT):
print(f"Batch {batch+1} of {BATCH_COUNT}: 0.25 million rows")
rows = "x,y,z\n" + "\n".join([f"1,{i},{batch*1000+i%1000}" for i in range(BATCH_SIZE)])
resp = stub.Upload(table_pb2.UploadReq(csv_data=bytes(rows, "utf-8")))
if resp.error:
print(resp.error)
sys.exit(1)
else:
print("uploaded ")
if __name__ == "__main__":
main()
import grpc, threading, os, sys
import table_pb2_grpc, table_pb2
SERVER = "localhost:5440"
class TableClient:
def __init__(self, file, num_threads=1):
self.channel = grpc.insecure_channel(SERVER)
self.stub = table_pb2_grpc.TableStub(channel=self.channel)
self.num_threads = num_threads
self.read_workload_file(file)
def Purge(self):
resp = self.stub.Purge(table_pb2.PurgeReq())
def Upload(self, path):
with open(path, "rb") as f:
data = f.read()
resp = self.stub.Upload(table_pb2.UploadReq(csv_data=data))
print(resp.error if resp.error else "success")
def ColSum(self, format, column_name):
resp = self.stub.ColSum(table_pb2.ColSumReq(column=column_name, format=format))
print(resp.error if resp.error else f"{resp.total}")
def exec_command(self, line):
command = line.strip().split()
if len(command) == 1 and command[0] == "p":
self.Purge()
elif len(command) == 2 and command[0] == "u":
self.Upload(command[1])
elif len(command) == 3:
self.ColSum("parquet" if command[1] == "p" else "csv", command[2])
else:
print(f"Skipping invalid command: {line}")
def read_workload_file(self, file):
with open(file, "r") as f:
for line in f.readlines():
self.exec_command(line)
def main():
if len(sys.argv) < 2:
print("Expecting command: python3 client.py INPUT [NUM_THREADS]; Missing INPUT")
sys.exit(1)
workload_spec = sys.argv[1]
num_threads = 1
if len(sys.argv) > 2:
num_threads = int(sys.argv[2])
client = TableClient(workload_spec, num_threads)
if __name__ == "__main__":
main()
import grpc, sys, time
import table_pb2_grpc, table_pb2
SERVER = "localhost:5440"
def main():
if len(sys.argv) != 2:
print("Usage: python3 csvsum.py <COLUMN>")
sys.exit(1)
column = sys.argv[1]
channel = grpc.insecure_channel(SERVER)
stub = table_pb2_grpc.TableStub(channel)
start = time.time()
resp = stub.ColSum(table_pb2.ColSumReq(column=column, format="csv"))
end = time.time()
print(f"{round((end-start)*1000, 1)} ms")
if resp.error:
print(resp.error)
else:
print(resp.total)
if __name__ == "__main__":
main()
name: ${PROJECT}
services:
server:
image: ${PROJECT}-server
ports:
- "5440:5440"
environment:
- PROJECT=${PROJECT}
client:
image: ${PROJECT}-client
environment:
- PROJECT=${PROJECT}
\ No newline at end of file
x,y
5,10
0,20
10,15
import grpc, sys, time
import table_pb2_grpc, table_pb2
SERVER = "localhost:5440"
def main():
if len(sys.argv) != 2:
print("Usage: python3 parquetsum.py <COLUMN>")
sys.exit(1)
column = sys.argv[1]
channel = grpc.insecure_channel(SERVER)
stub = table_pb2_grpc.TableStub(channel)
start = time.time()
resp = stub.ColSum(table_pb2.ColSumReq(column=column, format="parquet"))
end = time.time()
print(f"{round((end-start)*1000, 1)} ms")
if resp.error:
print(resp.error)
else:
print(resp.total)
if __name__ == "__main__":
main()
p3/plot.png

90.2 KiB

p
\ No newline at end of file
File moved
import grpc, sys
import table_pb2_grpc, table_pb2
SERVER = "localhost:5440"
def main():
if len(sys.argv) != 2:
print("Usage: python3 upload.py <CSV_PATH>")
sys.exit(1)
path = sys.argv[1]
channel = grpc.insecure_channel(SERVER)
stub = table_pb2_grpc.TableStub(channel)
with open(path, "rb") as f:
data = f.read() # binary "bytes" data
resp = stub.Upload(table_pb2.UploadReq(csv_data=data))
if resp.error:
print(resp.error)
else:
print("success")
if __name__ == "__main__":
main()
u file1.csv
u file2.csv
u file1.csv
s p x
s p y
s c z
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment