From 621405c2d955b5a8721372b65875de11f5d9b0ae Mon Sep 17 00:00:00 2001 From: Jing Lan <jlan25@cs544-jlan25.cs.wisc.edu> Date: Fri, 21 Feb 2025 00:18:53 -0600 Subject: [PATCH] p3 starter code --- p3/client.py | 57 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 57 insertions(+) create mode 100644 p3/client.py diff --git a/p3/client.py b/p3/client.py new file mode 100644 index 0000000..90c42b3 --- /dev/null +++ b/p3/client.py @@ -0,0 +1,57 @@ +import grpc, threading, os, sys +import table_pb2_grpc, table_pb2 + +SERVER = "localhost:5440" + +class TableClient: + def __init__(self, file, num_threads=1): + self.channel = grpc.insecure_channel(SERVER) + self.stub = table_pb2_grpc.TableStub(channel=self.channel) + self.num_threads = num_threads + self.read_workload_file(file) + + def Purge(self): + resp = self.stub.Purge(table_pb2.PurgeReq()) + + def Upload(self, path): + with open(path, "rb") as f: + data = f.read() + resp = self.stub.Upload(table_pb2.UploadReq(csv_data=data)) + print(resp.error if resp.error else "success") + + def ColSum(self, format, column_name): + resp = self.stub.ColSum(table_pb2.ColSumReq(column=column_name, format=format)) + print(resp.error if resp.error else f"{resp.total}") + + def exec_command(self, line): + command = line.strip().split(" ") + if len(command) == 1: + self.Purge() + elif len(command) == 2: + self.Upload(command[1]) + elif len(command) == 3: + self.ColSum("parquet" if command[1] == "p" else "csv", command[2]) + else: + print(f"Skipping invalid command: {line}") + + def read_workload_file(self, file): + with open(file, "r") as f: + for line in f.readlines(): + self.exec_command(line) + + +def main(): + if len(sys.argv) < 2: + print("Expecting command: python3 client.py INPUT [NUM_THREADS]; Missing INPUT") + sys.exit(1) + workload_spec = sys.argv[1] + + num_threads = 1 + if len(sys.argv) > 2: + num_threads = int(sys.argv[2]) + + client = TableClient(workload_spec, num_threads) + + +if __name__ == "__main__": + main() -- GitLab