added pub/sub to run-waku

This commit is contained in:
0xFugue 2023-01-21 23:58:45 +05:30
parent 73bca2786e
commit 43ada8fd4f
4 changed files with 75 additions and 23 deletions

View File

@ -1,13 +1,21 @@
DEFAULT_CONFIG_FILE = "github.com/logos-co/wadoku/waku/config.json"
DEFAULT_RUN_PAIR = "lf"
def get_config_file(args):
return DEFAULT_CONFIG_FILE if not hasattr(args, "config") else args.config
def get_run_pair(args):
return DEFAULT_RUN_PAIR if not hasattr(args, "run_pair") else args.run_pair
def run(args):
print(args)
config_file = get_config_file(args)
run_pair = get_run_pair(args)
print("Reading the config from: %s" %config_file)
print("The runpair is %s" %run_pair)
config_json = read_file(src=config_file)
config = json.decode(config_json)
@ -19,7 +27,10 @@ def run(args):
ctopic = config['content_topic']
print(config)
waku_filtr = add_service(
if run_pair == "lf": # run lightpush and filter
waku_filtr = add_service(
service_id = "waku-filter",
config = struct(
image = "waku-filter:alpha",
@ -30,8 +41,8 @@ def run(args):
"-l=" + loglvl,
"-i=" + iat ],
),
)
waku_lpush = add_service(
)
waku_lpush = add_service(
service_id = "waku-lightpush",
config = struct(
image = "waku-lightpush:alpha",
@ -42,7 +53,33 @@ def run(args):
"-l=" + loglvl,
"-i=" + iat ],
),
)
)
print(waku_filtr, waku_lpush)
print(waku_filtr, waku_lpush)
else: # run waku publish and subscribe
waku_sub = add_service(
service_id = "waku-subscribe",
config = struct(
image = "waku-subscribe:alpha",
entrypoint= ["/go/bin/waku-subscribe"],
cmd = [ "-o=" + "/go/bin/out/subscribe.out",
"-d=" + duration,
"-c=" + ctopic,
"-l=" + loglvl,
"-i=" + iat ],
),
)
waku_pub = add_service(
service_id = "waku-publish",
config = struct(
image = "waku-publish:alpha",
entrypoint= ["/go/bin/waku-publish"],
cmd = [ "-o=" + "/go/bin/out/publish.out",
"-d=" + duration,
"-c=" + ctopic,
"-l=" + loglvl,
"-i=" + iat ],
),
)
print(waku_sub, waku_pub)

View File

