
DRAFT! Don't start yet.

P6 (4% of grade): Cassandra, Weather Data

Overview

NOAA (National Oceanic and Atmospheric Administration) collects weather data from all over the world. In this project, you'll build a gRPC server that receives data (from some .py client, but imagine weather stations) and inserts it into a Cassandra table. Your server will also let clients ask simple questions about the data.

We'll also explore read/write availability tradeoffs. We always want sensors to be able to upload data, but it is OK if we cannot always read the latest stats (we prefer an error over inconsistent results).

Learning objectives:

  • create Cassandra schemas involving partition keys and clustering keys
  • use Spark to preprocess data for insertion into Cassandra
  • configure queries to achieve a tradeoff between read and write availability
  • use prepared statements

Before starting, please review the general project directions.

IMPORTANT:

  • You may not modify the Dockerfile, docker-compose.yml, cassandra.sh, src/Client*.py, or src/station.proto. The autobadger will give 0 points if you modify those files.

Cluster Setup

Note that the compose file assumes there is a "PROJECT" environment variable. You can set it to p6 in your environment:

  • export PROJECT=p6

We provide the Dockerfile and docker-compose.yml for this project. The Dockerfile installs several necessary Python packages; you can add any additional packages you need to src/requirements.txt. You can run the following:

  • docker build . -t p6
  • docker compose up -d

It will start three containers ('p6-db-1', 'p6-db-2', 'p6-db-3'). It generally takes around 1 to 2 minutes for the Cassandra cluster to be ready. Note that you may not modify the Dockerfile.

Run the following command:

docker exec p6-db-1 nodetool status

and if the cluster is ready, it will produce an output like this:

Datacenter: datacenter1
=======================
Status=Up/Down
|/ State=Normal/Leaving/Joining/Moving
--  Address     Load       Tokens  Owns (effective)  Host ID                               Rack 
UN  172.27.0.4  70.28 KiB  16      64.1%             90d9e6d3-6632-4721-a78b-75d65c673db1  rack1
UN  172.27.0.3  70.26 KiB  16      65.9%             635d1361-5675-4399-89fa-f5624df4a960  rack1
UN  172.27.0.2  70.28 KiB  16      70.0%             8936a80e-c6b2-42ef-b54d-4160ff08857d  rack1

If the cluster is not ready, the command will generally show an error. If this happens, wait a little and rerun the command until you see that the cluster is ready.

Part 1: Server Initialization

Communication

We provide the client programs and src/station.proto.

  • Inside the src/ directory, four client programs (ClientStationSchema.py, ClientStationName.py, ClientRecordTemps.py, ClientStationMax.py) will communicate with your server.py via gRPC. We'll explain each RPC in detail later.
  • station.proto contains rpc and message entries to generate a gRPC stub (used by our clients).

First, run the grpc_tools.protoc tool in p6-db-1 to generate stub code for our clients and servicer code for your server.

docker exec -w /src p6-db-1 sh -c "python3 -m grpc_tools.protoc -I=. --python_out=. --grpc_python_out=. station.proto"

Your job is to finish writing the StationService class in the provided server.py (StationService overrides methods in the StationServicer class generated by the protoc tool).

In your server, you need to complete the __init__ function and override four RPC methods for the StationService class.

Note: Don't delete print("Server started") in StationService.__init__ or add any code after it in the constructor. The autograder will watch for that print to know your server is ready.

If communication is working correctly so far, you should be able to start a server and use a client to get back a "TODO" error message via gRPC:

# in a terminal:
docker exec -it -w /src p6-db-1 python3 server.py

# in a second terminal:
docker exec -w /src p6-db-1 python3 ClientStationSchema.py

Note: In both containers, you need to set the environment variable PROJECT to p6.

Cassandra Schema

Inside the __init__ method, you first need to connect to the Cassandra cluster, like in lecture: https://git.doit.wisc.edu/cdis/cs/courses/cs544/s25/main/-/tree/main/lec/26-cassandra?ref_type=heads

Note: when running locally, the workers will be named p6-db-<N>, but the autograder will use a different prefix, ????-db-<N>. You can use os.environ['PROJECT'] to get the prefix programmatically and connect to the correct Cassandra workers.
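For example, here is a minimal connection sketch, assuming you store the session in an attribute named self.cass (the attribute name is our choice):

import os
from cassandra.cluster import Cluster

project = os.environ["PROJECT"]  # "p6" locally; the autograder uses a different prefix
cluster = Cluster([f"{project}-db-1", f"{project}-db-2", f"{project}-db-3"])
self.cass = cluster.connect()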

Write additional code using the session to run CQL queries to do the following (a sketch follows the lists below):

  • drop a weather keyspace if it already exists
  • create a weather keyspace with 3x replication
  • inside weather, create a station_record type containing two ints: tmin and tmax
  • inside weather, create a stations table

The stations table should have four columns: id (text), name (text), date (date), record (weather.station_record):

  • id is a partition key and corresponds to a station's ID (like 'USC00470273')
  • date is a clustering key, ascending
  • name is a static field (because there is only one name per ID). Example: 'UW ARBORETUM - MADISON'
  • record is a regular field because there will be many records per station partition.
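Putting this together, here is a sketch of the schema-creation queries, assuming self.cass is your Cassandra session:

self.cass.execute("DROP KEYSPACE IF EXISTS weather")
self.cass.execute("""
    CREATE KEYSPACE weather
    WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 3}
""")
self.cass.execute("CREATE TYPE weather.station_record (tmin int, tmax int)")
self.cass.execute("""
    CREATE TABLE weather.stations (
        id text,
        date date,
        name text static,
        record weather.station_record,
        PRIMARY KEY (id, date)
    ) WITH CLUSTERING ORDER BY (date ASC)
""")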
RPC - StationSchema:

Now you'll implement the StationSchema function in the StationService class. It should execute a DESCRIBE TABLE weather.stations Cassandra query and extract the create_statement from the result.
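A sketch of one possible implementation follows; the reply message and field names (StationSchemaReply, schema, error) are assumptions here, so check station.proto for the real ones:

def StationSchema(self, request, context):
    # DESCRIBE TABLE returns a row whose create_statement column
    # holds the CREATE TABLE text
    row = self.cass.execute("DESCRIBE TABLE weather.stations").one()
    return station_pb2.StationSchemaReply(schema=row.create_statement, error="")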

Then, after restarting server.py ("Server started" printed in the terminal), you should be able to use ClientStationSchema.py to make a client call:

docker exec -w /src p6-db-1 python3 ClientStationSchema.py

If your implementation is correct, the above command will print out something like this:

CREATE TABLE weather.stations (
    id text,
    date date,
    name text static,
    record station_record,
    PRIMARY KEY (id, date)
) WITH CLUSTERING ORDER BY (date ASC)
    AND additional_write_policy = '99p'
    AND bloom_filter_fp_chance = 0.01
    AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}
    AND cdc = false
    ...

Spark Session

Your constructor should also create a Spark session like this:

self.spark = SparkSession.builder.appName("p6").getOrCreate()

This is a local Spark deployment, so you won't have separate Spark workers and a boss. Instead, tasks run in the Spark driver for the session, in the same container as your server.py. This means local file paths will work, and you won't use HDFS for anything in this project.

You'll use your Spark session in the next part to process a text file describing stations and insert that into your Cassandra table.

Part 2: Station Data (Per Partition)

Your constructor should load data with Spark from src/ghcnd-stations.txt, then insert it into Cassandra. Note that we did some similar parsing of this same file during lecture:

https://git.doit.wisc.edu/cdis/cs/courses/cs544/f24/main/-/blob/main/lec/20-spark/nb/lec1b.ipynb?ref_type=heads

In your implementation, do the following (a sketch follows the list):

  • Use Spark and SUBSTRING to extract ID, STATE, and NAME from src/ghcnd-stations.txt. Reference the documentation to determine the offsets (this contains format descriptions for several different files, so be sure you're reading about the correct one):

    https://www.ncei.noaa.gov/pub/data/ghcn/daily/readme.txt

  • Filter your results to the state of Wisconsin, and collect the rows so you can loop over them

  • Do an INSERT into your weather.stations table for each station ID and name.
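Here is a sketch of those steps. The offsets are our reading of the readme (ID in columns 1-11, STATE in 39-40, NAME in 42-71), so verify them yourself; the relative path assumes server.py runs from /src:

from pyspark.sql.functions import col, expr

df = (self.spark.read.text("ghcnd-stations.txt")
      .withColumn("id", expr("substring(value, 1, 11)"))
      .withColumn("state", expr("trim(substring(value, 39, 2))"))
      .withColumn("name", expr("trim(substring(value, 42, 30))")))

# name is static, so inserting just (id, name) sets it for the whole partition
for row in df.filter(col("state") == "WI").collect():
    self.cass.execute(
        "INSERT INTO weather.stations (id, name) VALUES (%s, %s)",
        (row.id, row.name),
    )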

RPC - StationName:

You'll implement the StationName function in the StationService class. The function should execute a Cassandra query and parse the result to obtain the name of the station with a specific station id. The station id is stored in request.station (refer to station.proto).
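A sketch, with the reply field names (name, error) again being assumptions:

def StationName(self, request, context):
    # name is static, so any row in the partition carries it
    row = self.cass.execute(
        "SELECT name FROM weather.stations WHERE id = %s",
        (request.station,),
    ).one()
    return station_pb2.StationNameReply(name=row.name, error="")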

Then, you can use ClientStationName.py to make a client call:

docker exec -w /src p6-db-1 python3 ClientStationName.py US1WIMR0003

If you did load station data and execute StationName correctly, it should print out "AMBERG 1.3 SW".

Part 3: Weather Data (Per Row)

RPC - RecordTemps:

Now you'll implement the RecordTemps function in the StationService class. It receives temperature data and writes it to weather.stations.
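A sketch (the request and reply field names are assumptions based on the printout below; Part 4 will have you switch this to a prepared statement):

def RecordTemps(self, request, context):
    # with a simple (non-prepared) statement, the driver fills the %s
    # placeholders client-side, including those inside the UDT literal
    self.cass.execute(
        "INSERT INTO weather.stations (id, date, record) "
        "VALUES (%s, %s, {tmin: %s, tmax: %s})",
        (request.station, request.date, request.tmin, request.tmax),
    )
    return station_pb2.RecordTempsReply(error="")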

ClientRecordTemps.py pulls its data from src/weather.parquet. You can run it as follows:

docker exec -w /src p6-db-1 python3 ClientRecordTemps.py

If you did RecordTemps correctly, it should print one line per insert of the form: Inserted {station_id} on {date} with tmin={tmin} and tmax={tmax}.

RPC - StationMax:

Similarly, you'll implement the StationMax RPC in server.py, which will return the maximum tmax ever seen for the given station.
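A sketch (reply field names are assumptions):

def StationMax(self, request, context):
    # computing the max client-side avoids relying on CQL aggregates
    # over UDT fields
    rows = self.cass.execute(
        "SELECT record FROM weather.stations WHERE id = %s",
        (request.station,),
    )
    tmax = max(row.record.tmax for row in rows if row.record is not None)
    return station_pb2.StationMaxReply(tmax=tmax, error="")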

Then, you can use ClientStationMax.py to make a client call:

docker exec -w /src p6-db-1 python3 ClientStationMax.py USR0000WDDG

If you did RecordTemps and StationMax correctly, it will print out 344.

Part 4: Fault Tolerance

Error Handling

Go back to RecordTemps and StationMax and add some error handling code. If there is a cassandra.Unavailable or cassandra.cluster.NoHostAvailable exception, the error field in the gRPC response should be "unavailable".

If there are other exceptions besides these, return an error message of your choosing. If the error field is set, it doesn't matter what number your StationMax returns for tmax.
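For example, a sketch for StationMax (the exception classes come from the directions above; the reply field names remain assumptions):

from cassandra import Unavailable
from cassandra.cluster import NoHostAvailable

def StationMax(self, request, context):
    try:
        rows = self.cass.execute(
            "SELECT record FROM weather.stations WHERE id = %s",
            (request.station,),
        )
        tmax = max(row.record.tmax for row in rows if row.record is not None)
    except (Unavailable, NoHostAvailable):
        return station_pb2.StationMaxReply(tmax=0, error="unavailable")
    except Exception as e:
        return station_pb2.StationMaxReply(tmax=0, error=str(e))
    return station_pb2.StationMaxReply(tmax=tmax, error="")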

Consistency Levels

We want the server to be highly available for receiving writes. Thus, we'll set W=1. Choose an R so that R + W > RF (the keyspace's replication factor, which is 3 here).

The thought is that real sensors might not have much space to save old data that hasn't been uploaded, so we want to accept writes whenever possible. We also want to avoid a situation where a StationMax returns a smaller temperature than one previously added with RecordTemps; it would be better to return an error message if necessary.

RecordTemps is a method that writes to the database (so it should be highly available), whereas StationMax is a method that reads (so it is OK for it to be unavailable when some replicas are down).

To implement your W and R settings in these methods, you need to modify your code to use prepared statements instead of running queries directly. Once you have a prepared statement, you can set the consistency level like this:

from cassandra import ConsistencyLevel
...
prepared.consistency_level = ConsistencyLevel.????

You can fill in the missing code with ConsistencyLevel.ONE, ConsistencyLevel.TWO, or ConsistencyLevel.THREE.

Specifically, you should define prepared statements for your CQL queries in RecordTemps and StationMax. Prepare the statements in the __init__ function of the StationService class, so that each statement is prepared once and used many times.
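Here is a sketch of the __init__ side, assuming self.cass is the session. The statement attribute names and the namedtuple used to bind the UDT are our choices; with RF=3 and W=1, the R + W > RF rule forces R=3:

from collections import namedtuple
from cassandra import ConsistencyLevel

# a namedtuple with matching field names is one way to bind a UDT value
StationRecord = namedtuple("StationRecord", ["tmin", "tmax"])

self.insert_stmt = self.cass.prepare(
    "INSERT INTO weather.stations (id, date, record) VALUES (?, ?, ?)"
)
self.insert_stmt.consistency_level = ConsistencyLevel.ONE    # W = 1

self.max_stmt = self.cass.prepare(
    "SELECT record FROM weather.stations WHERE id = ?"
)
self.max_stmt.consistency_level = ConsistencyLevel.THREE     # R = 3, so R + W > RF

The RPC methods then execute these statements, e.g. self.cass.execute(self.insert_stmt, (request.station, request.date, StationRecord(request.tmin, request.tmax))).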

Disaster Strikes

Important: run a docker command to kill the p6-db-2 container.
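For example:

docker kill p6-db-2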

Run the following command

docker exec p6-db-1 nodetool status

Verify that you see that one of the nodes is down before proceeding to the next steps.

Call ClientRecordTemps.py again. Inserts should happen with ConsistencyLevel.ONE, so this ought to work, meaning the empty string is the expected result for error.

Call ClientStationMax.py again. Reads should raise a cassandra.Unavailable exception, so the error field should contain a string like this:

unavailable

Submission

Read the directions here about how to create the repo. Note that we will copy in the provided files (every file except server.py and src/requirements.txt), overwriting anything you might have changed.

Make sure you upload every file you need to run the following commands:

docker build . -t p6
docker compose up -d

docker exec -w /src p6-db-1 sh -c "python3 -m grpc_tools.protoc -I=. --python_out=. --grpc_python_out=. station.proto "

# in a terminal:
docker exec -it -w /src p6-db-1 python3 server.py

# in a second terminal:
docker exec -w /src p6-db-1 python3 ClientStationSchema.py

Testing:

Please be sure that your installed autobadger is on version TODO. You can print the version using

autobadger --info

See projects.md for more information.

The autobadger constitutes 90% of the total score. The remaining 10% will be graded manually; we will be checking your read and write consistency levels.