diff --git a/whisperv6/config.go b/whisperv6/config.go index 38eb955..36bf412 100644 --- a/whisperv6/config.go +++ b/whisperv6/config.go @@ -21,6 +21,7 @@ type Config struct { MaxMessageSize uint32 `toml:",omitempty"` MinimumAcceptedPOW float64 `toml:",omitempty"` RestrictConnectionBetweenLightClients bool `toml:",omitempty"` + DisableConfirmations bool `toml:",omitempty"` } // DefaultConfig represents (shocker!) the default configuration. diff --git a/whisperv6/doc.go b/whisperv6/doc.go index df9791a..c8a9c50 100644 --- a/whisperv6/doc.go +++ b/whisperv6/doc.go @@ -47,6 +47,7 @@ const ( messagesCode = 1 // normal whisper message powRequirementCode = 2 // PoW requirement bloomFilterExCode = 3 // bloom filter exchange + batchAcknowledgedCode = 11 // confirmation that batch of envelopes was received p2pRequestCompleteCode = 125 // peer-to-peer message, used by Dapp protocol p2pRequestCode = 126 // peer-to-peer message, used by Dapp protocol p2pMessageCode = 127 // peer-to-peer message (to be consumed by the peer, but not forwarded any further) diff --git a/whisperv6/events.go b/whisperv6/events.go index df0f460..bfbce00 100644 --- a/whisperv6/events.go +++ b/whisperv6/events.go @@ -13,6 +13,8 @@ const ( EventEnvelopeSent EventType = "envelope.sent" // EventEnvelopeExpired fires when envelop expired EventEnvelopeExpired EventType = "envelope.expired" + // EventBatchAcknowledged is sent when batch of envelopes was acknowleged by a peer. + EventBatchAcknowledged EventType = "batch.acknowleged" // EventEnvelopeAvailable fires when envelop is available for filters EventEnvelopeAvailable EventType = "envelope.available" // EventMailServerRequestCompleted fires after mailserver sends all the requested messages @@ -27,6 +29,7 @@ const ( type EnvelopeEvent struct { Event EventType Hash common.Hash + Batch common.Hash Peer enode.ID Data interface{} } diff --git a/whisperv6/peer.go b/whisperv6/peer.go index 2b7687e..8460ca5 100644 --- a/whisperv6/peer.go +++ b/whisperv6/peer.go @@ -17,6 +17,7 @@ package whisperv6 import ( + "bytes" "fmt" "math" "sync" @@ -24,6 +25,7 @@ import ( mapset "github.com/deckarep/golang-set" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/rlp" @@ -35,11 +37,12 @@ type Peer struct { peer *p2p.Peer ws p2p.MsgReadWriter - trusted bool - powRequirement float64 - bloomMu sync.Mutex - bloomFilter []byte - fullNode bool + trusted bool + powRequirement float64 + bloomMu sync.Mutex + bloomFilter []byte + fullNode bool + confirmationsEnabled bool known mapset.Set // Messages already known by the peer to avoid wasting bandwidth @@ -85,8 +88,9 @@ func (peer *Peer) handshake() error { pow := peer.host.MinPow() powConverted := math.Float64bits(pow) bloom := peer.host.BloomFilter() + confirmationsEnabled := !peer.host.disableConfirmations - errc <- p2p.SendItems(peer.ws, statusCode, ProtocolVersion, powConverted, bloom, isLightNode) + errc <- p2p.SendItems(peer.ws, statusCode, ProtocolVersion, powConverted, bloom, isLightNode, confirmationsEnabled) }() // Fetch the remote status packet and verify protocol match @@ -134,6 +138,12 @@ func (peer *Peer) handshake() error { if isRemotePeerLightNode && isLightNode && isRestrictedLightNodeConnection { return fmt.Errorf("peer [%x] is useless: two light client communication restricted", peer.ID()) } + confirmationsEnabled, err := s.Bool() + if err != nil || !confirmationsEnabled { + log.Warn("confirmations are disabled", "peer", peer.ID()) + } else { + peer.confirmationsEnabled = confirmationsEnabled + } if err := <-errc; err != nil { return fmt.Errorf("peer [%x] failed to send status packet: %v", peer.ID(), err) @@ -208,19 +218,24 @@ func (peer *Peer) broadcast() error { } if len(bundle) > 0 { - // transmit the batch of envelopes - if err := p2p.Send(peer.ws, messagesCode, bundle); err != nil { + batchHash, err := sendBundle(peer.ws, bundle) + if err != nil { + log.Warn("failed to deliver envelopes", "peer", peer.peer.ID(), "error", err) return err } // mark envelopes only if they were successfully sent for _, e := range bundle { peer.mark(e) - peer.host.envelopeFeed.Send(EnvelopeEvent{ + event := EnvelopeEvent{ Event: EventEnvelopeSent, Hash: e.Hash(), - Peer: peer.peer.ID(), // specifically discover.NodeID because it can be pretty printed - }) + Peer: peer.peer.ID(), + } + if peer.confirmationsEnabled { + event.Batch = batchHash + } + peer.host.envelopeFeed.Send(event) } log.Trace("broadcast", "num. messages", len(bundle)) @@ -266,3 +281,19 @@ func MakeFullNodeBloom() []byte { } return bloom } + +func sendBundle(rw p2p.MsgWriter, bundle []*Envelope) (rst common.Hash, err error) { + data, err := rlp.EncodeToBytes(bundle) + if err != nil { + return + } + err = rw.WriteMsg(p2p.Msg{ + Code: messagesCode, + Size: uint32(len(data)), + Payload: bytes.NewBuffer(data), + }) + if err != nil { + return + } + return crypto.Keccak256Hash(data), nil +} diff --git a/whisperv6/whisper.go b/whisperv6/whisper.go index f118429..cbe44c4 100644 --- a/whisperv6/whisper.go +++ b/whisperv6/whisper.go @@ -34,6 +34,7 @@ import ( "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/p2p" + "github.com/ethereum/go-ethereum/p2p/enode" "github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/rpc" "github.com/syndtr/goleveldb/leveldb/errors" @@ -92,6 +93,8 @@ type Whisper struct { settings syncmap.Map // holds configuration settings that can be dynamically changed + disableConfirmations bool // do not reply with confirmations + syncAllowance int // maximum time in seconds allowed to process the whisper-related messages statsMu sync.Mutex // guard stats @@ -111,16 +114,17 @@ func New(cfg *Config) *Whisper { } whisper := &Whisper{ - privateKeys: make(map[string]*ecdsa.PrivateKey), - symKeys: make(map[string][]byte), - envelopes: make(map[common.Hash]*Envelope), - expirations: make(map[uint32]mapset.Set), - peers: make(map[*Peer]struct{}), - messageQueue: make(chan *Envelope, messageQueueLimit), - p2pMsgQueue: make(chan *Envelope, messageQueueLimit), - quit: make(chan struct{}), - syncAllowance: DefaultSyncAllowance, - timeSource: time.Now, + privateKeys: make(map[string]*ecdsa.PrivateKey), + symKeys: make(map[string][]byte), + envelopes: make(map[common.Hash]*Envelope), + expirations: make(map[uint32]mapset.Set), + peers: make(map[*Peer]struct{}), + messageQueue: make(chan *Envelope, messageQueueLimit), + p2pMsgQueue: make(chan *Envelope, messageQueueLimit), + quit: make(chan struct{}), + syncAllowance: DefaultSyncAllowance, + timeSource: time.Now, + disableConfirmations: cfg.DisableConfirmations, } whisper.filters = NewFilters(whisper) @@ -780,6 +784,13 @@ func (whisper *Whisper) HandlePeer(peer *p2p.Peer, rw p2p.MsgReadWriter) error { return whisper.runMessageLoop(whisperPeer, rw) } +func (whisper *Whisper) sendConfirmation(peer enode.ID, rw p2p.MsgReadWriter, data []byte) { + batchHash := crypto.Keccak256Hash(data) + if err := p2p.Send(rw, batchAcknowledgedCode, batchHash); err != nil { + log.Warn("failed to deliver confirmation", "hash", batchHash, "peer", peer, "error", err) + } +} + // runMessageLoop reads and processes inbound messages directly to merge into client-global state. func (whisper *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error { for { @@ -800,8 +811,17 @@ func (whisper *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error { log.Warn("unxepected status message received", "peer", p.peer.ID()) case messagesCode: // decode the contained envelopes + data, err := ioutil.ReadAll(packet.Payload) + if err != nil { + log.Warn("failed to read envelopes data", "peer", p.peer.ID(), "error", err) + return errors.New("invalid enveloopes") + } + if !whisper.disableConfirmations { + go whisper.sendConfirmation(p.peer.ID(), rw, data) + } + var envelopes []*Envelope - if err := packet.Decode(&envelopes); err != nil { + if err := rlp.DecodeBytes(data, &envelopes); err != nil { log.Warn("failed to decode envelopes, peer will be disconnected", "peer", p.peer.ID(), "err", err) return errors.New("invalid envelopes") } @@ -821,6 +841,17 @@ func (whisper *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error { if trouble { return errors.New("invalid envelope") } + case batchAcknowledgedCode: + var batchHash common.Hash + if err := packet.Decode(&batchHash); err != nil { + log.Warn("failed to decode confirmation into common.Hash", "peer", p.peer.ID(), "error", err) + return errors.New("invalid confirmation message") + } + whisper.envelopeFeed.Send(EnvelopeEvent{ + Batch: batchHash, + Event: EventBatchAcknowledged, + Peer: p.peer.ID(), + }) case powRequirementCode: s := rlp.NewStream(packet.Payload, uint64(packet.Size)) i, err := s.Uint() diff --git a/whisperv6/whisper_test.go b/whisperv6/whisper_test.go index 2702f1c..52c2ccb 100644 --- a/whisperv6/whisper_test.go +++ b/whisperv6/whisper_test.go @@ -20,14 +20,17 @@ import ( "bytes" "crypto/ecdsa" "crypto/sha256" + "math" mrand "math/rand" "testing" "time" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/p2p/enode" "github.com/ethereum/go-ethereum/rlp" + "github.com/stretchr/testify/require" "github.com/syndtr/goleveldb/leveldb/errors" "golang.org/x/crypto/pbkdf2" ) @@ -1041,3 +1044,161 @@ func (stub *rwP2PMessagesStub) WriteMsg(m p2p.Msg) error { stub.messages = append(stub.messages, m) return nil } + +func testConfirmationsHandshake(t *testing.T, expectConfirmations bool) { + conf := &Config{ + MinimumAcceptedPOW: 0, + DisableConfirmations: !expectConfirmations, + } + w := New(conf) + p := p2p.NewPeer(enode.ID{1}, "1", []p2p.Cap{{"shh", 6}}) + rw1, rw2 := p2p.MsgPipe() + errorc := make(chan error, 1) + go func() { + err := w.HandlePeer(p, rw2) + errorc <- err + }() + // so that actual read won't hang forever + time.AfterFunc(5*time.Second, func() { + rw1.Close() + }) + require.NoError(t, p2p.ExpectMsg(rw1, statusCode, []interface{}{ProtocolVersion, math.Float64bits(w.MinPow()), w.BloomFilter(), false, expectConfirmations})) +} + +func TestConfirmationHadnshakeExtension(t *testing.T) { + testConfirmationsHandshake(t, true) +} + +func TestHandshakeWithConfirmationsDisabled(t *testing.T) { + testConfirmationsHandshake(t, false) +} + +func TestConfirmationReceived(t *testing.T) { + conf := &Config{ + MinimumAcceptedPOW: 0, + MaxMessageSize: 10 << 20, + } + w := New(conf) + p := p2p.NewPeer(enode.ID{1}, "1", []p2p.Cap{{"shh", 6}}) + rw1, rw2 := p2p.MsgPipe() + errorc := make(chan error, 1) + go func() { + err := w.HandlePeer(p, rw2) + errorc <- err + }() + time.AfterFunc(5*time.Second, func() { + rw1.Close() + }) + require.NoError(t, p2p.ExpectMsg(rw1, statusCode, []interface{}{ProtocolVersion, math.Float64bits(w.MinPow()), w.BloomFilter(), false, true})) + require.NoError(t, p2p.SendItems(rw1, statusCode, ProtocolVersion, math.Float64bits(w.MinPow()), w.BloomFilter(), true, true)) + + e := Envelope{ + Expiry: uint32(time.Now().Add(10 * time.Second).Unix()), + TTL: 10, + Topic: TopicType{1}, + Data: make([]byte, 1<<10), + Nonce: 1, + } + data, err := rlp.EncodeToBytes([]*Envelope{&e}) + require.NoError(t, err) + hash := crypto.Keccak256Hash(data) + require.NoError(t, p2p.SendItems(rw1, messagesCode, &e)) + require.NoError(t, p2p.ExpectMsg(rw1, batchAcknowledgedCode, hash)) +} + +func TestEventsReceived(t *testing.T) { + conf := &Config{ + MinimumAcceptedPOW: 0, + MaxMessageSize: 10 << 20, + } + w := New(conf) + events := make(chan EnvelopeEvent, 2) + sub := w.SubscribeEnvelopeEvents(events) + defer sub.Unsubscribe() + + p := p2p.NewPeer(enode.ID{1}, "1", []p2p.Cap{{"shh", 6}}) + rw1, rw2 := p2p.MsgPipe() + errorc := make(chan error, 1) + go func() { + err := w.HandlePeer(p, rw2) + errorc <- err + }() + time.AfterFunc(5*time.Second, func() { + rw1.Close() + }) + require.NoError(t, p2p.ExpectMsg(rw1, statusCode, []interface{}{ProtocolVersion, math.Float64bits(w.MinPow()), w.BloomFilter(), false, true})) + require.NoError(t, p2p.SendItems(rw1, statusCode, ProtocolVersion, math.Float64bits(w.MinPow()), w.BloomFilter(), true, true)) + + e := Envelope{ + Expiry: uint32(time.Now().Add(10 * time.Second).Unix()), + TTL: 10, + Topic: TopicType{1}, + Data: make([]byte, 1<<10), + Nonce: 1, + } + require.NoError(t, w.Send(&e)) + require.NoError(t, p2p.ExpectMsg(rw1, messagesCode, []*Envelope{&e})) + + var hash common.Hash + select { + case ev := <-events: + require.Equal(t, EventEnvelopeSent, ev.Event) + require.Equal(t, p.ID(), ev.Peer) + require.NotEqual(t, common.Hash{}, ev.Batch) + hash = ev.Batch + case <-time.After(5 * time.Second): + require.FailNow(t, "timed out waiting for an envelope.sent event") + } + require.NoError(t, p2p.Send(rw1, batchAcknowledgedCode, hash)) + select { + case ev := <-events: + require.Equal(t, EventBatchAcknowledged, ev.Event) + require.Equal(t, p.ID(), ev.Peer) + require.Equal(t, hash, ev.Batch) + case <-time.After(5 * time.Second): + require.FailNow(t, "timed out waiting for an batch.acknowledged event") + } +} + +func TestEventsWithoutConfirmation(t *testing.T) { + conf := &Config{ + MinimumAcceptedPOW: 0, + MaxMessageSize: 10 << 20, + } + w := New(conf) + events := make(chan EnvelopeEvent, 2) + sub := w.SubscribeEnvelopeEvents(events) + defer sub.Unsubscribe() + + p := p2p.NewPeer(enode.ID{1}, "1", []p2p.Cap{{"shh", 6}}) + rw1, rw2 := p2p.MsgPipe() + errorc := make(chan error, 1) + go func() { + err := w.HandlePeer(p, rw2) + errorc <- err + }() + time.AfterFunc(5*time.Second, func() { + rw1.Close() + }) + require.NoError(t, p2p.ExpectMsg(rw1, statusCode, []interface{}{ProtocolVersion, math.Float64bits(w.MinPow()), w.BloomFilter(), false, true})) + require.NoError(t, p2p.SendItems(rw1, statusCode, ProtocolVersion, math.Float64bits(w.MinPow()), w.BloomFilter(), true, false)) + + e := Envelope{ + Expiry: uint32(time.Now().Add(10 * time.Second).Unix()), + TTL: 10, + Topic: TopicType{1}, + Data: make([]byte, 1<<10), + Nonce: 1, + } + require.NoError(t, w.Send(&e)) + require.NoError(t, p2p.ExpectMsg(rw1, messagesCode, []*Envelope{&e})) + + select { + case ev := <-events: + require.Equal(t, EventEnvelopeSent, ev.Event) + require.Equal(t, p.ID(), ev.Peer) + require.Equal(t, common.Hash{}, ev.Batch) + case <-time.After(5 * time.Second): + require.FailNow(t, "timed out waiting for an envelope.sent event") + } +}