@ -1,13 +1,24 @@
#!/bin/sh
usage(){
echo "Usage: ./run.sh < clean | metal | docker | kurtosis>"
echo "Usage: ./run.sh <clean | metal | docker | kurtosis> <duration> <time> <ps|lf>"
exit 1
}
run_pair="lf"
# two tests per run
filtr="filter"
lpush="lightpush"
if [ lf = $4 ]; then
lpush="lightpush"
filtr="filter"
elif [ ps = $4 ]; then
lpush="publish"
filtr="subscribe"
run_pair="ps"
# two tests per run
else
echo "Unknown run pair: $4"
usage
fi
# file/dir locations
prefix="waku"
@ -18,14 +29,14 @@ host_output_dir="/home1/mimosa/runs"
# params for the run
ctopic="8fc1f6b30b63bdd0a65df833f1da3fa" # default ctopic, a md5
duration="100s" # duration of the run
iat="100ms" # pub msg inter-arrival-time
iat="100ms" # pub msg inter-arrival-time
loglvl="info" # log level
duration=$2
#iat=$3
sleep_time=20
time=""
time=$3
unique="XYZ"
@ -65,7 +76,7 @@ build_docker() {
start() {
time="$(\date +"%Y-%m-%d-%Hh%Mm%Ss")"
#time="$(\date +"%Y-%m-%d-%Hh%Mm%Ss")"
ctopic="$duration-$iat-$time-$unique"
echo "\t\t.............................................."
echo "\t\tBEGINNING THE $1 RUN @ $time"
@ -86,7 +97,7 @@ metal_run() {
FTDIR="$host_output_dir/data/metal/$ctopic"
LPDIR="$host_output_dir/data/metal/$ctopic"
killall -9 $prefix-$filtr $prefix-$lpush
killall -SIGTERM $prefix-$filtr $prefix-$lpush # TODO: send SIGTERM/SIGINT
cd waku/$filtr
[ -d $FTDIR ] || mkdir -p $FTDIR
ofname="$FTDIR/$filtr.out"
@ -94,7 +105,7 @@ metal_run() {
./$prefix-$filtr -o $ofname -d $duration -i $iat -l $loglvl -c $ctopic > $FTDIR/$filtr.log &
echo " done"
cd ..
cd lightpush
cd $lpush
sleep $sleep_time
@ -122,7 +133,7 @@ docker_run() {
[ -d $FTDIR ] || mkdir -p $FTDIR
ofname="$FTDIR/$filtr.out"
echo "docker: stopping and removing $prefix-$filtr"
docker stop $prefix-$filtr
docker stop $prefix-$filtr # NOTE: docker stop sends SIGTERM; stick to it
docker rm $prefix-$filtr
echo "docker: starting $prefix-$filtr"
echo "docker run --name "$prefix-$filtr" -d=true "$prefix-$filtr:alpha" -o "$docker_op_dir/$filtr.out" -d $duration -l $loglvl -i $iat -c $ctopic "
@ -134,7 +145,7 @@ docker_run() {
[ -d $LPDIR ] || mkdir -p $LPDIR
ofname="$LPDIR/$lpush.out"
echo "docker: stopping and removing $prefix-$lpush"
docker stop $prefix-$lpush
docker stop $prefix-$lpush # NOTE: docker stop sends SIGTERM; stick to it
docker rm $prefix-$lpush
echo "docker: starting $prefix-$lpush"
echo " docker run --name "$prefix-$lpush" -d=true "$prefix-$lpush:alpha" -o "$docker_op_dir/$lpush.out" -d $duration -i $iat -l $loglvl -c $ctopic"
@ -167,7 +178,7 @@ kurtosis_run() {
[ -d $FTDIR ] || mkdir -p $FTDIR
[ -d $LPDIR ] || mkdir -p $LPDIR
#cd waku
kurtosis clean -a
kurtosis clean -a # TODO: find out what signal kurtosis sends
docker rm $prefix-$filtr
docker rm $prefix-$lpush
@ -179,9 +190,10 @@ kurtosis_run() {
\"content_topic\": \"$ctopic\",
\"log_level\": \"$loglvl\",
\"log_file\": \"output.log\"
}" > waku/config.json
}" > waku/config.json
kurtosis run --enclave-id $enclave . '{"config":"github.com/logos-co/wadoku/waku/config.json"}' > $FTDIR/kurtosis_output.log
echo "kurtosis run --enclave-id $enclave . '{\"config\":\"github.com/logos-co/wadoku/waku/config.json\", \"run_pair\":\"$run_pair\"}' > $FTDIR/kurtosis_output.log"
kurtosis run --enclave-id $enclave . '{"config":"github.com/logos-co/wadoku/waku/config.json", "run_pair":"'$run_pair'"}' > $FTDIR/kurtosis.output
sleep $sleep_time
@ -206,11 +218,10 @@ kurtosis_run() {
}
echo "$# $1 $2"
[ 2 -eq "$#" ] || usage
echo "$# $1 $2 $3 $4"
[ 4 -eq "$#" ] || usage
[ metal != $1 -a docker != $1 -a kurtosis != $1 -a clean != $1 ] && usage
if [ clean = $1 ]; then
start $1
clean

View File

@ -3,11 +3,11 @@ FROM golang:1.18
# Create the app directory to hold apps source code
ADD waku-subscribe /go/bin/waku-subscribe
ADD ./waku-subscribe /go/bin/waku-subscribe
RUN umask -S 202
RUN mkdir -p /go/bin/out
ADD subscribe.go /go/src/subscribe.go
ADD ./subscribe.go /go/src/subscribe.go
ADD go.mod go.sum /go/src/
# Tells Docker which network port your container listens on

View File

@ -120,6 +120,10 @@ func main() {
log.Info("Waiting to receive the message")
for env := range theSub.C {
msg := env.Message()
// ignore other relay messages
if msg.ContentTopic != conf.ContentTopic {
continue
}
rbuf := bytes.NewBuffer(msg.Payload)
var r32 int32 //:= make([]int64, (len(msg.Payload)+7)/8)