Integrate Whisper-Waku bridge in status-go (#1854)

This commit is contained in:
Adam Babik 2020-02-18 12:21:01 +01:00 committed by GitHub
parent 491e3be799
commit 76b5dc29dc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
21 changed files with 766 additions and 113 deletions

View File

@ -907,7 +907,7 @@ func (b *GethStatusBackend) injectAccountIntoServices() error {
return err
}
if err := st.InitProtocol(identity, b.appDB); err != nil {
if err := st.InitProtocol(identity, b.appDB, logutils.ZapLogger()); err != nil {
return err
}
return nil

95
bridge/bridge.go Normal file
View File

@ -0,0 +1,95 @@
package bridge
import (
"sync"
"unsafe"
"go.uber.org/zap"
"github.com/status-im/status-go/waku"
"github.com/status-im/status-go/whisper/v6"
)
type Bridge struct {
whisper *whisper.Whisper
waku *waku.Waku
logger *zap.Logger
cancel chan struct{}
wg sync.WaitGroup
whisperIn chan *whisper.Envelope
whisperOut chan *whisper.Envelope
wakuIn chan *waku.Envelope
wakuOut chan *waku.Envelope
}
func New(shh *whisper.Whisper, w *waku.Waku, logger *zap.Logger) *Bridge {
return &Bridge{
whisper: shh,
waku: w,
logger: logger,
whisperOut: make(chan *whisper.Envelope),
whisperIn: make(chan *whisper.Envelope),
wakuIn: make(chan *waku.Envelope),
wakuOut: make(chan *waku.Envelope),
}
}
type bridgeWhisper struct {
*Bridge
}
func (b *bridgeWhisper) Pipe() (<-chan *whisper.Envelope, chan<- *whisper.Envelope) {
return b.whisperOut, b.whisperIn
}
type bridgeWaku struct {
*Bridge
}
func (b *bridgeWaku) Pipe() (<-chan *waku.Envelope, chan<- *waku.Envelope) {
return b.wakuOut, b.wakuIn
}
func (b *Bridge) Start() {
b.cancel = make(chan struct{})
b.waku.RegisterBridge(&bridgeWaku{Bridge: b})
b.whisper.RegisterBridge(&bridgeWhisper{Bridge: b})
b.wg.Add(1)
go func() {
defer b.wg.Done()
for {
select {
case <-b.cancel:
return
case env := <-b.wakuIn:
shhEnvelope := (*whisper.Envelope)(unsafe.Pointer(env))
b.logger.Info("received whisper envelope from waku", zap.Any("envelope", shhEnvelope))
b.whisperOut <- shhEnvelope
}
}
}()
b.wg.Add(1)
go func() {
defer b.wg.Done()
for {
select {
case <-b.cancel:
return
case env := <-b.whisperIn:
wakuEnvelope := (*waku.Envelope)(unsafe.Pointer(env))
b.logger.Info("received whisper envelope from waku", zap.Any("envelope", wakuEnvelope))
b.wakuOut <- wakuEnvelope
}
}
}()
}
func (b *Bridge) Cancel() {
close(b.cancel)
b.wg.Wait()
}

188
bridge/bridge_test.go Normal file
View File

@ -0,0 +1,188 @@
package bridge
import (
"math"
"testing"
"time"
"unsafe"
"go.uber.org/zap"
"github.com/stretchr/testify/require"
"github.com/ethereum/go-ethereum/p2p"
"github.com/status-im/status-go/waku"
"github.com/status-im/status-go/whisper/v6"
)
func TestEnvelopesBeingIdentical(t *testing.T) {
// whisper.Envelope --> waku.Envelope
whisperEnvelope, err := createWhisperEnvelope()
require.NoError(t, err)
wakuEnvelope := (*waku.Envelope)(unsafe.Pointer(whisperEnvelope))
require.Equal(t, whisperEnvelope.Hash(), wakuEnvelope.Hash())
// waku.Envelope --> whisper.Envelope
wakuEnvelope, err = createWakuEnvelope()
require.NoError(t, err)
whisperEnvelope = (*whisper.Envelope)(unsafe.Pointer(wakuEnvelope))
require.Equal(t, wakuEnvelope.Hash(), whisperEnvelope.Hash())
}
func TestBridgeWhisperToWaku(t *testing.T) {
shh := whisper.New(nil)
shh.SetTimeSource(time.Now)
wak := waku.New(nil, nil)
wak.SetTimeSource(time.Now)
b := New(shh, wak, zap.NewNop())
b.Start()
defer b.Cancel()
server1 := createServer()
err := shh.Start(server1)
require.NoError(t, err)
server2 := createServer()
err = wak.Start(server2)
require.NoError(t, err)
// Subscribe for envelope events in Waku.
eventsWaku := make(chan waku.EnvelopeEvent, 10)
sub1 := wak.SubscribeEnvelopeEvents(eventsWaku)
defer sub1.Unsubscribe()
// Subscribe for envelope events in Whisper.
eventsWhsiper := make(chan whisper.EnvelopeEvent, 10)
sub2 := shh.SubscribeEnvelopeEvents(eventsWhsiper)
defer sub2.Unsubscribe()
// Send message to Whisper and receive in Waku.
envelope, err := createWhisperEnvelope()
require.NoError(t, err)
err = shh.Send(envelope)
require.NoError(t, err)
<-eventsWhsiper // skip event resulting from calling Send()
// Verify that the message was received by waku.
select {
case err := <-sub1.Err():
require.NoError(t, err)
case event := <-eventsWaku:
require.Equal(t, envelope.Hash(), event.Hash)
case <-time.After(time.Second):
t.Fatal("timed out")
}
// Verify that the message was NOT received by whisper.
select {
case err := <-sub1.Err():
require.NoError(t, err)
case event := <-eventsWhsiper:
t.Fatalf("unexpected event: %v", event)
case <-time.After(time.Second):
// expect to time out; TODO: replace with a bridge event which should not be sent by Waku
}
}
func TestBridgeWakuToWhisper(t *testing.T) {
shh := whisper.New(nil)
shh.SetTimeSource(time.Now)
wak := waku.New(nil, nil)
wak.SetTimeSource(time.Now)
b := New(shh, wak, zap.NewNop())
b.Start()
defer b.Cancel()
server1 := createServer()
err := shh.Start(server1)
require.NoError(t, err)
server2 := createServer()
err = wak.Start(server2)
require.NoError(t, err)
// Subscribe for envelope events in Whisper.
eventsWhisper := make(chan whisper.EnvelopeEvent, 10)
sub1 := shh.SubscribeEnvelopeEvents(eventsWhisper)
defer sub1.Unsubscribe()
// Subscribe for envelope events in Waku.
eventsWaku := make(chan waku.EnvelopeEvent, 10)
sub2 := wak.SubscribeEnvelopeEvents(eventsWaku)
defer sub2.Unsubscribe()
// Send message to Waku and receive in Whisper.
envelope, err := createWakuEnvelope()
require.NoError(t, err)
err = wak.Send(envelope)
require.NoError(t, err)
<-eventsWaku // skip event resulting from calling Send()
// Verify that the message was received by Whisper.
select {
case err := <-sub1.Err():
require.NoError(t, err)
case event := <-eventsWhisper:
require.Equal(t, envelope.Hash(), event.Hash)
case <-time.After(time.Second):
t.Fatal("timed out")
}
// Verify that the message was NOT received by Waku.
select {
case err := <-sub1.Err():
require.NoError(t, err)
case event := <-eventsWaku:
t.Fatalf("unexpected event: %v", event)
case <-time.After(time.Second):
// expect to time out; TODO: replace with a bridge event which should not be sent by Waku
}
}
func createServer() *p2p.Server {
return &p2p.Server{
Config: p2p.Config{
MaxPeers: math.MaxInt32,
NoDiscovery: true,
},
}
}
func createWhisperEnvelope() (*whisper.Envelope, error) {
messageParams := &whisper.MessageParams{
TTL: 120,
KeySym: []byte{0xaa, 0xbb, 0xcc},
Topic: whisper.BytesToTopic([]byte{0x01}),
WorkTime: 10,
PoW: 2.0,
Payload: []byte("hello!"),
}
sentMessage, err := whisper.NewSentMessage(messageParams)
if err != nil {
return nil, err
}
envelope := whisper.NewEnvelope(120, whisper.BytesToTopic([]byte{0x01}), sentMessage, time.Now())
if err := envelope.Seal(messageParams); err != nil {
return nil, err
}
return envelope, nil
}
func createWakuEnvelope() (*waku.Envelope, error) {
messageParams := &waku.MessageParams{
TTL: 120,
KeySym: []byte{0xaa, 0xbb, 0xcc},
Topic: waku.BytesToTopic([]byte{0x01}),
WorkTime: 10,
PoW: 2.0,
Payload: []byte("hello!"),
}
sentMessage, err := waku.NewSentMessage(messageParams)
if err != nil {
return nil, err
}
envelope := waku.NewEnvelope(120, waku.BytesToTopic([]byte{0x01}), sentMessage, time.Now())
if err := envelope.Seal(messageParams); err != nil {
return nil, err
}
return envelope, nil
}

6
bridge/doc.go Normal file
View File

@ -0,0 +1,6 @@
// Bridge bridges Whisper and Waku subprotocols.
// This is possible because both use the same envelope format.
// What's more, both envelope formats are identical structs,
// that is having the same ordered fields.
package bridge

4
go.mod
View File

@ -42,8 +42,8 @@ require (
github.com/status-im/migrate/v4 v4.6.2-status.2
github.com/status-im/rendezvous v1.3.0
github.com/status-im/status-go/extkeys v1.1.0
github.com/status-im/status-go/waku v1.2.0
github.com/status-im/status-go/whisper/v6 v6.1.0
github.com/status-im/status-go/waku v1.3.0
github.com/status-im/status-go/whisper/v6 v6.2.0
github.com/status-im/tcp-shaker v0.0.0-20191114194237-215893130501
github.com/stretchr/testify v1.4.0
github.com/syndtr/goleveldb v1.0.0

8
go.sum
View File

@ -642,10 +642,10 @@ github.com/status-im/rendezvous v1.3.0/go.mod h1:+hzjuP+j/XzLPeF6E50b88pWOTLdTcw
github.com/status-im/status-go/extkeys v1.0.0/go.mod h1:GdqJbrcpkNm5ZsSCpp+PdMxnXx+OcRBdm3PI0rs1FpU=
github.com/status-im/status-go/extkeys v1.1.0 h1:QgnXlMvhlFyRu+GdpPn1Ve22IidnDdslFB/Py6HWj78=
github.com/status-im/status-go/extkeys v1.1.0/go.mod h1:nT/T2+G4L/6qPVIIfI3oT8dQSVyn7fQYY8G3yL3PIGY=
github.com/status-im/status-go/waku v1.2.0 h1:bhAm5XpvIT+oPHE8Yq6OWoAprTiERfGu1WrO/OR9crk=
github.com/status-im/status-go/waku v1.2.0/go.mod h1:1bjvQAL4cJYtxCsm6DnKdJbxcZwnvvZmxb6pmoUDtuY=
github.com/status-im/status-go/whisper/v6 v6.1.0 h1:jFGK8zr5bXaFTcyS/xIKh/5TlyqUks+5kyivDUii/1c=
github.com/status-im/status-go/whisper/v6 v6.1.0/go.mod h1:csqMoPMkCPW1NJO56HJzNTWAl9UMdetnQzkPbPjsAC4=
github.com/status-im/status-go/waku v1.3.0 h1:sULZzzz8fV3Ufn8HI5BmQaqWxyJiH8P/8Z9I920sGPk=
github.com/status-im/status-go/waku v1.3.0/go.mod h1:hmq99wlA8qKyYEYalqMz1FieIWhq7pl9zDlkw/jsd4M=
github.com/status-im/status-go/whisper/v6 v6.2.0 h1:7QB5Ztlcn7n5WO3gKa4KnIoCvnIa0rVMM810lHCK2ws=
github.com/status-im/status-go/whisper/v6 v6.2.0/go.mod h1:csqMoPMkCPW1NJO56HJzNTWAl9UMdetnQzkPbPjsAC4=
github.com/status-im/tcp-shaker v0.0.0-20191114194237-215893130501 h1:oa0KU5jJRNtXaM/P465MhvSFo/HM2O8qi2DDuPcd7ro=
github.com/status-im/tcp-shaker v0.0.0-20191114194237-215893130501/go.mod h1:RYo/itke1oU5k/6sj9DNM3QAwtE5rZSgg5JnkOv83hk=
github.com/steakknife/bloomfilter v0.0.0-20180922174646-6819c0d2a570 h1:gIlAHnH1vJb5vwEjIp5kBj/eu99p/bl0Ay2goiPe5xE=

View File

@ -1,6 +1,10 @@
package logutils
import (
"sync"
"go.uber.org/zap"
"github.com/ethereum/go-ethereum/log"
)
@ -8,3 +12,21 @@ import (
func Logger() log.Logger {
return log.Root()
}
var (
_zapLogger *zap.Logger
_initZapLogger sync.Once
)
// ZapLogger creates a custom zap.Logger which will forward logs
// to status-go logger.
func ZapLogger() *zap.Logger {
_initZapLogger.Do(func() {
var err error
_zapLogger, err = NewZapLoggerWithAdapter(Logger())
if err != nil {
panic(err)
}
})
return _zapLogger
}

View File

@ -25,8 +25,10 @@ import (
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/enr"
"github.com/status-im/status-go/bridge"
"github.com/status-im/status-go/db"
"github.com/status-im/status-go/discovery"
"github.com/status-im/status-go/logutils"
"github.com/status-im/status-go/params"
"github.com/status-im/status-go/peers"
"github.com/status-im/status-go/rpc"
@ -69,6 +71,8 @@ type StatusNode struct {
peerPool *peers.PeerPool
db *leveldb.DB // used as a cache for PeerPool
bridge *bridge.Bridge // Whisper-Waku bridge
log log.Logger
}
@ -176,6 +180,10 @@ func (n *StatusNode) startWithDB(config *params.NodeConfig, accs *accounts.Manag
return err
}
if err := n.setupBridge(); err != nil {
return err
}
return nil
}
@ -216,6 +224,25 @@ func (n *StatusNode) setupRPCClient() (err error) {
return
}
func (n *StatusNode) setupBridge() error {
if !n.config.BridgeConfig.Enabled {
return nil
}
var shh *whisper.Whisper
if err := n.gethService(&shh); err != nil {
return fmt.Errorf("setup bridge: failed to get Whisper: %v", err)
}
var wak *waku.Waku
if err := n.gethService(&wak); err != nil {
return fmt.Errorf("setup bridge: failed to get Waku: %v", err)
}
n.bridge = bridge.New(shh, wak, logutils.ZapLogger())
n.bridge.Start()
return nil
}
func (n *StatusNode) discoveryEnabled() bool {
return n.config != nil && (!n.config.NoDiscovery || n.config.Rendezvous) && n.config.ClusterConfig.Enabled
}
@ -355,6 +382,11 @@ func (n *StatusNode) stop() error {
n.discovery = nil
}
if n.bridge != nil {
n.bridge.Cancel()
n.bridge = nil
}
if err := n.gethNode.Stop(); err != nil {
return err
}
@ -687,7 +719,7 @@ func (n *StatusNode) RPCPrivateClient() *rpc.Client {
// ChaosModeCheckRPCClientsUpstreamURL updates RPCClient and RPCPrivateClient upstream URLs,
// if defined, without restarting the node. This is required for the Chaos Unicorn Day.
// Additionally, if the passed URL is Infura, it changes it to httpbin.org/status/500.
// Additionally, if the passed URL is Infura, it changes it to httpstat.us/500.
func (n *StatusNode) ChaosModeCheckRPCClientsUpstreamURL(on bool) error {
url := n.config.UpstreamConfig.URL

View File

@ -29,6 +29,7 @@ import (
gethbridge "github.com/status-im/status-go/eth-node/bridge/geth"
"github.com/status-im/status-go/eth-node/crypto"
"github.com/status-im/status-go/logutils"
"github.com/status-im/status-go/mailserver"
"github.com/status-im/status-go/params"
"github.com/status-im/status-go/services/ext"
@ -87,14 +88,6 @@ func MakeNode(config *params.NodeConfig, accs *accounts.Manager, db *leveldb.DB)
return nil, fmt.Errorf(ErrNodeMakeFailureFormat, err.Error())
}
if config.EnableNTPSync {
if err = stack.Register(func(*node.ServiceContext) (node.Service, error) {
return timesource.Default(), nil
}); err != nil {
return nil, fmt.Errorf("failed to register NTP time source: %v", err)
}
}
err = activateServices(stack, config, accs, db)
if err != nil {
return nil, err
@ -103,6 +96,15 @@ func MakeNode(config *params.NodeConfig, accs *accounts.Manager, db *leveldb.DB)
}
func activateServices(stack *node.Node, config *params.NodeConfig, accs *accounts.Manager, db *leveldb.DB) error {
if config.EnableNTPSync {
err := stack.Register(func(*node.ServiceContext) (node.Service, error) {
return timesource.Default(), nil
})
if err != nil {
return fmt.Errorf("failed to register NTP time source: %v", err)
}
}
// start Ethereum service if we are not expected to use an upstream server
if !config.UpstreamConfig.Enabled {
if err := activateLightEthService(stack, accs, config); err != nil {
@ -465,8 +467,7 @@ func createWakuService(ctx *node.ServiceContext, wakuCfg *params.WakuConfig, clu
cfg.MinimumAcceptedPoW = wakuCfg.MinimumPoW
}
// TODO: provide a logger
w := waku.New(cfg, nil)
w := waku.New(cfg, logutils.ZapLogger())
if wakuCfg.EnableRateLimiter {
r := wakuRateLimiter(wakuCfg, clusterCfg)

View File

@ -52,3 +52,68 @@ func TestWhisperLightModeEnabledSetsNilBloomFilter(t *testing.T) {
require.NoError(t, node.gethService(&whisper))
require.Nil(t, whisper.BloomFilter())
}
func TestBridgeSetup(t *testing.T) {
testCases := []struct {
Name string
Cfg params.NodeConfig
ErrorMessage string
}{
{
Name: "no whisper and waku",
Cfg: params.NodeConfig{
BridgeConfig: params.BridgeConfig{Enabled: true},
},
ErrorMessage: "setup bridge: failed to get Whisper: unknown service",
},
{
Name: "only whisper",
Cfg: params.NodeConfig{
WhisperConfig: params.WhisperConfig{
Enabled: true,
LightClient: false,
},
BridgeConfig: params.BridgeConfig{Enabled: true},
},
ErrorMessage: "setup bridge: failed to get Waku: unknown service",
},
{
Name: "only waku",
Cfg: params.NodeConfig{
WakuConfig: params.WakuConfig{
Enabled: true,
LightClient: false,
},
BridgeConfig: params.BridgeConfig{Enabled: true},
},
ErrorMessage: "setup bridge: failed to get Whisper: unknown service",
},
{
Name: "both",
Cfg: params.NodeConfig{
WhisperConfig: params.WhisperConfig{
Enabled: true,
LightClient: false,
},
WakuConfig: params.WakuConfig{
Enabled: true,
LightClient: false,
},
BridgeConfig: params.BridgeConfig{Enabled: true},
},
},
}
for _, tc := range testCases {
t.Run(tc.Name, func(t *testing.T) {
node := New()
err := node.Start(&tc.Cfg, &accounts.Manager{})
if err != nil {
require.EqualError(t, err, tc.ErrorMessage)
} else if tc.ErrorMessage != "" {
t.Fatalf("expected an error: %s", tc.ErrorMessage)
}
require.NoError(t, node.Stop())
})
}
}

View File

@ -428,6 +428,9 @@ type NodeConfig struct {
// WakuConfig provides a configuration for Waku subprotocol.
WakuConfig WakuConfig `json:"WakuConfig" validate:"structonly"`
// BridgeConfig provides a configuration for Whisper-Waku bridge.
BridgeConfig BridgeConfig `json:"BridgeConfig" validate:"structonly"`
// IncentivisationConfig extra configuration for incentivisation service
IncentivisationConfig IncentivisationConfig `json:"IncentivisationConfig," validate:"structonly"`
@ -482,6 +485,11 @@ type MailserversConfig struct {
Enabled bool
}
// BridgeConfig provides configuration for Whisper-Waku bridge.
type BridgeConfig struct {
Enabled bool
}
// ShhextConfig defines options used by shhext service.
type ShhextConfig struct {
PFSEnabled bool

View File

@ -13,8 +13,6 @@ import (
"github.com/syndtr/goleveldb/leveldb"
"github.com/status-im/status-go/logutils"
commongethtypes "github.com/ethereum/go-ethereum/common"
gethtypes "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethclient"
@ -115,7 +113,7 @@ func (s *Service) GetPeer(rawURL string) (*enode.Node, error) {
return enode.ParseV4(rawURL)
}
func (s *Service) InitProtocol(identity *ecdsa.PrivateKey, db *sql.DB) error {
func (s *Service) InitProtocol(identity *ecdsa.PrivateKey, db *sql.DB, logger *zap.Logger) error {
if !s.config.PFSEnabled {
return nil
}
@ -137,12 +135,6 @@ func (s *Service) InitProtocol(identity *ecdsa.PrivateKey, db *sql.DB) error {
return err
}
// Create a custom zap.Logger which will forward logs from status-go/protocol to status-go logger.
zapLogger, err := logutils.NewZapLoggerWithAdapter(logutils.Logger())
if err != nil {
return err
}
envelopesMonitorConfig := &transport.EnvelopesMonitorConfig{
MaxAttempts: s.config.MaxMessageDeliveryAttempts,
MailserverConfirmationsEnabled: s.config.MailServerConfirmations,
@ -150,9 +142,9 @@ func (s *Service) InitProtocol(identity *ecdsa.PrivateKey, db *sql.DB) error {
return s.peerStore.Exist(peer)
},
EnvelopeEventsHandler: EnvelopeSignalHandler{},
Logger: zapLogger,
Logger: logger,
}
options := buildMessengerOptions(s.config, db, envelopesMonitorConfig, zapLogger)
options := buildMessengerOptions(s.config, db, envelopesMonitorConfig, logger)
messenger, err := protocol.NewMessenger(
identity,

View File

@ -12,6 +12,8 @@ import (
"testing"
"time"
"go.uber.org/zap"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/p2p"
@ -210,7 +212,7 @@ func TestInitProtocol(t *testing.T) {
sqlDB, err := sqlite.OpenDB(fmt.Sprintf("%s/db.sql", tmpdir), "password")
require.NoError(t, err)
err = service.InitProtocol(privateKey, sqlDB)
err = service.InitProtocol(privateKey, sqlDB, zap.NewNop())
require.NoError(t, err)
}
@ -264,7 +266,7 @@ func (s *ShhExtSuite) createAndAddNode() {
s.Require().NoError(err)
privateKey, err := crypto.GenerateKey()
s.NoError(err)
err = service.InitProtocol(privateKey, sqlDB)
err = service.InitProtocol(privateKey, sqlDB, zap.NewNop())
s.NoError(err)
err = stack.Register(func(n *node.ServiceContext) (node.Service, error) {
return service, nil

View File

@ -12,6 +12,8 @@ import (
"testing"
"time"
"go.uber.org/zap"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"github.com/syndtr/goleveldb/leveldb"
@ -125,7 +127,7 @@ func TestInitProtocol(t *testing.T) {
sqlDB, err := sqlite.OpenDB(fmt.Sprintf("%s/db.sql", tmpdir), "password")
require.NoError(t, err)
err = service.InitProtocol(privateKey, sqlDB)
err = service.InitProtocol(privateKey, sqlDB, zap.NewNop())
require.NoError(t, err)
}
@ -179,7 +181,7 @@ func (s *ShhExtSuite) createAndAddNode() {
s.Require().NoError(err)
privateKey, err := crypto.GenerateKey()
s.NoError(err)
err = service.InitProtocol(privateKey, sqlDB)
err = service.InitProtocol(privateKey, sqlDB, zap.NewNop())
s.NoError(err)
err = stack.Register(func(n *node.ServiceContext) (node.Service, error) {
return service, nil
@ -301,30 +303,13 @@ func (s *WakuNodeMockSuite) SetupTest() {
node := enode.NewV4(&pkey.PublicKey, net.ParseIP("127.0.0.1"), 1, 1)
peer := p2p.NewPeer(node.ID(), "1", []p2p.Cap{{"shh", 6}})
rw1, rw2 := p2p.MsgPipe()
errorc := make(chan error, 1)
go func() {
err := w.HandlePeer(peer, rw2)
errorc <- err
panic(err)
}()
wakuWrapper := gethbridge.NewGethWakuWrapper(w)
s.Require().NoError(p2p.ExpectMsg(rw1, statusCode, []interface{}{
waku.ProtocolVersion,
math.Float64bits(wakuWrapper.MinPow()),
wakuWrapper.BloomFilter(),
false,
true,
waku.RateLimits{},
}))
s.Require().NoError(p2p.SendItems(
rw1,
statusCode,
waku.ProtocolVersion,
math.Float64bits(wakuWrapper.MinPow()),
wakuWrapper.BloomFilter(),
true,
true,
waku.RateLimits{},
))
s.Require().NoError(p2p.ExpectMsg(rw1, statusCode, nil))
s.Require().NoError(p2p.SendItems(rw1, statusCode, waku.ProtocolVersion, []interface{}{}))
nodeWrapper := ext.NewTestNodeWrapper(nil, wakuWrapper)
s.localService = New(

View File

@ -2,7 +2,7 @@ module github.com/status-im/status-go/waku
go 1.13
replace github.com/ethereum/go-ethereum v1.9.5 => github.com/status-im/go-ethereum v1.9.5-status.6
replace github.com/ethereum/go-ethereum v1.9.5 => github.com/status-im/go-ethereum v1.9.5-status.7
require (
github.com/aristanetworks/goarista v0.0.0-20191106175434-873d404c7f40 // indirect

View File

@ -27,6 +27,7 @@ github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/btcsuite/btcd v0.20.0-beta h1:DnZGUjFbRkpytojHWwy6nfUSA7vFrzWXDLpFNzt74ZA=
github.com/btcsuite/btcd v0.20.0-beta/go.mod h1:wVuoA8VJLEcwgqHBwHmzLRazpKxTv13Px/pDuV7OomQ=
github.com/btcsuite/btcd v0.20.1-beta/go.mod h1:wVuoA8VJLEcwgqHBwHmzLRazpKxTv13Px/pDuV7OomQ=
github.com/btcsuite/btclog v0.0.0-20170628155309-84c8d2346e9f/go.mod h1:TdznJufoqS23FtqVCzL0ZqgP5MqXbb4fg/WgDys70nA=
github.com/btcsuite/btcutil v0.0.0-20190425235716-9e5f4b9a998d/go.mod h1:+5NJ2+qvTyV9exUAL/rxXi3DcLg2Ts+ymUAY5y4NvMg=
github.com/btcsuite/go-socks v0.0.0-20170105172521-4720035b7bfd/go.mod h1:HHNXQzUsZCxOoE+CPiyCTO6x34Zs86zZUiwtpXoGdtg=
@ -191,6 +192,9 @@ github.com/status-im/go-ethereum v1.9.5-status.5 h1:d2RJC6ltNZJM2mrAW6kDWYdzewF8
github.com/status-im/go-ethereum v1.9.5-status.5/go.mod h1:g2+E89NWtyA+55p6XEl5Sdt7Mtez3V0T3+Y7mJNb+tI=
github.com/status-im/go-ethereum v1.9.5-status.6 h1:ytuTO1yBIAuTVRtRQoc2mrdyngtP+XOQ9IHIibbz7/I=
github.com/status-im/go-ethereum v1.9.5-status.6/go.mod h1:08JvQWE+IOnAFSe4UD4ACLNe2fDd9XmWMCq5Yzy9mk0=
github.com/status-im/go-ethereum v1.9.5-status.7 h1:DKH1GiF52LwaZaw6YDBliFEgm/JDsbIT+hn7ph6X94Q=
github.com/status-im/go-ethereum v1.9.5-status.7/go.mod h1:YyH5DKB6+z+Vaya7eIm67pnuPZ1oiUMbbsZW41ktN0g=
github.com/status-im/status-go/extkeys v1.0.0/go.mod h1:GdqJbrcpkNm5ZsSCpp+PdMxnXx+OcRBdm3PI0rs1FpU=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.1 h1:2vfRuCMp5sSVIDSqO8oNnWJq7mPa6KVP3iPIwFBuy8A=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
@ -226,6 +230,7 @@ golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACk
golang.org/x/crypto v0.0.0-20190404164418-38d8ce5564a5/go.mod h1:WFFai1msRO1wXaEeE5yQxYXgSfI8pQAWXbQop6sCtWE=
golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20191029031824-8986dd9e96cf/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20191119213627-4f8c1d86b1ba/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20191122220453-ac88ee75c92c h1:/nJuwDLoL/zrqY6gf57vxC+Pi+pZ8bfhpPkicO5H7W4=
golang.org/x/crypto v0.0.0-20191122220453-ac88ee75c92c/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=

125
vendor/github.com/status-im/status-go/waku/handshake.go generated vendored Normal file
View File

@ -0,0 +1,125 @@
package waku
import (
"errors"
"fmt"
"io"
"math"
"reflect"
"strings"
"github.com/ethereum/go-ethereum/rlp"
)
// statusOptions defines additional information shared between peers
// during the handshake.
// There might be more options provided then fields in statusOptions
// and they should be ignored during deserialization to stay forward compatible.
// In the case of RLP, options should be serialized to an array of tuples
// where the first item is a field name and the second is a RLP-serialized value.
type statusOptions struct {
PoWRequirement uint64 `rlp:"key=0"` // RLP does not support float64 natively
BloomFilter []byte `rlp:"key=1"`
LightNodeEnabled bool `rlp:"key=2"`
ConfirmationsEnabled bool `rlp:"key=3"`
RateLimits RateLimits `rlp:"key=4"`
TopicInterest []TopicType `rlp:"key=5"`
}
var idxFieldKey = make(map[int]string)
var keyFieldIdx = func() map[string]int {
result := make(map[string]int)
opts := statusOptions{}
v := reflect.ValueOf(opts)
for i := 0; i < v.NumField(); i++ {
// skip unexported fields
if !v.Field(i).CanInterface() {
continue
}
rlpTag := v.Type().Field(i).Tag.Get("rlp")
// skip fields without rlp field tag
if rlpTag == "" {
continue
}
key := strings.Split(rlpTag, "=")[1]
result[key] = i
idxFieldKey[i] = key
}
return result
}()
func (o statusOptions) PoWRequirementF() float64 {
return math.Float64frombits(o.PoWRequirement)
}
func (o *statusOptions) SetPoWRequirementFromF(val float64) {
o.PoWRequirement = math.Float64bits(val)
}
func (o statusOptions) EncodeRLP(w io.Writer) error {
v := reflect.ValueOf(o)
optionsList := make([]interface{}, 0, v.NumField())
for i := 0; i < v.NumField(); i++ {
value := v.Field(i).Interface()
key, ok := idxFieldKey[i]
if !ok {
continue
}
optionsList = append(optionsList, []interface{}{key, value})
}
return rlp.Encode(w, optionsList)
}
func (o *statusOptions) DecodeRLP(s *rlp.Stream) error {
_, err := s.List()
if err != nil {
return fmt.Errorf("expected an outer list: %w", err)
}
v := reflect.ValueOf(o)
loop:
for {
_, err := s.List()
switch err {
case nil:
// continue to decode a key
case rlp.EOL:
break loop
default:
return fmt.Errorf("expected an inner list: %w", err)
}
var key string
if err := s.Decode(&key); err != nil {
return fmt.Errorf("invalid key: %w", err)
}
// Skip processing if a key does not exist.
// It might happen when there is a new peer
// which supports a new option with
// a higher index.
idx, ok := keyFieldIdx[key]
if !ok {
// Read the rest of the list items and dump them.
_, err := s.Raw()
if err != nil {
return fmt.Errorf("failed to read the value of key %s: %w", key, err)
}
continue
}
if err := s.Decode(v.Elem().Field(idx).Addr().Interface()); err != nil {
return fmt.Errorf("failed to decode an option %s: %w", key, err)
}
if err := s.ListEnd(); err != nil {
return err
}
}
return s.ListEnd()
}
func (o statusOptions) Validate() error {
if len(o.TopicInterest) > 1000 {
return errors.New("topic interest is limited by 1000 items")
}
return nil
}

View File

@ -96,13 +96,15 @@ func (p *Peer) handshake() error {
isLightNode := p.host.LightClientMode()
isRestrictedLightNodeConnection := p.host.LightClientModeConnectionRestricted()
go func() {
pow := p.host.MinPow()
powConverted := math.Float64bits(pow)
bloom := p.host.BloomFilter()
confirmationsEnabled := p.host.ConfirmationsEnabled()
rateLimits := p.host.RateLimits()
errc <- p2p.SendItems(p.ws, statusCode, ProtocolVersion, powConverted, bloom, isLightNode, confirmationsEnabled, rateLimits)
opts := statusOptions{
BloomFilter: p.host.BloomFilter(),
LightNodeEnabled: isLightNode,
ConfirmationsEnabled: p.host.ConfirmationsEnabled(),
RateLimits: p.host.RateLimits(),
TopicInterest: nil,
}
opts.SetPoWRequirementFromF(p.host.MinPow())
errc <- p2p.SendItems(p.ws, statusCode, ProtocolVersion, opts)
}()
// Fetch the remote status packet and verify protocol match
@ -113,56 +115,51 @@ func (p *Peer) handshake() error {
if packet.Code != statusCode {
return fmt.Errorf("p [%x] sent packet %x before status packet", p.ID(), packet.Code)
}
s := rlp.NewStream(packet.Payload, uint64(packet.Size))
_, err = s.List()
if err != nil {
return fmt.Errorf("p [%x] sent bad status message: %v", p.ID(), err)
}
peerVersion, err := s.Uint()
if err != nil {
return fmt.Errorf("p [%x] sent bad status message (unable to decode version): %v", p.ID(), err)
}
if peerVersion != ProtocolVersion {
return fmt.Errorf("p [%x]: protocol version mismatch %d != %d", p.ID(), peerVersion, ProtocolVersion)
}
// only version is mandatory, subsequent parameters are optional
powRaw, err := s.Uint()
if err == nil {
pow := math.Float64frombits(powRaw)
var (
peerProtocolVersion uint64
peerOptions statusOptions
)
s := rlp.NewStream(packet.Payload, uint64(packet.Size))
if _, err := s.List(); err != nil {
return fmt.Errorf("p [%x]: failed to decode status packet: %w", p.ID(), err)
}
// Validate protocol version.
if err := s.Decode(&peerProtocolVersion); err != nil {
return fmt.Errorf("p [%x]: failed to decode peer protocol version: %w", p.ID(), err)
}
if peerProtocolVersion != ProtocolVersion {
return fmt.Errorf("p [%x]: protocol version mismatch %d != %d", p.ID(), peerProtocolVersion, ProtocolVersion)
}
// Decode and validate other status packet options.
if err := s.Decode(&peerOptions); err != nil {
return fmt.Errorf("p [%x]: failed to decode status options: %w", p.ID(), err)
}
if err := s.ListEnd(); err != nil {
return fmt.Errorf("p [%x]: failed to decode status packet: %w", p.ID(), err)
}
if err := peerOptions.Validate(); err != nil {
return fmt.Errorf("p [%x]: sent invalid options: %w", p.ID(), err)
}
// Validate and save peer's PoW.
pow := peerOptions.PoWRequirementF()
if math.IsInf(pow, 0) || math.IsNaN(pow) || pow < 0.0 {
return fmt.Errorf("p [%x] sent bad status message: invalid pow", p.ID())
return fmt.Errorf("p [%x]: sent bad status message: invalid pow", p.ID())
}
p.powRequirement = pow
var bloom []byte
err = s.Decode(&bloom)
if err == nil {
sz := len(bloom)
if sz != BloomFilterSize && sz != 0 {
return fmt.Errorf("p [%x] sent bad status message: wrong bloom filter size %d", p.ID(), sz)
// Validate and save peer's bloom filters.
bloom := peerOptions.BloomFilter
bloomSize := len(bloom)
if bloomSize != 0 && bloomSize != BloomFilterSize {
return fmt.Errorf("p [%x] sent bad status message: wrong bloom filter size %d", p.ID(), bloomSize)
}
p.setBloomFilter(bloom)
}
}
isRemotePeerLightNode, _ := s.Bool()
if isRemotePeerLightNode && isLightNode && isRestrictedLightNodeConnection {
// Validate and save other peer's options.
if peerOptions.LightNodeEnabled && isLightNode && isRestrictedLightNodeConnection {
return fmt.Errorf("p [%x] is useless: two light client communication restricted", p.ID())
}
confirmationsEnabled, err := s.Bool()
if err != nil || !confirmationsEnabled {
p.logger.Info("confirmations are disabled for peer", zap.Binary("peer", p.ID()))
} else {
p.confirmationsEnabled = confirmationsEnabled
}
var rateLimits RateLimits
if err := s.Decode(&rateLimits); err != nil {
p.logger.Info("rate limiting is disabled for peer", zap.Binary("peer", p.ID()))
} else {
p.setRateLimits(rateLimits)
}
p.confirmationsEnabled = peerOptions.ConfirmationsEnabled
p.setRateLimits(peerOptions.RateLimits)
if err := <-errc; err != nil {
return fmt.Errorf("p [%x] failed to send status packet: %v", p.ID(), err)

View File

@ -51,6 +51,10 @@ import (
// TimeSyncError error for clock skew errors.
type TimeSyncError error
type Bridge interface {
Pipe() (<-chan *Envelope, chan<- *Envelope)
}
type settings struct {
MaxMsgSize uint32 // Maximal message length allowed by the waku node
EnableConfirmations bool // Enable sending message confirmations
@ -95,6 +99,10 @@ type Waku struct {
timeSource func() time.Time // source of time for waku
bridge Bridge
bridgeWg sync.WaitGroup
cancelBridge chan struct{}
logger *zap.Logger
}
@ -343,6 +351,47 @@ func (w *Waku) RegisterRateLimiter(r *PeerRateLimiter) {
w.rateLimiter = r
}
// RegisterBridge registers a new Bridge that moves envelopes
// between different subprotocols.
// It's important that a bridge is registered before the service
// is started, otherwise, it won't read and propagate envelopes.
func (w *Waku) RegisterBridge(b Bridge) {
if w.cancelBridge != nil {
close(w.cancelBridge)
}
w.bridge = b
w.cancelBridge = make(chan struct{})
w.bridgeWg.Add(1)
go w.readBridgeLoop()
}
func (w *Waku) readBridgeLoop() {
defer w.bridgeWg.Done()
out, _ := w.bridge.Pipe()
for {
select {
case <-w.cancelBridge:
return
case env := <-out:
_, err := w.addAndBridge(env, false, true)
if err != nil {
w.logger.Warn(
"failed to add a bridged envelope",
zap.Binary("ID", env.Hash().Bytes()),
zap.Error(err),
)
} else {
w.logger.Debug("bridged envelope successfully", zap.Binary("ID", env.Hash().Bytes()))
w.envelopeFeed.Send(EnvelopeEvent{
Event: EventEnvelopeReceived,
Topic: env.Topic,
Hash: env.Hash(),
})
}
}
}
}
// SubscribeEnvelopeEvents subscribes to envelopes feed.
// In order to prevent blocking waku producers events must be amply buffered.
func (w *Waku) SubscribeEnvelopeEvents(events chan<- EnvelopeEvent) event.Subscription {
@ -829,6 +878,11 @@ func (w *Waku) Start(*p2p.Server) error {
// Stop implements node.Service, stopping the background data propagation thread
// of the Waku protocol.
func (w *Waku) Stop() error {
if w.cancelBridge != nil {
close(w.cancelBridge)
w.cancelBridge = nil
w.bridgeWg.Wait()
}
close(w.quit)
return nil
}
@ -1145,11 +1199,15 @@ func (w *Waku) handleBatchAcknowledgeCode(p *Peer, packet p2p.Msg, logger *zap.L
return nil
}
// add inserts a new envelope into the message pool to be distributed within the
func (w *Waku) add(envelope *Envelope, isP2P bool) (bool, error) {
return w.addAndBridge(envelope, isP2P, false)
}
// addAndBridge inserts a new envelope into the message pool to be distributed within the
// waku network. It also inserts the envelope into the expiration pool at the
// appropriate time-stamp. In case of error, connection should be dropped.
// param isP2P indicates whether the message is peer-to-peer (should not be forwarded).
func (w *Waku) add(envelope *Envelope, isP2P bool) (bool, error) {
func (w *Waku) addAndBridge(envelope *Envelope, isP2P bool, bridged bool) (bool, error) {
now := uint32(w.timeSource().Unix())
sent := envelope.Expiry - envelope.TTL
@ -1232,6 +1290,13 @@ func (w *Waku) add(envelope *Envelope, isP2P bool) (bool, error) {
Event: EventMailServerEnvelopeArchived,
})
}
// Bridge only envelopes that are not p2p messages.
// In particular, if a node is a lightweight node,
// it should not bridge any envelopes.
if !isP2P && !bridged && w.bridge != nil {
_, in := w.bridge.Pipe()
in <- envelope
}
}
return true, nil
}

View File

@ -43,6 +43,10 @@ import (
"github.com/ethereum/go-ethereum/rpc"
)
type Bridge interface {
Pipe() (<-chan *Envelope, chan<- *Envelope)
}
// TimeSyncError error for clock skew errors.
type TimeSyncError error
@ -113,6 +117,10 @@ type Whisper struct {
envelopeFeed event.Feed
timeSource func() time.Time // source of time for whisper
bridge Bridge
bridgeWg sync.WaitGroup
cancelBridge chan struct{}
}
// New creates a Whisper client ready to communicate through the Ethereum P2P network.
@ -268,6 +276,51 @@ func (whisper *Whisper) RegisterMailServer(server MailServer) {
whisper.mailServer = server
}
// RegisterBridge registers a new Bridge that moves envelopes
// between different subprotocols.
// It's important that a bridge is registered before the service
// is started, otherwise, it won't read and propagate envelopes.
func (whisper *Whisper) RegisterBridge(b Bridge) {
if whisper.cancelBridge != nil {
close(whisper.cancelBridge)
whisper.bridgeWg.Wait()
}
whisper.bridge = b
whisper.cancelBridge = make(chan struct{})
whisper.bridgeWg.Add(1)
go whisper.readBridgeLoop()
}
func (whisper *Whisper) readBridgeLoop() {
defer whisper.bridgeWg.Done()
out, _ := whisper.bridge.Pipe()
for {
select {
case <-whisper.cancelBridge:
return
case env := <-out:
_, err := whisper.addAndBridge(env, false, true)
if err != nil {
log.Warn(
"failed to add a bridged envelope",
"ID", env.Hash().Bytes(),
"err", err,
)
} else {
log.Debug(
"bridged envelope successfully",
"ID", env.Hash().Bytes(),
)
whisper.envelopeFeed.Send(EnvelopeEvent{
Event: EventEnvelopeReceived,
Topic: env.Topic,
Hash: env.Hash(),
})
}
}
}
}
// Protocols returns the whisper sub-protocols ran by this particular client.
func (whisper *Whisper) Protocols() []p2p.Protocol {
return []p2p.Protocol{whisper.protocol}
@ -877,6 +930,11 @@ func (whisper *Whisper) Start(*p2p.Server) error {
// Stop implements node.Service, stopping the background data propagation thread
// of the Whisper protocol.
func (whisper *Whisper) Stop() error {
if whisper.cancelBridge != nil {
close(whisper.cancelBridge)
whisper.cancelBridge = nil
whisper.bridgeWg.Wait()
}
close(whisper.quit)
log.Info("whisper stopped")
return nil
@ -1202,18 +1260,14 @@ func (whisper *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error {
log.Warn("failed to decode response message, peer will be disconnected", "peer", p.peer.ID(), "err", err)
return errors.New("invalid request response message")
}
event, err := CreateMailServerEvent(p.peer.ID(), payload)
if err != nil {
log.Warn("error while parsing request complete code, peer will be disconnected", "peer", p.peer.ID(), "err", err)
return err
}
if event != nil {
whisper.postP2P(*event)
}
}
default:
// New message types might be implemented in the future versions of Whisper.
@ -1224,11 +1278,15 @@ func (whisper *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error {
}
}
func (whisper *Whisper) add(envelope *Envelope, isP2P bool) (bool, error) {
return whisper.addAndBridge(envelope, isP2P, false)
}
// add inserts a new envelope into the message pool to be distributed within the
// whisper network. It also inserts the envelope into the expiration pool at the
// appropriate time-stamp. In case of error, connection should be dropped.
// param isP2P indicates whether the message is peer-to-peer (should not be forwarded).
func (whisper *Whisper) add(envelope *Envelope, isP2P bool) (bool, error) {
func (whisper *Whisper) addAndBridge(envelope *Envelope, isP2P bool, bridged bool) (bool, error) {
now := uint32(whisper.timeSource().Unix())
sent := envelope.Expiry - envelope.TTL
@ -1313,6 +1371,13 @@ func (whisper *Whisper) add(envelope *Envelope, isP2P bool) (bool, error) {
Event: EventMailServerEnvelopeArchived,
})
}
// Bridge only envelopes that are not p2p messages.
// In particular, if a node is a lightweight node,
// it should not bridge any envelopes.
if !isP2P && !bridged && whisper.bridge != nil {
_, in := whisper.bridge.Pipe()
in <- envelope
}
}
return true, nil
}

4
vendor/modules.txt vendored
View File

@ -374,9 +374,9 @@ github.com/status-im/rendezvous/protocol
github.com/status-im/rendezvous/server
# github.com/status-im/status-go/extkeys v1.1.0
github.com/status-im/status-go/extkeys
# github.com/status-im/status-go/waku v1.2.0
# github.com/status-im/status-go/waku v1.3.0
github.com/status-im/status-go/waku
# github.com/status-im/status-go/whisper/v6 v6.1.0
# github.com/status-im/status-go/whisper/v6 v6.2.0
github.com/status-im/status-go/whisper/v6
# github.com/status-im/tcp-shaker v0.0.0-20191114194237-215893130501
github.com/status-im/tcp-shaker