diff --git a/eth-node/bridge/geth/waku.go b/eth-node/bridge/geth/waku.go index c607c1984..8c016d77d 100644 --- a/eth-node/bridge/geth/waku.go +++ b/eth-node/bridge/geth/waku.go @@ -38,6 +38,32 @@ func (w *gethWakuWrapper) Version() uint { return 1 } +// Added for compatibility with waku V2 +func (w *gethWakuWrapper) PeerCount() int { + return -1 +} + +// PeerCount function only added for compatibility with waku V2 +func (w *gethWakuWrapper) AddStorePeer(address string) error { + return errors.New("not available in WakuV1") +} + +// AddRelayPeer function only added for compatibility with waku V2 +func (w *gethWakuWrapper) AddRelayPeer(address string) error { + return errors.New("not available in WakuV1") +} + +// PeerCount function only added for compatibility with waku V2 +func (w *gethWakuWrapper) DropPeer(peerID string) error { + return errors.New("not available in WakuV1") +} + +// Peers function only added for compatibility with waku V2 +func (w *gethWakuWrapper) Peers() map[string][]string { + p := make(map[string][]string) + return p +} + // MinPow returns the PoW value required by this node. func (w *gethWakuWrapper) MinPow() float64 { return w.waku.MinPow() diff --git a/eth-node/bridge/geth/wakuv2.go b/eth-node/bridge/geth/wakuv2.go index e4d5b4220..a29e0c9b4 100644 --- a/eth-node/bridge/geth/wakuv2.go +++ b/eth-node/bridge/geth/wakuv2.go @@ -42,7 +42,11 @@ func (w *gethWakuV2Wrapper) Version() uint { return 2 } -// MinPow returns the PoW value required by this node. +func (w *gethWakuV2Wrapper) PeerCount() int { + return w.waku.PeerCount() +} + +// DEPRECATED: Not used in WakuV2 func (w *gethWakuV2Wrapper) MinPow() float64 { return 0 } @@ -52,10 +56,7 @@ func (w *gethWakuV2Wrapper) MaxMessageSize() uint32 { return w.waku.MaxMessageSize() } -// BloomFilter returns the aggregated bloom filter for all the topics of interest. -// The nodes are required to send only messages that match the advertised bloom filter. -// If a message does not match the bloom, it will tantamount to spam, and the peer will -// be disconnected. +// DEPRECATED: not used in WakuV2 func (w *gethWakuV2Wrapper) BloomFilter() []byte { return nil } @@ -170,6 +171,7 @@ func (w *gethWakuV2Wrapper) createFilterWrapper(id string, keyAsym *ecdsa.Privat }, id), nil } +// DEPRECATED: Not used in waku V2 func (w *gethWakuV2Wrapper) SendMessagesRequest(peerID []byte, r types.MessagesRequest) error { return errors.New("DEPRECATED") } @@ -215,15 +217,27 @@ func (w *gethWakuV2Wrapper) RequestStoreMessages(peerID []byte, r types.Messages return nil, nil } -// RequestHistoricMessages sends a message with p2pRequestCode to a specific peer, -// which is known to implement MailServer interface, and is supposed to process this -// request and respond with a number of peer-to-peer messages (possibly expired), -// which are not supposed to be forwarded any further. -// The whisper protocol is agnostic of the format and contents of envelope. +// DEPRECATED: Not used in waku V2 func (w *gethWakuV2Wrapper) RequestHistoricMessagesWithTimeout(peerID []byte, envelope types.Envelope, timeout time.Duration) error { return errors.New("DEPRECATED") } +func (w *gethWakuV2Wrapper) AddStorePeer(address string) error { + return w.waku.AddStorePeer(address) +} + +func (w *gethWakuV2Wrapper) AddRelayPeer(address string) error { + return w.waku.AddRelayPeer(address) +} + +func (w *gethWakuV2Wrapper) Peers() map[string][]string { + return w.waku.Peers() +} + +func (w *gethWakuV2Wrapper) DropPeer(peerID string) error { + return w.waku.DropPeer(peerID) +} + type wakuV2FilterWrapper struct { filter *wakucommon.Filter id string diff --git a/eth-node/types/waku.go b/eth-node/types/waku.go index d010100fc..feaadedef 100644 --- a/eth-node/types/waku.go +++ b/eth-node/types/waku.go @@ -13,6 +13,17 @@ type Waku interface { // Waku protocol version Version() uint + // PeerCount + PeerCount() int + + Peers() map[string][]string + + AddStorePeer(address string) error + + AddRelayPeer(address string) error + + DropPeer(peerID string) error + // MinPow returns the PoW value required by this node. MinPow() float64 // BloomFilter returns the aggregated bloom filter for all the topics of interest. diff --git a/protocol/messenger.go b/protocol/messenger.go index cefd4c245..14634a841 100644 --- a/protocol/messenger.go +++ b/protocol/messenger.go @@ -566,11 +566,12 @@ func (m *Messenger) handleConnectionChange(online bool) { } func (m *Messenger) online() bool { - // TODO: we are still missing peer management in wakuv2 - if m.transport.WakuVersion() == 2 { - return true + switch m.transport.WakuVersion() { + case 2: + return m.transport.PeerCount() > 0 + default: + return m.node.PeersCount() > 0 } - return m.node.PeersCount() > 0 } func (m *Messenger) buildContactCodeAdvertisement() (*protobuf.ContactCodeAdvertisement, error) { diff --git a/protocol/messenger_peers.go b/protocol/messenger_peers.go new file mode 100644 index 000000000..199ceb93b --- /dev/null +++ b/protocol/messenger_peers.go @@ -0,0 +1,17 @@ +package protocol + +func (m *Messenger) AddStorePeer(address string) error { + return m.transport.AddStorePeer(address) +} + +func (m *Messenger) AddRelayPeer(address string) error { + return m.transport.AddStorePeer(address) +} + +func (m *Messenger) DropPeer(peerID string) error { + return m.transport.DropPeer(peerID) +} + +func (m *Messenger) Peers() map[string][]string { + return m.transport.Peers() +} diff --git a/protocol/transport/transport.go b/protocol/transport/transport.go index f3058ff5c..be102a4e0 100644 --- a/protocol/transport/transport.go +++ b/protocol/transport/transport.go @@ -414,6 +414,14 @@ func (t *Transport) WakuVersion() uint { return t.waku.Version() } +func (t *Transport) PeerCount() int { + return t.waku.PeerCount() +} + +func (t *Transport) Peers() map[string][]string { + return t.waku.Peers() +} + func (t *Transport) createMessagesRequestV1( ctx context.Context, peerID []byte, @@ -585,3 +593,15 @@ func (t *Transport) BloomFilter() []byte { func PubkeyToHex(key *ecdsa.PublicKey) string { return types.EncodeHex(crypto.FromECDSAPub(key)) } + +func (t *Transport) AddStorePeer(address string) error { + return t.waku.AddStorePeer(address) +} + +func (t *Transport) AddRelayPeer(address string) error { + return t.waku.AddRelayPeer(address) +} + +func (t *Transport) DropPeer(peerID string) error { + return t.waku.DropPeer(peerID) +} diff --git a/services/ext/api.go b/services/ext/api.go index 5501b0060..88fd9a713 100644 --- a/services/ext/api.go +++ b/services/ext/api.go @@ -904,6 +904,22 @@ func (api *PublicAPI) BloomFilter() string { return hexutil.Encode(api.service.messenger.BloomFilter()) } +func (api *PublicAPI) AddStorePeer(address string) error { + return api.service.messenger.AddStorePeer(address) +} + +func (api *PublicAPI) AddRelayPeer(address string) error { + return api.service.messenger.AddRelayPeer(address) +} + +func (api *PublicAPI) DropPeer(peerID string) error { + return api.service.messenger.DropPeer(peerID) +} + +func (api *PublicAPI) Peers() map[string][]string { + return api.service.messenger.Peers() +} + // ----- // HELPER // ----- diff --git a/vendor/github.com/status-im/go-waku/waku/v2/node/wakunode2.go b/vendor/github.com/status-im/go-waku/waku/v2/node/wakunode2.go index 9618c4813..62ff87a21 100644 --- a/vendor/github.com/status-im/go-waku/waku/v2/node/wakunode2.go +++ b/vendor/github.com/status-im/go-waku/waku/v2/node/wakunode2.go @@ -721,6 +721,11 @@ func (w *WakuNode) DialPeerByID(peerID peer.ID) error { return w.connect(info) } +func (w *WakuNode) DialPeerByID(peerID peer.ID) error { + info := w.host.Peerstore().PeerInfo(peerID) + return w.host.Connect(w.ctx, info) +} + func (w *WakuNode) ClosePeerByAddress(address string) error { p, err := ma.NewMultiaddr(address) if err != nil { @@ -766,7 +771,6 @@ func (w *WakuNode) Peers() PeerStats { } func (w *WakuNode) startKeepAlive(t time.Duration) { - log.Info("Setting up ping protocol with duration of ", t) w.ping = ping.NewPingService(w.host) diff --git a/wakuv2/waku.go b/wakuv2/waku.go index 13f70e500..7534574e3 100644 --- a/wakuv2/waku.go +++ b/wakuv2/waku.go @@ -29,7 +29,7 @@ import ( "sync" "time" - "github.com/ethereum/go-ethereum/common/hexutil" + "github.com/libp2p/go-libp2p-core/peer" "go.uber.org/zap" @@ -37,6 +37,7 @@ import ( "golang.org/x/crypto/pbkdf2" gethcommon "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/log" @@ -67,6 +68,12 @@ type settings struct { SoftBlacklistedPeerIDs map[string]bool // SoftBlacklistedPeerIDs is a list of peer ids that we want to keep connected but silently drop any envelope from } +type ConnStatus struct { + IsOnline bool `json:"isOnline"` + HasHistory bool `json:"hasHistory"` + Peers map[string][]string `json:"peers"` +} + // Waku represents a dark communication interface through the Ethereum // network, using its very own P2P communication layer. type Waku struct { @@ -167,7 +174,7 @@ func New(nodeKey string, cfg *Config, logger *zap.Logger) (*Waku, error) { case <-waku.quit: return case c := <-connStatusChan: - signal.SendPeerStats(c) + signal.SendPeerStats(formatConnStatus(c)) } } }() @@ -707,6 +714,28 @@ func (w *Waku) IsEnvelopeCached(hash gethcommon.Hash) bool { return exist } +func (w *Waku) PeerCount() int { + return w.node.PeerCount() +} + +func (w *Waku) Peers() map[string][]string { + return FormatPeerStats(w.node.Peers()) +} + +func (w *Waku) AddStorePeer(address string) error { + _, err := w.node.AddStorePeer(address) + return err +} + +func (w *Waku) AddRelayPeer(address string) error { + // TODO: + return nil +} + +func (w *Waku) DropPeer(peerID string) error { + return w.node.ClosePeerById(peer.ID(peerID)) +} + // validatePrivateKey checks the format of the given private key. func validatePrivateKey(k *ecdsa.PrivateKey) bool { if k == nil || k.D == nil || k.D.Sign() == 0 { @@ -741,3 +770,19 @@ func toDeterministicID(id string, expectedLen int) (string, error) { return id, nil } + +func FormatPeerStats(peers node.PeerStats) map[string][]string { + p := make(map[string][]string) + for k, v := range peers { + p[k.Pretty()] = v + } + return p +} + +func formatConnStatus(c node.ConnStatus) ConnStatus { + return ConnStatus{ + IsOnline: c.IsOnline, + HasHistory: c.HasHistory, + Peers: FormatPeerStats(c.Peers), + } +}