From 89507b84c4fe01e3e2c8b8ca4ca8ca13e1822e4b Mon Sep 17 00:00:00 2001 From: 0xFugue <119708655+0xFugue@users.noreply.github.com> Date: Sun, 22 Jan 2023 12:55:32 +0530 Subject: [PATCH] added retries & exponential backoff for DialPeer attempts --- waku/filter/filter.go | 24 +++++++++++++++++++++++- waku/lightpush/lightpush.go | 35 ++++++++++++++++++++++++++++------- waku/publish/publish.go | 23 +++++++++++++++++++++++ waku/subscribe/subscribe.go | 22 ++++++++++++++++++++++ 4 files changed, 96 insertions(+), 8 deletions(-) diff --git a/waku/filter/filter.go b/waku/filter/filter.go index bdcf4c8..21e18e6 100644 --- a/waku/filter/filter.go +++ b/waku/filter/filter.go @@ -87,11 +87,33 @@ func main() { log.Error("could not get peerID: ", err) panic(err) } + + // initialise the exponential back off + retries, sleepTime := 0, common.ExpBackOffInit + for { + // try / retry + if err = filterNode.DialPeerWithMultiAddress(ctx, nodeList[0]); err == nil { + break // success! done + } + // failed, back off for sleepTime and retry + log.Error("could not connect to ", peerID, err, + " : will retry in ", sleepTime, " retry# ", retries) + time.Sleep(sleepTime) // back off + retries++ + sleepTime *= 2 // exponential : double the next wait time + // bail out + if retries > common.ExpBackOffRetries { + log.Error("Exhausted retries, could not connect to ", peerID, err, + "number of retries performed ", retries) + panic(err) + } + } + /* err = filterNode.DialPeerWithMultiAddress(ctx, nodeList[0]) if err != nil { log.Error("could not connect to ", peerID, err) panic(err) - } + }*/ log.Info("Starting the ", nodeType, " node ", conf.ContentTopic) // start the light node diff --git a/waku/lightpush/lightpush.go b/waku/lightpush/lightpush.go index ced9e9f..1560f79 100644 --- a/waku/lightpush/lightpush.go +++ b/waku/lightpush/lightpush.go @@ -59,7 +59,7 @@ func main() { // create the waku node hostAddr, _ := net.ResolveTCPAddr("tcp", tcpEndPoint) ctx := context.Background() - lightNode, err := node.New(ctx, + lightpushNode, err := node.New(ctx, node.WithHostAddress(hostAddr), //node.WithNTP(), // don't use NTP, fails at msec granularity //node.WithWakuRelay(), @@ -89,15 +89,36 @@ func main() { panic(err) } + // initialise the exponential back off + retries, sleepTime := 0, common.ExpBackOffInit + for { + // try / retry + if err = lightpushNode.DialPeerWithMultiAddress(ctx, nodeList[0]); err == nil { + break // success! done + } + // failed, back off for sleepTime and retry + log.Error("could not connect to ", peerID, err, + " : will retry in ", sleepTime, " retry# ", retries) + time.Sleep(sleepTime) // back off + retries++ + sleepTime *= 2 // exponential : double the next wait time + // bail out + if retries > common.ExpBackOffRetries { + log.Error("Exhausted retries, could not connect to ", peerID, err, + "number of retries performed ", retries) + panic(err) + } + } + /* err = lightNode.DialPeerWithMultiAddress(ctx, nodeList[0]) if err != nil { log.Error("could not connect to ", peerID, err) panic(err) - } + }*/ log.Info("Starting the ", nodeType, " node ", conf.ContentTopic) // start the light node - err = lightNode.Start() + err = lightpushNode.Start() if err != nil { log.Error("Could not start the", nodeType, " node ", conf.ContentTopic) panic(err) @@ -114,7 +135,7 @@ func main() { } defer f.Close() - prevTStamp := lightNode.Timesource().Now() + prevTStamp := lightpushNode.Timesource().Now() for { time.Sleep(conf.Iat) seqNumber++ @@ -139,11 +160,11 @@ func main() { Payload: payload, Version: version, ContentTopic: conf.ContentTopic, - Timestamp: utils.GetUnixEpochFrom(lightNode.Timesource().Now()), + Timestamp: utils.GetUnixEpochFrom(lightpushNode.Timesource().Now()), } // publish the message - _, err = lightNode.Lightpush().Publish(ctx, msg) + _, err = lightpushNode.Lightpush().Publish(ctx, msg) if err != nil { log.Error("Could not publish: ", err) return @@ -165,5 +186,5 @@ func main() { log.Error(conf.Duration, " elapsed, stopping the " + nodeType + " node!"); // shut the nodes down - lightNode.Stop() + lightpushNode.Stop() } diff --git a/waku/publish/publish.go b/waku/publish/publish.go index 8690943..1a83b83 100644 --- a/waku/publish/publish.go +++ b/waku/publish/publish.go @@ -89,11 +89,34 @@ func main() { panic(err) } + // initialise the exponential back off + retries, sleepTime := 0, common.ExpBackOffInit + for { + // try / retry + if err = pubNode.DialPeerWithMultiAddress(ctx, nodeList[0]); err == nil { + break // success! done + } + // failed, back off for sleepTime and retry + log.Error("could not connect to ", peerID, err, + " : will retry in ", sleepTime, " retry# ", retries) + time.Sleep(sleepTime) // back off + retries++ + sleepTime *= 2 // exponential : double the next wait time + // bail out + if retries > common.ExpBackOffRetries { + log.Error("Exhausted retries, could not connect to ", peerID, err, + "number of retries performed ", retries) + panic(err) + } + } + + /* err = pubNode.DialPeerWithMultiAddress(ctx, nodeList[0]) if err != nil { log.Error("could not connect to ", peerID, err) panic(err) } + */ log.Info("Starting the ", nodeType, " node ", conf.ContentTopic) // start the pub node diff --git a/waku/subscribe/subscribe.go b/waku/subscribe/subscribe.go index d72dfbc..e7c70e6 100644 --- a/waku/subscribe/subscribe.go +++ b/waku/subscribe/subscribe.go @@ -87,11 +87,33 @@ func main() { log.Error("could not get peerID: ", err) panic(err) } + // initialise the exponential back off + retries, sleepTime := 0, common.ExpBackOffInit + for { + // try / retry + if err = subNode.DialPeerWithMultiAddress(ctx, nodeList[0]); err == nil { + break // success! done + } + // failed, back off for sleepTime and retry + log.Error("could not connect to ", peerID, err, + " : will retry in ", sleepTime, " retry# ", retries) + time.Sleep(sleepTime) // back off + retries++ + sleepTime *= 2 // exponential : double the next wait time + // bail out + if retries > common.ExpBackOffRetries { + log.Error("Exhausted retries, could not connect to ", peerID, err, + "number of retries performed ", retries) + panic(err) + } + } + /* err = subNode.DialPeerWithMultiAddress(ctx, nodeList[0]) if err != nil { log.Error("could not connect to ", peerID, err) panic(err) } + */ log.Info("Starting the ", nodeType, " node ", conf.ContentTopic) // start the light node