From 76b5dc29dc576f32e48215d3c966634d2ec5c850 Mon Sep 17 00:00:00 2001 From: Adam Babik Date: Tue, 18 Feb 2020 12:21:01 +0100 Subject: [PATCH] Integrate Whisper-Waku bridge in status-go (#1854) --- api/geth_backend.go | 2 +- bridge/bridge.go | 95 +++++++++ bridge/bridge_test.go | 188 ++++++++++++++++++ bridge/doc.go | 6 + go.mod | 4 +- go.sum | 8 +- logutils/logger.go | 22 ++ node/get_status_node.go | 34 +++- node/geth_node.go | 21 +- node/geth_node_api_test.go | 65 ++++++ params/config.go | 8 + services/ext/service.go | 14 +- services/shhext/api_geth_test.go | 6 +- services/wakuext/api_test.go | 29 +-- .../status-im/status-go/waku/go.mod | 2 +- .../status-im/status-go/waku/go.sum | 5 + .../status-im/status-go/waku/handshake.go | 125 ++++++++++++ .../status-im/status-go/waku/peer.go | 97 +++++---- .../status-im/status-go/waku/waku.go | 69 ++++++- .../status-im/status-go/whisper/v6/whisper.go | 75 ++++++- vendor/modules.txt | 4 +- 21 files changed, 766 insertions(+), 113 deletions(-) create mode 100644 bridge/bridge.go create mode 100644 bridge/bridge_test.go create mode 100644 bridge/doc.go create mode 100644 vendor/github.com/status-im/status-go/waku/handshake.go diff --git a/api/geth_backend.go b/api/geth_backend.go index 10899dd7c..3b273fda6 100644 --- a/api/geth_backend.go +++ b/api/geth_backend.go @@ -907,7 +907,7 @@ func (b *GethStatusBackend) injectAccountIntoServices() error { return err } - if err := st.InitProtocol(identity, b.appDB); err != nil { + if err := st.InitProtocol(identity, b.appDB, logutils.ZapLogger()); err != nil { return err } return nil diff --git a/bridge/bridge.go b/bridge/bridge.go new file mode 100644 index 000000000..96641759d --- /dev/null +++ b/bridge/bridge.go @@ -0,0 +1,95 @@ +package bridge + +import ( + "sync" + "unsafe" + + "go.uber.org/zap" + + "github.com/status-im/status-go/waku" + "github.com/status-im/status-go/whisper/v6" +) + +type Bridge struct { + whisper *whisper.Whisper + waku *waku.Waku + logger *zap.Logger + + cancel chan struct{} + wg sync.WaitGroup + + whisperIn chan *whisper.Envelope + whisperOut chan *whisper.Envelope + wakuIn chan *waku.Envelope + wakuOut chan *waku.Envelope +} + +func New(shh *whisper.Whisper, w *waku.Waku, logger *zap.Logger) *Bridge { + return &Bridge{ + whisper: shh, + waku: w, + logger: logger, + whisperOut: make(chan *whisper.Envelope), + whisperIn: make(chan *whisper.Envelope), + wakuIn: make(chan *waku.Envelope), + wakuOut: make(chan *waku.Envelope), + } +} + +type bridgeWhisper struct { + *Bridge +} + +func (b *bridgeWhisper) Pipe() (<-chan *whisper.Envelope, chan<- *whisper.Envelope) { + return b.whisperOut, b.whisperIn +} + +type bridgeWaku struct { + *Bridge +} + +func (b *bridgeWaku) Pipe() (<-chan *waku.Envelope, chan<- *waku.Envelope) { + return b.wakuOut, b.wakuIn +} + +func (b *Bridge) Start() { + b.cancel = make(chan struct{}) + + b.waku.RegisterBridge(&bridgeWaku{Bridge: b}) + b.whisper.RegisterBridge(&bridgeWhisper{Bridge: b}) + + b.wg.Add(1) + go func() { + defer b.wg.Done() + for { + select { + case <-b.cancel: + return + case env := <-b.wakuIn: + shhEnvelope := (*whisper.Envelope)(unsafe.Pointer(env)) + b.logger.Info("received whisper envelope from waku", zap.Any("envelope", shhEnvelope)) + b.whisperOut <- shhEnvelope + } + } + }() + + b.wg.Add(1) + go func() { + defer b.wg.Done() + for { + select { + case <-b.cancel: + return + case env := <-b.whisperIn: + wakuEnvelope := (*waku.Envelope)(unsafe.Pointer(env)) + b.logger.Info("received whisper envelope from waku", zap.Any("envelope", wakuEnvelope)) + b.wakuOut <- wakuEnvelope + } + } + }() +} + +func (b *Bridge) Cancel() { + close(b.cancel) + b.wg.Wait() +} diff --git a/bridge/bridge_test.go b/bridge/bridge_test.go new file mode 100644 index 000000000..0c5ca3724 --- /dev/null +++ b/bridge/bridge_test.go @@ -0,0 +1,188 @@ +package bridge + +import ( + "math" + "testing" + "time" + "unsafe" + + "go.uber.org/zap" + + "github.com/stretchr/testify/require" + + "github.com/ethereum/go-ethereum/p2p" + + "github.com/status-im/status-go/waku" + "github.com/status-im/status-go/whisper/v6" +) + +func TestEnvelopesBeingIdentical(t *testing.T) { + // whisper.Envelope --> waku.Envelope + whisperEnvelope, err := createWhisperEnvelope() + require.NoError(t, err) + wakuEnvelope := (*waku.Envelope)(unsafe.Pointer(whisperEnvelope)) + require.Equal(t, whisperEnvelope.Hash(), wakuEnvelope.Hash()) + + // waku.Envelope --> whisper.Envelope + wakuEnvelope, err = createWakuEnvelope() + require.NoError(t, err) + whisperEnvelope = (*whisper.Envelope)(unsafe.Pointer(wakuEnvelope)) + require.Equal(t, wakuEnvelope.Hash(), whisperEnvelope.Hash()) +} + +func TestBridgeWhisperToWaku(t *testing.T) { + shh := whisper.New(nil) + shh.SetTimeSource(time.Now) + wak := waku.New(nil, nil) + wak.SetTimeSource(time.Now) + b := New(shh, wak, zap.NewNop()) + b.Start() + defer b.Cancel() + + server1 := createServer() + err := shh.Start(server1) + require.NoError(t, err) + server2 := createServer() + err = wak.Start(server2) + require.NoError(t, err) + + // Subscribe for envelope events in Waku. + eventsWaku := make(chan waku.EnvelopeEvent, 10) + sub1 := wak.SubscribeEnvelopeEvents(eventsWaku) + defer sub1.Unsubscribe() + + // Subscribe for envelope events in Whisper. + eventsWhsiper := make(chan whisper.EnvelopeEvent, 10) + sub2 := shh.SubscribeEnvelopeEvents(eventsWhsiper) + defer sub2.Unsubscribe() + + // Send message to Whisper and receive in Waku. + envelope, err := createWhisperEnvelope() + require.NoError(t, err) + err = shh.Send(envelope) + require.NoError(t, err) + <-eventsWhsiper // skip event resulting from calling Send() + + // Verify that the message was received by waku. + select { + case err := <-sub1.Err(): + require.NoError(t, err) + case event := <-eventsWaku: + require.Equal(t, envelope.Hash(), event.Hash) + case <-time.After(time.Second): + t.Fatal("timed out") + } + + // Verify that the message was NOT received by whisper. + select { + case err := <-sub1.Err(): + require.NoError(t, err) + case event := <-eventsWhsiper: + t.Fatalf("unexpected event: %v", event) + case <-time.After(time.Second): + // expect to time out; TODO: replace with a bridge event which should not be sent by Waku + } +} + +func TestBridgeWakuToWhisper(t *testing.T) { + shh := whisper.New(nil) + shh.SetTimeSource(time.Now) + wak := waku.New(nil, nil) + wak.SetTimeSource(time.Now) + b := New(shh, wak, zap.NewNop()) + b.Start() + defer b.Cancel() + + server1 := createServer() + err := shh.Start(server1) + require.NoError(t, err) + server2 := createServer() + err = wak.Start(server2) + require.NoError(t, err) + + // Subscribe for envelope events in Whisper. + eventsWhisper := make(chan whisper.EnvelopeEvent, 10) + sub1 := shh.SubscribeEnvelopeEvents(eventsWhisper) + defer sub1.Unsubscribe() + + // Subscribe for envelope events in Waku. + eventsWaku := make(chan waku.EnvelopeEvent, 10) + sub2 := wak.SubscribeEnvelopeEvents(eventsWaku) + defer sub2.Unsubscribe() + + // Send message to Waku and receive in Whisper. + envelope, err := createWakuEnvelope() + require.NoError(t, err) + err = wak.Send(envelope) + require.NoError(t, err) + <-eventsWaku // skip event resulting from calling Send() + + // Verify that the message was received by Whisper. + select { + case err := <-sub1.Err(): + require.NoError(t, err) + case event := <-eventsWhisper: + require.Equal(t, envelope.Hash(), event.Hash) + case <-time.After(time.Second): + t.Fatal("timed out") + } + + // Verify that the message was NOT received by Waku. + select { + case err := <-sub1.Err(): + require.NoError(t, err) + case event := <-eventsWaku: + t.Fatalf("unexpected event: %v", event) + case <-time.After(time.Second): + // expect to time out; TODO: replace with a bridge event which should not be sent by Waku + } +} + +func createServer() *p2p.Server { + return &p2p.Server{ + Config: p2p.Config{ + MaxPeers: math.MaxInt32, + NoDiscovery: true, + }, + } +} + +func createWhisperEnvelope() (*whisper.Envelope, error) { + messageParams := &whisper.MessageParams{ + TTL: 120, + KeySym: []byte{0xaa, 0xbb, 0xcc}, + Topic: whisper.BytesToTopic([]byte{0x01}), + WorkTime: 10, + PoW: 2.0, + Payload: []byte("hello!"), + } + sentMessage, err := whisper.NewSentMessage(messageParams) + if err != nil { + return nil, err + } + envelope := whisper.NewEnvelope(120, whisper.BytesToTopic([]byte{0x01}), sentMessage, time.Now()) + if err := envelope.Seal(messageParams); err != nil { + return nil, err + } + return envelope, nil +} + +func createWakuEnvelope() (*waku.Envelope, error) { + messageParams := &waku.MessageParams{ + TTL: 120, + KeySym: []byte{0xaa, 0xbb, 0xcc}, + Topic: waku.BytesToTopic([]byte{0x01}), + WorkTime: 10, + PoW: 2.0, + Payload: []byte("hello!"), + } + sentMessage, err := waku.NewSentMessage(messageParams) + if err != nil { + return nil, err + } + envelope := waku.NewEnvelope(120, waku.BytesToTopic([]byte{0x01}), sentMessage, time.Now()) + if err := envelope.Seal(messageParams); err != nil { + return nil, err + } + return envelope, nil +} diff --git a/bridge/doc.go b/bridge/doc.go new file mode 100644 index 000000000..4165357f9 --- /dev/null +++ b/bridge/doc.go @@ -0,0 +1,6 @@ +// Bridge bridges Whisper and Waku subprotocols. +// This is possible because both use the same envelope format. +// What's more, both envelope formats are identical structs, +// that is having the same ordered fields. + +package bridge diff --git a/go.mod b/go.mod index d55d954d5..8e1563a91 100644 --- a/go.mod +++ b/go.mod @@ -42,8 +42,8 @@ require ( github.com/status-im/migrate/v4 v4.6.2-status.2 github.com/status-im/rendezvous v1.3.0 github.com/status-im/status-go/extkeys v1.1.0 - github.com/status-im/status-go/waku v1.2.0 - github.com/status-im/status-go/whisper/v6 v6.1.0 + github.com/status-im/status-go/waku v1.3.0 + github.com/status-im/status-go/whisper/v6 v6.2.0 github.com/status-im/tcp-shaker v0.0.0-20191114194237-215893130501 github.com/stretchr/testify v1.4.0 github.com/syndtr/goleveldb v1.0.0 diff --git a/go.sum b/go.sum index afa2c4e42..921e4afd5 100644 --- a/go.sum +++ b/go.sum @@ -642,10 +642,10 @@ github.com/status-im/rendezvous v1.3.0/go.mod h1:+hzjuP+j/XzLPeF6E50b88pWOTLdTcw github.com/status-im/status-go/extkeys v1.0.0/go.mod h1:GdqJbrcpkNm5ZsSCpp+PdMxnXx+OcRBdm3PI0rs1FpU= github.com/status-im/status-go/extkeys v1.1.0 h1:QgnXlMvhlFyRu+GdpPn1Ve22IidnDdslFB/Py6HWj78= github.com/status-im/status-go/extkeys v1.1.0/go.mod h1:nT/T2+G4L/6qPVIIfI3oT8dQSVyn7fQYY8G3yL3PIGY= -github.com/status-im/status-go/waku v1.2.0 h1:bhAm5XpvIT+oPHE8Yq6OWoAprTiERfGu1WrO/OR9crk= -github.com/status-im/status-go/waku v1.2.0/go.mod h1:1bjvQAL4cJYtxCsm6DnKdJbxcZwnvvZmxb6pmoUDtuY= -github.com/status-im/status-go/whisper/v6 v6.1.0 h1:jFGK8zr5bXaFTcyS/xIKh/5TlyqUks+5kyivDUii/1c= -github.com/status-im/status-go/whisper/v6 v6.1.0/go.mod h1:csqMoPMkCPW1NJO56HJzNTWAl9UMdetnQzkPbPjsAC4= +github.com/status-im/status-go/waku v1.3.0 h1:sULZzzz8fV3Ufn8HI5BmQaqWxyJiH8P/8Z9I920sGPk= +github.com/status-im/status-go/waku v1.3.0/go.mod h1:hmq99wlA8qKyYEYalqMz1FieIWhq7pl9zDlkw/jsd4M= +github.com/status-im/status-go/whisper/v6 v6.2.0 h1:7QB5Ztlcn7n5WO3gKa4KnIoCvnIa0rVMM810lHCK2ws= +github.com/status-im/status-go/whisper/v6 v6.2.0/go.mod h1:csqMoPMkCPW1NJO56HJzNTWAl9UMdetnQzkPbPjsAC4= github.com/status-im/tcp-shaker v0.0.0-20191114194237-215893130501 h1:oa0KU5jJRNtXaM/P465MhvSFo/HM2O8qi2DDuPcd7ro= github.com/status-im/tcp-shaker v0.0.0-20191114194237-215893130501/go.mod h1:RYo/itke1oU5k/6sj9DNM3QAwtE5rZSgg5JnkOv83hk= github.com/steakknife/bloomfilter v0.0.0-20180922174646-6819c0d2a570 h1:gIlAHnH1vJb5vwEjIp5kBj/eu99p/bl0Ay2goiPe5xE= diff --git a/logutils/logger.go b/logutils/logger.go index 9e01abbae..377c6413b 100644 --- a/logutils/logger.go +++ b/logutils/logger.go @@ -1,6 +1,10 @@ package logutils import ( + "sync" + + "go.uber.org/zap" + "github.com/ethereum/go-ethereum/log" ) @@ -8,3 +12,21 @@ import ( func Logger() log.Logger { return log.Root() } + +var ( + _zapLogger *zap.Logger + _initZapLogger sync.Once +) + +// ZapLogger creates a custom zap.Logger which will forward logs +// to status-go logger. +func ZapLogger() *zap.Logger { + _initZapLogger.Do(func() { + var err error + _zapLogger, err = NewZapLoggerWithAdapter(Logger()) + if err != nil { + panic(err) + } + }) + return _zapLogger +} diff --git a/node/get_status_node.go b/node/get_status_node.go index 512c658a5..42a77cf98 100644 --- a/node/get_status_node.go +++ b/node/get_status_node.go @@ -25,8 +25,10 @@ import ( "github.com/ethereum/go-ethereum/p2p/enode" "github.com/ethereum/go-ethereum/p2p/enr" + "github.com/status-im/status-go/bridge" "github.com/status-im/status-go/db" "github.com/status-im/status-go/discovery" + "github.com/status-im/status-go/logutils" "github.com/status-im/status-go/params" "github.com/status-im/status-go/peers" "github.com/status-im/status-go/rpc" @@ -69,6 +71,8 @@ type StatusNode struct { peerPool *peers.PeerPool db *leveldb.DB // used as a cache for PeerPool + bridge *bridge.Bridge // Whisper-Waku bridge + log log.Logger } @@ -176,6 +180,10 @@ func (n *StatusNode) startWithDB(config *params.NodeConfig, accs *accounts.Manag return err } + if err := n.setupBridge(); err != nil { + return err + } + return nil } @@ -216,6 +224,25 @@ func (n *StatusNode) setupRPCClient() (err error) { return } +func (n *StatusNode) setupBridge() error { + if !n.config.BridgeConfig.Enabled { + return nil + } + var shh *whisper.Whisper + if err := n.gethService(&shh); err != nil { + return fmt.Errorf("setup bridge: failed to get Whisper: %v", err) + } + var wak *waku.Waku + if err := n.gethService(&wak); err != nil { + return fmt.Errorf("setup bridge: failed to get Waku: %v", err) + } + + n.bridge = bridge.New(shh, wak, logutils.ZapLogger()) + n.bridge.Start() + + return nil +} + func (n *StatusNode) discoveryEnabled() bool { return n.config != nil && (!n.config.NoDiscovery || n.config.Rendezvous) && n.config.ClusterConfig.Enabled } @@ -355,6 +382,11 @@ func (n *StatusNode) stop() error { n.discovery = nil } + if n.bridge != nil { + n.bridge.Cancel() + n.bridge = nil + } + if err := n.gethNode.Stop(); err != nil { return err } @@ -687,7 +719,7 @@ func (n *StatusNode) RPCPrivateClient() *rpc.Client { // ChaosModeCheckRPCClientsUpstreamURL updates RPCClient and RPCPrivateClient upstream URLs, // if defined, without restarting the node. This is required for the Chaos Unicorn Day. -// Additionally, if the passed URL is Infura, it changes it to httpbin.org/status/500. +// Additionally, if the passed URL is Infura, it changes it to httpstat.us/500. func (n *StatusNode) ChaosModeCheckRPCClientsUpstreamURL(on bool) error { url := n.config.UpstreamConfig.URL diff --git a/node/geth_node.go b/node/geth_node.go index 65bc59297..25604062f 100644 --- a/node/geth_node.go +++ b/node/geth_node.go @@ -29,6 +29,7 @@ import ( gethbridge "github.com/status-im/status-go/eth-node/bridge/geth" "github.com/status-im/status-go/eth-node/crypto" + "github.com/status-im/status-go/logutils" "github.com/status-im/status-go/mailserver" "github.com/status-im/status-go/params" "github.com/status-im/status-go/services/ext" @@ -87,14 +88,6 @@ func MakeNode(config *params.NodeConfig, accs *accounts.Manager, db *leveldb.DB) return nil, fmt.Errorf(ErrNodeMakeFailureFormat, err.Error()) } - if config.EnableNTPSync { - if err = stack.Register(func(*node.ServiceContext) (node.Service, error) { - return timesource.Default(), nil - }); err != nil { - return nil, fmt.Errorf("failed to register NTP time source: %v", err) - } - } - err = activateServices(stack, config, accs, db) if err != nil { return nil, err @@ -103,6 +96,15 @@ func MakeNode(config *params.NodeConfig, accs *accounts.Manager, db *leveldb.DB) } func activateServices(stack *node.Node, config *params.NodeConfig, accs *accounts.Manager, db *leveldb.DB) error { + if config.EnableNTPSync { + err := stack.Register(func(*node.ServiceContext) (node.Service, error) { + return timesource.Default(), nil + }) + if err != nil { + return fmt.Errorf("failed to register NTP time source: %v", err) + } + } + // start Ethereum service if we are not expected to use an upstream server if !config.UpstreamConfig.Enabled { if err := activateLightEthService(stack, accs, config); err != nil { @@ -465,8 +467,7 @@ func createWakuService(ctx *node.ServiceContext, wakuCfg *params.WakuConfig, clu cfg.MinimumAcceptedPoW = wakuCfg.MinimumPoW } - // TODO: provide a logger - w := waku.New(cfg, nil) + w := waku.New(cfg, logutils.ZapLogger()) if wakuCfg.EnableRateLimiter { r := wakuRateLimiter(wakuCfg, clusterCfg) diff --git a/node/geth_node_api_test.go b/node/geth_node_api_test.go index 6f809b668..7c6ec25ea 100644 --- a/node/geth_node_api_test.go +++ b/node/geth_node_api_test.go @@ -52,3 +52,68 @@ func TestWhisperLightModeEnabledSetsNilBloomFilter(t *testing.T) { require.NoError(t, node.gethService(&whisper)) require.Nil(t, whisper.BloomFilter()) } + +func TestBridgeSetup(t *testing.T) { + testCases := []struct { + Name string + Cfg params.NodeConfig + ErrorMessage string + }{ + { + Name: "no whisper and waku", + Cfg: params.NodeConfig{ + BridgeConfig: params.BridgeConfig{Enabled: true}, + }, + ErrorMessage: "setup bridge: failed to get Whisper: unknown service", + }, + { + Name: "only whisper", + Cfg: params.NodeConfig{ + WhisperConfig: params.WhisperConfig{ + Enabled: true, + LightClient: false, + }, + BridgeConfig: params.BridgeConfig{Enabled: true}, + }, + ErrorMessage: "setup bridge: failed to get Waku: unknown service", + }, + { + Name: "only waku", + Cfg: params.NodeConfig{ + WakuConfig: params.WakuConfig{ + Enabled: true, + LightClient: false, + }, + BridgeConfig: params.BridgeConfig{Enabled: true}, + }, + ErrorMessage: "setup bridge: failed to get Whisper: unknown service", + }, + { + Name: "both", + Cfg: params.NodeConfig{ + WhisperConfig: params.WhisperConfig{ + Enabled: true, + LightClient: false, + }, + WakuConfig: params.WakuConfig{ + Enabled: true, + LightClient: false, + }, + BridgeConfig: params.BridgeConfig{Enabled: true}, + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.Name, func(t *testing.T) { + node := New() + err := node.Start(&tc.Cfg, &accounts.Manager{}) + if err != nil { + require.EqualError(t, err, tc.ErrorMessage) + } else if tc.ErrorMessage != "" { + t.Fatalf("expected an error: %s", tc.ErrorMessage) + } + require.NoError(t, node.Stop()) + }) + } +} diff --git a/params/config.go b/params/config.go index 25f53a435..2be38471d 100644 --- a/params/config.go +++ b/params/config.go @@ -428,6 +428,9 @@ type NodeConfig struct { // WakuConfig provides a configuration for Waku subprotocol. WakuConfig WakuConfig `json:"WakuConfig" validate:"structonly"` + // BridgeConfig provides a configuration for Whisper-Waku bridge. + BridgeConfig BridgeConfig `json:"BridgeConfig" validate:"structonly"` + // IncentivisationConfig extra configuration for incentivisation service IncentivisationConfig IncentivisationConfig `json:"IncentivisationConfig," validate:"structonly"` @@ -482,6 +485,11 @@ type MailserversConfig struct { Enabled bool } +// BridgeConfig provides configuration for Whisper-Waku bridge. +type BridgeConfig struct { + Enabled bool +} + // ShhextConfig defines options used by shhext service. type ShhextConfig struct { PFSEnabled bool diff --git a/services/ext/service.go b/services/ext/service.go index 1344e1404..b691537d2 100644 --- a/services/ext/service.go +++ b/services/ext/service.go @@ -13,8 +13,6 @@ import ( "github.com/syndtr/goleveldb/leveldb" - "github.com/status-im/status-go/logutils" - commongethtypes "github.com/ethereum/go-ethereum/common" gethtypes "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/ethclient" @@ -115,7 +113,7 @@ func (s *Service) GetPeer(rawURL string) (*enode.Node, error) { return enode.ParseV4(rawURL) } -func (s *Service) InitProtocol(identity *ecdsa.PrivateKey, db *sql.DB) error { +func (s *Service) InitProtocol(identity *ecdsa.PrivateKey, db *sql.DB, logger *zap.Logger) error { if !s.config.PFSEnabled { return nil } @@ -137,12 +135,6 @@ func (s *Service) InitProtocol(identity *ecdsa.PrivateKey, db *sql.DB) error { return err } - // Create a custom zap.Logger which will forward logs from status-go/protocol to status-go logger. - zapLogger, err := logutils.NewZapLoggerWithAdapter(logutils.Logger()) - if err != nil { - return err - } - envelopesMonitorConfig := &transport.EnvelopesMonitorConfig{ MaxAttempts: s.config.MaxMessageDeliveryAttempts, MailserverConfirmationsEnabled: s.config.MailServerConfirmations, @@ -150,9 +142,9 @@ func (s *Service) InitProtocol(identity *ecdsa.PrivateKey, db *sql.DB) error { return s.peerStore.Exist(peer) }, EnvelopeEventsHandler: EnvelopeSignalHandler{}, - Logger: zapLogger, + Logger: logger, } - options := buildMessengerOptions(s.config, db, envelopesMonitorConfig, zapLogger) + options := buildMessengerOptions(s.config, db, envelopesMonitorConfig, logger) messenger, err := protocol.NewMessenger( identity, diff --git a/services/shhext/api_geth_test.go b/services/shhext/api_geth_test.go index 350a3d38c..9c1dd9015 100644 --- a/services/shhext/api_geth_test.go +++ b/services/shhext/api_geth_test.go @@ -12,6 +12,8 @@ import ( "testing" "time" + "go.uber.org/zap" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/node" "github.com/ethereum/go-ethereum/p2p" @@ -210,7 +212,7 @@ func TestInitProtocol(t *testing.T) { sqlDB, err := sqlite.OpenDB(fmt.Sprintf("%s/db.sql", tmpdir), "password") require.NoError(t, err) - err = service.InitProtocol(privateKey, sqlDB) + err = service.InitProtocol(privateKey, sqlDB, zap.NewNop()) require.NoError(t, err) } @@ -264,7 +266,7 @@ func (s *ShhExtSuite) createAndAddNode() { s.Require().NoError(err) privateKey, err := crypto.GenerateKey() s.NoError(err) - err = service.InitProtocol(privateKey, sqlDB) + err = service.InitProtocol(privateKey, sqlDB, zap.NewNop()) s.NoError(err) err = stack.Register(func(n *node.ServiceContext) (node.Service, error) { return service, nil diff --git a/services/wakuext/api_test.go b/services/wakuext/api_test.go index 1831f3ce1..f7b06133e 100644 --- a/services/wakuext/api_test.go +++ b/services/wakuext/api_test.go @@ -12,6 +12,8 @@ import ( "testing" "time" + "go.uber.org/zap" + "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" "github.com/syndtr/goleveldb/leveldb" @@ -125,7 +127,7 @@ func TestInitProtocol(t *testing.T) { sqlDB, err := sqlite.OpenDB(fmt.Sprintf("%s/db.sql", tmpdir), "password") require.NoError(t, err) - err = service.InitProtocol(privateKey, sqlDB) + err = service.InitProtocol(privateKey, sqlDB, zap.NewNop()) require.NoError(t, err) } @@ -179,7 +181,7 @@ func (s *ShhExtSuite) createAndAddNode() { s.Require().NoError(err) privateKey, err := crypto.GenerateKey() s.NoError(err) - err = service.InitProtocol(privateKey, sqlDB) + err = service.InitProtocol(privateKey, sqlDB, zap.NewNop()) s.NoError(err) err = stack.Register(func(n *node.ServiceContext) (node.Service, error) { return service, nil @@ -301,30 +303,13 @@ func (s *WakuNodeMockSuite) SetupTest() { node := enode.NewV4(&pkey.PublicKey, net.ParseIP("127.0.0.1"), 1, 1) peer := p2p.NewPeer(node.ID(), "1", []p2p.Cap{{"shh", 6}}) rw1, rw2 := p2p.MsgPipe() - errorc := make(chan error, 1) go func() { err := w.HandlePeer(peer, rw2) - errorc <- err + panic(err) }() wakuWrapper := gethbridge.NewGethWakuWrapper(w) - s.Require().NoError(p2p.ExpectMsg(rw1, statusCode, []interface{}{ - waku.ProtocolVersion, - math.Float64bits(wakuWrapper.MinPow()), - wakuWrapper.BloomFilter(), - false, - true, - waku.RateLimits{}, - })) - s.Require().NoError(p2p.SendItems( - rw1, - statusCode, - waku.ProtocolVersion, - math.Float64bits(wakuWrapper.MinPow()), - wakuWrapper.BloomFilter(), - true, - true, - waku.RateLimits{}, - )) + s.Require().NoError(p2p.ExpectMsg(rw1, statusCode, nil)) + s.Require().NoError(p2p.SendItems(rw1, statusCode, waku.ProtocolVersion, []interface{}{})) nodeWrapper := ext.NewTestNodeWrapper(nil, wakuWrapper) s.localService = New( diff --git a/vendor/github.com/status-im/status-go/waku/go.mod b/vendor/github.com/status-im/status-go/waku/go.mod index dee45c4c4..b11bde74d 100644 --- a/vendor/github.com/status-im/status-go/waku/go.mod +++ b/vendor/github.com/status-im/status-go/waku/go.mod @@ -2,7 +2,7 @@ module github.com/status-im/status-go/waku go 1.13 -replace github.com/ethereum/go-ethereum v1.9.5 => github.com/status-im/go-ethereum v1.9.5-status.6 +replace github.com/ethereum/go-ethereum v1.9.5 => github.com/status-im/go-ethereum v1.9.5-status.7 require ( github.com/aristanetworks/goarista v0.0.0-20191106175434-873d404c7f40 // indirect diff --git a/vendor/github.com/status-im/status-go/waku/go.sum b/vendor/github.com/status-im/status-go/waku/go.sum index e1e50ef6b..d8b156dc4 100644 --- a/vendor/github.com/status-im/status-go/waku/go.sum +++ b/vendor/github.com/status-im/status-go/waku/go.sum @@ -27,6 +27,7 @@ github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/btcsuite/btcd v0.20.0-beta h1:DnZGUjFbRkpytojHWwy6nfUSA7vFrzWXDLpFNzt74ZA= github.com/btcsuite/btcd v0.20.0-beta/go.mod h1:wVuoA8VJLEcwgqHBwHmzLRazpKxTv13Px/pDuV7OomQ= +github.com/btcsuite/btcd v0.20.1-beta/go.mod h1:wVuoA8VJLEcwgqHBwHmzLRazpKxTv13Px/pDuV7OomQ= github.com/btcsuite/btclog v0.0.0-20170628155309-84c8d2346e9f/go.mod h1:TdznJufoqS23FtqVCzL0ZqgP5MqXbb4fg/WgDys70nA= github.com/btcsuite/btcutil v0.0.0-20190425235716-9e5f4b9a998d/go.mod h1:+5NJ2+qvTyV9exUAL/rxXi3DcLg2Ts+ymUAY5y4NvMg= github.com/btcsuite/go-socks v0.0.0-20170105172521-4720035b7bfd/go.mod h1:HHNXQzUsZCxOoE+CPiyCTO6x34Zs86zZUiwtpXoGdtg= @@ -191,6 +192,9 @@ github.com/status-im/go-ethereum v1.9.5-status.5 h1:d2RJC6ltNZJM2mrAW6kDWYdzewF8 github.com/status-im/go-ethereum v1.9.5-status.5/go.mod h1:g2+E89NWtyA+55p6XEl5Sdt7Mtez3V0T3+Y7mJNb+tI= github.com/status-im/go-ethereum v1.9.5-status.6 h1:ytuTO1yBIAuTVRtRQoc2mrdyngtP+XOQ9IHIibbz7/I= github.com/status-im/go-ethereum v1.9.5-status.6/go.mod h1:08JvQWE+IOnAFSe4UD4ACLNe2fDd9XmWMCq5Yzy9mk0= +github.com/status-im/go-ethereum v1.9.5-status.7 h1:DKH1GiF52LwaZaw6YDBliFEgm/JDsbIT+hn7ph6X94Q= +github.com/status-im/go-ethereum v1.9.5-status.7/go.mod h1:YyH5DKB6+z+Vaya7eIm67pnuPZ1oiUMbbsZW41ktN0g= +github.com/status-im/status-go/extkeys v1.0.0/go.mod h1:GdqJbrcpkNm5ZsSCpp+PdMxnXx+OcRBdm3PI0rs1FpU= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1 h1:2vfRuCMp5sSVIDSqO8oNnWJq7mPa6KVP3iPIwFBuy8A= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= @@ -226,6 +230,7 @@ golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACk golang.org/x/crypto v0.0.0-20190404164418-38d8ce5564a5/go.mod h1:WFFai1msRO1wXaEeE5yQxYXgSfI8pQAWXbQop6sCtWE= golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20191029031824-8986dd9e96cf/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/crypto v0.0.0-20191119213627-4f8c1d86b1ba/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20191122220453-ac88ee75c92c h1:/nJuwDLoL/zrqY6gf57vxC+Pi+pZ8bfhpPkicO5H7W4= golang.org/x/crypto v0.0.0-20191122220453-ac88ee75c92c/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= diff --git a/vendor/github.com/status-im/status-go/waku/handshake.go b/vendor/github.com/status-im/status-go/waku/handshake.go new file mode 100644 index 000000000..99effbe37 --- /dev/null +++ b/vendor/github.com/status-im/status-go/waku/handshake.go @@ -0,0 +1,125 @@ +package waku + +import ( + "errors" + "fmt" + "io" + "math" + "reflect" + "strings" + + "github.com/ethereum/go-ethereum/rlp" +) + +// statusOptions defines additional information shared between peers +// during the handshake. +// There might be more options provided then fields in statusOptions +// and they should be ignored during deserialization to stay forward compatible. +// In the case of RLP, options should be serialized to an array of tuples +// where the first item is a field name and the second is a RLP-serialized value. +type statusOptions struct { + PoWRequirement uint64 `rlp:"key=0"` // RLP does not support float64 natively + BloomFilter []byte `rlp:"key=1"` + LightNodeEnabled bool `rlp:"key=2"` + ConfirmationsEnabled bool `rlp:"key=3"` + RateLimits RateLimits `rlp:"key=4"` + TopicInterest []TopicType `rlp:"key=5"` +} + +var idxFieldKey = make(map[int]string) +var keyFieldIdx = func() map[string]int { + result := make(map[string]int) + opts := statusOptions{} + v := reflect.ValueOf(opts) + for i := 0; i < v.NumField(); i++ { + // skip unexported fields + if !v.Field(i).CanInterface() { + continue + } + rlpTag := v.Type().Field(i).Tag.Get("rlp") + // skip fields without rlp field tag + if rlpTag == "" { + continue + } + key := strings.Split(rlpTag, "=")[1] + result[key] = i + idxFieldKey[i] = key + } + return result +}() + +func (o statusOptions) PoWRequirementF() float64 { + return math.Float64frombits(o.PoWRequirement) +} + +func (o *statusOptions) SetPoWRequirementFromF(val float64) { + o.PoWRequirement = math.Float64bits(val) +} + +func (o statusOptions) EncodeRLP(w io.Writer) error { + v := reflect.ValueOf(o) + optionsList := make([]interface{}, 0, v.NumField()) + for i := 0; i < v.NumField(); i++ { + value := v.Field(i).Interface() + key, ok := idxFieldKey[i] + if !ok { + continue + } + optionsList = append(optionsList, []interface{}{key, value}) + } + return rlp.Encode(w, optionsList) +} + +func (o *statusOptions) DecodeRLP(s *rlp.Stream) error { + _, err := s.List() + if err != nil { + return fmt.Errorf("expected an outer list: %w", err) + } + + v := reflect.ValueOf(o) + +loop: + for { + _, err := s.List() + switch err { + case nil: + // continue to decode a key + case rlp.EOL: + break loop + default: + return fmt.Errorf("expected an inner list: %w", err) + } + var key string + if err := s.Decode(&key); err != nil { + return fmt.Errorf("invalid key: %w", err) + } + // Skip processing if a key does not exist. + // It might happen when there is a new peer + // which supports a new option with + // a higher index. + idx, ok := keyFieldIdx[key] + if !ok { + // Read the rest of the list items and dump them. + _, err := s.Raw() + if err != nil { + return fmt.Errorf("failed to read the value of key %s: %w", key, err) + } + continue + } + if err := s.Decode(v.Elem().Field(idx).Addr().Interface()); err != nil { + return fmt.Errorf("failed to decode an option %s: %w", key, err) + } + if err := s.ListEnd(); err != nil { + return err + } + } + + return s.ListEnd() +} + +func (o statusOptions) Validate() error { + if len(o.TopicInterest) > 1000 { + return errors.New("topic interest is limited by 1000 items") + } + return nil +} diff --git a/vendor/github.com/status-im/status-go/waku/peer.go b/vendor/github.com/status-im/status-go/waku/peer.go index 92506173e..01d8edc78 100644 --- a/vendor/github.com/status-im/status-go/waku/peer.go +++ b/vendor/github.com/status-im/status-go/waku/peer.go @@ -96,13 +96,15 @@ func (p *Peer) handshake() error { isLightNode := p.host.LightClientMode() isRestrictedLightNodeConnection := p.host.LightClientModeConnectionRestricted() go func() { - pow := p.host.MinPow() - powConverted := math.Float64bits(pow) - bloom := p.host.BloomFilter() - confirmationsEnabled := p.host.ConfirmationsEnabled() - rateLimits := p.host.RateLimits() - - errc <- p2p.SendItems(p.ws, statusCode, ProtocolVersion, powConverted, bloom, isLightNode, confirmationsEnabled, rateLimits) + opts := statusOptions{ + BloomFilter: p.host.BloomFilter(), + LightNodeEnabled: isLightNode, + ConfirmationsEnabled: p.host.ConfirmationsEnabled(), + RateLimits: p.host.RateLimits(), + TopicInterest: nil, + } + opts.SetPoWRequirementFromF(p.host.MinPow()) + errc <- p2p.SendItems(p.ws, statusCode, ProtocolVersion, opts) }() // Fetch the remote status packet and verify protocol match @@ -113,56 +115,51 @@ func (p *Peer) handshake() error { if packet.Code != statusCode { return fmt.Errorf("p [%x] sent packet %x before status packet", p.ID(), packet.Code) } + + var ( + peerProtocolVersion uint64 + peerOptions statusOptions + ) s := rlp.NewStream(packet.Payload, uint64(packet.Size)) - _, err = s.List() - if err != nil { - return fmt.Errorf("p [%x] sent bad status message: %v", p.ID(), err) + if _, err := s.List(); err != nil { + return fmt.Errorf("p [%x]: failed to decode status packet: %w", p.ID(), err) } - peerVersion, err := s.Uint() - if err != nil { - return fmt.Errorf("p [%x] sent bad status message (unable to decode version): %v", p.ID(), err) + // Validate protocol version. + if err := s.Decode(&peerProtocolVersion); err != nil { + return fmt.Errorf("p [%x]: failed to decode peer protocol version: %w", p.ID(), err) } - if peerVersion != ProtocolVersion { - return fmt.Errorf("p [%x]: protocol version mismatch %d != %d", p.ID(), peerVersion, ProtocolVersion) + if peerProtocolVersion != ProtocolVersion { + return fmt.Errorf("p [%x]: protocol version mismatch %d != %d", p.ID(), peerProtocolVersion, ProtocolVersion) } - - // only version is mandatory, subsequent parameters are optional - powRaw, err := s.Uint() - if err == nil { - pow := math.Float64frombits(powRaw) - if math.IsInf(pow, 0) || math.IsNaN(pow) || pow < 0.0 { - return fmt.Errorf("p [%x] sent bad status message: invalid pow", p.ID()) - } - p.powRequirement = pow - - var bloom []byte - err = s.Decode(&bloom) - if err == nil { - sz := len(bloom) - if sz != BloomFilterSize && sz != 0 { - return fmt.Errorf("p [%x] sent bad status message: wrong bloom filter size %d", p.ID(), sz) - } - p.setBloomFilter(bloom) - } + // Decode and validate other status packet options. + if err := s.Decode(&peerOptions); err != nil { + return fmt.Errorf("p [%x]: failed to decode status options: %w", p.ID(), err) } - - isRemotePeerLightNode, _ := s.Bool() - if isRemotePeerLightNode && isLightNode && isRestrictedLightNodeConnection { + if err := s.ListEnd(); err != nil { + return fmt.Errorf("p [%x]: failed to decode status packet: %w", p.ID(), err) + } + if err := peerOptions.Validate(); err != nil { + return fmt.Errorf("p [%x]: sent invalid options: %w", p.ID(), err) + } + // Validate and save peer's PoW. + pow := peerOptions.PoWRequirementF() + if math.IsInf(pow, 0) || math.IsNaN(pow) || pow < 0.0 { + return fmt.Errorf("p [%x]: sent bad status message: invalid pow", p.ID()) + } + p.powRequirement = pow + // Validate and save peer's bloom filters. + bloom := peerOptions.BloomFilter + bloomSize := len(bloom) + if bloomSize != 0 && bloomSize != BloomFilterSize { + return fmt.Errorf("p [%x] sent bad status message: wrong bloom filter size %d", p.ID(), bloomSize) + } + p.setBloomFilter(bloom) + // Validate and save other peer's options. + if peerOptions.LightNodeEnabled && isLightNode && isRestrictedLightNodeConnection { return fmt.Errorf("p [%x] is useless: two light client communication restricted", p.ID()) } - confirmationsEnabled, err := s.Bool() - if err != nil || !confirmationsEnabled { - p.logger.Info("confirmations are disabled for peer", zap.Binary("peer", p.ID())) - } else { - p.confirmationsEnabled = confirmationsEnabled - } - - var rateLimits RateLimits - if err := s.Decode(&rateLimits); err != nil { - p.logger.Info("rate limiting is disabled for peer", zap.Binary("peer", p.ID())) - } else { - p.setRateLimits(rateLimits) - } + p.confirmationsEnabled = peerOptions.ConfirmationsEnabled + p.setRateLimits(peerOptions.RateLimits) if err := <-errc; err != nil { return fmt.Errorf("p [%x] failed to send status packet: %v", p.ID(), err) diff --git a/vendor/github.com/status-im/status-go/waku/waku.go b/vendor/github.com/status-im/status-go/waku/waku.go index fe68e660d..ad06fc63f 100644 --- a/vendor/github.com/status-im/status-go/waku/waku.go +++ b/vendor/github.com/status-im/status-go/waku/waku.go @@ -51,6 +51,10 @@ import ( // TimeSyncError error for clock skew errors. type TimeSyncError error +type Bridge interface { + Pipe() (<-chan *Envelope, chan<- *Envelope) +} + type settings struct { MaxMsgSize uint32 // Maximal message length allowed by the waku node EnableConfirmations bool // Enable sending message confirmations @@ -95,6 +99,10 @@ type Waku struct { timeSource func() time.Time // source of time for waku + bridge Bridge + bridgeWg sync.WaitGroup + cancelBridge chan struct{} + logger *zap.Logger } @@ -343,6 +351,47 @@ func (w *Waku) RegisterRateLimiter(r *PeerRateLimiter) { w.rateLimiter = r } +// RegisterBridge registers a new Bridge that moves envelopes +// between different subprotocols. +// It's important that a bridge is registered before the service +// is started, otherwise, it won't read and propagate envelopes. +func (w *Waku) RegisterBridge(b Bridge) { + if w.cancelBridge != nil { + close(w.cancelBridge) + } + w.bridge = b + w.cancelBridge = make(chan struct{}) + w.bridgeWg.Add(1) + go w.readBridgeLoop() +} + +func (w *Waku) readBridgeLoop() { + defer w.bridgeWg.Done() + out, _ := w.bridge.Pipe() + for { + select { + case <-w.cancelBridge: + return + case env := <-out: + _, err := w.addAndBridge(env, false, true) + if err != nil { + w.logger.Warn( + "failed to add a bridged envelope", + zap.Binary("ID", env.Hash().Bytes()), + zap.Error(err), + ) + } else { + w.logger.Debug("bridged envelope successfully", zap.Binary("ID", env.Hash().Bytes())) + w.envelopeFeed.Send(EnvelopeEvent{ + Event: EventEnvelopeReceived, + Topic: env.Topic, + Hash: env.Hash(), + }) + } + } + } +} + // SubscribeEnvelopeEvents subscribes to envelopes feed. // In order to prevent blocking waku producers events must be amply buffered. func (w *Waku) SubscribeEnvelopeEvents(events chan<- EnvelopeEvent) event.Subscription { @@ -829,6 +878,11 @@ func (w *Waku) Start(*p2p.Server) error { // Stop implements node.Service, stopping the background data propagation thread // of the Waku protocol. func (w *Waku) Stop() error { + if w.cancelBridge != nil { + close(w.cancelBridge) + w.cancelBridge = nil + w.bridgeWg.Wait() + } close(w.quit) return nil } @@ -1145,11 +1199,15 @@ func (w *Waku) handleBatchAcknowledgeCode(p *Peer, packet p2p.Msg, logger *zap.L return nil } -// add inserts a new envelope into the message pool to be distributed within the +func (w *Waku) add(envelope *Envelope, isP2P bool) (bool, error) { + return w.addAndBridge(envelope, isP2P, false) +} + +// addAndBridge inserts a new envelope into the message pool to be distributed within the // waku network. It also inserts the envelope into the expiration pool at the // appropriate time-stamp. In case of error, connection should be dropped. // param isP2P indicates whether the message is peer-to-peer (should not be forwarded). -func (w *Waku) add(envelope *Envelope, isP2P bool) (bool, error) { +func (w *Waku) addAndBridge(envelope *Envelope, isP2P bool, bridged bool) (bool, error) { now := uint32(w.timeSource().Unix()) sent := envelope.Expiry - envelope.TTL @@ -1232,6 +1290,13 @@ func (w *Waku) add(envelope *Envelope, isP2P bool) (bool, error) { Event: EventMailServerEnvelopeArchived, }) } + // Bridge only envelopes that are not p2p messages. + // In particular, if a node is a lightweight node, + // it should not bridge any envelopes. + if !isP2P && !bridged && w.bridge != nil { + _, in := w.bridge.Pipe() + in <- envelope + } } return true, nil } diff --git a/vendor/github.com/status-im/status-go/whisper/v6/whisper.go b/vendor/github.com/status-im/status-go/whisper/v6/whisper.go index 693fede2d..f0c306fc6 100644 --- a/vendor/github.com/status-im/status-go/whisper/v6/whisper.go +++ b/vendor/github.com/status-im/status-go/whisper/v6/whisper.go @@ -43,6 +43,10 @@ import ( "github.com/ethereum/go-ethereum/rpc" ) +type Bridge interface { + Pipe() (<-chan *Envelope, chan<- *Envelope) +} + // TimeSyncError error for clock skew errors. type TimeSyncError error @@ -113,6 +117,10 @@ type Whisper struct { envelopeFeed event.Feed timeSource func() time.Time // source of time for whisper + + bridge Bridge + bridgeWg sync.WaitGroup + cancelBridge chan struct{} } // New creates a Whisper client ready to communicate through the Ethereum P2P network. @@ -268,6 +276,51 @@ func (whisper *Whisper) RegisterMailServer(server MailServer) { whisper.mailServer = server } +// RegisterBridge registers a new Bridge that moves envelopes +// between different subprotocols. +// It's important that a bridge is registered before the service +// is started, otherwise, it won't read and propagate envelopes. +func (whisper *Whisper) RegisterBridge(b Bridge) { + if whisper.cancelBridge != nil { + close(whisper.cancelBridge) + whisper.bridgeWg.Wait() + } + whisper.bridge = b + whisper.cancelBridge = make(chan struct{}) + whisper.bridgeWg.Add(1) + go whisper.readBridgeLoop() +} + +func (whisper *Whisper) readBridgeLoop() { + defer whisper.bridgeWg.Done() + out, _ := whisper.bridge.Pipe() + for { + select { + case <-whisper.cancelBridge: + return + case env := <-out: + _, err := whisper.addAndBridge(env, false, true) + if err != nil { + log.Warn( + "failed to add a bridged envelope", + "ID", env.Hash().Bytes(), + "err", err, + ) + } else { + log.Debug( + "bridged envelope successfully", + "ID", env.Hash().Bytes(), + ) + whisper.envelopeFeed.Send(EnvelopeEvent{ + Event: EventEnvelopeReceived, + Topic: env.Topic, + Hash: env.Hash(), + }) + } + } + } +} + // Protocols returns the whisper sub-protocols ran by this particular client. func (whisper *Whisper) Protocols() []p2p.Protocol { return []p2p.Protocol{whisper.protocol} @@ -877,6 +930,11 @@ func (whisper *Whisper) Start(*p2p.Server) error { // Stop implements node.Service, stopping the background data propagation thread // of the Whisper protocol. func (whisper *Whisper) Stop() error { + if whisper.cancelBridge != nil { + close(whisper.cancelBridge) + whisper.cancelBridge = nil + whisper.bridgeWg.Wait() + } close(whisper.quit) log.Info("whisper stopped") return nil @@ -1202,18 +1260,14 @@ func (whisper *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error { log.Warn("failed to decode response message, peer will be disconnected", "peer", p.peer.ID(), "err", err) return errors.New("invalid request response message") } - event, err := CreateMailServerEvent(p.peer.ID(), payload) - if err != nil { log.Warn("error while parsing request complete code, peer will be disconnected", "peer", p.peer.ID(), "err", err) return err } - if event != nil { whisper.postP2P(*event) } - } default: // New message types might be implemented in the future versions of Whisper. @@ -1224,11 +1278,15 @@ func (whisper *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error { } } +func (whisper *Whisper) add(envelope *Envelope, isP2P bool) (bool, error) { + return whisper.addAndBridge(envelope, isP2P, false) +} + // add inserts a new envelope into the message pool to be distributed within the // whisper network. It also inserts the envelope into the expiration pool at the // appropriate time-stamp. In case of error, connection should be dropped. // param isP2P indicates whether the message is peer-to-peer (should not be forwarded). -func (whisper *Whisper) add(envelope *Envelope, isP2P bool) (bool, error) { +func (whisper *Whisper) addAndBridge(envelope *Envelope, isP2P bool, bridged bool) (bool, error) { now := uint32(whisper.timeSource().Unix()) sent := envelope.Expiry - envelope.TTL @@ -1313,6 +1371,13 @@ func (whisper *Whisper) add(envelope *Envelope, isP2P bool) (bool, error) { Event: EventMailServerEnvelopeArchived, }) } + // Bridge only envelopes that are not p2p messages. + // In particular, if a node is a lightweight node, + // it should not bridge any envelopes. + if !isP2P && !bridged && whisper.bridge != nil { + _, in := whisper.bridge.Pipe() + in <- envelope + } } return true, nil } diff --git a/vendor/modules.txt b/vendor/modules.txt index ebcd1fcc0..0df9f6734 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -374,9 +374,9 @@ github.com/status-im/rendezvous/protocol github.com/status-im/rendezvous/server # github.com/status-im/status-go/extkeys v1.1.0 github.com/status-im/status-go/extkeys -# github.com/status-im/status-go/waku v1.2.0 +# github.com/status-im/status-go/waku v1.3.0 github.com/status-im/status-go/waku -# github.com/status-im/status-go/whisper/v6 v6.1.0 +# github.com/status-im/status-go/whisper/v6 v6.2.0 github.com/status-im/status-go/whisper/v6 # github.com/status-im/tcp-shaker v0.0.0-20191114194237-215893130501 github.com/status-im/tcp-shaker