From a5f03c234af6ba6cd46bd2c7cf35d172c9d2a595 Mon Sep 17 00:00:00 2001
From: tylerharter <tylerharter@gmail.com>
Date: Thu, 20 Mar 2025 06:15:16 -0500
Subject: [PATCH] P6 draft

---
 p6/Dockerfile.cassandra       |  21 +++
 p6/README.md                  | 324 ++++++++++++++++++++++++++++++++++
 p6/cassandra.sh               |  12 ++
 p6/docker-compose.yml         |  15 ++
 p6/requirements.txt           |   0
 p6/src/ClientRecordTemps.py   |  40 +++++
 p6/src/ClientStationMax.py    |  26 +++
 p6/src/ClientStationName.py   |  26 +++
 p6/src/ClientStationSchema.py |  21 +++
 p6/src/station.proto          |  40 +++++
 10 files changed, 525 insertions(+)
 create mode 100644 p6/Dockerfile.cassandra
 create mode 100644 p6/README.md
 create mode 100644 p6/cassandra.sh
 create mode 100644 p6/docker-compose.yml
 create mode 100644 p6/requirements.txt
 create mode 100644 p6/src/ClientRecordTemps.py
 create mode 100644 p6/src/ClientStationMax.py
 create mode 100644 p6/src/ClientStationName.py
 create mode 100644 p6/src/ClientStationSchema.py
 create mode 100644 p6/src/station.proto

diff --git a/p6/Dockerfile.cassandra b/p6/Dockerfile.cassandra
new file mode 100644
index 0000000..74cedb1
--- /dev/null
+++ b/p6/Dockerfile.cassandra
@@ -0,0 +1,21 @@
+FROM ubuntu:22.04
+RUN apt-get update; apt-get install -y wget curl openjdk-17-jdk python3-pip net-tools lsof vim unzip
+
+# Python stuff
+RUN pip3 install numpy==2.1.3 pyspark==3.4.1 cassandra-driver==3.28.0 grpcio==1.58.0 grpcio-tools==1.58.0
+
+# Install packages in requirements.txt; you can add more packages you need to the requirements.txt file
+COPY requirements.txt /requirements.txt
+RUN pip3 install -r /requirements.txt
+
+# SPARK
+RUN wget https://archive.apache.org/dist/spark/spark-3.4.1/spark-3.4.1-bin-hadoop3.tgz && tar -xf spark-3.4.1-bin-hadoop3.tgz && rm spark-3.4.1-bin-hadoop3.tgz
+
+# CASSANDRA
+RUN wget https://archive.apache.org/dist/cassandra/5.0.0/apache-cassandra-5.0.0-bin.tar.gz; tar -xf apache-cassandra-5.0.0-bin.tar.gz; rm apache-cassandra-5.0.0-bin.tar.gz
+
+ENV JAVA_HOME=/usr/lib/jvm/java-17-openjdk-amd64
+ENV PATH="${PATH}:/apache-cassandra-5.0.0/bin:/spark-3.4.1-bin-hadoop3/bin"
+
+COPY cassandra.sh /cassandra.sh
+CMD ["sh", "/cassandra.sh"]
diff --git a/p6/README.md b/p6/README.md
new file mode 100644
index 0000000..2e4a228
--- /dev/null
+++ b/p6/README.md
@@ -0,0 +1,324 @@
+# DRAFT! Don't start yet.
+
+# P6 (4% of grade): Cassandra, Weather Data
+
+## Overview
+
+NOAA (National Oceanic and Atmospheric Administration) collects
+weather data from all over the world. In this project, you'll build a
+gRPC server that receives data (from a Python client here, but imagine
+weather stations) and inserts it into a Cassandra table. Your server
+will also let clients ask simple questions about the data.
+
+We'll also explore read/write availability tradeoffs. We always want
+sensors to be able to upload data, but it is OK if we cannot always
+read the latest stats (we prefer an error over inconsistent results).
+
+Learning objectives:
+
+* create Cassandra schemas involving partition keys and clustering keys
+* use Spark to preprocess data for insertion into Cassandra
+* configure queries to achieve a tradeoff between read and write availability
+* use prepared statements so the parsed representation of a query is cached and reused
+
+Before starting, please revisit the [general project directions](../projects.md).
+
+IMPORTANT:
+* You may not modify `Dockerfile.cassandra`, `docker-compose.yml`, `cassandra.sh`, `src/Client*.py`, or `src/station.proto`. The autobadger will give 0 points if you modify those files.
+
+## Cluster Setup
+
+Note that the compose file assumes there is a `PROJECT` environment
+variable. You can set it to p6 in your environment:
+
+* `export PROJECT=p6`
+
+We provide the Dockerfile and docker-compose.yml for this project.
+Dockerfile.cassandra installs several necessary Python packages; you
+can add any additional packages you need to requirements.txt. You can
+run the following:
+
+* `docker build -f Dockerfile.cassandra -t p6-base .`
+* `docker compose up -d`
+
+This will start three containers (`p6-db-1`, `p6-db-2`, `p6-db-3`).
+It generally takes around 1 to 2 minutes for the Cassandra cluster to
+be ready. **Note that** you may not modify Dockerfile.cassandra.
+
+Run the following command:
+
+```
+docker exec p6-db-1 nodetool status
+```
+
+and if the cluster is ready, it will produce output like this:
+
+```sh
+Datacenter: datacenter1
+=======================
+Status=Up/Down
+|/ State=Normal/Leaving/Joining/Moving
+--  Address     Load       Tokens  Owns (effective)  Host ID                               Rack
+UN  172.27.0.4  70.28 KiB  16      64.1%             90d9e6d3-6632-4721-a78b-75d65c673db1  rack1
+UN  172.27.0.3  70.26 KiB  16      65.9%             635d1361-5675-4399-89fa-f5624df4a960  rack1
+UN  172.27.0.2  70.28 KiB  16      70.0%             8936a80e-c6b2-42ef-b54d-4160ff08857d  rack1
+```
+
+If the cluster is not ready, it will generally show an error. If this
+occurs, wait a little bit and rerun the command, and keep doing so
+until you see that the cluster is ready.
+
+## Part 1: Server Initialization
+
+### Communication
+
+We provide the client programs and [src/station.proto](src/station.proto).
+
+* Inside the [src/](src/) directory, four client programs
+  (ClientStationSchema.py, ClientStationName.py, ClientRecordTemps.py,
+  ClientStationMax.py) will communicate with your server.py via gRPC.
+  We'll explain each RPC in detail later.
+* station.proto contains `rpc` and `message` entries used to generate
+  a gRPC stub (used by our clients).
+
+First, run the grpc_tools.protoc tool in p6-db-1 to generate stub code
+for our clients and servicer code for your server:
+
+```sh
+docker exec -w /src p6-db-1 sh -c "python3 -m grpc_tools.protoc -I=. --python_out=. --grpc_python_out=. station.proto"
+```
+
+**Your job is to finish writing the `StationService` class in the
+provided server.py (`StationService` overrides methods in the
+`StationServicer` class generated by the protoc tool).**
+
+In your server, you need to complete the `__init__` function and
+override four RPC methods for the StationService class.
+
+**Note:** Don't delete `print("Server started")` in
+`StationService.__init__` or add any code after it in the constructor.
+The autograder will watch for that print to know your server is ready.
+
+If communication is working correctly so far, you should be able to
+start a server and use a client to get back a "TODO" error message via
+gRPC:
+
+```
+# in a terminal:
+docker exec -it -w /src p6-db-1 python3 server.py
+
+# in a second terminal:
+docker exec -w /src p6-db-1 python3 ClientStationSchema.py
+```
+
+Note: in both terminals, you need to set the environment variable
+PROJECT to p6.
+
+### Cassandra Schema
+
+Inside the `__init__` method, you first need to connect to the
+Cassandra cluster, with `Cluster(['p6-db-1', 'p6-db-2', 'p6-db-3'])`.
+You can review how to connect here:
+
+* **TODO - add the link**
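+
+Until that link is added, here is a minimal sketch of the connection
+code (a sketch only: the attribute name `self.cass` is our own choice,
+not something the clients or autograder depend on):
+
+```python
+from cassandra.cluster import Cluster
+
+class StationService(station_pb2_grpc.StationServicer):
+    def __init__(self):
+        # listing all three nodes lets the driver fall back if one is down
+        cluster = Cluster(['p6-db-1', 'p6-db-2', 'p6-db-3'])
+        self.cass = cluster.connect()
+        # ... schema setup, Spark session, and prepared statements go here ...
+        print("Server started")
+```
+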
+Write additional code using the session to run CQL queries that do the
+following (a sketch follows these lists):
+
+* drop a `weather` keyspace if it already exists
+* create a `weather` keyspace with 3x replication
+* inside `weather`, create a `station_record` type containing two ints: `tmin` and `tmax`
+* inside `weather`, create a `stations` table
+
+The `stations` table should have four columns: `id` (text), `name`
+(text), `date` (date), and `record` (weather.station_record):
+
+* `id` is a partition key and corresponds to a station's ID (like 'USC00470273')
+* `date` is a clustering key, ascending
+* `name` is a static field (because there is only one name per ID). Example: 'UW ARBORETUM - MADISON'
+* `record` is a regular field because there will be many records per station partition.
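+
+Here is one way those statements might look, issued through the driver
+session (the `SimpleStrategy` replication settings are our assumption
+for this single-datacenter, 3-node cluster; your statements may differ):
+
+```python
+self.cass.execute("DROP KEYSPACE IF EXISTS weather")
+self.cass.execute("""
+    CREATE KEYSPACE weather
+    WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 3}
+""")
+self.cass.execute("CREATE TYPE weather.station_record (tmin int, tmax int)")
+self.cass.execute("""
+    CREATE TABLE weather.stations (
+        id text,
+        date date,
+        name text STATIC,
+        record weather.station_record,
+        PRIMARY KEY (id, date)
+    ) WITH CLUSTERING ORDER BY (date ASC)
+""")
+```
+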
+##### RPC - `StationSchema`:
+
+Now you'll implement the `StationSchema` function in the
+`StationService` class. It should execute a `DESCRIBE TABLE
+weather.stations` CQL query and extract the `create_statement` column
+from the result.
+
+Then, after restarting server.py ("Server started" printed in the
+terminal), you should be able to use ClientStationSchema.py to make a
+client call:
+
+```
+docker exec -w /src p6-db-1 python3 ClientStationSchema.py
+```
+
+If your implementation is correct, the above command will print out
+something like this:
+
+```
+CREATE TABLE weather.stations (
+    id text,
+    date date,
+    name text static,
+    record station_record,
+    PRIMARY KEY (id, date)
+) WITH CLUSTERING ORDER BY (date ASC)
+    AND additional_write_policy = '99p'
+    AND bloom_filter_fp_chance = 0.01
+    AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}
+    AND cdc = false
+    ...
+```
+
+### Spark Session
+
+Your constructor should also create a Spark session like this:
+
+```python
+self.spark = SparkSession.builder.appName("p6").getOrCreate()
+```
+
+This is a local Spark deployment, so you won't have separate Spark
+workers and a boss. Instead, tasks run in the Spark driver for the
+session, in the same container as your server.py. This means local
+file paths will work, and you won't use HDFS for anything in this
+project.
+
+You'll use your Spark session in the next part to process a text file
+describing stations and insert that data into your Cassandra table.
+
+## Part 2: Station Data (Per Partition)
+
+Your constructor should load data with Spark from
+[src/ghcnd-stations.txt](src/ghcnd-stations.txt), then insert it into
+Cassandra. Note that we did some similar parsing of this same file
+during lecture:
+
+https://git.doit.wisc.edu/cdis/cs/courses/cs544/f24/main/-/blob/main/lec/20-spark/nb/lec1b.ipynb?ref_type=heads
+
+In your implementation, do the following (a sketch appears after this
+list):
+
+* Use Spark and `SUBSTRING` to extract `ID`, `STATE`, and `NAME` from
+  `src/ghcnd-stations.txt`. Reference the documentation to determine
+  the offsets (it contains format descriptions for several different
+  files, so be sure you're reading about the correct one):
+
+  https://www.ncei.noaa.gov/pub/data/ghcn/daily/readme.txt
+
+* Filter your results to the state of Wisconsin, and collect the rows
+  so you can loop over them.
+* Do an `INSERT` into your `weather.stations` table for each station
+  ID and name.
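+
+Here is a sketch of that preprocessing. The offsets are our own
+reading of the fixed-width format in the readme (ID in columns 1-11,
+STATE in 39-40, NAME in 42-71); verify them against the documentation
+before relying on them:
+
+```python
+# server.py runs in /src, so a relative path reaches the mounted file
+stations = self.spark.read.text("ghcnd-stations.txt")
+stations.createOrReplaceTempView("stations_txt")
+
+# SUBSTRING(value, start, length) is 1-indexed in Spark SQL
+rows = self.spark.sql("""
+    SELECT
+        SUBSTRING(value, 1, 11) AS id,
+        SUBSTRING(value, 39, 2) AS state,
+        TRIM(SUBSTRING(value, 42, 30)) AS name
+    FROM stations_txt
+""").where("state = 'WI'").collect()
+
+# name is a static column, so an (id, name) insert sets it once per partition
+for row in rows:
+    self.cass.execute(
+        "INSERT INTO weather.stations (id, name) VALUES (%s, %s)",
+        (row.id, row.name))
+```
+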
+##### RPC - `StationName`:
+
+You'll implement the `StationName` function in the `StationService`
+class. The function should execute a Cassandra query and parse the
+result to obtain the name of a station for a specific station ID. The
+station ID is stored in `request.station` (refer to station.proto).
+
+Then, you can use ClientStationName.py to make a client call:
+
+```
+docker exec -w /src p6-db-1 python3 ClientStationName.py US1WIMR0003
+```
+
+If you loaded the station data and implemented `StationName`
+correctly, it should print out "AMBERG 1.3 SW".
+
+## Part 3: Weather Data (Per Row)
+
+##### RPC - `RecordTemps`:
+
+Now you'll implement the `RecordTemps` function in the
+`StationService` class. It receives temperature data and writes it to
+`weather.stations`.
+
+ClientRecordTemps.py pulls its data from src/weather.parquet. You can
+run it as follows:
+
+```
+docker exec -w /src p6-db-1 python3 ClientRecordTemps.py
+```
+
+If you implemented `RecordTemps` correctly, it should print lines of
+the form: `Inserted {station_id} on {date} with tmin={tmin} and tmax={tmax}`.
+
+##### RPC - `StationMax`:
+
+Similarly, you'll implement the `StationMax` RPC in server.py, which
+returns the maximum `tmax` ever seen for the given station.
+
+Then, you can use ClientStationMax.py to make a client call:
+
+```
+docker exec -w /src p6-db-1 python3 ClientStationMax.py USR0000WDDG
+```
+
+If you implemented `RecordTemps` and `StationMax` correctly, it will
+print out 344.
+
+## Part 4: Fault Tolerance
+
+### Error Handling
+
+Go back to `RecordTemps` and `StationMax` and add some error-handling
+code. If there is a `cassandra.Unavailable` or
+`cassandra.cluster.NoHostAvailable` exception, the `error` field in
+the gRPC response should be "unavailable".
+
+If there are other exceptions besides these, return an error message
+of your choosing. If the `error` field is set, it doesn't matter what
+number your `StationMax` returns for `tmax`.
+
+### Consistency Levels
+
+We want the server to be highly available for receiving writes, so
+we'll set W=1. Choose an R so that R + W > RF.
+
+The thought is that real sensors might not have much space to save old
+data that hasn't been uploaded, so we want to accept writes whenever
+possible. We also want to avoid a situation where `StationMax` returns
+a smaller temperature than one previously added with `RecordTemps`; it
+would be better to return an error message if necessary.
+
+`RecordTemps` is a method that writes to the database (so it should be
+highly available) whereas `StationMax` is a method that reads (so it
+is OK for it to be unavailable when some replicas are down).
+
+To implement your W and R settings in these methods, you need to
+modify your code to use prepared statements instead of running queries
+directly. If you have a prepared statement `prepared`, you can set the
+consistency level like this:
+
+```python
+from cassandra import ConsistencyLevel
+...
+prepared.consistency_level = ConsistencyLevel.????
+```
+
+You can fill in the missing code with `ConsistencyLevel.ONE`,
+`ConsistencyLevel.TWO`, or `ConsistencyLevel.THREE`.
+
+Specifically, you should define prepared statements for your CQL
+queries in `RecordTemps` and `StationMax`. The statements should be
+prepared in the `__init__` function of the `StationService` class, so
+that each is prepared once and used many times, as in the sketch
+below.
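+
+Here is a minimal sketch of that pattern (the statement text, the
+`StationRecord` helper, and the attribute names are illustrative, and
+choosing the read consistency level is left to you):
+
+```python
+from collections import namedtuple
+from cassandra import ConsistencyLevel
+
+# registering a class for the UDT lets the driver convert Python
+# objects to/from weather.station_record values
+StationRecord = namedtuple("StationRecord", ["tmin", "tmax"])
+cluster.register_user_type("weather", "station_record", StationRecord)
+
+# in StationService.__init__, after the schema is created:
+self.insert_stmt = self.cass.prepare(
+    "INSERT INTO weather.stations (id, date, record) VALUES (?, ?, ?)")
+self.insert_stmt.consistency_level = ConsistencyLevel.ONE  # W=1
+
+self.max_stmt = self.cass.prepare(
+    "SELECT record FROM weather.stations WHERE id = ?")
+# set self.max_stmt.consistency_level to your chosen R (so that R + W > RF)
+
+# later, in RecordTemps:
+self.cass.execute(self.insert_stmt,
+                  (request.station, request.date,
+                   StationRecord(request.tmin, request.tmax)))
+```
+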
station=station_id, + date=date, + tmin=int(tmin), + tmax=int(tmax) + )) + + # Print the response for each record + if response.error: + print(f"Error inserting {station_id} on {date}: {response.error}") + else: + print(f"Inserted {station_id} on {date} with tmin={tmin} and tmax={tmax}") + +if __name__ == '__main__': + run() diff --git a/p6/src/ClientStationMax.py b/p6/src/ClientStationMax.py new file mode 100644 index 0000000..fada91b --- /dev/null +++ b/p6/src/ClientStationMax.py @@ -0,0 +1,26 @@ +import grpc +import station_pb2 +import station_pb2_grpc +import sys + +# As both the client and server are running on container p6-db-1, we can use localhost as the server address +SERVER = "localhost:5440" + +def run(): + if len(sys.argv) != 2: + print("Usage: python3 ClientStationMax.py <StationID>") + sys.exit(1) + stationID = sys.argv[1] + + # Connect to the gRPC server + with grpc.insecure_channel(SERVER) as channel: + stub = station_pb2_grpc.StationStub(channel) + # Send the request and get the number of stations + response = stub.StationMax(station_pb2.StationInspectRequest(station = stationID)) + if response.error != "": + print(response.error) + else: + print(response.tmax) + +if __name__ == '__main__': + run() \ No newline at end of file diff --git a/p6/src/ClientStationName.py b/p6/src/ClientStationName.py new file mode 100644 index 0000000..fdfb394 --- /dev/null +++ b/p6/src/ClientStationName.py @@ -0,0 +1,26 @@ +import grpc +import station_pb2 +import station_pb2_grpc +import sys + +# As both the client and server are running on container p6-db-1, we can use localhost as the server address +SERVER = "localhost:5440" + +def run(): + if len(sys.argv) != 2: + print("Usage: python3 ClientStationMax.py <StationID>") + sys.exit(1) + stationID = sys.argv[1] + + # Connect to the gRPC server + with grpc.insecure_channel(SERVER) as channel: + stub = station_pb2_grpc.StationStub(channel) + # Send the request and get the number of stations + response = stub.StationName(station_pb2.StationInspectRequest(station = stationID)) + if response.error != "": + print(response.error) + else: + print(response.name) + +if __name__ == '__main__': + run() \ No newline at end of file diff --git a/p6/src/ClientStationSchema.py b/p6/src/ClientStationSchema.py new file mode 100644 index 0000000..6195ee1 --- /dev/null +++ b/p6/src/ClientStationSchema.py @@ -0,0 +1,21 @@ +import grpc +import station_pb2 +import station_pb2_grpc + +# As both the client and server are running on container p6-db-1, we can use localhost as the server address +SERVER = "localhost:5440" + +def run(): + # Connect to the gRPC server + with grpc.insecure_channel(SERVER) as channel: + stub = station_pb2_grpc.StationStub(channel) + # Send the request and get the table schema + response = stub.StationSchema(station_pb2.EmptyRequest()) + if response.error: + print(response.error) + else: + # Print the Table(stations)'s schema + print(response.schema) + +if __name__ == '__main__': + run() \ No newline at end of file diff --git a/p6/src/station.proto b/p6/src/station.proto new file mode 100644 index 0000000..396e159 --- /dev/null +++ b/p6/src/station.proto @@ -0,0 +1,40 @@ +syntax="proto3"; + +service Station { + rpc StationSchema(EmptyRequest) returns (StationSchemaReply) {} + rpc StationName(StationInspectRequest) returns (StationNameReply) {} + rpc RecordTemps(RecordTempsRequest) returns (RecordTempsReply) {} + rpc StationMax(StationInspectRequest) returns (StationMaxReply) {} +} + +message EmptyRequest {} + +message StationInspectRequest { 
+    string station = 1; // Station ID
+}
+
+message RecordTempsRequest {
+    string station = 1; // Station ID
+    string date = 2;
+    int32 tmin = 3;
+    int32 tmax = 4;
+}
+
+message RecordTempsReply {
+    string error = 1;
+}
+
+message StationSchemaReply {
+    string schema = 1;
+    string error = 2;
+}
+
+message StationNameReply {
+    string name = 1;
+    string error = 2;
+}
+
+message StationMaxReply {
+    int32 tmax = 1;
+    string error = 2;
+}
-- 
GitLab