mirror of
https://github.com/status-im/status-go.git
synced 2025-01-21 20:20:29 +00:00
aa7f591587
Why make the change? As discussed previously, the way we will move across versions is to maintain completely separate codebases and eventually remove those that are not supported anymore. This has the drawback of some code duplication, but the advantage is that is more explicit what each version requires, and changes in one version will not impact the other, so we won't pile up backward compatible code. This is the same strategy used by `whisper` in go ethereum and is influenced by https://www.youtube.com/watch?v=oyLBGkS5ICk . All the code that is used for the networking protocol is now under `v0/`. Some of the common parts might still be refactored out. The main namespace `waku` deals with `host`->`waku` interactions (through RPC), while `v0` deals with `waku`->`remote-waku` interactions. In order to support `v1`, the namespace `v0` will be copied over, and changed to support `v1`. Once `v0` will be not used anymore, the whole namespace will be removed. This PR does not actually implement `v1`, I'd rather get things looked over to make sure the structure is what we would like before implementing the changes. What has changed? - Moved all code for the common parts under `waku/common/` namespace - Moved code used for bloomfilters in `waku/common/bloomfilter.go` - Removed all version specific code from `waku/common/const` (`ProtocolVersion`, status-codes etc) - Added interfaces for `WakuHost` and `Peer` under `waku/common/protocol.go` Things still to do Some tests in `waku/` are still testing by stubbing components of a particular version (`v0`). I started moving those tests to instead of stubbing using the actual component, which increases the testing surface. Some other tests that can't be easily ported should be likely moved under `v0` instead. Ideally no version specif code should be exported from a version namespace (for example the various codes, as those might change across versions). But this will be a work-in-progress. Some code that will be common in `v0`/`v1` could still be extract to avoid duplication, and duplicated only when implementations diverge across versions.
622 lines
18 KiB
Go
622 lines
18 KiB
Go
package v0
|
|
|
|
import (
|
|
"bytes"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"io/ioutil"
|
|
"math"
|
|
"net"
|
|
"sync"
|
|
"time"
|
|
|
|
"go.uber.org/zap"
|
|
|
|
mapset "github.com/deckarep/golang-set"
|
|
|
|
gethcommon "github.com/ethereum/go-ethereum/common"
|
|
"github.com/ethereum/go-ethereum/crypto"
|
|
"github.com/ethereum/go-ethereum/p2p"
|
|
"github.com/ethereum/go-ethereum/p2p/enode"
|
|
"github.com/ethereum/go-ethereum/rlp"
|
|
|
|
"github.com/status-im/status-go/waku/common"
|
|
)
|
|
|
|
type Peer struct {
|
|
host common.WakuHost
|
|
rw p2p.MsgReadWriter
|
|
p2pPeer *p2p.Peer
|
|
logger *zap.Logger
|
|
|
|
quit chan struct{}
|
|
|
|
trusted bool
|
|
powRequirement float64
|
|
// bloomMu is to allow thread safe access to
|
|
// the bloom filter
|
|
bloomMu sync.Mutex
|
|
bloomFilter []byte
|
|
// topicInterestMu is to allow thread safe access to
|
|
// the map of topic interests
|
|
topicInterestMu sync.Mutex
|
|
topicInterest map[common.TopicType]bool
|
|
// fullNode is used to indicate that the node will be accepting any
|
|
// envelope. The opposite is an "empty node" , which is when
|
|
// a bloom filter is all 0s or topic interest is an empty map (not nil).
|
|
// In that case no envelope is accepted.
|
|
fullNode bool
|
|
confirmationsEnabled bool
|
|
rateLimitsMu sync.Mutex
|
|
rateLimits common.RateLimits
|
|
|
|
known mapset.Set // Messages already known by the peer to avoid wasting bandwidth
|
|
}
|
|
|
|
func NewPeer(host common.WakuHost, p2pPeer *p2p.Peer, rw p2p.MsgReadWriter, logger *zap.Logger) *Peer {
|
|
if logger == nil {
|
|
logger = zap.NewNop()
|
|
}
|
|
|
|
return &Peer{
|
|
host: host,
|
|
p2pPeer: p2pPeer,
|
|
logger: logger,
|
|
rw: rw,
|
|
trusted: false,
|
|
powRequirement: 0.0,
|
|
known: mapset.NewSet(),
|
|
quit: make(chan struct{}),
|
|
bloomFilter: common.MakeFullNodeBloom(),
|
|
fullNode: true,
|
|
}
|
|
}
|
|
|
|
func (p *Peer) Start() error {
|
|
if err := p.handshake(); err != nil {
|
|
return err
|
|
}
|
|
go p.update()
|
|
p.logger.Debug("starting peer", zap.Binary("peerID", p.ID()))
|
|
return nil
|
|
}
|
|
|
|
func (p *Peer) Stop() {
|
|
close(p.quit)
|
|
p.logger.Debug("stopping peer", zap.Binary("peerID", p.ID()))
|
|
}
|
|
|
|
func (p *Peer) NotifyAboutPowRequirementChange(pow float64) error {
|
|
i := math.Float64bits(pow)
|
|
return p2p.Send(p.rw, StatusUpdateCode, StatusOptions{PoWRequirement: &i})
|
|
}
|
|
|
|
func (p *Peer) NotifyAboutBloomFilterChange(bloom []byte) error {
|
|
return p2p.Send(p.rw, StatusUpdateCode, StatusOptions{BloomFilter: bloom})
|
|
}
|
|
|
|
func (p *Peer) NotifyAboutTopicInterestChange(topics []common.TopicType) error {
|
|
return p2p.Send(p.rw, StatusUpdateCode, StatusOptions{TopicInterest: topics})
|
|
}
|
|
|
|
func (p *Peer) SetPeerTrusted(trusted bool) {
|
|
p.trusted = trusted
|
|
}
|
|
|
|
func (p *Peer) RequestHistoricMessages(envelope *common.Envelope) error {
|
|
return p2p.Send(p.rw, P2PRequestCode, envelope)
|
|
}
|
|
|
|
func (p *Peer) SendMessagesRequest(request common.MessagesRequest) error {
|
|
return p2p.Send(p.rw, P2PRequestCode, request)
|
|
}
|
|
|
|
func (p *Peer) SendHistoricMessageResponse(payload []byte) error {
|
|
size, r, err := rlp.EncodeToReader(payload)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return p.rw.WriteMsg(p2p.Msg{Code: P2PRequestCompleteCode, Size: uint32(size), Payload: r})
|
|
|
|
}
|
|
|
|
func (p *Peer) SendP2PMessages(envelopes []*common.Envelope) error {
|
|
return p2p.Send(p.rw, P2PMessageCode, envelopes)
|
|
}
|
|
|
|
func (p *Peer) SendRawP2PDirect(envelopes []rlp.RawValue) error {
|
|
return p2p.Send(p.rw, P2PMessageCode, envelopes)
|
|
}
|
|
|
|
func (p *Peer) SetRWWriter(rw p2p.MsgReadWriter) {
|
|
p.rw = rw
|
|
}
|
|
|
|
// Mark marks an envelope known to the peer so that it won't be sent back.
|
|
func (p *Peer) Mark(envelope *common.Envelope) {
|
|
p.known.Add(envelope.Hash())
|
|
}
|
|
|
|
// Marked checks if an envelope is already known to the remote peer.
|
|
func (p *Peer) Marked(envelope *common.Envelope) bool {
|
|
return p.known.Contains(envelope.Hash())
|
|
}
|
|
|
|
func (p *Peer) BloomFilter() []byte {
|
|
p.bloomMu.Lock()
|
|
defer p.bloomMu.Unlock()
|
|
|
|
bloomFilterCopy := make([]byte, len(p.bloomFilter))
|
|
copy(bloomFilterCopy, p.bloomFilter)
|
|
return bloomFilterCopy
|
|
}
|
|
|
|
func (p *Peer) PoWRequirement() float64 {
|
|
return p.powRequirement
|
|
}
|
|
|
|
func (p *Peer) ConfirmationsEnabled() bool {
|
|
return p.confirmationsEnabled
|
|
}
|
|
|
|
// ID returns a peer's id
|
|
func (p *Peer) ID() []byte {
|
|
id := p.p2pPeer.ID()
|
|
return id[:]
|
|
}
|
|
|
|
func (p *Peer) EnodeID() enode.ID {
|
|
return p.p2pPeer.ID()
|
|
}
|
|
|
|
func (p *Peer) IP() net.IP {
|
|
return p.p2pPeer.Node().IP()
|
|
}
|
|
|
|
func (p *Peer) Run() error {
|
|
logger := p.logger.Named("Run")
|
|
|
|
for {
|
|
// fetch the next packet
|
|
packet, err := p.rw.ReadMsg()
|
|
if err != nil {
|
|
logger.Info("failed to read a message", zap.Binary("peer", p.ID()), zap.Error(err))
|
|
return err
|
|
}
|
|
|
|
if packet.Size > p.host.MaxMessageSize() {
|
|
logger.Warn("oversize message received", zap.Binary("peer", p.ID()), zap.Uint32("size", packet.Size))
|
|
return errors.New("oversize message received")
|
|
}
|
|
|
|
if err := p.handlePacket(packet); err != nil {
|
|
logger.Warn("failed to handle packet message, peer will be disconnected", zap.Binary("peer", p.ID()), zap.Error(err))
|
|
}
|
|
_ = packet.Discard()
|
|
}
|
|
}
|
|
|
|
func (p *Peer) handlePacket(packet p2p.Msg) error {
|
|
switch packet.Code {
|
|
case MessagesCode:
|
|
if err := p.handleMessagesCode(packet); err != nil {
|
|
p.logger.Warn("failed to handle MessagesCode message, peer will be disconnected", zap.Binary("peer", p.ID()), zap.Error(err))
|
|
return err
|
|
}
|
|
case MessageResponseCode:
|
|
if err := p.handleMessageResponseCode(packet); err != nil {
|
|
p.logger.Warn("failed to handle MessageResponseCode message, peer will be disconnected", zap.Binary("peer", p.ID()), zap.Error(err))
|
|
return err
|
|
}
|
|
case BatchAcknowledgedCode:
|
|
if err := p.handleBatchAcknowledgeCode(packet); err != nil {
|
|
p.logger.Warn("failed to handle BatchAcknowledgedCode message, peer will be disconnected", zap.Binary("peer", p.ID()), zap.Error(err))
|
|
return err
|
|
}
|
|
case StatusUpdateCode:
|
|
if err := p.handleStatusUpdateCode(packet); err != nil {
|
|
p.logger.Warn("failed to decode status update message, peer will be disconnected", zap.Binary("peer", p.ID()), zap.Error(err))
|
|
return err
|
|
}
|
|
case P2PMessageCode:
|
|
if err := p.handleP2PMessageCode(packet); err != nil {
|
|
p.logger.Warn("failed to decode direct message, peer will be disconnected", zap.Binary("peer", p.ID()), zap.Error(err))
|
|
return err
|
|
}
|
|
case P2PRequestCode:
|
|
if err := p.handleP2PRequestCode(packet); err != nil {
|
|
p.logger.Warn("failed to decode p2p request message, peer will be disconnected", zap.Binary("peer", p.ID()), zap.Error(err))
|
|
return err
|
|
}
|
|
case P2PRequestCompleteCode:
|
|
if err := p.handleP2PRequestCompleteCode(packet); err != nil {
|
|
p.logger.Warn("failed to decode p2p request complete message, peer will be disconnected", zap.Binary("peer", p.ID()), zap.Error(err))
|
|
return err
|
|
}
|
|
default:
|
|
// New message common might be implemented in the future versions of Waku.
|
|
// For forward compatibility, just ignore.
|
|
p.logger.Debug("ignored packet with message code", zap.Uint64("code", packet.Code))
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (p *Peer) handleMessagesCode(packet p2p.Msg) error {
|
|
// decode the contained envelopes
|
|
data, err := ioutil.ReadAll(packet.Payload)
|
|
if err != nil {
|
|
common.EnvelopesRejectedCounter.WithLabelValues("failed_read").Inc()
|
|
return fmt.Errorf("failed to read packet payload: %v", err)
|
|
}
|
|
|
|
var envelopes []*common.Envelope
|
|
if err := rlp.DecodeBytes(data, &envelopes); err != nil {
|
|
common.EnvelopesRejectedCounter.WithLabelValues("invalid_data").Inc()
|
|
return fmt.Errorf("invalid payload: %v", err)
|
|
}
|
|
|
|
envelopeErrors, err := p.host.OnNewEnvelopes(envelopes, p)
|
|
|
|
if p.host.ConfirmationsEnabled() {
|
|
go p.sendConfirmation(data, envelopeErrors) // nolint: errcheck
|
|
}
|
|
|
|
return err
|
|
}
|
|
|
|
func (p *Peer) handleMessageResponseCode(packet p2p.Msg) error {
|
|
var resp MultiVersionResponse
|
|
if err := packet.Decode(&resp); err != nil {
|
|
common.EnvelopesRejectedCounter.WithLabelValues("failed_read").Inc()
|
|
return fmt.Errorf("invalid response message: %v", err)
|
|
}
|
|
if resp.Version != 1 {
|
|
p.logger.Info("received unsupported version of MultiVersionResponse for MessageResponseCode packet", zap.Uint("version", resp.Version))
|
|
return nil
|
|
}
|
|
|
|
response, err := resp.DecodeResponse1()
|
|
if err != nil {
|
|
common.EnvelopesRejectedCounter.WithLabelValues("invalid_data").Inc()
|
|
return fmt.Errorf("failed to decode response message: %v", err)
|
|
}
|
|
|
|
return p.host.OnMessagesResponse(response, p)
|
|
}
|
|
|
|
func (p *Peer) handleP2PRequestCode(packet p2p.Msg) error {
|
|
// Must be processed if mail server is implemented. Otherwise ignore.
|
|
if !p.host.Mailserver() {
|
|
return nil
|
|
}
|
|
|
|
// Read all data as we will try to decode it possibly twice.
|
|
data, err := ioutil.ReadAll(packet.Payload)
|
|
if err != nil {
|
|
return fmt.Errorf("invalid p2p request messages: %v", err)
|
|
}
|
|
r := bytes.NewReader(data)
|
|
packet.Payload = r
|
|
|
|
// As we failed to decode the request, let's set the offset
|
|
// to the beginning and try decode it again.
|
|
if _, err := r.Seek(0, io.SeekStart); err != nil {
|
|
return fmt.Errorf("invalid p2p request message: %v", err)
|
|
}
|
|
|
|
var request common.MessagesRequest
|
|
errReq := packet.Decode(&request)
|
|
if errReq == nil {
|
|
return p.host.OnMessagesRequest(request, p)
|
|
}
|
|
p.logger.Info("failed to decode p2p request message", zap.Binary("peer", p.ID()), zap.Error(errReq))
|
|
|
|
return errors.New("invalid p2p request message")
|
|
}
|
|
|
|
func (p *Peer) handleBatchAcknowledgeCode(packet p2p.Msg) error {
|
|
var batchHash gethcommon.Hash
|
|
if err := packet.Decode(&batchHash); err != nil {
|
|
return fmt.Errorf("invalid batch ack message: %v", err)
|
|
}
|
|
return p.host.OnBatchAcknowledged(batchHash, p)
|
|
}
|
|
|
|
func (p *Peer) handleStatusUpdateCode(packet p2p.Msg) error {
|
|
var StatusOptions StatusOptions
|
|
err := packet.Decode(&StatusOptions)
|
|
if err != nil {
|
|
p.logger.Error("failed to decode status-options", zap.Error(err))
|
|
common.EnvelopesRejectedCounter.WithLabelValues("invalid_settings_changed").Inc()
|
|
return err
|
|
}
|
|
|
|
return p.setOptions(StatusOptions)
|
|
|
|
}
|
|
|
|
func (p *Peer) handleP2PMessageCode(packet p2p.Msg) error {
|
|
// peer-to-peer message, sent directly to peer bypassing PoW checks, etc.
|
|
// this message is not supposed to be forwarded to other peers, and
|
|
// therefore might not satisfy the PoW, expiry and other requirements.
|
|
// these messages are only accepted from the trusted peer.
|
|
if !p.trusted {
|
|
return nil
|
|
}
|
|
|
|
var (
|
|
envelopes []*common.Envelope
|
|
err error
|
|
)
|
|
|
|
if err = packet.Decode(&envelopes); err != nil {
|
|
return fmt.Errorf("invalid direct message payload: %v", err)
|
|
}
|
|
|
|
return p.host.OnNewP2PEnvelopes(envelopes, p)
|
|
}
|
|
|
|
func (p *Peer) handleP2PRequestCompleteCode(packet p2p.Msg) error {
|
|
if !p.trusted {
|
|
return nil
|
|
}
|
|
|
|
var payload []byte
|
|
if err := packet.Decode(&payload); err != nil {
|
|
return fmt.Errorf("invalid p2p request complete message: %v", err)
|
|
}
|
|
return p.host.OnP2PRequestCompleted(payload, p)
|
|
}
|
|
|
|
// sendConfirmation sends MessageResponseCode and BatchAcknowledgedCode messages.
|
|
func (p *Peer) sendConfirmation(data []byte, envelopeErrors []common.EnvelopeError) (err error) {
|
|
batchHash := crypto.Keccak256Hash(data)
|
|
err = p2p.Send(p.rw, MessageResponseCode, NewMessagesResponse(batchHash, envelopeErrors))
|
|
if err != nil {
|
|
return
|
|
}
|
|
err = p2p.Send(p.rw, BatchAcknowledgedCode, batchHash) // DEPRECATED
|
|
return
|
|
}
|
|
|
|
// handshake sends the protocol initiation status message to the remote peer and
|
|
// verifies the remote status too.
|
|
func (p *Peer) handshake() error {
|
|
// Send the handshake status message asynchronously
|
|
errc := make(chan error, 1)
|
|
opts := StatusOptionsFromHost(p.host)
|
|
go func() {
|
|
errc <- p2p.SendItems(p.rw, StatusCode, Version, opts)
|
|
}()
|
|
|
|
// Fetch the remote status packet and verify protocol match
|
|
packet, err := p.rw.ReadMsg()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if packet.Code != StatusCode {
|
|
return fmt.Errorf("p [%x] sent packet %x before status packet", p.ID(), packet.Code)
|
|
}
|
|
|
|
var (
|
|
peerProtocolVersion uint64
|
|
peerOptions StatusOptions
|
|
)
|
|
s := rlp.NewStream(packet.Payload, uint64(packet.Size))
|
|
if _, err := s.List(); err != nil {
|
|
return fmt.Errorf("p [%x]: failed to decode status packet: %v", p.ID(), err)
|
|
}
|
|
// Validate protocol version.
|
|
if err := s.Decode(&peerProtocolVersion); err != nil {
|
|
return fmt.Errorf("p [%x]: failed to decode peer protocol version: %v", p.ID(), err)
|
|
}
|
|
if peerProtocolVersion != Version {
|
|
return fmt.Errorf("p [%x]: protocol version mismatch %d != %d", p.ID(), peerProtocolVersion, Version)
|
|
}
|
|
// Decode and validate other status packet options.
|
|
if err := s.Decode(&peerOptions); err != nil {
|
|
return fmt.Errorf("p [%x]: failed to decode status options: %v", p.ID(), err)
|
|
}
|
|
if err := s.ListEnd(); err != nil {
|
|
return fmt.Errorf("p [%x]: failed to decode status packet: %v", p.ID(), err)
|
|
}
|
|
if err := p.setOptions(peerOptions.WithDefaults()); err != nil {
|
|
return fmt.Errorf("p [%x]: failed to set options: %v", p.ID(), err)
|
|
}
|
|
if err := <-errc; err != nil {
|
|
return fmt.Errorf("p [%x] failed to send status packet: %v", p.ID(), err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// update executes periodic operations on the peer, including message transmission
|
|
// and expiration.
|
|
func (p *Peer) update() {
|
|
// Start the tickers for the updates
|
|
expire := time.NewTicker(common.ExpirationCycle)
|
|
transmit := time.NewTicker(common.TransmissionCycle)
|
|
|
|
// Loop and transmit until termination is requested
|
|
for {
|
|
select {
|
|
case <-expire.C:
|
|
p.expire()
|
|
|
|
case <-transmit.C:
|
|
if err := p.broadcast(); err != nil {
|
|
p.logger.Debug("broadcasting failed", zap.Binary("peer", p.ID()), zap.Error(err))
|
|
return
|
|
}
|
|
|
|
case <-p.quit:
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func (p *Peer) setOptions(peerOptions StatusOptions) error {
|
|
|
|
p.logger.Debug("settings options", zap.Binary("peerID", p.ID()), zap.Any("Options", peerOptions))
|
|
|
|
if err := peerOptions.Validate(); err != nil {
|
|
return fmt.Errorf("p [%x]: sent invalid options: %v", p.ID(), err)
|
|
}
|
|
// Validate and save peer's PoW.
|
|
pow := peerOptions.PoWRequirementF()
|
|
if pow != nil {
|
|
if math.IsInf(*pow, 0) || math.IsNaN(*pow) || *pow < 0.0 {
|
|
return fmt.Errorf("p [%x]: sent bad status message: invalid pow", p.ID())
|
|
}
|
|
p.powRequirement = *pow
|
|
}
|
|
|
|
if peerOptions.TopicInterest != nil {
|
|
p.setTopicInterest(peerOptions.TopicInterest)
|
|
} else if peerOptions.BloomFilter != nil {
|
|
// Validate and save peer's bloom filters.
|
|
bloom := peerOptions.BloomFilter
|
|
bloomSize := len(bloom)
|
|
if bloomSize != 0 && bloomSize != common.BloomFilterSize {
|
|
return fmt.Errorf("p [%x] sent bad status message: wrong bloom filter size %d", p.ID(), bloomSize)
|
|
}
|
|
p.setBloomFilter(bloom)
|
|
}
|
|
|
|
if peerOptions.LightNodeEnabled != nil {
|
|
// Validate and save other peer's options.
|
|
if *peerOptions.LightNodeEnabled && p.host.LightClientMode() && p.host.LightClientModeConnectionRestricted() {
|
|
return fmt.Errorf("p [%x] is useless: two light client communication restricted", p.ID())
|
|
}
|
|
}
|
|
if peerOptions.ConfirmationsEnabled != nil {
|
|
p.confirmationsEnabled = *peerOptions.ConfirmationsEnabled
|
|
}
|
|
if peerOptions.RateLimits != nil {
|
|
p.setRateLimits(*peerOptions.RateLimits)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// expire iterates over all the known envelopes in the host and removes all
|
|
// expired (unknown) ones from the known list.
|
|
func (p *Peer) expire() {
|
|
unmark := make(map[gethcommon.Hash]struct{})
|
|
p.known.Each(func(v interface{}) bool {
|
|
if !p.host.IsEnvelopeCached(v.(gethcommon.Hash)) {
|
|
unmark[v.(gethcommon.Hash)] = struct{}{}
|
|
}
|
|
return true
|
|
})
|
|
// Dump all known but no longer cached
|
|
for hash := range unmark {
|
|
p.known.Remove(hash)
|
|
}
|
|
}
|
|
|
|
// broadcast iterates over the collection of envelopes and transmits yet unknown
|
|
// ones over the network.
|
|
func (p *Peer) broadcast() error {
|
|
envelopes := p.host.Envelopes()
|
|
bundle := make([]*common.Envelope, 0, len(envelopes))
|
|
for _, envelope := range envelopes {
|
|
if !p.Marked(envelope) && envelope.PoW() >= p.powRequirement && p.topicOrBloomMatch(envelope) {
|
|
bundle = append(bundle, envelope)
|
|
}
|
|
}
|
|
|
|
if len(bundle) == 0 {
|
|
return nil
|
|
}
|
|
|
|
batchHash, err := sendBundle(p.rw, bundle)
|
|
if err != nil {
|
|
p.logger.Debug("failed to deliver envelopes", zap.Binary("peer", p.ID()), zap.Error(err))
|
|
return err
|
|
}
|
|
|
|
// mark envelopes only if they were successfully sent
|
|
for _, e := range bundle {
|
|
p.Mark(e)
|
|
event := common.EnvelopeEvent{
|
|
Event: common.EventEnvelopeSent,
|
|
Hash: e.Hash(),
|
|
Peer: p.EnodeID(),
|
|
}
|
|
if p.confirmationsEnabled {
|
|
event.Batch = batchHash
|
|
}
|
|
p.host.SendEnvelopeEvent(event)
|
|
}
|
|
p.logger.Debug("broadcasted bundles successfully", zap.Binary("peer", p.ID()), zap.Int("count", len(bundle)))
|
|
return nil
|
|
}
|
|
|
|
func (p *Peer) setBloomFilter(bloom []byte) {
|
|
p.bloomMu.Lock()
|
|
defer p.bloomMu.Unlock()
|
|
p.bloomFilter = bloom
|
|
p.fullNode = common.IsFullNode(bloom)
|
|
if p.fullNode && p.bloomFilter == nil {
|
|
p.bloomFilter = common.MakeFullNodeBloom()
|
|
}
|
|
p.topicInterest = nil
|
|
}
|
|
|
|
func (p *Peer) setTopicInterest(topicInterest []common.TopicType) {
|
|
p.topicInterestMu.Lock()
|
|
defer p.topicInterestMu.Unlock()
|
|
if topicInterest == nil {
|
|
p.topicInterest = nil
|
|
return
|
|
}
|
|
p.topicInterest = make(map[common.TopicType]bool)
|
|
for _, topic := range topicInterest {
|
|
p.topicInterest[topic] = true
|
|
}
|
|
p.fullNode = false
|
|
p.bloomFilter = nil
|
|
}
|
|
|
|
func (p *Peer) setRateLimits(r common.RateLimits) {
|
|
p.rateLimitsMu.Lock()
|
|
p.rateLimits = r
|
|
p.rateLimitsMu.Unlock()
|
|
}
|
|
|
|
// topicOrBloomMatch matches against topic-interest if topic interest
|
|
// is not nil. Otherwise it will match against the bloom-filter.
|
|
// If the bloom-filter is nil, or full, the node is considered a full-node
|
|
// and any envelope will be accepted. An empty topic-interest (but not nil)
|
|
// signals that we are not interested in any envelope.
|
|
func (p *Peer) topicOrBloomMatch(env *common.Envelope) bool {
|
|
p.topicInterestMu.Lock()
|
|
topicInterestMode := p.topicInterest != nil
|
|
p.topicInterestMu.Unlock()
|
|
|
|
if topicInterestMode {
|
|
return p.topicInterestMatch(env)
|
|
}
|
|
return p.bloomMatch(env)
|
|
}
|
|
|
|
func (p *Peer) topicInterestMatch(env *common.Envelope) bool {
|
|
p.topicInterestMu.Lock()
|
|
defer p.topicInterestMu.Unlock()
|
|
|
|
if p.topicInterest == nil {
|
|
return false
|
|
}
|
|
|
|
return p.topicInterest[env.Topic]
|
|
}
|
|
|
|
func (p *Peer) bloomMatch(env *common.Envelope) bool {
|
|
p.bloomMu.Lock()
|
|
defer p.bloomMu.Unlock()
|
|
return p.fullNode || common.BloomFilterMatch(p.bloomFilter, env.Bloom())
|
|
}
|