From 25562d6240cfd89a01645a3d07f5311c222c9c85 Mon Sep 17 00:00:00 2001 From: Richard Ramos Date: Wed, 10 May 2023 10:13:10 -0400 Subject: [PATCH] feat: add peer connection notif channel and allow dialing peers with the peer.AddrInfo --- examples/chat2/chat.go | 48 ++++++++++++++++------------------- examples/chat2/exec.go | 5 +++- mobile/api_discovery.go | 2 +- waku/node.go | 25 +++++++++--------- waku/v2/dnsdisc/enr.go | 25 +++++++++++++----- waku/v2/node/connectedness.go | 25 ++++++++++++++++-- waku/v2/node/wakunode2.go | 9 +++++-- waku/v2/node/wakuoptions.go | 10 +++++++- 8 files changed, 96 insertions(+), 53 deletions(-) diff --git a/examples/chat2/chat.go b/examples/chat2/chat.go index 2e6b69f1..c55e26e4 100644 --- a/examples/chat2/chat.go +++ b/examples/chat2/chat.go @@ -46,7 +46,7 @@ type Chat struct { nick string } -func NewChat(ctx context.Context, node *node.WakuNode, options Options) *Chat { +func NewChat(ctx context.Context, node *node.WakuNode, connNotifier <-chan node.PeerConnection, options Options) *Chat { chat := &Chat{ ctx: ctx, node: node, @@ -114,7 +114,7 @@ func NewChat(ctx context.Context, node *node.WakuNode, options Options) *Chat { } } - chat.wg.Add(6) + chat.wg.Add(7) go chat.parseInput() go chat.receiveMessages() @@ -123,6 +123,7 @@ func NewChat(ctx context.Context, node *node.WakuNode, options Options) *Chat { go chat.welcomeMessage() + go chat.connectionWatcher(&connectionWg, connNotifier) go chat.staticNodes(&connectionWg) go chat.discoverNodes(&connectionWg) go chat.retrieveHistory(&connectionWg) @@ -135,6 +136,18 @@ func (c *Chat) Stop() { close(c.inputChan) } +func (c *Chat) connectionWatcher(connectionWg *sync.WaitGroup, connNotifier <-chan node.PeerConnection) { + defer c.wg.Done() + + for conn := range connNotifier { + if conn.Connected { + c.ui.InfoMessage(fmt.Sprintf("Peer %s connected", conn.PeerID.Pretty())) + } else { + c.ui.InfoMessage(fmt.Sprintf("Peer %s disconnected", conn.PeerID.Pretty())) + } + } +} + func (c *Chat) receiveMessages() { defer c.wg.Done() for { @@ -200,8 +213,6 @@ func (c *Chat) parseInput() { err = c.node.DialPeerWithMultiAddress(ctx, ma) if err != nil { c.ui.ErrorMessage(err) - } else { - c.ui.InfoMessage(fmt.Sprintf("Connected to %s", peerID)) } }(peer) return @@ -447,19 +458,11 @@ func (c *Chat) staticNodes(connectionWg *sync.WaitGroup) { ctx, cancel := context.WithTimeout(c.ctx, time.Duration(10)*time.Second) defer cancel() - peerID, err := addr.ValueForProtocol(multiaddr.P_P2P) - if err != nil { - c.ui.ErrorMessage(err) - return - } - c.ui.InfoMessage(fmt.Sprintf("Connecting to %s", addr.String())) - err = c.node.DialPeerWithMultiAddress(ctx, addr) + err := c.node.DialPeerWithMultiAddress(ctx, addr) if err != nil { c.ui.ErrorMessage(err) - } else { - c.ui.InfoMessage(fmt.Sprintf("Connected to %s", peerID)) } }(n) } @@ -539,30 +542,23 @@ func (c *Chat) discoverNodes(connectionWg *sync.WaitGroup) { if err != nil { c.ui.ErrorMessage(errors.New(err.Error())) } else { - var nodeList []multiaddr.Multiaddr + var nodeList []peer.AddrInfo for _, n := range nodes { - nodeList = append(nodeList, n.Addresses...) + nodeList = append(nodeList, n.PeerInfo) } c.ui.InfoMessage(fmt.Sprintf("Discovered and connecting to %v ", nodeList)) wg := sync.WaitGroup{} wg.Add(len(nodeList)) for _, n := range nodeList { - go func(ctx context.Context, addr multiaddr.Multiaddr) { + go func(ctx context.Context, info peer.AddrInfo) { defer wg.Done() - peerID, err := addr.ValueForProtocol(multiaddr.P_P2P) - if err != nil { - c.ui.ErrorMessage(err) - return - } - ctx, cancel := context.WithTimeout(ctx, time.Duration(10)*time.Second) defer cancel() - err = c.node.DialPeerWithMultiAddress(ctx, addr) + err = c.node.DialPeerWithInfo(ctx, n) if err != nil { - c.ui.ErrorMessage(fmt.Errorf("could not connect to %s: %w", peerID, err)) - } else { - c.ui.InfoMessage(fmt.Sprintf("Connected to %s", peerID)) + + c.ui.ErrorMessage(fmt.Errorf("co!!uld not connect to %s: %w", info.ID.Pretty(), err)) } }(c.ctx, n) diff --git a/examples/chat2/exec.go b/examples/chat2/exec.go index 113fd2a7..84e3e73e 100644 --- a/examples/chat2/exec.go +++ b/examples/chat2/exec.go @@ -30,10 +30,13 @@ func execute(options Options) { } } + connNotifier := make(chan node.PeerConnection) + opts := []node.WakuNodeOption{ node.WithPrivateKey(options.NodeKey), node.WithNTP(), node.WithHostAddress(hostAddr), + node.WithConnectionNotification(connNotifier), } if options.Relay.Enable { @@ -130,7 +133,7 @@ func execute(options Options) { return } - chat := NewChat(ctx, wakuNode, options) + chat := NewChat(ctx, wakuNode, connNotifier, options) p := tea.NewProgram(chat.ui) if err := p.Start(); err != nil { fmt.Println(err.Error()) diff --git a/mobile/api_discovery.go b/mobile/api_discovery.go index 520fc120..aeb2a351 100644 --- a/mobile/api_discovery.go +++ b/mobile/api_discovery.go @@ -40,7 +40,7 @@ func DnsDiscovery(url string, nameserver string, ms int) string { item := DnsDiscoveryItem{ PeerID: n.PeerID.String(), } - for _, addr := range n.Addresses { + for _, addr := range n.PeerInfo.Addrs { item.Addresses = append(item.Addresses, addr.String()) } diff --git a/waku/node.go b/waku/node.go index 7375deca..9368b99d 100644 --- a/waku/node.go +++ b/waku/node.go @@ -211,11 +211,11 @@ func Execute(options Options) { if err != nil { logger.Warn("dns discovery error ", zap.Error(err)) } else { - var discAddresses []multiaddr.Multiaddr + var discPeerInfo []peer.AddrInfo for _, n := range nodes { - discAddresses = append(discAddresses, n.Addresses...) + discPeerInfo = append(discPeerInfo, n.PeerInfo) } - logger.Info("found dns entries ", logging.MultiAddrs("nodes", discAddresses...)) + logger.Info("found dns entries ", zap.Any("nodes", discPeerInfo)) discoveredNodes = append(discoveredNodes, nodes...) } } @@ -329,16 +329,15 @@ func Execute(options Options) { if len(discoveredNodes) != 0 { for _, n := range discoveredNodes { - for _, m := range n.Addresses { - go func(ctx context.Context, m multiaddr.Multiaddr) { - ctx, cancel := context.WithTimeout(ctx, dialTimeout) - defer cancel() - err = wakuNode.DialPeerWithMultiAddress(ctx, m) - if err != nil { - logger.Error("dialing peer", logging.MultiAddrs("peer", m), zap.Error(err)) - } - }(ctx, m) - } + go func(ctx context.Context, info peer.AddrInfo) { + ctx, cancel := context.WithTimeout(ctx, dialTimeout) + defer cancel() + err = wakuNode.DialPeerWithInfo(ctx, info) + if err != nil { + logger.Error("dialing peer", logging.HostID("peer", info.ID), zap.Error(err)) + } + }(ctx, n.PeerInfo) + } } diff --git a/waku/v2/dnsdisc/enr.go b/waku/v2/dnsdisc/enr.go index ac32ef38..7129a463 100644 --- a/waku/v2/dnsdisc/enr.go +++ b/waku/v2/dnsdisc/enr.go @@ -9,8 +9,6 @@ import ( "github.com/libp2p/go-libp2p/core/peer" "github.com/waku-org/go-waku/waku/v2/metrics" wenr "github.com/waku-org/go-waku/waku/v2/protocol/enr" - - ma "github.com/multiformats/go-multiaddr" ) type dnsDiscoveryParameters struct { @@ -27,9 +25,9 @@ func WithNameserver(nameserver string) DnsDiscoveryOption { } type DiscoveredNode struct { - PeerID peer.ID - Addresses []ma.Multiaddr - ENR *enode.Node + PeerID peer.ID + PeerInfo peer.AddrInfo + ENR *enode.Node } // RetrieveNodes returns a list of multiaddress given a url to a DNS discoverable ENR tree @@ -58,9 +56,22 @@ func RetrieveNodes(ctx context.Context, url string, opts ...DnsDiscoveryOption) return nil, err } + infoAddr, err := peer.AddrInfosFromP2pAddrs(m...) + if err != nil { + return nil, err + } + + var info peer.AddrInfo + for _, i := range infoAddr { + if i.ID == peerID { + info = i + break + } + } + d := DiscoveredNode{ - PeerID: peerID, - Addresses: m, + PeerID: peerID, + PeerInfo: info, } if hasUDP(node) { diff --git a/waku/v2/node/connectedness.go b/waku/v2/node/connectedness.go index d7181204..5a58cf0c 100644 --- a/waku/v2/node/connectedness.go +++ b/waku/v2/node/connectedness.go @@ -29,23 +29,30 @@ type ConnStatus struct { Peers PeerStats } +type PeerConnection struct { + PeerID peer.ID + Connected bool +} + // ConnectionNotifier is a custom Notifier to be used to display when a peer // connects or disconnects to the node type ConnectionNotifier struct { h host.Host ctx context.Context log *zap.Logger + connNotifCh chan<- PeerConnection DisconnectChan chan peer.ID quit chan struct{} } -func NewConnectionNotifier(ctx context.Context, h host.Host, log *zap.Logger) ConnectionNotifier { +func NewConnectionNotifier(ctx context.Context, h host.Host, connNotifCh chan<- PeerConnection, log *zap.Logger) ConnectionNotifier { return ConnectionNotifier{ h: h, ctx: ctx, DisconnectChan: make(chan peer.ID, 100), + connNotifCh: connNotifCh, quit: make(chan struct{}), - log: log, + log: log.Named("connection-notifier"), } } @@ -60,6 +67,13 @@ func (c ConnectionNotifier) ListenClose(n network.Network, m multiaddr.Multiaddr // Connected is called when a connection is opened func (c ConnectionNotifier) Connected(n network.Network, cc network.Conn) { c.log.Info("peer connected", logging.HostID("peer", cc.RemotePeer())) + if c.connNotifCh != nil { + select { + case c.connNotifCh <- PeerConnection{cc.RemotePeer(), true}: + default: + c.log.Warn("subscriber is too slow") + } + } stats.Record(c.ctx, metrics.Peers.M(1)) } @@ -68,6 +82,13 @@ func (c ConnectionNotifier) Disconnected(n network.Network, cc network.Conn) { c.log.Info("peer disconnected", logging.HostID("peer", cc.RemotePeer())) stats.Record(c.ctx, metrics.Peers.M(-1)) c.DisconnectChan <- cc.RemotePeer() + if c.connNotifCh != nil { + select { + case c.connNotifCh <- PeerConnection{cc.RemotePeer(), false}: + default: + c.log.Warn("subscriber is too slow") + } + } } // OpenedStream is called when a stream opened diff --git a/waku/v2/node/wakunode2.go b/waku/v2/node/wakunode2.go index 5f408563..02515660 100644 --- a/waku/v2/node/wakunode2.go +++ b/waku/v2/node/wakunode2.go @@ -109,7 +109,7 @@ type WakuNode struct { // Channel passed to WakuNode constructor // receiving connection status notifications - connStatusChan chan ConnStatus + connStatusChan chan<- ConnStatus storeFactory storeFactory } @@ -300,7 +300,7 @@ func (w *WakuNode) Start(ctx context.Context) error { return err } - w.connectionNotif = NewConnectionNotifier(ctx, w.host, w.log) + w.connectionNotif = NewConnectionNotifier(ctx, w.host, w.opts.connNotifCh, w.log) w.host.Network().Notify(w.connectionNotif) w.enrChangeCh = make(chan struct{}, 10) @@ -722,6 +722,11 @@ func (w *WakuNode) DialPeer(ctx context.Context, address string) error { return w.connect(ctx, *info) } +// DialPeerWithInfo is used to connect to a peer using its address information +func (w *WakuNode) DialPeerWithInfo(ctx context.Context, peerInfo peer.AddrInfo) error { + return w.connect(ctx, peerInfo) +} + func (w *WakuNode) connect(ctx context.Context, info peer.AddrInfo) error { err := w.host.Connect(ctx, info) if err != nil { diff --git a/waku/v2/node/wakuoptions.go b/waku/v2/node/wakuoptions.go index 6222e953..d126c4c3 100644 --- a/waku/v2/node/wakuoptions.go +++ b/waku/v2/node/wakuoptions.go @@ -112,7 +112,8 @@ type WakuNodeParameters struct { enableLightPush bool - connStatusC chan ConnStatus + connStatusC chan<- ConnStatus + connNotifCh chan<- PeerConnection storeFactory storeFactory } @@ -433,6 +434,13 @@ func WithConnectionStatusChannel(connStatus chan ConnStatus) WakuNodeOption { } } +func WithConnectionNotification(ch chan<- PeerConnection) WakuNodeOption { + return func(params *WakuNodeParameters) error { + params.connNotifCh = ch + return nil + } +} + // WithWebsockets is a WakuNodeOption used to enable websockets support func WithWebsockets(address string, port int) WakuNodeOption { return func(params *WakuNodeParameters) error {