status-go/services/wakuext/api_test.go

407 lines
12 KiB
Go

package wakuext
import (
"context"
"encoding/hex"
"fmt"
"io/ioutil"
"math"
"net"
"os"
"strconv"
"testing"
"time"
"go.uber.org/zap"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/storage"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/status-im/status-go/appdatabase"
gethbridge "github.com/status-im/status-go/eth-node/bridge/geth"
"github.com/status-im/status-go/eth-node/crypto"
"github.com/status-im/status-go/eth-node/types"
"github.com/status-im/status-go/params"
"github.com/status-im/status-go/services/ext"
"github.com/status-im/status-go/t/helpers"
"github.com/status-im/status-go/waku"
wakucommon "github.com/status-im/status-go/waku/common"
v0 "github.com/status-im/status-go/waku/v0"
)
// TestRequestMessagesErrors checks the errors returned by
// PublicAPI.RequestMessages for requests that cannot be served: a malformed
// mail server enode address, an unknown symmetric key ID, a mail server peer
// that cannot be found, and a time range where From is greater than To.
func TestRequestMessagesErrors(t *testing.T) {
	// Named w (not waku) so the imported waku package is not shadowed.
	w := gethbridge.NewGethWakuWrapper(waku.New(nil, nil))
	aNode, err := node.New(&node.Config{
		P2P: p2p.Config{
			MaxPeers:    math.MaxInt32,
			NoDiscovery: true,
		},
		NoUSB: true,
	}) // in-memory node as no data dir
	require.NoError(t, err)
	err = aNode.Register(func(*node.ServiceContext) (node.Service, error) {
		return gethbridge.GetGethWakuFrom(w), nil
	})
	require.NoError(t, err)
	err = aNode.Start()
	require.NoError(t, err)
	defer func() { require.NoError(t, aNode.Stop()) }()

	handler := ext.NewHandlerMock(1)
	config := params.ShhextConfig{
		InstallationID:        "1",
		BackupDisabledDataDir: os.TempDir(),
		PFSEnabled:            true,
	}
	nodeWrapper := ext.NewTestNodeWrapper(nil, w)
	service := New(config, nodeWrapper, nil, handler, nil)
	api := NewPublicAPI(service)

	const mailServerPeer = "enode://b7e65e1bedc2499ee6cbd806945af5e7df0e59e4070c96821570bd581473eade24a489f5ec95d060c0db118c879403ab88d827d3766978f28708989d35474f87@[::]:51920"

	var hash []byte

	// invalid MailServer enode address
	hash, err = api.RequestMessages(context.TODO(), ext.MessagesRequest{MailServerPeer: "invalid-address"})
	require.Nil(t, hash)
	require.EqualError(t, err, "invalid mailServerPeer value: invalid URL scheme, want \"enode\"")

	// non-existent symmetric key
	hash, err = api.RequestMessages(context.TODO(), ext.MessagesRequest{
		MailServerPeer: mailServerPeer,
		SymKeyID:       "invalid-sym-key-id",
	})
	require.Nil(t, hash)
	require.EqualError(t, err, "invalid symKeyID value: non-existent key ID")

	// with a symmetric key
	symKeyID, symKeyErr := w.AddSymKeyFromPassword("some-pass")
	require.NoError(t, symKeyErr)
	hash, err = api.RequestMessages(context.TODO(), ext.MessagesRequest{
		MailServerPeer: mailServerPeer,
		SymKeyID:       symKeyID,
	})
	require.Nil(t, hash)
	// Assert the error exists before dereferencing it, so an unexpected
	// success is reported as a test failure instead of a nil-pointer panic.
	require.Error(t, err)
	require.Contains(t, err.Error(), "could not find peer with ID")

	// from is greater than to
	hash, err = api.RequestMessages(context.TODO(), ext.MessagesRequest{
		From: 10,
		To:   5,
	})
	require.Nil(t, hash)
	require.Error(t, err)
	require.Contains(t, err.Error(), "Query range is invalid: from > to (10 > 5)")
}
// TestInitProtocol verifies that Service.InitProtocol returns no error for a
// service built on an in-memory LevelDB and an application database freshly
// initialized in a temporary directory.
func TestInitProtocol(t *testing.T) {
	directory, err := ioutil.TempDir("", "status-go-testing")
	require.NoError(t, err)
	// Best-effort cleanup so repeated runs do not accumulate temp
	// directories. The error is ignored on purpose: a failed removal must
	// not turn a passing test into a failing one.
	defer func() { _ = os.RemoveAll(directory) }()

	config := params.ShhextConfig{
		InstallationID:          "2",
		BackupDisabledDataDir:   directory,
		PFSEnabled:              true,
		MailServerConfirmations: true,
		ConnectionTarget:        10,
	}

	db, err := leveldb.Open(storage.NewMemStorage(), nil)
	require.NoError(t, err)

	// Named w (not waku) so the imported waku package is not shadowed.
	w := gethbridge.NewGethWakuWrapper(waku.New(nil, nil))
	privateKey, err := crypto.GenerateKey()
	require.NoError(t, err)

	nodeWrapper := ext.NewTestNodeWrapper(nil, w)
	service := New(config, nodeWrapper, nil, nil, db)

	tmpdir, err := ioutil.TempDir("", "test-shhext-service-init-protocol")
	require.NoError(t, err)
	// Best-effort cleanup, same reasoning as for directory above.
	defer func() { _ = os.RemoveAll(tmpdir) }()

	sqlDB, err := appdatabase.InitializeDB(fmt.Sprintf("%s/db.sql", tmpdir), "password")
	require.NoError(t, err)

	err = service.InitProtocol(privateKey, sqlDB, zap.NewNop())
	require.NoError(t, err)
}
// TestShhExtSuite is the go-test entry point that runs every test method of
// ShhExtSuite.
func TestShhExtSuite(t *testing.T) {
	suite.Run(t, &ShhExtSuite{})
}
// ShhExtSuite is a testify suite that runs RequestMessages tests against
// real p2p nodes created on demand by createAndAddNode.
type ShhExtSuite struct {
	suite.Suite

	// dir is the temporary directory created in SetupTest; it is used as the
	// backup data dir and as the parent of each node's application database.
	dir string

	// The slices below are index-aligned: entry i of each belongs to the
	// i-th node created by createAndAddNode.
	nodes    []*node.Node
	wakus    []types.Waku
	services []*Service
}
// createAndAddNode builds a p2p node with a waku service and a wakuext
// Service registered on it, starts the node, and appends the node, its waku
// wrapper and its Service to the suite's slices.
//
// Every step is a precondition for the tests that call this helper, so all
// failures are fatal (Require). A non-fatal assertion would let the helper
// continue with nil values and panic further down.
func (s *ShhExtSuite) createAndAddNode() {
	req := s.Require()
	idx := len(s.nodes)

	// create a node
	cfg := &node.Config{
		Name: strconv.Itoa(idx),
		P2P: p2p.Config{
			MaxPeers:    math.MaxInt32,
			NoDiscovery: true,
			ListenAddr:  ":0",
		},
		NoUSB: true,
	}
	stack, err := node.New(cfg)
	req.NoError(err)
	w := waku.New(nil, nil)
	err = stack.Register(func(n *node.ServiceContext) (node.Service, error) {
		return w, nil
	})
	req.NoError(err)

	// set up protocol
	config := params.ShhextConfig{
		InstallationID:          "1",
		BackupDisabledDataDir:   s.dir,
		PFSEnabled:              true,
		MailServerConfirmations: true,
		ConnectionTarget:        10,
	}
	db, err := leveldb.Open(storage.NewMemStorage(), nil)
	req.NoError(err)
	nodeWrapper := ext.NewTestNodeWrapper(nil, gethbridge.NewGethWakuWrapper(w))
	service := New(config, nodeWrapper, nil, nil, db)
	// Each node gets its own database file, named after its index, in s.dir.
	sqlDB, err := appdatabase.InitializeDB(fmt.Sprintf("%s/%d", s.dir, idx), "password")
	req.NoError(err)
	privateKey, err := crypto.GenerateKey()
	req.NoError(err)
	err = service.InitProtocol(privateKey, sqlDB, zap.NewNop())
	req.NoError(err)
	err = stack.Register(func(n *node.ServiceContext) (node.Service, error) {
		return service, nil
	})
	req.NoError(err)

	// start the node
	err = stack.Start()
	req.NoError(err)

	// store references
	s.nodes = append(s.nodes, stack)
	s.wakus = append(s.wakus, gethbridge.NewGethWakuWrapper(w))
	s.services = append(s.services, service)
}
// SetupTest creates a new temporary directory and stores its path in s.dir.
// Creation failure aborts the test.
func (s *ShhExtSuite) SetupTest() {
	dir, err := ioutil.TempDir("", "status-go-testing")
	s.dir = dir
	s.Require().NoError(err)
}
// TearDownTest stops every node created during the test, resets the suite's
// per-test slices, and removes the test's temporary directory.
func (s *ShhExtSuite) TearDownTest() {
	for _, n := range s.nodes {
		s.NoError(n.Stop())
	}
	s.nodes = nil
	s.wakus = nil
	s.services = nil

	// Remove the directory stored in s.dir so test runs do not accumulate
	// temp directories. Removal is best-effort: a failure is logged rather
	// than asserted, because files inside may still be held open.
	if s.dir != "" {
		if err := os.RemoveAll(s.dir); err != nil {
			s.T().Logf("could not remove temporary directory %q: %v", s.dir, err)
		}
		s.dir = ""
	}
}
// TestRequestMessagesSuccess connects a client node to a second node acting
// as mail server and expects a RequestMessages call addressed to that peer to
// return no error.
func (s *ShhExtSuite) TestRequestMessagesSuccess() {
	// two nodes needed: client and mailserver
	s.createAndAddNode()
	s.createAndAddNode()

	client := s.nodes[0].Server()
	mailserver := s.nodes[1].Server()

	// Subscribe to the peer-added event before dialing.
	peerAdded := helpers.WaitForPeerAsync(client, mailserver.Self().URLv4(), p2p.PeerEventTypeAdd, time.Second)
	client.AddPeer(mailserver.Self())
	s.Require().NoError(<-peerAdded)

	request := ext.MessagesRequest{
		MailServerPeer: mailserver.Self().URLv4(),
		Topics:         []types.TopicType{{1}},
	}
	_, err := NewPublicAPI(s.services[0]).RequestMessages(context.Background(), request)
	s.NoError(err)
}
// TestMultipleRequestMessagesWithoutForce sends three requests without the
// force flag: the first succeeds, an immediate repeat with the same topic is
// rejected, and a request for a different topic succeeds.
func (s *ShhExtSuite) TestMultipleRequestMessagesWithoutForce() {
	// two nodes needed: client and mailserver
	s.createAndAddNode()
	s.createAndAddNode()

	client := s.nodes[0].Server()
	mailserver := s.nodes[1].Server()

	peerAdded := helpers.WaitForPeerAsync(client, mailserver.Self().URLv4(), p2p.PeerEventTypeAdd, time.Second)
	client.AddPeer(mailserver.Self())
	s.Require().NoError(<-peerAdded)

	api := NewPublicAPI(s.services[0])
	// requestTopic asks the mail server for a single topic whose first byte
	// is the given value and returns only the resulting error.
	requestTopic := func(first byte) error {
		_, err := api.RequestMessages(context.Background(), ext.MessagesRequest{
			MailServerPeer: mailserver.Self().URLv4(),
			Topics:         []types.TopicType{{first}},
		})
		return err
	}

	s.NoError(requestTopic(1))
	s.EqualError(requestTopic(1), "another request with the same topics was sent less than 3s ago. Please wait for a bit longer, or set `force` to true in request parameters")
	s.NoError(requestTopic(2))
}
// TestFailedRequestWithUnknownMailServerPeer expects RequestMessages to fail
// with a "could not find peer" error when the mail server enode was never
// added as a peer of the single node created here.
func (s *ShhExtSuite) TestFailedRequestWithUnknownMailServerPeer() {
	const unknownPeer = "enode://19872f94b1e776da3a13e25afa71b47dfa99e658afd6427ea8d6e03c22a99f13590205a8826443e95a37eee1d815fc433af7a8ca9a8d0df7943d1f55684045b7@0.0.0.0:30305"

	s.createAndAddNode()

	request := ext.MessagesRequest{
		MailServerPeer: unknownPeer,
		Topics:         []types.TopicType{{1}},
	}
	_, err := NewPublicAPI(s.services[0]).RequestMessages(context.Background(), request)
	s.EqualError(err, "could not find peer with ID: 10841e6db5c02fc331bf36a8d2a9137a1696d9d3b6b1f872f780e02aa8ec5bba")
}
// p2pRequestCompleteCode is an internal waku protocol code; the tests send it
// together with the mail server "request completed" payload.
const p2pRequestCompleteCode = 125
// WakuNodeMockSuite is a base suite that wires a local waku instance to a
// fake remote peer over an in-memory message pipe. Its fields are populated
// by SetupTest.
type WakuNodeMockSuite struct {
	suite.Suite

	// Local side: the waku public API, the wakuext public API under test,
	// and the Service backing it.
	localWakuAPI *waku.PublicWakuAPI
	localAPI     *PublicAPI
	localService *Service

	// localNode is the enode record registered as a mail server with the
	// local service and used as the MailServerPeer in requests.
	localNode *enode.Node

	// remoteRW is the remote end of the message pipe; tests read requests
	// from it and write responses to it.
	remoteRW *p2p.MsgPipeRW
}
// SetupTest starts a local waku instance, connects it to a second waku
// instance through an in-memory message pipe, builds a wakuext Service on top
// of the local instance, and stores the pieces tests need on the suite.
func (s *WakuNodeMockSuite) SetupTest() {
	db, err := leveldb.Open(storage.NewMemStorage(), nil)
	s.Require().NoError(err)

	localWaku := waku.New(&waku.Config{
		MinimumAcceptedPoW:  0,
		MaxMessageSize:      100 << 10,
		EnableConfirmations: true,
	}, nil)
	remoteWaku := waku.New(nil, nil)
	s.Require().NoError(localWaku.Start(nil))

	key, err := crypto.GenerateKey()
	s.Require().NoError(err)
	// mailserver (not "node") so the imported node package is not shadowed.
	mailserver := enode.NewV4(&key.PublicKey, net.ParseIP("127.0.0.1"), 1, 1)

	// remoteEnd is kept for the tests; localEnd is what the local waku
	// instance reads from and writes to.
	remoteEnd, localEnd := p2p.MsgPipe()

	caps := []p2p.Cap{{Name: "shh", Version: 6}}
	localPeer := v0.NewPeer(localWaku, p2p.NewPeer(mailserver.ID(), "1", caps), localEnd, nil)
	go func() {
		// The goroutine panics with whatever HandlePeer returns, so the
		// peer handler returning at all aborts the test binary.
		panic(localWaku.HandlePeer(localPeer, localEnd))
	}()

	wakuWrapper := gethbridge.NewGethWakuWrapper(localWaku)

	remotePeer := v0.NewPeer(remoteWaku, p2p.NewPeer(enode.ID{}, "test", []p2p.Cap{}), remoteEnd, nil)
	s.Require().NoError(remotePeer.Start(), "failed run message loop")

	s.localService = New(
		params.ShhextConfig{MailServerConfirmations: true, MaxMessageDeliveryAttempts: 3},
		ext.NewTestNodeWrapper(nil, wakuWrapper),
		nil,
		nil,
		db,
	)
	s.Require().NoError(s.localService.UpdateMailservers([]*enode.Node{mailserver}))

	s.localWakuAPI = waku.NewPublicWakuAPI(localWaku)
	s.localAPI = NewPublicAPI(s.localService)
	s.localNode = mailserver
	s.remoteRW = remoteEnd
}
// TestRequestMessagesSync is the go-test entry point that runs every test
// method of RequestMessagesSyncSuite.
func TestRequestMessagesSync(t *testing.T) {
	suite.Run(t, &RequestMessagesSyncSuite{})
}
// RequestMessagesSyncSuite tests PublicAPI.RequestMessagesSync using the
// local/remote waku wiring it inherits by embedding WakuNodeMockSuite.
type RequestMessagesSyncSuite struct {
	WakuNodeMockSuite
}
// NOTE: TestExpired is disabled for now because it is too flaky.
/*
func (s *RequestMessagesSyncSuite) TestExpired() {
// intentionally discarding all requests, so that request will timeout
go func() {
for {
msg, err := s.remoteRW.ReadMsg()
s.Require().NoError(err)
s.Require().NoError(msg.Discard())
}
}()
_, err := s.localAPI.RequestMessagesSync(
ext.RetryConfig{
BaseTimeout: time.Millisecond * 100,
},
ext.MessagesRequest{
MailServerPeer: s.localNode.String(),
Topics: []common.TopicType{{0x01, 0x02, 0x03, 0x04}},
},
)
s.Require().EqualError(err, "failed to request messages after 1 retries")
}
*/
// testCompletedFromAttempt runs RequestMessagesSync against a fake remote
// that discards the first target-1 messages it reads and answers every later
// one with a "request completed" payload carrying a fixed cursor. It expects
// the call to succeed and to return that cursor hex-encoded.
//
// target is the 1-based number of the first message the remote answers; it is
// also passed as MaxRetries.
func (s *RequestMessagesSyncSuite) testCompletedFromAttempt(target int) {
	const cursorSize = 36 // taken from mailserver_response.go from waku package
	cursor := [cursorSize]byte{}
	cursor[0] = 0x01

	go func() {
		// This goroutine is not the test goroutine, so it must not use
		// s.Require(): that ends in t.FailNow, which the testing package
		// only allows on the goroutine running the test. Failures are
		// recorded with non-fatal assertions and the goroutine returns.
		for attempt := 1; ; attempt++ {
			msg, err := s.remoteRW.ReadMsg()
			if !s.NoError(err) {
				return
			}
			if attempt < target {
				// Dropping the message leaves the request unanswered.
				if !s.NoError(msg.Discard()) {
					return
				}
				continue
			}
			var e wakucommon.Envelope
			if !s.NoError(msg.Decode(&e)) {
				return
			}
			payload := waku.CreateMailServerRequestCompletedPayload(e.Hash(), common.Hash{}, cursor[:])
			if !s.NoError(p2p.Send(s.remoteRW, p2pRequestCompleteCode, payload)) {
				return
			}
		}
	}()

	resp, err := s.localAPI.RequestMessagesSync(
		ext.RetryConfig{
			BaseTimeout: time.Second,
			MaxRetries:  target,
		},
		ext.MessagesRequest{
			MailServerPeer: s.localNode.String(),
			Force:          true, // force true is convenient here because timeout is less than default delay (3s)
		},
	)
	s.Require().NoError(err)
	s.Require().Equal(ext.MessagesResponse{Cursor: hex.EncodeToString(cursor[:])}, resp)
}
// TestCompletedFromFirstAttempt covers the case where the remote answers the
// first message it reads.
func (s *RequestMessagesSyncSuite) TestCompletedFromFirstAttempt() {
	const answeredAttempt = 1
	s.testCompletedFromAttempt(answeredAttempt)
}
// TestCompletedFromSecondAttempt covers the case where the remote discards
// the first message it reads and answers the second.
func (s *RequestMessagesSyncSuite) TestCompletedFromSecondAttempt() {
	const answeredAttempt = 2
	s.testCompletedFromAttempt(answeredAttempt)
}