feat: add peer connection notif channel and allow dialing peers with the peer.AddrInfo

This commit is contained in:
Richard Ramos 2023-05-10 10:13:10 -04:00 committed by RichΛrd
parent 38a9fc4b19
commit 25562d6240
8 changed files with 96 additions and 53 deletions

View File

@ -46,7 +46,7 @@ type Chat struct {
nick string
}
func NewChat(ctx context.Context, node *node.WakuNode, options Options) *Chat {
func NewChat(ctx context.Context, node *node.WakuNode, connNotifier <-chan node.PeerConnection, options Options) *Chat {
chat := &Chat{
ctx: ctx,
node: node,
@ -114,7 +114,7 @@ func NewChat(ctx context.Context, node *node.WakuNode, options Options) *Chat {
}
}
chat.wg.Add(6)
chat.wg.Add(7)
go chat.parseInput()
go chat.receiveMessages()
@ -123,6 +123,7 @@ func NewChat(ctx context.Context, node *node.WakuNode, options Options) *Chat {
go chat.welcomeMessage()
go chat.connectionWatcher(&connectionWg, connNotifier)
go chat.staticNodes(&connectionWg)
go chat.discoverNodes(&connectionWg)
go chat.retrieveHistory(&connectionWg)
@ -135,6 +136,18 @@ func (c *Chat) Stop() {
close(c.inputChan)
}
func (c *Chat) connectionWatcher(connectionWg *sync.WaitGroup, connNotifier <-chan node.PeerConnection) {
defer c.wg.Done()
for conn := range connNotifier {
if conn.Connected {
c.ui.InfoMessage(fmt.Sprintf("Peer %s connected", conn.PeerID.Pretty()))
} else {
c.ui.InfoMessage(fmt.Sprintf("Peer %s disconnected", conn.PeerID.Pretty()))
}
}
}
func (c *Chat) receiveMessages() {
defer c.wg.Done()
for {
@ -200,8 +213,6 @@ func (c *Chat) parseInput() {
err = c.node.DialPeerWithMultiAddress(ctx, ma)
if err != nil {
c.ui.ErrorMessage(err)
} else {
c.ui.InfoMessage(fmt.Sprintf("Connected to %s", peerID))
}
}(peer)
return
@ -447,19 +458,11 @@ func (c *Chat) staticNodes(connectionWg *sync.WaitGroup) {
ctx, cancel := context.WithTimeout(c.ctx, time.Duration(10)*time.Second)
defer cancel()
peerID, err := addr.ValueForProtocol(multiaddr.P_P2P)
if err != nil {
c.ui.ErrorMessage(err)
return
}
c.ui.InfoMessage(fmt.Sprintf("Connecting to %s", addr.String()))
err = c.node.DialPeerWithMultiAddress(ctx, addr)
err := c.node.DialPeerWithMultiAddress(ctx, addr)
if err != nil {
c.ui.ErrorMessage(err)
} else {
c.ui.InfoMessage(fmt.Sprintf("Connected to %s", peerID))
}
}(n)
}
@ -539,30 +542,23 @@ func (c *Chat) discoverNodes(connectionWg *sync.WaitGroup) {
if err != nil {
c.ui.ErrorMessage(errors.New(err.Error()))
} else {
var nodeList []multiaddr.Multiaddr
var nodeList []peer.AddrInfo
for _, n := range nodes {
nodeList = append(nodeList, n.Addresses...)
nodeList = append(nodeList, n.PeerInfo)
}
c.ui.InfoMessage(fmt.Sprintf("Discovered and connecting to %v ", nodeList))
wg := sync.WaitGroup{}
wg.Add(len(nodeList))
for _, n := range nodeList {
go func(ctx context.Context, addr multiaddr.Multiaddr) {
go func(ctx context.Context, info peer.AddrInfo) {
defer wg.Done()
peerID, err := addr.ValueForProtocol(multiaddr.P_P2P)
if err != nil {
c.ui.ErrorMessage(err)
return
}
ctx, cancel := context.WithTimeout(ctx, time.Duration(10)*time.Second)
defer cancel()
err = c.node.DialPeerWithMultiAddress(ctx, addr)
err = c.node.DialPeerWithInfo(ctx, n)
if err != nil {
c.ui.ErrorMessage(fmt.Errorf("could not connect to %s: %w", peerID, err))
} else {
c.ui.InfoMessage(fmt.Sprintf("Connected to %s", peerID))
c.ui.ErrorMessage(fmt.Errorf("co!!uld not connect to %s: %w", info.ID.Pretty(), err))
}
}(c.ctx, n)

View File

@ -30,10 +30,13 @@ func execute(options Options) {
}
}
connNotifier := make(chan node.PeerConnection)
opts := []node.WakuNodeOption{
node.WithPrivateKey(options.NodeKey),
node.WithNTP(),
node.WithHostAddress(hostAddr),
node.WithConnectionNotification(connNotifier),
}
if options.Relay.Enable {
@ -130,7 +133,7 @@ func execute(options Options) {
return
}
chat := NewChat(ctx, wakuNode, options)
chat := NewChat(ctx, wakuNode, connNotifier, options)
p := tea.NewProgram(chat.ui)
if err := p.Start(); err != nil {
fmt.Println(err.Error())

View File

@ -40,7 +40,7 @@ func DnsDiscovery(url string, nameserver string, ms int) string {
item := DnsDiscoveryItem{
PeerID: n.PeerID.String(),
}
for _, addr := range n.Addresses {
for _, addr := range n.PeerInfo.Addrs {
item.Addresses = append(item.Addresses, addr.String())
}

View File

@ -211,11 +211,11 @@ func Execute(options Options) {
if err != nil {
logger.Warn("dns discovery error ", zap.Error(err))
} else {
var discAddresses []multiaddr.Multiaddr
var discPeerInfo []peer.AddrInfo
for _, n := range nodes {
discAddresses = append(discAddresses, n.Addresses...)
discPeerInfo = append(discPeerInfo, n.PeerInfo)
}
logger.Info("found dns entries ", logging.MultiAddrs("nodes", discAddresses...))
logger.Info("found dns entries ", zap.Any("nodes", discPeerInfo))
discoveredNodes = append(discoveredNodes, nodes...)
}
}
@ -329,16 +329,15 @@ func Execute(options Options) {
if len(discoveredNodes) != 0 {
for _, n := range discoveredNodes {
for _, m := range n.Addresses {
go func(ctx context.Context, m multiaddr.Multiaddr) {
ctx, cancel := context.WithTimeout(ctx, dialTimeout)
defer cancel()
err = wakuNode.DialPeerWithMultiAddress(ctx, m)
if err != nil {
logger.Error("dialing peer", logging.MultiAddrs("peer", m), zap.Error(err))
}
}(ctx, m)
}
go func(ctx context.Context, info peer.AddrInfo) {
ctx, cancel := context.WithTimeout(ctx, dialTimeout)
defer cancel()
err = wakuNode.DialPeerWithInfo(ctx, info)
if err != nil {
logger.Error("dialing peer", logging.HostID("peer", info.ID), zap.Error(err))
}
}(ctx, n.PeerInfo)
}
}

View File

@ -9,8 +9,6 @@ import (
"github.com/libp2p/go-libp2p/core/peer"
"github.com/waku-org/go-waku/waku/v2/metrics"
wenr "github.com/waku-org/go-waku/waku/v2/protocol/enr"
ma "github.com/multiformats/go-multiaddr"
)
type dnsDiscoveryParameters struct {
@ -27,9 +25,9 @@ func WithNameserver(nameserver string) DnsDiscoveryOption {
}
type DiscoveredNode struct {
PeerID peer.ID
Addresses []ma.Multiaddr
ENR *enode.Node
PeerID peer.ID
PeerInfo peer.AddrInfo
ENR *enode.Node
}
// RetrieveNodes returns a list of multiaddress given a url to a DNS discoverable ENR tree
@ -58,9 +56,22 @@ func RetrieveNodes(ctx context.Context, url string, opts ...DnsDiscoveryOption)
return nil, err
}
infoAddr, err := peer.AddrInfosFromP2pAddrs(m...)
if err != nil {
return nil, err
}
var info peer.AddrInfo
for _, i := range infoAddr {
if i.ID == peerID {
info = i
break
}
}
d := DiscoveredNode{
PeerID: peerID,
Addresses: m,
PeerID: peerID,
PeerInfo: info,
}
if hasUDP(node) {

View File

@ -29,23 +29,30 @@ type ConnStatus struct {
Peers PeerStats
}
type PeerConnection struct {
PeerID peer.ID
Connected bool
}
// ConnectionNotifier is a custom Notifier to be used to display when a peer
// connects or disconnects to the node
type ConnectionNotifier struct {
h host.Host
ctx context.Context
log *zap.Logger
connNotifCh chan<- PeerConnection
DisconnectChan chan peer.ID
quit chan struct{}
}
func NewConnectionNotifier(ctx context.Context, h host.Host, log *zap.Logger) ConnectionNotifier {
func NewConnectionNotifier(ctx context.Context, h host.Host, connNotifCh chan<- PeerConnection, log *zap.Logger) ConnectionNotifier {
return ConnectionNotifier{
h: h,
ctx: ctx,
DisconnectChan: make(chan peer.ID, 100),
connNotifCh: connNotifCh,
quit: make(chan struct{}),
log: log,
log: log.Named("connection-notifier"),
}
}
@ -60,6 +67,13 @@ func (c ConnectionNotifier) ListenClose(n network.Network, m multiaddr.Multiaddr
// Connected is called when a connection is opened
func (c ConnectionNotifier) Connected(n network.Network, cc network.Conn) {
c.log.Info("peer connected", logging.HostID("peer", cc.RemotePeer()))
if c.connNotifCh != nil {
select {
case c.connNotifCh <- PeerConnection{cc.RemotePeer(), true}:
default:
c.log.Warn("subscriber is too slow")
}
}
stats.Record(c.ctx, metrics.Peers.M(1))
}
@ -68,6 +82,13 @@ func (c ConnectionNotifier) Disconnected(n network.Network, cc network.Conn) {
c.log.Info("peer disconnected", logging.HostID("peer", cc.RemotePeer()))
stats.Record(c.ctx, metrics.Peers.M(-1))
c.DisconnectChan <- cc.RemotePeer()
if c.connNotifCh != nil {
select {
case c.connNotifCh <- PeerConnection{cc.RemotePeer(), false}:
default:
c.log.Warn("subscriber is too slow")
}
}
}
// OpenedStream is called when a stream opened

View File

@ -109,7 +109,7 @@ type WakuNode struct {
// Channel passed to WakuNode constructor
// receiving connection status notifications
connStatusChan chan ConnStatus
connStatusChan chan<- ConnStatus
storeFactory storeFactory
}
@ -300,7 +300,7 @@ func (w *WakuNode) Start(ctx context.Context) error {
return err
}
w.connectionNotif = NewConnectionNotifier(ctx, w.host, w.log)
w.connectionNotif = NewConnectionNotifier(ctx, w.host, w.opts.connNotifCh, w.log)
w.host.Network().Notify(w.connectionNotif)
w.enrChangeCh = make(chan struct{}, 10)
@ -722,6 +722,11 @@ func (w *WakuNode) DialPeer(ctx context.Context, address string) error {
return w.connect(ctx, *info)
}
// DialPeerWithInfo is used to connect to a peer using its address information
func (w *WakuNode) DialPeerWithInfo(ctx context.Context, peerInfo peer.AddrInfo) error {
return w.connect(ctx, peerInfo)
}
func (w *WakuNode) connect(ctx context.Context, info peer.AddrInfo) error {
err := w.host.Connect(ctx, info)
if err != nil {

View File

@ -112,7 +112,8 @@ type WakuNodeParameters struct {
enableLightPush bool
connStatusC chan ConnStatus
connStatusC chan<- ConnStatus
connNotifCh chan<- PeerConnection
storeFactory storeFactory
}
@ -433,6 +434,13 @@ func WithConnectionStatusChannel(connStatus chan ConnStatus) WakuNodeOption {
}
}
func WithConnectionNotification(ch chan<- PeerConnection) WakuNodeOption {
return func(params *WakuNodeParameters) error {
params.connNotifCh = ch
return nil
}
}
// WithWebsockets is a WakuNodeOption used to enable websockets support
func WithWebsockets(address string, port int) WakuNodeOption {
return func(params *WakuNodeParameters) error {