DRAFT! Don't start yet.
P6 (4% of grade): Cassandra, Weather Data
Overview
NOAA (National Oceanic and Atmospheric Administration) collects weather data from all over the world. In this project, you'll build a gRPC server that receives data (from some .py client, but imagine weather stations) and inserts it into a Cassandra table. Your server will also let clients ask simple questions about the data.
We'll also explore read/write availability tradeoffs. We always want sensors to be able to upload data, but it is OK if we cannot always read the latest stats (we prefer an error over inconsistent results).
Learning objectives:
- create Cassandra schemas involving partition keys and cluster keys
- use Spark to preprocess data for insertion into Cassandra
- configure queries to achieve a tradeoff between read and write availability
- use prepared statements
Before starting, please review the general project directions.
IMPORTANT:
- You may not modify the Dockerfile, docker-compose.yml, cassandra.sh, src/Client*.py, or src/station.proto. The autobadger will give 0 points if you modify those files.
Cluster Setup
Note that the compose file assumes there is a "PROJECT" environment variable. You can set it to p6 in your environment:
export PROJECT=p6
We provide the Dockerfile and docker-compose.yml for this project. In the Dockerfile, we install several necessary Python packages. You can also add any additional packages you need to src/requirements.txt. You can run the following:
docker build . -t p6
docker compose up -d
It will start three containers ('p6-db-1', 'p6-db-2', 'p6-db-3'). It generally takes around 1 to 2 minutes for the Cassandra cluster to be ready. Note that you may not modify the Dockerfile.
Run the following command:
docker exec p6-db-1 nodetool status
and if the cluster is ready, it will produce an output like this:
Datacenter: datacenter1
=======================
Status=Up/Down
|/ State=Normal/Leaving/Joining/Moving
-- Address Load Tokens Owns (effective) Host ID Rack
UN 172.27.0.4 70.28 KiB 16 64.1% 90d9e6d3-6632-4721-a78b-75d65c673db1 rack1
UN 172.27.0.3 70.26 KiB 16 65.9% 635d1361-5675-4399-89fa-f5624df4a960 rack1
UN 172.27.0.2 70.28 KiB 16 70.0% 8936a80e-c6b2-42ef-b54d-4160ff08857d rack1
If the cluster is not ready, it will generally show an error. If this occurs, wait a little and rerun the command; keep doing so until you see that the cluster is ready.
Part 1: Server Initialization
Communication
We provide the client programs and src/station.proto.
- Inside the src/ directory, four client programs (ClientStationSchema.py, ClientStationName.py, ClientRecordTemps.py, ClientStationMax.py) will communicate with your server.py via gRPC. We'll explain each RPC in detail later.
- station.proto contains rpc and message entries to generate a gRPC stub (used by our clients).
First, run the grpc_tools.protoc tool in p6-db-1 to generate stub code for our clients and servicer code for your server.
docker exec -w /src p6-db-1 sh -c "python3 -m grpc_tools.protoc -I=. --python_out=. --grpc_python_out=. station.proto"
Your job is to finish writing the StationService class in the provided server.py (StationService overrides methods in the StationServicer class generated by the protoc tool).
In your server, you need to complete the __init__ function and override four RPC methods for the StationService class.
Note: Don't delete print("Server started") in StationService.__init__ or add any code after it in the constructor. The autograder will watch for that print to know your server is ready.
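If you are unsure how the pieces fit together, here is a rough sketch of a gRPC servicer plugged into a server. The provided server.py presumably already contains equivalent boilerplate; the generated module names follow standard protoc conventions, and the port number and worker count below are placeholders, not necessarily what the clients expect:

import grpc
from concurrent import futures
import station_pb2_grpc  # generated from station.proto by grpc_tools.protoc

class StationService(station_pb2_grpc.StationServicer):
    def __init__(self):
        # TODO: connect to Cassandra, create the schema, start Spark, load station data
        print("Server started")  # keep this last in the constructor

    # TODO: override the four RPC methods here

if __name__ == "__main__":
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=4))
    station_pb2_grpc.add_StationServicer_to_server(StationService(), server)
    server.add_insecure_port("[::]:5440")  # placeholder port; use whatever the clients connect to
    server.start()
    server.wait_for_termination()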
If communication is working correctly so far, you should be able to start a server and use a client to get back a "TODO" error message via gRPC:
# in a terminal:
docker exec -it -w /src p6-db-1 python3 server.py
# in a second terminal:
docker exec -w /src p6-db-1 python3 ClientStationSchema.py
Note: In both containers, you need to set the environment variable PROJECT to p6.
Cassandra Schema
Inside the __init__ method, you first need to connect to the Cassandra cluster, like in lecture:
https://git.doit.wisc.edu/cdis/cs/courses/cs544/s25/main/-/tree/main/lec/26-cassandra?ref_type=heads
Note: when running locally, the workers will be named p6-db-<N>, but the autograder will use a different prefix, ????-db-<N>. You can use os.environ['PROJECT'] to get the prefix programmatically and connect to the correct Cassandra workers.
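For reference, a minimal connection sketch, assuming the cassandra-driver package used in lecture (self.cass is just an illustrative attribute name):

import os
from cassandra.cluster import Cluster

project = os.environ["PROJECT"]  # "p6" locally, a different prefix on the autograder
cluster = Cluster([f"{project}-db-1", f"{project}-db-2", f"{project}-db-3"])
self.cass = cluster.connect()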
Write additional code using the session to run CQL queries to do the following:
- drop a weather keyspace if it already exists
- create a weather keyspace with 3x replication
- inside weather, create a station_record type containing two ints: tmin and tmax
- inside weather, create a stations table
The stations table should have four columns: id (text), name (text), date (date), and record (weather.station_record):
- id is a partition key and corresponds to a station's ID (like 'USC00470273')
- date is a cluster key, ascending
- name is a static field (because there is only one name per ID). Example: 'UW ARBORETUM - MADISON'
- record is a regular field because there will be many records per station partition.
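One possible set of CQL statements matching the schema above (a sketch; it assumes SimpleStrategy for the 3x replication and a session object named self.cass):

self.cass.execute("DROP KEYSPACE IF EXISTS weather")
self.cass.execute("""
    CREATE KEYSPACE weather
    WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 3}
""")
# a UDT holding one day's temperature extremes
self.cass.execute("CREATE TYPE weather.station_record (tmin int, tmax int)")
# id partitions the data; date orders rows within each partition; name is per-partition static
self.cass.execute("""
    CREATE TABLE weather.stations (
        id text,
        date date,
        name text STATIC,
        record station_record,
        PRIMARY KEY (id, date)
    ) WITH CLUSTERING ORDER BY (date ASC)
""")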
StationSchema RPC
Now you'll implement the StationSchema function in the StationService class. It should execute the describe table weather.stations Cassandra query and extract the create_statement from the result.
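A sketch of one way to do this (the reply message and field names are assumptions; check station.proto for the real ones):

def StationSchema(self, request, context):
    # Cassandra 4+ handles DESCRIBE server-side; the row's create_statement column holds the CREATE TABLE text
    row = self.cass.execute("DESCRIBE TABLE weather.stations").one()
    return station_pb2.StationSchemaReply(schema=row.create_statement, error="")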
Then, after restarting server.py ("Server started" printed in the terminal), you should be able to use ClientStationSchema.py to make a client call:
docker exec -w /src p6-db-1 python3 ClientStationSchema.py
If your implementation is correct, the above command will print out something like this:
CREATE TABLE weather.stations (
id text,
date date,
name text static,
record station_record,
PRIMARY KEY (id, date)
) WITH CLUSTERING ORDER BY (date ASC)
AND additional_write_policy = '99p'
AND bloom_filter_fp_chance = 0.01
AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}
AND cdc = false
...
Spark Session
Your constructor should also create a Spark session like this:
self.spark = SparkSession.builder.appName("p6").getOrCreate()
This is a local Spark deployment, so you won't have separate Spark workers and a boss. Instead, tasks run in the Spark driver for the session, i.e., in the same container as your server.py. This means local file paths will work, and you won't use HDFS for anything in this project.
You'll use your Spark session in the next part to process a text file describing stations and insert that into your Cassandra table.
Part 2: Station Data (Per Partition)
Your constructor should load data with Spark from src/ghcnd-stations.txt, then insert it into Cassandra. Note that we did some similar parsing of this same file during lecture.
In your implementation, do the following (see the sketch after this list):
- Use Spark and SUBSTRING to extract ID, STATE, and NAME from src/ghcnd-stations.txt. Reference the documentation to determine the offsets (it contains format descriptions for several different files, so be sure you're reading about the correct one).
- Filter your results to the state of Wisconsin, and collect the rows so you can loop over them.
- Do an INSERT into your weather.stations table for each station ID and name.
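A sketch of this flow; the SUBSTRING offsets are assumptions taken from the GHCN-Daily documentation (verify them yourself), and the file path assumes the server runs with /src as its working directory:

from pyspark.sql.functions import col, expr

df = self.spark.read.text("ghcnd-stations.txt")
stations = df.select(
    expr("SUBSTRING(value, 1, 11)").alias("id"),    # offsets here are assumptions; check the docs
    expr("SUBSTRING(value, 39, 2)").alias("state"),
    expr("SUBSTRING(value, 42, 30)").alias("name"),
)
for row in stations.filter(col("state") == "WI").collect():
    # name is a static column, so it can be inserted without a date
    self.cass.execute(
        "INSERT INTO weather.stations (id, name) VALUES (%s, %s)",
        (row.id.strip(), row.name.strip()),
    )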
StationName RPC
You'll implement the StationName function in the StationService class. The function should execute a Cassandra query and parse the result to obtain the name of a station for a specific station id. The station id is stored in request.station (refer to station.proto).
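A sketch (the reply message and field names are assumptions):

def StationName(self, request, context):
    # there is one static name per partition, so any row for this id carries it
    row = self.cass.execute(
        "SELECT name FROM weather.stations WHERE id = %s", (request.station,)
    ).one()
    return station_pb2.StationNameReply(name=row.name, error="")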
Then, you can use ClientStationName.py to make a client call:
docker exec -w /src p6-db-1 python3 ClientStationName.py US1WIMR0003
If you did load station data and execute StationName correctly, it should print out "AMBERG 1.3 SW".
Part 3: Weather Data (Per Row)
RecordTemps RPC
Now you'll implement the RecordTemps function in the StationService class. It receives temperature data and writes it to weather.stations.
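A sketch of the insert, assuming the request carries station, date, tmin, and tmax fields (check station.proto) and using a plain statement for now (Part 4 switches this to a prepared statement):

def RecordTemps(self, request, context):
    # the station_record UDT is written inline with named fields
    self.cass.execute(
        """
        INSERT INTO weather.stations (id, date, record)
        VALUES (%s, %s, {tmin: %s, tmax: %s})
        """,
        (request.station, request.date, request.tmin, request.tmax),
    )
    return station_pb2.RecordTempsReply(error="")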
The ClientRecordTemps.py pulls its data from src/weather.parquet. You can run it as follows:
docker exec -w /src p6-db-1 python3 ClientRecordTemps.py
If you did RecordTemps correctly, it should print out a list of: Inserted {station_id} on {date} with tmin={tmin} and tmax={tmax}.
StationMax RPC
Similarly, you'll implement the StationMax RPC in server.py, which will return the maximum tmax ever seen for the given station.
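A sketch that computes the max in Python over the station's partition (the reply message and field names are assumptions):

def StationMax(self, request, context):
    rows = self.cass.execute(
        "SELECT record FROM weather.stations WHERE id = %s", (request.station,)
    )
    # skip rows that only carry the static name (their record column is NULL)
    tmaxes = [row.record.tmax for row in rows if row.record is not None]
    return station_pb2.StationMaxReply(tmax=max(tmaxes), error="")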
Then, you can use ClientStationMax.py to make a client call:
docker exec -w /src p6-db-1 python3 ClientStationMax.py USR0000WDDG
If you did RecordTemps and StationMax correctly, it will print out 344.
Part 4: Fault Tolerance
Error Handling
Go back to RecordTempsReply and StationMaxReply and add some error handling code. If there is a cassandra.Unavailable or cassandra.cluster.NoHostAvailable exception, the error field in the gRPC response should be "unavailable".
If there are other exceptions besides these, return an error message of your choosing. If the error field is set, it doesn't matter what number your StationMax returns for tmax.
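One way to structure the error handling for the read path (a sketch; self.max_statement is an illustrative prepared-statement name, and the reply fields are assumptions):

from cassandra import Unavailable
from cassandra.cluster import NoHostAvailable

def StationMax(self, request, context):
    try:
        rows = self.cass.execute(self.max_statement, (request.station,))
        tmaxes = [r.record.tmax for r in rows if r.record is not None]
        return station_pb2.StationMaxReply(tmax=max(tmaxes), error="")
    except (Unavailable, NoHostAvailable):
        # replicas required by the read consistency level are unreachable
        return station_pb2.StationMaxReply(tmax=0, error="unavailable")
    except Exception as e:
        return station_pb2.StationMaxReply(tmax=0, error=str(e))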
Consistency Levels
We want the server to be highly available for receiving writes. Thus, we'll set W=1. Choose an R so that R + W > RF.
The thought is that real sensors might not have much space to save old data that hasn't been uploaded, so we want to accept writes whenever possible. We also want to avoid a situation where a StationMax returns a smaller temperature than one previously added with RecordTemps; it would be better to return an error message if necessary.
RecordTempsReply is a method that writes to the database (so it should be highly available), whereas StationMaxReply is a method that reads (so it is OK if it is unavailable when some replicas are down).
To implement your W and R settings in these methods, you need to modify your code to use prepared statements instead of running queries directly. If you have a prepared statement prepared, you can set the consistency level like this:
from cassandra.query import ConsistencyLevel
...
prepared.consistency_level = ConsistencyLevel.????
You can fill in the missing code with ConsistencyLevel.ONE, ConsistencyLevel.TWO, or ConsistencyLevel.THREE.
Specifically, you should define prepared statements for your CQL queries in RecordTempsReply and StationMaxReply. The prepared statements should be created in the __init__ function of the StationService class, so that they are prepared once and used many times.
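For instance, the read path might be prepared in the constructor and bound at call time like this (a sketch; self.max_statement is an illustrative name, and the insert statement would be prepared the same way):

from cassandra import ConsistencyLevel

# in __init__: prepare once so Cassandra parses the statement a single time
self.max_statement = self.cass.prepare(
    "SELECT record FROM weather.stations WHERE id = ?"
)
self.max_statement.consistency_level = ConsistencyLevel.ONE  # placeholder; use the R you chose so that R + W > RF

# later, inside StationMax: bind the station id at execution time
rows = self.cass.execute(self.max_statement, (request.station,))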
Disaster Strikes
Important: run a docker command to kill the p6-db-2 container.
Run the following command:
docker exec p6-db-1 nodetool status
Verify that you see that one of the nodes is down before proceeding to the next steps.
Call ClientRecordTemps.py again. Inserts should happen with ConsistencyLevel.ONE, so this ought to work, meaning the empty string is the expected result for error.
Call ClientStationMax.py again. Reads should raise a cassandra.Unavailable exception, so the error field should contain a string like this:
unavailable
Submission
Read the directions here about how to create the repo. Note that we will copy in the provided files (every file except server.py and src/requirements.txt), overwriting anything you might have changed.
Make sure you upload every file you need to run the following commands:
docker build . -t p6
docker compose up -d
docker exec -w /src p6-db-1 sh -c "python3 -m grpc_tools.protoc -I=. --python_out=. --grpc_python_out=. station.proto"
# in a terminal:
docker exec -it -w /src p6-db-1 python3 server.py
# in a second terminal:
docker exec -w /src p6-db-1 python3 ClientStationSchema.py
Testing:
Please be sure that your installed autobadger is on version TODO. You can print the version using:
autobadger --info
See projects.md for more information.
This constitutes 90% of the total score. The remaining 10% will be graded manually. We will be checking your read and write consistency levels.