mirror of
https://github.com/logos-co/wadoku.git
synced 2025-02-23 10:58:28 +00:00
refactored go mods, fixed bugs, cleaned up the runs
This commit is contained in:
parent
620962e001
commit
b5de96eb5c
56
run-waku.sh
56
run-waku.sh
@ -12,18 +12,23 @@ lpush="lightpush"
|
|||||||
# file/dir locations
|
# file/dir locations
|
||||||
prefix="waku"
|
prefix="waku"
|
||||||
docker_op_dir="/go/bin/out"
|
docker_op_dir="/go/bin/out"
|
||||||
enclave="waku-enclave"
|
enclave="enclave-waku"
|
||||||
host_output_dir="/home1/mimosa/runs"
|
host_output_dir="/home1/mimosa/runs"
|
||||||
|
|
||||||
# params for the run
|
# params for the run
|
||||||
ctopic="8fc1f6b30b63bdd0a65df833f1da3fa" # default ctopic, a md5
|
ctopic="8fc1f6b30b63bdd0a65df833f1da3fa" # default ctopic, a md5
|
||||||
duration="100s" # duration of the run
|
duration="100s" # duration of the run
|
||||||
iat="300ms" # pub msg inter-arrival-time
|
iat="100ms" # pub msg inter-arrival-time
|
||||||
loglvl="info" # log level
|
loglvl="info" # log level
|
||||||
|
|
||||||
sleep_time=5
|
duration=$2
|
||||||
|
#iat=$3
|
||||||
|
|
||||||
|
sleep_time=30
|
||||||
time=""
|
time=""
|
||||||
|
|
||||||
|
unique="XYZ"
|
||||||
|
|
||||||
clean(){
|
clean(){
|
||||||
parent=$(pwd) # no pushd / popd
|
parent=$(pwd) # no pushd / popd
|
||||||
#rm -rf ./data
|
#rm -rf ./data
|
||||||
@ -61,7 +66,7 @@ build_docker() {
|
|||||||
|
|
||||||
start() {
|
start() {
|
||||||
time="$(\date +"%Y-%m-%d-%Hh%Mm%Ss")"
|
time="$(\date +"%Y-%m-%d-%Hh%Mm%Ss")"
|
||||||
ctopic="$duration-$iat-$time"
|
ctopic="$duration-$iat-$time-$unique"
|
||||||
echo "\t\t.............................................."
|
echo "\t\t.............................................."
|
||||||
echo "\t\tBEGINNING THE $1 RUN @ $time"
|
echo "\t\tBEGINNING THE $1 RUN @ $time"
|
||||||
echo "\t\tparams: $ctopic"
|
echo "\t\tparams: $ctopic"
|
||||||
@ -71,7 +76,7 @@ start() {
|
|||||||
|
|
||||||
end() {
|
end() {
|
||||||
echo "\t\t $1 RUN DONE @ $(\date +"%Y-%m-%d-%Hh%Mm%Ss") "
|
echo "\t\t $1 RUN DONE @ $(\date +"%Y-%m-%d-%Hh%Mm%Ss") "
|
||||||
echo "\t\t params: $time, $duration, $iat-$ctopic"
|
echo "\t\t params: $ctopic"
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -112,22 +117,30 @@ docker_run() {
|
|||||||
#FTDIR="$host_output_dir/data/docker/$time"
|
#FTDIR="$host_output_dir/data/docker/$time"
|
||||||
#LPDIR="$host_output_dir/data/docker/$time"
|
#LPDIR="$host_output_dir/data/docker/$time"
|
||||||
|
|
||||||
[ -d $FTDIR ] || mkdir -p $FTDIR
|
#darg= "--network=host"
|
||||||
ofname="$FTDIR/$filtr.out"
|
|
||||||
echo "docker run $filtr $ofname"
|
|
||||||
docker rm $prefix-$filtr
|
|
||||||
docker run --network=host --name "$prefix-$filtr" "$prefix-$filtr:alpha" -o "$docker_op_dir/$filtr.out" -d $duration -l $loglvl -i $iat > $FTDIR/$filtr.log &
|
|
||||||
echo "$prefix-$filtr is running as $prefix-$filtr"
|
|
||||||
|
|
||||||
sleep $sleep_time
|
|
||||||
|
|
||||||
[ -d $LPDIR ] || mkdir -p $LPDIR
|
[ -d $LPDIR ] || mkdir -p $LPDIR
|
||||||
ofname="$LPDIR/$lpush.out"
|
ofname="$LPDIR/$lpush.out"
|
||||||
echo "docker run $lpush $ofname"
|
echo "docker: stopping and removing $prefix-$lpush"
|
||||||
|
docker stop $prefix-$lpush
|
||||||
docker rm $prefix-$lpush
|
docker rm $prefix-$lpush
|
||||||
docker run --network=host --name "$prefix-$lpush" "$prefix-$lpush:alpha" -o "$docker_op_dir/$lpush.out" -d $duration -i $iat -l $loglvl -c $ctopic> $LPDIR/$lpush.log &
|
echo "docker: starting $prefix-$lpush"
|
||||||
echo "$prefix-$filtr is running as $prefix-$lpush"
|
echo " docker run --name "$prefix-$lpush" -d=true "$prefix-$lpush:alpha" -o "$docker_op_dir/$lpush.out" -d $duration -i $iat -l $loglvl -c $ctopic"
|
||||||
cd $parent
|
docker run --name "$prefix-$lpush" -d=true "$prefix-$lpush:alpha" -o "$docker_op_dir/$lpush.out" -d $duration -i $iat -l $loglvl -c $ctopic
|
||||||
|
|
||||||
|
#sleep $sleep_time
|
||||||
|
|
||||||
|
[ -d $FTDIR ] || mkdir -p $FTDIR
|
||||||
|
ofname="$FTDIR/$filtr.out"
|
||||||
|
echo "docker: stopping and removing $prefix-$filtr"
|
||||||
|
docker stop $prefix-$filtr
|
||||||
|
docker rm $prefix-$filtr
|
||||||
|
echo "docker: starting $prefix-$filtr"
|
||||||
|
echo "docker run --name "$prefix-$filtr" -d=true "$prefix-$filtr:alpha" -o "$docker_op_dir/$filtr.out" -d $duration -l $loglvl -i $iat -c $ctopic "
|
||||||
|
docker run --name "$prefix-$filtr" -d=true "$prefix-$filtr:alpha" -o "$docker_op_dir/$filtr.out" -d $duration -l $loglvl -i $iat -c $ctopic
|
||||||
|
echo "$prefix-$filtr is running as $prefix-$filtr"
|
||||||
|
|
||||||
|
|
||||||
# now wait for runs to complete...
|
# now wait for runs to complete...
|
||||||
echo "$(date): Waiting for the docker run to finish in $duration"
|
echo "$(date): Waiting for the docker run to finish in $duration"
|
||||||
@ -136,10 +149,11 @@ docker_run() {
|
|||||||
# copy the output files
|
# copy the output files
|
||||||
echo "Status code of docker run: ${status_code}"
|
echo "Status code of docker run: ${status_code}"
|
||||||
echo "$(date): copying output files from docker"
|
echo "$(date): copying output files from docker"
|
||||||
docker logs $prefix-$filtr > "$FTDIR/$filtr-docker.log"
|
docker logs $prefix-$filtr > "$FTDIR/$filtr.log"
|
||||||
docker logs $prefix-$lpush > "$LPDIR/$lpush-docker.log"
|
docker logs $prefix-$lpush > "$LPDIR/$lpush.log"
|
||||||
docker cp "$prefix-$filtr:$docker_op_dir/$filtr.out" $FTDIR
|
docker cp "$prefix-$filtr:$docker_op_dir/$filtr.out" $FTDIR
|
||||||
docker cp "$prefix-$lpush:$docker_op_dir/$lpush.out" $LPDIR
|
docker cp "$prefix-$lpush:$docker_op_dir/$lpush.out" $LPDIR
|
||||||
|
cd $parent
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -185,14 +199,14 @@ kurtosis_run() {
|
|||||||
docker logs $filtr_cid > $FTDIR/$filtr.log
|
docker logs $filtr_cid > $FTDIR/$filtr.log
|
||||||
docker cp "$lpush_cid:/go/bin/out/$lpush.out" $LPDIR
|
docker cp "$lpush_cid:/go/bin/out/$lpush.out" $LPDIR
|
||||||
docker logs $lpush_cid > $LPDIR/$lpush.log
|
docker logs $lpush_cid > $LPDIR/$lpush.log
|
||||||
kurtosis enclave dump $enclave $FTDIR/kurtosis_dump
|
#kurtosis enclave dump $enclave $FTDIR/kurtosis_dump
|
||||||
echo "Status code of the kurtosis run: ${status_code}"
|
echo "Status code of the kurtosis run: ${status_code}"
|
||||||
cd $parent
|
cd $parent
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
echo "$# $1"
|
echo "$# $1 $2"
|
||||||
[ 1 -eq "$#" ] || usage
|
[ 2 -eq "$#" ] || usage
|
||||||
[ metal != $1 -a docker != $1 -a kurtosis != $1 -a clean != $1 ] && usage
|
[ metal != $1 -a docker != $1 -a kurtosis != $1 -a clean != $1 ] && usage
|
||||||
|
|
||||||
|
|
||||||
|
@ -7,10 +7,11 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
const StartPort = 60000
|
const StartPort = 60000
|
||||||
const Offset = 1000
|
const PortRange = 1000
|
||||||
|
|
||||||
const DnsDiscoveryUrl = "enrtree://AOGECG2SPND25EEFMAJ5WF3KSGJNSGV356DSTL2YVLLZWIV6SAYBM@prod.waku.nodes.status.im"
|
const DnsDiscoveryUrl = "enrtree://AOGECG2SPND25EEFMAJ5WF3KSGJNSGV356DSTL2YVLLZWIV6SAYBM@prod.waku.nodes.status.im"
|
||||||
const NameServer = "1.1.1.1"
|
const NameServer = "1.1.1.1"
|
||||||
|
const LocalHost = "0.0.0.0"
|
||||||
|
|
||||||
type Config struct {
|
type Config struct {
|
||||||
LogLevel string
|
LogLevel string
|
||||||
|
@ -1,8 +1,8 @@
|
|||||||
{
|
{
|
||||||
"output_file": "output.out",
|
"output_file": "output.out",
|
||||||
"duration": "4000s",
|
"duration": "2000s",
|
||||||
"iat": "300ms",
|
"iat": "100ms",
|
||||||
"content_topic": "2023-01-13-23h49m02s-4000s-300ms",
|
"content_topic": "2000s-100ms-2023-01-18-14h33m38s-XYZ",
|
||||||
"log_level": "info",
|
"log_level": "info",
|
||||||
"log_file": "output.log"
|
"log_file": "output.log"
|
||||||
}
|
}
|
||||||
|
@ -1,8 +0,0 @@
|
|||||||
{
|
|
||||||
"output_file": "filter.out",
|
|
||||||
"duration": "1000s",
|
|
||||||
"iat": "500ms",
|
|
||||||
"mount_src": "./data",
|
|
||||||
"mount_target": "/go/bin/out",
|
|
||||||
"log_file": "filter.log"
|
|
||||||
}
|
|
@ -18,7 +18,7 @@ import (
|
|||||||
"github.com/waku-org/go-waku/waku/v2/node"
|
"github.com/waku-org/go-waku/waku/v2/node"
|
||||||
"github.com/waku-org/go-waku/waku/v2/protocol"
|
"github.com/waku-org/go-waku/waku/v2/protocol"
|
||||||
"github.com/waku-org/go-waku/waku/v2/protocol/filter"
|
"github.com/waku-org/go-waku/waku/v2/protocol/filter"
|
||||||
"github.com/waku-org/go-waku/waku/v2/utils"
|
// "github.com/waku-org/go-waku/waku/v2/utils"
|
||||||
"github.com/logos-co/wadoku/waku/common"
|
"github.com/logos-co/wadoku/waku/common"
|
||||||
//"crypto/rand"
|
//"crypto/rand"
|
||||||
//"encoding/hex"
|
//"encoding/hex"
|
||||||
@ -30,7 +30,7 @@ import (
|
|||||||
var log = logging.Logger("filter")
|
var log = logging.Logger("filter")
|
||||||
var pubSubTopic = protocol.DefaultPubsubTopic()
|
var pubSubTopic = protocol.DefaultPubsubTopic()
|
||||||
var conf = common.Config{}
|
var conf = common.Config{}
|
||||||
|
var nodeType = "filter"
|
||||||
//const dnsDiscoveryUrl = "enrtree://AOGECG2SPND25EEFMAJ5WF3KSGJNSGV356DSTL2YVLLZWIV6SAYBM@prod.waku.nodes.status.im"
|
//const dnsDiscoveryUrl = "enrtree://AOGECG2SPND25EEFMAJ5WF3KSGJNSGV356DSTL2YVLLZWIV6SAYBM@prod.waku.nodes.status.im"
|
||||||
//const nameServer = "1.1.1.1" // your local dns provider might be blocking entr
|
//const nameServer = "1.1.1.1" // your local dns provider might be blocking entr
|
||||||
|
|
||||||
@ -52,14 +52,17 @@ func main() {
|
|||||||
}
|
}
|
||||||
logging.SetAllLoggers(lvl)
|
logging.SetAllLoggers(lvl)
|
||||||
|
|
||||||
tcpEndPoint := "0.0.0.0:" + strconv.Itoa(common.StartPort + common.RandInt(0, common.Offset))
|
tcpEndPoint := common.LocalHost +
|
||||||
|
":" +
|
||||||
|
strconv.Itoa(common.StartPort + common.RandInt(0, common.PortRange))
|
||||||
// create the waku node
|
// create the waku node
|
||||||
hostAddr, _ := net.ResolveTCPAddr("tcp", tcpEndPoint)
|
hostAddr, _ := net.ResolveTCPAddr("tcp", tcpEndPoint)
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
lightNode, err := node.New(ctx,
|
filterNode, err := node.New(ctx,
|
||||||
//node.WithWakuRelay(),
|
//node.WithWakuRelay(),
|
||||||
|
//node.WithNTP(), // don't use NTP, fails at msec granularity
|
||||||
node.WithHostAddress(hostAddr),
|
node.WithHostAddress(hostAddr),
|
||||||
node.WithWakuFilter(false),
|
node.WithWakuFilter(false), // we do NOT want a full node
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
@ -84,27 +87,27 @@ func main() {
|
|||||||
log.Error("could not get peerID: ", err)
|
log.Error("could not get peerID: ", err)
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
err = lightNode.DialPeerWithMultiAddress(ctx, nodeList[0])
|
err = filterNode.DialPeerWithMultiAddress(ctx, nodeList[0])
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("could not connect to ", peerID, err)
|
log.Error("could not connect to ", peerID, err)
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Info("STARTING THE FILTER NODE ", conf.ContentTopic)
|
log.Info("Starting the ", nodeType, " node ", conf.ContentTopic)
|
||||||
// start the light node
|
// start the light node
|
||||||
err = lightNode.Start()
|
err = filterNode.Start()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("COULD NOT START THE FILTER NODE ", conf.ContentTopic)
|
log.Error("Could not start the", nodeType, " node ", conf.ContentTopic)
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Info("SUBSCRIBING TO THE TOPIC ", conf.ContentTopic)
|
log.Info("Subscribing to the content topic", conf.ContentTopic)
|
||||||
// Subscribe to our ContentTopic and send a FilterRequest
|
// Subscribe to our ContentTopic and send a FilterRequest
|
||||||
cf := filter.ContentFilter{
|
cf := filter.ContentFilter{
|
||||||
Topic: pubSubTopic.String(),
|
Topic: pubSubTopic.String(),
|
||||||
ContentTopics: []string{conf.ContentTopic},
|
ContentTopics: []string{conf.ContentTopic},
|
||||||
}
|
}
|
||||||
_, theFilter, err := lightNode.Filter().Subscribe(ctx, cf)
|
_, theFilter, err := filterNode.Filter().Subscribe(ctx, cf)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
@ -118,9 +121,9 @@ func main() {
|
|||||||
}
|
}
|
||||||
defer f.Close()
|
defer f.Close()
|
||||||
|
|
||||||
|
log.Info("Waiting to receive the message")
|
||||||
for env := range theFilter.Chan {
|
for env := range theFilter.Chan {
|
||||||
msg := env.Message()
|
msg := env.Message()
|
||||||
log.Info("Light node received msg, ", string(msg.Payload))
|
|
||||||
|
|
||||||
rbuf := bytes.NewBuffer(msg.Payload)
|
rbuf := bytes.NewBuffer(msg.Payload)
|
||||||
var r32 int32 //:= make([]int64, (len(msg.Payload)+7)/8)
|
var r32 int32 //:= make([]int64, (len(msg.Payload)+7)/8)
|
||||||
@ -131,19 +134,21 @@ func main() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
msg_delay := time.Since(time.Unix(0, msg.Timestamp))
|
msg_delay := time.Since(time.Unix(0, msg.Timestamp))
|
||||||
str := fmt.Sprintf("Received msg, @", r32, "@", string(msg.ContentTopic), "@", msg.Timestamp, "@", utils.GetUnixEpochFrom(lightNode.Timesource().Now()), "@", msg_delay.Microseconds(), "@", msg_delay.Milliseconds() )
|
str := fmt.Sprintf("GOT : %d %s %d %d\n", r32, msg, msg_delay.Microseconds(), msg_delay.Milliseconds())
|
||||||
log.Info(str)
|
//str := fmt.Sprintf("GOT: %d %s %s %s %s\n", r32, msg, utils.GetUnixEpochFrom(lightNode.Timesource().Now()), msg_delay.Microseconds(), msg_delay.Milliseconds())
|
||||||
//"Received msg, @", string(msg.ContentTopic), "@", msg.Timestamp, "@", utils.GetUnixEpochFrom(lightNode.Timesource().Now()) )
|
//"Received msg, @", string(msg.ContentTopic), "@", msg.Timestamp, "@", utils.GetUnixEpochFrom(lightNode.Timesource().Now()) )
|
||||||
if _, err = f.WriteString(str + "\n"); err != nil {
|
log.Info(str)
|
||||||
|
if _, err = f.WriteString(str); err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
log.Info("Message channel closed!")
|
log.Error("Out of the Write loop: Message channel closed - timeout")
|
||||||
stopC <- struct{}{}
|
stopC <- struct{}{}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
<-time.After(conf.Duration)
|
<-time.After(conf.Duration)
|
||||||
|
log.Error(conf.Duration, " elapsed, closing the " + nodeType + " node!");
|
||||||
|
|
||||||
// shut the nodes down
|
// shut the nodes down
|
||||||
lightNode.Stop()
|
filterNode.Stop()
|
||||||
}
|
}
|
||||||
|
@ -1,8 +0,0 @@
|
|||||||
{
|
|
||||||
"output_file": "lightpush.out",
|
|
||||||
"duration": "1000s",
|
|
||||||
"iat": "500ms",
|
|
||||||
"mount_src": "./data",
|
|
||||||
"mount_target": "/go/bin/out",
|
|
||||||
"log_file": "lightpush.log"
|
|
||||||
}
|
|
@ -33,6 +33,7 @@ import (
|
|||||||
var log = logging.Logger("lightpush")
|
var log = logging.Logger("lightpush")
|
||||||
var seqNumber int32 = 0
|
var seqNumber int32 = 0
|
||||||
var conf = common.Config{}
|
var conf = common.Config{}
|
||||||
|
var nodeType = "lightpush"
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
// args
|
// args
|
||||||
@ -52,14 +53,16 @@ func main() {
|
|||||||
}
|
}
|
||||||
logging.SetAllLoggers(lvl)
|
logging.SetAllLoggers(lvl)
|
||||||
|
|
||||||
tcpEndPoint := "0.0.0.0:" + strconv.Itoa(common.StartPort + common.RandInt(0, common.Offset))
|
tcpEndPoint := common.LocalHost +
|
||||||
|
":" +
|
||||||
|
strconv.Itoa(common.StartPort + common.RandInt(0, common.PortRange))
|
||||||
// create the waku node
|
// create the waku node
|
||||||
hostAddr, _ := net.ResolveTCPAddr("tcp", tcpEndPoint)
|
hostAddr, _ := net.ResolveTCPAddr("tcp", tcpEndPoint)
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
lightNode, err := node.New(ctx,
|
lightNode, err := node.New(ctx,
|
||||||
node.WithWakuRelay(),
|
|
||||||
node.WithHostAddress(hostAddr),
|
node.WithHostAddress(hostAddr),
|
||||||
node.WithWakuFilter(false),
|
//node.WithNTP(), // don't use NTP, fails at msec granularity
|
||||||
|
node.WithWakuRelay(),
|
||||||
node.WithLightPush(),
|
node.WithLightPush(),
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -92,37 +95,31 @@ func main() {
|
|||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Info("STARTING THE LIGHTPUSH NODE ", conf.ContentTopic)
|
log.Info("Starting the ", nodeType, " node ", conf.ContentTopic)
|
||||||
// start the light node
|
// start the light node
|
||||||
err = lightNode.Start()
|
err = lightNode.Start()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("COULD NOT START THE LIGHTPUSH ", peerID, err)
|
log.Error("Could not start the", nodeType, " node ", conf.ContentTopic)
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
go writeLoop(ctx, &conf, lightNode)
|
go func() {
|
||||||
|
log.Info("IN THE WRITELOOP ", conf.ContentTopic)
|
||||||
|
|
||||||
<-time.After(conf.Duration)
|
f, err := os.OpenFile(conf.Ofname,
|
||||||
|
os.O_APPEND|os.O_WRONLY|os.O_CREATE, 0600)
|
||||||
// shut the nodes down
|
|
||||||
lightNode.Stop()
|
|
||||||
}
|
|
||||||
|
|
||||||
func writeLoop(ctx context.Context, conf *common.Config, wakuNode *node.WakuNode) {
|
|
||||||
log.Info("STARTING THE WRITELOOP ", conf.ContentTopic)
|
|
||||||
|
|
||||||
f, err := os.OpenFile(conf.Ofname, os.O_APPEND|os.O_WRONLY|os.O_CREATE, 0600)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("Could not open file: ", err)
|
log.Error("Could not open file: ", err)
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
defer f.Close()
|
defer f.Close()
|
||||||
|
|
||||||
|
prevTStamp := lightNode.Timesource().Now()
|
||||||
for {
|
for {
|
||||||
time.Sleep(conf.Iat)
|
time.Sleep(conf.Iat)
|
||||||
seqNumber++
|
seqNumber++
|
||||||
|
|
||||||
// build the message & seq number
|
// build the message with seq number //TODO: message size
|
||||||
p := new(payload.Payload)
|
p := new(payload.Payload)
|
||||||
wbuf := new(bytes.Buffer)
|
wbuf := new(bytes.Buffer)
|
||||||
err := binary.Write(wbuf, binary.LittleEndian, seqNumber)
|
err := binary.Write(wbuf, binary.LittleEndian, seqNumber)
|
||||||
@ -131,6 +128,7 @@ func writeLoop(ctx context.Context, conf *common.Config, wakuNode *node.WakuNode
|
|||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
p.Data = wbuf.Bytes()
|
p.Data = wbuf.Bytes()
|
||||||
|
|
||||||
var version uint32 = 0
|
var version uint32 = 0
|
||||||
payload, err := p.Encode(version)
|
payload, err := p.Encode(version)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -141,20 +139,30 @@ func writeLoop(ctx context.Context, conf *common.Config, wakuNode *node.WakuNode
|
|||||||
Payload: payload,
|
Payload: payload,
|
||||||
Version: version,
|
Version: version,
|
||||||
ContentTopic: conf.ContentTopic,
|
ContentTopic: conf.ContentTopic,
|
||||||
Timestamp: utils.GetUnixEpochFrom(wakuNode.Timesource().Now()),
|
Timestamp: utils.GetUnixEpochFrom(lightNode.Timesource().Now()),
|
||||||
}
|
}
|
||||||
|
|
||||||
// publish the message
|
// publish the message
|
||||||
_, err = wakuNode.Lightpush().Publish(ctx, msg)
|
_, err = lightNode.Lightpush().Publish(ctx, msg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("Could not publish: ", err)
|
log.Error("Could not publish: ", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
str := fmt.Sprintf("MSG: %s\n", msg)
|
iat := time.Since(prevTStamp)
|
||||||
|
str := fmt.Sprintf("SENT: %d %s %d\n", seqNumber, msg, iat.Milliseconds())
|
||||||
if _, err = f.WriteString(str); err != nil {
|
if _, err = f.WriteString(str); err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
log.Info("PUBLISHED/PUSHED... ", seqNumber, msg)
|
log.Info(str)
|
||||||
|
prevTStamp = time.Unix(0, msg.Timestamp)
|
||||||
}
|
}
|
||||||
|
log.Error("Out of the Write loop: Message channel closed - timeout")
|
||||||
|
}()
|
||||||
|
|
||||||
|
<-time.After(conf.Duration)
|
||||||
|
log.Error(conf.Duration, " elapsed, stopping the " + nodeType + " node!");
|
||||||
|
|
||||||
|
// shut the nodes down
|
||||||
|
lightNode.Stop()
|
||||||
}
|
}
|
||||||
|
165
waku/publish/publish.go
Normal file
165
waku/publish/publish.go
Normal file
@ -0,0 +1,165 @@
|
|||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"flag"
|
||||||
|
"fmt"
|
||||||
|
"net"
|
||||||
|
"bytes"
|
||||||
|
//"math/rand"
|
||||||
|
"strconv"
|
||||||
|
"encoding/binary"
|
||||||
|
"os"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
logging "github.com/ipfs/go-log/v2"
|
||||||
|
"github.com/multiformats/go-multiaddr"
|
||||||
|
"github.com/waku-org/go-waku/waku/v2/dnsdisc"
|
||||||
|
"github.com/waku-org/go-waku/waku/v2/node"
|
||||||
|
"github.com/waku-org/go-waku/waku/v2/payload"
|
||||||
|
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
|
||||||
|
"github.com/waku-org/go-waku/waku/v2/utils"
|
||||||
|
"github.com/logos-co/wadoku/waku/common"
|
||||||
|
|
||||||
|
//"crypto/rand"
|
||||||
|
//"encoding/hex"
|
||||||
|
//"github.com/ethereum/go-ethereum/crypto"
|
||||||
|
//"github.com/waku-org/go-waku/waku/v2/protocol/filter"
|
||||||
|
//"github.com/waku-org/go-waku/waku/v2/protocol"
|
||||||
|
|
||||||
|
//"github.com/wadoku/wadoku/utils"
|
||||||
|
)
|
||||||
|
|
||||||
|
var log = logging.Logger("lightpush")
|
||||||
|
var seqNumber int32 = 0
|
||||||
|
var conf = common.Config{}
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
// args
|
||||||
|
fmt.Println("Populating CLI params...")
|
||||||
|
common.ArgInit(&conf)
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
func main() {
|
||||||
|
|
||||||
|
flag.Parse()
|
||||||
|
|
||||||
|
// setup the log
|
||||||
|
lvl, err := logging.LevelFromString(conf.LogLevel)
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
logging.SetAllLoggers(lvl)
|
||||||
|
|
||||||
|
tcpEndPoint := "0.0.0.0:" + strconv.Itoa(common.StartPort + common.RandInt(0, common.Offset))
|
||||||
|
// create the waku node
|
||||||
|
hostAddr, _ := net.ResolveTCPAddr("tcp", tcpEndPoint)
|
||||||
|
ctx := context.Background()
|
||||||
|
wakuNode, err := node.New(ctx,
|
||||||
|
node.WithHostAddress(hostAddr),
|
||||||
|
//node.WithNTP(), // don't use NTP, fails at msec granularity
|
||||||
|
node.WithWakuRelay(),
|
||||||
|
node.WithWakuFilter(false),
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Info("config: ", conf)
|
||||||
|
// find the list of full node fleet peers
|
||||||
|
log.Info("attempting DNS discovery with: ", common.DnsDiscoveryUrl)
|
||||||
|
nodes, err := dnsdisc.RetrieveNodes(ctx, common.DnsDiscoveryUrl, dnsdisc.WithNameserver(common.NameServer))
|
||||||
|
if err != nil {
|
||||||
|
panic(err.Error())
|
||||||
|
}
|
||||||
|
|
||||||
|
// connect to the first peer
|
||||||
|
var nodeList []multiaddr.Multiaddr
|
||||||
|
for _, n := range nodes {
|
||||||
|
nodeList = append(nodeList, n.Addresses...)
|
||||||
|
}
|
||||||
|
log.Info("Discovered and connecting to: ", nodeList[0])
|
||||||
|
peerID, err := nodeList[0].ValueForProtocol(multiaddr.P_P2P)
|
||||||
|
if err != nil {
|
||||||
|
log.Error("could not get peerID: ", err)
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
err = wakuNode.DialPeerWithMultiAddress(ctx, nodeList[0])
|
||||||
|
if err != nil {
|
||||||
|
log.Error("could not connect to ", peerID, err)
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Info("STARTING THE LIGHTPUSH NODE ", conf.ContentTopic)
|
||||||
|
// start the light node
|
||||||
|
err = wakuNode.Start()
|
||||||
|
if err != nil {
|
||||||
|
log.Error("COULD NOT START THE LIGHTPUSH ", peerID, err)
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
go writeLoop(ctx, &conf, wakuNode)
|
||||||
|
|
||||||
|
<-time.After(conf.Duration)
|
||||||
|
log.Error(conf.Duration, " elapsed, closing the node!");
|
||||||
|
|
||||||
|
// shut the nodes down
|
||||||
|
wakuNode.Stop()
|
||||||
|
}
|
||||||
|
|
||||||
|
func writeLoop(ctx context.Context, conf *common.Config, wakuNode *node.WakuNode) {
|
||||||
|
log.Info("IN THE WRITELOOP ", conf.ContentTopic)
|
||||||
|
|
||||||
|
f, err := os.OpenFile(conf.Ofname, os.O_APPEND|os.O_WRONLY|os.O_CREATE, 0600)
|
||||||
|
if err != nil {
|
||||||
|
log.Error("Could not open file: ", err)
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
defer f.Close()
|
||||||
|
|
||||||
|
prevTStamp := wakuNode.Timesource().Now()
|
||||||
|
for {
|
||||||
|
time.Sleep(conf.Iat)
|
||||||
|
seqNumber++
|
||||||
|
|
||||||
|
// build the message & seq number
|
||||||
|
p := new(payload.Payload)
|
||||||
|
wbuf := new(bytes.Buffer)
|
||||||
|
err := binary.Write(wbuf, binary.LittleEndian, seqNumber)
|
||||||
|
if err != nil {
|
||||||
|
log.Error("binary.Write failed:", err)
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
p.Data = wbuf.Bytes()
|
||||||
|
var version uint32 = 0
|
||||||
|
payload, err := p.Encode(version)
|
||||||
|
if err != nil {
|
||||||
|
log.Error("Could not Encode: ", err)
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
msg := &pb.WakuMessage{
|
||||||
|
Payload: payload,
|
||||||
|
Version: version,
|
||||||
|
ContentTopic: conf.ContentTopic,
|
||||||
|
Timestamp: utils.GetUnixEpochFrom(wakuNode.Timesource().Now()),
|
||||||
|
}
|
||||||
|
|
||||||
|
// publish the message
|
||||||
|
_, err = wakuNode.Relay().Publish(ctx, msg)
|
||||||
|
if err != nil {
|
||||||
|
log.Error("Could not publish: ", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
iat := time.Since(prevTStamp)
|
||||||
|
str := fmt.Sprintf("SENT: %d %s %d\n", seqNumber, msg, iat.Milliseconds())
|
||||||
|
if _, err = f.WriteString(str); err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
log.Info(str)
|
||||||
|
prevTStamp = time.Unix(0, msg.Timestamp)
|
||||||
|
}
|
||||||
|
log.Error("Out of the Write loop: Message channel closed (timeout?)!")
|
||||||
|
}
|
155
waku/subscribe/subscribe.go
Normal file
155
waku/subscribe/subscribe.go
Normal file
@ -0,0 +1,155 @@
|
|||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"flag"
|
||||||
|
"fmt"
|
||||||
|
"net"
|
||||||
|
"bytes"
|
||||||
|
// "math/rand"
|
||||||
|
"strconv"
|
||||||
|
"encoding/binary"
|
||||||
|
"os"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
logging "github.com/ipfs/go-log/v2"
|
||||||
|
"github.com/multiformats/go-multiaddr"
|
||||||
|
"github.com/waku-org/go-waku/waku/v2/dnsdisc"
|
||||||
|
"github.com/waku-org/go-waku/waku/v2/node"
|
||||||
|
"github.com/waku-org/go-waku/waku/v2/protocol"
|
||||||
|
// "github.com/waku-org/go-waku/waku/v2/protocol/filter"
|
||||||
|
// "github.com/waku-org/go-waku/waku/v2/utils"
|
||||||
|
"github.com/logos-co/wadoku/waku/common"
|
||||||
|
//"crypto/rand"
|
||||||
|
//"encoding/hex"
|
||||||
|
//"github.com/ethereum/go-ethereum/crypto"
|
||||||
|
//"github.com/waku-org/go-waku/waku/v2/protocol/pb"
|
||||||
|
//"github.com/waku-org/go-waku/waku/v2/payload"
|
||||||
|
)
|
||||||
|
|
||||||
|
var log = logging.Logger("subscribe")
|
||||||
|
var pubSubTopic = protocol.DefaultPubsubTopic()
|
||||||
|
var conf = common.Config{}
|
||||||
|
|
||||||
|
//const dnsDiscoveryUrl = "enrtree://AOGECG2SPND25EEFMAJ5WF3KSGJNSGV356DSTL2YVLLZWIV6SAYBM@prod.waku.nodes.status.im"
|
||||||
|
//const nameServer = "1.1.1.1" // your local dns provider might be blocking entr
|
||||||
|
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
// args
|
||||||
|
fmt.Println("Populating CLI params...")
|
||||||
|
common.ArgInit(&conf)
|
||||||
|
}
|
||||||
|
|
||||||
|
func main() {
|
||||||
|
|
||||||
|
flag.Parse()
|
||||||
|
|
||||||
|
// setup the log
|
||||||
|
lvl, err := logging.LevelFromString(conf.LogLevel)
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
logging.SetAllLoggers(lvl)
|
||||||
|
|
||||||
|
tcpEndPoint := "0.0.0.0:" + strconv.Itoa(common.StartPort + common.RandInt(0, common.Offset))
|
||||||
|
// create the waku node
|
||||||
|
hostAddr, _ := net.ResolveTCPAddr("tcp", tcpEndPoint)
|
||||||
|
ctx := context.Background()
|
||||||
|
wakuNode, err := node.New(ctx,
|
||||||
|
//node.WithNTP(), // don't use NTP, fails at msec granularity
|
||||||
|
node.WithWakuRelay(),
|
||||||
|
node.WithHostAddress(hostAddr),
|
||||||
|
node.WithWakuFilter(false),
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Info("CONFIG : ", conf)
|
||||||
|
// find the list of full node fleet peers
|
||||||
|
log.Info("attempting DNS discovery with: ", common.DnsDiscoveryUrl)
|
||||||
|
nodes, err := dnsdisc.RetrieveNodes(ctx, common.DnsDiscoveryUrl, dnsdisc.WithNameserver(common.NameServer))
|
||||||
|
if err != nil {
|
||||||
|
panic(err.Error())
|
||||||
|
}
|
||||||
|
|
||||||
|
// connect to the first peer
|
||||||
|
var nodeList []multiaddr.Multiaddr
|
||||||
|
for _, n := range nodes {
|
||||||
|
nodeList = append(nodeList, n.Addresses...)
|
||||||
|
}
|
||||||
|
log.Info("Discovered and connecting to: ", nodeList[0])
|
||||||
|
peerID, err := nodeList[0].ValueForProtocol(multiaddr.P_P2P)
|
||||||
|
if err != nil {
|
||||||
|
log.Error("could not get peerID: ", err)
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
err = wakuNode.DialPeerWithMultiAddress(ctx, nodeList[0])
|
||||||
|
if err != nil {
|
||||||
|
log.Error("could not connect to ", peerID, err)
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Info("STARTING THE SUB NODE ", conf.ContentTopic)
|
||||||
|
// start the sub node
|
||||||
|
err = wakuNode.Start()
|
||||||
|
if err != nil {
|
||||||
|
log.Error("COULD NOT START THE SUB NODE ", conf.ContentTopic)
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Info("SUBSCRIBING TO THE TOPIC ", conf.ContentTopic)
|
||||||
|
// Subscribe to our ContentTopic and send Sub Request
|
||||||
|
/*cf := filter.ContentFilter{
|
||||||
|
Topic: pubSubTopic.String(),
|
||||||
|
ContentTopics: []string{conf.ContentTopic},
|
||||||
|
}*/
|
||||||
|
sub, err := wakuNode.Relay().Subscribe(ctx)
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
stopC := make(chan struct{})
|
||||||
|
go func() {
|
||||||
|
f, err := os.OpenFile(conf.Ofname, os.O_APPEND|os.O_WRONLY|os.O_CREATE, 0600)
|
||||||
|
if err != nil {
|
||||||
|
log.Error("Could not open file: ", err)
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
defer f.Close()
|
||||||
|
|
||||||
|
log.Info("Waiting to receive the message")
|
||||||
|
for env := range sub.C {
|
||||||
|
msg := env.Message()
|
||||||
|
|
||||||
|
if msg.ContentTopic != conf.ContentTopic {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
rbuf := bytes.NewBuffer(msg.Payload)
|
||||||
|
var r32 int32 //:= make([]int64, (len(msg.Payload)+7)/8)
|
||||||
|
err = binary.Read(rbuf, binary.LittleEndian, &r32)
|
||||||
|
if err != nil {
|
||||||
|
log.Error("binary.Read failed:", err)
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
rtt := time.Since(time.Unix(0, msg.Timestamp))
|
||||||
|
str := fmt.Sprintf("GOT: %d %s %d %d\n", r32, msg, rtt.Microseconds(), rtt.Milliseconds())
|
||||||
|
//str := fmt.Sprintf("GOT: %d %s %s %s %s\n", r32, msg, utils.GetUnixEpochFrom(lightNode.Timesource().Now()), msg_delay.Microseconds(), msg_delay.Milliseconds())
|
||||||
|
//"Received msg, @", string(msg.ContentTopic), "@", msg.Timestamp, "@", utils.GetUnixEpochFrom(lightNode.Timesource().Now()) )
|
||||||
|
log.Info(str)
|
||||||
|
if _, err = f.WriteString(str); err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
log.Error("Out of the Write loop: Message channel closed (timeout?)!")
|
||||||
|
stopC <- struct{}{}
|
||||||
|
}()
|
||||||
|
|
||||||
|
<-time.After(conf.Duration)
|
||||||
|
log.Error(conf.Duration, " elapsed, closing the node!");
|
||||||
|
|
||||||
|
// shut the nodes down
|
||||||
|
wakuNode.Stop()
|
||||||
|
}
|
@ -1,3 +0,0 @@
|
|||||||
module github.com/logos-co/wadoku/waku/utils
|
|
||||||
|
|
||||||
go 1.18
|
|
@ -1,35 +0,0 @@
|
|||||||
package utils
|
|
||||||
|
|
||||||
import (
|
|
||||||
"time"
|
|
||||||
"math/rand"
|
|
||||||
"flag"
|
|
||||||
)
|
|
||||||
|
|
||||||
const DnsDiscoveryUrl = "enrtree://AOGECG2SPND25EEFMAJ5WF3KSGJNSGV356DSTL2YVLLZWIV6SAYBM@prod.waku.nodes.status.im"
|
|
||||||
const NameServer = "1.1.1.1"
|
|
||||||
|
|
||||||
type Config struct {
|
|
||||||
LogLevel string
|
|
||||||
Ofname string
|
|
||||||
ContentTopic string
|
|
||||||
Iat time.Duration
|
|
||||||
Duration time.Duration
|
|
||||||
}
|
|
||||||
|
|
||||||
func RandInt(min, max int) int {
|
|
||||||
return min + rand.Intn(max - min)
|
|
||||||
}
|
|
||||||
|
|
||||||
func ArgInit(conf *Config){
|
|
||||||
flag.DurationVar(&(*conf).Duration, "d", 1000*time.Second,
|
|
||||||
"Specify the duration (1s,2m,4h)")
|
|
||||||
flag.DurationVar(&(*conf).Iat, "i", 300*time.Millisecond,
|
|
||||||
"Specify the interarrival time in millisecs")
|
|
||||||
flag.StringVar(&(*conf).LogLevel, "l", "info",
|
|
||||||
"Specify the log level")
|
|
||||||
flag.StringVar(&(*conf).Ofname, "o", "lightpush.out",
|
|
||||||
"Specify the output file")
|
|
||||||
flag.StringVar(&(*conf).ContentTopic, "c", "d608b04e6b6fd7006afdfe916f08b5d",
|
|
||||||
"Specify the content topic")
|
|
||||||
}
|
|
Loading…
x
Reference in New Issue
Block a user