fix: make resume optional and trigger connect/disconnect when dialing a peer or dropping it

This commit is contained in:
Richard Ramos 2021-09-06 09:10:19 -04:00 committed by Vitaliy Vlasov
parent cf32e10236
commit 4ce04014d1
5 changed files with 39 additions and 12 deletions

View File

@ -68,7 +68,7 @@ func main() {
opts := []node.WakuNodeOption{ opts := []node.WakuNodeOption{
node.WithPrivateKey(prvKey), node.WithPrivateKey(prvKey),
node.WithHostAddress([]net.Addr{hostAddr}), node.WithHostAddress([]net.Addr{hostAddr}),
node.WithWakuStore(false), node.WithWakuStore(false, true),
node.WithKeepAlive(time.Duration(*keepAliveFlag) * time.Second), node.WithKeepAlive(time.Duration(*keepAliveFlag) * time.Second),
} }

View File

@ -66,7 +66,7 @@ func main() {
node.WithHostAddress([]net.Addr{addrsAndKeys[0].addr}), node.WithHostAddress([]net.Addr{addrsAndKeys[0].addr}),
node.WithWakuRelay(), node.WithWakuRelay(),
//node.WithConnStatusChan(connStatusChan), //node.WithConnStatusChan(connStatusChan),
node.WithWakuStore(true), node.WithWakuStore(true, false),
node.WithKeepAlive(time.Duration(2)*time.Second), node.WithKeepAlive(time.Duration(2)*time.Second),
) )

View File

@ -143,7 +143,7 @@ var rootCmd = &cobra.Command{
} }
if store { if store {
nodeOpts = append(nodeOpts, node.WithWakuStore(true)) nodeOpts = append(nodeOpts, node.WithWakuStore(true, true))
if useDB { if useDB {
dbStore, err := persistence.NewDBStore(persistence.WithDB(db)) dbStore, err := persistence.NewDBStore(persistence.WithDB(db))
checkError(err, "DBStore") checkError(err, "DBStore")

View File

@ -409,10 +409,13 @@ func (w *WakuNode) startStore() {
peerChan := make(chan *event.EvtPeerConnectednessChanged) peerChan := make(chan *event.EvtPeerConnectednessChanged)
w.opts.store.Start(w.ctx, w.host, peerChan) w.opts.store.Start(w.ctx, w.host, peerChan)
w.peerListeners = append(w.peerListeners, peerChan) w.peerListeners = append(w.peerListeners, peerChan)
if w.opts.shouldResume {
if _, err := w.opts.store.Resume(string(relay.GetTopic(nil)), nil); err != nil { if _, err := w.opts.store.Resume(string(relay.GetTopic(nil)), nil); err != nil {
log.Error("failed to resume", err) log.Error("failed to resume", err)
} }
} }
}
func (w *WakuNode) addPeerWithProtocol(address string, proto p2pproto.ID) (*peer.ID, error) { func (w *WakuNode) addPeerWithProtocol(address string, proto p2pproto.ID) (*peer.ID, error) {
info, err := addrInfoFromMultiaddrString(address) info, err := addrInfoFromMultiaddrString(address)
@ -693,12 +696,26 @@ func (w *WakuNode) DialPeer(address string) error {
return err return err
} }
return w.host.Connect(w.ctx, *info) return w.connect(*info)
}
func (w *WakuNode) connect(info peer.AddrInfo) error {
err := w.host.Connect(w.ctx, info)
if err != nil {
return err
}
w.processHostEvent(event.EvtPeerConnectednessChanged{
Peer: info.ID,
Connectedness: network.Connected,
})
return nil
} }
func (w *WakuNode) DialPeerByID(peerID peer.ID) error { func (w *WakuNode) DialPeerByID(peerID peer.ID) error {
info := w.host.Peerstore().PeerInfo(peerID) info := w.host.Peerstore().PeerInfo(peerID)
return w.host.Connect(w.ctx, info) return w.connect(info)
} }
func (w *WakuNode) ClosePeerByAddress(address string) error { func (w *WakuNode) ClosePeerByAddress(address string) error {
@ -717,7 +734,15 @@ func (w *WakuNode) ClosePeerByAddress(address string) error {
} }
func (w *WakuNode) ClosePeerById(id peer.ID) error { func (w *WakuNode) ClosePeerById(id peer.ID) error {
return w.host.Network().ClosePeer(id) err := w.host.Network().ClosePeer(id)
if err != nil {
return err
}
w.processHostEvent(event.EvtPeerConnectednessChanged{
Peer: id,
Connectedness: network.NotConnected,
})
} }
func (w *WakuNode) PeerCount() int { func (w *WakuNode) PeerCount() int {

View File

@ -27,6 +27,7 @@ type WakuNodeParameters struct {
wOpts []wakurelay.Option wOpts []wakurelay.Option
enableStore bool enableStore bool
shouldResume bool
storeMsgs bool storeMsgs bool
store *store.WakuStore store *store.WakuStore
// filter *filter.WakuFilter // filter *filter.WakuFilter
@ -107,11 +108,12 @@ func WithWakuFilter(opts ...wakurelay.Option) WakuNodeOption {
// WithWakuStore enables the Waku V2 Store protocol and if the messages should // WithWakuStore enables the Waku V2 Store protocol and if the messages should
// be stored or not in a message provider // be stored or not in a message provider
func WithWakuStore(shouldStoreMessages bool) WakuNodeOption { func WithWakuStore(shouldStoreMessages bool, shouldResume bool) WakuNodeOption {
return func(params *WakuNodeParameters) error { return func(params *WakuNodeParameters) error {
params.enableStore = true params.enableStore = true
params.storeMsgs = shouldStoreMessages params.storeMsgs = shouldStoreMessages
params.store = store.NewWakuStore(shouldStoreMessages, nil) params.store = store.NewWakuStore(shouldStoreMessages, nil)
params.shouldResume = shouldResume
return nil return nil
} }
} }