feat: adding peers, fix peer count and fix format of peer stats signal

This commit is contained in:
Richard Ramos 2021-08-30 10:57:28 -04:00 committed by RichΛrd
parent 181c06722f
commit 0575030825
9 changed files with 171 additions and 17 deletions

View File

@ -38,6 +38,32 @@ func (w *gethWakuWrapper) Version() uint {
return 1 return 1
} }
// Added for compatibility with waku V2
func (w *gethWakuWrapper) PeerCount() int {
return -1
}
// PeerCount function only added for compatibility with waku V2
func (w *gethWakuWrapper) AddStorePeer(address string) error {
return errors.New("not available in WakuV1")
}
// AddRelayPeer function only added for compatibility with waku V2
func (w *gethWakuWrapper) AddRelayPeer(address string) error {
return errors.New("not available in WakuV1")
}
// PeerCount function only added for compatibility with waku V2
func (w *gethWakuWrapper) DropPeer(peerID string) error {
return errors.New("not available in WakuV1")
}
// Peers function only added for compatibility with waku V2
func (w *gethWakuWrapper) Peers() map[string][]string {
p := make(map[string][]string)
return p
}
// MinPow returns the PoW value required by this node. // MinPow returns the PoW value required by this node.
func (w *gethWakuWrapper) MinPow() float64 { func (w *gethWakuWrapper) MinPow() float64 {
return w.waku.MinPow() return w.waku.MinPow()

View File

@ -42,7 +42,11 @@ func (w *gethWakuV2Wrapper) Version() uint {
return 2 return 2
} }
// MinPow returns the PoW value required by this node. func (w *gethWakuV2Wrapper) PeerCount() int {
return w.waku.PeerCount()
}
// DEPRECATED: Not used in WakuV2
func (w *gethWakuV2Wrapper) MinPow() float64 { func (w *gethWakuV2Wrapper) MinPow() float64 {
return 0 return 0
} }
@ -52,10 +56,7 @@ func (w *gethWakuV2Wrapper) MaxMessageSize() uint32 {
return w.waku.MaxMessageSize() return w.waku.MaxMessageSize()
} }
// BloomFilter returns the aggregated bloom filter for all the topics of interest. // DEPRECATED: not used in WakuV2
// The nodes are required to send only messages that match the advertised bloom filter.
// If a message does not match the bloom, it will tantamount to spam, and the peer will
// be disconnected.
func (w *gethWakuV2Wrapper) BloomFilter() []byte { func (w *gethWakuV2Wrapper) BloomFilter() []byte {
return nil return nil
} }
@ -170,6 +171,7 @@ func (w *gethWakuV2Wrapper) createFilterWrapper(id string, keyAsym *ecdsa.Privat
}, id), nil }, id), nil
} }
// DEPRECATED: Not used in waku V2
func (w *gethWakuV2Wrapper) SendMessagesRequest(peerID []byte, r types.MessagesRequest) error { func (w *gethWakuV2Wrapper) SendMessagesRequest(peerID []byte, r types.MessagesRequest) error {
return errors.New("DEPRECATED") return errors.New("DEPRECATED")
} }
@ -215,15 +217,27 @@ func (w *gethWakuV2Wrapper) RequestStoreMessages(peerID []byte, r types.Messages
return nil, nil return nil, nil
} }
// RequestHistoricMessages sends a message with p2pRequestCode to a specific peer, // DEPRECATED: Not used in waku V2
// which is known to implement MailServer interface, and is supposed to process this
// request and respond with a number of peer-to-peer messages (possibly expired),
// which are not supposed to be forwarded any further.
// The whisper protocol is agnostic of the format and contents of envelope.
func (w *gethWakuV2Wrapper) RequestHistoricMessagesWithTimeout(peerID []byte, envelope types.Envelope, timeout time.Duration) error { func (w *gethWakuV2Wrapper) RequestHistoricMessagesWithTimeout(peerID []byte, envelope types.Envelope, timeout time.Duration) error {
return errors.New("DEPRECATED") return errors.New("DEPRECATED")
} }
func (w *gethWakuV2Wrapper) AddStorePeer(address string) error {
return w.waku.AddStorePeer(address)
}
func (w *gethWakuV2Wrapper) AddRelayPeer(address string) error {
return w.waku.AddRelayPeer(address)
}
func (w *gethWakuV2Wrapper) Peers() map[string][]string {
return w.waku.Peers()
}
func (w *gethWakuV2Wrapper) DropPeer(peerID string) error {
return w.waku.DropPeer(peerID)
}
type wakuV2FilterWrapper struct { type wakuV2FilterWrapper struct {
filter *wakucommon.Filter filter *wakucommon.Filter
id string id string

View File

@ -13,6 +13,17 @@ type Waku interface {
// Waku protocol version // Waku protocol version
Version() uint Version() uint
// PeerCount
PeerCount() int
Peers() map[string][]string
AddStorePeer(address string) error
AddRelayPeer(address string) error
DropPeer(peerID string) error
// MinPow returns the PoW value required by this node. // MinPow returns the PoW value required by this node.
MinPow() float64 MinPow() float64
// BloomFilter returns the aggregated bloom filter for all the topics of interest. // BloomFilter returns the aggregated bloom filter for all the topics of interest.

View File

@ -566,11 +566,12 @@ func (m *Messenger) handleConnectionChange(online bool) {
} }
func (m *Messenger) online() bool { func (m *Messenger) online() bool {
// TODO: we are still missing peer management in wakuv2 switch m.transport.WakuVersion() {
if m.transport.WakuVersion() == 2 { case 2:
return true return m.transport.PeerCount() > 0
default:
return m.node.PeersCount() > 0
} }
return m.node.PeersCount() > 0
} }
func (m *Messenger) buildContactCodeAdvertisement() (*protobuf.ContactCodeAdvertisement, error) { func (m *Messenger) buildContactCodeAdvertisement() (*protobuf.ContactCodeAdvertisement, error) {

View File

@ -0,0 +1,17 @@
package protocol
func (m *Messenger) AddStorePeer(address string) error {
return m.transport.AddStorePeer(address)
}
func (m *Messenger) AddRelayPeer(address string) error {
return m.transport.AddStorePeer(address)
}
func (m *Messenger) DropPeer(peerID string) error {
return m.transport.DropPeer(peerID)
}
func (m *Messenger) Peers() map[string][]string {
return m.transport.Peers()
}

View File

@ -414,6 +414,14 @@ func (t *Transport) WakuVersion() uint {
return t.waku.Version() return t.waku.Version()
} }
func (t *Transport) PeerCount() int {
return t.waku.PeerCount()
}
func (t *Transport) Peers() map[string][]string {
return t.waku.Peers()
}
func (t *Transport) createMessagesRequestV1( func (t *Transport) createMessagesRequestV1(
ctx context.Context, ctx context.Context,
peerID []byte, peerID []byte,
@ -585,3 +593,15 @@ func (t *Transport) BloomFilter() []byte {
func PubkeyToHex(key *ecdsa.PublicKey) string { func PubkeyToHex(key *ecdsa.PublicKey) string {
return types.EncodeHex(crypto.FromECDSAPub(key)) return types.EncodeHex(crypto.FromECDSAPub(key))
} }
func (t *Transport) AddStorePeer(address string) error {
return t.waku.AddStorePeer(address)
}
func (t *Transport) AddRelayPeer(address string) error {
return t.waku.AddRelayPeer(address)
}
func (t *Transport) DropPeer(peerID string) error {
return t.waku.DropPeer(peerID)
}

View File

@ -904,6 +904,22 @@ func (api *PublicAPI) BloomFilter() string {
return hexutil.Encode(api.service.messenger.BloomFilter()) return hexutil.Encode(api.service.messenger.BloomFilter())
} }
func (api *PublicAPI) AddStorePeer(address string) error {
return api.service.messenger.AddStorePeer(address)
}
func (api *PublicAPI) AddRelayPeer(address string) error {
return api.service.messenger.AddRelayPeer(address)
}
func (api *PublicAPI) DropPeer(peerID string) error {
return api.service.messenger.DropPeer(peerID)
}
func (api *PublicAPI) Peers() map[string][]string {
return api.service.messenger.Peers()
}
// ----- // -----
// HELPER // HELPER
// ----- // -----

View File

@ -721,6 +721,11 @@ func (w *WakuNode) DialPeerByID(peerID peer.ID) error {
return w.connect(info) return w.connect(info)
} }
func (w *WakuNode) DialPeerByID(peerID peer.ID) error {
info := w.host.Peerstore().PeerInfo(peerID)
return w.host.Connect(w.ctx, info)
}
func (w *WakuNode) ClosePeerByAddress(address string) error { func (w *WakuNode) ClosePeerByAddress(address string) error {
p, err := ma.NewMultiaddr(address) p, err := ma.NewMultiaddr(address)
if err != nil { if err != nil {
@ -766,7 +771,6 @@ func (w *WakuNode) Peers() PeerStats {
} }
func (w *WakuNode) startKeepAlive(t time.Duration) { func (w *WakuNode) startKeepAlive(t time.Duration) {
log.Info("Setting up ping protocol with duration of ", t) log.Info("Setting up ping protocol with duration of ", t)
w.ping = ping.NewPingService(w.host) w.ping = ping.NewPingService(w.host)

View File

@ -29,7 +29,7 @@ import (
"sync" "sync"
"time" "time"
"github.com/ethereum/go-ethereum/common/hexutil" "github.com/libp2p/go-libp2p-core/peer"
"go.uber.org/zap" "go.uber.org/zap"
@ -37,6 +37,7 @@ import (
"golang.org/x/crypto/pbkdf2" "golang.org/x/crypto/pbkdf2"
gethcommon "github.com/ethereum/go-ethereum/common" gethcommon "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
@ -67,6 +68,12 @@ type settings struct {
SoftBlacklistedPeerIDs map[string]bool // SoftBlacklistedPeerIDs is a list of peer ids that we want to keep connected but silently drop any envelope from SoftBlacklistedPeerIDs map[string]bool // SoftBlacklistedPeerIDs is a list of peer ids that we want to keep connected but silently drop any envelope from
} }
type ConnStatus struct {
IsOnline bool `json:"isOnline"`
HasHistory bool `json:"hasHistory"`
Peers map[string][]string `json:"peers"`
}
// Waku represents a dark communication interface through the Ethereum // Waku represents a dark communication interface through the Ethereum
// network, using its very own P2P communication layer. // network, using its very own P2P communication layer.
type Waku struct { type Waku struct {
@ -167,7 +174,7 @@ func New(nodeKey string, cfg *Config, logger *zap.Logger) (*Waku, error) {
case <-waku.quit: case <-waku.quit:
return return
case c := <-connStatusChan: case c := <-connStatusChan:
signal.SendPeerStats(c) signal.SendPeerStats(formatConnStatus(c))
} }
} }
}() }()
@ -707,6 +714,28 @@ func (w *Waku) IsEnvelopeCached(hash gethcommon.Hash) bool {
return exist return exist
} }
func (w *Waku) PeerCount() int {
return w.node.PeerCount()
}
func (w *Waku) Peers() map[string][]string {
return FormatPeerStats(w.node.Peers())
}
func (w *Waku) AddStorePeer(address string) error {
_, err := w.node.AddStorePeer(address)
return err
}
func (w *Waku) AddRelayPeer(address string) error {
// TODO:
return nil
}
func (w *Waku) DropPeer(peerID string) error {
return w.node.ClosePeerById(peer.ID(peerID))
}
// validatePrivateKey checks the format of the given private key. // validatePrivateKey checks the format of the given private key.
func validatePrivateKey(k *ecdsa.PrivateKey) bool { func validatePrivateKey(k *ecdsa.PrivateKey) bool {
if k == nil || k.D == nil || k.D.Sign() == 0 { if k == nil || k.D == nil || k.D.Sign() == 0 {
@ -741,3 +770,19 @@ func toDeterministicID(id string, expectedLen int) (string, error) {
return id, nil return id, nil
} }
func FormatPeerStats(peers node.PeerStats) map[string][]string {
p := make(map[string][]string)
for k, v := range peers {
p[k.Pretty()] = v
}
return p
}
func formatConnStatus(c node.ConnStatus) ConnStatus {
return ConnStatus{
IsOnline: c.IsOnline,
HasHistory: c.HasHistory,
Peers: FormatPeerStats(c.Peers),
}
}