From 4ce04014d164ee375d7dc41b15d2764e32c5430c Mon Sep 17 00:00:00 2001 From: Richard Ramos Date: Mon, 6 Sep 2021 09:10:19 -0400 Subject: [PATCH] fix: make resume optional and trigger connect/disconnect when dialing a peer or dropping it --- examples/chat2/main.go | 2 +- examples/peer_events/main.go | 2 +- waku/node.go | 2 +- waku/v2/node/wakunode2.go | 35 ++++++++++++++++++++++++++++++----- waku/v2/node/wakuoptions.go | 10 ++++++---- 5 files changed, 39 insertions(+), 12 deletions(-) diff --git a/examples/chat2/main.go b/examples/chat2/main.go index c1f3a695..77e1667a 100644 --- a/examples/chat2/main.go +++ b/examples/chat2/main.go @@ -68,7 +68,7 @@ func main() { opts := []node.WakuNodeOption{ node.WithPrivateKey(prvKey), node.WithHostAddress([]net.Addr{hostAddr}), - node.WithWakuStore(false), + node.WithWakuStore(false, true), node.WithKeepAlive(time.Duration(*keepAliveFlag) * time.Second), } diff --git a/examples/peer_events/main.go b/examples/peer_events/main.go index cf7079fa..d3a6ff0b 100644 --- a/examples/peer_events/main.go +++ b/examples/peer_events/main.go @@ -66,7 +66,7 @@ func main() { node.WithHostAddress([]net.Addr{addrsAndKeys[0].addr}), node.WithWakuRelay(), //node.WithConnStatusChan(connStatusChan), - node.WithWakuStore(true), + node.WithWakuStore(true, false), node.WithKeepAlive(time.Duration(2)*time.Second), ) diff --git a/waku/node.go b/waku/node.go index c85867e1..9e7a4a93 100644 --- a/waku/node.go +++ b/waku/node.go @@ -143,7 +143,7 @@ var rootCmd = &cobra.Command{ } if store { - nodeOpts = append(nodeOpts, node.WithWakuStore(true)) + nodeOpts = append(nodeOpts, node.WithWakuStore(true, true)) if useDB { dbStore, err := persistence.NewDBStore(persistence.WithDB(db)) checkError(err, "DBStore") diff --git a/waku/v2/node/wakunode2.go b/waku/v2/node/wakunode2.go index 332a7d5f..f06a500a 100644 --- a/waku/v2/node/wakunode2.go +++ b/waku/v2/node/wakunode2.go @@ -409,8 +409,11 @@ func (w *WakuNode) startStore() { peerChan := make(chan *event.EvtPeerConnectednessChanged) w.opts.store.Start(w.ctx, w.host, peerChan) w.peerListeners = append(w.peerListeners, peerChan) - if _, err := w.opts.store.Resume(string(relay.GetTopic(nil)), nil); err != nil { - log.Error("failed to resume", err) + + if w.opts.shouldResume { + if _, err := w.opts.store.Resume(string(relay.GetTopic(nil)), nil); err != nil { + log.Error("failed to resume", err) + } } } @@ -693,12 +696,26 @@ func (w *WakuNode) DialPeer(address string) error { return err } - return w.host.Connect(w.ctx, *info) + return w.connect(*info) +} + +func (w *WakuNode) connect(info peer.AddrInfo) error { + err := w.host.Connect(w.ctx, info) + if err != nil { + return err + } + + w.processHostEvent(event.EvtPeerConnectednessChanged{ + Peer: info.ID, + Connectedness: network.Connected, + }) + + return nil } func (w *WakuNode) DialPeerByID(peerID peer.ID) error { info := w.host.Peerstore().PeerInfo(peerID) - return w.host.Connect(w.ctx, info) + return w.connect(info) } func (w *WakuNode) ClosePeerByAddress(address string) error { @@ -717,7 +734,15 @@ func (w *WakuNode) ClosePeerByAddress(address string) error { } func (w *WakuNode) ClosePeerById(id peer.ID) error { - return w.host.Network().ClosePeer(id) + err := w.host.Network().ClosePeer(id) + if err != nil { + return err + } + + w.processHostEvent(event.EvtPeerConnectednessChanged{ + Peer: id, + Connectedness: network.NotConnected, + }) } func (w *WakuNode) PeerCount() int { diff --git a/waku/v2/node/wakuoptions.go b/waku/v2/node/wakuoptions.go index 2ff14021..3dbd9c89 100644 --- a/waku/v2/node/wakuoptions.go +++ b/waku/v2/node/wakuoptions.go @@ -26,9 +26,10 @@ type WakuNodeParameters struct { enableFilter bool wOpts []wakurelay.Option - enableStore bool - storeMsgs bool - store *store.WakuStore + enableStore bool + shouldResume bool + storeMsgs bool + store *store.WakuStore // filter *filter.WakuFilter keepAliveInterval time.Duration @@ -107,11 +108,12 @@ func WithWakuFilter(opts ...wakurelay.Option) WakuNodeOption { // WithWakuStore enables the Waku V2 Store protocol and if the messages should // be stored or not in a message provider -func WithWakuStore(shouldStoreMessages bool) WakuNodeOption { +func WithWakuStore(shouldStoreMessages bool, shouldResume bool) WakuNodeOption { return func(params *WakuNodeParameters) error { params.enableStore = true params.storeMsgs = shouldStoreMessages params.store = store.NewWakuStore(shouldStoreMessages, nil) + params.shouldResume = shouldResume return nil } }