diff --git a/_assets/patches/geth/0022-whisper-confirmations.patch b/_assets/patches/geth/0022-whisper-confirmations.patch new file mode 100644 index 000000000..39ba50edd --- /dev/null +++ b/_assets/patches/geth/0022-whisper-confirmations.patch @@ -0,0 +1,206 @@ +diff --git c/whisper/whisperv6/api.go w/whisper/whisperv6/api.go +index 16db034e1..25c072355 100644 +--- c/whisper/whisperv6/api.go ++++ w/whisper/whisperv6/api.go +@@ -246,6 +246,29 @@ type newMessageOverride struct { + + // Post a message on the Whisper network. + func (api *PublicWhisperAPI) Post(ctx context.Context, req NewMessage) (bool, error) { ++ env, err := MakeEnvelope(api.w, req) ++ if err != nil { ++ return false, err ++ } ++ // send to specific node (skip PoW check) ++ if len(req.TargetPeer) > 0 { ++ n, err := discover.ParseNode(req.TargetPeer) ++ if err != nil { ++ return false, fmt.Errorf("failed to parse target peer: %s", err) ++ } ++ return true, api.w.SendP2PMessage(n.ID[:], env) ++ } ++ ++ // ensure that the message PoW meets the node's minimum accepted PoW ++ if req.PowTarget < api.w.MinPow() { ++ return false, ErrTooLowPoW ++ } ++ ++ return true, api.w.Send(env) ++} ++ ++// MakeEnvelope create envelopes from request. ++func MakeEnvelope(w *Whisper, req NewMessage) (*Envelope, error) { + var ( + symKeyGiven = len(req.SymKeyID) > 0 + pubKeyGiven = len(req.PublicKey) > 0 +@@ -254,7 +277,7 @@ func (api *PublicWhisperAPI) Post(ctx context.Context, req NewMessage) (bool, er + + // user must specify either a symmetric or an asymmetric key + if (symKeyGiven && pubKeyGiven) || (!symKeyGiven && !pubKeyGiven) { +- return false, ErrSymAsym ++ return nil, ErrSymAsym + } + + params := &MessageParams{ +@@ -268,21 +291,21 @@ func (api *PublicWhisperAPI) Post(ctx context.Context, req NewMessage) (bool, er + + // Set key that is used to sign the message + if len(req.Sig) > 0 { +- if params.Src, err = api.w.GetPrivateKey(req.Sig); err != nil { +- return false, err ++ if params.Src, err = w.GetPrivateKey(req.Sig); err != nil { ++ return nil, err + } + } + + // Set symmetric key that is used to encrypt the message + if symKeyGiven { + if params.Topic == (TopicType{}) { // topics are mandatory with symmetric encryption +- return false, ErrNoTopics ++ return nil, ErrNoTopics + } +- if params.KeySym, err = api.w.GetSymKey(req.SymKeyID); err != nil { +- return false, err ++ if params.KeySym, err = w.GetSymKey(req.SymKeyID); err != nil { ++ return nil, err + } + if !validateDataIntegrity(params.KeySym, aesKeyLength) { +- return false, ErrInvalidSymmetricKey ++ return nil, ErrInvalidSymmetricKey + } + } + +@@ -290,36 +313,21 @@ func (api *PublicWhisperAPI) Post(ctx context.Context, req NewMessage) (bool, er + if pubKeyGiven { + params.Dst = crypto.ToECDSAPub(req.PublicKey) + if !ValidatePublicKey(params.Dst) { +- return false, ErrInvalidPublicKey ++ return nil, ErrInvalidPublicKey + } + } + + // encrypt and sent message + whisperMsg, err := NewSentMessage(params) + if err != nil { +- return false, err ++ return nil, err + } + + env, err := whisperMsg.Wrap(params) + if err != nil { +- return false, err ++ return nil, err + } +- +- // send to specific node (skip PoW check) +- if len(req.TargetPeer) > 0 { +- n, err := discover.ParseNode(req.TargetPeer) +- if err != nil { +- return false, fmt.Errorf("failed to parse target peer: %s", err) +- } +- return true, api.w.SendP2PMessage(n.ID[:], env) +- } +- +- // ensure that the message PoW meets the node's minimum accepted PoW +- if req.PowTarget < api.w.MinPow() { +- return false, ErrTooLowPoW +- } +- +- return true, api.w.Send(env) ++ return env, nil + } + + // UninstallFilter is alias for Unsubscribe +diff --git c/whisper/whisperv6/events.go w/whisper/whisperv6/events.go +new file mode 100644 +index 000000000..4f204ab5d +--- /dev/null ++++ w/whisper/whisperv6/events.go +@@ -0,0 +1,23 @@ ++package whisperv6 ++ ++import ( ++ "github.com/ethereum/go-ethereum/common" ++ "github.com/ethereum/go-ethereum/p2p/discover" ++) ++ ++// EventType used to define known envelope events. ++type EventType string ++ ++const ( ++ // EventEnvelopeSent fires when envelope was sent to a peer. ++ EventEnvelopeSent EventType = "envelope.sent" ++ // EventEnvelopeExpired fires when envelop expired ++ EventEnvelopeExpired EventType = "envelope.expired" ++) ++ ++// EnvelopeEvent used for envelopes events. ++type EnvelopeEvent struct { ++ Event EventType ++ Hash common.Hash ++ Peer discover.NodeID ++} +diff --git c/whisper/whisperv6/peer.go w/whisper/whisperv6/peer.go +index 6d75290fd..120767c33 100644 +--- c/whisper/whisperv6/peer.go ++++ w/whisper/whisperv6/peer.go +@@ -204,6 +204,11 @@ func (peer *Peer) broadcast() error { + // mark envelopes only if they were successfully sent + for _, e := range bundle { + peer.mark(e) ++ peer.host.envelopeFeed.Send(EnvelopeEvent{ ++ Event: EventEnvelopeSent, ++ Hash: e.Hash(), ++ Peer: peer.peer.ID(), // specifically discover.NodeID because it can be pretty printed ++ }) + } + + log.Trace("broadcast", "num. messages", len(bundle)) +diff --git c/whisper/whisperv6/whisper.go w/whisper/whisperv6/whisper.go +index 8bd991a9b..98115b20f 100644 +--- c/whisper/whisperv6/whisper.go ++++ w/whisper/whisperv6/whisper.go +@@ -28,6 +28,7 @@ import ( + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/crypto" ++ "github.com/ethereum/go-ethereum/event" + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/p2p" + "github.com/ethereum/go-ethereum/rlp" +@@ -85,8 +86,10 @@ type Whisper struct { + statsMu sync.Mutex // guard stats + stats Statistics // Statistics of whisper node + +- mailServer MailServer // MailServer interface +- envelopeTracer EnvelopeTracer // Service collecting envelopes metadata ++ mailServer MailServer // MailServer interface ++ envelopeTracer EnvelopeTracer // Service collecting envelopes metadata ++ ++ envelopeFeed event.Feed + } + + // New creates a Whisper client ready to communicate through the Ethereum P2P network. +@@ -131,6 +134,12 @@ func New(cfg *Config) *Whisper { + return whisper + } + ++// SubscribeEnvelopeEvents subscribes to envelopes feed. ++// In order to prevent blocking whisper producers events must be amply buffered. ++func (whisper *Whisper) SubscribeEnvelopeEvents(events chan<- EnvelopeEvent) event.Subscription { ++ return whisper.envelopeFeed.Subscribe(events) ++} ++ + // MinPow returns the PoW value required by this node. + func (whisper *Whisper) MinPow() float64 { + val, exist := whisper.settings.Load(minPowIdx) +@@ -986,6 +995,10 @@ func (whisper *Whisper) expire() { + hashSet.Each(func(v interface{}) bool { + sz := whisper.envelopes[v.(common.Hash)].size() + delete(whisper.envelopes, v.(common.Hash)) ++ whisper.envelopeFeed.Send(EnvelopeEvent{ ++ Hash: v.(common.Hash), ++ Event: EventEnvelopeExpired, ++ }) + whisper.stats.messagesCleared++ + whisper.stats.memoryCleared += sz + whisper.stats.memoryUsed -= sz diff --git a/geth/node/node.go b/geth/node/node.go index f5743165a..ea8b409a0 100644 --- a/geth/node/node.go +++ b/geth/node/node.go @@ -24,6 +24,7 @@ import ( whisper "github.com/ethereum/go-ethereum/whisper/whisperv6" "github.com/status-im/status-go/geth/params" shhmetrics "github.com/status-im/status-go/metrics/whisper" + "github.com/status-im/status-go/shhext" ) // node-related errors @@ -162,13 +163,13 @@ func activateShhService(stack *node.Node, config *params.NodeConfig) error { logger.Info("SHH protocol is disabled") return nil } - - serviceConstructor := func(*node.ServiceContext) (node.Service, error) { + var whisperService *whisper.Whisper + if err := stack.Register(func(*node.ServiceContext) (node.Service, error) { whisperServiceConfig := &whisper.Config{ MaxMessageSize: whisper.DefaultMaxMessageSize, MinimumAcceptedPOW: 0.001, } - whisperService := whisper.New(whisperServiceConfig) + whisperService = whisper.New(whisperServiceConfig) whisperConfig := config.WhisperConfig // enable metrics @@ -197,9 +198,14 @@ func activateShhService(stack *node.Node, config *params.NodeConfig) error { } return whisperService, nil + }); err != nil { + return err } - - return stack.Register(serviceConstructor) + // TODO(dshulyak) add a config option to enable it by default, but disable if app is started from statusd + return stack.Register(func(*node.ServiceContext) (node.Service, error) { + svc := shhext.New(whisperService, shhext.SendEnvelopeSentSignal) + return svc, nil + }) } // makeIPCPath returns IPC-RPC filename diff --git a/geth/signal/signals.go b/geth/signal/signals.go index 63a56ce2a..f3e4af79c 100644 --- a/geth/signal/signals.go +++ b/geth/signal/signals.go @@ -30,6 +30,9 @@ const ( // EventChainDataRemoved is triggered when node's chain data is removed EventChainDataRemoved = "chaindata.removed" + + // EventEnvelopeSent is triggered when envelope was sent atleast to a one peer. + EventEnvelopeSent = "envelope.sent" ) // Envelope is a general signal sent upward from node to RN app diff --git a/shhext/README.md b/shhext/README.md new file mode 100644 index 000000000..73993afca --- /dev/null +++ b/shhext/README.md @@ -0,0 +1,27 @@ +Whisper API Extension +===================== + +API +--- + +#### shhext_post + +Accepts same input as shh_post (see https://github.com/ethereum/wiki/wiki/JSON-RPC#shh_post) + +##### Returns + +`DATA`, 32 Bytes - the envelope hash + +Signals +------- + +Sends following event once per envelope. + +```json +{ + "type": "envelope.sent", + "event": { + "hash": "0xea0b93079ed32588628f1cabbbb5ed9e4d50b7571064c2962c3853972db67790" + } +} +``` diff --git a/shhext/api.go b/shhext/api.go new file mode 100644 index 000000000..a243c5219 --- /dev/null +++ b/shhext/api.go @@ -0,0 +1,56 @@ +package shhext + +import ( + "context" + "fmt" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/p2p/discover" + whisper "github.com/ethereum/go-ethereum/whisper/whisperv6" +) + +// NewPublicAPI returns instance of the public API. +func NewPublicAPI(w *whisper.Whisper, tracker *tracker) *PublicAPI { + return &PublicAPI{ + w: w, + tracker: tracker, + } +} + +// PublicAPI extends whisper public API. +type PublicAPI struct { + w *whisper.Whisper + tracker *tracker +} + +// Post shamelessly copied from whisper codebase with slight modifications. +func (api *PublicAPI) Post(ctx context.Context, req whisper.NewMessage) (hash common.Hash, err error) { + env, err := whisper.MakeEnvelope(api.w, req) + if err != nil { + return hash, err + } + // send to specific node (skip PoW check) + if len(req.TargetPeer) > 0 { + n, err := discover.ParseNode(req.TargetPeer) + if err != nil { + return hash, fmt.Errorf("failed to parse target peer: %s", err) + } + err = api.w.SendP2PMessage(n.ID[:], env) + if err == nil { + api.tracker.Add(env.Hash()) + return env.Hash(), nil + } + return hash, err + } + + // ensure that the message PoW meets the node's minimum accepted PoW + if req.PowTarget < api.w.MinPow() { + return hash, whisper.ErrTooLowPoW + } + err = api.w.Send(env) + if err == nil { + api.tracker.Add(env.Hash()) + return env.Hash(), nil + } + return hash, err +} diff --git a/shhext/service.go b/shhext/service.go new file mode 100644 index 000000000..08c53c23c --- /dev/null +++ b/shhext/service.go @@ -0,0 +1,155 @@ +package shhext + +import ( + "sync" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/node" + "github.com/ethereum/go-ethereum/p2p" + "github.com/ethereum/go-ethereum/rpc" + whisper "github.com/ethereum/go-ethereum/whisper/whisperv6" +) + +// EnvelopeState in local tracker +type EnvelopeState int + +const ( + // EnvelopePosted is set when envelope was added to a local whisper queue. + EnvelopePosted EnvelopeState = iota + // EnvelopeSent is set when envelope is sent to atleast one peer. + EnvelopeSent +) + +// ConfirmationHandler used as a callback for confirming that envelopes were sent. +type ConfirmationHandler func(common.Hash) + +// Service is a service that provides some additional Whisper API. +type Service struct { + w *whisper.Whisper + tracker *tracker +} + +// Make sure that Service implements node.Service interface. +var _ node.Service = (*Service)(nil) + +// New returns a new Service. +func New(w *whisper.Whisper, handler ConfirmationHandler) *Service { + track := &tracker{ + w: w, + handler: handler, + cache: map[common.Hash]EnvelopeState{}, + } + return &Service{ + w: w, + tracker: track, + } +} + +// Protocols returns a new protocols list. In this case, there are none. +func (s *Service) Protocols() []p2p.Protocol { + return []p2p.Protocol{} +} + +// APIs returns a list of new APIs. +func (s *Service) APIs() []rpc.API { + return []rpc.API{ + { + Namespace: "shhext", + Version: "1.0", + Service: NewPublicAPI(s.w, s.tracker), + Public: true, + }, + } +} + +// Start is run when a service is started. +// It does nothing in this case but is required by `node.Service` interface. +func (s *Service) Start(server *p2p.Server) error { + s.tracker.Start() + return nil +} + +// Stop is run when a service is stopped. +// It does nothing in this case but is required by `node.Service` interface. +func (s *Service) Stop() error { + s.tracker.Stop() + return nil +} + +// tracker responsible for processing events for envelopes that we are interested in +// and calling specified handler. +type tracker struct { + w *whisper.Whisper + handler ConfirmationHandler + + mu sync.Mutex + cache map[common.Hash]EnvelopeState + + wg sync.WaitGroup + quit chan struct{} +} + +// Start processing events. +func (t *tracker) Start() { + t.quit = make(chan struct{}) + t.wg.Add(1) + go func() { + t.handleEnvelopeEvents() + t.wg.Done() + }() +} + +// Stop process events. +func (t *tracker) Stop() { + close(t.quit) + t.wg.Wait() +} + +// Add hash to a tracker. +func (t *tracker) Add(hash common.Hash) { + t.mu.Lock() + defer t.mu.Unlock() + t.cache[hash] = EnvelopePosted +} + +// handleEnvelopeEvents processes whisper envelope events +func (t *tracker) handleEnvelopeEvents() { + events := make(chan whisper.EnvelopeEvent, 100) // must be buffered to prevent blocking whisper + sub := t.w.SubscribeEnvelopeEvents(events) + defer sub.Unsubscribe() + for { + select { + case <-t.quit: + return + case event := <-events: + t.handleEvent(event) + } + } +} + +// handleEvent based on type of the event either triggers +// confirmation handler or removes hash from tracker +func (t *tracker) handleEvent(event whisper.EnvelopeEvent) { + t.mu.Lock() + defer t.mu.Unlock() + switch event.Event { + case whisper.EventEnvelopeSent: + state, ok := t.cache[event.Hash] + // if we didn't send a message using extension - skip it + // if message was already confirmed - skip it + if !ok || state == EnvelopeSent { + return + } + if t.handler != nil { + log.Debug("envelope is sent", "hash", event.Hash, "peer", event.Peer) + t.handler(event.Hash) + t.cache[event.Hash] = EnvelopeSent + } + case whisper.EventEnvelopeExpired: + if _, ok := t.cache[event.Hash]; ok { + log.Debug("envelope expired", "hash", event.Hash) + delete(t.cache, event.Hash) + } + } +} diff --git a/shhext/service_test.go b/shhext/service_test.go new file mode 100644 index 000000000..b438aecbb --- /dev/null +++ b/shhext/service_test.go @@ -0,0 +1,140 @@ +package shhext + +import ( + "fmt" + "testing" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/node" + "github.com/ethereum/go-ethereum/p2p" + whisper "github.com/ethereum/go-ethereum/whisper/whisperv6" + "github.com/stretchr/testify/suite" +) + +func TestShhExtSuite(t *testing.T) { + suite.Run(t, new(ShhExtSuite)) +} + +type ShhExtSuite struct { + suite.Suite + + nodes []*node.Node + services []*Service + whisper []*whisper.Whisper +} + +func (s *ShhExtSuite) SetupTest() { + s.nodes = make([]*node.Node, 2) + s.services = make([]*Service, 2) + s.whisper = make([]*whisper.Whisper, 2) + port := 21313 + for i := range s.nodes { + i := i // bind i to be usable in service constructors + cfg := &node.Config{ + Name: fmt.Sprintf("node-%d", i), + P2P: p2p.Config{ + NoDiscovery: true, + MaxPeers: 20, + ListenAddr: fmt.Sprintf(":%d", port+i), + }, + } + stack, err := node.New(cfg) + s.NoError(err) + s.whisper[i] = whisper.New(nil) + s.NoError(stack.Register(func(n *node.ServiceContext) (node.Service, error) { + return s.whisper[i], nil + })) + s.services[i] = New(s.whisper[i], nil) + s.NoError(stack.Register(func(n *node.ServiceContext) (node.Service, error) { + return s.services[i], nil + })) + s.Require().NoError(stack.Start()) + s.nodes[i] = stack + } + s.nodes[0].Server().AddPeer(s.nodes[1].Server().Self()) +} + +func (s *ShhExtSuite) TestPostMessageWithConfirmation() { + confirmations := make(chan common.Hash, 1) + confirmationsHandler := func(hash common.Hash) { + confirmations <- hash + } + s.services[0].tracker.handler = confirmationsHandler + symID, err := s.whisper[0].GenerateSymKey() + s.NoError(err) + client, err := s.nodes[0].Attach() + s.NoError(err) + var hash common.Hash + s.NoError(client.Call(&hash, "shhext_post", whisper.NewMessage{ + SymKeyID: symID, + PowTarget: whisper.DefaultMinimumPoW, + PowTime: 200, + Topic: whisper.TopicType{0x01, 0x01, 0x01, 0x01}, + Payload: []byte("hello"), + })) + s.NoError(err) + select { + case confirmed := <-confirmations: + s.Equal(hash, confirmed) + case <-time.After(time.Second): + s.Fail("timed out while waiting for confirmation") + } +} + +func (s *ShhExtSuite) TearDown() { + for _, n := range s.nodes { + s.NoError(n.Stop()) + } +} + +var ( + testHash = common.Hash{0x01} +) + +func TestTrackerSuite(t *testing.T) { + suite.Run(t, new(TrackerSuite)) +} + +type TrackerSuite struct { + suite.Suite + + tracker *tracker +} + +func (s *TrackerSuite) SetupTest() { + s.tracker = &tracker{ + handler: func(common.Hash) {}, + cache: map[common.Hash]EnvelopeState{}, + } +} + +func (s *TrackerSuite) TestConfirmed() { + s.tracker.Add(testHash) + s.Contains(s.tracker.cache, testHash) + s.Equal(EnvelopePosted, s.tracker.cache[testHash]) + s.tracker.handleEvent(whisper.EnvelopeEvent{ + Event: whisper.EventEnvelopeSent, + Hash: testHash, + }) + s.Contains(s.tracker.cache, testHash) + s.Equal(EnvelopeSent, s.tracker.cache[testHash]) +} + +func (s *TrackerSuite) TestIgnored() { + s.tracker.handleEvent(whisper.EnvelopeEvent{ + Event: whisper.EventEnvelopeSent, + Hash: testHash, + }) + s.NotContains(s.tracker.cache, testHash) +} + +func (s *TrackerSuite) TestRemoved() { + s.tracker.Add(testHash) + s.Contains(s.tracker.cache, testHash) + s.tracker.handleEvent(whisper.EnvelopeEvent{ + Event: whisper.EventEnvelopeExpired, + Hash: testHash, + }) + s.NotContains(s.tracker.cache, testHash) +} diff --git a/shhext/signal.go b/shhext/signal.go new file mode 100644 index 000000000..fe5f8a343 --- /dev/null +++ b/shhext/signal.go @@ -0,0 +1,21 @@ +package shhext + +import ( + "github.com/ethereum/go-ethereum/common" + "github.com/status-im/status-go/geth/signal" +) + +// EnvelopeSentSignal includes hash of the sent envelope. +type EnvelopeSentSignal struct { + Hash common.Hash `json:"hash"` +} + +// SendEnvelopeSentSignal sends an envelope.sent signal with hash of the envelope. +func SendEnvelopeSentSignal(hash common.Hash) { + signal.Send(signal.Envelope{ + Type: signal.EventEnvelopeSent, + Event: EnvelopeSentSignal{ + Hash: hash, + }, + }) +} diff --git a/t/e2e/whisper/whisper_ext_test.go b/t/e2e/whisper/whisper_ext_test.go new file mode 100644 index 000000000..7eecddd2b --- /dev/null +++ b/t/e2e/whisper/whisper_ext_test.go @@ -0,0 +1,94 @@ +package whisper + +import ( + "encoding/json" + "fmt" + "io/ioutil" + "os" + "testing" + "time" + + "github.com/ethereum/go-ethereum/common" + whisper "github.com/ethereum/go-ethereum/whisper/whisperv6" + "github.com/status-im/status-go/geth/node" + "github.com/status-im/status-go/geth/params" + "github.com/status-im/status-go/geth/signal" + "github.com/status-im/status-go/shhext" + "github.com/stretchr/testify/suite" +) + +func TestWhisperExtentionSuite(t *testing.T) { + suite.Run(t, new(WhisperExtentionSuite)) +} + +type WhisperExtentionSuite struct { + suite.Suite + + nodes []*node.StatusNode +} + +func (s *WhisperExtentionSuite) SetupTest() { + s.nodes = make([]*node.StatusNode, 2) + for i := range s.nodes { + dir, err := ioutil.TempDir("", "test-shhext-") + s.NoError(err) + // network id is irrelevant + cfg, err := params.NewNodeConfig(dir, "", 777, true) + cfg.LightEthConfig.Enabled = false + cfg.Name = fmt.Sprintf("test-shhext-%d", i) + s.Require().NoError(err) + s.nodes[i] = node.New() + s.Require().NoError(s.nodes[i].Start(cfg)) + } + node1, err := s.nodes[0].GethNode() + s.NoError(err) + node2, err := s.nodes[1].GethNode() + s.NoError(err) + node1.Server().AddPeer(node2.Server().Self()) +} + +func (s *WhisperExtentionSuite) TestRecievedSignal() { + confirmed := make(chan common.Hash, 1) + signal.SetDefaultNodeNotificationHandler(func(rawSignal string) { + var sg struct { + Type string + Event json.RawMessage + } + s.NoError(json.Unmarshal([]byte(rawSignal), &sg)) + + if sg.Type == signal.EventEnvelopeSent { + var event shhext.EnvelopeSentSignal + s.NoError(json.Unmarshal(sg.Event, &event)) + confirmed <- event.Hash + } + }) + client := s.nodes[0].RPCClient() + s.NotNil(client) + var symID string + s.NoError(client.Call(&symID, "shh_newSymKey")) + msg := whisper.NewMessage{ + SymKeyID: symID, + PowTarget: whisper.DefaultMinimumPoW, + PowTime: 200, + Topic: whisper.TopicType{0x01, 0x01, 0x01, 0x01}, + Payload: []byte("hello"), + } + var hash common.Hash + s.NoError(client.Call(&hash, "shhext_post", msg)) + s.NotEqual(common.Hash{}, hash) + select { + case conf := <-confirmed: + s.Equal(hash, conf) + case <-time.After(time.Second): + s.Fail("timed out while waiting for confirmation") + } +} + +func (s *WhisperExtentionSuite) TearDown() { + for _, n := range s.nodes { + cfg, err := n.Config() + s.NoError(err) + s.NoError(n.Stop()) + s.NoError(os.Remove(cfg.DataDir)) + } +} diff --git a/vendor/github.com/ethereum/go-ethereum/whisper/whisperv6/api.go b/vendor/github.com/ethereum/go-ethereum/whisper/whisperv6/api.go index 16db034e1..25c072355 100644 --- a/vendor/github.com/ethereum/go-ethereum/whisper/whisperv6/api.go +++ b/vendor/github.com/ethereum/go-ethereum/whisper/whisperv6/api.go @@ -246,65 +246,10 @@ type newMessageOverride struct { // Post a message on the Whisper network. func (api *PublicWhisperAPI) Post(ctx context.Context, req NewMessage) (bool, error) { - var ( - symKeyGiven = len(req.SymKeyID) > 0 - pubKeyGiven = len(req.PublicKey) > 0 - err error - ) - - // user must specify either a symmetric or an asymmetric key - if (symKeyGiven && pubKeyGiven) || (!symKeyGiven && !pubKeyGiven) { - return false, ErrSymAsym - } - - params := &MessageParams{ - TTL: req.TTL, - Payload: req.Payload, - Padding: req.Padding, - WorkTime: req.PowTime, - PoW: req.PowTarget, - Topic: req.Topic, - } - - // Set key that is used to sign the message - if len(req.Sig) > 0 { - if params.Src, err = api.w.GetPrivateKey(req.Sig); err != nil { - return false, err - } - } - - // Set symmetric key that is used to encrypt the message - if symKeyGiven { - if params.Topic == (TopicType{}) { // topics are mandatory with symmetric encryption - return false, ErrNoTopics - } - if params.KeySym, err = api.w.GetSymKey(req.SymKeyID); err != nil { - return false, err - } - if !validateDataIntegrity(params.KeySym, aesKeyLength) { - return false, ErrInvalidSymmetricKey - } - } - - // Set asymmetric key that is used to encrypt the message - if pubKeyGiven { - params.Dst = crypto.ToECDSAPub(req.PublicKey) - if !ValidatePublicKey(params.Dst) { - return false, ErrInvalidPublicKey - } - } - - // encrypt and sent message - whisperMsg, err := NewSentMessage(params) + env, err := MakeEnvelope(api.w, req) if err != nil { return false, err } - - env, err := whisperMsg.Wrap(params) - if err != nil { - return false, err - } - // send to specific node (skip PoW check) if len(req.TargetPeer) > 0 { n, err := discover.ParseNode(req.TargetPeer) @@ -322,6 +267,69 @@ func (api *PublicWhisperAPI) Post(ctx context.Context, req NewMessage) (bool, er return true, api.w.Send(env) } +// MakeEnvelope create envelopes from request. +func MakeEnvelope(w *Whisper, req NewMessage) (*Envelope, error) { + var ( + symKeyGiven = len(req.SymKeyID) > 0 + pubKeyGiven = len(req.PublicKey) > 0 + err error + ) + + // user must specify either a symmetric or an asymmetric key + if (symKeyGiven && pubKeyGiven) || (!symKeyGiven && !pubKeyGiven) { + return nil, ErrSymAsym + } + + params := &MessageParams{ + TTL: req.TTL, + Payload: req.Payload, + Padding: req.Padding, + WorkTime: req.PowTime, + PoW: req.PowTarget, + Topic: req.Topic, + } + + // Set key that is used to sign the message + if len(req.Sig) > 0 { + if params.Src, err = w.GetPrivateKey(req.Sig); err != nil { + return nil, err + } + } + + // Set symmetric key that is used to encrypt the message + if symKeyGiven { + if params.Topic == (TopicType{}) { // topics are mandatory with symmetric encryption + return nil, ErrNoTopics + } + if params.KeySym, err = w.GetSymKey(req.SymKeyID); err != nil { + return nil, err + } + if !validateDataIntegrity(params.KeySym, aesKeyLength) { + return nil, ErrInvalidSymmetricKey + } + } + + // Set asymmetric key that is used to encrypt the message + if pubKeyGiven { + params.Dst = crypto.ToECDSAPub(req.PublicKey) + if !ValidatePublicKey(params.Dst) { + return nil, ErrInvalidPublicKey + } + } + + // encrypt and sent message + whisperMsg, err := NewSentMessage(params) + if err != nil { + return nil, err + } + + env, err := whisperMsg.Wrap(params) + if err != nil { + return nil, err + } + return env, nil +} + // UninstallFilter is alias for Unsubscribe func (api *PublicWhisperAPI) UninstallFilter(id string) { api.w.Unsubscribe(id) diff --git a/vendor/github.com/ethereum/go-ethereum/whisper/whisperv6/events.go b/vendor/github.com/ethereum/go-ethereum/whisper/whisperv6/events.go new file mode 100644 index 000000000..4f204ab5d --- /dev/null +++ b/vendor/github.com/ethereum/go-ethereum/whisper/whisperv6/events.go @@ -0,0 +1,23 @@ +package whisperv6 + +import ( + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/p2p/discover" +) + +// EventType used to define known envelope events. +type EventType string + +const ( + // EventEnvelopeSent fires when envelope was sent to a peer. + EventEnvelopeSent EventType = "envelope.sent" + // EventEnvelopeExpired fires when envelop expired + EventEnvelopeExpired EventType = "envelope.expired" +) + +// EnvelopeEvent used for envelopes events. +type EnvelopeEvent struct { + Event EventType + Hash common.Hash + Peer discover.NodeID +} diff --git a/vendor/github.com/ethereum/go-ethereum/whisper/whisperv6/peer.go b/vendor/github.com/ethereum/go-ethereum/whisper/whisperv6/peer.go index 6d75290fd..120767c33 100644 --- a/vendor/github.com/ethereum/go-ethereum/whisper/whisperv6/peer.go +++ b/vendor/github.com/ethereum/go-ethereum/whisper/whisperv6/peer.go @@ -204,6 +204,11 @@ func (peer *Peer) broadcast() error { // mark envelopes only if they were successfully sent for _, e := range bundle { peer.mark(e) + peer.host.envelopeFeed.Send(EnvelopeEvent{ + Event: EventEnvelopeSent, + Hash: e.Hash(), + Peer: peer.peer.ID(), // specifically discover.NodeID because it can be pretty printed + }) } log.Trace("broadcast", "num. messages", len(bundle)) diff --git a/vendor/github.com/ethereum/go-ethereum/whisper/whisperv6/whisper.go b/vendor/github.com/ethereum/go-ethereum/whisper/whisperv6/whisper.go index 8bd991a9b..98115b20f 100644 --- a/vendor/github.com/ethereum/go-ethereum/whisper/whisperv6/whisper.go +++ b/vendor/github.com/ethereum/go-ethereum/whisper/whisperv6/whisper.go @@ -28,6 +28,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/rlp" @@ -85,8 +86,10 @@ type Whisper struct { statsMu sync.Mutex // guard stats stats Statistics // Statistics of whisper node - mailServer MailServer // MailServer interface - envelopeTracer EnvelopeTracer // Service collecting envelopes metadata + mailServer MailServer // MailServer interface + envelopeTracer EnvelopeTracer // Service collecting envelopes metadata + + envelopeFeed event.Feed } // New creates a Whisper client ready to communicate through the Ethereum P2P network. @@ -131,6 +134,12 @@ func New(cfg *Config) *Whisper { return whisper } +// SubscribeEnvelopeEvents subscribes to envelopes feed. +// In order to prevent blocking whisper producers events must be amply buffered. +func (whisper *Whisper) SubscribeEnvelopeEvents(events chan<- EnvelopeEvent) event.Subscription { + return whisper.envelopeFeed.Subscribe(events) +} + // MinPow returns the PoW value required by this node. func (whisper *Whisper) MinPow() float64 { val, exist := whisper.settings.Load(minPowIdx) @@ -986,6 +995,10 @@ func (whisper *Whisper) expire() { hashSet.Each(func(v interface{}) bool { sz := whisper.envelopes[v.(common.Hash)].size() delete(whisper.envelopes, v.(common.Hash)) + whisper.envelopeFeed.Send(EnvelopeEvent{ + Hash: v.(common.Hash), + Event: EventEnvelopeExpired, + }) whisper.stats.messagesCleared++ whisper.stats.memoryCleared += sz whisper.stats.memoryUsed -= sz