status-go/waku/waku.go
Andrea Maria Piana 86e0ec8e10 Log disconnect/connect event
Marketing was relying on mailserver entries for checking the time a
give peer spent on the app.

This was not accurate as the assumption was that a peer would "ping" a
mailserver every 15s, which is not the case, it only hit the mailserver
as it comes from an "offline" state.

This commit changes the log level of two entries so that we have
connect/disconnect time for a given peer, which should be enough to
calculate roughly the time a peer has been online if connected to our
fleet.
2020-11-03 15:58:26 +01:00

1559 lines
45 KiB
Go

// Copyright 2019 The Waku Library Authors.
//
// The Waku library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The Waku library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty off
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the Waku library. If not, see <http://www.gnu.org/licenses/>.
//
// This software uses the go-ethereum library, which is licensed
// under the GNU Lesser General Public Library, version 3 or any later.
package waku
import (
"bytes"
"crypto/ecdsa"
"crypto/sha256"
"errors"
"fmt"
"runtime"
"sync"
"time"
"github.com/ethereum/go-ethereum/common/hexutil"
"go.uber.org/zap"
mapset "github.com/deckarep/golang-set"
"golang.org/x/crypto/pbkdf2"
gethcommon "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/rpc"
"github.com/status-im/status-go/eth-node/types"
"github.com/status-im/status-go/waku/common"
v0 "github.com/status-im/status-go/waku/v0"
v1 "github.com/status-im/status-go/waku/v1"
)
const messageQueueLimit = 1024
type Bridge interface {
Pipe() (<-chan *common.Envelope, chan<- *common.Envelope)
}
type settings struct {
MaxMsgSize uint32 // Maximal message length allowed by the waku node
EnableConfirmations bool // Enable sending message confirmations
MinPow float64 // Minimal PoW required by the waku node
MinPowTolerance float64 // Minimal PoW tolerated by the waku node for a limited time
BloomFilter []byte // Bloom filter for topics of interest for this node
BloomFilterTolerance []byte // Bloom filter tolerated by the waku node for a limited time
TopicInterest map[common.TopicType]bool // Topic interest for this node
TopicInterestTolerance map[common.TopicType]bool // Topic interest tolerated by the waku node for a limited time
SoftBlacklistedPeerIDs map[string]bool // SoftBlacklistedPeerIDs is a list of peer ids that we want to keep connected but silently drop any envelope from
BloomFilterMode bool // Whether we should match against bloom-filter only
LightClient bool // Light client mode enabled does not forward messages
RestrictLightClientsConn bool // Restrict connection between two light clients
SyncAllowance int // Maximum time in seconds allowed to process the waku-related messages
FullNode bool // Whether this is to be run in FullNode settings
}
// Waku represents a dark communication interface through the Ethereum
// network, using its very own P2P communication layer.
type Waku struct {
protocols []p2p.Protocol // Peer description and parameters
filters *common.Filters // Message filters installed with Subscribe function
privateKeys map[string]*ecdsa.PrivateKey // Private key storage
symKeys map[string][]byte // Symmetric key storage
keyMu sync.RWMutex // Mutex associated with key stores
envelopes map[gethcommon.Hash]*common.Envelope // Pool of envelopes currently tracked by this node
expirations map[uint32]mapset.Set // Message expiration pool
poolMu sync.RWMutex // Mutex to sync the message and expiration pools
peers map[common.Peer]struct{} // Set of currently active peers
peerMu sync.RWMutex // Mutex to sync the active peer set
msgQueue chan *common.Envelope // Message queue for normal waku messages
p2pMsgQueue chan interface{} // Message queue for peer-to-peer messages (not to be forwarded any further) and history delivery confirmations.
quit chan struct{} // Channel used for graceful exit
settings settings // Holds configuration settings that can be dynamically changed
settingsMu sync.RWMutex // Mutex to sync the settings access
mailServer MailServer
rateLimiter *common.PeerRateLimiter
envelopeFeed event.Feed
timeSource func() time.Time // source of time for waku
bridge Bridge
bridgeWg sync.WaitGroup
cancelBridge chan struct{}
logger *zap.Logger
}
// New creates a Waku client ready to communicate through the Ethereum P2P network.
func New(cfg *Config, logger *zap.Logger) *Waku {
if logger == nil {
logger = zap.NewNop()
}
logger.Debug("starting waku with config", zap.Any("config", cfg))
if cfg == nil {
c := DefaultConfig
cfg = &c
}
waku := &Waku{
privateKeys: make(map[string]*ecdsa.PrivateKey),
symKeys: make(map[string][]byte),
envelopes: make(map[gethcommon.Hash]*common.Envelope),
expirations: make(map[uint32]mapset.Set),
peers: make(map[common.Peer]struct{}),
msgQueue: make(chan *common.Envelope, messageQueueLimit),
p2pMsgQueue: make(chan interface{}, messageQueueLimit),
quit: make(chan struct{}),
timeSource: time.Now,
logger: logger,
}
waku.settings = settings{
MaxMsgSize: cfg.MaxMessageSize,
MinPow: cfg.MinimumAcceptedPoW,
MinPowTolerance: cfg.MinimumAcceptedPoW,
EnableConfirmations: cfg.EnableConfirmations,
LightClient: cfg.LightClient,
FullNode: cfg.FullNode,
BloomFilterMode: cfg.BloomFilterMode,
SoftBlacklistedPeerIDs: make(map[string]bool),
RestrictLightClientsConn: cfg.RestrictLightClientsConn,
SyncAllowance: common.DefaultSyncAllowance,
}
for _, peerID := range cfg.SoftBlacklistedPeerIDs {
waku.settings.SoftBlacklistedPeerIDs[peerID] = true
}
if cfg.FullNode {
waku.settings.BloomFilter = common.MakeFullNodeBloom()
waku.settings.BloomFilterTolerance = common.MakeFullNodeBloom()
}
waku.filters = common.NewFilters()
// p2p waku sub-protocol handler
waku.protocols = []p2p.Protocol{{
Name: v0.Name,
Version: uint(v0.Version),
Length: v0.NumberOfMessageCodes,
Run: waku.handlePeerV0,
NodeInfo: func() interface{} {
return map[string]interface{}{
"version": v0.VersionStr,
"maxMessageSize": waku.MaxMessageSize(),
"minimumPoW": waku.MinPow(),
}
},
},
{
Name: v1.Name,
Version: uint(v1.Version),
Length: v1.NumberOfMessageCodes,
Run: waku.handlePeerV1,
NodeInfo: func() interface{} {
return map[string]interface{}{
"version": v1.VersionStr,
"maxMessageSize": waku.MaxMessageSize(),
"minimumPoW": waku.MinPow(),
}
},
},
}
return waku
}
// MinPow returns the PoW value required by this node.
func (w *Waku) MinPow() float64 {
w.settingsMu.RLock()
defer w.settingsMu.RUnlock()
return w.settings.MinPow
}
// SetMinimumPoW sets the minimal PoW required by this node
func (w *Waku) SetMinimumPoW(val float64, tolerate bool) error {
if val < 0.0 {
return fmt.Errorf("invalid PoW: %f", val)
}
w.settingsMu.Lock()
w.settings.MinPow = val
w.settingsMu.Unlock()
w.notifyPeersAboutPowRequirementChange(val)
if tolerate {
go func() {
// allow some time before all the peers have processed the notification
select {
case <-w.quit:
return
case <-time.After(time.Duration(w.settings.SyncAllowance) * time.Second):
w.settingsMu.Lock()
w.settings.MinPowTolerance = val
w.settingsMu.Unlock()
}
}()
}
return nil
}
// MinPowTolerance returns the value of minimum PoW which is tolerated for a limited
// time after PoW was changed. If sufficient time have elapsed or no change of PoW
// have ever occurred, the return value will be the same as return value of MinPow().
func (w *Waku) MinPowTolerance() float64 {
w.settingsMu.RLock()
defer w.settingsMu.RUnlock()
return w.settings.MinPowTolerance
}
// BloomFilter returns the aggregated bloom filter for all the topics of interest.
// The nodes are required to send only messages that match the advertised bloom filter.
// If a message does not match the bloom, it will tantamount to spam, and the peer will
// be disconnected.
func (w *Waku) BloomFilter() []byte {
if w.FullNode() {
return common.MakeFullNodeBloom()
}
w.settingsMu.RLock()
defer w.settingsMu.RUnlock()
return w.settings.BloomFilter
}
// BloomFilterTolerance returns the bloom filter which is tolerated for a limited
// time after new bloom was advertised to the peers. If sufficient time have elapsed
// or no change of bloom filter have ever occurred, the return value will be the same
// as return value of BloomFilter().
func (w *Waku) BloomFilterTolerance() []byte {
if w.FullNode() {
return common.MakeFullNodeBloom()
}
w.settingsMu.RLock()
defer w.settingsMu.RUnlock()
return w.settings.BloomFilterTolerance
}
// BloomFilterMode returns whether the node is running in bloom filter mode
func (w *Waku) BloomFilterMode() bool {
if w.FullNode() {
return true
}
w.settingsMu.RLock()
defer w.settingsMu.RUnlock()
return w.settings.BloomFilterMode
}
// SetBloomFilter sets the new bloom filter
func (w *Waku) SetBloomFilter(bloom []byte) error {
if len(bloom) != common.BloomFilterSize {
return fmt.Errorf("invalid bloom filter size: %d", len(bloom))
}
b := make([]byte, common.BloomFilterSize)
copy(b, bloom)
w.settingsMu.Lock()
w.settings.BloomFilter = b
// Setting bloom filter reset topic interest
w.settings.TopicInterest = nil
w.settingsMu.Unlock()
w.notifyPeersAboutBloomFilterChange(b)
go func() {
// allow some time before all the peers have processed the notification
select {
case <-w.quit:
return
case <-time.After(time.Duration(w.settings.SyncAllowance) * time.Second):
w.settingsMu.Lock()
w.settings.BloomFilterTolerance = b
w.settingsMu.Unlock()
}
}()
return nil
}
// TopicInterest returns the all the topics of interest.
// The nodes are required to send only messages that match the advertised topics.
// If a message does not match the topic-interest, it will tantamount to spam, and the peer will
// be disconnected.
func (w *Waku) TopicInterest() []common.TopicType {
w.settingsMu.RLock()
defer w.settingsMu.RUnlock()
// Return nil if FullNode as otherwise topic interest will have precedence
if w.settings.FullNode || w.settings.TopicInterest == nil {
return nil
}
topicInterest := make([]common.TopicType, len(w.settings.TopicInterest))
i := 0
for topic := range w.settings.TopicInterest {
topicInterest[i] = topic
i++
}
return topicInterest
}
// updateTopicInterest adds a new topic interest
// and informs the peers
func (w *Waku) updateTopicInterest(f *common.Filter) error {
newTopicInterest := w.TopicInterest()
for _, t := range f.Topics {
top := common.BytesToTopic(t)
newTopicInterest = append(newTopicInterest, top)
}
return w.SetTopicInterest(newTopicInterest)
}
// SetTopicInterest sets the new topicInterest
func (w *Waku) SetTopicInterest(topicInterest []common.TopicType) error {
var topicInterestMap map[common.TopicType]bool
if len(topicInterest) > common.MaxTopicInterest {
return fmt.Errorf("invalid topic interest: %d", len(topicInterest))
}
if topicInterest != nil {
topicInterestMap = make(map[common.TopicType]bool, len(topicInterest))
for _, topic := range topicInterest {
topicInterestMap[topic] = true
}
}
w.settingsMu.Lock()
w.settings.TopicInterest = topicInterestMap
// Setting topic interest resets bloom filter
w.settings.BloomFilter = nil
w.settingsMu.Unlock()
w.notifyPeersAboutTopicInterestChange(topicInterest)
go func() {
// allow some time before all the peers have processed the notification
select {
case <-w.quit:
return
case <-time.After(time.Duration(w.settings.SyncAllowance) * time.Second):
w.settingsMu.Lock()
w.settings.TopicInterestTolerance = topicInterestMap
w.settingsMu.Unlock()
}
}()
return nil
}
// MaxMessageSize returns the maximum accepted message size.
func (w *Waku) MaxMessageSize() uint32 {
w.settingsMu.RLock()
defer w.settingsMu.RUnlock()
return w.settings.MaxMsgSize
}
// SetMaxMessageSize sets the maximal message size allowed by this node
func (w *Waku) SetMaxMessageSize(size uint32) error {
if size > common.MaxMessageSize {
return fmt.Errorf("message size too large [%d>%d]", size, common.MaxMessageSize)
}
w.settingsMu.Lock()
w.settings.MaxMsgSize = size
w.settingsMu.Unlock()
return nil
}
// LightClientMode indicates is this node is light client (does not forward any messages)
func (w *Waku) LightClientMode() bool {
w.settingsMu.RLock()
defer w.settingsMu.RUnlock()
return w.settings.LightClient
}
// SetLightClientMode makes node light client (does not forward any messages)
func (w *Waku) SetLightClientMode(v bool) {
w.settingsMu.Lock()
w.settings.LightClient = v
w.settingsMu.Unlock()
}
// LightClientModeConnectionRestricted indicates that connection to light client in light client mode not allowed
func (w *Waku) LightClientModeConnectionRestricted() bool {
w.settingsMu.RLock()
defer w.settingsMu.RUnlock()
return w.settings.RestrictLightClientsConn
}
// PacketRateLimiting returns RateLimits information for packets
func (w *Waku) PacketRateLimits() common.RateLimits {
if w.rateLimiter == nil {
return common.RateLimits{}
}
return common.RateLimits{
IPLimits: uint64(w.rateLimiter.PacketLimitPerSecIP),
PeerIDLimits: uint64(w.rateLimiter.PacketLimitPerSecPeerID),
}
}
// BytesRateLimiting returns RateLimits information for bytes
func (w *Waku) BytesRateLimits() common.RateLimits {
if w.rateLimiter == nil {
return common.RateLimits{}
}
return common.RateLimits{
IPLimits: uint64(w.rateLimiter.BytesLimitPerSecIP),
PeerIDLimits: uint64(w.rateLimiter.BytesLimitPerSecPeerID),
}
}
// ConfirmationsEnabled returns true if message confirmations are enabled.
func (w *Waku) ConfirmationsEnabled() bool {
w.settingsMu.RLock()
defer w.settingsMu.RUnlock()
return w.settings.EnableConfirmations
}
// CurrentTime returns current time.
func (w *Waku) CurrentTime() time.Time {
return w.timeSource()
}
// SetTimeSource assigns a particular source of time to a waku object.
func (w *Waku) SetTimeSource(timesource func() time.Time) {
w.timeSource = timesource
}
// APIs returns the RPC descriptors the Waku implementation offers
func (w *Waku) APIs() []rpc.API {
return []rpc.API{
{
Namespace: v0.Name,
Version: v0.VersionStr,
Service: NewPublicWakuAPI(w),
Public: false,
},
}
}
// Protocols returns the waku sub-protocols ran by this particular client.
func (w *Waku) Protocols() []p2p.Protocol {
return w.protocols
}
// RegisterMailServer registers MailServer interface.
// MailServer will process all the incoming messages with p2pRequestCode.
func (w *Waku) RegisterMailServer(server MailServer) {
w.mailServer = server
}
// SetRateLimiter registers a rate limiter.
func (w *Waku) RegisterRateLimiter(r *common.PeerRateLimiter) {
w.rateLimiter = r
}
// RegisterBridge registers a new Bridge that moves envelopes
// between different subprotocols.
// It's important that a bridge is registered before the service
// is started, otherwise, it won't read and propagate envelopes.
func (w *Waku) RegisterBridge(b Bridge) {
if w.cancelBridge != nil {
close(w.cancelBridge)
}
w.bridge = b
w.cancelBridge = make(chan struct{})
w.bridgeWg.Add(1)
go w.readBridgeLoop()
}
func (w *Waku) readBridgeLoop() {
defer w.bridgeWg.Done()
out, _ := w.bridge.Pipe()
for {
select {
case <-w.cancelBridge:
return
case env := <-out:
_, err := w.addAndBridge(env, false, true)
if err != nil {
common.BridgeReceivedFailed.Inc()
w.logger.Warn(
"failed to add a bridged envelope",
zap.Binary("ID", env.Hash().Bytes()),
zap.Error(err),
)
} else {
common.BridgeReceivedSucceed.Inc()
w.logger.Debug("bridged envelope successfully", zap.Binary("ID", env.Hash().Bytes()))
w.envelopeFeed.Send(common.EnvelopeEvent{
Event: common.EventEnvelopeReceived,
Topic: env.Topic,
Hash: env.Hash(),
})
}
}
}
}
func (w *Waku) SendEnvelopeEvent(event common.EnvelopeEvent) int {
return w.envelopeFeed.Send(event)
}
// SubscribeEnvelopeEvents subscribes to envelopes feed.
// In order to prevent blocking waku producers events must be amply buffered.
func (w *Waku) SubscribeEnvelopeEvents(events chan<- common.EnvelopeEvent) event.Subscription {
return w.envelopeFeed.Subscribe(events)
}
func (w *Waku) notifyPeersAboutPowRequirementChange(pow float64) {
arr := w.getPeers()
for _, p := range arr {
err := p.NotifyAboutPowRequirementChange(pow)
if err != nil {
// allow one retry
err = p.NotifyAboutPowRequirementChange(pow)
}
if err != nil {
w.logger.Warn("failed to notify peer about new pow requirement", zap.Binary("peer", p.ID()), zap.Error(err))
}
}
}
func (w *Waku) FullNode() bool {
w.settingsMu.RLock()
// If full node, nothing to do
fullNode := w.settings.FullNode
w.settingsMu.RUnlock()
return fullNode
}
func (w *Waku) notifyPeersAboutBloomFilterChange(bloom []byte) {
if w.FullNode() {
return
}
arr := w.getPeers()
for _, p := range arr {
err := p.NotifyAboutBloomFilterChange(bloom)
if err != nil {
// allow one retry
err = p.NotifyAboutBloomFilterChange(bloom)
}
if err != nil {
w.logger.Warn("failed to notify peer about new bloom filter change", zap.Binary("peer", p.ID()), zap.Error(err))
}
}
}
func (w *Waku) notifyPeersAboutTopicInterestChange(topicInterest []common.TopicType) {
if w.FullNode() {
return
}
arr := w.getPeers()
for _, p := range arr {
err := p.NotifyAboutTopicInterestChange(topicInterest)
if err != nil {
// allow one retry
err = p.NotifyAboutTopicInterestChange(topicInterest)
}
if err != nil {
w.logger.Warn("failed to notify peer about new topic interest", zap.Binary("peer", p.ID()), zap.Error(err))
}
}
}
func (w *Waku) getPeers() []common.Peer {
arr := make([]common.Peer, len(w.peers))
i := 0
w.peerMu.Lock()
for p := range w.peers {
arr[i] = p
i++
}
w.peerMu.Unlock()
return arr
}
// getPeer retrieves peer by ID
func (w *Waku) getPeer(peerID []byte) (common.Peer, error) {
w.peerMu.Lock()
defer w.peerMu.Unlock()
for p := range w.peers {
if bytes.Equal(peerID, p.ID()) {
return p, nil
}
}
return nil, fmt.Errorf("could not find peer with ID: %x", peerID)
}
// AllowP2PMessagesFromPeer marks specific peer trusted,
// which will allow it to send historic (expired) messages.
func (w *Waku) AllowP2PMessagesFromPeer(peerID []byte) error {
p, err := w.getPeer(peerID)
if err != nil {
return err
}
p.SetPeerTrusted(true)
return nil
}
// RequestHistoricMessages sends a message with p2pRequestCode to a specific peer,
// which is known to implement MailServer interface, and is supposed to process this
// request and respond with a number of peer-to-peer messages (possibly expired),
// which are not supposed to be forwarded any further.
// The waku protocol is agnostic of the format and contents of envelope.
func (w *Waku) RequestHistoricMessages(peerID []byte, envelope *common.Envelope) error {
return w.RequestHistoricMessagesWithTimeout(peerID, envelope, 0)
}
// RequestHistoricMessagesWithTimeout acts as RequestHistoricMessages but requires to pass a timeout.
// It sends an event EventMailServerRequestExpired after the timeout.
func (w *Waku) RequestHistoricMessagesWithTimeout(peerID []byte, envelope *common.Envelope, timeout time.Duration) error {
p, err := w.getPeer(peerID)
if err != nil {
return err
}
p.SetPeerTrusted(true)
w.envelopeFeed.Send(common.EnvelopeEvent{
Peer: p.EnodeID(),
Topic: envelope.Topic,
Hash: envelope.Hash(),
Event: common.EventMailServerRequestSent,
})
err = p.RequestHistoricMessages(envelope)
if timeout != 0 {
go w.expireRequestHistoricMessages(p.EnodeID(), envelope.Hash(), timeout)
}
return err
}
func (w *Waku) SendMessagesRequest(peerID []byte, request common.MessagesRequest) error {
if err := request.Validate(); err != nil {
return err
}
p, err := w.getPeer(peerID)
if err != nil {
return err
}
p.SetPeerTrusted(true)
if err := p.SendMessagesRequest(request); err != nil {
return err
}
w.envelopeFeed.Send(common.EnvelopeEvent{
Peer: p.EnodeID(),
Hash: gethcommon.BytesToHash(request.ID),
Event: common.EventMailServerRequestSent,
})
return nil
}
func (w *Waku) expireRequestHistoricMessages(peer enode.ID, hash gethcommon.Hash, timeout time.Duration) {
timer := time.NewTimer(timeout)
defer timer.Stop()
select {
case <-w.quit:
return
case <-timer.C:
w.envelopeFeed.Send(common.EnvelopeEvent{
Peer: peer,
Hash: hash,
Event: common.EventMailServerRequestExpired,
})
}
}
func (w *Waku) SendHistoricMessageResponse(peerID []byte, payload []byte) error {
peer, err := w.getPeer(peerID)
if err != nil {
return err
}
return peer.SendHistoricMessageResponse(payload)
}
// SendP2PMessage sends a peer-to-peer message to a specific peer.
// It sends one or more envelopes in a single batch.
func (w *Waku) SendP2PMessages(peerID []byte, envelopes ...*common.Envelope) error {
p, err := w.getPeer(peerID)
if err != nil {
return err
}
return p.SendP2PMessages(envelopes)
}
// SendRawP2PDirect sends a peer-to-peer message to a specific peer.
// It sends one or more envelopes in a single batch.
func (w *Waku) SendRawP2PDirect(peerID []byte, envelopes ...rlp.RawValue) error {
p, err := w.getPeer(peerID)
if err != nil {
return err
}
return p.SendRawP2PDirect(envelopes)
}
// NewKeyPair generates a new cryptographic identity for the client, and injects
// it into the known identities for message decryption. Returns ID of the new key pair.
func (w *Waku) NewKeyPair() (string, error) {
key, err := crypto.GenerateKey()
if err != nil || !validatePrivateKey(key) {
key, err = crypto.GenerateKey() // retry once
}
if err != nil {
return "", err
}
if !validatePrivateKey(key) {
return "", fmt.Errorf("failed to generate valid key")
}
id, err := toDeterministicID(hexutil.Encode(crypto.FromECDSAPub(&key.PublicKey)), common.KeyIDSize)
if err != nil {
return "", err
}
w.keyMu.Lock()
defer w.keyMu.Unlock()
if w.privateKeys[id] != nil {
return "", fmt.Errorf("failed to generate unique ID")
}
w.privateKeys[id] = key
return id, nil
}
// DeleteKeyPair deletes the specified key if it exists.
func (w *Waku) DeleteKeyPair(key string) bool {
deterministicID, err := toDeterministicID(key, common.KeyIDSize)
if err != nil {
return false
}
w.keyMu.Lock()
defer w.keyMu.Unlock()
if w.privateKeys[deterministicID] != nil {
delete(w.privateKeys, deterministicID)
return true
}
return false
}
// AddKeyPair imports a asymmetric private key and returns it identifier.
func (w *Waku) AddKeyPair(key *ecdsa.PrivateKey) (string, error) {
id, err := makeDeterministicID(hexutil.Encode(crypto.FromECDSAPub(&key.PublicKey)), common.KeyIDSize)
if err != nil {
return "", err
}
if w.HasKeyPair(id) {
return id, nil // no need to re-inject
}
w.keyMu.Lock()
w.privateKeys[id] = key
w.keyMu.Unlock()
return id, nil
}
// SelectKeyPair adds cryptographic identity, and makes sure
// that it is the only private key known to the node.
func (w *Waku) SelectKeyPair(key *ecdsa.PrivateKey) error {
id, err := makeDeterministicID(hexutil.Encode(crypto.FromECDSAPub(&key.PublicKey)), common.KeyIDSize)
if err != nil {
return err
}
w.keyMu.Lock()
defer w.keyMu.Unlock()
w.privateKeys = make(map[string]*ecdsa.PrivateKey) // reset key store
w.privateKeys[id] = key
return nil
}
// DeleteKeyPairs removes all cryptographic identities known to the node
func (w *Waku) DeleteKeyPairs() error {
w.keyMu.Lock()
defer w.keyMu.Unlock()
w.privateKeys = make(map[string]*ecdsa.PrivateKey)
return nil
}
// HasKeyPair checks if the waku node is configured with the private key
// of the specified public pair.
func (w *Waku) HasKeyPair(id string) bool {
deterministicID, err := toDeterministicID(id, common.KeyIDSize)
if err != nil {
return false
}
w.keyMu.RLock()
defer w.keyMu.RUnlock()
return w.privateKeys[deterministicID] != nil
}
// GetPrivateKey retrieves the private key of the specified identity.
func (w *Waku) GetPrivateKey(id string) (*ecdsa.PrivateKey, error) {
deterministicID, err := toDeterministicID(id, common.KeyIDSize)
if err != nil {
return nil, err
}
w.keyMu.RLock()
defer w.keyMu.RUnlock()
key := w.privateKeys[deterministicID]
if key == nil {
return nil, fmt.Errorf("invalid id")
}
return key, nil
}
// GenerateSymKey generates a random symmetric key and stores it under id,
// which is then returned. Will be used in the future for session key exchange.
func (w *Waku) GenerateSymKey() (string, error) {
key, err := common.GenerateSecureRandomData(common.AESKeyLength)
if err != nil {
return "", err
} else if !common.ValidateDataIntegrity(key, common.AESKeyLength) {
return "", fmt.Errorf("error in GenerateSymKey: crypto/rand failed to generate random data")
}
id, err := common.GenerateRandomID()
if err != nil {
return "", fmt.Errorf("failed to generate ID: %s", err)
}
w.keyMu.Lock()
defer w.keyMu.Unlock()
if w.symKeys[id] != nil {
return "", fmt.Errorf("failed to generate unique ID")
}
w.symKeys[id] = key
return id, nil
}
// AddSymKey stores the key with a given id.
func (w *Waku) AddSymKey(id string, key []byte) (string, error) {
deterministicID, err := toDeterministicID(id, common.KeyIDSize)
if err != nil {
return "", err
}
w.keyMu.Lock()
defer w.keyMu.Unlock()
if w.symKeys[deterministicID] != nil {
return "", fmt.Errorf("key already exists: %v", id)
}
w.symKeys[deterministicID] = key
return deterministicID, nil
}
// AddSymKeyDirect stores the key, and returns its id.
func (w *Waku) AddSymKeyDirect(key []byte) (string, error) {
if len(key) != common.AESKeyLength {
return "", fmt.Errorf("wrong key size: %d", len(key))
}
id, err := common.GenerateRandomID()
if err != nil {
return "", fmt.Errorf("failed to generate ID: %s", err)
}
w.keyMu.Lock()
defer w.keyMu.Unlock()
if w.symKeys[id] != nil {
return "", fmt.Errorf("failed to generate unique ID")
}
w.symKeys[id] = key
return id, nil
}
// AddSymKeyFromPassword generates the key from password, stores it, and returns its id.
func (w *Waku) AddSymKeyFromPassword(password string) (string, error) {
id, err := common.GenerateRandomID()
if err != nil {
return "", fmt.Errorf("failed to generate ID: %s", err)
}
if w.HasSymKey(id) {
return "", fmt.Errorf("failed to generate unique ID")
}
// kdf should run no less than 0.1 seconds on an average computer,
// because it's an once in a session experience
derived := pbkdf2.Key([]byte(password), nil, 65356, common.AESKeyLength, sha256.New)
w.keyMu.Lock()
defer w.keyMu.Unlock()
// double check is necessary, because deriveKeyMaterial() is very slow
if w.symKeys[id] != nil {
return "", fmt.Errorf("critical error: failed to generate unique ID")
}
w.symKeys[id] = derived
return id, nil
}
// HasSymKey returns true if there is a key associated with the given id.
// Otherwise returns false.
func (w *Waku) HasSymKey(id string) bool {
w.keyMu.RLock()
defer w.keyMu.RUnlock()
return w.symKeys[id] != nil
}
// DeleteSymKey deletes the key associated with the name string if it exists.
func (w *Waku) DeleteSymKey(id string) bool {
w.keyMu.Lock()
defer w.keyMu.Unlock()
if w.symKeys[id] != nil {
delete(w.symKeys, id)
return true
}
return false
}
// GetSymKey returns the symmetric key associated with the given id.
func (w *Waku) GetSymKey(id string) ([]byte, error) {
w.keyMu.RLock()
defer w.keyMu.RUnlock()
if w.symKeys[id] != nil {
return w.symKeys[id], nil
}
return nil, fmt.Errorf("non-existent key ID")
}
// Subscribe installs a new message handler used for filtering, decrypting
// and subsequent storing of incoming messages.
func (w *Waku) Subscribe(f *common.Filter) (string, error) {
s, err := w.filters.Install(f)
if err != nil {
return s, err
}
err = w.updateSettingsForFilter(f)
if err != nil {
w.filters.Uninstall(s)
return s, err
}
return s, nil
}
func (w *Waku) updateSettingsForFilter(f *common.Filter) error {
w.settingsMu.RLock()
topicInterestMode := !w.settings.BloomFilterMode
w.settingsMu.RUnlock()
if topicInterestMode {
err := w.updateTopicInterest(f)
if err != nil {
return err
}
} else {
err := w.updateBloomFilter(f)
if err != nil {
return err
}
}
return nil
}
// updateBloomFilter recalculates the new value of bloom filter,
// and informs the peers if necessary.
func (w *Waku) updateBloomFilter(f *common.Filter) error {
aggregate := make([]byte, common.BloomFilterSize)
for _, t := range f.Topics {
top := common.BytesToTopic(t)
b := top.ToBloom()
aggregate = addBloom(aggregate, b)
}
if !common.BloomFilterMatch(w.BloomFilter(), aggregate) {
// existing bloom filter must be updated
aggregate = addBloom(w.BloomFilter(), aggregate)
return w.SetBloomFilter(aggregate)
}
return nil
}
// GetFilter returns the filter by id.
func (w *Waku) GetFilter(id string) *common.Filter {
return w.filters.Get(id)
}
// Unsubscribe removes an installed message handler.
// TODO: This does not seem to update the bloom filter, nor topic-interest
// Note that the filter/topic-interest needs to take into account that there
// might be filters with duplicated topics, so it's not just a matter of removing
// from the map, in the topic-interest case, while the bloom filter might need to
// be rebuilt from scratch
func (w *Waku) Unsubscribe(id string) error {
ok := w.filters.Uninstall(id)
if !ok {
return fmt.Errorf("failed to unsubscribe: invalid ID '%s'", id)
}
return nil
}
// Send injects a message into the waku send queue, to be distributed in the
// network in the coming cycles.
func (w *Waku) Send(envelope *common.Envelope) error {
ok, err := w.add(envelope, false)
if err == nil && !ok {
return fmt.Errorf("failed to add envelope")
}
return err
}
// Start implements node.Service, starting the background data propagation thread
// of the Waku protocol.
func (w *Waku) Start(*p2p.Server) error {
go w.update()
numCPU := runtime.NumCPU()
for i := 0; i < numCPU; i++ {
go w.processQueue()
}
go w.processP2P()
return nil
}
// Stop implements node.Service, stopping the background data propagation thread
// of the Waku protocol.
func (w *Waku) Stop() error {
if w.cancelBridge != nil {
close(w.cancelBridge)
w.cancelBridge = nil
w.bridgeWg.Wait()
}
close(w.quit)
return nil
}
func (w *Waku) handlePeerV0(p2pPeer *p2p.Peer, rw p2p.MsgReadWriter) error {
return w.HandlePeer(v0.NewPeer(w, p2pPeer, rw, w.logger.Named("waku/peerv0")), rw)
}
func (w *Waku) handlePeerV1(p2pPeer *p2p.Peer, rw p2p.MsgReadWriter) error {
return w.HandlePeer(v1.NewPeer(w, p2pPeer, rw, w.logger.Named("waku/peerv1")), rw)
}
// HandlePeer is called by the underlying P2P layer when the waku sub-protocol
// connection is negotiated.
func (w *Waku) HandlePeer(peer common.Peer, rw p2p.MsgReadWriter) error {
w.peerMu.Lock()
w.peers[peer] = struct{}{}
w.peerMu.Unlock()
w.logger.Info("handling peer", zap.String("peerID", types.EncodeHex(peer.ID())))
defer func() {
w.peerMu.Lock()
delete(w.peers, peer)
w.peerMu.Unlock()
}()
if err := peer.Start(); err != nil {
return err
}
defer peer.Stop()
if w.rateLimiter != nil {
runLoop := func(out p2p.MsgReadWriter) error {
peer.SetRWWriter(out)
return peer.Run()
}
return w.rateLimiter.Decorate(peer, rw, runLoop)
}
err := peer.Run()
w.logger.Info("handled peer", zap.String("peerID", types.EncodeHex(peer.ID())), zap.Error(err))
return err
}
func (w *Waku) softBlacklisted(peerID string) bool {
w.settingsMu.RLock()
defer w.settingsMu.RUnlock()
return w.settings.SoftBlacklistedPeerIDs[peerID]
}
func (w *Waku) OnNewEnvelopes(envelopes []*common.Envelope, peer common.Peer) ([]common.EnvelopeError, error) {
envelopeErrors := make([]common.EnvelopeError, 0)
peerID := types.EncodeHex(peer.ID())
w.logger.Debug("received new envelopes", zap.Int("count", len(envelopes)), zap.String("peer", peerID))
trouble := false
if w.softBlacklisted(peerID) {
w.logger.Debug("peer is soft blacklisted", zap.String("peer", peerID))
return nil, nil
}
for _, env := range envelopes {
w.logger.Debug("received new envelope", zap.String("peer", peerID), zap.String("hash", env.Hash().Hex()))
cached, err := w.add(env, w.LightClientMode())
if err != nil {
_, isTimeSyncError := err.(common.TimeSyncError)
if !isTimeSyncError {
trouble = true
w.logger.Info("invalid envelope received", zap.String("peer", types.EncodeHex(peer.ID())), zap.Error(err))
}
envelopeErrors = append(envelopeErrors, common.ErrorToEnvelopeError(env.Hash(), err))
} else if cached {
peer.Mark(env)
}
w.envelopeFeed.Send(common.EnvelopeEvent{
Event: common.EventEnvelopeReceived,
Topic: env.Topic,
Hash: env.Hash(),
Peer: peer.EnodeID(),
})
common.EnvelopesValidatedCounter.Inc()
}
if trouble {
return envelopeErrors, errors.New("received invalid envelope")
}
return envelopeErrors, nil
}
func (w *Waku) OnNewP2PEnvelopes(envelopes []*common.Envelope) error {
for _, envelope := range envelopes {
w.postP2P(envelope)
}
return nil
}
func (w *Waku) Mailserver() bool {
return w.mailServer != nil
}
func (w *Waku) OnMessagesRequest(request common.MessagesRequest, p common.Peer) error {
w.mailServer.Deliver(p.ID(), request)
return nil
}
func (w *Waku) OnDeprecatedMessagesRequest(request *common.Envelope, p common.Peer) error {
w.mailServer.DeliverMail(p.ID(), request)
return nil
}
func (w *Waku) OnP2PRequestCompleted(payload []byte, p common.Peer) error {
msEvent, err := CreateMailServerEvent(p.EnodeID(), payload)
if err != nil {
return fmt.Errorf("invalid p2p request complete payload: %v", err)
}
w.postP2P(*msEvent)
return nil
}
func (w *Waku) OnMessagesResponse(response common.MessagesResponse, p common.Peer) error {
w.envelopeFeed.Send(common.EnvelopeEvent{
Batch: response.Hash,
Event: common.EventBatchAcknowledged,
Peer: p.EnodeID(),
Data: response.Errors,
})
return nil
}
func (w *Waku) OnBatchAcknowledged(batchHash gethcommon.Hash, p common.Peer) error {
w.envelopeFeed.Send(common.EnvelopeEvent{
Batch: batchHash,
Event: common.EventBatchAcknowledged,
Peer: p.EnodeID(),
})
return nil
}
func (w *Waku) add(envelope *common.Envelope, isP2P bool) (bool, error) {
return w.addAndBridge(envelope, isP2P, false)
}
func (w *Waku) bloomMatch(envelope *common.Envelope) (bool, error) {
if !common.BloomFilterMatch(w.BloomFilter(), envelope.Bloom()) {
// maybe the value was recently changed, and the peers did not adjust yet.
// in this case the previous value is retrieved by BloomFilterTolerance()
// for a short period of peer synchronization.
if !common.BloomFilterMatch(w.BloomFilterTolerance(), envelope.Bloom()) {
common.EnvelopesCacheFailedCounter.WithLabelValues("no_bloom_match").Inc()
return false, fmt.Errorf("envelope does not match bloom filter, hash=[%v], bloom: \n%x \n%x \n%x",
envelope.Hash().Hex(), w.BloomFilter(), envelope.Bloom(), envelope.Topic)
}
}
return true, nil
}
func (w *Waku) topicInterestMatch(envelope *common.Envelope) (bool, error) {
w.settingsMu.RLock()
defer w.settingsMu.RUnlock()
if w.settings.TopicInterest == nil {
return false, nil
}
if !w.settings.TopicInterest[envelope.Topic] {
if !w.settings.TopicInterestTolerance[envelope.Topic] {
common.EnvelopesCacheFailedCounter.WithLabelValues("no_topic_interest_match").Inc()
return false, fmt.Errorf("envelope does not match topic interest, hash=[%v], bloom: \n%x \n%x",
envelope.Hash().Hex(), envelope.Bloom(), envelope.Topic)
}
}
return true, nil
}
func (w *Waku) topicInterestOrBloomMatch(envelope *common.Envelope) (bool, error) {
if w.FullNode() {
return true, nil
}
w.settingsMu.RLock()
topicInterestMode := !w.settings.BloomFilterMode
w.settingsMu.RUnlock()
if topicInterestMode {
match, err := w.topicInterestMatch(envelope)
if err != nil {
return false, err
}
if match {
return true, nil
}
}
return w.bloomMatch(envelope)
}
func (w *Waku) SetBloomFilterMode(mode bool) {
w.settingsMu.Lock()
w.settings.BloomFilterMode = mode
w.settingsMu.Unlock()
// Recalculate and notify topic interest or bloom, currently not implemented
}
func (w *Waku) SetFullNode(set bool) {
w.settingsMu.Lock()
w.settings.FullNode = set
w.settingsMu.Unlock()
// We advertise the topic interest if full node has been disabled
// or bloom filter if enabled, as that's how we indicate to a peer we are a full node or not
if set {
w.notifyPeersAboutBloomFilterChange(w.BloomFilter())
} else {
w.notifyPeersAboutTopicInterestChange(w.TopicInterest())
}
}
// addEnvelope adds an envelope to the envelope map, used for sending
func (w *Waku) addEnvelope(envelope *common.Envelope) {
hash := envelope.Hash()
w.poolMu.Lock()
w.envelopes[hash] = envelope
if w.expirations[envelope.Expiry] == nil {
w.expirations[envelope.Expiry] = mapset.NewThreadUnsafeSet()
}
if !w.expirations[envelope.Expiry].Contains(hash) {
w.expirations[envelope.Expiry].Add(hash)
}
w.poolMu.Unlock()
}
// addAndBridge inserts a new envelope into the message pool to be distributed within the
// waku network. It also inserts the envelope into the expiration pool at the
// appropriate time-stamp. In case of error, connection should be dropped.
// param isP2P indicates whether the message is peer-to-peer (should not be forwarded).
func (w *Waku) addAndBridge(envelope *common.Envelope, isP2P bool, bridged bool) (bool, error) {
now := uint32(w.timeSource().Unix())
sent := envelope.Expiry - envelope.TTL
common.EnvelopesReceivedCounter.Inc()
if sent > now {
if sent-common.DefaultSyncAllowance > now {
common.EnvelopesCacheFailedCounter.WithLabelValues("in_future").Inc()
log.Warn("envelope created in the future", "hash", envelope.Hash())
return false, common.TimeSyncError(errors.New("envelope from future"))
}
// recalculate PoW, adjusted for the time difference, plus one second for latency
envelope.CalculatePoW(sent - now + 1)
}
if envelope.Expiry < now {
if envelope.Expiry+common.DefaultSyncAllowance*2 < now {
common.EnvelopesCacheFailedCounter.WithLabelValues("very_old").Inc()
log.Warn("very old envelope", "hash", envelope.Hash())
return false, common.TimeSyncError(errors.New("very old envelope"))
}
log.Debug("expired envelope dropped", "hash", envelope.Hash().Hex())
common.EnvelopesCacheFailedCounter.WithLabelValues("expired").Inc()
return false, nil // drop envelope without error
}
if uint32(envelope.Size()) > w.MaxMessageSize() {
common.EnvelopesCacheFailedCounter.WithLabelValues("oversized").Inc()
return false, fmt.Errorf("huge messages are not allowed [%x][%d][%d]", envelope.Hash(), envelope.Size(), w.MaxMessageSize())
}
if envelope.PoW() < w.MinPow() {
// maybe the value was recently changed, and the peers did not adjust yet.
// in this case the previous value is retrieved by MinPowTolerance()
// for a short period of peer synchronization.
if envelope.PoW() < w.MinPowTolerance() {
common.EnvelopesCacheFailedCounter.WithLabelValues("low_pow").Inc()
return false, fmt.Errorf("envelope with low PoW received: PoW=%f, hash=[%v]", envelope.PoW(), envelope.Hash().Hex())
}
}
match, err := w.topicInterestOrBloomMatch(envelope)
if err != nil {
return false, err
}
if !match {
return false, nil
}
hash := envelope.Hash()
w.poolMu.Lock()
_, alreadyCached := w.envelopes[hash]
w.poolMu.Unlock()
if !alreadyCached {
w.addEnvelope(envelope)
}
if alreadyCached {
log.Trace("w envelope already cached", "hash", envelope.Hash().Hex())
common.EnvelopesCachedCounter.WithLabelValues("hit").Inc()
} else {
log.Trace("cached w envelope", "hash", envelope.Hash().Hex())
common.EnvelopesCachedCounter.WithLabelValues("miss").Inc()
common.EnvelopesSizeMeter.Observe(float64(envelope.Size()))
w.postEvent(envelope, isP2P) // notify the local node about the new message
if w.mailServer != nil {
w.mailServer.Archive(envelope)
w.envelopeFeed.Send(common.EnvelopeEvent{
Topic: envelope.Topic,
Hash: envelope.Hash(),
Event: common.EventMailServerEnvelopeArchived,
})
}
// Bridge only envelopes that are not p2p messages.
// In particular, if a node is a lightweight node,
// it should not bridge any envelopes.
if !isP2P && !bridged && w.bridge != nil {
log.Debug("bridging envelope from Waku", "hash", envelope.Hash().Hex())
_, in := w.bridge.Pipe()
in <- envelope
common.BridgeSent.Inc()
}
}
return true, nil
}
func (w *Waku) postP2P(event interface{}) {
w.p2pMsgQueue <- event
}
// postEvent queues the message for further processing.
func (w *Waku) postEvent(envelope *common.Envelope, isP2P bool) {
if isP2P {
w.postP2P(envelope)
} else {
w.msgQueue <- envelope
}
}
// processQueue delivers the messages to the watchers during the lifetime of the waku node.
func (w *Waku) processQueue() {
for {
select {
case <-w.quit:
return
case e := <-w.msgQueue:
w.filters.NotifyWatchers(e, false)
w.envelopeFeed.Send(common.EnvelopeEvent{
Topic: e.Topic,
Hash: e.Hash(),
Event: common.EventEnvelopeAvailable,
})
}
}
}
func (w *Waku) processP2P() {
for {
select {
case <-w.quit:
return
case e := <-w.p2pMsgQueue:
switch evn := e.(type) {
case *common.Envelope:
w.filters.NotifyWatchers(evn, true)
w.envelopeFeed.Send(common.EnvelopeEvent{
Topic: evn.Topic,
Hash: evn.Hash(),
Event: common.EventEnvelopeAvailable,
})
case common.EnvelopeEvent:
w.envelopeFeed.Send(evn)
}
}
}
}
// update loops until the lifetime of the waku node, updating its internal
// state by expiring stale messages from the pool.
func (w *Waku) update() {
// Start a ticker to check for expirations
expire := time.NewTicker(common.ExpirationCycle)
// Repeat updates until termination is requested
for {
select {
case <-expire.C:
w.expire()
case <-w.quit:
return
}
}
}
// expire iterates over all the expiration timestamps, removing all stale
// messages from the pools.
func (w *Waku) expire() {
w.poolMu.Lock()
defer w.poolMu.Unlock()
now := uint32(w.timeSource().Unix())
for expiry, hashSet := range w.expirations {
if expiry < now {
// Dump all expired messages and remove timestamp
hashSet.Each(func(v interface{}) bool {
delete(w.envelopes, v.(gethcommon.Hash))
common.EnvelopesCachedCounter.WithLabelValues("clear").Inc()
w.envelopeFeed.Send(common.EnvelopeEvent{
Hash: v.(gethcommon.Hash),
Event: common.EventEnvelopeExpired,
})
return false
})
w.expirations[expiry].Clear()
delete(w.expirations, expiry)
}
}
}
// Envelopes retrieves all the messages currently pooled by the node.
func (w *Waku) Envelopes() []*common.Envelope {
w.poolMu.RLock()
defer w.poolMu.RUnlock()
all := make([]*common.Envelope, 0, len(w.envelopes))
for _, envelope := range w.envelopes {
all = append(all, envelope)
}
return all
}
// GetEnvelope retrieves an envelope from the message queue by its hash.
// It returns nil if the envelope can not be found.
func (w *Waku) GetEnvelope(hash gethcommon.Hash) *common.Envelope {
w.poolMu.RLock()
defer w.poolMu.RUnlock()
return w.envelopes[hash]
}
// isEnvelopeCached checks if envelope with specific hash has already been received and cached.
func (w *Waku) IsEnvelopeCached(hash gethcommon.Hash) bool {
w.poolMu.Lock()
defer w.poolMu.Unlock()
_, exist := w.envelopes[hash]
return exist
}
// validatePrivateKey checks the format of the given private key.
func validatePrivateKey(k *ecdsa.PrivateKey) bool {
if k == nil || k.D == nil || k.D.Sign() == 0 {
return false
}
return common.ValidatePublicKey(&k.PublicKey)
}
// makeDeterministicID generates a deterministic ID, based on a given input
func makeDeterministicID(input string, keyLen int) (id string, err error) {
buf := pbkdf2.Key([]byte(input), nil, 4096, keyLen, sha256.New)
if !common.ValidateDataIntegrity(buf, common.KeyIDSize) {
return "", fmt.Errorf("error in GenerateDeterministicID: failed to generate key")
}
id = gethcommon.Bytes2Hex(buf)
return id, err
}
// toDeterministicID reviews incoming id, and transforms it to format
// expected internally be private key store. Originally, public keys
// were used as keys, now random keys are being used. And in order to
// make it easier to consume, we now allow both random IDs and public
// keys to be passed.
func toDeterministicID(id string, expectedLen int) (string, error) {
if len(id) != (expectedLen * 2) { // we received hex key, so number of chars in id is doubled
var err error
id, err = makeDeterministicID(id, expectedLen)
if err != nil {
return "", err
}
}
return id, nil
}
func addBloom(a, b []byte) []byte {
c := make([]byte, common.BloomFilterSize)
for i := 0; i < common.BloomFilterSize; i++ {
c[i] = a[i] | b[i]
}
return c
}