Implement shh api extension that allows to confirm that message is sent (#814)

* Implement shh api extension that allows to confirm that message is sent

* Add a patch

* Fix linter

* Add readme

* Add tests for tracker

* Address review
This commit is contained in:
Dmitry Shulyak 2018-04-11 18:41:51 +03:00 committed by GitHub
parent e6610f3b15
commit ba9a25e284
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 820 additions and 63 deletions

View File

@ -0,0 +1,206 @@
diff --git c/whisper/whisperv6/api.go w/whisper/whisperv6/api.go
index 16db034e1..25c072355 100644
--- c/whisper/whisperv6/api.go
+++ w/whisper/whisperv6/api.go
@@ -246,6 +246,29 @@ type newMessageOverride struct {
// Post a message on the Whisper network.
func (api *PublicWhisperAPI) Post(ctx context.Context, req NewMessage) (bool, error) {
+ env, err := MakeEnvelope(api.w, req)
+ if err != nil {
+ return false, err
+ }
+ // send to specific node (skip PoW check)
+ if len(req.TargetPeer) > 0 {
+ n, err := discover.ParseNode(req.TargetPeer)
+ if err != nil {
+ return false, fmt.Errorf("failed to parse target peer: %s", err)
+ }
+ return true, api.w.SendP2PMessage(n.ID[:], env)
+ }
+
+ // ensure that the message PoW meets the node's minimum accepted PoW
+ if req.PowTarget < api.w.MinPow() {
+ return false, ErrTooLowPoW
+ }
+
+ return true, api.w.Send(env)
+}
+
+// MakeEnvelope create envelopes from request.
+func MakeEnvelope(w *Whisper, req NewMessage) (*Envelope, error) {
var (
symKeyGiven = len(req.SymKeyID) > 0
pubKeyGiven = len(req.PublicKey) > 0
@@ -254,7 +277,7 @@ func (api *PublicWhisperAPI) Post(ctx context.Context, req NewMessage) (bool, er
// user must specify either a symmetric or an asymmetric key
if (symKeyGiven && pubKeyGiven) || (!symKeyGiven && !pubKeyGiven) {
- return false, ErrSymAsym
+ return nil, ErrSymAsym
}
params := &MessageParams{
@@ -268,21 +291,21 @@ func (api *PublicWhisperAPI) Post(ctx context.Context, req NewMessage) (bool, er
// Set key that is used to sign the message
if len(req.Sig) > 0 {
- if params.Src, err = api.w.GetPrivateKey(req.Sig); err != nil {
- return false, err
+ if params.Src, err = w.GetPrivateKey(req.Sig); err != nil {
+ return nil, err
}
}
// Set symmetric key that is used to encrypt the message
if symKeyGiven {
if params.Topic == (TopicType{}) { // topics are mandatory with symmetric encryption
- return false, ErrNoTopics
+ return nil, ErrNoTopics
}
- if params.KeySym, err = api.w.GetSymKey(req.SymKeyID); err != nil {
- return false, err
+ if params.KeySym, err = w.GetSymKey(req.SymKeyID); err != nil {
+ return nil, err
}
if !validateDataIntegrity(params.KeySym, aesKeyLength) {
- return false, ErrInvalidSymmetricKey
+ return nil, ErrInvalidSymmetricKey
}
}
@@ -290,36 +313,21 @@ func (api *PublicWhisperAPI) Post(ctx context.Context, req NewMessage) (bool, er
if pubKeyGiven {
params.Dst = crypto.ToECDSAPub(req.PublicKey)
if !ValidatePublicKey(params.Dst) {
- return false, ErrInvalidPublicKey
+ return nil, ErrInvalidPublicKey
}
}
// encrypt and sent message
whisperMsg, err := NewSentMessage(params)
if err != nil {
- return false, err
+ return nil, err
}
env, err := whisperMsg.Wrap(params)
if err != nil {
- return false, err
+ return nil, err
}
-
- // send to specific node (skip PoW check)
- if len(req.TargetPeer) > 0 {
- n, err := discover.ParseNode(req.TargetPeer)
- if err != nil {
- return false, fmt.Errorf("failed to parse target peer: %s", err)
- }
- return true, api.w.SendP2PMessage(n.ID[:], env)
- }
-
- // ensure that the message PoW meets the node's minimum accepted PoW
- if req.PowTarget < api.w.MinPow() {
- return false, ErrTooLowPoW
- }
-
- return true, api.w.Send(env)
+ return env, nil
}
// UninstallFilter is alias for Unsubscribe
diff --git c/whisper/whisperv6/events.go w/whisper/whisperv6/events.go
new file mode 100644
index 000000000..4f204ab5d
--- /dev/null
+++ w/whisper/whisperv6/events.go
@@ -0,0 +1,23 @@
+package whisperv6
+
+import (
+ "github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/p2p/discover"
+)
+
+// EventType used to define known envelope events.
+type EventType string
+
+const (
+ // EventEnvelopeSent fires when envelope was sent to a peer.
+ EventEnvelopeSent EventType = "envelope.sent"
+ // EventEnvelopeExpired fires when envelop expired
+ EventEnvelopeExpired EventType = "envelope.expired"
+)
+
+// EnvelopeEvent used for envelopes events.
+type EnvelopeEvent struct {
+ Event EventType
+ Hash common.Hash
+ Peer discover.NodeID
+}
diff --git c/whisper/whisperv6/peer.go w/whisper/whisperv6/peer.go
index 6d75290fd..120767c33 100644
--- c/whisper/whisperv6/peer.go
+++ w/whisper/whisperv6/peer.go
@@ -204,6 +204,11 @@ func (peer *Peer) broadcast() error {
// mark envelopes only if they were successfully sent
for _, e := range bundle {
peer.mark(e)
+ peer.host.envelopeFeed.Send(EnvelopeEvent{
+ Event: EventEnvelopeSent,
+ Hash: e.Hash(),
+ Peer: peer.peer.ID(), // specifically discover.NodeID because it can be pretty printed
+ })
}
log.Trace("broadcast", "num. messages", len(bundle))
diff --git c/whisper/whisperv6/whisper.go w/whisper/whisperv6/whisper.go
index 8bd991a9b..98115b20f 100644
--- c/whisper/whisperv6/whisper.go
+++ w/whisper/whisperv6/whisper.go
@@ -28,6 +28,7 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
+ "github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/rlp"
@@ -85,8 +86,10 @@ type Whisper struct {
statsMu sync.Mutex // guard stats
stats Statistics // Statistics of whisper node
- mailServer MailServer // MailServer interface
- envelopeTracer EnvelopeTracer // Service collecting envelopes metadata
+ mailServer MailServer // MailServer interface
+ envelopeTracer EnvelopeTracer // Service collecting envelopes metadata
+
+ envelopeFeed event.Feed
}
// New creates a Whisper client ready to communicate through the Ethereum P2P network.
@@ -131,6 +134,12 @@ func New(cfg *Config) *Whisper {
return whisper
}
+// SubscribeEnvelopeEvents subscribes to envelopes feed.
+// In order to prevent blocking whisper producers events must be amply buffered.
+func (whisper *Whisper) SubscribeEnvelopeEvents(events chan<- EnvelopeEvent) event.Subscription {
+ return whisper.envelopeFeed.Subscribe(events)
+}
+
// MinPow returns the PoW value required by this node.
func (whisper *Whisper) MinPow() float64 {
val, exist := whisper.settings.Load(minPowIdx)
@@ -986,6 +995,10 @@ func (whisper *Whisper) expire() {
hashSet.Each(func(v interface{}) bool {
sz := whisper.envelopes[v.(common.Hash)].size()
delete(whisper.envelopes, v.(common.Hash))
+ whisper.envelopeFeed.Send(EnvelopeEvent{
+ Hash: v.(common.Hash),
+ Event: EventEnvelopeExpired,
+ })
whisper.stats.messagesCleared++
whisper.stats.memoryCleared += sz
whisper.stats.memoryUsed -= sz

View File

@ -24,6 +24,7 @@ import (
whisper "github.com/ethereum/go-ethereum/whisper/whisperv6"
"github.com/status-im/status-go/geth/params"
shhmetrics "github.com/status-im/status-go/metrics/whisper"
"github.com/status-im/status-go/shhext"
)
// node-related errors
@ -162,13 +163,13 @@ func activateShhService(stack *node.Node, config *params.NodeConfig) error {
logger.Info("SHH protocol is disabled")
return nil
}
serviceConstructor := func(*node.ServiceContext) (node.Service, error) {
var whisperService *whisper.Whisper
if err := stack.Register(func(*node.ServiceContext) (node.Service, error) {
whisperServiceConfig := &whisper.Config{
MaxMessageSize: whisper.DefaultMaxMessageSize,
MinimumAcceptedPOW: 0.001,
}
whisperService := whisper.New(whisperServiceConfig)
whisperService = whisper.New(whisperServiceConfig)
whisperConfig := config.WhisperConfig
// enable metrics
@ -197,9 +198,14 @@ func activateShhService(stack *node.Node, config *params.NodeConfig) error {
}
return whisperService, nil
}); err != nil {
return err
}
return stack.Register(serviceConstructor)
// TODO(dshulyak) add a config option to enable it by default, but disable if app is started from statusd
return stack.Register(func(*node.ServiceContext) (node.Service, error) {
svc := shhext.New(whisperService, shhext.SendEnvelopeSentSignal)
return svc, nil
})
}
// makeIPCPath returns IPC-RPC filename

View File

@ -30,6 +30,9 @@ const (
// EventChainDataRemoved is triggered when node's chain data is removed
EventChainDataRemoved = "chaindata.removed"
// EventEnvelopeSent is triggered when envelope was sent atleast to a one peer.
EventEnvelopeSent = "envelope.sent"
)
// Envelope is a general signal sent upward from node to RN app

27
shhext/README.md Normal file
View File

@ -0,0 +1,27 @@
Whisper API Extension
=====================
API
---
#### shhext_post
Accepts same input as shh_post (see https://github.com/ethereum/wiki/wiki/JSON-RPC#shh_post)
##### Returns
`DATA`, 32 Bytes - the envelope hash
Signals
-------
Sends following event once per envelope.
```json
{
"type": "envelope.sent",
"event": {
"hash": "0xea0b93079ed32588628f1cabbbb5ed9e4d50b7571064c2962c3853972db67790"
}
}
```

56
shhext/api.go Normal file
View File

@ -0,0 +1,56 @@
package shhext
import (
"context"
"fmt"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/p2p/discover"
whisper "github.com/ethereum/go-ethereum/whisper/whisperv6"
)
// NewPublicAPI returns instance of the public API.
func NewPublicAPI(w *whisper.Whisper, tracker *tracker) *PublicAPI {
return &PublicAPI{
w: w,
tracker: tracker,
}
}
// PublicAPI extends whisper public API.
type PublicAPI struct {
w *whisper.Whisper
tracker *tracker
}
// Post shamelessly copied from whisper codebase with slight modifications.
func (api *PublicAPI) Post(ctx context.Context, req whisper.NewMessage) (hash common.Hash, err error) {
env, err := whisper.MakeEnvelope(api.w, req)
if err != nil {
return hash, err
}
// send to specific node (skip PoW check)
if len(req.TargetPeer) > 0 {
n, err := discover.ParseNode(req.TargetPeer)
if err != nil {
return hash, fmt.Errorf("failed to parse target peer: %s", err)
}
err = api.w.SendP2PMessage(n.ID[:], env)
if err == nil {
api.tracker.Add(env.Hash())
return env.Hash(), nil
}
return hash, err
}
// ensure that the message PoW meets the node's minimum accepted PoW
if req.PowTarget < api.w.MinPow() {
return hash, whisper.ErrTooLowPoW
}
err = api.w.Send(env)
if err == nil {
api.tracker.Add(env.Hash())
return env.Hash(), nil
}
return hash, err
}

155
shhext/service.go Normal file
View File

@ -0,0 +1,155 @@
package shhext
import (
"sync"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/rpc"
whisper "github.com/ethereum/go-ethereum/whisper/whisperv6"
)
// EnvelopeState in local tracker
type EnvelopeState int
const (
// EnvelopePosted is set when envelope was added to a local whisper queue.
EnvelopePosted EnvelopeState = iota
// EnvelopeSent is set when envelope is sent to atleast one peer.
EnvelopeSent
)
// ConfirmationHandler used as a callback for confirming that envelopes were sent.
type ConfirmationHandler func(common.Hash)
// Service is a service that provides some additional Whisper API.
type Service struct {
w *whisper.Whisper
tracker *tracker
}
// Make sure that Service implements node.Service interface.
var _ node.Service = (*Service)(nil)
// New returns a new Service.
func New(w *whisper.Whisper, handler ConfirmationHandler) *Service {
track := &tracker{
w: w,
handler: handler,
cache: map[common.Hash]EnvelopeState{},
}
return &Service{
w: w,
tracker: track,
}
}
// Protocols returns a new protocols list. In this case, there are none.
func (s *Service) Protocols() []p2p.Protocol {
return []p2p.Protocol{}
}
// APIs returns a list of new APIs.
func (s *Service) APIs() []rpc.API {
return []rpc.API{
{
Namespace: "shhext",
Version: "1.0",
Service: NewPublicAPI(s.w, s.tracker),
Public: true,
},
}
}
// Start is run when a service is started.
// It does nothing in this case but is required by `node.Service` interface.
func (s *Service) Start(server *p2p.Server) error {
s.tracker.Start()
return nil
}
// Stop is run when a service is stopped.
// It does nothing in this case but is required by `node.Service` interface.
func (s *Service) Stop() error {
s.tracker.Stop()
return nil
}
// tracker responsible for processing events for envelopes that we are interested in
// and calling specified handler.
type tracker struct {
w *whisper.Whisper
handler ConfirmationHandler
mu sync.Mutex
cache map[common.Hash]EnvelopeState
wg sync.WaitGroup
quit chan struct{}
}
// Start processing events.
func (t *tracker) Start() {
t.quit = make(chan struct{})
t.wg.Add(1)
go func() {
t.handleEnvelopeEvents()
t.wg.Done()
}()
}
// Stop process events.
func (t *tracker) Stop() {
close(t.quit)
t.wg.Wait()
}
// Add hash to a tracker.
func (t *tracker) Add(hash common.Hash) {
t.mu.Lock()
defer t.mu.Unlock()
t.cache[hash] = EnvelopePosted
}
// handleEnvelopeEvents processes whisper envelope events
func (t *tracker) handleEnvelopeEvents() {
events := make(chan whisper.EnvelopeEvent, 100) // must be buffered to prevent blocking whisper
sub := t.w.SubscribeEnvelopeEvents(events)
defer sub.Unsubscribe()
for {
select {
case <-t.quit:
return
case event := <-events:
t.handleEvent(event)
}
}
}
// handleEvent based on type of the event either triggers
// confirmation handler or removes hash from tracker
func (t *tracker) handleEvent(event whisper.EnvelopeEvent) {
t.mu.Lock()
defer t.mu.Unlock()
switch event.Event {
case whisper.EventEnvelopeSent:
state, ok := t.cache[event.Hash]
// if we didn't send a message using extension - skip it
// if message was already confirmed - skip it
if !ok || state == EnvelopeSent {
return
}
if t.handler != nil {
log.Debug("envelope is sent", "hash", event.Hash, "peer", event.Peer)
t.handler(event.Hash)
t.cache[event.Hash] = EnvelopeSent
}
case whisper.EventEnvelopeExpired:
if _, ok := t.cache[event.Hash]; ok {
log.Debug("envelope expired", "hash", event.Hash)
delete(t.cache, event.Hash)
}
}
}

140
shhext/service_test.go Normal file
View File

@ -0,0 +1,140 @@
package shhext
import (
"fmt"
"testing"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/p2p"
whisper "github.com/ethereum/go-ethereum/whisper/whisperv6"
"github.com/stretchr/testify/suite"
)
func TestShhExtSuite(t *testing.T) {
suite.Run(t, new(ShhExtSuite))
}
type ShhExtSuite struct {
suite.Suite
nodes []*node.Node
services []*Service
whisper []*whisper.Whisper
}
func (s *ShhExtSuite) SetupTest() {
s.nodes = make([]*node.Node, 2)
s.services = make([]*Service, 2)
s.whisper = make([]*whisper.Whisper, 2)
port := 21313
for i := range s.nodes {
i := i // bind i to be usable in service constructors
cfg := &node.Config{
Name: fmt.Sprintf("node-%d", i),
P2P: p2p.Config{
NoDiscovery: true,
MaxPeers: 20,
ListenAddr: fmt.Sprintf(":%d", port+i),
},
}
stack, err := node.New(cfg)
s.NoError(err)
s.whisper[i] = whisper.New(nil)
s.NoError(stack.Register(func(n *node.ServiceContext) (node.Service, error) {
return s.whisper[i], nil
}))
s.services[i] = New(s.whisper[i], nil)
s.NoError(stack.Register(func(n *node.ServiceContext) (node.Service, error) {
return s.services[i], nil
}))
s.Require().NoError(stack.Start())
s.nodes[i] = stack
}
s.nodes[0].Server().AddPeer(s.nodes[1].Server().Self())
}
func (s *ShhExtSuite) TestPostMessageWithConfirmation() {
confirmations := make(chan common.Hash, 1)
confirmationsHandler := func(hash common.Hash) {
confirmations <- hash
}
s.services[0].tracker.handler = confirmationsHandler
symID, err := s.whisper[0].GenerateSymKey()
s.NoError(err)
client, err := s.nodes[0].Attach()
s.NoError(err)
var hash common.Hash
s.NoError(client.Call(&hash, "shhext_post", whisper.NewMessage{
SymKeyID: symID,
PowTarget: whisper.DefaultMinimumPoW,
PowTime: 200,
Topic: whisper.TopicType{0x01, 0x01, 0x01, 0x01},
Payload: []byte("hello"),
}))
s.NoError(err)
select {
case confirmed := <-confirmations:
s.Equal(hash, confirmed)
case <-time.After(time.Second):
s.Fail("timed out while waiting for confirmation")
}
}
func (s *ShhExtSuite) TearDown() {
for _, n := range s.nodes {
s.NoError(n.Stop())
}
}
var (
testHash = common.Hash{0x01}
)
func TestTrackerSuite(t *testing.T) {
suite.Run(t, new(TrackerSuite))
}
type TrackerSuite struct {
suite.Suite
tracker *tracker
}
func (s *TrackerSuite) SetupTest() {
s.tracker = &tracker{
handler: func(common.Hash) {},
cache: map[common.Hash]EnvelopeState{},
}
}
func (s *TrackerSuite) TestConfirmed() {
s.tracker.Add(testHash)
s.Contains(s.tracker.cache, testHash)
s.Equal(EnvelopePosted, s.tracker.cache[testHash])
s.tracker.handleEvent(whisper.EnvelopeEvent{
Event: whisper.EventEnvelopeSent,
Hash: testHash,
})
s.Contains(s.tracker.cache, testHash)
s.Equal(EnvelopeSent, s.tracker.cache[testHash])
}
func (s *TrackerSuite) TestIgnored() {
s.tracker.handleEvent(whisper.EnvelopeEvent{
Event: whisper.EventEnvelopeSent,
Hash: testHash,
})
s.NotContains(s.tracker.cache, testHash)
}
func (s *TrackerSuite) TestRemoved() {
s.tracker.Add(testHash)
s.Contains(s.tracker.cache, testHash)
s.tracker.handleEvent(whisper.EnvelopeEvent{
Event: whisper.EventEnvelopeExpired,
Hash: testHash,
})
s.NotContains(s.tracker.cache, testHash)
}

21
shhext/signal.go Normal file
View File

@ -0,0 +1,21 @@
package shhext
import (
"github.com/ethereum/go-ethereum/common"
"github.com/status-im/status-go/geth/signal"
)
// EnvelopeSentSignal includes hash of the sent envelope.
type EnvelopeSentSignal struct {
Hash common.Hash `json:"hash"`
}
// SendEnvelopeSentSignal sends an envelope.sent signal with hash of the envelope.
func SendEnvelopeSentSignal(hash common.Hash) {
signal.Send(signal.Envelope{
Type: signal.EventEnvelopeSent,
Event: EnvelopeSentSignal{
Hash: hash,
},
})
}

View File

@ -0,0 +1,94 @@
package whisper
import (
"encoding/json"
"fmt"
"io/ioutil"
"os"
"testing"
"time"
"github.com/ethereum/go-ethereum/common"
whisper "github.com/ethereum/go-ethereum/whisper/whisperv6"
"github.com/status-im/status-go/geth/node"
"github.com/status-im/status-go/geth/params"
"github.com/status-im/status-go/geth/signal"
"github.com/status-im/status-go/shhext"
"github.com/stretchr/testify/suite"
)
func TestWhisperExtentionSuite(t *testing.T) {
suite.Run(t, new(WhisperExtentionSuite))
}
type WhisperExtentionSuite struct {
suite.Suite
nodes []*node.StatusNode
}
func (s *WhisperExtentionSuite) SetupTest() {
s.nodes = make([]*node.StatusNode, 2)
for i := range s.nodes {
dir, err := ioutil.TempDir("", "test-shhext-")
s.NoError(err)
// network id is irrelevant
cfg, err := params.NewNodeConfig(dir, "", 777, true)
cfg.LightEthConfig.Enabled = false
cfg.Name = fmt.Sprintf("test-shhext-%d", i)
s.Require().NoError(err)
s.nodes[i] = node.New()
s.Require().NoError(s.nodes[i].Start(cfg))
}
node1, err := s.nodes[0].GethNode()
s.NoError(err)
node2, err := s.nodes[1].GethNode()
s.NoError(err)
node1.Server().AddPeer(node2.Server().Self())
}
func (s *WhisperExtentionSuite) TestRecievedSignal() {
confirmed := make(chan common.Hash, 1)
signal.SetDefaultNodeNotificationHandler(func(rawSignal string) {
var sg struct {
Type string
Event json.RawMessage
}
s.NoError(json.Unmarshal([]byte(rawSignal), &sg))
if sg.Type == signal.EventEnvelopeSent {
var event shhext.EnvelopeSentSignal
s.NoError(json.Unmarshal(sg.Event, &event))
confirmed <- event.Hash
}
})
client := s.nodes[0].RPCClient()
s.NotNil(client)
var symID string
s.NoError(client.Call(&symID, "shh_newSymKey"))
msg := whisper.NewMessage{
SymKeyID: symID,
PowTarget: whisper.DefaultMinimumPoW,
PowTime: 200,
Topic: whisper.TopicType{0x01, 0x01, 0x01, 0x01},
Payload: []byte("hello"),
}
var hash common.Hash
s.NoError(client.Call(&hash, "shhext_post", msg))
s.NotEqual(common.Hash{}, hash)
select {
case conf := <-confirmed:
s.Equal(hash, conf)
case <-time.After(time.Second):
s.Fail("timed out while waiting for confirmation")
}
}
func (s *WhisperExtentionSuite) TearDown() {
for _, n := range s.nodes {
cfg, err := n.Config()
s.NoError(err)
s.NoError(n.Stop())
s.NoError(os.Remove(cfg.DataDir))
}
}

View File

@ -246,65 +246,10 @@ type newMessageOverride struct {
// Post a message on the Whisper network.
func (api *PublicWhisperAPI) Post(ctx context.Context, req NewMessage) (bool, error) {
var (
symKeyGiven = len(req.SymKeyID) > 0
pubKeyGiven = len(req.PublicKey) > 0
err error
)
// user must specify either a symmetric or an asymmetric key
if (symKeyGiven && pubKeyGiven) || (!symKeyGiven && !pubKeyGiven) {
return false, ErrSymAsym
}
params := &MessageParams{
TTL: req.TTL,
Payload: req.Payload,
Padding: req.Padding,
WorkTime: req.PowTime,
PoW: req.PowTarget,
Topic: req.Topic,
}
// Set key that is used to sign the message
if len(req.Sig) > 0 {
if params.Src, err = api.w.GetPrivateKey(req.Sig); err != nil {
return false, err
}
}
// Set symmetric key that is used to encrypt the message
if symKeyGiven {
if params.Topic == (TopicType{}) { // topics are mandatory with symmetric encryption
return false, ErrNoTopics
}
if params.KeySym, err = api.w.GetSymKey(req.SymKeyID); err != nil {
return false, err
}
if !validateDataIntegrity(params.KeySym, aesKeyLength) {
return false, ErrInvalidSymmetricKey
}
}
// Set asymmetric key that is used to encrypt the message
if pubKeyGiven {
params.Dst = crypto.ToECDSAPub(req.PublicKey)
if !ValidatePublicKey(params.Dst) {
return false, ErrInvalidPublicKey
}
}
// encrypt and sent message
whisperMsg, err := NewSentMessage(params)
env, err := MakeEnvelope(api.w, req)
if err != nil {
return false, err
}
env, err := whisperMsg.Wrap(params)
if err != nil {
return false, err
}
// send to specific node (skip PoW check)
if len(req.TargetPeer) > 0 {
n, err := discover.ParseNode(req.TargetPeer)
@ -322,6 +267,69 @@ func (api *PublicWhisperAPI) Post(ctx context.Context, req NewMessage) (bool, er
return true, api.w.Send(env)
}
// MakeEnvelope create envelopes from request.
func MakeEnvelope(w *Whisper, req NewMessage) (*Envelope, error) {
var (
symKeyGiven = len(req.SymKeyID) > 0
pubKeyGiven = len(req.PublicKey) > 0
err error
)
// user must specify either a symmetric or an asymmetric key
if (symKeyGiven && pubKeyGiven) || (!symKeyGiven && !pubKeyGiven) {
return nil, ErrSymAsym
}
params := &MessageParams{
TTL: req.TTL,
Payload: req.Payload,
Padding: req.Padding,
WorkTime: req.PowTime,
PoW: req.PowTarget,
Topic: req.Topic,
}
// Set key that is used to sign the message
if len(req.Sig) > 0 {
if params.Src, err = w.GetPrivateKey(req.Sig); err != nil {
return nil, err
}
}
// Set symmetric key that is used to encrypt the message
if symKeyGiven {
if params.Topic == (TopicType{}) { // topics are mandatory with symmetric encryption
return nil, ErrNoTopics
}
if params.KeySym, err = w.GetSymKey(req.SymKeyID); err != nil {
return nil, err
}
if !validateDataIntegrity(params.KeySym, aesKeyLength) {
return nil, ErrInvalidSymmetricKey
}
}
// Set asymmetric key that is used to encrypt the message
if pubKeyGiven {
params.Dst = crypto.ToECDSAPub(req.PublicKey)
if !ValidatePublicKey(params.Dst) {
return nil, ErrInvalidPublicKey
}
}
// encrypt and sent message
whisperMsg, err := NewSentMessage(params)
if err != nil {
return nil, err
}
env, err := whisperMsg.Wrap(params)
if err != nil {
return nil, err
}
return env, nil
}
// UninstallFilter is alias for Unsubscribe
func (api *PublicWhisperAPI) UninstallFilter(id string) {
api.w.Unsubscribe(id)

View File

@ -0,0 +1,23 @@
package whisperv6
import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/p2p/discover"
)
// EventType used to define known envelope events.
type EventType string
const (
// EventEnvelopeSent fires when envelope was sent to a peer.
EventEnvelopeSent EventType = "envelope.sent"
// EventEnvelopeExpired fires when envelop expired
EventEnvelopeExpired EventType = "envelope.expired"
)
// EnvelopeEvent used for envelopes events.
type EnvelopeEvent struct {
Event EventType
Hash common.Hash
Peer discover.NodeID
}

View File

@ -204,6 +204,11 @@ func (peer *Peer) broadcast() error {
// mark envelopes only if they were successfully sent
for _, e := range bundle {
peer.mark(e)
peer.host.envelopeFeed.Send(EnvelopeEvent{
Event: EventEnvelopeSent,
Hash: e.Hash(),
Peer: peer.peer.ID(), // specifically discover.NodeID because it can be pretty printed
})
}
log.Trace("broadcast", "num. messages", len(bundle))

View File

@ -28,6 +28,7 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/rlp"
@ -85,8 +86,10 @@ type Whisper struct {
statsMu sync.Mutex // guard stats
stats Statistics // Statistics of whisper node
mailServer MailServer // MailServer interface
envelopeTracer EnvelopeTracer // Service collecting envelopes metadata
mailServer MailServer // MailServer interface
envelopeTracer EnvelopeTracer // Service collecting envelopes metadata
envelopeFeed event.Feed
}
// New creates a Whisper client ready to communicate through the Ethereum P2P network.
@ -131,6 +134,12 @@ func New(cfg *Config) *Whisper {
return whisper
}
// SubscribeEnvelopeEvents subscribes to envelopes feed.
// In order to prevent blocking whisper producers events must be amply buffered.
func (whisper *Whisper) SubscribeEnvelopeEvents(events chan<- EnvelopeEvent) event.Subscription {
return whisper.envelopeFeed.Subscribe(events)
}
// MinPow returns the PoW value required by this node.
func (whisper *Whisper) MinPow() float64 {
val, exist := whisper.settings.Load(minPowIdx)
@ -986,6 +995,10 @@ func (whisper *Whisper) expire() {
hashSet.Each(func(v interface{}) bool {
sz := whisper.envelopes[v.(common.Hash)].size()
delete(whisper.envelopes, v.(common.Hash))
whisper.envelopeFeed.Send(EnvelopeEvent{
Hash: v.(common.Hash),
Event: EventEnvelopeExpired,
})
whisper.stats.messagesCleared++
whisper.stats.memoryCleared += sz
whisper.stats.memoryUsed -= sz