diff --git a/examples/peer_events/build/peer_events b/examples/peer_events/build/peer_events deleted file mode 100755 index 26c343a8..00000000 Binary files a/examples/peer_events/build/peer_events and /dev/null differ diff --git a/examples/peer_events/go.sum b/examples/peer_events/go.sum index 4a5c1fb0..d9566437 100644 --- a/examples/peer_events/go.sum +++ b/examples/peer_events/go.sum @@ -814,6 +814,8 @@ github.com/status-im/go-wakurelay-pubsub v0.4.3-0.20210711180556-9afd35dadd3f h1 github.com/status-im/go-wakurelay-pubsub v0.4.3-0.20210711180556-9afd35dadd3f/go.mod h1:LSCVYR7mnBBsxVJghrGpQ3yJAAATEe6XeQQqGCZhwrE= github.com/status-im/go-wakurelay-pubsub v0.4.3-0.20210721191549-83c6a2c077f4 h1:LaoaRVhtEu5kCzcEVn+l3sprDbTqpzWFsVT2lmAcHcA= github.com/status-im/go-wakurelay-pubsub v0.4.3-0.20210721191549-83c6a2c077f4/go.mod h1:LSCVYR7mnBBsxVJghrGpQ3yJAAATEe6XeQQqGCZhwrE= +github.com/status-im/go-wakurelay-pubsub v0.4.3-0.20210729162817-adc68830282a h1:eCna/q/PuZVqtmOMBqytw9yzZwMNKpao4au0OJDvesI= +github.com/status-im/go-wakurelay-pubsub v0.4.3-0.20210729162817-adc68830282a/go.mod h1:LSCVYR7mnBBsxVJghrGpQ3yJAAATEe6XeQQqGCZhwrE= github.com/status-im/status-go/extkeys v1.0.0/go.mod h1:GdqJbrcpkNm5ZsSCpp+PdMxnXx+OcRBdm3PI0rs1FpU= github.com/streadway/amqp v0.0.0-20190404075320-75d898a42a94/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw= github.com/streadway/amqp v0.0.0-20190827072141-edfb9018d271/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw= diff --git a/examples/peer_events/main.go b/examples/peer_events/main.go index b53ea914..cf7079fa 100644 --- a/examples/peer_events/main.go +++ b/examples/peer_events/main.go @@ -7,6 +7,9 @@ import ( "encoding/hex" "fmt" "net" + "os" + "os/signal" + "syscall" "time" "github.com/ethereum/go-ethereum/crypto" @@ -14,7 +17,8 @@ import ( "github.com/status-im/go-waku/waku/v2/node" "github.com/status-im/go-waku/waku/v2/protocol/pb" "github.com/status-im/go-waku/waku/v2/protocol/relay" - "github.com/status-im/go-waku/waku/v2/protocol/store" + + //"github.com/status-im/go-waku/waku/v2/protocol/store" "github.com/status-im/go-waku/waku/v2/utils" ) @@ -55,107 +59,114 @@ func main() { ctx := context.Background() - connStatusChan := make(chan node.ConnStatus) + //connStatusChan := make(chan node.ConnStatus) + log.Info("### create relayNode1") relayNode1, err := node.New(ctx, node.WithPrivateKey(addrsAndKeys[0].key), node.WithHostAddress([]net.Addr{addrsAndKeys[0].addr}), node.WithWakuRelay(), - node.WithConnStatusChan(connStatusChan), + //node.WithConnStatusChan(connStatusChan), node.WithWakuStore(true), + node.WithKeepAlive(time.Duration(2)*time.Second), ) - relayNode2, err := node.New(ctx, - node.WithPrivateKey(addrsAndKeys[1].key), - node.WithHostAddress([]net.Addr{addrsAndKeys[1].addr}), - node.WithWakuRelay(), - ) + // relayNode2, err := node.New(ctx, + // node.WithPrivateKey(addrsAndKeys[1].key), + // node.WithHostAddress([]net.Addr{addrsAndKeys[1].addr}), + // node.WithWakuRelay(), + // ) - relayNode2.DialPeer(relayNode1.ListenAddresses()[0]) + log.Info("### before DialPeer") + //staticNode := "/ip4/8.210.222.231/tcp/30303/p2p/16Uiu2HAm4v86W3bmT1BiH6oSPzcsSr24iDQpSN5Qa992BCjjwgrD" + staticNode := "/ip4/188.166.135.145/tcp/30303/p2p/16Uiu2HAmL5okWopX7NqZWBUKVqW8iUxCEmd5GMHLVPwCgzYzQv3e" + relayNode1.DialPeer(staticNode) + //relayNode2.DialPeer(relayNode1.ListenAddresses()[0]) //go writeLoop(ctx, relayNode1) //go readLoop(relayNode1) //go readLoop(relayNode2) - printNodeConns := func(node *node.WakuNode) { - log.Info(node.Host().ID(), ": ", "peerCount: ", len(node.Host().Peerstore().Peers())) - log.Info("node peers: ") - for k, v := range node.GetPeerStats() { - log.Info(k, " ", v) - } - log.Info(node.Host().ID(), ": ", "isOnline/hasHistory ", node.IsOnline(), " ", node.HasHistory()) - log.Info("end") + log.Info("### Peer dialled") + // printNodeConns := func(node *node.WakuNode) { + // log.Info(node.Host().ID(), ": ", "peerCount: ", len(node.Host().Peerstore().Peers())) + // log.Info("node peers: ") + // for k, v := range node.GetPeerStats() { + // log.Info(k, " ", v) + // } + // log.Info(node.Host().ID(), ": ", "isOnline/hasHistory ", node.IsOnline(), " ", node.HasHistory()) + // log.Info("end") - } + // } - go func() { - for connStatus := range connStatusChan { - log.Info("Conn status update: ", connStatus) - } - }() - go func() { - ticker := time.NewTicker(time.Millisecond * 1000) - defer ticker.Stop() - for { - select { - case <-ticker.C: - printNodeConns(relayNode1) - } - } - }() + // go func() { + // for connStatus := range connStatusChan { + // log.Info("Conn status update: ", connStatus) + // } + // }() + // go func() { + // ticker := time.NewTicker(time.Millisecond * 1000) + // defer ticker.Stop() + // for { + // select { + // case <-ticker.C: + // printNodeConns(relayNode1) + // } + // } + // }() - time.Sleep(3 * time.Second) - log.Info("stop relayNode2") - //relayNode2.Stop() + // time.Sleep(3 * time.Second) + // log.Info("stop relayNode2") + // relayNode2.Host().Close() - time.Sleep(3 * time.Second) + // time.Sleep(3 * time.Second) - log.Info("start relayNode3") + // log.Info("start relayNode3") - relayNode3, err := node.New(ctx, - node.WithPrivateKey(addrsAndKeys[2].key), - node.WithHostAddress([]net.Addr{addrsAndKeys[2].addr}), - node.WithWakuRelay(), - ) + // relayNode3, err := node.New(ctx, + // node.WithPrivateKey(addrsAndKeys[2].key), + // node.WithHostAddress([]net.Addr{addrsAndKeys[2].addr}), + // node.WithWakuRelay(), + // ) - relayNode3.DialPeer(relayNode1.ListenAddresses()[0]) + // relayNode3.DialPeer(relayNode1.ListenAddresses()[0]) - time.Sleep(3 * time.Second) - log.Info("stop relayNode3") - //relayNode3.Stop() + // time.Sleep(3 * time.Second) + // log.Info("stop relayNode3") + // //relayNode3.Stop() - log.Info("start storeNode") - // Start a store node - storeNode, _ := node.New(ctx, - node.WithPrivateKey(addrsAndKeys[3].key), - node.WithHostAddress([]net.Addr{addrsAndKeys[3].addr}), - node.WithWakuRelay(), - node.WithWakuStore(true), - ) - tCtx, _ := context.WithTimeout(ctx, 5*time.Second) - log.Info("#before AddStorePeer") - storeNodeId, err := relayNode1.AddStorePeer(storeNode.ListenAddresses()[0]) - time.Sleep(3 * time.Second) - log.Info("#before Query") - _, err = relayNode1.Query(tCtx, []string{contentTopic}, 0, 0, store.WithPeer(*storeNodeId)) - log.Info("storeNode.ListenAddresses(): ", storeNode.ListenAddresses(), storeNodeId) - if err != nil { - log.Info("### error adding store peer: ", err) - } + // log.Info("start storeNode") + // // Start a store node + // storeNode, _ := node.New(ctx, + // node.WithPrivateKey(addrsAndKeys[3].key), + // node.WithHostAddress([]net.Addr{addrsAndKeys[3].addr}), + // node.WithWakuRelay(), + // node.WithWakuStore(true), + // ) + // tCtx, _ := context.WithTimeout(ctx, 5*time.Second) + // log.Info("#before AddStorePeer") + // storeNodeId, err := relayNode1.AddStorePeer(storeNode.ListenAddresses()[0]) + // time.Sleep(3 * time.Second) + // log.Info("#before Query") + // _, err = relayNode1.Query(tCtx, []string{contentTopic}, 0, 0, store.WithPeer(*storeNodeId)) + // log.Info("storeNode.ListenAddresses(): ", storeNode.ListenAddresses(), storeNodeId) + // if err != nil { + // log.Info("### error adding store peer: ", err) + // } - time.Sleep(3 * time.Second) - log.Info("stop storeNode") - storeNode.Stop() + // time.Sleep(3 * time.Second) + // log.Info("stop storeNode") + // storeNode.Stop() - time.Sleep(3 * time.Second) - // // Wait for a SIGINT or SIGTERM signal - // ch := make(chan os.Signal, 1) - // signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM) - // <-ch - // fmt.Println("\n\n\nReceived signal, shutting down...") + // time.Sleep(3 * time.Second) + // // // Wait for a SIGINT or SIGTERM signal + ch := make(chan os.Signal, 1) + signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM) + <-ch + fmt.Println("\n\n\nReceived signal, shutting down...") - // shut the nodes down - relayNode1.Stop() + // // shut the nodes down + // relayNode1.Stop() } diff --git a/waku/v2/node/wakunode2.go b/waku/v2/node/wakunode2.go index c6ef5f57..1ebc98d0 100644 --- a/waku/v2/node/wakunode2.go +++ b/waku/v2/node/wakunode2.go @@ -77,78 +77,104 @@ type WakuNode struct { // Channel passed to WakuNode constructor // receiving connection status notifications connStatusChan chan ConnStatus + pingEventsChan chan interface{} } +func (w *WakuNode) handleConnectednessChanged(ev event.EvtPeerConnectednessChanged) { + + log.Info("### EvtPeerConnectednessChanged ", w.Host().ID(), " to ", ev.Peer, " : ", ev.Connectedness) + if ev.Connectedness == network.Connected { + _, ok := w.peers[ev.Peer] + if !ok { + peerProtocols, _ := w.host.Peerstore().GetProtocols(ev.Peer) + log.Info("protocols found for peer: ", ev.Peer, ", protocols: ", peerProtocols) + w.peers[ev.Peer] = peerProtocols + } else { + log.Info("### Peer already exists") + } + } else if ev.Connectedness == network.NotConnected { + log.Info("Peer down: ", ev.Peer) + delete(w.peers, ev.Peer) + // for _, pl := range w.peerListeners { + // pl <- &ev + // } + // TODO + // There seems to be no proper way to + // remove a dropped peer from Host's Peerstore + // https://github.com/libp2p/go-libp2p-host/issues/13 + //w.Host().Network().ClosePeer(ev.Peer) + } + +} +func (w *WakuNode) handleProtocolsUpdated(ev event.EvtPeerProtocolsUpdated) { + log.Info("### EvtPeerProtocolsUpdated ", w.Host().ID(), " to ", ev.Peer, " added: ", ev.Added, ", removed: ", ev.Removed) + _, ok := w.peers[ev.Peer] + if ok { + peerProtocols, _ := w.host.Peerstore().GetProtocols(ev.Peer) + log.Info("updated protocols found for peer: ", ev.Peer, ", protocols: ", peerProtocols) + w.peers[ev.Peer] = peerProtocols + } + +} +func (w *WakuNode) handlePeerIdentificationCompleted(ev event.EvtPeerIdentificationCompleted) { + + log.Info("### EvtPeerIdentificationCompleted ", w.Host().ID(), " to ", ev.Peer) + peerProtocols, _ := w.host.Peerstore().GetProtocols(ev.Peer) + log.Info("identified protocols found for peer: ", ev.Peer, ", protocols: ", peerProtocols) + _, ok := w.peers[ev.Peer] + if ok { + peerProtocols, _ := w.host.Peerstore().GetProtocols(ev.Peer) + w.peers[ev.Peer] = peerProtocols + } + +} +func (w *WakuNode) processHostEvent(e interface{}) { + if e == nil { + log.Info("processHostEvent nil event") + return + } + isOnline := w.IsOnline() + hasHistory := w.HasHistory() + switch e.(type) { + case event.EvtPeerConnectednessChanged: + w.handleConnectednessChanged(e.(event.EvtPeerConnectednessChanged)) + case event.EvtPeerProtocolsUpdated: + w.handleProtocolsUpdated(e.(event.EvtPeerProtocolsUpdated)) + case event.EvtPeerIdentificationCompleted: + w.handlePeerIdentificationCompleted(e.(event.EvtPeerIdentificationCompleted)) + } + + log.Info("###processHostEvent before isOnline()") + newIsOnline := w.IsOnline() + log.Info("###processHostEvent before hasHistory()") + newHasHistory := w.HasHistory() + log.Info("###ConnStatus isOnline: ", isOnline, "/", newIsOnline, " hasHistory: ", + hasHistory, "/", newHasHistory) + if w.connStatusChan != nil && + (isOnline != newIsOnline || hasHistory != newHasHistory) { + log.Info("New ConnStatus: ", ConnStatus{IsOnline: newIsOnline, HasHistory: newHasHistory}) + w.connStatusChan <- ConnStatus{IsOnline: newIsOnline, HasHistory: newHasHistory} + } + +} func (w *WakuNode) connectednessListener() { for { - isOnline := w.IsOnline() - hasHistory := w.HasHistory() - + var e interface{} + log.Info("connectednessListener before select") select { - case e := <-w.connectednessEventSub.Out(): - if e == nil { - break - } - ev := e.(event.EvtPeerConnectednessChanged) - - log.Info("### EvtPeerConnectednessChanged ", w.Host().ID(), " to ", ev.Peer, " : ", ev.Connectedness) - if ev.Connectedness == network.Connected { - _, ok := w.peers[ev.Peer] - if !ok { - peerProtocols, _ := w.host.Peerstore().GetProtocols(ev.Peer) - log.Info("protocols found for peer: ", ev.Peer, ", protocols: ", peerProtocols) - w.peers[ev.Peer] = peerProtocols - } else { - log.Info("### Peer already exists") - } - } else if ev.Connectedness == network.NotConnected { - log.Info("Peer down: ", ev.Peer) - delete(w.peers, ev.Peer) - for _, pl := range w.peerListeners { - pl <- &ev - } - // TODO - // There seems to be no proper way to - // remove a dropped peer from Host's Peerstore - // https://github.com/libp2p/go-libp2p-host/issues/13 - //w.Host().Network().ClosePeer(ev.Peer) - } - case e := <-w.protocolEventSub.Out(): - if e == nil { - break - } - ev := e.(event.EvtPeerProtocolsUpdated) - - log.Info("### EvtPeerProtocolsUpdated ", w.Host().ID(), " to ", ev.Peer, " added: ", ev.Added, ", removed: ", ev.Removed) - _, ok := w.peers[ev.Peer] - if ok { - peerProtocols, _ := w.host.Peerstore().GetProtocols(ev.Peer) - log.Info("updated protocols found for peer: ", ev.Peer, ", protocols: ", peerProtocols) - w.peers[ev.Peer] = peerProtocols - } - - case e := <-w.identificationEventSub.Out(): - if e == nil { - break - } - ev := e.(event.EvtPeerIdentificationCompleted) - - log.Info("### EvtPeerIdentificationCompleted ", w.Host().ID(), " to ", ev.Peer) - peerProtocols, _ := w.host.Peerstore().GetProtocols(ev.Peer) - log.Info("identified protocols found for peer: ", ev.Peer, ", protocols: ", peerProtocols) - _, ok := w.peers[ev.Peer] - if ok { - peerProtocols, _ := w.host.Peerstore().GetProtocols(ev.Peer) - w.peers[ev.Peer] = peerProtocols - } - - } - newIsOnline := w.IsOnline() - newHasHistory := w.HasHistory() - if w.connStatusChan != nil && - (isOnline != newIsOnline || hasHistory != newHasHistory) { - w.connStatusChan <- ConnStatus{IsOnline: newIsOnline, HasHistory: newHasHistory} + case e = <-w.connectednessEventSub.Out(): + log.Info("connectednessListener connectednessEvent") + case e = <-w.protocolEventSub.Out(): + log.Info("connectednessListener protocolEvent") + case e = <-w.identificationEventSub.Out(): + log.Info("connectednessListener identificationEvent") + case e = <-w.pingEventsChan: + log.Info("connectednessListener pingEvent") } + log.Info("connectednessListener after select") + + w.processHostEvent(e) + log.Info("connectednessListener after processHostEvent") } } @@ -205,6 +231,7 @@ func New(ctx context.Context, opts ...WakuNodeOption) (*WakuNode, error) { if params.connStatusChan != nil { w.connStatusChan = params.connStatusChan } + w.pingEventsChan = make(chan interface{}) go w.connectednessListener() if params.enableStore { @@ -688,12 +715,69 @@ func (w *WakuNode) startKeepAlive(t time.Duration) { w.ping = ping.NewPingService(w.host) ticker := time.NewTicker(t) go func() { + + peerMap := make(map[peer.ID]<-chan ping.Result) + em, _ := w.Host().EventBus().Emitter(new(event.EvtPeerConnectednessChanged)) + defer em.Close() + var mu sync.Mutex for { select { case <-ticker.C: for _, peer := range w.host.Network().Peers() { - log.Info("Pinging", peer) - w.ping.Ping(w.ctx, peer) + mu.Lock() + _, ok := peerMap[peer] + mu.Unlock() + if !ok { + log.Info("###Pinging", peer) + result := w.ping.Ping(w.ctx, peer) + mu.Lock() + peerMap[peer] = result + mu.Unlock() + + go func() { + peerFound := false + for p, _ := range w.peers { + if p == peer { + peerFound = true + break + } + } + + log.Info("###PING before fetching result") + pingTicker := time.NewTicker(time.Duration(1) * time.Second) + isError := false + select { + case resVal := <-result: + isError = resVal.Error != nil + case <-pingTicker.C: + isError = true + } + pingTicker.Stop() + log.Info("###PING after fetching result") + if !peerFound && !isError { + log.Info("###PING peer added") + //EventBus Emitter doesn't seem to work when there's no connection + w.pingEventsChan <- event.EvtPeerConnectednessChanged{ + Peer: peer, + Connectedness: network.Connected, + } + } else if peerFound && isError { + log.Info("###PING peer removed") + w.pingEventsChan <- event.EvtPeerConnectednessChanged{ + Peer: peer, + Connectedness: network.NotConnected, + } + log.Info("###PING wrote to ping chan") + } + + mu.Lock() + delete(peerMap, peer) + mu.Unlock() + }() + } else { + log.Info("###PING already pinged") + } + } case <-w.quit: ticker.Stop()