Create waku/1 namespace

This commit creates a waku/1 namespace and adds the code to it.
No changes in the protocol have been made (i.e waku/1 is the same as
waku/1 in this commit).
This commit is contained in:
Andrea Maria Piana 2020-04-29 20:05:09 +02:00
parent b025db235f
commit 8aa42e4148
18 changed files with 1954 additions and 882 deletions

View File

@ -53,6 +53,8 @@ type Peer interface {
// WakuHost is the local instance of waku, which both interacts with remote clients
// (peers) and local clients (through RPC API)
type WakuHost interface {
// HandlePeer handles the connection of a new peer
HandlePeer(Peer, p2p.MsgReadWriter) error
// MaxMessageSize returns the maximum accepted message size.
MaxMessageSize() uint32
// LightClientMode returns whether the host is running in light client mode

View File

@ -36,9 +36,6 @@ import (
"github.com/ethereum/go-ethereum/p2p/nat"
"github.com/status-im/status-go/waku/common"
v0 "github.com/status-im/status-go/waku/v0"
"github.com/stretchr/testify/require"
)
var keys = []string{
@ -372,7 +369,7 @@ func sendMsg(t *testing.T, expected bool, id int) {
msg, err := common.NewSentMessage(&opt)
if err != nil {
t.Fatalf("failed to create new message with seed %d: %s.", seed, err)
t.Fatalf("failed to create new message: %s.", err)
}
envelope, err := msg.Wrap(&opt, time.Now())
if err != nil {
@ -385,31 +382,6 @@ func sendMsg(t *testing.T, expected bool, id int) {
}
}
func TestPeerBasic(t *testing.T) {
InitSingleTest()
params, err := generateMessageParams()
if err != nil {
t.Fatalf("failed generateMessageParams with seed %d.", seed)
}
params.PoW = 0.001
msg, err := common.NewSentMessage(params)
if err != nil {
t.Fatalf("failed to create new message with seed %d: %s.", seed, err)
}
env, err := msg.Wrap(params, time.Now())
if err != nil {
t.Fatalf("failed Wrap with seed %d.", seed)
}
p := v0.NewPeer(nil, nil, nil, nil)
p.Mark(env)
if !p.Marked(env) {
t.Fatalf("failed mark with seed %d.", seed)
}
}
func checkPowExchangeForNodeZero(t *testing.T) {
const iterations = 200
for j := 0; j < iterations; j++ {
@ -499,85 +471,3 @@ func waitForServersToStart(t *testing.T) {
}
t.Fatalf("Failed to start all the servers, running: %d", started)
}
//two generic waku node handshake
func TestPeerHandshakeWithTwoFullNode(t *testing.T) {
rw1, rw2 := p2p.MsgPipe()
defer rw1.Close()
defer rw2.Close()
w1 := New(nil, nil)
var pow float64 = 0.1
err := w1.SetMinimumPoW(pow, true)
if err != nil {
t.Fatal(err)
}
w2 := New(nil, nil)
go w1.HandlePeer(p2p.NewPeer(enode.ID{}, "test-1", []p2p.Cap{}), rw1) // nolint: errcheck
p2 := v0.NewPeer(w2, p2p.NewPeer(enode.ID{}, "test-2", []p2p.Cap{}), rw2, nil)
err = p2.Start()
if err != nil {
t.Fatal(err)
}
require.Equal(t, pow, p2.PoWRequirement())
}
//two generic waku node handshake. one don't send light flag
func TestHandshakeWithOldVersionWithoutLightModeFlag(t *testing.T) {
rw1, rw2 := p2p.MsgPipe()
defer rw1.Close()
defer rw2.Close()
w1 := New(nil, nil)
w1.SetLightClientMode(true)
w2 := New(nil, nil)
go w1.HandlePeer(p2p.NewPeer(enode.ID{}, "test-1", []p2p.Cap{}), rw1) // nolint: errcheck
p2 := v0.NewPeer(w2, p2p.NewPeer(enode.ID{}, "test-2", []p2p.Cap{}), rw2, nil)
err := p2.Start()
if err != nil {
t.Fatal(err)
}
}
//two light nodes handshake. restriction enable
func TestTwoLightPeerHandshakeRestrictionOff(t *testing.T) {
rw1, rw2 := p2p.MsgPipe()
defer rw1.Close()
defer rw2.Close()
w1 := New(nil, nil)
w1.SetLightClientMode(true)
w1.settings.RestrictLightClientsConn = false
w2 := New(nil, nil)
w2.SetLightClientMode(true)
w2.settings.RestrictLightClientsConn = false
go w1.HandlePeer(p2p.NewPeer(enode.ID{}, "test-1", []p2p.Cap{}), rw1) // nolint: errcheck
p2 := v0.NewPeer(w2, p2p.NewPeer(enode.ID{}, "test-2", []p2p.Cap{}), rw2, nil)
require.NoError(t, p2.Start())
}
//two light nodes handshake. restriction enabled
func TestTwoLightPeerHandshakeError(t *testing.T) {
rw1, rw2 := p2p.MsgPipe()
w1 := New(nil, nil)
w1.SetLightClientMode(true)
w1.settings.RestrictLightClientsConn = true
w2 := New(nil, nil)
w2.SetLightClientMode(true)
w2.settings.RestrictLightClientsConn = true
go w1.HandlePeer(p2p.NewPeer(enode.ID{}, "test-1", []p2p.Cap{}), rw1) // nolint: errcheck
p2 := v0.NewPeer(w2, p2p.NewPeer(enode.ID{}, "test-2", []p2p.Cap{}), rw2, nil)
require.Error(t, p2.Start())
}

6
waku/v0/README.md Normal file
View File

@ -0,0 +1,6 @@
### waku/0
This namespace implements `waku` 0.6 as described in https://github.com/vacp2p/specs/blob/master/specs/waku/waku-0.md and it
is now deprecated. Now feature changes will be made only critical bugfixes if required.
Once support for `waku/0` can be dropped, this code can be safely removed.

View File

@ -7,13 +7,13 @@ const (
Name = "waku" // Nickname of the protocol
// Waku protocol message codes, according to https://github.com/vacp2p/specs/blob/master/specs/waku/waku-0.md
StatusCode = 0 // used in the handshake
MessagesCode = 1 // regular message
StatusUpdateCode = 22 // update of settings
BatchAcknowledgedCode = 11 // confirmation that batch of envelopes was received
MessageResponseCode = 12 // includes confirmation for delivery and information about errors
P2PRequestCompleteCode = 125 // peer-to-peer message, used by Dapp protocol
P2PRequestCode = 126 // peer-to-peer message, used by Dapp protocol
P2PMessageCode = 127 // peer-to-peer message (to be consumed by the peer, but not forwarded any further)
statusCode = 0 // used in the handshake
messagesCode = 1 // regular message
statusUpdateCode = 22 // update of settings
batchAcknowledgedCode = 11 // confirmation that batch of envelopes was received
messageResponseCode = 12 // includes confirmation for delivery and information about errors
p2pRequestCompleteCode = 125 // peer-to-peer message, used by Dapp protocol
p2pRequestCode = 126 // peer-to-peer message, used by Dapp protocol
p2pMessageCode = 127 // peer-to-peer message (to be consumed by the peer, but not forwarded any further)
NumberOfMessageCodes = 128
)

View File

@ -44,7 +44,7 @@ func sendBundle(rw p2p.MsgWriter, bundle []*common.Envelope) (rst gethcommon.Has
return
}
err = rw.WriteMsg(p2p.Msg{
Code: MessagesCode,
Code: messagesCode,
Size: uint32(len(data)),
Payload: bytes.NewBuffer(data),
})

View File

@ -54,7 +54,7 @@ type Peer struct {
known mapset.Set // Messages already known by the peer to avoid wasting bandwidth
}
func NewPeer(host common.WakuHost, p2pPeer *p2p.Peer, rw p2p.MsgReadWriter, logger *zap.Logger) *Peer {
func NewPeer(host common.WakuHost, p2pPeer *p2p.Peer, rw p2p.MsgReadWriter, logger *zap.Logger) common.Peer {
if logger == nil {
logger = zap.NewNop()
}
@ -89,15 +89,15 @@ func (p *Peer) Stop() {
func (p *Peer) NotifyAboutPowRequirementChange(pow float64) error {
i := math.Float64bits(pow)
return p2p.Send(p.rw, StatusUpdateCode, StatusOptions{PoWRequirement: &i})
return p2p.Send(p.rw, statusUpdateCode, StatusOptions{PoWRequirement: &i})
}
func (p *Peer) NotifyAboutBloomFilterChange(bloom []byte) error {
return p2p.Send(p.rw, StatusUpdateCode, StatusOptions{BloomFilter: bloom})
return p2p.Send(p.rw, statusUpdateCode, StatusOptions{BloomFilter: bloom})
}
func (p *Peer) NotifyAboutTopicInterestChange(topics []common.TopicType) error {
return p2p.Send(p.rw, StatusUpdateCode, StatusOptions{TopicInterest: topics})
return p2p.Send(p.rw, statusUpdateCode, StatusOptions{TopicInterest: topics})
}
func (p *Peer) SetPeerTrusted(trusted bool) {
@ -105,11 +105,11 @@ func (p *Peer) SetPeerTrusted(trusted bool) {
}
func (p *Peer) RequestHistoricMessages(envelope *common.Envelope) error {
return p2p.Send(p.rw, P2PRequestCode, envelope)
return p2p.Send(p.rw, p2pRequestCode, envelope)
}
func (p *Peer) SendMessagesRequest(request common.MessagesRequest) error {
return p2p.Send(p.rw, P2PRequestCode, request)
return p2p.Send(p.rw, p2pRequestCode, request)
}
func (p *Peer) SendHistoricMessageResponse(payload []byte) error {
@ -118,16 +118,16 @@ func (p *Peer) SendHistoricMessageResponse(payload []byte) error {
return err
}
return p.rw.WriteMsg(p2p.Msg{Code: P2PRequestCompleteCode, Size: uint32(size), Payload: r})
return p.rw.WriteMsg(p2p.Msg{Code: p2pRequestCompleteCode, Size: uint32(size), Payload: r})
}
func (p *Peer) SendP2PMessages(envelopes []*common.Envelope) error {
return p2p.Send(p.rw, P2PMessageCode, envelopes)
return p2p.Send(p.rw, p2pMessageCode, envelopes)
}
func (p *Peer) SendRawP2PDirect(envelopes []rlp.RawValue) error {
return p2p.Send(p.rw, P2PMessageCode, envelopes)
return p2p.Send(p.rw, p2pMessageCode, envelopes)
}
func (p *Peer) SetRWWriter(rw p2p.MsgReadWriter) {
@ -200,37 +200,37 @@ func (p *Peer) Run() error {
func (p *Peer) handlePacket(packet p2p.Msg) error {
switch packet.Code {
case MessagesCode:
case messagesCode:
if err := p.handleMessagesCode(packet); err != nil {
p.logger.Warn("failed to handle MessagesCode message, peer will be disconnected", zap.Binary("peer", p.ID()), zap.Error(err))
p.logger.Warn("failed to handle messagesCode message, peer will be disconnected", zap.Binary("peer", p.ID()), zap.Error(err))
return err
}
case MessageResponseCode:
case messageResponseCode:
if err := p.handleMessageResponseCode(packet); err != nil {
p.logger.Warn("failed to handle MessageResponseCode message, peer will be disconnected", zap.Binary("peer", p.ID()), zap.Error(err))
p.logger.Warn("failed to handle messageResponseCode message, peer will be disconnected", zap.Binary("peer", p.ID()), zap.Error(err))
return err
}
case BatchAcknowledgedCode:
case batchAcknowledgedCode:
if err := p.handleBatchAcknowledgeCode(packet); err != nil {
p.logger.Warn("failed to handle BatchAcknowledgedCode message, peer will be disconnected", zap.Binary("peer", p.ID()), zap.Error(err))
p.logger.Warn("failed to handle batchAcknowledgedCode message, peer will be disconnected", zap.Binary("peer", p.ID()), zap.Error(err))
return err
}
case StatusUpdateCode:
case statusUpdateCode:
if err := p.handleStatusUpdateCode(packet); err != nil {
p.logger.Warn("failed to decode status update message, peer will be disconnected", zap.Binary("peer", p.ID()), zap.Error(err))
return err
}
case P2PMessageCode:
case p2pMessageCode:
if err := p.handleP2PMessageCode(packet); err != nil {
p.logger.Warn("failed to decode direct message, peer will be disconnected", zap.Binary("peer", p.ID()), zap.Error(err))
return err
}
case P2PRequestCode:
case p2pRequestCode:
if err := p.handleP2PRequestCode(packet); err != nil {
p.logger.Warn("failed to decode p2p request message, peer will be disconnected", zap.Binary("peer", p.ID()), zap.Error(err))
return err
}
case P2PRequestCompleteCode:
case p2pRequestCompleteCode:
if err := p.handleP2PRequestCompleteCode(packet); err != nil {
p.logger.Warn("failed to decode p2p request complete message, peer will be disconnected", zap.Binary("peer", p.ID()), zap.Error(err))
return err
@ -274,7 +274,7 @@ func (p *Peer) handleMessageResponseCode(packet p2p.Msg) error {
return fmt.Errorf("invalid response message: %v", err)
}
if resp.Version != 1 {
p.logger.Info("received unsupported version of MultiVersionResponse for MessageResponseCode packet", zap.Uint("version", resp.Version))
p.logger.Info("received unsupported version of MultiVersionResponse for messageResponseCode packet", zap.Uint("version", resp.Version))
return nil
}
@ -371,14 +371,14 @@ func (p *Peer) handleP2PRequestCompleteCode(packet p2p.Msg) error {
return p.host.OnP2PRequestCompleted(payload, p)
}
// sendConfirmation sends MessageResponseCode and BatchAcknowledgedCode messages.
// sendConfirmation sends messageResponseCode and batchAcknowledgedCode messages.
func (p *Peer) sendConfirmation(data []byte, envelopeErrors []common.EnvelopeError) (err error) {
batchHash := crypto.Keccak256Hash(data)
err = p2p.Send(p.rw, MessageResponseCode, NewMessagesResponse(batchHash, envelopeErrors))
err = p2p.Send(p.rw, messageResponseCode, NewMessagesResponse(batchHash, envelopeErrors))
if err != nil {
return
}
err = p2p.Send(p.rw, BatchAcknowledgedCode, batchHash) // DEPRECATED
err = p2p.Send(p.rw, batchAcknowledgedCode, batchHash) // DEPRECATED
return
}
@ -389,7 +389,7 @@ func (p *Peer) handshake() error {
errc := make(chan error, 1)
opts := StatusOptionsFromHost(p.host)
go func() {
errc <- p2p.SendItems(p.rw, StatusCode, Version, opts)
errc <- p2p.SendItems(p.rw, statusCode, Version, opts)
}()
// Fetch the remote status packet and verify protocol match
@ -397,7 +397,7 @@ func (p *Peer) handshake() error {
if err != nil {
return err
}
if packet.Code != StatusCode {
if packet.Code != statusCode {
return fmt.Errorf("p [%x] sent packet %x before status packet", p.ID(), packet.Code)
}

4
waku/v1/README.md Normal file
View File

@ -0,0 +1,4 @@
### waku/1
This namespace implements `waku` 1.0 as described in https://github.com/vacp2p/specs/blob/master/specs/waku/waku-1.md.

19
waku/v1/const.go Normal file
View File

@ -0,0 +1,19 @@
package v1
// Waku protocol parameters
const (
Version = uint64(0) // Peer version number
VersionStr = "0" // The same, as a string
Name = "waku" // Nickname of the protocol
// Waku protocol message codes, according to https://github.com/vacp2p/specs/blob/master/specs/waku/waku-0.md
statusCode = 0 // used in the handshake
messagesCode = 1 // regular message
statusUpdateCode = 22 // update of settings
batchAcknowledgedCode = 11 // confirmation that batch of envelopes was received
messageResponseCode = 12 // includes confirmation for delivery and information about errors
p2pRequestCompleteCode = 125 // peer-to-peer message, used by Dapp protocol
p2pRequestCode = 126 // peer-to-peer message, used by Dapp protocol
p2pMessageCode = 127 // peer-to-peer message (to be consumed by the peer, but not forwarded any further)
NumberOfMessageCodes = 128
)

5
waku/v1/init.go Normal file
View File

@ -0,0 +1,5 @@
package v1
func init() {
initRLPKeyFields()
}

55
waku/v1/message.go Normal file
View File

@ -0,0 +1,55 @@
package v1
import (
"bytes"
gethcommon "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/rlp"
"github.com/status-im/status-go/waku/common"
)
// MultiVersionResponse allows to decode response into chosen version.
type MultiVersionResponse struct {
Version uint
Response rlp.RawValue
}
// DecodeResponse1 decodes response into first version of the messages response.
func (m MultiVersionResponse) DecodeResponse1() (resp common.MessagesResponse, err error) {
return resp, rlp.DecodeBytes(m.Response, &resp)
}
// Version1MessageResponse first version of the message response.
type Version1MessageResponse struct {
Version uint
Response common.MessagesResponse
}
// NewMessagesResponse returns instance of the version messages response.
func NewMessagesResponse(batch gethcommon.Hash, errors []common.EnvelopeError) Version1MessageResponse {
return Version1MessageResponse{
Version: 1,
Response: common.MessagesResponse{
Hash: batch,
Errors: errors,
},
}
}
func sendBundle(rw p2p.MsgWriter, bundle []*common.Envelope) (rst gethcommon.Hash, err error) {
data, err := rlp.EncodeToBytes(bundle)
if err != nil {
return
}
err = rw.WriteMsg(p2p.Msg{
Code: messagesCode,
Size: uint32(len(data)),
Payload: bytes.NewBuffer(data),
})
if err != nil {
return
}
return crypto.Keccak256Hash(data), nil
}

42
waku/v1/message_test.go Normal file
View File

@ -0,0 +1,42 @@
// Copyright 2019 The Waku Library Authors.
//
// The Waku library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The Waku library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty off
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the Waku library. If not, see <http://www.gnu.org/licenses/>.
//
// This software uses the go-ethereum library, which is licensed
// under the GNU Lesser General Public Library, version 3 or any later.
package v1
import (
"testing"
"github.com/stretchr/testify/require"
gethcommon "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/rlp"
"github.com/status-im/status-go/waku/common"
)
func TestEncodeDecodeVersionedResponse(t *testing.T) {
response := NewMessagesResponse(gethcommon.Hash{1}, []common.EnvelopeError{{Code: 1}})
bytes, err := rlp.EncodeToBytes(response)
require.NoError(t, err)
var mresponse MultiVersionResponse
require.NoError(t, rlp.DecodeBytes(bytes, &mresponse))
v1resp, err := mresponse.DecodeResponse1()
require.NoError(t, err)
require.Equal(t, response.Response.Hash, v1resp.Hash)
}

621
waku/v1/peer.go Normal file
View File

@ -0,0 +1,621 @@
package v1
import (
"bytes"
"errors"
"fmt"
"io"
"io/ioutil"
"math"
"net"
"sync"
"time"
"go.uber.org/zap"
mapset "github.com/deckarep/golang-set"
gethcommon "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/rlp"
"github.com/status-im/status-go/waku/common"
)
type Peer struct {
host common.WakuHost
rw p2p.MsgReadWriter
p2pPeer *p2p.Peer
logger *zap.Logger
quit chan struct{}
trusted bool
powRequirement float64
// bloomMu is to allow thread safe access to
// the bloom filter
bloomMu sync.Mutex
bloomFilter []byte
// topicInterestMu is to allow thread safe access to
// the map of topic interests
topicInterestMu sync.Mutex
topicInterest map[common.TopicType]bool
// fullNode is used to indicate that the node will be accepting any
// envelope. The opposite is an "empty node" , which is when
// a bloom filter is all 0s or topic interest is an empty map (not nil).
// In that case no envelope is accepted.
fullNode bool
confirmationsEnabled bool
rateLimitsMu sync.Mutex
rateLimits common.RateLimits
known mapset.Set // Messages already known by the peer to avoid wasting bandwidth
}
func NewPeer(host common.WakuHost, p2pPeer *p2p.Peer, rw p2p.MsgReadWriter, logger *zap.Logger) common.Peer {
if logger == nil {
logger = zap.NewNop()
}
return &Peer{
host: host,
p2pPeer: p2pPeer,
logger: logger,
rw: rw,
trusted: false,
powRequirement: 0.0,
known: mapset.NewSet(),
quit: make(chan struct{}),
bloomFilter: common.MakeFullNodeBloom(),
fullNode: true,
}
}
func (p *Peer) Start() error {
if err := p.handshake(); err != nil {
return err
}
go p.update()
p.logger.Debug("starting peer", zap.Binary("peerID", p.ID()))
return nil
}
func (p *Peer) Stop() {
close(p.quit)
p.logger.Debug("stopping peer", zap.Binary("peerID", p.ID()))
}
func (p *Peer) NotifyAboutPowRequirementChange(pow float64) error {
i := math.Float64bits(pow)
return p2p.Send(p.rw, statusUpdateCode, StatusOptions{PoWRequirement: &i})
}
func (p *Peer) NotifyAboutBloomFilterChange(bloom []byte) error {
return p2p.Send(p.rw, statusUpdateCode, StatusOptions{BloomFilter: bloom})
}
func (p *Peer) NotifyAboutTopicInterestChange(topics []common.TopicType) error {
return p2p.Send(p.rw, statusUpdateCode, StatusOptions{TopicInterest: topics})
}
func (p *Peer) SetPeerTrusted(trusted bool) {
p.trusted = trusted
}
func (p *Peer) RequestHistoricMessages(envelope *common.Envelope) error {
return p2p.Send(p.rw, p2pRequestCode, envelope)
}
func (p *Peer) SendMessagesRequest(request common.MessagesRequest) error {
return p2p.Send(p.rw, p2pRequestCode, request)
}
func (p *Peer) SendHistoricMessageResponse(payload []byte) error {
size, r, err := rlp.EncodeToReader(payload)
if err != nil {
return err
}
return p.rw.WriteMsg(p2p.Msg{Code: p2pRequestCompleteCode, Size: uint32(size), Payload: r})
}
func (p *Peer) SendP2PMessages(envelopes []*common.Envelope) error {
return p2p.Send(p.rw, p2pMessageCode, envelopes)
}
func (p *Peer) SendRawP2PDirect(envelopes []rlp.RawValue) error {
return p2p.Send(p.rw, p2pMessageCode, envelopes)
}
func (p *Peer) SetRWWriter(rw p2p.MsgReadWriter) {
p.rw = rw
}
// Mark marks an envelope known to the peer so that it won't be sent back.
func (p *Peer) Mark(envelope *common.Envelope) {
p.known.Add(envelope.Hash())
}
// Marked checks if an envelope is already known to the remote peer.
func (p *Peer) Marked(envelope *common.Envelope) bool {
return p.known.Contains(envelope.Hash())
}
func (p *Peer) BloomFilter() []byte {
p.bloomMu.Lock()
defer p.bloomMu.Unlock()
bloomFilterCopy := make([]byte, len(p.bloomFilter))
copy(bloomFilterCopy, p.bloomFilter)
return bloomFilterCopy
}
func (p *Peer) PoWRequirement() float64 {
return p.powRequirement
}
func (p *Peer) ConfirmationsEnabled() bool {
return p.confirmationsEnabled
}
// ID returns a peer's id
func (p *Peer) ID() []byte {
id := p.p2pPeer.ID()
return id[:]
}
func (p *Peer) EnodeID() enode.ID {
return p.p2pPeer.ID()
}
func (p *Peer) IP() net.IP {
return p.p2pPeer.Node().IP()
}
func (p *Peer) Run() error {
logger := p.logger.Named("Run")
for {
// fetch the next packet
packet, err := p.rw.ReadMsg()
if err != nil {
logger.Info("failed to read a message", zap.Binary("peer", p.ID()), zap.Error(err))
return err
}
if packet.Size > p.host.MaxMessageSize() {
logger.Warn("oversize message received", zap.Binary("peer", p.ID()), zap.Uint32("size", packet.Size))
return errors.New("oversize message received")
}
if err := p.handlePacket(packet); err != nil {
logger.Warn("failed to handle packet message, peer will be disconnected", zap.Binary("peer", p.ID()), zap.Error(err))
}
_ = packet.Discard()
}
}
func (p *Peer) handlePacket(packet p2p.Msg) error {
switch packet.Code {
case messagesCode:
if err := p.handleMessagesCode(packet); err != nil {
p.logger.Warn("failed to handle messagesCode message, peer will be disconnected", zap.Binary("peer", p.ID()), zap.Error(err))
return err
}
case messageResponseCode:
if err := p.handleMessageResponseCode(packet); err != nil {
p.logger.Warn("failed to handle messageResponseCode message, peer will be disconnected", zap.Binary("peer", p.ID()), zap.Error(err))
return err
}
case batchAcknowledgedCode:
if err := p.handleBatchAcknowledgeCode(packet); err != nil {
p.logger.Warn("failed to handle batchAcknowledgedCode message, peer will be disconnected", zap.Binary("peer", p.ID()), zap.Error(err))
return err
}
case statusUpdateCode:
if err := p.handleStatusUpdateCode(packet); err != nil {
p.logger.Warn("failed to decode status update message, peer will be disconnected", zap.Binary("peer", p.ID()), zap.Error(err))
return err
}
case p2pMessageCode:
if err := p.handleP2PMessageCode(packet); err != nil {
p.logger.Warn("failed to decode direct message, peer will be disconnected", zap.Binary("peer", p.ID()), zap.Error(err))
return err
}
case p2pRequestCode:
if err := p.handleP2PRequestCode(packet); err != nil {
p.logger.Warn("failed to decode p2p request message, peer will be disconnected", zap.Binary("peer", p.ID()), zap.Error(err))
return err
}
case p2pRequestCompleteCode:
if err := p.handleP2PRequestCompleteCode(packet); err != nil {
p.logger.Warn("failed to decode p2p request complete message, peer will be disconnected", zap.Binary("peer", p.ID()), zap.Error(err))
return err
}
default:
// New message common might be implemented in the future versions of Waku.
// For forward compatibility, just ignore.
p.logger.Debug("ignored packet with message code", zap.Uint64("code", packet.Code))
}
return nil
}
func (p *Peer) handleMessagesCode(packet p2p.Msg) error {
// decode the contained envelopes
data, err := ioutil.ReadAll(packet.Payload)
if err != nil {
common.EnvelopesRejectedCounter.WithLabelValues("failed_read").Inc()
return fmt.Errorf("failed to read packet payload: %v", err)
}
var envelopes []*common.Envelope
if err := rlp.DecodeBytes(data, &envelopes); err != nil {
common.EnvelopesRejectedCounter.WithLabelValues("invalid_data").Inc()
return fmt.Errorf("invalid payload: %v", err)
}
envelopeErrors, err := p.host.OnNewEnvelopes(envelopes, p)
if p.host.ConfirmationsEnabled() {
go p.sendConfirmation(data, envelopeErrors) // nolint: errcheck
}
return err
}
func (p *Peer) handleMessageResponseCode(packet p2p.Msg) error {
var resp MultiVersionResponse
if err := packet.Decode(&resp); err != nil {
common.EnvelopesRejectedCounter.WithLabelValues("failed_read").Inc()
return fmt.Errorf("invalid response message: %v", err)
}
if resp.Version != 1 {
p.logger.Info("received unsupported version of MultiVersionResponse for messageResponseCode packet", zap.Uint("version", resp.Version))
return nil
}
response, err := resp.DecodeResponse1()
if err != nil {
common.EnvelopesRejectedCounter.WithLabelValues("invalid_data").Inc()
return fmt.Errorf("failed to decode response message: %v", err)
}
return p.host.OnMessagesResponse(response, p)
}
func (p *Peer) handleP2PRequestCode(packet p2p.Msg) error {
// Must be processed if mail server is implemented. Otherwise ignore.
if !p.host.Mailserver() {
return nil
}
// Read all data as we will try to decode it possibly twice.
data, err := ioutil.ReadAll(packet.Payload)
if err != nil {
return fmt.Errorf("invalid p2p request messages: %v", err)
}
r := bytes.NewReader(data)
packet.Payload = r
// As we failed to decode the request, let's set the offset
// to the beginning and try decode it again.
if _, err := r.Seek(0, io.SeekStart); err != nil {
return fmt.Errorf("invalid p2p request message: %v", err)
}
var request common.MessagesRequest
errReq := packet.Decode(&request)
if errReq == nil {
return p.host.OnMessagesRequest(request, p)
}
p.logger.Info("failed to decode p2p request message", zap.Binary("peer", p.ID()), zap.Error(errReq))
return errors.New("invalid p2p request message")
}
func (p *Peer) handleBatchAcknowledgeCode(packet p2p.Msg) error {
var batchHash gethcommon.Hash
if err := packet.Decode(&batchHash); err != nil {
return fmt.Errorf("invalid batch ack message: %v", err)
}
return p.host.OnBatchAcknowledged(batchHash, p)
}
func (p *Peer) handleStatusUpdateCode(packet p2p.Msg) error {
var StatusOptions StatusOptions
err := packet.Decode(&StatusOptions)
if err != nil {
p.logger.Error("failed to decode status-options", zap.Error(err))
common.EnvelopesRejectedCounter.WithLabelValues("invalid_settings_changed").Inc()
return err
}
return p.setOptions(StatusOptions)
}
func (p *Peer) handleP2PMessageCode(packet p2p.Msg) error {
// peer-to-peer message, sent directly to peer bypassing PoW checks, etc.
// this message is not supposed to be forwarded to other peers, and
// therefore might not satisfy the PoW, expiry and other requirements.
// these messages are only accepted from the trusted peer.
if !p.trusted {
return nil
}
var (
envelopes []*common.Envelope
err error
)
if err = packet.Decode(&envelopes); err != nil {
return fmt.Errorf("invalid direct message payload: %v", err)
}
return p.host.OnNewP2PEnvelopes(envelopes, p)
}
func (p *Peer) handleP2PRequestCompleteCode(packet p2p.Msg) error {
if !p.trusted {
return nil
}
var payload []byte
if err := packet.Decode(&payload); err != nil {
return fmt.Errorf("invalid p2p request complete message: %v", err)
}
return p.host.OnP2PRequestCompleted(payload, p)
}
// sendConfirmation sends messageResponseCode and batchAcknowledgedCode messages.
func (p *Peer) sendConfirmation(data []byte, envelopeErrors []common.EnvelopeError) (err error) {
batchHash := crypto.Keccak256Hash(data)
err = p2p.Send(p.rw, messageResponseCode, NewMessagesResponse(batchHash, envelopeErrors))
if err != nil {
return
}
err = p2p.Send(p.rw, batchAcknowledgedCode, batchHash) // DEPRECATED
return
}
// handshake sends the protocol initiation status message to the remote peer and
// verifies the remote status too.
func (p *Peer) handshake() error {
// Send the handshake status message asynchronously
errc := make(chan error, 1)
opts := StatusOptionsFromHost(p.host)
go func() {
errc <- p2p.SendItems(p.rw, statusCode, Version, opts)
}()
// Fetch the remote status packet and verify protocol match
packet, err := p.rw.ReadMsg()
if err != nil {
return err
}
if packet.Code != statusCode {
return fmt.Errorf("p [%x] sent packet %x before status packet", p.ID(), packet.Code)
}
var (
peerProtocolVersion uint64
peerOptions StatusOptions
)
s := rlp.NewStream(packet.Payload, uint64(packet.Size))
if _, err := s.List(); err != nil {
return fmt.Errorf("p [%x]: failed to decode status packet: %v", p.ID(), err)
}
// Validate protocol version.
if err := s.Decode(&peerProtocolVersion); err != nil {
return fmt.Errorf("p [%x]: failed to decode peer protocol version: %v", p.ID(), err)
}
if peerProtocolVersion != Version {
return fmt.Errorf("p [%x]: protocol version mismatch %d != %d", p.ID(), peerProtocolVersion, Version)
}
// Decode and validate other status packet options.
if err := s.Decode(&peerOptions); err != nil {
return fmt.Errorf("p [%x]: failed to decode status options: %v", p.ID(), err)
}
if err := s.ListEnd(); err != nil {
return fmt.Errorf("p [%x]: failed to decode status packet: %v", p.ID(), err)
}
if err := p.setOptions(peerOptions.WithDefaults()); err != nil {
return fmt.Errorf("p [%x]: failed to set options: %v", p.ID(), err)
}
if err := <-errc; err != nil {
return fmt.Errorf("p [%x] failed to send status packet: %v", p.ID(), err)
}
return nil
}
// update executes periodic operations on the peer, including message transmission
// and expiration.
func (p *Peer) update() {
// Start the tickers for the updates
expire := time.NewTicker(common.ExpirationCycle)
transmit := time.NewTicker(common.TransmissionCycle)
// Loop and transmit until termination is requested
for {
select {
case <-expire.C:
p.expire()
case <-transmit.C:
if err := p.broadcast(); err != nil {
p.logger.Debug("broadcasting failed", zap.Binary("peer", p.ID()), zap.Error(err))
return
}
case <-p.quit:
return
}
}
}
func (p *Peer) setOptions(peerOptions StatusOptions) error {
p.logger.Debug("settings options", zap.Binary("peerID", p.ID()), zap.Any("Options", peerOptions))
if err := peerOptions.Validate(); err != nil {
return fmt.Errorf("p [%x]: sent invalid options: %v", p.ID(), err)
}
// Validate and save peer's PoW.
pow := peerOptions.PoWRequirementF()
if pow != nil {
if math.IsInf(*pow, 0) || math.IsNaN(*pow) || *pow < 0.0 {
return fmt.Errorf("p [%x]: sent bad status message: invalid pow", p.ID())
}
p.powRequirement = *pow
}
if peerOptions.TopicInterest != nil {
p.setTopicInterest(peerOptions.TopicInterest)
} else if peerOptions.BloomFilter != nil {
// Validate and save peer's bloom filters.
bloom := peerOptions.BloomFilter
bloomSize := len(bloom)
if bloomSize != 0 && bloomSize != common.BloomFilterSize {
return fmt.Errorf("p [%x] sent bad status message: wrong bloom filter size %d", p.ID(), bloomSize)
}
p.setBloomFilter(bloom)
}
if peerOptions.LightNodeEnabled != nil {
// Validate and save other peer's options.
if *peerOptions.LightNodeEnabled && p.host.LightClientMode() && p.host.LightClientModeConnectionRestricted() {
return fmt.Errorf("p [%x] is useless: two light client communication restricted", p.ID())
}
}
if peerOptions.ConfirmationsEnabled != nil {
p.confirmationsEnabled = *peerOptions.ConfirmationsEnabled
}
if peerOptions.RateLimits != nil {
p.setRateLimits(*peerOptions.RateLimits)
}
return nil
}
// expire iterates over all the known envelopes in the host and removes all
// expired (unknown) ones from the known list.
func (p *Peer) expire() {
unmark := make(map[gethcommon.Hash]struct{})
p.known.Each(func(v interface{}) bool {
if !p.host.IsEnvelopeCached(v.(gethcommon.Hash)) {
unmark[v.(gethcommon.Hash)] = struct{}{}
}
return true
})
// Dump all known but no longer cached
for hash := range unmark {
p.known.Remove(hash)
}
}
// broadcast iterates over the collection of envelopes and transmits yet unknown
// ones over the network.
func (p *Peer) broadcast() error {
envelopes := p.host.Envelopes()
bundle := make([]*common.Envelope, 0, len(envelopes))
for _, envelope := range envelopes {
if !p.Marked(envelope) && envelope.PoW() >= p.powRequirement && p.topicOrBloomMatch(envelope) {
bundle = append(bundle, envelope)
}
}
if len(bundle) == 0 {
return nil
}
batchHash, err := sendBundle(p.rw, bundle)
if err != nil {
p.logger.Debug("failed to deliver envelopes", zap.Binary("peer", p.ID()), zap.Error(err))
return err
}
// mark envelopes only if they were successfully sent
for _, e := range bundle {
p.Mark(e)
event := common.EnvelopeEvent{
Event: common.EventEnvelopeSent,
Hash: e.Hash(),
Peer: p.EnodeID(),
}
if p.confirmationsEnabled {
event.Batch = batchHash
}
p.host.SendEnvelopeEvent(event)
}
p.logger.Debug("broadcasted bundles successfully", zap.Binary("peer", p.ID()), zap.Int("count", len(bundle)))
return nil
}
func (p *Peer) setBloomFilter(bloom []byte) {
p.bloomMu.Lock()
defer p.bloomMu.Unlock()
p.bloomFilter = bloom
p.fullNode = common.IsFullNode(bloom)
if p.fullNode && p.bloomFilter == nil {
p.bloomFilter = common.MakeFullNodeBloom()
}
p.topicInterest = nil
}
func (p *Peer) setTopicInterest(topicInterest []common.TopicType) {
p.topicInterestMu.Lock()
defer p.topicInterestMu.Unlock()
if topicInterest == nil {
p.topicInterest = nil
return
}
p.topicInterest = make(map[common.TopicType]bool)
for _, topic := range topicInterest {
p.topicInterest[topic] = true
}
p.fullNode = false
p.bloomFilter = nil
}
func (p *Peer) setRateLimits(r common.RateLimits) {
p.rateLimitsMu.Lock()
p.rateLimits = r
p.rateLimitsMu.Unlock()
}
// topicOrBloomMatch matches against topic-interest if topic interest
// is not nil. Otherwise it will match against the bloom-filter.
// If the bloom-filter is nil, or full, the node is considered a full-node
// and any envelope will be accepted. An empty topic-interest (but not nil)
// signals that we are not interested in any envelope.
func (p *Peer) topicOrBloomMatch(env *common.Envelope) bool {
p.topicInterestMu.Lock()
topicInterestMode := p.topicInterest != nil
p.topicInterestMu.Unlock()
if topicInterestMode {
return p.topicInterestMatch(env)
}
return p.bloomMatch(env)
}
func (p *Peer) topicInterestMatch(env *common.Envelope) bool {
p.topicInterestMu.Lock()
defer p.topicInterestMu.Unlock()
if p.topicInterest == nil {
return false
}
return p.topicInterest[env.Topic]
}
func (p *Peer) bloomMatch(env *common.Envelope) bool {
p.bloomMu.Lock()
defer p.bloomMu.Unlock()
return p.fullNode || common.BloomFilterMatch(p.bloomFilter, env.Bloom())
}

60
waku/v1/peer_test.go Normal file
View File

@ -0,0 +1,60 @@
// Copyright 2019 The Waku Library Authors.
//
// The Waku library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The Waku library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty off
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the Waku library. If not, see <http://www.gnu.org/licenses/>.
//
// This software uses the go-ethereum library, which is licensed
// under the GNU Lesser General Public Library, version 3 or any later.
package v1
import (
"testing"
"github.com/status-im/status-go/waku/common"
)
var sharedTopic = common.TopicType{0xF, 0x1, 0x2, 0}
var wrongTopic = common.TopicType{0, 0, 0, 0}
//two generic waku node handshake. one don't send light flag
func TestTopicOrBloomMatch(t *testing.T) {
p := Peer{}
p.setTopicInterest([]common.TopicType{sharedTopic})
envelope := &common.Envelope{Topic: sharedTopic}
if !p.topicOrBloomMatch(envelope) {
t.Fatal("envelope should match")
}
badEnvelope := &common.Envelope{Topic: wrongTopic}
if p.topicOrBloomMatch(badEnvelope) {
t.Fatal("envelope should not match")
}
}
func TestTopicOrBloomMatchFullNode(t *testing.T) {
p := Peer{}
// Set as full node
p.fullNode = true
p.setTopicInterest([]common.TopicType{sharedTopic})
envelope := &common.Envelope{Topic: sharedTopic}
if !p.topicOrBloomMatch(envelope) {
t.Fatal("envelope should match")
}
badEnvelope := &common.Envelope{Topic: wrongTopic}
if p.topicOrBloomMatch(badEnvelope) {
t.Fatal("envelope should not match")
}
}

207
waku/v1/statusoptions.go Normal file
View File

@ -0,0 +1,207 @@
package v1
import (
"errors"
"fmt"
"io"
"math"
"reflect"
"strings"
"github.com/ethereum/go-ethereum/rlp"
"github.com/status-im/status-go/waku/common"
)
// statusOptionKey is a current type used in StatusOptions as a key.
type statusOptionKey string
var (
defaultMinPoW = math.Float64bits(0.001)
idxFieldKey = make(map[int]statusOptionKey)
keyFieldIdx = make(map[statusOptionKey]int)
)
// StatusOptions defines additional information shared between peers
// during the handshake.
// There might be more options provided then fields in StatusOptions
// and they should be ignored during deserialization to stay forward compatible.
// In the case of RLP, options should be serialized to an array of tuples
// where the first item is a field name and the second is a RLP-serialized value.
type StatusOptions struct {
PoWRequirement *uint64 `rlp:"key=0"` // RLP does not support float64 natively
BloomFilter []byte `rlp:"key=1"`
LightNodeEnabled *bool `rlp:"key=2"`
ConfirmationsEnabled *bool `rlp:"key=3"`
RateLimits *common.RateLimits `rlp:"key=4"`
TopicInterest []common.TopicType `rlp:"key=5"`
}
func StatusOptionsFromHost(host common.WakuHost) StatusOptions {
opts := StatusOptions{}
rateLimits := host.RateLimits()
opts.RateLimits = &rateLimits
lightNode := host.LightClientMode()
opts.LightNodeEnabled = &lightNode
minPoW := host.MinPow()
opts.SetPoWRequirementFromF(minPoW)
confirmationsEnabled := host.ConfirmationsEnabled()
opts.ConfirmationsEnabled = &confirmationsEnabled
bloomFilterMode := host.BloomFilterMode()
if bloomFilterMode {
opts.BloomFilter = host.BloomFilter()
} else {
opts.TopicInterest = host.TopicInterest()
}
return opts
}
// initFLPKeyFields initialises the values of `idxFieldKey` and `keyFieldIdx`
func initRLPKeyFields() {
o := StatusOptions{}
v := reflect.ValueOf(o)
for i := 0; i < v.NumField(); i++ {
// skip unexported fields
if !v.Field(i).CanInterface() {
continue
}
rlpTag := v.Type().Field(i).Tag.Get("rlp")
// skip fields without rlp field tag
if rlpTag == "" {
continue
}
keys := strings.Split(rlpTag, "=")
if len(keys) != 2 || keys[0] != "key" {
panic("invalid value of \"rlp\" tag, expected \"key=N\" where N is uint")
}
// typecast key to be of statusOptionKey type
keyFieldIdx[statusOptionKey(keys[1])] = i
idxFieldKey[i] = statusOptionKey(keys[1])
}
}
// WithDefaults adds the default values for a given peer.
// This are not the host default values, but the default values that ought to
// be used when receiving from an update from a peer.
func (o StatusOptions) WithDefaults() StatusOptions {
if o.PoWRequirement == nil {
o.PoWRequirement = &defaultMinPoW
}
if o.LightNodeEnabled == nil {
lightNodeEnabled := false
o.LightNodeEnabled = &lightNodeEnabled
}
if o.ConfirmationsEnabled == nil {
confirmationsEnabled := false
o.ConfirmationsEnabled = &confirmationsEnabled
}
if o.RateLimits == nil {
o.RateLimits = &common.RateLimits{}
}
if o.BloomFilter == nil {
o.BloomFilter = common.MakeFullNodeBloom()
}
return o
}
func (o StatusOptions) PoWRequirementF() *float64 {
if o.PoWRequirement == nil {
return nil
}
result := math.Float64frombits(*o.PoWRequirement)
return &result
}
func (o *StatusOptions) SetPoWRequirementFromF(val float64) {
requirement := math.Float64bits(val)
o.PoWRequirement = &requirement
}
func (o StatusOptions) EncodeRLP(w io.Writer) error {
v := reflect.ValueOf(o)
var optionsList []interface{}
for i := 0; i < v.NumField(); i++ {
field := v.Field(i)
if !field.IsNil() {
value := field.Interface()
key, ok := idxFieldKey[i]
if !ok {
continue
}
if value != nil {
optionsList = append(optionsList, []interface{}{key, value})
}
}
}
return rlp.Encode(w, optionsList)
}
func (o *StatusOptions) DecodeRLP(s *rlp.Stream) error {
_, err := s.List()
if err != nil {
return fmt.Errorf("expected an outer list: %v", err)
}
v := reflect.ValueOf(o)
loop:
for {
_, err := s.List()
switch err {
case nil:
// continue to decode a key
case rlp.EOL:
break loop
default:
return fmt.Errorf("expected an inner list: %v", err)
}
var key statusOptionKey
if err := s.Decode(&key); err != nil {
return fmt.Errorf("invalid key: %v", err)
}
// Skip processing if a key does not exist.
// It might happen when there is a new peer
// which supports a new option with
// a higher index.
idx, ok := keyFieldIdx[key]
if !ok {
// Read the rest of the list items and dump peer.
_, err := s.Raw()
if err != nil {
return fmt.Errorf("failed to read the value of key %s: %v", key, err)
}
continue
}
if err := s.Decode(v.Elem().Field(idx).Addr().Interface()); err != nil {
return fmt.Errorf("failed to decode an option %s: %v", key, err)
}
if err := s.ListEnd(); err != nil {
return err
}
}
return s.ListEnd()
}
func (o StatusOptions) Validate() error {
if len(o.TopicInterest) > 10000 {
return errors.New("topic interest is limited by 1000 items")
}
return nil
}

View File

@ -0,0 +1,111 @@
package v1
import (
"math"
"testing"
"github.com/stretchr/testify/require"
"github.com/ethereum/go-ethereum/rlp"
"github.com/status-im/status-go/waku/common"
)
func TestEncodeDecodeRLP(t *testing.T) {
pow := math.Float64bits(6.02)
lightNodeEnabled := true
confirmationsEnabled := true
opts := StatusOptions{
PoWRequirement: &pow,
BloomFilter: common.TopicToBloom(common.TopicType{0xaa, 0xbb, 0xcc, 0xdd}),
LightNodeEnabled: &lightNodeEnabled,
ConfirmationsEnabled: &confirmationsEnabled,
RateLimits: &common.RateLimits{
IPLimits: 10,
PeerIDLimits: 5,
TopicLimits: 1,
},
TopicInterest: []common.TopicType{{0x01}, {0x02}, {0x03}, {0x04}},
}
data, err := rlp.EncodeToBytes(opts)
require.NoError(t, err)
var optsDecoded StatusOptions
err = rlp.DecodeBytes(data, &optsDecoded)
require.NoError(t, err)
require.EqualValues(t, opts, optsDecoded)
}
func TestBackwardCompatibility(t *testing.T) {
alist := []interface{}{
[]interface{}{"0", math.Float64bits(2.05)},
}
data, err := rlp.EncodeToBytes(alist)
require.NoError(t, err)
var optsDecoded StatusOptions
err = rlp.DecodeBytes(data, &optsDecoded)
require.NoError(t, err)
pow := math.Float64bits(2.05)
require.EqualValues(t, StatusOptions{PoWRequirement: &pow}, optsDecoded)
}
func TestForwardCompatibility(t *testing.T) {
pow := math.Float64bits(2.05)
alist := []interface{}{
[]interface{}{"0", pow},
[]interface{}{"99", uint(10)}, // some future option
}
data, err := rlp.EncodeToBytes(alist)
require.NoError(t, err)
var optsDecoded StatusOptions
err = rlp.DecodeBytes(data, &optsDecoded)
require.NoError(t, err)
require.EqualValues(t, StatusOptions{PoWRequirement: &pow}, optsDecoded)
}
func TestInitRLPKeyFields(t *testing.T) {
ifk := map[int]statusOptionKey{
0: "0",
1: "1",
2: "2",
3: "3",
4: "4",
5: "5",
}
kfi := map[statusOptionKey]int{
"0": 0,
"1": 1,
"2": 2,
"3": 3,
"4": 4,
"5": 5,
}
// Test that the kfi length matches the inited global keyFieldIdx length
require.Equal(t, len(kfi), len(keyFieldIdx))
// Test that each index of the kfi values matches the inited global keyFieldIdx of the same index
for k, v := range kfi {
require.Exactly(t, v, keyFieldIdx[k])
}
// Test that each index of the inited global keyFieldIdx values matches kfi values of the same index
for k, v := range keyFieldIdx {
require.Exactly(t, v, kfi[k])
}
// Test that the ifk length matches the inited global idxFieldKey length
require.Equal(t, len(ifk), len(idxFieldKey))
// Test that each index of the ifk values matches the inited global idxFieldKey of the same index
for k, v := range ifk {
require.Exactly(t, v, idxFieldKey[k])
}
// Test that each index of the inited global idxFieldKey values matches ifk values of the same index
for k, v := range idxFieldKey {
require.Exactly(t, v, ifk[k])
}
}

View File

@ -46,6 +46,7 @@ import (
"github.com/status-im/status-go/waku/common"
v0 "github.com/status-im/status-go/waku/v0"
v1 "github.com/status-im/status-go/waku/v1"
)
const messageQueueLimit = 1024
@ -72,8 +73,8 @@ type settings struct {
// Waku represents a dark communication interface through the Ethereum
// network, using its very own P2P communication layer.
type Waku struct {
protocol p2p.Protocol // Peer description and parameters
filters *common.Filters // Message filters installed with Subscribe function
protocols []p2p.Protocol // Peer description and parameters
filters *common.Filters // Message filters installed with Subscribe function
privateKeys map[string]*ecdsa.PrivateKey // Private key storage
symKeys map[string][]byte // Symmetric key storage
@ -151,11 +152,11 @@ func New(cfg *Config, logger *zap.Logger) *Waku {
waku.filters = common.NewFilters()
// p2p waku sub-protocol handler
waku.protocol = p2p.Protocol{
waku.protocols = []p2p.Protocol{{
Name: v0.Name,
Version: uint(v0.Version),
Length: v0.NumberOfMessageCodes,
Run: waku.HandlePeer,
Run: waku.handlePeerV0,
NodeInfo: func() interface{} {
return map[string]interface{}{
"version": v0.VersionStr,
@ -163,16 +164,25 @@ func New(cfg *Config, logger *zap.Logger) *Waku {
"minimumPoW": waku.MinPow(),
}
},
},
{
Name: v1.Name,
Version: uint(v1.Version),
Length: v1.NumberOfMessageCodes,
Run: waku.handlePeerV1,
NodeInfo: func() interface{} {
return map[string]interface{}{
"version": v1.VersionStr,
"maxMessageSize": waku.MaxMessageSize(),
"minimumPoW": waku.MinPow(),
}
},
},
}
return waku
}
// Version returns the waku sub-protocol version number.
func (w *Waku) Version() uint {
return w.protocol.Version
}
// MinPow returns the PoW value required by this node.
func (w *Waku) MinPow() float64 {
w.settingsMu.RLock()
@ -426,7 +436,7 @@ func (w *Waku) APIs() []rpc.API {
// Protocols returns the waku sub-protocols ran by this particular client.
func (w *Waku) Protocols() []p2p.Protocol {
return []p2p.Protocol{w.protocol}
return w.protocols
}
// RegisterMailServer registers MailServer interface.
@ -1009,12 +1019,17 @@ func (w *Waku) Stop() error {
return nil
}
func (w *Waku) handlePeerV0(p2pPeer *p2p.Peer, rw p2p.MsgReadWriter) error {
return w.HandlePeer(v0.NewPeer(w, p2pPeer, rw, w.logger.Named("waku/peerv0")), rw)
}
func (w *Waku) handlePeerV1(p2pPeer *p2p.Peer, rw p2p.MsgReadWriter) error {
return w.HandlePeer(v1.NewPeer(w, p2pPeer, rw, w.logger.Named("waku/peerv1")), rw)
}
// HandlePeer is called by the underlying P2P layer when the waku sub-protocol
// connection is negotiated.
func (w *Waku) HandlePeer(p2pPeer *p2p.Peer, rw p2p.MsgReadWriter) error {
// Create the new peer and start tracking it
var peer common.Peer = v0.NewPeer(w, p2pPeer, rw, w.logger.Named("waku/peer"))
func (w *Waku) HandlePeer(peer common.Peer, rw p2p.MsgReadWriter) error {
w.peerMu.Lock()
w.peers[peer] = struct{}{}
w.peerMu.Unlock()
@ -1177,6 +1192,22 @@ func (w *Waku) SetBloomFilterMode(mode bool) {
// Recalculate and notify topic interest or bloom, currently not implemented
}
// addEnvelope adds an envelope to the envelope map, used for sending
func (w *Waku) addEnvelope(envelope *common.Envelope) {
hash := envelope.Hash()
w.poolMu.Lock()
w.envelopes[hash] = envelope
if w.expirations[envelope.Expiry] == nil {
w.expirations[envelope.Expiry] = mapset.NewThreadUnsafeSet()
}
if !w.expirations[envelope.Expiry].Contains(hash) {
w.expirations[envelope.Expiry].Add(hash)
}
w.poolMu.Unlock()
}
// addAndBridge inserts a new envelope into the message pool to be distributed within the
// waku network. It also inserts the envelope into the expiration pool at the
// appropriate time-stamp. In case of error, connection should be dropped.
@ -1235,16 +1266,10 @@ func (w *Waku) addAndBridge(envelope *common.Envelope, isP2P bool, bridged bool)
w.poolMu.Lock()
_, alreadyCached := w.envelopes[hash]
if !alreadyCached {
w.envelopes[hash] = envelope
if w.expirations[envelope.Expiry] == nil {
w.expirations[envelope.Expiry] = mapset.NewThreadUnsafeSet()
}
if !w.expirations[envelope.Expiry].Contains(hash) {
w.expirations[envelope.Expiry].Add(hash)
}
}
w.poolMu.Unlock()
if !alreadyCached {
w.addEnvelope(envelope)
}
if alreadyCached {
log.Trace("w envelope already cached", "hash", envelope.Hash().Hex())

View File

@ -22,24 +22,16 @@ import (
"bytes"
"crypto/ecdsa"
"crypto/sha256"
"math"
mrand "math/rand"
"testing"
"time"
"github.com/stretchr/testify/require"
"golang.org/x/crypto/pbkdf2"
"github.com/ethereum/go-ethereum/crypto"
"github.com/status-im/status-go/waku/common"
v0 "github.com/status-im/status-go/waku/v0"
gethcommon "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/rlp"
"go.uber.org/zap"
)
var seed int64
@ -55,21 +47,18 @@ func InitSingleTest() {
func TestBasic(t *testing.T) {
w := New(nil, nil)
p := w.Protocols()
shh := p[0]
if shh.Name != v0.Name {
t.Fatalf("failed Peer Name: %v.", shh.Name)
waku := p[0]
if waku.Name != v0.Name {
t.Fatalf("failed Peer Name: %v.", waku.Name)
}
if uint64(shh.Version) != v0.Version {
t.Fatalf("failed Peer Version: %v.", shh.Version)
if uint64(waku.Version) != v0.Version {
t.Fatalf("failed Peer Version: %v.", waku.Version)
}
if shh.Length != v0.NumberOfMessageCodes {
t.Fatalf("failed Peer Length: %v.", shh.Length)
if waku.Length != v0.NumberOfMessageCodes {
t.Fatalf("failed Peer Length: %v.", waku.Length)
}
if shh.Run == nil {
t.Fatalf("failed shh.Run.")
}
if uint64(w.Version()) != v0.Version {
t.Fatalf("failed waku Version: %v.", shh.Version)
if waku.Run == nil {
t.Fatalf("failed waku.Run.")
}
if w.GetFilter("non-existent") != nil {
t.Fatalf("failed GetFilter.")
@ -1086,672 +1075,6 @@ func TestTopicInterest(t *testing.T) {
}
// TODO: Fix this to use protcol instead of stubbing
func TestHandleP2PMessageCode(t *testing.T) {
InitSingleTest()
w1 := New(nil, nil)
if err := w1.SetMinimumPoW(0.0000001, false); err != nil {
t.Error(err)
}
if err := w1.Start(nil); err != nil {
t.Error(err)
}
defer func() {
handleError(t, w1.Stop())
}()
w2 := New(nil, nil)
if err := w2.SetMinimumPoW(0.0000001, false); err != nil {
t.Error(err)
}
if err := w2.Start(nil); err != nil {
t.Error(err)
}
defer func() {
handleError(t, w2.Stop())
}()
envelopeEvents := make(chan common.EnvelopeEvent, 10)
sub := w1.SubscribeEnvelopeEvents(envelopeEvents)
defer sub.Unsubscribe()
params, err := generateMessageParams()
if err != nil {
t.Fatalf("failed generateMessageParams with seed %d: %s.", seed, err)
}
params.TTL = 1
msg, err := common.NewSentMessage(params)
if err != nil {
t.Fatalf("failed to create new message with seed %d: %s.", seed, err)
}
env, err := msg.Wrap(params, time.Now())
if err != nil {
t.Fatalf("failed Wrap with seed %d: %s.", seed, err)
}
rw1, rw2 := p2p.MsgPipe()
errorc := make(chan error, 1)
go func() {
err := w1.HandlePeer(p2p.NewPeer(enode.ID{}, "test", []p2p.Cap{}), rw1)
errorc <- err
}()
go func() {
select {
case err := <-errorc:
t.Log(err)
case <-time.After(time.Second * 5):
if err := rw1.Close(); err != nil {
t.Error(err)
}
if err := rw2.Close(); err != nil {
t.Error(err)
}
}
}()
peer1 := v0.NewPeer(w2, p2p.NewPeer(enode.ID{}, "test", []p2p.Cap{}), rw2, nil)
peer1.SetPeerTrusted(true)
err = peer1.Start()
require.NoError(t, err, "failed run message loop")
// Simulate receiving the new envelope
_, err = w2.add(env, true)
require.NoError(t, err)
if e := <-envelopeEvents; e.Hash != env.Hash() {
t.Fatalf("received envelope %s while expected %s", e.Hash, env.Hash())
}
peer1.Stop()
}
func testConfirmationsHandshake(t *testing.T, expectConfirmations bool) {
conf := &Config{
MinimumAcceptedPoW: 0,
EnableConfirmations: expectConfirmations,
}
w := New(conf, nil)
p := p2p.NewPeer(enode.ID{1}, "1", []p2p.Cap{{"shh", 6}})
rw1, rw2 := p2p.MsgPipe()
errorc := make(chan error, 1)
go func() {
err := w.HandlePeer(p, rw2)
errorc <- err
}()
// so that actual read won't hang forever
time.AfterFunc(5*time.Second, func() {
if err := rw1.Close(); err != nil {
t.Errorf("error closing MsgPipe, '%s'", err)
}
})
require.NoError(
t,
p2p.ExpectMsg(
rw1,
v0.StatusCode,
[]interface{}{
v0.Version,
v0.StatusOptionsFromHost(w),
},
),
)
}
func TestConfirmationHadnshakeExtension(t *testing.T) {
testConfirmationsHandshake(t, true)
}
func TestHandshakeWithConfirmationsDisabled(t *testing.T) {
testConfirmationsHandshake(t, false)
}
func TestConfirmationReceived(t *testing.T) {
logger, err := zap.NewDevelopment()
require.NoError(t, err)
conf := &Config{
MinimumAcceptedPoW: 0,
MaxMessageSize: 10 << 20,
EnableConfirmations: true,
}
w := New(conf, logger)
p := p2p.NewPeer(enode.ID{1}, "1", []p2p.Cap{{"waku", 0}})
rw1, rw2 := p2p.MsgPipe()
errorc := make(chan error, 1)
go func() {
err := w.HandlePeer(p, rw2)
errorc <- err
}()
go func() {
select {
case err := <-errorc:
t.Log(err)
case <-time.After(time.Second * 5):
if err := rw1.Close(); err != nil {
t.Errorf("error closing MsgPipe, '%s'", err)
}
}
}()
pow := math.Float64bits(w.MinPow())
confirmationsEnabled := true
lightNodeEnabled := true
require.NoError(
t,
p2p.ExpectMsg(
rw1,
v0.StatusCode,
[]interface{}{
v0.Version,
v0.StatusOptionsFromHost(w),
},
),
)
require.NoError(
t,
p2p.SendItems(
rw1,
v0.StatusCode,
v0.Version,
v0.StatusOptions{
PoWRequirement: &pow,
BloomFilter: w.BloomFilter(),
ConfirmationsEnabled: &confirmationsEnabled,
LightNodeEnabled: &lightNodeEnabled,
},
),
)
e := common.Envelope{
Expiry: uint32(time.Now().Add(10 * time.Second).Unix()),
TTL: 10,
Topic: common.TopicType{1},
Data: make([]byte, 1<<10),
Nonce: 1,
}
data, err := rlp.EncodeToBytes([]*common.Envelope{&e})
require.NoError(t, err)
hash := crypto.Keccak256Hash(data)
require.NoError(t, p2p.SendItems(rw1, v0.MessagesCode, &e))
require.NoError(t, p2p.ExpectMsg(rw1, v0.MessageResponseCode, nil))
require.NoError(t, p2p.ExpectMsg(rw1, v0.BatchAcknowledgedCode, hash))
}
func TestMessagesResponseWithError(t *testing.T) {
conf := &Config{
MinimumAcceptedPoW: 0,
MaxMessageSize: 10 << 20,
EnableConfirmations: true,
}
w := New(conf, nil)
p := p2p.NewPeer(enode.ID{1}, "1", []p2p.Cap{{"waku", 0}})
rw1, rw2 := p2p.MsgPipe()
defer func() {
if err := rw1.Close(); err != nil {
t.Errorf("error closing MsgPipe 1, '%s'", err)
}
if err := rw2.Close(); err != nil {
t.Errorf("error closing MsgPipe 2, '%s'", err)
}
}()
errorc := make(chan error, 1)
go func() {
err := w.HandlePeer(p, rw2)
errorc <- err
}()
pow := math.Float64bits(w.MinPow())
confirmationsEnabled := true
lightNodeEnabled := true
require.NoError(
t,
p2p.ExpectMsg(
rw1,
v0.StatusCode,
[]interface{}{
v0.Version,
v0.StatusOptionsFromHost(w),
},
),
)
require.NoError(
t,
p2p.SendItems(
rw1,
v0.StatusCode,
v0.Version,
v0.StatusOptions{
PoWRequirement: &pow,
BloomFilter: w.BloomFilter(),
ConfirmationsEnabled: &confirmationsEnabled,
LightNodeEnabled: &lightNodeEnabled,
},
),
)
failed := common.Envelope{
Expiry: uint32(time.Now().Add(time.Hour).Unix()),
TTL: 10,
Topic: common.TopicType{1},
Data: make([]byte, 1<<10),
Nonce: 1,
}
normal := common.Envelope{
Expiry: uint32(time.Now().Unix()),
TTL: 10,
Topic: common.TopicType{1},
Data: make([]byte, 1<<10),
Nonce: 1,
}
data, err := rlp.EncodeToBytes([]*common.Envelope{&failed, &normal})
require.NoError(t, err)
hash := crypto.Keccak256Hash(data)
require.NoError(t, p2p.SendItems(rw1, v0.MessagesCode, &failed, &normal))
require.NoError(t, p2p.ExpectMsg(rw1, v0.MessageResponseCode, v0.NewMessagesResponse(hash, []common.EnvelopeError{
{Hash: failed.Hash(), Code: common.EnvelopeTimeNotSynced, Description: "envelope from future"},
})))
require.NoError(t, p2p.ExpectMsg(rw1, v0.BatchAcknowledgedCode, hash))
}
func testConfirmationEvents(t *testing.T, envelope common.Envelope, envelopeErrors []common.EnvelopeError) {
conf := &Config{
MinimumAcceptedPoW: 0,
MaxMessageSize: 10 << 20,
EnableConfirmations: true,
}
w := New(conf, nil)
events := make(chan common.EnvelopeEvent, 2)
sub := w.SubscribeEnvelopeEvents(events)
defer sub.Unsubscribe()
p := p2p.NewPeer(enode.ID{1}, "1", []p2p.Cap{{"waku", 0}})
rw1, rw2 := p2p.MsgPipe()
errorc := make(chan error, 1)
go func() {
err := w.HandlePeer(p, rw2)
errorc <- err
}()
time.AfterFunc(5*time.Second, func() {
if err := rw1.Close(); err != nil {
t.Errorf("error closing MsgPipe, '%s'", err)
}
})
pow := math.Float64bits(w.MinPow())
confirmationsEnabled := true
lightNodeEnabled := true
require.NoError(t, p2p.ExpectMsg(
rw1,
v0.StatusCode,
[]interface{}{
v0.Version,
v0.StatusOptionsFromHost(w),
},
))
require.NoError(t, p2p.SendItems(
rw1,
v0.StatusCode,
v0.Version,
v0.StatusOptions{
PoWRequirement: &pow,
BloomFilter: w.BloomFilter(),
ConfirmationsEnabled: &confirmationsEnabled,
LightNodeEnabled: &lightNodeEnabled,
},
))
require.NoError(t, w.Send(&envelope))
require.NoError(t, p2p.ExpectMsg(rw1, v0.MessagesCode, []*common.Envelope{&envelope}))
var hash gethcommon.Hash
select {
case ev := <-events:
require.Equal(t, common.EventEnvelopeSent, ev.Event)
require.Equal(t, p.ID(), ev.Peer)
require.NotEqual(t, gethcommon.Hash{}, ev.Batch)
hash = ev.Batch
case <-time.After(5 * time.Second):
require.FailNow(t, "timed out waiting for an envelope.sent event")
}
require.NoError(t, p2p.Send(rw1, v0.MessageResponseCode, v0.NewMessagesResponse(hash, envelopeErrors)))
require.NoError(t, p2p.Send(rw1, v0.BatchAcknowledgedCode, hash))
select {
case ev := <-events:
require.Equal(t, common.EventBatchAcknowledged, ev.Event)
require.Equal(t, p.ID(), ev.Peer)
require.Equal(t, hash, ev.Batch)
require.Equal(t, envelopeErrors, ev.Data)
case <-time.After(5 * time.Second):
require.FailNow(t, "timed out waiting for an batch.acknowledged event")
}
}
func TestConfirmationEventsReceived(t *testing.T) {
e := common.Envelope{
Expiry: uint32(time.Now().Add(10 * time.Second).Unix()),
TTL: 10,
Topic: common.TopicType{1},
Data: make([]byte, 1<<10),
Nonce: 1,
}
testConfirmationEvents(t, e, []common.EnvelopeError{})
}
func TestConfirmationEventsExtendedWithErrors(t *testing.T) {
e := common.Envelope{
Expiry: uint32(time.Now().Unix()),
TTL: 10,
Topic: common.TopicType{1},
Data: make([]byte, 1<<10),
Nonce: 1,
}
testConfirmationEvents(t, e, []common.EnvelopeError{
{
Hash: e.Hash(),
Code: common.EnvelopeTimeNotSynced,
Description: "test error",
}},
)
}
func TestEventsWithoutConfirmation(t *testing.T) {
conf := &Config{
MinimumAcceptedPoW: 0,
MaxMessageSize: 10 << 20,
}
w := New(conf, nil)
events := make(chan common.EnvelopeEvent, 2)
sub := w.SubscribeEnvelopeEvents(events)
defer sub.Unsubscribe()
p := p2p.NewPeer(enode.ID{1}, "1", []p2p.Cap{{"waku", 0}})
rw1, rw2 := p2p.MsgPipe()
errorc := make(chan error, 1)
go func() {
err := w.HandlePeer(p, rw2)
errorc <- err
}()
time.AfterFunc(5*time.Second, func() {
if err := rw1.Close(); err != nil {
t.Errorf("error closing MsgPipe, '%s'", err)
}
})
pow := math.Float64bits(w.MinPow())
lightNodeEnabled := true
require.NoError(
t,
p2p.ExpectMsg(
rw1,
v0.StatusCode,
[]interface{}{
v0.Version,
v0.StatusOptionsFromHost(w),
},
),
)
require.NoError(
t,
p2p.SendItems(
rw1,
v0.StatusCode,
v0.Version,
v0.StatusOptions{
PoWRequirement: &pow,
BloomFilter: w.BloomFilter(),
LightNodeEnabled: &lightNodeEnabled,
},
),
)
e := common.Envelope{
Expiry: uint32(time.Now().Add(10 * time.Second).Unix()),
TTL: 10,
Topic: common.TopicType{1},
Data: make([]byte, 1<<10),
Nonce: 1,
}
require.NoError(t, w.Send(&e))
require.NoError(t, p2p.ExpectMsg(rw1, v0.MessagesCode, []*common.Envelope{&e}))
select {
case ev := <-events:
require.Equal(t, common.EventEnvelopeSent, ev.Event)
require.Equal(t, p.ID(), ev.Peer)
require.Equal(t, gethcommon.Hash{}, ev.Batch)
case <-time.After(5 * time.Second):
require.FailNow(t, "timed out waiting for an envelope.sent event")
}
}
func discardPipe() *p2p.MsgPipeRW {
rw1, rw2 := p2p.MsgPipe()
go func() {
for {
msg, err := rw1.ReadMsg()
if err != nil {
return
}
msg.Discard() // nolint: errcheck
}
}()
return rw2
}
func TestWakuTimeDesyncEnvelopeIgnored(t *testing.T) {
c := &Config{
MaxMessageSize: common.DefaultMaxMessageSize,
MinimumAcceptedPoW: 0,
}
rw1, rw2 := p2p.MsgPipe()
defer func() {
if err := rw1.Close(); err != nil {
t.Errorf("error closing MsgPipe, '%s'", err)
}
if err := rw2.Close(); err != nil {
t.Errorf("error closing MsgPipe, '%s'", err)
}
}()
p1 := p2p.NewPeer(enode.ID{1}, "1", []p2p.Cap{{"shh", 6}})
p2 := p2p.NewPeer(enode.ID{2}, "2", []p2p.Cap{{"shh", 6}})
w1, w2 := New(c, nil), New(c, nil)
errc := make(chan error)
go func() {
errc <- w1.HandlePeer(p2, rw2)
}()
go func() {
errc <- w2.HandlePeer(p1, rw1)
}()
w1.SetTimeSource(func() time.Time {
return time.Now().Add(time.Hour)
})
env := &common.Envelope{
Expiry: uint32(time.Now().Add(time.Hour).Unix()),
TTL: 30,
Topic: common.TopicType{1},
Data: []byte{1, 1, 1},
}
require.NoError(t, w1.Send(env))
select {
case err := <-errc:
require.NoError(t, err)
case <-time.After(time.Second):
}
if err := rw2.Close(); err != nil {
t.Errorf("error closing MsgPipe, '%s'", err)
}
select {
case err := <-errc:
require.Error(t, err, "p2p: read or write on closed message pipe")
case <-time.After(time.Second):
require.FailNow(t, "connection wasn't closed in expected time")
}
}
func TestRequestSentEventWithExpiry(t *testing.T) {
w := New(nil, nil)
p := p2p.NewPeer(enode.ID{1}, "1", []p2p.Cap{{"shh", 6}})
rw := discardPipe()
defer func() {
handleError(t, rw.Close())
}()
w.peers[v0.NewPeer(w, p, rw, nil)] = struct{}{}
events := make(chan common.EnvelopeEvent, 1)
sub := w.SubscribeEnvelopeEvents(events)
defer sub.Unsubscribe()
e := &common.Envelope{Nonce: 1}
require.NoError(t, w.RequestHistoricMessagesWithTimeout(p.ID().Bytes(), e, time.Millisecond))
verifyEvent := func(etype common.EventType) {
select {
case <-time.After(time.Second):
require.FailNow(t, "error waiting for a event type %s", etype)
case ev := <-events:
require.Equal(t, etype, ev.Event)
require.Equal(t, p.ID(), ev.Peer)
require.Equal(t, e.Hash(), ev.Hash)
}
}
verifyEvent(common.EventMailServerRequestSent)
verifyEvent(common.EventMailServerRequestExpired)
}
func TestSendMessagesRequest(t *testing.T) {
validMessagesRequest := common.MessagesRequest{
ID: make([]byte, 32),
From: 0,
To: 10,
Bloom: []byte{0x01},
}
t.Run("InvalidID", func(t *testing.T) {
w := New(nil, nil)
err := w.SendMessagesRequest([]byte{0x01, 0x02}, common.MessagesRequest{})
require.EqualError(t, err, "invalid 'ID', expected a 32-byte slice")
})
t.Run("WithoutPeer", func(t *testing.T) {
w := New(nil, nil)
err := w.SendMessagesRequest([]byte{0x01, 0x02}, validMessagesRequest)
require.EqualError(t, err, "could not find peer with ID: 0102")
})
t.Run("AllGood", func(t *testing.T) {
p := p2p.NewPeer(enode.ID{0x01}, "peer01", nil)
rw1, rw2 := p2p.MsgPipe()
w := New(nil, nil)
w.peers[v0.NewPeer(w, p, rw1, nil)] = struct{}{}
go func() {
err := w.SendMessagesRequest(p.ID().Bytes(), validMessagesRequest)
require.NoError(t, err)
}()
require.NoError(t, p2p.ExpectMsg(rw2, v0.P2PRequestCode, nil))
})
}
func TestRateLimiterIntegration(t *testing.T) {
conf := &Config{
MinimumAcceptedPoW: 0,
MaxMessageSize: 10 << 20,
}
w := New(conf, nil)
w.RegisterRateLimiter(common.NewPeerRateLimiter(nil, &common.MetricsRateLimiterHandler{}))
p := p2p.NewPeer(enode.ID{1}, "1", []p2p.Cap{{"waku", 0}})
rw1, rw2 := p2p.MsgPipe()
defer func() {
if err := rw1.Close(); err != nil {
t.Errorf("error closing MsgPipe, '%s'", err)
}
if err := rw2.Close(); err != nil {
t.Errorf("error closing MsgPipe, '%s'", err)
}
}()
errorc := make(chan error, 1)
go func() {
err := w.HandlePeer(p, rw2)
errorc <- err
}()
require.NoError(
t,
p2p.ExpectMsg(
rw1,
v0.StatusCode,
[]interface{}{
v0.Version,
v0.StatusOptionsFromHost(w),
},
),
)
select {
case err := <-errorc:
require.NoError(t, err)
default:
}
}
func TestMailserverCompletionEvent(t *testing.T) {
w1 := New(nil, nil)
require.NoError(t, w1.Start(nil))
defer func() {
handleError(t, w1.Stop())
}()
rw1, rw2 := p2p.MsgPipe()
peer1 := v0.NewPeer(w1, p2p.NewPeer(enode.ID{1}, "1", nil), rw1, nil)
peer1.SetPeerTrusted(true)
w1.peers[peer1] = struct{}{}
w2 := New(nil, nil)
require.NoError(t, w2.Start(nil))
defer func() {
handleError(t, w2.Stop())
}()
peer2 := v0.NewPeer(w2, p2p.NewPeer(enode.ID{1}, "1", nil), rw2, nil)
peer2.SetPeerTrusted(true)
w2.peers[peer2] = struct{}{}
events := make(chan common.EnvelopeEvent)
sub := w1.SubscribeEnvelopeEvents(events)
defer sub.Unsubscribe()
envelopes := []*common.Envelope{{Data: []byte{1}}, {Data: []byte{2}}}
go func() {
require.NoError(t, peer2.Start())
require.NoError(t, p2p.Send(rw2, v0.P2PMessageCode, envelopes))
require.NoError(t, p2p.Send(rw2, v0.P2PRequestCompleteCode, [100]byte{})) // 2 hashes + cursor size
require.NoError(t, rw2.Close())
}()
require.NoError(t, peer1.Start(), "p2p: read or write on closed message pipe")
require.EqualError(t, peer1.Run(), "p2p: read or write on closed message pipe")
after := time.After(2 * time.Second)
count := 0
for {
select {
case <-after:
require.FailNow(t, "timed out waiting for all events")
case ev := <-events:
switch ev.Event {
case common.EventEnvelopeAvailable:
count++
case common.EventMailServerRequestCompleted:
require.Equal(t, count, len(envelopes),
"all envelope.available events mut be received before request is completed")
return
}
}
}
}
func handleError(t *testing.T, err error) {
if err != nil {
t.Logf("deferred function error: '%s'", err)
@ -1792,29 +1115,3 @@ func generateFilter(t *testing.T, symmetric bool) (*common.Filter, error) {
// AcceptP2P & PoW are not set
return &f, nil
}
func generateMessageParams() (*common.MessageParams, error) {
// set all the parameters except p.Dst and p.Padding
buf := make([]byte, 4)
mrand.Read(buf) // nolint: gosec
sz := mrand.Intn(400)
var p common.MessageParams
p.PoW = 0.01
p.WorkTime = 1
p.TTL = uint32(mrand.Intn(1024))
p.Payload = make([]byte, sz)
p.KeySym = make([]byte, common.AESKeyLength)
mrand.Read(p.Payload) // nolint: gosec
mrand.Read(p.KeySym) // nolint: gosec
p.Topic = common.BytesToTopic(buf)
var err error
p.Src, err = crypto.GenerateKey()
if err != nil {
return nil, err
}
return &p, nil
}

728
waku/waku_version_test.go Normal file
View File

@ -0,0 +1,728 @@
// Copyright 2019 The Waku Library Authors.
//
// The Waku library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The Waku library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty off
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the Waku library. If not, see <http://www.gnu.org/licenses/>.
//
// This software uses the go-ethereum library, which is licensed
// under the GNU Lesser General Public Library, version 3 or any later.
package waku
import (
mrand "math/rand"
"testing"
"time"
"github.com/stretchr/testify/suite"
"github.com/status-im/status-go/waku/common"
v0 "github.com/status-im/status-go/waku/v0"
v1 "github.com/status-im/status-go/waku/v1"
"go.uber.org/zap"
gethcommon "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/enode"
)
func TestWakuV0(t *testing.T) {
ws := new(WakuTestSuite)
ws.newPeer = v0.NewPeer
suite.Run(t, ws)
}
func TestWakuV1(t *testing.T) {
ws := new(WakuTestSuite)
ws.newPeer = v1.NewPeer
suite.Run(t, ws)
}
type WakuTestSuite struct {
suite.Suite
seed int64
newPeer func(common.WakuHost, *p2p.Peer, p2p.MsgReadWriter, *zap.Logger) common.Peer
}
// Set up random seed
func (s *WakuTestSuite) SetupTest() {
s.seed = time.Now().Unix()
mrand.Seed(s.seed)
}
func (s *WakuTestSuite) TestHandleP2PMessageCode() {
w1 := New(nil, nil)
s.Require().NoError(w1.SetMinimumPoW(0.0000001, false))
s.Require().NoError(w1.Start(nil))
go func() {
handleError(s.T(), w1.Stop())
}()
w2 := New(nil, nil)
s.Require().NoError(w2.SetMinimumPoW(0.0000001, false))
s.Require().NoError(w2.Start(nil))
go func() {
handleError(s.T(), w2.Stop())
}()
envelopeEvents := make(chan common.EnvelopeEvent, 10)
sub := w1.SubscribeEnvelopeEvents(envelopeEvents)
defer sub.Unsubscribe()
params, err := generateMessageParams()
s.Require().NoError(err, "failed generateMessageParams with seed", s.seed)
params.TTL = 1
msg, err := common.NewSentMessage(params)
s.Require().NoError(err, "failed to create new message with seed", seed)
env, err := msg.Wrap(params, time.Now())
s.Require().NoError(err, "failed Wrap with seed", seed)
rw1, rw2 := p2p.MsgPipe()
go func() {
// This will eventually error as we disconnect one the peers
handleError(s.T(), w1.HandlePeer(s.newPeer(w1, p2p.NewPeer(enode.ID{}, "test", []p2p.Cap{}), rw1, nil), rw1))
}()
go func() {
select {
case <-time.After(time.Second * 5):
handleError(s.T(), rw1.Close())
handleError(s.T(), rw2.Close())
}
}()
peer1 := s.newPeer(w2, p2p.NewPeer(enode.ID{}, "test", []p2p.Cap{}), rw2, nil)
peer1.SetPeerTrusted(true)
err = peer1.Start()
s.Require().NoError(err, "failed run message loop")
// Simulate receiving the new envelope
_, err = w2.add(env, true)
s.Require().NoError(err)
e := <-envelopeEvents
s.Require().Equal(e.Hash, env.Hash(), "envelopes not equal")
peer1.Stop()
}
func (s *WakuTestSuite) testConfirmationsHandshake(expectConfirmations bool) {
conf := &Config{
MinimumAcceptedPoW: 0,
EnableConfirmations: expectConfirmations,
}
w1 := New(nil, nil)
w2 := New(conf, nil)
rw1, rw2 := p2p.MsgPipe()
// so that actual read won't hang forever
time.AfterFunc(5*time.Second, func() {
handleError(s.T(), rw1.Close())
handleError(s.T(), rw2.Close())
})
p1 := s.newPeer(w1, p2p.NewPeer(enode.ID{1}, "1", []p2p.Cap{{"waku", 1}}), rw1, nil)
go func() {
handleError(s.T(), w1.HandlePeer(p1, rw1))
}()
p2 := s.newPeer(w2, p2p.NewPeer(enode.ID{}, "test-2", []p2p.Cap{}), rw2, nil)
err := p2.Start()
s.Require().NoError(err)
peers := w1.getPeers()
s.Require().Len(peers, 1)
s.Require().Equal(expectConfirmations, peers[0].ConfirmationsEnabled())
}
func (s *WakuTestSuite) TestConfirmationHadnshakeExtension() {
s.testConfirmationsHandshake(true)
}
func (s *WakuTestSuite) TestHandshakeWithConfirmationsDisabled() {
s.testConfirmationsHandshake(false)
}
// FLAKY
func (s *WakuTestSuite) TestMessagesResponseWithError() {
conf := &Config{
MinimumAcceptedPoW: 0,
MaxMessageSize: 10 << 20,
EnableConfirmations: true,
}
w1 := New(conf, nil)
w2 := New(conf, nil)
rw1, rw2 := p2p.MsgPipe()
defer func() {
if err := rw1.Close(); err != nil {
s.T().Errorf("error closing MsgPipe 1, '%s'", err)
}
if err := rw2.Close(); err != nil {
s.T().Errorf("error closing MsgPipe 2, '%s'", err)
}
}()
p1 := s.newPeer(w1, p2p.NewPeer(enode.ID{1}, "1", []p2p.Cap{{"waku", 0}}), rw2, nil)
p2 := s.newPeer(w2, p2p.NewPeer(enode.ID{2}, "2", []p2p.Cap{{"waku", 0}}), rw1, nil)
errorc := make(chan error, 1)
go func() {
err := w1.HandlePeer(p1, rw2)
errorc <- err
}()
s.Require().NoError(p2.Start())
failed := common.Envelope{
Expiry: uint32(time.Now().Add(time.Hour).Unix()),
TTL: 10,
Topic: common.TopicType{1},
Data: make([]byte, 1<<10),
Nonce: 1,
}
normal := common.Envelope{
Expiry: uint32(time.Now().Unix()),
TTL: 10,
Topic: common.TopicType{1},
Data: make([]byte, 1<<10),
Nonce: 1,
}
events := make(chan common.EnvelopeEvent, 2)
sub := w1.SubscribeEnvelopeEvents(events)
defer sub.Unsubscribe()
w2.addEnvelope(&failed)
w2.addEnvelope(&normal)
count := 0
// Wait for the two envelopes to be received
for count < 2 {
select {
case <-time.After(5 * time.Second):
s.Require().FailNow("didnt receive events")
case ev := <-events:
switch ev.Event {
case common.EventEnvelopeReceived:
count++
default:
s.Require().FailNow("invalid event message", ev.Event)
}
}
}
// Make sure only one envelope is saved and one is discarded
s.Require().Len(w1.Envelopes(), 1)
}
func (s *WakuTestSuite) testConfirmationEvents(envelope common.Envelope, envelopeErrors []common.EnvelopeError) {
conf := &Config{
MinimumAcceptedPoW: 0,
MaxMessageSize: 10 << 20,
EnableConfirmations: true,
}
w1 := New(conf, nil)
w2 := New(conf, nil)
events := make(chan common.EnvelopeEvent, 2)
sub := w1.SubscribeEnvelopeEvents(events)
defer sub.Unsubscribe()
rw1, rw2 := p2p.MsgPipe()
p1 := s.newPeer(w1, p2p.NewPeer(enode.ID{1}, "1", []p2p.Cap{{"waku", 0}}), rw2, nil)
p2 := s.newPeer(w2, p2p.NewPeer(enode.ID{2}, "2", []p2p.Cap{{"waku", 0}}), rw1, nil)
errorc := make(chan error, 1)
go func() {
err := w1.HandlePeer(p1, rw2)
errorc <- err
}()
time.AfterFunc(5*time.Second, func() {
if err := rw1.Close(); err != nil {
s.T().Errorf("error closing MsgPipe 1, '%s'", err)
}
if err := rw2.Close(); err != nil {
s.T().Errorf("error closing MsgPipe 2, '%s'", err)
}
})
// Start peer
err := p2.Start()
s.Require().NoError(err)
// And run mainloop
go func() {
err := p2.Run()
errorc <- err
}()
s.Require().NoError(w1.Send(&envelope))
var hash gethcommon.Hash
select {
case err := <-errorc:
s.Require().NoError(err)
case ev := <-events:
switch ev.Event {
case common.EventEnvelopeSent:
s.Require().Equal(p1.EnodeID(), ev.Peer)
s.Require().NotEqual(gethcommon.Hash{}, ev.Batch)
hash = ev.Batch
case common.EventBatchAcknowledged:
s.Require().Equal(p1.EnodeID(), ev.Peer)
s.Require().Equal(hash, ev.Batch)
s.Require().Equal(envelopeErrors, ev.Data)
default:
s.Require().FailNow("invalid event message", ev.Event)
}
case <-time.After(5 * time.Second):
s.Require().FailNow("timed out waiting for an envelope.sent event")
}
}
func (s *WakuTestSuite) TestConfirmationEventsReceived() {
e := common.Envelope{
Expiry: uint32(time.Now().Add(10 * time.Second).Unix()),
TTL: 10,
Topic: common.TopicType{1},
Data: make([]byte, 1<<10),
Nonce: 1,
}
s.testConfirmationEvents(e, []common.EnvelopeError{})
}
func (s *WakuTestSuite) TestConfirmationEventsExtendedWithErrors() {
e := common.Envelope{
Expiry: uint32(time.Now().Unix()),
TTL: 10,
Topic: common.TopicType{1},
Data: make([]byte, 1<<10),
Nonce: 1,
}
s.testConfirmationEvents(e, []common.EnvelopeError{
{
Hash: e.Hash(),
Code: common.EnvelopeTimeNotSynced,
Description: "test error",
}},
)
}
func (s *WakuTestSuite) TestEventsWithoutConfirmation() {
conf := &Config{
MinimumAcceptedPoW: 0,
MaxMessageSize: 10 << 20,
}
w1 := New(conf, nil)
w2 := New(conf, nil)
events := make(chan common.EnvelopeEvent, 2)
sub := w1.SubscribeEnvelopeEvents(events)
defer sub.Unsubscribe()
rw1, rw2 := p2p.MsgPipe()
p1 := s.newPeer(w1, p2p.NewPeer(enode.ID{1}, "1", []p2p.Cap{{"waku", 0}}), rw2, nil)
go func() {
handleError(s.T(), w1.HandlePeer(p1, rw2))
}()
time.AfterFunc(5*time.Second, func() {
rw1.Close()
})
peer2 := s.newPeer(w2, p2p.NewPeer(enode.ID{1}, "1", nil), rw1, nil)
s.Require().NoError(peer2.Start())
go func() {
handleError(s.T(), peer2.Run())
}()
e := common.Envelope{
Expiry: uint32(time.Now().Add(10 * time.Second).Unix()),
TTL: 10,
Topic: common.TopicType{1},
Data: make([]byte, 1<<10),
Nonce: 1,
}
s.Require().NoError(w1.Send(&e))
select {
case ev := <-events:
s.Require().Equal(common.EventEnvelopeSent, ev.Event)
s.Require().Equal(p1.EnodeID(), ev.Peer)
s.Require().Equal(gethcommon.Hash{}, ev.Batch)
case <-time.After(5 * time.Second):
s.Require().FailNow("timed out waiting for an envelope.sent event")
}
}
func discardPipe() *p2p.MsgPipeRW {
rw1, rw2 := p2p.MsgPipe()
go func() {
for {
msg, err := rw1.ReadMsg()
if err != nil {
return
}
msg.Discard() // nolint: errcheck
}
}()
return rw2
}
func (s *WakuTestSuite) TestWakuTimeDesyncEnvelopeIgnored() {
c := &Config{
MaxMessageSize: common.DefaultMaxMessageSize,
MinimumAcceptedPoW: 0,
}
rw1, rw2 := p2p.MsgPipe()
defer func() {
if err := rw1.Close(); err != nil {
s.T().Errorf("error closing MsgPipe, '%s'", err)
}
if err := rw2.Close(); err != nil {
s.T().Errorf("error closing MsgPipe, '%s'", err)
}
}()
w1, w2 := New(c, nil), New(c, nil)
p1 := s.newPeer(w2, p2p.NewPeer(enode.ID{1}, "1", []p2p.Cap{{"waku", 1}}), rw1, nil)
p2 := s.newPeer(w1, p2p.NewPeer(enode.ID{2}, "2", []p2p.Cap{{"waku", 1}}), rw2, nil)
errc := make(chan error)
go func() {
errc <- w1.HandlePeer(p2, rw2)
}()
go func() {
errc <- w2.HandlePeer(p1, rw1)
}()
w1.SetTimeSource(func() time.Time {
return time.Now().Add(time.Hour)
})
env := &common.Envelope{
Expiry: uint32(time.Now().Add(time.Hour).Unix()),
TTL: 30,
Topic: common.TopicType{1},
Data: []byte{1, 1, 1},
}
s.Require().NoError(w1.Send(env))
select {
case err := <-errc:
s.Require().NoError(err)
case <-time.After(time.Second):
}
s.Require().NoError(rw2.Close())
select {
case err := <-errc:
s.Require().Error(err, "p2p: read or write on closed message pipe")
case <-time.After(time.Second):
s.Require().FailNow("connection wasn't closed in expected time")
}
}
func (s *WakuTestSuite) TestRequestSentEventWithExpiry() {
w := New(nil, nil)
p := p2p.NewPeer(enode.ID{1}, "1", []p2p.Cap{{"waku", 1}})
rw := discardPipe()
defer func() {
handleError(s.T(), rw.Close())
}()
w.peers[s.newPeer(w, p, rw, nil)] = struct{}{}
events := make(chan common.EnvelopeEvent, 1)
sub := w.SubscribeEnvelopeEvents(events)
defer sub.Unsubscribe()
e := &common.Envelope{Nonce: 1}
s.Require().NoError(w.RequestHistoricMessagesWithTimeout(p.ID().Bytes(), e, time.Millisecond))
verifyEvent := func(etype common.EventType) {
select {
case <-time.After(time.Second):
s.Require().FailNow("error waiting for a event type %s", etype)
case ev := <-events:
s.Require().Equal(etype, ev.Event)
s.Require().Equal(p.ID(), ev.Peer)
s.Require().Equal(e.Hash(), ev.Hash)
}
}
verifyEvent(common.EventMailServerRequestSent)
verifyEvent(common.EventMailServerRequestExpired)
}
func (s *WakuTestSuite) TestSendMessagesRequest() {
validMessagesRequest := common.MessagesRequest{
ID: make([]byte, 32),
From: 0,
To: 10,
Bloom: []byte{0x01},
}
s.Run("InvalidID", func() {
w := New(nil, nil)
err := w.SendMessagesRequest([]byte{0x01, 0x02}, common.MessagesRequest{})
s.Require().EqualError(err, "invalid 'ID', expected a 32-byte slice")
})
s.Run("WithoutPeer", func() {
w := New(nil, nil)
err := w.SendMessagesRequest([]byte{0x01, 0x02}, validMessagesRequest)
s.Require().EqualError(err, "could not find peer with ID: 0102")
})
s.Run("AllGood", func() {
p := p2p.NewPeer(enode.ID{0x01}, "peer01", nil)
rw1, rw2 := p2p.MsgPipe()
w := New(nil, nil)
w.peers[s.newPeer(w, p, rw1, nil)] = struct{}{}
go func() {
// Read out so that it's consumed
_, err := rw2.ReadMsg()
s.Require().NoError(err)
rw2.Close()
rw1.Close()
}()
err := w.SendMessagesRequest(p.ID().Bytes(), validMessagesRequest)
s.Require().NoError(err)
})
}
func (s *WakuTestSuite) TestRateLimiterIntegration() {
conf := &Config{
MinimumAcceptedPoW: 0,
MaxMessageSize: 10 << 20,
}
w := New(conf, nil)
w.RegisterRateLimiter(common.NewPeerRateLimiter(nil, &common.MetricsRateLimiterHandler{}))
rw1, rw2 := p2p.MsgPipe()
defer func() {
if err := rw1.Close(); err != nil {
s.T().Errorf("error closing MsgPipe, '%s'", err)
}
if err := rw2.Close(); err != nil {
s.T().Errorf("error closing MsgPipe, '%s'", err)
}
}()
p := s.newPeer(w, p2p.NewPeer(enode.ID{1}, "1", []p2p.Cap{{"waku", 0}}), rw2, nil)
errorc := make(chan error, 1)
go func() {
err := w.HandlePeer(p, rw2)
errorc <- err
}()
_, err := rw1.ReadMsg()
s.Require().NoError(err)
select {
case err := <-errorc:
s.Require().NoError(err)
default:
}
}
func (s *WakuTestSuite) TestMailserverCompletionEvent() {
w1 := New(nil, nil)
s.Require().NoError(w1.Start(nil))
defer func() {
handleError(s.T(), w1.Stop())
}()
rw1, rw2 := p2p.MsgPipe()
errorc := make(chan error, 1)
go func() {
err := w1.HandlePeer(s.newPeer(w1, p2p.NewPeer(enode.ID{}, "1", []p2p.Cap{}), rw1, nil), rw1)
errorc <- err
}()
w2 := New(nil, nil)
s.Require().NoError(w2.Start(nil))
defer func() {
handleError(s.T(), w2.Stop())
}()
peer2 := s.newPeer(w2, p2p.NewPeer(enode.ID{1}, "1", nil), rw2, nil)
peer2.SetPeerTrusted(true)
events := make(chan common.EnvelopeEvent)
sub := w1.SubscribeEnvelopeEvents(events)
defer sub.Unsubscribe()
envelopes := []*common.Envelope{{Data: []byte{1}}, {Data: []byte{2}}}
s.Require().NoError(peer2.Start())
// Set peer trusted, we know the peer has been added as handshake was successful
w1.getPeers()[0].SetPeerTrusted(true)
s.Require().NoError(peer2.SendP2PMessages(envelopes))
s.Require().NoError(peer2.SendHistoricMessageResponse(make([]byte, 100)))
rw2.Close()
// Wait for all messages to be read
err := <-errorc
s.Require().EqualError(err, "p2p: read or write on closed message pipe")
after := time.After(2 * time.Second)
count := 0
for {
select {
case <-after:
s.Require().FailNow("timed out waiting for all events")
case ev := <-events:
switch ev.Event {
case common.EventEnvelopeAvailable:
count++
case common.EventMailServerRequestCompleted:
s.Require().Equal(count, len(envelopes),
"all envelope.avaiable events mut be recevied before request is compelted")
return
}
}
}
}
//two generic waku node handshake
func (s *WakuTestSuite) TestPeerHandshakeWithTwoFullNode() {
rw1, rw2 := p2p.MsgPipe()
defer func() {
handleError(s.T(), rw1.Close())
}()
defer func() {
handleError(s.T(), rw2.Close())
}()
w1 := New(nil, nil)
var pow float64 = 0.1
err := w1.SetMinimumPoW(pow, true)
s.Require().NoError(err)
w2 := New(nil, nil)
go func() {
handleError(s.T(), w1.HandlePeer(s.newPeer(w1, p2p.NewPeer(enode.ID{}, "test-1", []p2p.Cap{}), rw1, nil), rw1))
}()
p2 := s.newPeer(w2, p2p.NewPeer(enode.ID{}, "test-2", []p2p.Cap{}), rw2, nil)
err = p2.Start()
s.Require().NoError(err)
s.Require().Equal(pow, p2.PoWRequirement())
}
//two generic waku node handshake. one don't send light flag
func (s *WakuTestSuite) TestHandshakeWithOldVersionWithoutLightModeFlag() {
rw1, rw2 := p2p.MsgPipe()
go func() {
handleError(s.T(), rw1.Close())
}()
go func() {
handleError(s.T(), rw2.Close())
}()
w1 := New(nil, nil)
w1.SetLightClientMode(true)
w2 := New(nil, nil)
go func() {
handleError(s.T(), w1.HandlePeer(s.newPeer(w1, p2p.NewPeer(enode.ID{}, "test-1", []p2p.Cap{}), rw1, nil), rw1))
}()
p2 := s.newPeer(w2, p2p.NewPeer(enode.ID{}, "test-2", []p2p.Cap{}), rw2, nil)
err := p2.Start()
s.Require().NoError(err)
}
//two light nodes handshake. restriction enable
func (s *WakuTestSuite) TestTwoLightPeerHandshakeRestrictionOff() {
rw1, rw2 := p2p.MsgPipe()
defer func() {
handleError(s.T(), rw1.Close())
}()
defer func() {
handleError(s.T(), rw2.Close())
}()
w1 := New(nil, nil)
w1.SetLightClientMode(true)
w1.settings.RestrictLightClientsConn = false
w2 := New(nil, nil)
w2.SetLightClientMode(true)
w2.settings.RestrictLightClientsConn = false
go func() {
handleError(s.T(), w1.HandlePeer(s.newPeer(w1, p2p.NewPeer(enode.ID{}, "test-1", []p2p.Cap{}), rw1, nil), rw1))
}()
p2 := s.newPeer(w2, p2p.NewPeer(enode.ID{}, "test-2", []p2p.Cap{}), rw2, nil)
s.Require().NoError(p2.Start())
}
//two light nodes handshake. restriction enabled
func (s *WakuTestSuite) TestTwoLightPeerHandshakeError() {
rw1, rw2 := p2p.MsgPipe()
defer func() {
handleError(s.T(), rw1.Close())
}()
defer func() {
handleError(s.T(), rw2.Close())
}()
w1 := New(nil, nil)
w1.SetLightClientMode(true)
w1.settings.RestrictLightClientsConn = true
w2 := New(nil, nil)
w2.SetLightClientMode(true)
w2.settings.RestrictLightClientsConn = true
go func() {
handleError(s.T(), w1.HandlePeer(s.newPeer(w1, p2p.NewPeer(enode.ID{}, "test-1", []p2p.Cap{}), rw1, nil), rw1))
}()
p2 := s.newPeer(w2, p2p.NewPeer(enode.ID{}, "test-2", []p2p.Cap{}), rw2, nil)
s.Require().Error(p2.Start())
}
func generateMessageParams() (*common.MessageParams, error) {
// set all the parameters except p.Dst and p.Padding
buf := make([]byte, 4)
mrand.Read(buf) // nolint: gosec
sz := mrand.Intn(400)
var p common.MessageParams
p.PoW = 0.01
p.WorkTime = 1
p.TTL = uint32(mrand.Intn(1024))
p.Payload = make([]byte, sz)
p.KeySym = make([]byte, common.AESKeyLength)
mrand.Read(p.Payload) // nolint: gosec
mrand.Read(p.KeySym) // nolint: gosec
p.Topic = common.BytesToTopic(buf)
var err error
p.Src, err = crypto.GenerateKey()
if err != nil {
return nil, err
}
return &p, nil
}