Extend whisper protocol with confirmations and add necessary events

This commit is contained in:
Dmitry 2018-11-19 09:53:59 +02:00 committed by Dmitry Shulyak
parent 552229d15d
commit 76c2447643
6 changed files with 250 additions and 22 deletions

View File

@ -21,6 +21,7 @@ type Config struct {
MaxMessageSize uint32 `toml:",omitempty"`
MinimumAcceptedPOW float64 `toml:",omitempty"`
RestrictConnectionBetweenLightClients bool `toml:",omitempty"`
DisableConfirmations bool `toml:",omitempty"`
}
// DefaultConfig represents (shocker!) the default configuration.

View File

@ -47,6 +47,7 @@ const (
messagesCode = 1 // normal whisper message
powRequirementCode = 2 // PoW requirement
bloomFilterExCode = 3 // bloom filter exchange
batchAcknowledgedCode = 11 // confirmation that batch of envelopes was received
p2pRequestCompleteCode = 125 // peer-to-peer message, used by Dapp protocol
p2pRequestCode = 126 // peer-to-peer message, used by Dapp protocol
p2pMessageCode = 127 // peer-to-peer message (to be consumed by the peer, but not forwarded any further)

View File

@ -13,6 +13,8 @@ const (
EventEnvelopeSent EventType = "envelope.sent"
// EventEnvelopeExpired fires when envelop expired
EventEnvelopeExpired EventType = "envelope.expired"
// EventBatchAcknowledged is sent when batch of envelopes was acknowleged by a peer.
EventBatchAcknowledged EventType = "batch.acknowleged"
// EventEnvelopeAvailable fires when envelop is available for filters
EventEnvelopeAvailable EventType = "envelope.available"
// EventMailServerRequestCompleted fires after mailserver sends all the requested messages
@ -27,6 +29,7 @@ const (
type EnvelopeEvent struct {
Event EventType
Hash common.Hash
Batch common.Hash
Peer enode.ID
Data interface{}
}

View File

@ -17,6 +17,7 @@
package whisperv6
import (
"bytes"
"fmt"
"math"
"sync"
@ -24,6 +25,7 @@ import (
mapset "github.com/deckarep/golang-set"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/rlp"
@ -35,11 +37,12 @@ type Peer struct {
peer *p2p.Peer
ws p2p.MsgReadWriter
trusted bool
powRequirement float64
bloomMu sync.Mutex
bloomFilter []byte
fullNode bool
trusted bool
powRequirement float64
bloomMu sync.Mutex
bloomFilter []byte
fullNode bool
confirmationsEnabled bool
known mapset.Set // Messages already known by the peer to avoid wasting bandwidth
@ -85,8 +88,9 @@ func (peer *Peer) handshake() error {
pow := peer.host.MinPow()
powConverted := math.Float64bits(pow)
bloom := peer.host.BloomFilter()
confirmationsEnabled := !peer.host.disableConfirmations
errc <- p2p.SendItems(peer.ws, statusCode, ProtocolVersion, powConverted, bloom, isLightNode)
errc <- p2p.SendItems(peer.ws, statusCode, ProtocolVersion, powConverted, bloom, isLightNode, confirmationsEnabled)
}()
// Fetch the remote status packet and verify protocol match
@ -134,6 +138,12 @@ func (peer *Peer) handshake() error {
if isRemotePeerLightNode && isLightNode && isRestrictedLightNodeConnection {
return fmt.Errorf("peer [%x] is useless: two light client communication restricted", peer.ID())
}
confirmationsEnabled, err := s.Bool()
if err != nil || !confirmationsEnabled {
log.Warn("confirmations are disabled", "peer", peer.ID())
} else {
peer.confirmationsEnabled = confirmationsEnabled
}
if err := <-errc; err != nil {
return fmt.Errorf("peer [%x] failed to send status packet: %v", peer.ID(), err)
@ -208,19 +218,24 @@ func (peer *Peer) broadcast() error {
}
if len(bundle) > 0 {
// transmit the batch of envelopes
if err := p2p.Send(peer.ws, messagesCode, bundle); err != nil {
batchHash, err := sendBundle(peer.ws, bundle)
if err != nil {
log.Warn("failed to deliver envelopes", "peer", peer.peer.ID(), "error", err)
return err
}
// mark envelopes only if they were successfully sent
for _, e := range bundle {
peer.mark(e)
peer.host.envelopeFeed.Send(EnvelopeEvent{
event := EnvelopeEvent{
Event: EventEnvelopeSent,
Hash: e.Hash(),
Peer: peer.peer.ID(), // specifically discover.NodeID because it can be pretty printed
})
Peer: peer.peer.ID(),
}
if peer.confirmationsEnabled {
event.Batch = batchHash
}
peer.host.envelopeFeed.Send(event)
}
log.Trace("broadcast", "num. messages", len(bundle))
@ -266,3 +281,19 @@ func MakeFullNodeBloom() []byte {
}
return bloom
}
func sendBundle(rw p2p.MsgWriter, bundle []*Envelope) (rst common.Hash, err error) {
data, err := rlp.EncodeToBytes(bundle)
if err != nil {
return
}
err = rw.WriteMsg(p2p.Msg{
Code: messagesCode,
Size: uint32(len(data)),
Payload: bytes.NewBuffer(data),
})
if err != nil {
return
}
return crypto.Keccak256Hash(data), nil
}

View File

@ -34,6 +34,7 @@ import (
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/rpc"
"github.com/syndtr/goleveldb/leveldb/errors"
@ -92,6 +93,8 @@ type Whisper struct {
settings syncmap.Map // holds configuration settings that can be dynamically changed
disableConfirmations bool // do not reply with confirmations
syncAllowance int // maximum time in seconds allowed to process the whisper-related messages
statsMu sync.Mutex // guard stats
@ -111,16 +114,17 @@ func New(cfg *Config) *Whisper {
}
whisper := &Whisper{
privateKeys: make(map[string]*ecdsa.PrivateKey),
symKeys: make(map[string][]byte),
envelopes: make(map[common.Hash]*Envelope),
expirations: make(map[uint32]mapset.Set),
peers: make(map[*Peer]struct{}),
messageQueue: make(chan *Envelope, messageQueueLimit),
p2pMsgQueue: make(chan *Envelope, messageQueueLimit),
quit: make(chan struct{}),
syncAllowance: DefaultSyncAllowance,
timeSource: time.Now,
privateKeys: make(map[string]*ecdsa.PrivateKey),
symKeys: make(map[string][]byte),
envelopes: make(map[common.Hash]*Envelope),
expirations: make(map[uint32]mapset.Set),
peers: make(map[*Peer]struct{}),
messageQueue: make(chan *Envelope, messageQueueLimit),
p2pMsgQueue: make(chan *Envelope, messageQueueLimit),
quit: make(chan struct{}),
syncAllowance: DefaultSyncAllowance,
timeSource: time.Now,
disableConfirmations: cfg.DisableConfirmations,
}
whisper.filters = NewFilters(whisper)
@ -780,6 +784,13 @@ func (whisper *Whisper) HandlePeer(peer *p2p.Peer, rw p2p.MsgReadWriter) error {
return whisper.runMessageLoop(whisperPeer, rw)
}
func (whisper *Whisper) sendConfirmation(peer enode.ID, rw p2p.MsgReadWriter, data []byte) {
batchHash := crypto.Keccak256Hash(data)
if err := p2p.Send(rw, batchAcknowledgedCode, batchHash); err != nil {
log.Warn("failed to deliver confirmation", "hash", batchHash, "peer", peer, "error", err)
}
}
// runMessageLoop reads and processes inbound messages directly to merge into client-global state.
func (whisper *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error {
for {
@ -800,8 +811,17 @@ func (whisper *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error {
log.Warn("unxepected status message received", "peer", p.peer.ID())
case messagesCode:
// decode the contained envelopes
data, err := ioutil.ReadAll(packet.Payload)
if err != nil {
log.Warn("failed to read envelopes data", "peer", p.peer.ID(), "error", err)
return errors.New("invalid enveloopes")
}
if !whisper.disableConfirmations {
go whisper.sendConfirmation(p.peer.ID(), rw, data)
}
var envelopes []*Envelope
if err := packet.Decode(&envelopes); err != nil {
if err := rlp.DecodeBytes(data, &envelopes); err != nil {
log.Warn("failed to decode envelopes, peer will be disconnected", "peer", p.peer.ID(), "err", err)
return errors.New("invalid envelopes")
}
@ -821,6 +841,17 @@ func (whisper *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error {
if trouble {
return errors.New("invalid envelope")
}
case batchAcknowledgedCode:
var batchHash common.Hash
if err := packet.Decode(&batchHash); err != nil {
log.Warn("failed to decode confirmation into common.Hash", "peer", p.peer.ID(), "error", err)
return errors.New("invalid confirmation message")
}
whisper.envelopeFeed.Send(EnvelopeEvent{
Batch: batchHash,
Event: EventBatchAcknowledged,
Peer: p.peer.ID(),
})
case powRequirementCode:
s := rlp.NewStream(packet.Payload, uint64(packet.Size))
i, err := s.Uint()

View File

@ -20,14 +20,17 @@ import (
"bytes"
"crypto/ecdsa"
"crypto/sha256"
"math"
mrand "math/rand"
"testing"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/rlp"
"github.com/stretchr/testify/require"
"github.com/syndtr/goleveldb/leveldb/errors"
"golang.org/x/crypto/pbkdf2"
)
@ -1041,3 +1044,161 @@ func (stub *rwP2PMessagesStub) WriteMsg(m p2p.Msg) error {
stub.messages = append(stub.messages, m)
return nil
}
func testConfirmationsHandshake(t *testing.T, expectConfirmations bool) {
conf := &Config{
MinimumAcceptedPOW: 0,
DisableConfirmations: !expectConfirmations,
}
w := New(conf)
p := p2p.NewPeer(enode.ID{1}, "1", []p2p.Cap{{"shh", 6}})
rw1, rw2 := p2p.MsgPipe()
errorc := make(chan error, 1)
go func() {
err := w.HandlePeer(p, rw2)
errorc <- err
}()
// so that actual read won't hang forever
time.AfterFunc(5*time.Second, func() {
rw1.Close()
})
require.NoError(t, p2p.ExpectMsg(rw1, statusCode, []interface{}{ProtocolVersion, math.Float64bits(w.MinPow()), w.BloomFilter(), false, expectConfirmations}))
}
func TestConfirmationHadnshakeExtension(t *testing.T) {
testConfirmationsHandshake(t, true)
}
func TestHandshakeWithConfirmationsDisabled(t *testing.T) {
testConfirmationsHandshake(t, false)
}
func TestConfirmationReceived(t *testing.T) {
conf := &Config{
MinimumAcceptedPOW: 0,
MaxMessageSize: 10 << 20,
}
w := New(conf)
p := p2p.NewPeer(enode.ID{1}, "1", []p2p.Cap{{"shh", 6}})
rw1, rw2 := p2p.MsgPipe()
errorc := make(chan error, 1)
go func() {
err := w.HandlePeer(p, rw2)
errorc <- err
}()
time.AfterFunc(5*time.Second, func() {
rw1.Close()
})
require.NoError(t, p2p.ExpectMsg(rw1, statusCode, []interface{}{ProtocolVersion, math.Float64bits(w.MinPow()), w.BloomFilter(), false, true}))
require.NoError(t, p2p.SendItems(rw1, statusCode, ProtocolVersion, math.Float64bits(w.MinPow()), w.BloomFilter(), true, true))
e := Envelope{
Expiry: uint32(time.Now().Add(10 * time.Second).Unix()),
TTL: 10,
Topic: TopicType{1},
Data: make([]byte, 1<<10),
Nonce: 1,
}
data, err := rlp.EncodeToBytes([]*Envelope{&e})
require.NoError(t, err)
hash := crypto.Keccak256Hash(data)
require.NoError(t, p2p.SendItems(rw1, messagesCode, &e))
require.NoError(t, p2p.ExpectMsg(rw1, batchAcknowledgedCode, hash))
}
func TestEventsReceived(t *testing.T) {
conf := &Config{
MinimumAcceptedPOW: 0,
MaxMessageSize: 10 << 20,
}
w := New(conf)
events := make(chan EnvelopeEvent, 2)
sub := w.SubscribeEnvelopeEvents(events)
defer sub.Unsubscribe()
p := p2p.NewPeer(enode.ID{1}, "1", []p2p.Cap{{"shh", 6}})
rw1, rw2 := p2p.MsgPipe()
errorc := make(chan error, 1)
go func() {
err := w.HandlePeer(p, rw2)
errorc <- err
}()
time.AfterFunc(5*time.Second, func() {
rw1.Close()
})
require.NoError(t, p2p.ExpectMsg(rw1, statusCode, []interface{}{ProtocolVersion, math.Float64bits(w.MinPow()), w.BloomFilter(), false, true}))
require.NoError(t, p2p.SendItems(rw1, statusCode, ProtocolVersion, math.Float64bits(w.MinPow()), w.BloomFilter(), true, true))
e := Envelope{
Expiry: uint32(time.Now().Add(10 * time.Second).Unix()),
TTL: 10,
Topic: TopicType{1},
Data: make([]byte, 1<<10),
Nonce: 1,
}
require.NoError(t, w.Send(&e))
require.NoError(t, p2p.ExpectMsg(rw1, messagesCode, []*Envelope{&e}))
var hash common.Hash
select {
case ev := <-events:
require.Equal(t, EventEnvelopeSent, ev.Event)
require.Equal(t, p.ID(), ev.Peer)
require.NotEqual(t, common.Hash{}, ev.Batch)
hash = ev.Batch
case <-time.After(5 * time.Second):
require.FailNow(t, "timed out waiting for an envelope.sent event")
}
require.NoError(t, p2p.Send(rw1, batchAcknowledgedCode, hash))
select {
case ev := <-events:
require.Equal(t, EventBatchAcknowledged, ev.Event)
require.Equal(t, p.ID(), ev.Peer)
require.Equal(t, hash, ev.Batch)
case <-time.After(5 * time.Second):
require.FailNow(t, "timed out waiting for an batch.acknowledged event")
}
}
func TestEventsWithoutConfirmation(t *testing.T) {
conf := &Config{
MinimumAcceptedPOW: 0,
MaxMessageSize: 10 << 20,
}
w := New(conf)
events := make(chan EnvelopeEvent, 2)
sub := w.SubscribeEnvelopeEvents(events)
defer sub.Unsubscribe()
p := p2p.NewPeer(enode.ID{1}, "1", []p2p.Cap{{"shh", 6}})
rw1, rw2 := p2p.MsgPipe()
errorc := make(chan error, 1)
go func() {
err := w.HandlePeer(p, rw2)
errorc <- err
}()
time.AfterFunc(5*time.Second, func() {
rw1.Close()
})
require.NoError(t, p2p.ExpectMsg(rw1, statusCode, []interface{}{ProtocolVersion, math.Float64bits(w.MinPow()), w.BloomFilter(), false, true}))
require.NoError(t, p2p.SendItems(rw1, statusCode, ProtocolVersion, math.Float64bits(w.MinPow()), w.BloomFilter(), true, false))
e := Envelope{
Expiry: uint32(time.Now().Add(10 * time.Second).Unix()),
TTL: 10,
Topic: TopicType{1},
Data: make([]byte, 1<<10),
Nonce: 1,
}
require.NoError(t, w.Send(&e))
require.NoError(t, p2p.ExpectMsg(rw1, messagesCode, []*Envelope{&e}))
select {
case ev := <-events:
require.Equal(t, EventEnvelopeSent, ev.Event)
require.Equal(t, p.ID(), ev.Peer)
require.Equal(t, common.Hash{}, ev.Batch)
case <-time.After(5 * time.Second):
require.FailNow(t, "timed out waiting for an envelope.sent event")
}
}