services:
  nb:
    image: spark-demo
    ports:
      - "127.0.0.1:5000:5000"
      - "127.0.0.1:4040:4040"
    volumes:
      - "./nb:/nb"
    command: python3 -m jupyterlab --no-browser --ip=0.0.0.0 --port=5000 --allow-root --NotebookApp.token=''
  nn:
    image: spark-demo
    hostname: nn
    command: sh -c "hdfs namenode -format -force && hdfs namenode -D dfs.replication=1 -fs hdfs://nn:9000"
  dn:
    image: spark-demo
    command: hdfs datanode -fs hdfs://nn:9000
  spark-boss:
    image: spark-demo
    hostname: boss
    command: sh -c "/spark-3.5.5-bin-hadoop3/sbin/start-master.sh && sleep infinity"
  spark-worker:
    image: spark-demo
    command: sh -c "/spark-3.5.5-bin-hadoop3/sbin/start-worker.sh spark://boss:7077 -c 2 -m 2g && sleep infinity"
    deploy:
      replicas: 2
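The compose file forwards 5000 (JupyterLab) and 4040 (the Spark application UI) to localhost. Once a SparkSession exists (as in the notebook below), Spark's standard monitoring REST API on the forwarded UI port gives a quick liveness check from the host; a minimal sketch:

``` python
import json, urllib.request

# 4040 is the application UI port forwarded in the compose file above;
# /api/v1/applications is Spark's standard monitoring REST endpoint.
with urllib.request.urlopen("http://localhost:4040/api/v1/applications") as r:
    for app in json.load(r):
        print(app["id"], app["name"])
```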
%% Cell type:code id:595b2e04-3312-4c06-a0fd-9c147076f0f9 tags:
``` python
from pyspark.sql import SparkSession
spark = (SparkSession.builder.appName("cs544")
         .master("spark://boss:7077")
         .config("spark.executor.memory", "2G")
         .config("spark.sql.warehouse.dir", "hdfs://nn:9000/user/hive/warehouse")
         .enableHiveSupport()
         .getOrCreate())
```
%% Output
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/03/15 18:51:29 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
%% Cell type:code id:d8efc653-d8cf-4fb2-a982-fb2f93a8630f tags:
``` python
import pandas as pd
import numpy as np
df = pd.DataFrame({"x1": np.random.randint(0, 10, 100).astype(float),
                   "x2": np.random.randint(0, 3, 100).astype(float)})
df["y"] = df["x1"] + df["x2"] + np.random.rand(len(df))
df = spark.createDataFrame(df)
df.limit(3).toPandas()
```
%% Output
    x1   x2         y
0  2.0  1.0  3.262734
1  6.0  0.0  6.347860
2  3.0  0.0  3.134057
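The synthetic data sets y to x1 + x2 plus uniform noise, so a natural follow-up cell (not part of this diff) would fit a linear model and check that both coefficients come out near 1.0. A minimal sketch using Spark MLlib on the df created above:

``` python
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression

# MLlib expects the features packed into a single vector column.
va = VectorAssembler(inputCols=["x1", "x2"], outputCol="features")
train = va.transform(df)

# Fit y ~ x1 + x2; both coefficients should land near 1.0, with the
# intercept absorbing the mean of the uniform noise (about 0.5).
model = LinearRegression(featuresCol="features", labelCol="y").fit(train)
print(model.coefficients, model.intercept)
```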
FROM ubuntu:22.04
RUN apt-get update; apt-get install -y wget curl openjdk-17-jdk python3-pip iproute2
# Python stuff
RUN pip3 install cassandra-driver==3.28.0 grpcio==1.58.0 grpcio-tools==1.58.0 jupyterlab==4.3.6 pandas==2.2.3
# SPARK
RUN wget https://dlcdn.apache.org/spark/spark-3.5.5/spark-3.5.5-bin-hadoop3.tgz && \
    tar -xf spark-3.5.5-bin-hadoop3.tgz && \
    rm spark-3.5.5-bin-hadoop3.tgz
# CASSANDRA
RUN wget https://archive.apache.org/dist/cassandra/5.0.0/apache-cassandra-5.0.0-bin.tar.gz; tar -xf apache-cassandra-5.0.0-bin.tar.gz; rm apache-cassandra-5.0.0-bin.tar.gz
ENV JAVA_HOME=/usr/lib/jvm/java-17-openjdk-amd64
ENV PATH="${PATH}:/apache-cassandra-5.0.0/bin:/spark-3.5.5-bin-hadoop3/bin"
COPY cassandra.sh /cassandra.sh
CMD ["sh", "/cassandra.sh"]
echo "-Xms128M" >> /apache-cassandra-5.0.0/conf/jvm-server.options
echo "-Xmx128M" >> /apache-cassandra-5.0.0/conf/jvm-server.options
# PROJECT is supplied through the container environment (see the compose file)
PROJECT=${PROJECT}
sed -i "s/^listen_address:.*/listen_address: "`hostname`"/" /apache-cassandra-5.0.0/conf/cassandra.yaml
sed -i "s/^rpc_address:.*/rpc_address: "`hostname`"/" /apache-cassandra-5.0.0/conf/cassandra.yaml
sed -i "s/- seeds:.*/- seeds: ${PROJECT}-db-1,${PROJECT}-db-2,${PROJECT}-db-3/" /apache-cassandra-5.0.0/conf/cassandra.yaml
/apache-cassandra-5.0.0/bin/cassandra -R
sleep infinity
name: p6
services:
  db:
    image: cassandra-demo
    hostname: db
    ports:
      - "5000-5002:5000"
    volumes:
      - "./src:/src"
    deploy:
      replicas: 3
      resources:
        limits:
          memory: 2.5G
    environment:
      - PROJECT=${PROJECT}
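With three replicas seeded as ${PROJECT}-db-1 through ${PROJECT}-db-3 (the names cassandra.sh writes into cassandra.yaml), a client on the same compose network can connect using the cassandra-driver package pinned in the Dockerfile. A minimal sketch, assuming PROJECT=p6 as in the compose file's name; the keyspace is illustrative, not from the diff:

``` python
from cassandra.cluster import Cluster

# Contact points match the seed names that cassandra.sh writes into
# cassandra.yaml; compose names each replica <project>-db-<n>.
cluster = Cluster(["p6-db-1", "p6-db-2", "p6-db-3"])
session = cluster.connect()

# Replication factor 3 mirrors the three replicas above (keyspace name
# "demo" is an assumption for illustration).
session.execute("""
    CREATE KEYSPACE IF NOT EXISTS demo
    WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 3}
""")
```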
[binary file added: p2/arch.png, 101 KiB]
name: ${PROJECT}
services:
  cache:
    image: ${PROJECT}-cache
    deploy:
      replicas: 3
    ports:
      - "8080"
    environment:
      - PROJECT=${PROJECT}
  dataset:
    image: ${PROJECT}-dataset
    deploy:
      replicas: 2
    environment:
      - PROJECT=${PROJECT}
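Publishing "8080" with no host part gives each of the three cache replicas its own ephemeral host port. One way to discover the mappings from the host, sketched under the assumption that docker compose v2's port subcommand and replica indices 1-3 apply:

``` python
import subprocess

# Docker picks an ephemeral host port per replica; `docker compose port`
# reports the mapping for a given replica index.
for i in (1, 2, 3):
    out = subprocess.run(
        ["docker", "compose", "port", "--index", str(i), "cache", "8080"],
        capture_output=True, text=True, check=True)
    print(f"cache replica {i}: {out.stdout.strip()}")
```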
import grpc, sys
import table_pb2_grpc, table_pb2

SERVER = "localhost:5440"
BATCH_COUNT = 400
BATCH_SIZE = 250_000  # 400 batches x 250,000 rows = 100 million rows total

def main():
    if len(sys.argv) != 1:
        print("Usage: python3 bigdata.py")
        sys.exit(1)

    channel = grpc.insecure_channel(SERVER)
    stub = table_pb2_grpc.TableStub(channel)

    for batch in range(BATCH_COUNT):
        print(f"Batch {batch+1} of {BATCH_COUNT}: 0.25 million rows")
        rows = "x,y,z\n" + "\n".join([f"1,{i},{batch*1000+i%1000}" for i in range(BATCH_SIZE)])
        resp = stub.Upload(table_pb2.UploadReq(csv_data=bytes(rows, "utf-8")))
        if resp.error:
            print(resp.error)
            sys.exit(1)
        else:
            print("uploaded")

if __name__ == "__main__":
    main()
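The table_pb2 / table_pb2_grpc modules imported here are the stubs that grpcio-tools (pinned in the Dockerfile above) generates; the .proto definition itself is not visible in this excerpt. A sketch of regenerating the stubs, assuming the definition lives in table.proto in the current directory:

``` python
from grpc_tools import protoc

# Regenerate table_pb2.py and table_pb2_grpc.py. "table.proto" is an
# assumption based on the generated module names.
protoc.main([
    "grpc_tools.protoc",
    "-I.",
    "--python_out=.",
    "--grpc_python_out=.",
    "table.proto",
])
```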
import grpc, sys, time
import table_pb2_grpc, table_pb2

SERVER = "localhost:5440"

def main():
    if len(sys.argv) != 2:
        print("Usage: python3 csvsum.py <COLUMN>")
        sys.exit(1)
    column = sys.argv[1]

    channel = grpc.insecure_channel(SERVER)
    stub = table_pb2_grpc.TableStub(channel)

    start = time.time()
    resp = stub.ColSum(table_pb2.ColSumReq(column=column, format="csv"))
    end = time.time()
    print(f"{round((end-start)*1000, 1)} ms")

    if resp.error:
        print(resp.error)
    else:
        print(resp.total)

if __name__ == "__main__":
    main()
x,y,z
1,2,3
4,5,6
import grpc, sys, time
import table_pb2_grpc, table_pb2

SERVER = "localhost:5440"

def main():
    if len(sys.argv) != 2:
        print("Usage: python3 parquetsum.py <COLUMN>")
        sys.exit(1)
    column = sys.argv[1]

    channel = grpc.insecure_channel(SERVER)
    stub = table_pb2_grpc.TableStub(channel)

    start = time.time()
    resp = stub.ColSum(table_pb2.ColSumReq(column=column, format="parquet"))
    end = time.time()
    print(f"{round((end-start)*1000, 1)} ms")

    if resp.error:
        print(resp.error)
    else:
        print(resp.total)

if __name__ == "__main__":
    main()
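csvsum.py and parquetsum.py send identical requests apart from the format field, which suggests the server keeps each upload in both CSV and Parquet form and sums the requested column from whichever layout the client names, so the two scripts time the same query against two storage formats. A rough sketch of such a handler; the response message name (ColSumResp), the uploads/ file layout, and pandas-based storage are assumptions, not taken from the diff:

``` python
import glob
import pandas as pd
import table_pb2, table_pb2_grpc

class TableServicer(table_pb2_grpc.TableServicer):
    # Registered with a grpc server via add_TableServicer_to_server.
    def ColSum(self, request, context):
        try:
            total = 0
            if request.format == "parquet":
                for path in glob.glob("uploads/*.parquet"):
                    # Parquet is columnar: only the requested column is read.
                    col = pd.read_parquet(path, columns=[request.column])[request.column]
                    total += int(col.sum())
            else:
                for path in glob.glob("uploads/*.csv"):
                    # CSV has no column index, so each file is parsed in full.
                    total += int(pd.read_csv(path)[request.column].sum())
            return table_pb2.ColSumResp(total=total)
        except Exception as e:
            return table_pb2.ColSumResp(error=str(e))
```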
import grpc, sys
import table_pb2_grpc, table_pb2

SERVER = "localhost:5440"

def main():
    if len(sys.argv) != 2:
        print("Usage: python3 upload.py <CSV_PATH>")
        sys.exit(1)
    path = sys.argv[1]

    channel = grpc.insecure_channel(SERVER)
    stub = table_pb2_grpc.TableStub(channel)

    with open(path, "rb") as f:
        data = f.read()  # binary "bytes" data
    resp = stub.Upload(table_pb2.UploadReq(csv_data=data))

    if resp.error:
        print(resp.error)
    else:
        print("success")

if __name__ == "__main__":
    main()
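Putting the clients together, a quick end-to-end check would upload the small CSV shown earlier and then sum a column in both formats. A sketch, assuming the file is saved as simple.csv and the server from the scripts above is listening:

``` python
import grpc
import table_pb2_grpc, table_pb2

# Same address the client scripts above use.
stub = table_pb2_grpc.TableStub(grpc.insecure_channel("localhost:5440"))

# Upload the three-row CSV shown earlier (filename assumed).
with open("simple.csv", "rb") as f:
    resp = stub.Upload(table_pb2.UploadReq(csv_data=f.read()))
print(resp.error or "upload ok")

# Column x holds 1 and 4, so both formats should report a total of 5.
for fmt in ("csv", "parquet"):
    print(fmt, stub.ColSum(table_pb2.ColSumReq(column="x", format=fmt)).total)
```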
venv/
tmp/
*.csv