added loglvl and more logging

0xFugue 2023-01-14 01:10:43 +05:30
parent 2d3d75a891
commit d9e43c4a5a
6 changed files with 127 additions and 51 deletions

.gitignore vendored
View File

@@ -20,8 +20,13 @@ rlnCredentials.txt
 *.aar
 *.jar
+# output/log dir and files
 pkg/*
 data/*
+filter.out
+lightpush.out
+filter.log
+lightpush.log
 # output binaries
 waku-lightpush

View File

@@ -15,8 +15,8 @@ def run(args):
     output_file = config['output_file']
     duration = config['duration']
     iat = config['iat']
-    mount_src = config['mount_src']
-    mount_dst = config['mount_target']
+    loglvl = config['log_level']
+    ctopic = config['content_topic']
     print(config)
     waku_filtr = add_service(
@@ -26,6 +26,8 @@ def run(args):
         entrypoint= ["/go/bin/waku-filter"],
         cmd = [ "-o=" + "/go/bin/out/filter.out",
             "-d=" + duration,
+            "-c=" + ctopic,
+            "-l=" + loglvl,
             "-i=" + iat ],
         ),
     )
@@ -36,6 +38,8 @@ def run(args):
         entrypoint= ["/go/bin/waku-lightpush"],
         cmd = [ "-o=" + "/go/bin/out/lightpush.out",
             "-d=" + duration,
+            "-c=" + ctopic,
+            "-l=" + loglvl,
             "-i=" + iat ],
         ),
     )

View File

@@ -5,21 +5,27 @@ usage(){
   exit 1
 }
+# two tests per run
 filtr="filter"
 lpush="lightpush"
+# file/dir locations
 prefix="waku"
 docker_op_dir="/go/bin/out"
 enclave="waku-enclave"
+host_output_dir="/home1/mimosa/runs"
-content_topic="80fc1f6b30b63bdd0a65df833f1da3fa" #just to be unique, a md5
-duration="1000s" # run duration is 16.666 mins
-iat="300ms" # pub msg inter-arrival-time is 300ms
+# params for the run
+ctopic="8fc1f6b30b63bdd0a65df833f1da3fa" # default ctopic, a md5
+duration="100s" # duration of the run
+iat="300ms" # pub msg inter-arrival-time
+loglvl="info" # log level
 sleep_time=5
+time=""
 clean(){
-  parent=$(pwd)
+  parent=$(pwd) # no pushd / popd
   #rm -rf ./data
   cd waku/$filtr
   rm $prefix-$filtr
@@ -54,28 +60,33 @@ build_docker() {
 start() {
-  echo "\t\t.........................."
-  echo "\t\tBEGINNING THE $1 RUN..."
-  echo "\t\t.........................."
+  time="$(\date +"%Y-%m-%d-%Hh%Mm%Ss")"
+  ctopic="$duration-$iat-$time"
+  echo "\t\t.............................................."
+  echo "\t\tBEGINNING THE $1 RUN @ $time"
+  echo "\t\tparams: $ctopic"
+  echo "\t\t.............................................."
 }
 end() {
-  echo "\t\t $1 RUN DONE..."
+  echo "\t\t $1 RUN DONE @ $(\date +"%Y-%m-%d-%Hh%Mm%Ss") "
+  echo "\t\t params: $time, $duration, $iat-$ctopic"
 }
 metal_run() {
   parent=$(pwd)
-  time="$(\date +"%Y-%m-%d-%Hh%Mm%Ss")"
-  FTDIR="$(pwd)/data/metal/$time"
-  LPDIR="$(pwd)/data/metal/$time"
+  #time="$(\date +"%Y-%m-%d-%Hh%Mm%Ss")"
+  FTDIR="$host_output_dir/data/metal/$ctopic"
+  LPDIR="$host_output_dir/data/metal/$ctopic"
+  killall -9 $prefix-$filtr $prefix-$lpush
   cd waku/$filtr
   [ -d $FTDIR ] || mkdir -p $FTDIR
   ofname="$FTDIR/$filtr.out"
   echo -n "starting $filtr... $pwd "
-  ./$prefix-$filtr -o $ofname -d $duration -i $iat > $FTDIR/$filtr.log &
+  ./$prefix-$filtr -o $ofname -d $duration -i $iat -l $loglvl -c $ctopic > $FTDIR/$filtr.log &
   echo " done"
   cd ..
   cd lightpush
@@ -85,7 +96,7 @@ metal_run() {
   [ -d $LPDIR ] || mkdir -p $LPDIR
   ofname="$LPDIR/$lpush.out"
   echo -n "starting $lpush... "
-  ./$prefix-$lpush -o $ofname -d $duration -i $iat > $LPDIR/$lpush.log &
+  ./$prefix-$lpush -o $ofname -d $duration -i $iat -l $loglvl -c $ctopic > $LPDIR/$lpush.log &
   echo "done"
   cd $parent
   echo "$(date): Waiting for the metal run to finish in $duration"
@@ -95,16 +106,17 @@ metal_run() {
 docker_run() {
   parent=$(pwd)
-  time="$(\date +"%Y-%m-%d-%Hh%Mm%Ss")"
-  FTDIR="$(pwd)/data/docker/$time"
-  LPDIR="$(pwd)/data/docker/$time"
+  #time="$(\date +"%Y-%m-%d-%Hh%Mm%Ss")"
+  FTDIR="$host_output_dir/data/docker/$ctopic"
+  LPDIR="$host_output_dir/data/docker/$ctopic"
+  #FTDIR="$host_output_dir/data/docker/$time"
+  #LPDIR="$host_output_dir/data/docker/$time"
   [ -d $FTDIR ] || mkdir -p $FTDIR
   ofname="$FTDIR/$filtr.out"
   echo "docker run $filtr $ofname"
   docker rm $prefix-$filtr
-  echo "(docker run --name "$prefix-$filtr" "$prefix-$filtr:alpha" -o /go/bin/out/$filtr.out -d $duration -i $iat > $FTDIR/$filtr.log)"
-  docker run --name "$prefix-$filtr" "$prefix-$filtr:alpha" -o /go/bin/out/$filtr.out -d $duration -i $iat > $FTDIR/$filtr.log &
+  docker run --network=host --name "$prefix-$filtr" "$prefix-$filtr:alpha" -o "$docker_op_dir/$filtr.out" -d $duration -l $loglvl -i $iat > $FTDIR/$filtr.log &
   echo "$prefix-$filtr is running as $prefix-$filtr"
   sleep $sleep_time
@@ -113,7 +125,7 @@ docker_run() {
   ofname="$LPDIR/$lpush.out"
   echo "docker run $lpush $ofname"
   docker rm $prefix-$lpush
-  docker run --name "$prefix-$lpush" "$prefix-$lpush:alpha" -o /go/bin/out/$lpush.out -d $duration -i $iat > $LPDIR/$lpush.log &
+  docker run --network=host --name "$prefix-$lpush" "$prefix-$lpush:alpha" -o "$docker_op_dir/$lpush.out" -d $duration -i $iat -l $loglvl -c $ctopic> $LPDIR/$lpush.log &
   echo "$prefix-$filtr is running as $prefix-$lpush"
   cd $parent
@@ -124,16 +136,20 @@ docker_run() {
   # copy the output files
   echo "Status code of docker run: ${status_code}"
   echo "$(date): copying output files from docker"
-  docker cp "$prefix-$filtr:/go/bin/out/$filtr.out" $FTDIR
-  docker cp "$prefix-$lpush:/go/bin/out/$lpush.out" $LPDIR
+  docker logs $prefix-$filtr > "$FTDIR/$filtr-docker.log"
+  docker logs $prefix-$lpush > "$LPDIR/$lpush-docker.log"
+  docker cp "$prefix-$filtr:$docker_op_dir/$filtr.out" $FTDIR
+  docker cp "$prefix-$lpush:$docker_op_dir/$lpush.out" $LPDIR
 }
 kurtosis_run() {
   parent=$(pwd)
-  time="$(\date +"%Y-%m-%d-%Hh%Mm%Ss")"
-  FTDIR="$(pwd)/data/kurtosis/$time"
-  LPDIR="$(pwd)/data/kurtosis/$time"
+  #time="$(\date +"%Y-%m-%d-%Hh%Mm%Ss")"
+  FTDIR="$host_output_dir/data/kurtosis/$ctopic"
+  LPDIR="$host_output_dir/data/kurtosis/$ctopic"
+  #FTDIR="$host_output_dir/data/kurtosis/$time"
+  #LPDIR="$host_output_dir/data/kurtosis/$time"
   [ -d $FTDIR ] || mkdir -p $FTDIR
   [ -d $LPDIR ] || mkdir -p $LPDIR
@@ -142,7 +158,17 @@ kurtosis_run() {
   docker rm $prefix-$filtr
   docker rm $prefix-$lpush
-  kurtosis run --enclave-id $enclave main.star '{"config":"github.com/logos-co/wadoku/waku/config.json"}' > $FTDIR/kurtosis_output.log
+  # generate the config.json
+  echo "{
+   \"output_file\": \"output.out\",
+   \"duration\": \"$duration\",
+   \"iat\": \"$iat\",
+   \"content_topic\": \"$ctopic\",
+   \"log_level\": \"$loglvl\",
+   \"log_file\": \"output.log\"
+  }" > waku/config.json
+  kurtosis run --enclave-id $enclave . '{"config":"github.com/logos-co/wadoku/waku/config.json"}' > $FTDIR/kurtosis_output.log
   sleep $sleep_time
   filtr_suffix="$(kurtosis enclave inspect $enclave | grep $prefix-$filtr | cut -f 1 -d ' ')"
   lpush_suffix="$(kurtosis enclave inspect $enclave | grep $prefix-$lpush | cut -f 1 -d ' ')"
@@ -151,13 +177,16 @@ kurtosis_run() {
   filtr_cid="$enclave--user-service--$filtr_suffix"
   lpush_cid="$enclave--user-service--$lpush_suffix"
   echo "created $filtr_cid, $lpush_cid..."
-  echo "$(date): Waiting for the kurtosis run to finish in $duration"
+  echo "$(date): Waiting for the kurtosis ($filtr_cid, $lpush_cid ) run to finish in $duration"
   status_code="$(docker container wait $filtr_cid $lpush_cid)"
-  echo "Status code of the kurtosis run: ${status_code}"
   # copy the output files
   docker cp "$filtr_cid:/go/bin/out/$filtr.out" $FTDIR
+  docker logs $filtr_cid > $FTDIR/$filtr.log
   docker cp "$lpush_cid:/go/bin/out/$lpush.out" $LPDIR
+  docker logs $lpush_cid > $LPDIR/$lpush.log
+  kurtosis enclave dump $enclave $FTDIR/kurtosis_dump
+  echo "Status code of the kurtosis run: ${status_code}"
   cd $parent
 }
@@ -185,6 +214,7 @@ elif [ docker = $1 ]; then
 elif [ kurtosis = $1 ]; then
   build_metal
   build_docker
+  clean # stay under the 4mb limit imposed by gRPC
   start $1
   kurtosis_run
   end $1

View File

@@ -1,8 +1,8 @@
 {
   "output_file": "output.out",
-  "duration": "1000s",
+  "duration": "4000s",
   "iat": "300ms",
-  "mount_src": "/go/bin/out",
-  "mount_target": "/go/bin/out",
+  "content_topic": "2023-01-13-23h49m02s-4000s-300ms",
+  "log_level": "info",
   "log_file": "output.log"
 }

View File

@@ -5,6 +5,8 @@ import (
 	"flag"
 	"fmt"
 	"net"
+	"bytes"
+	"encoding/binary"
 	"os"
 	"time"
@@ -29,6 +31,7 @@ const dnsDiscoveryUrl = "enrtree://AOGECG2SPND25EEFMAJ5WF3KSGJNSGV356DSTL2YVLLZWIV6SAYBM@prod.waku.nodes.status.im"
 const nameServer = "1.1.1.1" // your local dns provider might be blocking entr
 type Config struct {
+	LogLevel     string
 	Ofname       string
 	ContentTopic string
 	Iat          time.Duration
@@ -42,8 +45,10 @@ func init() {
 	fmt.Println("Populating CLI params...")
 	flag.DurationVar(&conf.Duration, "d", 1000*time.Second,
 		"Specify the duration (1s,2m,4h)")
-	flag.DurationVar(&conf.Iat, "i", 100*time.Millisecond,
+	flag.DurationVar(&conf.Iat, "i", 300*time.Millisecond,
 		"Specify the interarrival time in millisecs")
+	flag.StringVar(&conf.LogLevel, "l", "info",
+		"Specify the log level")
 	flag.StringVar(&conf.Ofname, "o", "lightpush.out",
 		"Specify the output file")
 	flag.StringVar(&conf.ContentTopic, "c", "d608b04e6b6fd7006afdfe916f08b5d",
@@ -55,7 +60,7 @@ func main() {
 	flag.Parse()
 	// setup the log
-	lvl, err := logging.LevelFromString("info")
+	lvl, err := logging.LevelFromString(conf.LogLevel)
 	if err != nil {
 		panic(err)
 	}
@@ -73,8 +78,9 @@ func main() {
 		panic(err)
 	}
+	log.Info("CONFIG : ", conf)
 	// find the list of full node fleet peers
-	fmt.Printf("attempting DNS discovery with %s\n", dnsDiscoveryUrl)
+	log.Info("attempting DNS discovery with: ", dnsDiscoveryUrl)
 	nodes, err := dnsdisc.RetrieveNodes(ctx, dnsDiscoveryUrl, dnsdisc.WithNameserver(nameServer))
 	if err != nil {
 		panic(err.Error())
@@ -85,26 +91,27 @@ func main() {
 	for _, n := range nodes {
 		nodeList = append(nodeList, n.Addresses...)
 	}
-	fmt.Printf("Discovered and connecting to %v \n", nodeList[0])
+	log.Info("Discovered and connecting to: ", nodeList[0])
 	peerID, err := nodeList[0].ValueForProtocol(multiaddr.P_P2P)
 	if err != nil {
-		fmt.Printf("could not get peerID: %s \n", err)
+		log.Error("could not get peerID: ", err)
 		panic(err)
 	}
 	err = lightNode.DialPeerWithMultiAddress(ctx, nodeList[0])
 	if err != nil {
-		fmt.Printf("could not connect to %s: %s \n", peerID, err)
+		log.Error("could not connect to ", peerID, err)
 		panic(err)
 	}
-	fmt.Println("STARTING THE LIGHTNODE ", conf.ContentTopic)
+	log.Info("STARTING THE FILTER NODE ", conf.ContentTopic)
 	// start the light node
 	err = lightNode.Start()
 	if err != nil {
+		log.Error("COULD NOT START THE FILTER NODE ", conf.ContentTopic)
 		panic(err)
 	}
-	fmt.Println("SUBSCRIBING TO THE TOPIC ", conf.ContentTopic)
+	log.Info("SUBSCRIBING TO THE TOPIC ", conf.ContentTopic)
 	// Subscribe to our ContentTopic and send a FilterRequest
 	cf := filter.ContentFilter{
 		Topic: pubSubTopic.String(),
@@ -119,6 +126,7 @@ func main() {
 	go func() {
 		f, err := os.OpenFile(conf.Ofname, os.O_APPEND|os.O_WRONLY|os.O_CREATE, 0600)
 		if err != nil {
+			log.Error("Could not open file: ", err)
 			panic(err)
 		}
 		defer f.Close()
@@ -126,7 +134,17 @@ func main() {
 		for env := range theFilter.Chan {
 			msg := env.Message()
 			log.Info("Light node received msg, ", string(msg.Payload))
-			str := fmt.Sprintf("Received msg, @", string(msg.ContentTopic), "@", msg.Timestamp, "@", utils.GetUnixEpochFrom(lightNode.Timesource().Now()))
+			rbuf := bytes.NewBuffer(msg.Payload)
+			var r32 int32 //:= make([]int64, (len(msg.Payload)+7)/8)
+			err = binary.Read(rbuf, binary.LittleEndian, &r32)
+			if err != nil {
+				log.Error("binary.Read failed:", err)
+				panic(err)
+			}
+			msg_delay := time.Since(time.Unix(0, msg.Timestamp))
+			str := fmt.Sprintf("Received msg, @", r32, "@", string(msg.ContentTopic), "@", msg.Timestamp, "@", utils.GetUnixEpochFrom(lightNode.Timesource().Now()), "@", msg_delay.Microseconds(), "@", msg_delay.Milliseconds())
 			log.Info(str)
 			//"Received msg, @", string(msg.ContentTopic), "@", msg.Timestamp, "@", utils.GetUnixEpochFrom(lightNode.Timesource().Now()) )
 			if _, err = f.WriteString(str + "\n"); err != nil {
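The filter-side decoding above pairs with the lightpush changes in the last file: the publisher packs an int32 sequence number into the message payload with binary.Write, and the subscriber reads it back with binary.Read and derives the delivery delay from the message timestamp (nanoseconds). A minimal standalone sketch of that round trip, outside the go-waku node code and assuming the sender's clock is what ends up in msg.Timestamp:

package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"time"
)

func main() {
	// Publisher side (waku-lightpush): pack the int32 sequence number
	// into a little-endian payload and record the send time in nanoseconds.
	var seqNumber int32 = 42
	wbuf := new(bytes.Buffer)
	if err := binary.Write(wbuf, binary.LittleEndian, seqNumber); err != nil {
		panic(err)
	}
	payload := wbuf.Bytes() // 4 bytes: 2a 00 00 00
	sentAt := time.Now().UnixNano()

	// Subscriber side (waku-filter): decode the sequence number and
	// compute the delivery delay from the sender's timestamp.
	var r32 int32
	if err := binary.Read(bytes.NewBuffer(payload), binary.LittleEndian, &r32); err != nil {
		panic(err)
	}
	msgDelay := time.Since(time.Unix(0, sentAt))
	fmt.Println("seq:", r32, "delay(us):", msgDelay.Microseconds())
}

The delay figure is only meaningful when the publisher and subscriber clocks are reasonably synchronized, since it subtracts the sender's timestamp from the receiver's clock.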

View File

@@ -5,6 +5,8 @@ import (
 	"flag"
 	"fmt"
 	"net"
+	"bytes"
+	"encoding/binary"
 	"os"
 	"time"
@@ -28,7 +30,10 @@ var log = logging.Logger("lightpush")
 const NameServer = "1.1.1.1" // your local dns provider might be blocking entr
 const DnsDiscoveryUrl = "enrtree://AOGECG2SPND25EEFMAJ5WF3KSGJNSGV356DSTL2YVLLZWIV6SAYBM@prod.waku.nodes.status.im"
+var seqNumber int32 = 0
 type Config struct {
+	LogLevel     string
 	Ofname       string
 	ContentTopic string
 	Iat          time.Duration
@@ -44,6 +49,8 @@ func init() {
 		"Specify the duration (1s,2m,4h)")
 	flag.DurationVar(&conf.Iat, "i", 100*time.Millisecond,
 		"Specify the interarrival time in millisecs")
+	flag.StringVar(&conf.LogLevel, "l", "info",
+		"Specify the log level")
 	flag.StringVar(&conf.Ofname, "o", "lightpush.out",
 		"Specify the output file")
 	flag.StringVar(&conf.ContentTopic, "c", "d608b04e6b6fd7006afdfe916f08b5d",
@@ -55,7 +62,7 @@ func main() {
 	flag.Parse()
 	// setup the log
-	lvl, err := logging.LevelFromString("info")
+	lvl, err := logging.LevelFromString(conf.LogLevel)
 	if err != nil {
 		panic(err)
 	}
@@ -74,8 +81,9 @@ func main() {
 		panic(err)
 	}
+	log.Info("config: ", conf)
 	// find the list of full node fleet peers
-	fmt.Printf("attempting DNS discovery with %s\n", DnsDiscoveryUrl)
+	log.Info("attempting DNS discovery with: ", DnsDiscoveryUrl)
 	nodes, err := dnsdisc.RetrieveNodes(ctx, DnsDiscoveryUrl, dnsdisc.WithNameserver(NameServer))
 	if err != nil {
 		panic(err.Error())
@@ -86,23 +94,24 @@ func main() {
 	for _, n := range nodes {
 		nodeList = append(nodeList, n.Addresses...)
 	}
-	fmt.Printf("Discovered and connecting to %v \n", nodeList[0])
+	log.Info("Discovered and connecting to: ", nodeList[0])
 	peerID, err := nodeList[0].ValueForProtocol(multiaddr.P_P2P)
 	if err != nil {
-		fmt.Printf("could not connect to %s: %s \n", peerID, err)
+		log.Error("could not get peerID: ", err)
 		panic(err)
 	}
 	err = lightNode.DialPeerWithMultiAddress(ctx, nodeList[0])
 	if err != nil {
-		fmt.Printf("could not connect to %s: %s \n", peerID, err)
+		log.Error("could not connect to ", peerID, err)
 		panic(err)
 	}
-	fmt.Println("STARTING THE LIGHTNODE ", conf.ContentTopic)
+	log.Info("STARTING THE LIGHTPUSH NODE ", conf.ContentTopic)
 	// start the light node
 	err = lightNode.Start()
 	if err != nil {
+		log.Error("COULD NOT START THE LIGHTPUSH ", peerID, err)
 		panic(err)
 	}
@@ -115,23 +124,33 @@ func main() {
 }
 func writeLoop(ctx context.Context, conf *Config, wakuNode *node.WakuNode) {
-	fmt.Println("STARTING THE WRITELOOP ", conf.ContentTopic)
+	log.Info("STARTING THE WRITELOOP ", conf.ContentTopic)
 	f, err := os.OpenFile(conf.Ofname, os.O_APPEND|os.O_WRONLY|os.O_CREATE, 0600)
 	if err != nil {
+		log.Error("Could not open file: ", err)
 		panic(err)
 	}
 	defer f.Close()
 	for {
 		time.Sleep(conf.Iat)
+		seqNumber++
-		// build the message
+		// build the message & seq number
 		p := new(payload.Payload)
+		wbuf := new(bytes.Buffer)
+		err := binary.Write(wbuf, binary.LittleEndian, seqNumber)
+		if err != nil {
+			log.Error("binary.Write failed:", err)
+			panic(err)
+		}
+		p.Data = wbuf.Bytes()
 		var version uint32 = 0
 		payload, err := p.Encode(version)
 		if err != nil {
 			log.Error("Could not Encode: ", err)
+			panic(err)
 		}
 		msg := &pb.WakuMessage{
 			Payload: payload,
@@ -151,6 +170,6 @@ func writeLoop(ctx context.Context, conf *Config, wakuNode *node.WakuNode) {
 		if _, err = f.WriteString(str); err != nil {
 			panic(err)
 		}
-		fmt.Println("PUBLISHED/PUSHED...", msg)
+		log.Info("PUBLISHED/PUSHED... ", seqNumber, msg)
 	}
 }
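Both binaries now take the log level from the new -l flag instead of the hard-coded "info". A minimal sketch of how that flag can be wired into the logger, assuming the logging import is ipfs/go-log/v2 (the hunks only show the "logging" alias) and assuming SetAllLoggers is what applies the parsed level, which the diff context does not show:

package main

import (
	"flag"

	logging "github.com/ipfs/go-log/v2" // assumed import path; the diff only shows the "logging" alias
)

var log = logging.Logger("lightpush")

func main() {
	// The -l flag feeds logging.LevelFromString instead of a hard-coded "info".
	logLevel := flag.String("l", "info", "Specify the log level")
	flag.Parse()

	lvl, err := logging.LevelFromString(*logLevel)
	if err != nil {
		panic(err)
	}
	// Assumption: apply the level globally; the hunks above only show the parsing step.
	logging.SetAllLoggers(lvl)

	log.Info("log level set to ", *logLevel)
}

With this in place, a run such as ./waku-lightpush -l debug -d 100s -i 300ms makes the node chatty for debugging, while -l info keeps long measurement runs quiet.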