Use confirmation that bundle is delivered for sending feedback to user (#1284)

* Pull whisper confirmations changes

* Use batch confirmations as a signal that envelope was sent into the network
This commit is contained in:
Dmitry Shulyak 2018-11-27 08:30:15 +02:00 committed by GitHub
parent 6b09eea749
commit aac706fe4c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 154 additions and 34 deletions

6
Gopkg.lock generated
View File

@ -822,12 +822,12 @@
revision = "fbcc46a78cd43fef95a110df664aab513116a850"
[[projects]]
digest = "1:fe884981c5589ade6ea86ca876be4a744ea1344c6e8cfa17e434fcf270b04598"
digest = "1:6cb252f27feb57ef0e8406556c259d903c0ecff2ab0d2200ca85773b3561777d"
name = "github.com/status-im/whisper"
packages = ["whisperv6"]
pruneopts = "NUT"
revision = "e25ea1d673d5982b16fd3e51ed2a0d8f91b809d9"
version = "v1.3.0"
revision = "76c24476436f0cf832021be98316a4ee62cc83cc"
version = "v1.4.0"
[[projects]]
digest = "1:572c783a763db6383aca3179976eb80e4c900f52eba56cba8bb2e3cea7ce720e"

View File

@ -29,7 +29,7 @@
[[constraint]]
name = "github.com/status-im/whisper"
version = "=v1.3.0"
version = "=v1.4.0"
[[override]]
name = "github.com/golang/protobuf"

View File

@ -71,6 +71,7 @@ func New(w *whisper.Whisper, handler EnvelopeEventsHandler, db *leveldb.DB, conf
w: w,
handler: handler,
cache: map[common.Hash]EnvelopeState{},
batches: map[common.Hash]map[common.Hash]struct{}{},
}
return &Service{
w: w,
@ -184,8 +185,9 @@ type tracker struct {
w *whisper.Whisper
handler EnvelopeEventsHandler
mu sync.Mutex
cache map[common.Hash]EnvelopeState
mu sync.Mutex
cache map[common.Hash]EnvelopeState
batches map[common.Hash]map[common.Hash]struct{}
wg sync.WaitGroup
quit chan struct{}
@ -255,6 +257,7 @@ func (t *tracker) handleEvent(event whisper.EnvelopeEvent) {
handlers := map[whisper.EventType]func(whisper.EnvelopeEvent){
whisper.EventEnvelopeSent: t.handleEventEnvelopeSent,
whisper.EventEnvelopeExpired: t.handleEventEnvelopeExpired,
whisper.EventBatchAcknowledged: t.handleAcknowledgedBatch,
whisper.EventMailServerRequestCompleted: t.handleEventMailServerRequestCompleted,
whisper.EventMailServerRequestExpired: t.handleEventMailServerRequestExpired,
}
@ -274,23 +277,53 @@ func (t *tracker) handleEventEnvelopeSent(event whisper.EnvelopeEvent) {
if !ok || state == EnvelopeSent {
return
}
log.Debug("envelope is sent", "hash", event.Hash, "peer", event.Peer)
t.cache[event.Hash] = EnvelopeSent
if t.handler != nil {
t.handler.EnvelopeSent(event.Hash)
if event.Batch != (common.Hash{}) {
if _, ok := t.batches[event.Batch]; !ok {
t.batches[event.Batch] = map[common.Hash]struct{}{}
}
t.batches[event.Batch][event.Hash] = struct{}{}
log.Debug("waiting for a confirmation", "batch", event.Batch)
} else {
log.Debug("envelope is sent", "hash", event.Hash, "peer", event.Peer)
t.cache[event.Hash] = EnvelopeSent
if t.handler != nil {
t.handler.EnvelopeSent(event.Hash)
}
}
}
func (t *tracker) handleAcknowledgedBatch(event whisper.EnvelopeEvent) {
t.mu.Lock()
defer t.mu.Unlock()
envelopes, ok := t.batches[event.Batch]
if !ok {
log.Debug("batch is not found", "batch", event.Batch)
}
log.Debug("received a confirmation", "batch", event.Batch, "peer", event.Peer)
for hash := range envelopes {
state, ok := t.cache[hash]
if !ok || state == EnvelopeSent {
continue
}
t.cache[hash] = EnvelopeSent
if t.handler != nil {
t.handler.EnvelopeSent(hash)
}
}
delete(t.batches, event.Batch)
}
func (t *tracker) handleEventEnvelopeExpired(event whisper.EnvelopeEvent) {
t.mu.Lock()
defer t.mu.Unlock()
if state, ok := t.cache[event.Hash]; ok {
log.Debug("envelope expired", "hash", event.Hash, "state", state)
delete(t.cache, event.Hash)
if state == EnvelopeSent {
return
}
log.Debug("envelope expired", "hash", event.Hash, "state", state)
if t.handler != nil {
t.handler.EnvelopeExpired(event.Hash)
}

View File

@ -411,7 +411,8 @@ type TrackerSuite struct {
func (s *TrackerSuite) SetupTest() {
s.tracker = &tracker{
cache: map[common.Hash]EnvelopeState{},
cache: map[common.Hash]EnvelopeState{},
batches: map[common.Hash]map[common.Hash]struct{}{},
}
}
@ -427,6 +428,25 @@ func (s *TrackerSuite) TestConfirmed() {
s.Equal(EnvelopeSent, s.tracker.cache[testHash])
}
func (s *TrackerSuite) TestConfirmedWithAcknowledge() {
testBatch := common.Hash{1}
s.tracker.Add(testHash)
s.Contains(s.tracker.cache, testHash)
s.Equal(EnvelopePosted, s.tracker.cache[testHash])
s.tracker.handleEvent(whisper.EnvelopeEvent{
Event: whisper.EventEnvelopeSent,
Hash: testHash,
Batch: testBatch,
})
s.Equal(EnvelopePosted, s.tracker.cache[testHash])
s.tracker.handleEvent(whisper.EnvelopeEvent{
Event: whisper.EventBatchAcknowledged,
Batch: testBatch,
})
s.Contains(s.tracker.cache, testHash)
s.Equal(EnvelopeSent, s.tracker.cache[testHash])
}
func (s *TrackerSuite) TestIgnored() {
s.tracker.handleEvent(whisper.EnvelopeEvent{
Event: whisper.EventEnvelopeSent,

View File

@ -21,6 +21,7 @@ type Config struct {
MaxMessageSize uint32 `toml:",omitempty"`
MinimumAcceptedPOW float64 `toml:",omitempty"`
RestrictConnectionBetweenLightClients bool `toml:",omitempty"`
DisableConfirmations bool `toml:",omitempty"`
}
// DefaultConfig represents (shocker!) the default configuration.

View File

@ -47,6 +47,7 @@ const (
messagesCode = 1 // normal whisper message
powRequirementCode = 2 // PoW requirement
bloomFilterExCode = 3 // bloom filter exchange
batchAcknowledgedCode = 11 // confirmation that batch of envelopes was received
p2pRequestCompleteCode = 125 // peer-to-peer message, used by Dapp protocol
p2pRequestCode = 126 // peer-to-peer message, used by Dapp protocol
p2pMessageCode = 127 // peer-to-peer message (to be consumed by the peer, but not forwarded any further)

View File

@ -13,6 +13,8 @@ const (
EventEnvelopeSent EventType = "envelope.sent"
// EventEnvelopeExpired fires when envelop expired
EventEnvelopeExpired EventType = "envelope.expired"
// EventBatchAcknowledged is sent when batch of envelopes was acknowleged by a peer.
EventBatchAcknowledged EventType = "batch.acknowleged"
// EventEnvelopeAvailable fires when envelop is available for filters
EventEnvelopeAvailable EventType = "envelope.available"
// EventMailServerRequestCompleted fires after mailserver sends all the requested messages
@ -27,6 +29,7 @@ const (
type EnvelopeEvent struct {
Event EventType
Hash common.Hash
Batch common.Hash
Peer enode.ID
Data interface{}
}

View File

@ -17,6 +17,7 @@
package whisperv6
import (
"bytes"
"fmt"
"math"
"sync"
@ -24,6 +25,7 @@ import (
mapset "github.com/deckarep/golang-set"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/rlp"
@ -35,11 +37,12 @@ type Peer struct {
peer *p2p.Peer
ws p2p.MsgReadWriter
trusted bool
powRequirement float64
bloomMu sync.Mutex
bloomFilter []byte
fullNode bool
trusted bool
powRequirement float64
bloomMu sync.Mutex
bloomFilter []byte
fullNode bool
confirmationsEnabled bool
known mapset.Set // Messages already known by the peer to avoid wasting bandwidth
@ -85,8 +88,9 @@ func (peer *Peer) handshake() error {
pow := peer.host.MinPow()
powConverted := math.Float64bits(pow)
bloom := peer.host.BloomFilter()
confirmationsEnabled := !peer.host.disableConfirmations
errc <- p2p.SendItems(peer.ws, statusCode, ProtocolVersion, powConverted, bloom, isLightNode)
errc <- p2p.SendItems(peer.ws, statusCode, ProtocolVersion, powConverted, bloom, isLightNode, confirmationsEnabled)
}()
// Fetch the remote status packet and verify protocol match
@ -134,6 +138,12 @@ func (peer *Peer) handshake() error {
if isRemotePeerLightNode && isLightNode && isRestrictedLightNodeConnection {
return fmt.Errorf("peer [%x] is useless: two light client communication restricted", peer.ID())
}
confirmationsEnabled, err := s.Bool()
if err != nil || !confirmationsEnabled {
log.Warn("confirmations are disabled", "peer", peer.ID())
} else {
peer.confirmationsEnabled = confirmationsEnabled
}
if err := <-errc; err != nil {
return fmt.Errorf("peer [%x] failed to send status packet: %v", peer.ID(), err)
@ -208,19 +218,24 @@ func (peer *Peer) broadcast() error {
}
if len(bundle) > 0 {
// transmit the batch of envelopes
if err := p2p.Send(peer.ws, messagesCode, bundle); err != nil {
batchHash, err := sendBundle(peer.ws, bundle)
if err != nil {
log.Warn("failed to deliver envelopes", "peer", peer.peer.ID(), "error", err)
return err
}
// mark envelopes only if they were successfully sent
for _, e := range bundle {
peer.mark(e)
peer.host.envelopeFeed.Send(EnvelopeEvent{
event := EnvelopeEvent{
Event: EventEnvelopeSent,
Hash: e.Hash(),
Peer: peer.peer.ID(), // specifically discover.NodeID because it can be pretty printed
})
Peer: peer.peer.ID(),
}
if peer.confirmationsEnabled {
event.Batch = batchHash
}
peer.host.envelopeFeed.Send(event)
}
log.Trace("broadcast", "num. messages", len(bundle))
@ -266,3 +281,19 @@ func MakeFullNodeBloom() []byte {
}
return bloom
}
func sendBundle(rw p2p.MsgWriter, bundle []*Envelope) (rst common.Hash, err error) {
data, err := rlp.EncodeToBytes(bundle)
if err != nil {
return
}
err = rw.WriteMsg(p2p.Msg{
Code: messagesCode,
Size: uint32(len(data)),
Payload: bytes.NewBuffer(data),
})
if err != nil {
return
}
return crypto.Keccak256Hash(data), nil
}

View File

@ -34,6 +34,7 @@ import (
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/rpc"
"github.com/syndtr/goleveldb/leveldb/errors"
@ -92,6 +93,8 @@ type Whisper struct {
settings syncmap.Map // holds configuration settings that can be dynamically changed
disableConfirmations bool // do not reply with confirmations
syncAllowance int // maximum time in seconds allowed to process the whisper-related messages
statsMu sync.Mutex // guard stats
@ -111,16 +114,17 @@ func New(cfg *Config) *Whisper {
}
whisper := &Whisper{
privateKeys: make(map[string]*ecdsa.PrivateKey),
symKeys: make(map[string][]byte),
envelopes: make(map[common.Hash]*Envelope),
expirations: make(map[uint32]mapset.Set),
peers: make(map[*Peer]struct{}),
messageQueue: make(chan *Envelope, messageQueueLimit),
p2pMsgQueue: make(chan *Envelope, messageQueueLimit),
quit: make(chan struct{}),
syncAllowance: DefaultSyncAllowance,
timeSource: time.Now,
privateKeys: make(map[string]*ecdsa.PrivateKey),
symKeys: make(map[string][]byte),
envelopes: make(map[common.Hash]*Envelope),
expirations: make(map[uint32]mapset.Set),
peers: make(map[*Peer]struct{}),
messageQueue: make(chan *Envelope, messageQueueLimit),
p2pMsgQueue: make(chan *Envelope, messageQueueLimit),
quit: make(chan struct{}),
syncAllowance: DefaultSyncAllowance,
timeSource: time.Now,
disableConfirmations: cfg.DisableConfirmations,
}
whisper.filters = NewFilters(whisper)
@ -780,6 +784,13 @@ func (whisper *Whisper) HandlePeer(peer *p2p.Peer, rw p2p.MsgReadWriter) error {
return whisper.runMessageLoop(whisperPeer, rw)
}
func (whisper *Whisper) sendConfirmation(peer enode.ID, rw p2p.MsgReadWriter, data []byte) {
batchHash := crypto.Keccak256Hash(data)
if err := p2p.Send(rw, batchAcknowledgedCode, batchHash); err != nil {
log.Warn("failed to deliver confirmation", "hash", batchHash, "peer", peer, "error", err)
}
}
// runMessageLoop reads and processes inbound messages directly to merge into client-global state.
func (whisper *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error {
for {
@ -800,8 +811,17 @@ func (whisper *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error {
log.Warn("unxepected status message received", "peer", p.peer.ID())
case messagesCode:
// decode the contained envelopes
data, err := ioutil.ReadAll(packet.Payload)
if err != nil {
log.Warn("failed to read envelopes data", "peer", p.peer.ID(), "error", err)
return errors.New("invalid enveloopes")
}
if !whisper.disableConfirmations {
go whisper.sendConfirmation(p.peer.ID(), rw, data)
}
var envelopes []*Envelope
if err := packet.Decode(&envelopes); err != nil {
if err := rlp.DecodeBytes(data, &envelopes); err != nil {
log.Warn("failed to decode envelopes, peer will be disconnected", "peer", p.peer.ID(), "err", err)
return errors.New("invalid envelopes")
}
@ -821,6 +841,17 @@ func (whisper *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error {
if trouble {
return errors.New("invalid envelope")
}
case batchAcknowledgedCode:
var batchHash common.Hash
if err := packet.Decode(&batchHash); err != nil {
log.Warn("failed to decode confirmation into common.Hash", "peer", p.peer.ID(), "error", err)
return errors.New("invalid confirmation message")
}
whisper.envelopeFeed.Send(EnvelopeEvent{
Batch: batchHash,
Event: EventBatchAcknowledged,
Peer: p.peer.ID(),
})
case powRequirementCode:
s := rlp.NewStream(packet.Payload, uint64(packet.Size))
i, err := s.Uint()