diff --git a/Gopkg.lock b/Gopkg.lock index fe6f6d2f6..c6ec200fb 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -822,12 +822,12 @@ revision = "fbcc46a78cd43fef95a110df664aab513116a850" [[projects]] - digest = "1:6cb252f27feb57ef0e8406556c259d903c0ecff2ab0d2200ca85773b3561777d" + digest = "1:5c62af344925b846377386dec72e06eb3e1e15222542b3d22fe0f0da75c7f090" name = "github.com/status-im/whisper" packages = ["whisperv6"] pruneopts = "NUT" - revision = "76c24476436f0cf832021be98316a4ee62cc83cc" - version = "v1.4.0" + revision = "96d2199ed511430c642d877afe7bacaac5f37426" + version = "v1.4.1" [[projects]] digest = "1:572c783a763db6383aca3179976eb80e4c900f52eba56cba8bb2e3cea7ce720e" diff --git a/Gopkg.toml b/Gopkg.toml index e37bc6056..254703bb2 100644 --- a/Gopkg.toml +++ b/Gopkg.toml @@ -29,7 +29,7 @@ [[constraint]] name = "github.com/status-im/whisper" - version = "=v1.4.0" + version = "=v1.4.1" [[override]] name = "github.com/golang/protobuf" diff --git a/cmd/node-canary/main.go b/cmd/node-canary/main.go index 9bc5b04c4..9df9fe6fa 100644 --- a/cmd/node-canary/main.go +++ b/cmd/node-canary/main.go @@ -173,7 +173,7 @@ func verifyMailserverBehavior(mailserverNode *enode.Node) { Limit: 1, Topic: topic, SymKeyID: mailServerKeyID, - Timeout: time.Duration(*timeout) * time.Second, + Timeout: time.Duration(*timeout), }) if err != nil { logger.Error("Error requesting historic messages from mailserver", "error", err) @@ -333,6 +333,8 @@ func waitForMailServerResponse(events chan whisper.EnvelopeEvent, requestID comm func decodeMailServerResponse(event whisper.EnvelopeEvent) (*whisper.MailServerResponse, error) { switch event.Event { + case whisper.EventMailServerRequestSent: + return nil, nil case whisper.EventMailServerRequestCompleted: resp, ok := event.Data.(*whisper.MailServerResponse) if !ok { diff --git a/services/shhext/api.go b/services/shhext/api.go index a6bd10ecb..b3a69e029 100644 --- a/services/shhext/api.go +++ b/services/shhext/api.go @@ -17,6 +17,7 @@ import ( "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/p2p/enode" "github.com/status-im/status-go/services/shhext/chat" + "github.com/status-im/status-go/services/shhext/mailservers" whisper "github.com/status-im/whisper/whisperv6" ) @@ -150,6 +151,13 @@ func (api *PublicAPI) Post(ctx context.Context, req whisper.NewMessage) (hash he return hash, err } +func (api *PublicAPI) getPeer(rawurl string) (*enode.Node, error) { + if len(rawurl) == 0 { + return mailservers.GetFirstConnected(api.service.server, api.service.peerStore) + } + return enode.ParseV4(rawurl) +} + // RequestMessages sends a request for historic messages to a MailServer. func (api *PublicAPI) RequestMessages(_ context.Context, r MessagesRequest) (hexutil.Bytes, error) { api.log.Info("RequestMessages", "request", r) @@ -161,9 +169,7 @@ func (api *PublicAPI) RequestMessages(_ context.Context, r MessagesRequest) (hex return nil, fmt.Errorf("Query range is invalid: from > to (%d > %d)", r.From, r.To) } - var err error - - mailServerNode, err := enode.ParseV4(r.MailServerPeer) + mailServerNode, err := api.getPeer(r.MailServerPeer) if err != nil { return nil, fmt.Errorf("%v: %v", ErrInvalidMailServerPeer, err) } @@ -199,13 +205,10 @@ func (api *PublicAPI) RequestMessages(_ context.Context, r MessagesRequest) (hex return nil, err } - hash := envelope.Hash() - if err := shh.RequestHistoricMessages(mailServerNode.ID().Bytes(), envelope); err != nil { + if err := shh.RequestHistoricMessagesWithTimeout(mailServerNode.ID().Bytes(), envelope, r.Timeout*time.Second); err != nil { return nil, err } - - api.service.tracker.AddRequest(hash, time.After(r.Timeout*time.Second)) - + hash := envelope.Hash() return hash[:], nil } diff --git a/services/shhext/mailservers/connmanager.go b/services/shhext/mailservers/connmanager.go new file mode 100644 index 000000000..905d0c01f --- /dev/null +++ b/services/shhext/mailservers/connmanager.go @@ -0,0 +1,202 @@ +package mailservers + +import ( + "sync" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/event" + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/p2p" + "github.com/ethereum/go-ethereum/p2p/enode" + "github.com/status-im/whisper/whisperv6" +) + +const ( + peerEventsBuffer = 10 // sufficient buffer to avoid blocking a p2p feed. + whisperEventsBuffer = 20 // sufficient buffer to avod blocking a whisper envelopes feed. +) + +// PeerAdderRemover is an interface for adding or removing peers. +type PeerAdderRemover interface { + AddPeer(node *enode.Node) + RemovePeer(node *enode.Node) +} + +// PeerEventsSubscriber interface to subscribe for p2p.PeerEvent's. +type PeerEventsSubscriber interface { + SubscribeEvents(chan *p2p.PeerEvent) event.Subscription +} + +// EnvelopeEventSubscbriber interface to subscribe for whisper.EnvelopeEvent's. +type EnvelopeEventSubscbriber interface { + SubscribeEnvelopeEvents(chan<- whisperv6.EnvelopeEvent) event.Subscription +} + +type p2pServer interface { + PeerAdderRemover + PeerEventsSubscriber +} + +// NewConnectionManager creates an instance of ConnectionManager. +func NewConnectionManager(server p2pServer, whisper EnvelopeEventSubscbriber, target int) *ConnectionManager { + return &ConnectionManager{ + server: server, + whisper: whisper, + connectedTarget: target, + notifications: make(chan []*enode.Node), + } +} + +// ConnectionManager manages keeps target of peers connected. +type ConnectionManager struct { + wg sync.WaitGroup + quit chan struct{} + + server p2pServer + whisper EnvelopeEventSubscbriber + + notifications chan []*enode.Node + connectedTarget int +} + +// Notify sends a non-blocking notification about new nodes. +func (ps *ConnectionManager) Notify(nodes []*enode.Node) { + ps.wg.Add(1) + go func() { + select { + case ps.notifications <- nodes: + case <-ps.quit: + } + ps.wg.Done() + }() + +} + +// Start subscribes to a p2p server and handles new peers and state updates for those peers. +func (ps *ConnectionManager) Start() { + ps.quit = make(chan struct{}) + ps.wg.Add(1) + go func() { + current := map[enode.ID]*enode.Node{} + connected := map[enode.ID]struct{}{} + events := make(chan *p2p.PeerEvent, peerEventsBuffer) + sub := ps.server.SubscribeEvents(events) + whisperEvents := make(chan whisperv6.EnvelopeEvent, whisperEventsBuffer) + whisperSub := ps.whisper.SubscribeEnvelopeEvents(whisperEvents) + requests := map[common.Hash]struct{}{} + for { + select { + case <-ps.quit: + sub.Unsubscribe() + whisperSub.Unsubscribe() + ps.wg.Done() + return + case err := <-sub.Err(): + log.Error("retry after error subscribing to p2p events", "error", err) + sub = ps.server.SubscribeEvents(events) + case err := <-whisperSub.Err(): + log.Error("retry after error suscribing to whisper events", "error", err) + whisperSub = ps.whisper.SubscribeEnvelopeEvents(whisperEvents) + case newNodes := <-ps.notifications: + replacement := map[enode.ID]*enode.Node{} + for _, n := range newNodes { + replacement[n.ID()] = n + } + replaceNodes(ps.server, ps.connectedTarget, connected, current, replacement) + current = replacement + case ev := <-events: + switch ev.Type { + case p2p.PeerEventTypeAdd: + log.Debug("connected to a mailserver", "address", ev.Peer) + nodeAdded(ps.server, ev.Peer, ps.connectedTarget, connected, current) + case p2p.PeerEventTypeDrop: + log.Debug("mailserver disconnected", "address", ev.Peer) + nodeDisconnected(ps.server, ev.Peer, ps.connectedTarget, connected, current) + } + case ev := <-whisperEvents: + // TODO what about completed but with error? what about expired envelopes? + switch ev.Event { + case whisperv6.EventMailServerRequestSent: + requests[ev.Hash] = struct{}{} + case whisperv6.EventMailServerRequestCompleted: + delete(requests, ev.Hash) + case whisperv6.EventMailServerRequestExpired: + _, exist := requests[ev.Hash] + if !exist { + continue + } + log.Debug("request to a mail server expired, disconncet a peer", "address", ev.Peer) + nodeDisconnected(ps.server, ev.Peer, ps.connectedTarget, connected, current) + } + } + } + }() +} + +// Stop gracefully closes all background goroutines and waits until they finish. +func (ps *ConnectionManager) Stop() { + if ps.quit == nil { + return + } + select { + case <-ps.quit: + return + default: + } + close(ps.quit) + ps.wg.Wait() + ps.quit = nil +} + +func replaceNodes(srv PeerAdderRemover, target int, connected map[enode.ID]struct{}, old, new map[enode.ID]*enode.Node) { + for nid, n := range old { + if _, exist := new[nid]; !exist { + if _, exist := connected[nid]; exist { + delete(connected, nid) + } + srv.RemovePeer(n) + } + } + if len(connected) < target { + for _, n := range new { + srv.AddPeer(n) + } + } +} + +func nodeAdded(srv PeerAdderRemover, peer enode.ID, target int, connected map[enode.ID]struct{}, nodes map[enode.ID]*enode.Node) { + n, exist := nodes[peer] + if !exist { + return + } + if len(connected) == target { + srv.RemovePeer(n) + } else { + connected[n.ID()] = struct{}{} + } +} + +func nodeDisconnected(srv PeerAdderRemover, peer enode.ID, target int, connected map[enode.ID]struct{}, nodes map[enode.ID]*enode.Node) { + n, exist := nodes[peer] // unrelated event + if !exist { + return + } + _, exist = connected[peer] // check if already disconnected + if !exist { + return + } + if len(nodes) == 1 { // keep node connected if we don't have another choice + return + } + srv.RemovePeer(n) // remove peer permanently, otherwise p2p.Server will try to reconnect + delete(connected, peer) + if len(connected) < target { // try to connect with any other selected (but not connected) node + for nid, n := range nodes { + _, exist := connected[nid] + if exist || peer == nid { + continue + } + srv.AddPeer(n) + } + } +} diff --git a/services/shhext/mailservers/connmanager_test.go b/services/shhext/mailservers/connmanager_test.go new file mode 100644 index 000000000..0e8f45144 --- /dev/null +++ b/services/shhext/mailservers/connmanager_test.go @@ -0,0 +1,313 @@ +package mailservers + +import ( + "fmt" + "sync" + "testing" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/event" + "github.com/ethereum/go-ethereum/p2p" + "github.com/ethereum/go-ethereum/p2p/enode" + "github.com/status-im/status-go/t/utils" + "github.com/status-im/whisper/whisperv6" + "github.com/stretchr/testify/require" +) + +type fakePeerEvents struct { + mu sync.Mutex + nodes map[enode.ID]struct{} + input chan *p2p.PeerEvent +} + +func (f *fakePeerEvents) Nodes() []enode.ID { + f.mu.Lock() + rst := make([]enode.ID, 0, len(f.nodes)) + for n := range f.nodes { + rst = append(rst, n) + } + f.mu.Unlock() + return rst +} + +func (f *fakePeerEvents) AddPeer(node *enode.Node) { + f.mu.Lock() + f.nodes[node.ID()] = struct{}{} + f.mu.Unlock() + if f.input == nil { + return + } + f.input <- &p2p.PeerEvent{ + Peer: node.ID(), + Type: p2p.PeerEventTypeAdd, + } +} + +func (f *fakePeerEvents) RemovePeer(node *enode.Node) { + f.mu.Lock() + delete(f.nodes, node.ID()) + f.mu.Unlock() + if f.input == nil { + return + } + f.input <- &p2p.PeerEvent{ + Peer: node.ID(), + Type: p2p.PeerEventTypeDrop, + } +} + +func newFakePeerAdderRemover() *fakePeerEvents { + return &fakePeerEvents{nodes: map[enode.ID]struct{}{}} +} + +func (f *fakePeerEvents) SubscribeEvents(output chan *p2p.PeerEvent) event.Subscription { + return event.NewSubscription(func(quit <-chan struct{}) error { + for { + select { + case <-quit: + return nil + case ev := <-f.input: + // will block the same way as in any feed + output <- ev + } + } + }) +} + +func newFakeServer() *fakePeerEvents { + srv := newFakePeerAdderRemover() + srv.input = make(chan *p2p.PeerEvent, 20) + return srv +} + +type fakeEnvelopeEvents struct { + input chan whisperv6.EnvelopeEvent +} + +func (f fakeEnvelopeEvents) SubscribeEnvelopeEvents(output chan<- whisperv6.EnvelopeEvent) event.Subscription { + return event.NewSubscription(func(quit <-chan struct{}) error { + for { + select { + case <-quit: + return nil + case ev := <-f.input: + // will block the same way as in any feed + output <- ev + } + } + }) +} + +func newFakeEnvelopesEvents() fakeEnvelopeEvents { + return fakeEnvelopeEvents{ + input: make(chan whisperv6.EnvelopeEvent), + } +} + +func getNRandomNodes(t *testing.T, n int) map[enode.ID]*enode.Node { + rst := map[enode.ID]*enode.Node{} + for i := 0; i < n; i++ { + n, err := RandomeNode() + require.NoError(t, err) + rst[n.ID()] = n + } + return rst +} + +func mergeOldIntoNew(old, new map[enode.ID]*enode.Node) { + for n := range old { + new[n] = old[n] + } +} + +func TestReplaceNodes(t *testing.T) { + type testCase struct { + description string + old map[enode.ID]*enode.Node + new map[enode.ID]*enode.Node + target int + } + for _, tc := range []testCase{ + { + "InitialReplace", + getNRandomNodes(t, 0), + getNRandomNodes(t, 3), + 2, + }, + { + "FullReplace", + getNRandomNodes(t, 3), + getNRandomNodes(t, 3), + 2, + }, + } { + t.Run(tc.description, func(t *testing.T) { + peers := newFakePeerAdderRemover() + replaceNodes(peers, tc.target, peers.nodes, nil, tc.old) + require.Len(t, peers.nodes, len(tc.old)) + for n := range peers.nodes { + require.Contains(t, tc.old, n) + } + replaceNodes(peers, tc.target, peers.nodes, tc.old, tc.new) + require.Len(t, peers.nodes, len(tc.new)) + for n := range peers.nodes { + require.Contains(t, tc.new, n) + } + }) + } +} + +func TestPartialReplaceNodesBelowTarget(t *testing.T) { + peers := newFakePeerAdderRemover() + old := getNRandomNodes(t, 1) + new := getNRandomNodes(t, 2) + replaceNodes(peers, 2, peers.nodes, nil, old) + mergeOldIntoNew(old, new) + replaceNodes(peers, 2, peers.nodes, old, new) + require.Len(t, peers.nodes, len(new)) +} + +func TestPartialReplaceNodesAboveTarget(t *testing.T) { + peers := newFakePeerAdderRemover() + old := getNRandomNodes(t, 1) + new := getNRandomNodes(t, 2) + replaceNodes(peers, 1, peers.nodes, nil, old) + mergeOldIntoNew(old, new) + replaceNodes(peers, 1, peers.nodes, old, new) + require.Len(t, peers.nodes, 1) +} + +func TestConnectionManagerAddDrop(t *testing.T) { + server := newFakeServer() + whisper := newFakeEnvelopesEvents() + target := 1 + connmanager := NewConnectionManager(server, whisper, target) + connmanager.Start() + defer connmanager.Stop() + nodes := []*enode.Node{} + for _, n := range getNRandomNodes(t, 3) { + nodes = append(nodes, n) + } + // Send 3 random nodes to connection manager. + connmanager.Notify(nodes) + var initial enode.ID + // Wait till connection manager establishes connection with 1 peer. + require.NoError(t, utils.Eventually(func() error { + nodes := server.Nodes() + if len(nodes) != target { + return fmt.Errorf("unexpected number of connected servers: %d", len(nodes)) + } + initial = nodes[0] + return nil + }, time.Second, 100*time.Millisecond)) + // Send an event that peer was dropped. + select { + case server.input <- &p2p.PeerEvent{Peer: initial, Type: p2p.PeerEventTypeDrop}: + case <-time.After(time.Second): + require.FailNow(t, "can't send a drop event") + } + // Connection manager should establish connection with any other peer from initial list. + require.NoError(t, utils.Eventually(func() error { + nodes := server.Nodes() + if len(nodes) != target { + return fmt.Errorf("unexpected number of connected servers: %d", len(nodes)) + } + if nodes[0] == initial { + return fmt.Errorf("connected node wasn't changed from %s", initial) + } + return nil + }, time.Second, 100*time.Millisecond)) +} + +func TestConnectionManagerReplace(t *testing.T) { + server := newFakeServer() + whisper := newFakeEnvelopesEvents() + target := 1 + connmanager := NewConnectionManager(server, whisper, target) + connmanager.Start() + defer connmanager.Stop() + nodes := []*enode.Node{} + for _, n := range getNRandomNodes(t, 3) { + nodes = append(nodes, n) + } + // Send a single node to connection manager. + connmanager.Notify(nodes[:1]) + // Wait until this node will get connected. + require.NoError(t, utils.Eventually(func() error { + connected := server.Nodes() + if len(connected) != target { + return fmt.Errorf("unexpected number of connected servers: %d", len(connected)) + } + if nodes[0].ID() != connected[0] { + return fmt.Errorf("connected with a wrong peer. expected %s, got %s", nodes[0].ID(), connected[0]) + } + return nil + }, time.Second, 100*time.Millisecond)) + // Replace previously sent node with 2 different nodes. + connmanager.Notify(nodes[1:]) + // Wait until connection manager replaces node connected in the first round. + require.NoError(t, utils.Eventually(func() error { + connected := server.Nodes() + if len(connected) != target { + return fmt.Errorf("unexpected number of connected servers: %d", len(connected)) + } + switch connected[0] { + case nodes[1].ID(): + case nodes[2].ID(): + default: + return fmt.Errorf("connected with unexpected peer. got %s, expected %+v", connected[0], nodes[1:]) + } + return nil + }, time.Second, 100*time.Millisecond)) +} + +func TestConnectionChangedAfterExpiry(t *testing.T) { + server := newFakeServer() + whisper := newFakeEnvelopesEvents() + target := 1 + connmanager := NewConnectionManager(server, whisper, target) + connmanager.Start() + defer connmanager.Stop() + nodes := []*enode.Node{} + for _, n := range getNRandomNodes(t, 2) { + nodes = append(nodes, n) + } + // Send two random nodes to connection manager. + connmanager.Notify(nodes) + var initial enode.ID + // Wait until connection manager establishes connection with one node. + require.NoError(t, utils.Eventually(func() error { + nodes := server.Nodes() + if len(nodes) != target { + return fmt.Errorf("unexpected number of connected servers: %d", len(nodes)) + } + initial = nodes[0] + return nil + }, time.Second, 100*time.Millisecond)) + hash := common.Hash{1} + // Send event that history request for connected peer was sent. + select { + case whisper.input <- whisperv6.EnvelopeEvent{ + Event: whisperv6.EventMailServerRequestSent, Peer: initial, Hash: hash}: + case <-time.After(time.Second): + require.FailNow(t, "can't send a 'sent' event") + } + // And eventually expired. + select { + case whisper.input <- whisperv6.EnvelopeEvent{ + Event: whisperv6.EventMailServerRequestExpired, Peer: initial, Hash: hash}: + case <-time.After(time.Second): + require.FailNow(t, "can't send an 'expiry' event") + } + require.NoError(t, utils.Eventually(func() error { + nodes := server.Nodes() + if len(nodes) != target { + return fmt.Errorf("unexpected number of connected servers: %d", len(nodes)) + } + if nodes[0] == initial { + return fmt.Errorf("connected node wasn't changed from %s", initial) + } + return nil + }, time.Second, 100*time.Millisecond)) +} diff --git a/services/shhext/mailservers/peerstore.go b/services/shhext/mailservers/peerstore.go new file mode 100644 index 000000000..e545d9f79 --- /dev/null +++ b/services/shhext/mailservers/peerstore.go @@ -0,0 +1,67 @@ +package mailservers + +import ( + "errors" + "sync" + + "github.com/ethereum/go-ethereum/p2p" + "github.com/ethereum/go-ethereum/p2p/enode" +) + +var ( + // ErrNoConnected returned when mail servers are not connected. + ErrNoConnected = errors.New("no connected mail servers") +) + +// PeersProvider is an interface for requesting list of peers. +type PeersProvider interface { + Peers() []*p2p.Peer +} + +// NewPeerStore returns an instance of PeerStore. +func NewPeerStore() *PeerStore { + return &PeerStore{nodes: map[enode.ID]*enode.Node{}} +} + +// PeerStore stores list of selected mail servers and keeps N of them connected. +type PeerStore struct { + mu sync.RWMutex + nodes map[enode.ID]*enode.Node +} + +// Exist confirms that peers was added to a store. +func (ps *PeerStore) Exist(peer enode.ID) bool { + ps.mu.RLock() + defer ps.mu.RUnlock() + _, exist := ps.nodes[peer] + return exist +} + +// Get returns instance of the node with requested ID or nil if ID is not found. +func (ps *PeerStore) Get(peer enode.ID) *enode.Node { + ps.mu.RLock() + defer ps.mu.RUnlock() + return ps.nodes[peer] +} + +// Update updates peers locally. +func (ps *PeerStore) Update(nodes []*enode.Node) { + ps.mu.Lock() + defer ps.mu.Unlock() + ps.nodes = map[enode.ID]*enode.Node{} + for _, n := range nodes { + ps.nodes[n.ID()] = n + } +} + +// GetFirstConnected returns first connected peer that is also added to a peer store. +// Raises ErrNoConnected if no peers are added to a peer store. +func GetFirstConnected(provider PeersProvider, store *PeerStore) (*enode.Node, error) { + peers := provider.Peers() + for _, p := range peers { + if store.Exist(p.ID()) { + return p.Node(), nil + } + } + return nil, ErrNoConnected +} diff --git a/services/shhext/mailservers/peerstore_test.go b/services/shhext/mailservers/peerstore_test.go new file mode 100644 index 000000000..111b826c3 --- /dev/null +++ b/services/shhext/mailservers/peerstore_test.go @@ -0,0 +1,77 @@ +package mailservers + +import ( + "testing" + + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/p2p" + "github.com/ethereum/go-ethereum/p2p/enode" + "github.com/stretchr/testify/require" +) + +func RandomeNode() (*enode.Node, error) { + pkey, err := crypto.GenerateKey() + if err != nil { + return nil, err + } + return enode.NewV4(&pkey.PublicKey, nil, 0, 0), nil +} + +func TestUpdateResetsInternalStorage(t *testing.T) { + store := NewPeerStore() + r1, err := RandomeNode() + require.NoError(t, err) + r2, err := RandomeNode() + require.NoError(t, err) + store.Update([]*enode.Node{r1, r2}) + require.True(t, store.Exist(r1.ID())) + require.True(t, store.Exist(r2.ID())) + store.Update([]*enode.Node{r2}) + require.False(t, store.Exist(r1.ID())) + require.True(t, store.Exist(r2.ID())) +} + +func TestGetNodeByID(t *testing.T) { + store := NewPeerStore() + r1, err := RandomeNode() + require.NoError(t, err) + store.Update([]*enode.Node{r1}) + require.Equal(t, r1, store.Get(r1.ID())) + require.Nil(t, store.Get(enode.ID{1})) +} + +type fakePeerProvider struct { + peers []*p2p.Peer +} + +func (f fakePeerProvider) Peers() []*p2p.Peer { + return f.peers +} + +func TestNoConnected(t *testing.T) { + provider := fakePeerProvider{} + store := NewPeerStore() + _, err := GetFirstConnected(provider, store) + require.EqualError(t, ErrNoConnected, err.Error()) +} + +func TestGetFirstConnected(t *testing.T) { + numPeers := 3 + nodes := make([]*enode.Node, numPeers) + peers := make([]*p2p.Peer, numPeers) + nodesMap := getNRandomNodes(t, numPeers) + i := 0 + for _, node := range nodesMap { + nodes[i] = node + peers[i] = p2p.NewPeer(node.ID(), node.ID().String(), nil) + i++ + } + store := NewPeerStore() + provider := fakePeerProvider{peers} + _, err := GetFirstConnected(provider, store) + require.EqualError(t, ErrNoConnected, err.Error()) + store.Update(nodes) + node, err := GetFirstConnected(provider, store) + require.NoError(t, err) + require.Contains(t, nodesMap, node.ID()) +} diff --git a/services/shhext/service.go b/services/shhext/service.go index 595053284..76421bc68 100644 --- a/services/shhext/service.go +++ b/services/shhext/service.go @@ -6,35 +6,26 @@ import ( "fmt" "os" "path/filepath" - "sync" - "time" "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/node" "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/p2p/enode" "github.com/ethereum/go-ethereum/rpc" "github.com/status-im/status-go/services/shhext/chat" "github.com/status-im/status-go/services/shhext/dedup" + "github.com/status-im/status-go/services/shhext/mailservers" whisper "github.com/status-im/whisper/whisperv6" "github.com/syndtr/goleveldb/leveldb" ) -var errProtocolNotInitialized = errors.New("procotol is not initialized") - -// EnvelopeState in local tracker -type EnvelopeState int - const ( - // EnvelopePosted is set when envelope was added to a local whisper queue. - EnvelopePosted EnvelopeState = iota - // EnvelopeSent is set when envelope is sent to atleast one peer. - EnvelopeSent - // MailServerRequestSent is set when p2p request is sent to the mailserver - MailServerRequestSent + // defaultConnectionsTarget used in Service.Start if configured connection target is 0. + defaultConnectionsTarget = 1 ) +var errProtocolNotInitialized = errors.New("procotol is not initialized") + // EnvelopeEventsHandler used for two different event types. type EnvelopeEventsHandler interface { EnvelopeSent(common.Hash) @@ -46,7 +37,9 @@ type EnvelopeEventsHandler interface { // Service is a service that provides some additional Whisper API. type Service struct { w *whisper.Whisper + config *ServiceConfig tracker *tracker + server *p2p.Server nodeID *ecdsa.PrivateKey deduplicator *dedup.Deduplicator protocol *chat.ProtocolService @@ -54,6 +47,9 @@ type Service struct { dataDir string installationID string pfsEnabled bool + + peerStore *mailservers.PeerStore + connManager *mailservers.ConnectionManager } type ServiceConfig struct { @@ -62,6 +58,8 @@ type ServiceConfig struct { Debug bool PFSEnabled bool MailServerConfirmations bool + EnableConnectionManager bool + ConnectionTarget int } // Make sure that Service implements node.Service interface. @@ -69,28 +67,34 @@ var _ node.Service = (*Service)(nil) // New returns a new Service. dataDir is a folder path to a network-independent location func New(w *whisper.Whisper, handler EnvelopeEventsHandler, db *leveldb.DB, config *ServiceConfig) *Service { + ps := mailservers.NewPeerStore() track := &tracker{ w: w, handler: handler, cache: map[common.Hash]EnvelopeState{}, batches: map[common.Hash]map[common.Hash]struct{}{}, - mailservers: map[enode.ID]struct{}{}, + mailPeers: ps, mailServerConfirmation: config.MailServerConfirmations, } return &Service{ w: w, + config: config, tracker: track, deduplicator: dedup.NewDeduplicator(w, db), debug: config.Debug, dataDir: config.DataDir, installationID: config.InstallationID, pfsEnabled: config.PFSEnabled, + peerStore: ps, } } // UpdateMailservers updates information about selected mail servers. func (s *Service) UpdateMailservers(nodes []*enode.Node) { - s.tracker.UpdateMailservers(nodes) + s.peerStore.Update(nodes) + if s.connManager != nil { + s.connManager.Notify(nodes) + } } // Protocols returns a new protocols list. In this case, there are none. @@ -191,237 +195,26 @@ func (s *Service) APIs() []rpc.API { // Start is run when a service is started. // It does nothing in this case but is required by `node.Service` interface. func (s *Service) Start(server *p2p.Server) error { + if s.config.EnableConnectionManager { + connectionsTarget := s.config.ConnectionTarget + if connectionsTarget == 0 { + connectionsTarget = defaultConnectionsTarget + } + s.connManager = mailservers.NewConnectionManager(server, s.w, connectionsTarget) + s.connManager.Start() + } s.tracker.Start() s.nodeID = server.PrivateKey + s.server = server return nil } // Stop is run when a service is stopped. // It does nothing in this case but is required by `node.Service` interface. func (s *Service) Stop() error { + if s.config.EnableConnectionManager { + s.connManager.Stop() + } s.tracker.Stop() return nil } - -// tracker responsible for processing events for envelopes that we are interested in -// and calling specified handler. -type tracker struct { - w *whisper.Whisper - handler EnvelopeEventsHandler - mailServerConfirmation bool - - mu sync.Mutex - cache map[common.Hash]EnvelopeState - batches map[common.Hash]map[common.Hash]struct{} - - mailMu sync.Mutex - mailservers map[enode.ID]struct{} - - wg sync.WaitGroup - quit chan struct{} -} - -// Start processing events. -func (t *tracker) Start() { - t.quit = make(chan struct{}) - t.wg.Add(1) - go func() { - t.handleEnvelopeEvents() - t.wg.Done() - }() -} - -// Stop process events. -func (t *tracker) Stop() { - close(t.quit) - t.wg.Wait() -} - -// Add hash to a tracker. -func (t *tracker) Add(hash common.Hash) { - t.mu.Lock() - defer t.mu.Unlock() - t.cache[hash] = EnvelopePosted -} - -func (t *tracker) UpdateMailservers(nodes []*enode.Node) { - t.mailMu.Lock() - defer t.mailMu.Unlock() - t.mailservers = map[enode.ID]struct{}{} - for _, n := range nodes { - t.mailservers[n.ID()] = struct{}{} - } -} - -// Add request hash to a tracker. -func (t *tracker) AddRequest(hash common.Hash, timerC <-chan time.Time) { - t.mu.Lock() - defer t.mu.Unlock() - t.cache[hash] = MailServerRequestSent - go t.expireRequest(hash, timerC) -} - -func (t *tracker) expireRequest(hash common.Hash, timerC <-chan time.Time) { - select { - case <-t.quit: - return - case <-timerC: - t.handleEvent(whisper.EnvelopeEvent{ - Event: whisper.EventMailServerRequestExpired, - Hash: hash, - }) - } -} - -// handleEnvelopeEvents processes whisper envelope events -func (t *tracker) handleEnvelopeEvents() { - events := make(chan whisper.EnvelopeEvent, 100) // must be buffered to prevent blocking whisper - sub := t.w.SubscribeEnvelopeEvents(events) - defer sub.Unsubscribe() - for { - select { - case <-t.quit: - return - case event := <-events: - t.handleEvent(event) - } - } -} - -// handleEvent based on type of the event either triggers -// confirmation handler or removes hash from tracker -func (t *tracker) handleEvent(event whisper.EnvelopeEvent) { - handlers := map[whisper.EventType]func(whisper.EnvelopeEvent){ - whisper.EventEnvelopeSent: t.handleEventEnvelopeSent, - whisper.EventEnvelopeExpired: t.handleEventEnvelopeExpired, - whisper.EventBatchAcknowledged: t.handleAcknowledgedBatch, - whisper.EventMailServerRequestCompleted: t.handleEventMailServerRequestCompleted, - whisper.EventMailServerRequestExpired: t.handleEventMailServerRequestExpired, - } - - if handler, ok := handlers[event.Event]; ok { - handler(event) - } -} - -func (t *tracker) handleEventEnvelopeSent(event whisper.EnvelopeEvent) { - if t.mailServerConfirmation { - if !t.isMailserver(event.Peer) { - return - } - } - - t.mu.Lock() - defer t.mu.Unlock() - - state, ok := t.cache[event.Hash] - // if we didn't send a message using extension - skip it - // if message was already confirmed - skip it - if !ok || state == EnvelopeSent { - return - } - if event.Batch != (common.Hash{}) { - if _, ok := t.batches[event.Batch]; !ok { - t.batches[event.Batch] = map[common.Hash]struct{}{} - } - t.batches[event.Batch][event.Hash] = struct{}{} - log.Debug("waiting for a confirmation", "batch", event.Batch) - } else { - log.Debug("envelope is sent", "hash", event.Hash, "peer", event.Peer) - t.cache[event.Hash] = EnvelopeSent - if t.handler != nil { - t.handler.EnvelopeSent(event.Hash) - } - } -} - -func (t *tracker) isMailserver(peer enode.ID) bool { - t.mailMu.Lock() - defer t.mailMu.Unlock() - if len(t.mailservers) == 0 { - log.Error("mail servers are empty") - return false - } - _, exist := t.mailservers[peer] - if !exist { - log.Debug("confirmation received not from a mail server is skipped", "peer", peer) - return false - } - return true -} - -func (t *tracker) handleAcknowledgedBatch(event whisper.EnvelopeEvent) { - if t.mailServerConfirmation { - if !t.isMailserver(event.Peer) { - return - } - } - - t.mu.Lock() - defer t.mu.Unlock() - - envelopes, ok := t.batches[event.Batch] - if !ok { - log.Debug("batch is not found", "batch", event.Batch) - } - log.Debug("received a confirmation", "batch", event.Batch, "peer", event.Peer) - for hash := range envelopes { - state, ok := t.cache[hash] - if !ok || state == EnvelopeSent { - continue - } - t.cache[hash] = EnvelopeSent - if t.handler != nil { - t.handler.EnvelopeSent(hash) - } - } - delete(t.batches, event.Batch) -} - -func (t *tracker) handleEventEnvelopeExpired(event whisper.EnvelopeEvent) { - t.mu.Lock() - defer t.mu.Unlock() - - if state, ok := t.cache[event.Hash]; ok { - delete(t.cache, event.Hash) - if state == EnvelopeSent { - return - } - log.Debug("envelope expired", "hash", event.Hash, "state", state) - if t.handler != nil { - t.handler.EnvelopeExpired(event.Hash) - } - } -} - -func (t *tracker) handleEventMailServerRequestCompleted(event whisper.EnvelopeEvent) { - t.mu.Lock() - defer t.mu.Unlock() - - state, ok := t.cache[event.Hash] - if !ok || state != MailServerRequestSent { - return - } - log.Debug("mailserver response received", "hash", event.Hash) - delete(t.cache, event.Hash) - if t.handler != nil { - if resp, ok := event.Data.(*whisper.MailServerResponse); ok { - t.handler.MailServerRequestCompleted(event.Hash, resp.LastEnvelopeHash, resp.Cursor, resp.Error) - } - } -} - -func (t *tracker) handleEventMailServerRequestExpired(event whisper.EnvelopeEvent) { - t.mu.Lock() - defer t.mu.Unlock() - - state, ok := t.cache[event.Hash] - if !ok || state != MailServerRequestSent { - return - } - log.Debug("mailserver response expired", "hash", event.Hash) - delete(t.cache, event.Hash) - if t.handler != nil { - t.handler.MailServerRequestExpired(event.Hash) - } -} diff --git a/services/shhext/service_test.go b/services/shhext/service_test.go index 254e5c40b..969d96e7f 100644 --- a/services/shhext/service_test.go +++ b/services/shhext/service_test.go @@ -94,6 +94,7 @@ func (s *ShhExtSuite) SetupTest() { Debug: true, PFSEnabled: false, MailServerConfirmations: true, + ConnectionTarget: 10, } s.services[i] = New(s.whisper[i], nil, nil, config) s.NoError(stack.Register(func(n *node.ServiceContext) (node.Service, error) { @@ -236,12 +237,12 @@ func (s *ShhExtSuite) TestRequestMessagesSuccess() { NoDiscovery: true, }, }) // in-memory node as no data dir - s.NoError(err) + s.Require().NoError(err) err = aNode.Register(func(*node.ServiceContext) (node.Service, error) { return shh, nil }) - s.NoError(err) + s.Require().NoError(err) err = aNode.Start() - s.NoError(err) + s.Require().NoError(err) defer func() { err := aNode.Stop(); s.NoError(err) }() mock := newHandlerMock(1) @@ -252,6 +253,7 @@ func (s *ShhExtSuite) TestRequestMessagesSuccess() { PFSEnabled: false, } service := New(shh, mock, nil, config) + s.Require().NoError(service.Start(aNode.Server())) api := NewPublicAPI(service) // with a peer acting as a mailserver @@ -263,41 +265,40 @@ func (s *ShhExtSuite) TestRequestMessagesSuccess() { ListenAddr: ":0", }, }) // in-memory node as no data dir - s.NoError(err) + s.Require().NoError(err) err = mailNode.Register(func(*node.ServiceContext) (node.Service, error) { return whisper.New(nil), nil }) s.NoError(err) err = mailNode.Start() - s.NoError(err) + s.Require().NoError(err) defer func() { s.NoError(mailNode.Stop()) }() // add mailPeer as a peer waitErr := helpers.WaitForPeerAsync(aNode.Server(), mailNode.Server().Self().String(), p2p.PeerEventTypeAdd, time.Second) aNode.Server().AddPeer(mailNode.Server().Self()) - s.NoError(<-waitErr) + s.Require().NoError(<-waitErr) var hash []byte // send a request with a symmetric key symKeyID, symKeyErr := shh.AddSymKeyFromPassword("some-pass") - s.NoError(symKeyErr) + s.Require().NoError(symKeyErr) hash, err = api.RequestMessages(context.TODO(), MessagesRequest{ MailServerPeer: mailNode.Server().Self().String(), SymKeyID: symKeyID, }) - s.NoError(err) - s.NotNil(hash) - s.Contains(api.service.tracker.cache, common.BytesToHash(hash)) - + s.Require().NoError(err) + s.Require().NotNil(hash) + s.Require().NoError(waitForHashInTracker(api.service.tracker, common.BytesToHash(hash), MailServerRequestSent, time.Second)) // Send a request without a symmetric key. In this case, // a public key extracted from MailServerPeer will be used. hash, err = api.RequestMessages(context.TODO(), MessagesRequest{ MailServerPeer: mailNode.Server().Self().String(), }) - s.NoError(err) - s.NotNil(hash) - s.Contains(api.service.tracker.cache, common.BytesToHash(hash)) + s.Require().NoError(err) + s.Require().NotNil(hash) + s.Require().NoError(waitForHashInTracker(api.service.tracker, common.BytesToHash(hash), MailServerRequestSent, time.Second)) } func (s *ShhExtSuite) TestDebugPostSync() { @@ -398,141 +399,17 @@ func (s *ShhExtSuite) TearDown() { } } -var ( - testHash = common.Hash{0x01} -) - -func TestTrackerSuite(t *testing.T) { - suite.Run(t, new(TrackerSuite)) -} - -type TrackerSuite struct { - suite.Suite - - tracker *tracker -} - -func (s *TrackerSuite) SetupTest() { - s.tracker = &tracker{ - cache: map[common.Hash]EnvelopeState{}, - batches: map[common.Hash]map[common.Hash]struct{}{}, - mailservers: map[enode.ID]struct{}{}, - mailServerConfirmation: true, - } -} - -func (s *TrackerSuite) TestConfirmed() { - testPeer := enode.ID{1} - s.tracker.mailservers[testPeer] = struct{}{} - s.tracker.Add(testHash) - s.Contains(s.tracker.cache, testHash) - s.Equal(EnvelopePosted, s.tracker.cache[testHash]) - s.tracker.handleEvent(whisper.EnvelopeEvent{ - Event: whisper.EventEnvelopeSent, - Hash: testHash, - Peer: testPeer, - }) - s.Contains(s.tracker.cache, testHash) - s.Equal(EnvelopeSent, s.tracker.cache[testHash]) -} - -func (s *TrackerSuite) TestConfirmedWithAcknowledge() { - testBatch := common.Hash{1} - testPeer := enode.ID{1} - s.tracker.Add(testHash) - s.tracker.mailservers[testPeer] = struct{}{} - s.Contains(s.tracker.cache, testHash) - s.Equal(EnvelopePosted, s.tracker.cache[testHash]) - s.tracker.handleEvent(whisper.EnvelopeEvent{ - Event: whisper.EventEnvelopeSent, - Hash: testHash, - Batch: testBatch, - Peer: testPeer, - }) - s.Equal(EnvelopePosted, s.tracker.cache[testHash]) - s.tracker.handleEvent(whisper.EnvelopeEvent{ - Event: whisper.EventBatchAcknowledged, - Batch: testBatch, - Peer: testPeer, - }) - s.Contains(s.tracker.cache, testHash) - s.Equal(EnvelopeSent, s.tracker.cache[testHash]) -} - -func (s *TrackerSuite) TestIgnored() { - s.tracker.handleEvent(whisper.EnvelopeEvent{ - Event: whisper.EventEnvelopeSent, - Hash: testHash, - }) - s.NotContains(s.tracker.cache, testHash) -} - -func (s *TrackerSuite) TestRemoved() { - s.tracker.Add(testHash) - s.Contains(s.tracker.cache, testHash) - s.tracker.handleEvent(whisper.EnvelopeEvent{ - Event: whisper.EventEnvelopeExpired, - Hash: testHash, - }) - s.NotContains(s.tracker.cache, testHash) -} - -func (s *TrackerSuite) TestRequestCompleted() { - mock := newHandlerMock(1) - s.tracker.handler = mock - s.tracker.AddRequest(testHash, time.After(defaultRequestTimeout*time.Second)) - s.Contains(s.tracker.cache, testHash) - s.Equal(MailServerRequestSent, s.tracker.cache[testHash]) - s.tracker.handleEvent(whisper.EnvelopeEvent{ - Event: whisper.EventMailServerRequestCompleted, - Hash: testHash, - Data: &whisper.MailServerResponse{}, - }) - select { - case requestID := <-mock.requestsCompleted: - s.Equal(testHash, requestID) - s.NotContains(s.tracker.cache, testHash) - case <-time.After(10 * time.Second): - s.Fail("timed out while waiting for a request to be completed") - } -} - -func (s *TrackerSuite) TestRequestFailed() { - mock := newHandlerMock(1) - s.tracker.handler = mock - s.tracker.AddRequest(testHash, time.After(defaultRequestTimeout*time.Second)) - s.Contains(s.tracker.cache, testHash) - s.Equal(MailServerRequestSent, s.tracker.cache[testHash]) - s.tracker.handleEvent(whisper.EnvelopeEvent{ - Event: whisper.EventMailServerRequestCompleted, - Hash: testHash, - Data: &whisper.MailServerResponse{Error: errors.New("test error")}, - }) - select { - case requestID := <-mock.requestsFailed: - s.Equal(testHash, requestID) - s.NotContains(s.tracker.cache, testHash) - case <-time.After(10 * time.Second): - s.Fail("timed out while waiting for a request to be failed") - } -} - -func (s *TrackerSuite) TestRequestExpiration() { - mock := newHandlerMock(1) - s.tracker.handler = mock - c := make(chan time.Time) - s.tracker.AddRequest(testHash, c) - s.Contains(s.tracker.cache, testHash) - s.Equal(MailServerRequestSent, s.tracker.cache[testHash]) - s.tracker.handleEvent(whisper.EnvelopeEvent{ - Event: whisper.EventMailServerRequestExpired, - Hash: testHash, - }) - select { - case requestID := <-mock.requestsExpired: - s.Equal(testHash, requestID) - s.NotContains(s.tracker.cache, testHash) - case <-time.After(10 * time.Second): - s.Fail("timed out while waiting for request expiration") +func waitForHashInTracker(track *tracker, hash common.Hash, state EnvelopeState, deadline time.Duration) error { + after := time.After(deadline) + ticker := time.Tick(100 * time.Millisecond) + for { + select { + case <-after: + return fmt.Errorf("failed while waiting for %s to get into state %d", hash, state) + case <-ticker: + if track.GetState(hash) == state { + return nil + } + } } } diff --git a/services/shhext/tracker.go b/services/shhext/tracker.go new file mode 100644 index 000000000..af1c73bca --- /dev/null +++ b/services/shhext/tracker.go @@ -0,0 +1,218 @@ +package shhext + +import ( + "sync" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/p2p/enode" + "github.com/status-im/status-go/services/shhext/mailservers" + whisper "github.com/status-im/whisper/whisperv6" +) + +// EnvelopeState in local tracker +type EnvelopeState int + +const ( + // NotRegistered returned if asked hash wasn't registered in the tracker. + NotRegistered EnvelopeState = -1 + // EnvelopePosted is set when envelope was added to a local whisper queue. + EnvelopePosted EnvelopeState = iota + // EnvelopeSent is set when envelope is sent to atleast one peer. + EnvelopeSent + // MailServerRequestSent is set when p2p request is sent to the mailserver + MailServerRequestSent +) + +// tracker responsible for processing events for envelopes that we are interested in +// and calling specified handler. +type tracker struct { + w *whisper.Whisper + handler EnvelopeEventsHandler + mailServerConfirmation bool + + mu sync.Mutex + cache map[common.Hash]EnvelopeState + batches map[common.Hash]map[common.Hash]struct{} + + mailPeers *mailservers.PeerStore + + wg sync.WaitGroup + quit chan struct{} +} + +// Start processing events. +func (t *tracker) Start() { + t.quit = make(chan struct{}) + t.wg.Add(1) + go func() { + t.handleEnvelopeEvents() + t.wg.Done() + }() +} + +// Stop process events. +func (t *tracker) Stop() { + close(t.quit) + t.wg.Wait() +} + +// Add hash to a tracker. +func (t *tracker) Add(hash common.Hash) { + t.mu.Lock() + defer t.mu.Unlock() + t.cache[hash] = EnvelopePosted +} + +func (t *tracker) GetState(hash common.Hash) EnvelopeState { + t.mu.Lock() + defer t.mu.Unlock() + state, exist := t.cache[hash] + if !exist { + return NotRegistered + } + return state +} + +// handleEnvelopeEvents processes whisper envelope events +func (t *tracker) handleEnvelopeEvents() { + events := make(chan whisper.EnvelopeEvent, 100) // must be buffered to prevent blocking whisper + sub := t.w.SubscribeEnvelopeEvents(events) + defer sub.Unsubscribe() + for { + select { + case <-t.quit: + return + case event := <-events: + t.handleEvent(event) + } + } +} + +// handleEvent based on type of the event either triggers +// confirmation handler or removes hash from tracker +func (t *tracker) handleEvent(event whisper.EnvelopeEvent) { + handlers := map[whisper.EventType]func(whisper.EnvelopeEvent){ + whisper.EventEnvelopeSent: t.handleEventEnvelopeSent, + whisper.EventEnvelopeExpired: t.handleEventEnvelopeExpired, + whisper.EventBatchAcknowledged: t.handleAcknowledgedBatch, + whisper.EventMailServerRequestSent: t.handleRequestSent, + whisper.EventMailServerRequestCompleted: t.handleEventMailServerRequestCompleted, + whisper.EventMailServerRequestExpired: t.handleEventMailServerRequestExpired, + } + + if handler, ok := handlers[event.Event]; ok { + handler(event) + } +} + +func (t *tracker) handleEventEnvelopeSent(event whisper.EnvelopeEvent) { + t.mu.Lock() + defer t.mu.Unlock() + + state, ok := t.cache[event.Hash] + // if we didn't send a message using extension - skip it + // if message was already confirmed - skip it + if !ok || state == EnvelopeSent { + return + } + log.Debug("envelope is sent", "hash", event.Hash, "peer", event.Peer) + if event.Batch != (common.Hash{}) { + if _, ok := t.batches[event.Batch]; !ok { + t.batches[event.Batch] = map[common.Hash]struct{}{} + } + t.batches[event.Batch][event.Hash] = struct{}{} + log.Debug("waiting for a confirmation", "batch", event.Batch) + } else { + t.cache[event.Hash] = EnvelopeSent + if t.handler != nil { + t.handler.EnvelopeSent(event.Hash) + } + } +} + +func (t *tracker) isMailserver(peer enode.ID) bool { + return t.mailPeers.Exist(peer) +} + +func (t *tracker) handleAcknowledgedBatch(event whisper.EnvelopeEvent) { + if t.mailServerConfirmation { + if !t.isMailserver(event.Peer) { + return + } + } + + t.mu.Lock() + defer t.mu.Unlock() + + envelopes, ok := t.batches[event.Batch] + if !ok { + log.Debug("batch is not found", "batch", event.Batch) + } + log.Debug("received a confirmation", "batch", event.Batch, "peer", event.Peer) + for hash := range envelopes { + state, ok := t.cache[hash] + if !ok || state == EnvelopeSent { + continue + } + t.cache[hash] = EnvelopeSent + if t.handler != nil { + t.handler.EnvelopeSent(hash) + } + } + delete(t.batches, event.Batch) +} + +func (t *tracker) handleEventEnvelopeExpired(event whisper.EnvelopeEvent) { + t.mu.Lock() + defer t.mu.Unlock() + + if state, ok := t.cache[event.Hash]; ok { + delete(t.cache, event.Hash) + if state == EnvelopeSent { + return + } + log.Debug("envelope expired", "hash", event.Hash, "state", state) + if t.handler != nil { + t.handler.EnvelopeExpired(event.Hash) + } + } +} + +func (t *tracker) handleRequestSent(event whisper.EnvelopeEvent) { + t.mu.Lock() + defer t.mu.Unlock() + t.cache[event.Hash] = MailServerRequestSent +} + +func (t *tracker) handleEventMailServerRequestCompleted(event whisper.EnvelopeEvent) { + t.mu.Lock() + defer t.mu.Unlock() + + state, ok := t.cache[event.Hash] + if !ok || state != MailServerRequestSent { + return + } + log.Debug("mailserver response received", "hash", event.Hash) + delete(t.cache, event.Hash) + if t.handler != nil { + if resp, ok := event.Data.(*whisper.MailServerResponse); ok { + t.handler.MailServerRequestCompleted(event.Hash, resp.LastEnvelopeHash, resp.Cursor, resp.Error) + } + } +} + +func (t *tracker) handleEventMailServerRequestExpired(event whisper.EnvelopeEvent) { + t.mu.Lock() + defer t.mu.Unlock() + + state, ok := t.cache[event.Hash] + if !ok || state != MailServerRequestSent { + return + } + log.Debug("mailserver response expired", "hash", event.Hash) + delete(t.cache, event.Hash) + if t.handler != nil { + t.handler.MailServerRequestExpired(event.Hash) + } +} diff --git a/services/shhext/tracker_test.go b/services/shhext/tracker_test.go new file mode 100644 index 000000000..718e33f40 --- /dev/null +++ b/services/shhext/tracker_test.go @@ -0,0 +1,144 @@ +package shhext + +import ( + "errors" + "testing" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/p2p/enode" + "github.com/status-im/status-go/services/shhext/mailservers" + whisper "github.com/status-im/whisper/whisperv6" + "github.com/stretchr/testify/suite" +) + +var ( + testHash = common.Hash{0x01} +) + +func TestTrackerSuite(t *testing.T) { + suite.Run(t, new(TrackerSuite)) +} + +type TrackerSuite struct { + suite.Suite + + tracker *tracker +} + +func (s *TrackerSuite) SetupTest() { + s.tracker = &tracker{ + cache: map[common.Hash]EnvelopeState{}, + batches: map[common.Hash]map[common.Hash]struct{}{}, + mailPeers: mailservers.NewPeerStore(), + mailServerConfirmation: true, + } +} + +func (s *TrackerSuite) TestConfirmed() { + s.tracker.Add(testHash) + s.Contains(s.tracker.cache, testHash) + s.Equal(EnvelopePosted, s.tracker.cache[testHash]) + s.tracker.handleEvent(whisper.EnvelopeEvent{ + Event: whisper.EventEnvelopeSent, + Hash: testHash, + }) + s.Contains(s.tracker.cache, testHash) + s.Equal(EnvelopeSent, s.tracker.cache[testHash]) +} + +func (s *TrackerSuite) TestConfirmedWithAcknowledge() { + testBatch := common.Hash{1} + pkey, err := crypto.GenerateKey() + s.Require().NoError(err) + node := enode.NewV4(&pkey.PublicKey, nil, 0, 0) + s.tracker.mailPeers.Update([]*enode.Node{node}) + s.tracker.Add(testHash) + s.Contains(s.tracker.cache, testHash) + s.Equal(EnvelopePosted, s.tracker.cache[testHash]) + s.tracker.handleEvent(whisper.EnvelopeEvent{ + Event: whisper.EventEnvelopeSent, + Hash: testHash, + Batch: testBatch, + }) + s.Equal(EnvelopePosted, s.tracker.cache[testHash]) + s.tracker.handleEvent(whisper.EnvelopeEvent{ + Event: whisper.EventBatchAcknowledged, + Batch: testBatch, + Peer: node.ID(), + }) + s.Contains(s.tracker.cache, testHash) + s.Equal(EnvelopeSent, s.tracker.cache[testHash]) +} + +func (s *TrackerSuite) TestIgnored() { + s.tracker.handleEvent(whisper.EnvelopeEvent{ + Event: whisper.EventEnvelopeSent, + Hash: testHash, + }) + s.NotContains(s.tracker.cache, testHash) +} + +func (s *TrackerSuite) TestRemoved() { + s.tracker.Add(testHash) + s.Contains(s.tracker.cache, testHash) + s.tracker.handleEvent(whisper.EnvelopeEvent{ + Event: whisper.EventEnvelopeExpired, + Hash: testHash, + }) + s.NotContains(s.tracker.cache, testHash) +} + +func (s *TrackerSuite) TestRequestCompleted() { + mock := newHandlerMock(1) + s.tracker.handler = mock + s.tracker.cache[testHash] = MailServerRequestSent + s.tracker.handleEvent(whisper.EnvelopeEvent{ + Event: whisper.EventMailServerRequestCompleted, + Hash: testHash, + Data: &whisper.MailServerResponse{}, + }) + select { + case requestID := <-mock.requestsCompleted: + s.Equal(testHash, requestID) + s.NotContains(s.tracker.cache, testHash) + case <-time.After(10 * time.Second): + s.Fail("timed out while waiting for a request to be completed") + } +} + +func (s *TrackerSuite) TestRequestFailed() { + mock := newHandlerMock(1) + s.tracker.handler = mock + s.tracker.cache[testHash] = MailServerRequestSent + s.tracker.handleEvent(whisper.EnvelopeEvent{ + Event: whisper.EventMailServerRequestCompleted, + Hash: testHash, + Data: &whisper.MailServerResponse{Error: errors.New("test error")}, + }) + select { + case requestID := <-mock.requestsFailed: + s.Equal(testHash, requestID) + s.NotContains(s.tracker.cache, testHash) + case <-time.After(10 * time.Second): + s.Fail("timed out while waiting for a request to be failed") + } +} + +func (s *TrackerSuite) TestRequestExpiration() { + mock := newHandlerMock(1) + s.tracker.handler = mock + s.tracker.cache[testHash] = MailServerRequestSent + s.tracker.handleEvent(whisper.EnvelopeEvent{ + Event: whisper.EventMailServerRequestExpired, + Hash: testHash, + }) + select { + case requestID := <-mock.requestsExpired: + s.Equal(testHash, requestID) + s.NotContains(s.tracker.cache, testHash) + case <-time.After(10 * time.Second): + s.Fail("timed out while waiting for request expiration") + } +} diff --git a/t/e2e/whisper/mailservice_test.go b/t/e2e/whisper/mailservice_test.go index 3a021e35b..96ea8df50 100644 --- a/t/e2e/whisper/mailservice_test.go +++ b/t/e2e/whisper/mailservice_test.go @@ -40,7 +40,9 @@ func (s *MailServiceSuite) TestShhextRequestMessagesRPCMethodAvailability() { // This error means that the method is available through inproc communication // as the validation of params occurred. - err := client.Call(nil, "shhext_requestMessages", map[string]interface{}{}) + err := client.Call(nil, "shhext_requestMessages", map[string]interface{}{ + "mailServerPeer": "xxx", + }) r.EqualError(err, `invalid mailServerPeer value: invalid URL scheme, want "enode"`) // Do the same but using HTTP interface. diff --git a/vendor/github.com/status-im/whisper/whisperv6/events.go b/vendor/github.com/status-im/whisper/whisperv6/events.go index bfbce00c7..2faabe1f7 100644 --- a/vendor/github.com/status-im/whisper/whisperv6/events.go +++ b/vendor/github.com/status-im/whisper/whisperv6/events.go @@ -17,9 +17,13 @@ const ( EventBatchAcknowledged EventType = "batch.acknowleged" // EventEnvelopeAvailable fires when envelop is available for filters EventEnvelopeAvailable EventType = "envelope.available" + // EventMailServerRequestSent fires when such request is sent. + EventMailServerRequestSent EventType = "mailserver.request.sent" // EventMailServerRequestCompleted fires after mailserver sends all the requested messages EventMailServerRequestCompleted EventType = "mailserver.request.completed" - // EventMailServerRequestExpired fires after mailserver the request TTL ends + // EventMailServerRequestExpired fires after mailserver the request TTL ends. + // This event is independent and concurrent to EventMailServerRequestCompleted. + // Request should be considered as expired only if expiry event was received first. EventMailServerRequestExpired EventType = "mailserver.request.expired" // EventMailServerEnvelopeArchived fires after an envelope has been archived EventMailServerEnvelopeArchived EventType = "mailserver.envelope.archived" diff --git a/vendor/github.com/status-im/whisper/whisperv6/whisper.go b/vendor/github.com/status-im/whisper/whisperv6/whisper.go index cbe44c48a..c8bed7564 100644 --- a/vendor/github.com/status-im/whisper/whisperv6/whisper.go +++ b/vendor/github.com/status-im/whisper/whisperv6/whisper.go @@ -407,12 +407,40 @@ func (whisper *Whisper) AllowP2PMessagesFromPeer(peerID []byte) error { // which are not supposed to be forwarded any further. // The whisper protocol is agnostic of the format and contents of envelope. func (whisper *Whisper) RequestHistoricMessages(peerID []byte, envelope *Envelope) error { + return whisper.RequestHistoricMessagesWithTimeout(peerID, envelope, 0) +} + +func (whisper *Whisper) RequestHistoricMessagesWithTimeout(peerID []byte, envelope *Envelope, timeout time.Duration) error { p, err := whisper.getPeer(peerID) if err != nil { return err } + whisper.envelopeFeed.Send(EnvelopeEvent{ + Peer: p.peer.ID(), + Hash: envelope.Hash(), + Event: EventMailServerRequestSent, + }) p.trusted = true - return p2p.Send(p.ws, p2pRequestCode, envelope) + err = p2p.Send(p.ws, p2pRequestCode, envelope) + if timeout != 0 { + go whisper.expireRequestHistoricMessages(p.peer.ID(), envelope.Hash(), timeout) + } + return err +} + +func (whisper *Whisper) expireRequestHistoricMessages(peer enode.ID, hash common.Hash, timeout time.Duration) { + timer := time.NewTimer(timeout) + defer timer.Stop() + select { + case <-whisper.quit: + return + case <-timer.C: + whisper.envelopeFeed.Send(EnvelopeEvent{ + Peer: peer, + Hash: hash, + Event: EventMailServerRequestExpired, + }) + } } func (whisper *Whisper) SendHistoricMessageResponse(peer *Peer, payload []byte) error {