FROM ubuntu:22.04
RUN apt-get update; apt-get install -y wget curl openjdk-17-jdk python3-pip net-tools lsof vim unzip
# Python stuff
RUN pip3 install numpy==2.1.3 pyspark==3.4.1 cassandra-driver==3.28.0 grpcio==1.58.0 grpcio-tools==1.58.0
# Install packages in requirements.txt, you can add more packages you need to the requirements.txt file
COPY requirements.txt /requirements.txt
RUN pip3 install -r /requirements.txt
RUN wget && tar -xf spark-3.4.1-bin-hadoop3.tgz && rm spark-3.4.1-bin-hadoop3.tgz
RUN wget; tar -xf apache-cassandra-5.0.0-bin.tar.gz; rm apache-cassandra-5.0.0-bin.tar.gz
ENV JAVA_HOME=/usr/lib/jvm/java-17-openjdk-amd64
ENV PATH="${PATH}:/apache-cassandra-5.0.0/bin:/spark-3.4.1-bin-hadoop3.2/bin"
CMD ["sh", "/"]
# DRAFT! Don't start yet.
# P6 (4% of grade): Cassandra, Weather Data
## Overview
NOAA (National Oceanic and Atmospheric Administration) collects
weather data from all over the world. In this project, you'll build
a gRPC server that receives data (from some .py client, but imagine
weather stations) and inserts it into a Cassandra table. Your server
will also let clients ask simple questions about the data.
We'll also explore read/write availability tradeoffs. When always
want sensors to be able to upload data, but it is OK if we cannot
always read the latest stats (we prefer an error over inconsistent
Learning objectives:
* create Cassandra schemas involving partition keys and cluster keys
* use Spark to preprocess data for insertion into Cassandra
* configure queries to achieve a tradeoff between read and write availability
* use perpare statement to cache that representation
Before starting, please revisit the [general project directions](../
* You may not modify the `Dockerfile.cassandra, docker-compose.yml,, src/Client*.py, and src/station.proto`. The autobadger will give 0 point if you modify those files.
## Cluster Setup
Note that the compose file assumes there is a "PROJECT" environment variable. You can set it to p6 in your environment:
* `export PROJECT=p6`
We provide the Dockerfile and docker-compose.yml for this project. In Dockerfile.cassandra, we install several necessary python packages.You can also add more packages you need in requirements.txt. You can run the following:
* `docker build -f Dockerfile.cassandra -t p6-base .`
* `docker compose up -d`
It will start three containers ('p6-db-1', 'p6-db-2', 'p6-db-3'). It generally takes around 1 to 2 minutes for the Cassandra cluster to be ready. **Note that** you may not modify the Dockerfile.cassandra.
Run the following command:
docker exec p6-db-1 nodetool status
and if the cluster is ready, it will produce an output like this:
Datacenter: datacenter1
|/ State=Normal/Leaving/Joining/Moving
-- Address Load Tokens Owns (effective) Host ID Rack
UN 70.28 KiB 16 64.1% 90d9e6d3-6632-4721-a78b-75d65c673db1 rack1
UN 70.26 KiB 16 65.9% 635d1361-5675-4399-89fa-f5624df4a960 rack1
UN 70.28 KiB 16 70.0% 8936a80e-c6b2-42ef-b54d-4160ff08857d rack1
If the cluster is not ready it will generally show an error. If this
occurs then wait a little bit and rerun the command and keep doing so
until you see that the cluster is ready.
## Part 1: Server Initialization
### Communication
We provide the client programs and [src/station.proto](src/station.proto).
* Inside the [src/](src/) directory, four client programs (,,, will communicate with with your, via gRPC. We'll explain each RPC in details later.
* station.proto contains `rpc` and `message` entries to generate a gRPC stub (used by our clients).
Firstly, run the grpc_tools.protoc tool in p6-db-1 to generate stub code for our clients and servicer code for your server.
docker exec -w /src p6-db-1 sh -c "python3 -m grpc_tools.protoc -I=. --python_out=. --grpc_python_out=. station.proto"
**Your job is to finish writing the `StationService` class in the provided (`StationService` overrides methods in the `StationServicer` class generated by the protoc tool).**
In your server, you need to complete the `__init__` function and override four
RPC methods for the StationService class.
**Note:** Don't delete `print("Server started")` in `StationService.__init__` or add any code after it in the constructor. The autograder will watch for that print to know your server is ready.
If communication is working correctly so far, you should be able to start a server and used a client to get back a "TODO" error message via gRPC:
# in a terminal:
docker exec -it -w /src p6-db-1 python3
# in a second terminal:
docker exec -w /src p6-db-1 python3
Note: In both container, you need to set the enviroment variable PROJECT to p6.
### Cassandra Schema
Inside the `__init__` method, you firstly need to connect to the Cassandra cluster, with `Cluster(['p6-db-1', 'p6-db-2', 'p6-db-3'])`. You can review how to connect here:
* **TODO - add the link**
Write additional code using the session to run CQL queries to do the following:
* drop a `weather` keyspace if it already exists
* create a `weather` keyspace with 3x replication
* inside `weather`, create a `station_record` type containing two ints: `tmin` and `tmax`
* inside `weather`, create a `stations` table
The `stations` table should have four columns: `id` (text), `name` (text), `date` (date), `record` (weather.station_record):
* `id` is a partition key and corresponds to a station's ID (like 'USC00470273')
* `date` is a cluster key, ascending
* `name` is a static field (because there is only one name per ID). Example: 'UW ARBORETUM - MADISON'
* `record` is a regular field because there will be many records per station partition.
##### RPC - `StationSchema`:
Now you'll implement StationSchema function in StationService class. It should execute `describe table weather.stations` cassandra query and extract the `create_statement` from result.
Then, after restarting ('server started' printed in the terminal), you should be able to use to make a client call:
docker exec -w /src p6-db-1 python3
If your implementation is correct, the above command will print out something like this:
CREATE TABLE weather.stations (
id text,
date date,
name text static,
record station_record,
PRIMARY KEY (id, date)
AND additional_write_policy = '99p'
AND bloom_filter_fp_chance = 0.01
AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}
AND cdc = false
### Spark Session
Your constructor should also create a Spark session like this:
self.spark = SparkSession.builder.appName("p6").getOrCreate()
This is a local Spark deployment, so you won't have separate Spark
workers and a boss. Instead, tasks run in the Spark driver for the
session, so in the same container as your This means local
file paths will work, and you won't use HDFS for anything in this
You'll use your Spark session in the next part to process a text file
describing stations and insert that into your Cassandra table.
## Part 2: Station Data (Per Partition)
Your constructor should load data with Spark from [src/ghcnd-stations.txt](), then insert to Cassandra. Note that we did some similar parsing of this same file during lecture:
In your implementation, do the following:
* Use Spark and `SUBSTRING` to extract `ID`, `STATE`, and `NAME` from `src/ghcnd-stations.txt`. Reference the documentation to determine the offsets (this contains format descriptions for several different files, so be sure you're reading about the correct one):
* Filter your results to the state of Wisconsin, and collect the rows so you can loop over them
* Do an `INSERT` into your `weather.stations` table for each station ID and name.
##### RPC - `StationName`:
You'll implement the `StationName` function in `StationService`
class. The function should execute a Cassandra query and parse the
result to obtain the name of a station for a specific station id. The
station id is stored in `request.station` (refer to station.proto).
Then, you can use to make a client call:
docker exec -w /src p6-db-1 python3 US1WIMR0003
If you did load station data and execute `StationName` correctly, it should print out "AMBERG 1.3 SW".
## Part 3: Weather Data (Per Row)
#### RPC - `RecordTemps`:
Now you'll implement `RecordTemps` function in `StationService`
class. It receives temperature data and writes it to
The pulls its data from src/weather.parquet. You can run it as follows:
docker exec -w /src p6-db-1 python3
If you did RecordTemps correctly, it should print out a list of: `Inserted {station_id} on {date} with tmin={tmin} and tmax={tmax}`.
#### RPC - `StationMax`:
Similarly, You'll also implement `StationMax` RPC in, which will return the maximum `tmax` ever seen for the given station.
Then, you can use to make a client call:
docker exec -w /src p6-db-1 python3 USR0000WDDG
If you did RecordTemps and StationMax correctly, it will print out 344.
## Part 4: Fault Tolerance
### Error Handling
Go back to `RecordTempsReply` and `StationMaxReply` and add some error
handling code. If there is a `cassandra.Unavailable` or
`cassandra.cluster.NoHostAvailable` exception, the `error` field in
the gRPC response should be "unavailable".
If there are other exceptions besides these, return an error message
of your choosing. If the `error` field is set, it doesn't matter what
number your `StationMax` returns for `tmax`.
### Consistency Levels
We want the server to be highly available for receiving writes. Thus,
we'll set W=1. Choose an R so that R + W > RF.
The thought is that real sensors might not have much space to save old
data that hasn't been uploaded, so we want to accept writes whenever
possible. We also want to avoid a situation where a `StationMax`
returns a smaller temperature than one previously added with
`RecordTemps`; it would be better to return an error message if
`RecordTempsReply` is a method that writes to the database (so it
should be highly available) whereas `StationMaxReply` is a method that
reads (so it is OK if it is unavailable if some replicas are down).
To implement your W and R settings in these methods, you need to modify your code to use prepared statements instead of doing
queries directly. If you have a prepared statement `prepared`, you can
set the consistency level like this:
from cassandra.query import ConsistencyLevel
prepared.consistency_level = ConsistencyLevel.????
You can fill in the missing code with `ConsistencyLevel.ONE`,
`ConsistencyLevel.TWO`, or `ConsistencyLevel.THREE`.
Specifically, you should define prepare statements for your SQL query in RecordTempsReply and StationMaxReply. The prepare statements should be defined in the \_\_init\_\_ function of StationService class, so that the prepared statements can be prepared once and used many times.
### Disaster Strikes
**Important:** run a `docker` command to kill the `p6-db-2` container.
Run the following command
docker exec p6-db-1 nodetool status
Verify that you see that one of the nodes is down before proceeding to the next steps.
Call again. Inserts should happen with `ConsistencyLevel.ONE`, so this ought to work, meaning the empty string is the expected result for `error`.
Call again. Reads should raises a cassandra.Unavailable except e, then the error should have a string like this:
## Submission
Read the directions [here](../ about how to create the
Note that we will copy in the the provided files (every file except `` and `requirements.txt`), overwriting anything you might have changed.
Make sure you upload every file you need to run the following commands:
docker build -f Dockerfile.cassandra -t p6-base
docker compose up -d
docker exec -w /src p6-db-1 sh -c "python3 -m grpc_tools.protoc -I=. --python_out=. --grpc_python_out=. station.proto "
# in a terminal:
docker exec -it -w /src p6-db-1 python3
# in a second terminal:
docker exec -w /src p6-db-1 python3
## Testing:
Please be sure that your installed autobadger is on version **TODO**. You can print the version using
autobadger --info
See []( for more information.
This constitutes 90% of the total score.
The remaining 10% will be graded manually.
We will be checking your read and write consistency levels.
echo "-Xms128M" >> /apache-cassandra-5.0.0/conf/jvm-server.options
echo "-Xmx128M" >> /apache-cassandra-5.0.0/conf/jvm-server.options
# get the environment variable
sed -i "s/^listen_address:.*/listen_address: "`hostname`"/" /apache-cassandra-5.0.0/conf/cassandra.yaml
sed -i "s/^rpc_address:.*/rpc_address: "`hostname`"/" /apache-cassandra-5.0.0/conf/cassandra.yaml
sed -i "s/- seeds:.*/- seeds: ${PROJECT}-db-1,${PROJECT}-db-2,${PROJECT}-db-3/" /apache-cassandra-5.0.0/conf/cassandra.yaml
/apache-cassandra-5.0.0/bin/cassandra -R
sleep infinity
name: ${PROJECT}
image: ${PROJECT}-base
hostname: db
- "./src:/src"
replicas: 3
memory: 2.5G
import grpc
import station_pb2
import station_pb2_grpc
import zipfile
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, date_format, to_date
# As both the client and server are running on container p6-db-1, we can use localhost as the server address
SERVER = "localhost:5440"
def run():
spark = SparkSession.builder.appName("clientLoader").getOrCreate()
data ="weather.parquet").collect()
# Connect to the gRPC server
with grpc.insecure_channel(SERVER) as channel:
stub = station_pb2_grpc.StationStub(channel)
for row in data:
station_id = row["station"]
date = row["date"]
tmin = row["tmin_value"] if row["tmin_value"] is not None else 0
tmax = row["tmax_value"] if row["tmax_value"] is not None else 0
# Make the gRPC call
response = stub.RecordTemps(station_pb2.RecordTempsRequest(
# Print the response for each record
if response.error:
print(f"Error inserting {station_id} on {date}: {response.error}")
print(f"Inserted {station_id} on {date} with tmin={tmin} and tmax={tmax}")
if __name__ == '__main__':
import grpc
import station_pb2
import station_pb2_grpc
import sys
# As both the client and server are running on container p6-db-1, we can use localhost as the server address
SERVER = "localhost:5440"
def run():
if len(sys.argv) != 2:
print("Usage: python3 <StationID>")
stationID = sys.argv[1]
# Connect to the gRPC server
with grpc.insecure_channel(SERVER) as channel:
stub = station_pb2_grpc.StationStub(channel)
# Send the request and get the number of stations
response = stub.StationMax(station_pb2.StationInspectRequest(station = stationID))
if response.error != "":
if __name__ == '__main__':
\ No newline at end of file
import grpc
import station_pb2
import station_pb2_grpc
import sys
# As both the client and server are running on container p6-db-1, we can use localhost as the server address
SERVER = "localhost:5440"
def run():
if len(sys.argv) != 2:
print("Usage: python3 <StationID>")
stationID = sys.argv[1]
# Connect to the gRPC server
with grpc.insecure_channel(SERVER) as channel:
stub = station_pb2_grpc.StationStub(channel)
# Send the request and get the number of stations
response = stub.StationName(station_pb2.StationInspectRequest(station = stationID))
if response.error != "":
if __name__ == '__main__':
\ No newline at end of file
import grpc
import station_pb2
import station_pb2_grpc
# As both the client and server are running on container p6-db-1, we can use localhost as the server address
SERVER = "localhost:5440"
def run():
# Connect to the gRPC server
with grpc.insecure_channel(SERVER) as channel:
stub = station_pb2_grpc.StationStub(channel)
# Send the request and get the table schema
response = stub.StationSchema(station_pb2.EmptyRequest())
if response.error:
# Print the Table(stations)'s schema
if __name__ == '__main__':
\ No newline at end of file
service Station {
rpc StationSchema(EmptyRequest) returns (StationSchemaReply) {}
rpc StationName(StationInspectRequest) returns (StationNameReply) {}
rpc RecordTemps(RecordTempsRequest) returns (RecordTempsReply) {}
rpc StationMax(StationInspectRequest) returns (StationMaxReply) {}
message EmptyRequest {}
message StationInspectRequest {
string station = 1; // Station ID
message RecordTempsRequest {
string station = 1; // Station ID
string date = 2;
int32 tmin = 3;
int32 tmax = 4;
message RecordTempsReply {
string error = 1;
message StationSchemaReply {
string schema = 1;
string error = 2;
message StationNameReply {
string name = 1;
string error = 2;
message StationMaxReply {
int32 tmax = 1;
string error = 2;
