Mail peer store and connection manager (#1295)

This change implements connection manager that monitors 3 types of events:
1. update of the selected mail servers
2. disconnect from a mail server
3. errors for requesting mail history

When selected mail servers provided we will try to connect with as many as possible, and later disconnect the surplus. For example if we want to connect with one mail server and 3 were selected, we try to connect with all (3), and later disconnect with 2. It will to establish connection with live mail server faster.

If mail server disconnects we will choose any other mail server from the list of selected. Unless we have only one mail server. In such case we don't have any other choice and we will leave things as is.

If request for history was expired we will disconnect such peer and try to find another one. We will follow same rules as described above.

We will have two components that will rely on this logic:
1. requesting history

If target peer is provided we will use that peer, otherwise we will request history from any selected mail server that is connected at the time of request.

2. confirmation from selected mail server

Confirmation from any selected mail server will bee used to send a feedback that envelope was sent.

I will add several extensions, but probably in separate PRs:
1. prioritize connection with mail server that was used before reboot
2. disconnect from mail servers if history request wasn't expired but failed.
3. wait some time in RequestsMessage RPC to establish connection with any mail server

Currently this feature is hidden, as certain changes will be necessary in status-react. 

partially implements: https://github.com/status-im/status-go/issues/1285
This commit is contained in:
Dmitry Shulyak 2018-12-05 15:57:05 +02:00 committed by GitHub
parent afc3017e07
commit a609b468fe
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 1135 additions and 405 deletions

6
Gopkg.lock generated
View File

@ -822,12 +822,12 @@
revision = "fbcc46a78cd43fef95a110df664aab513116a850" revision = "fbcc46a78cd43fef95a110df664aab513116a850"
[[projects]] [[projects]]
digest = "1:6cb252f27feb57ef0e8406556c259d903c0ecff2ab0d2200ca85773b3561777d" digest = "1:5c62af344925b846377386dec72e06eb3e1e15222542b3d22fe0f0da75c7f090"
name = "github.com/status-im/whisper" name = "github.com/status-im/whisper"
packages = ["whisperv6"] packages = ["whisperv6"]
pruneopts = "NUT" pruneopts = "NUT"
revision = "76c24476436f0cf832021be98316a4ee62cc83cc" revision = "96d2199ed511430c642d877afe7bacaac5f37426"
version = "v1.4.0" version = "v1.4.1"
[[projects]] [[projects]]
digest = "1:572c783a763db6383aca3179976eb80e4c900f52eba56cba8bb2e3cea7ce720e" digest = "1:572c783a763db6383aca3179976eb80e4c900f52eba56cba8bb2e3cea7ce720e"

View File

@ -29,7 +29,7 @@
[[constraint]] [[constraint]]
name = "github.com/status-im/whisper" name = "github.com/status-im/whisper"
version = "=v1.4.0" version = "=v1.4.1"
[[override]] [[override]]
name = "github.com/golang/protobuf" name = "github.com/golang/protobuf"

View File

@ -173,7 +173,7 @@ func verifyMailserverBehavior(mailserverNode *enode.Node) {
Limit: 1, Limit: 1,
Topic: topic, Topic: topic,
SymKeyID: mailServerKeyID, SymKeyID: mailServerKeyID,
Timeout: time.Duration(*timeout) * time.Second, Timeout: time.Duration(*timeout),
}) })
if err != nil { if err != nil {
logger.Error("Error requesting historic messages from mailserver", "error", err) logger.Error("Error requesting historic messages from mailserver", "error", err)
@ -333,6 +333,8 @@ func waitForMailServerResponse(events chan whisper.EnvelopeEvent, requestID comm
func decodeMailServerResponse(event whisper.EnvelopeEvent) (*whisper.MailServerResponse, error) { func decodeMailServerResponse(event whisper.EnvelopeEvent) (*whisper.MailServerResponse, error) {
switch event.Event { switch event.Event {
case whisper.EventMailServerRequestSent:
return nil, nil
case whisper.EventMailServerRequestCompleted: case whisper.EventMailServerRequestCompleted:
resp, ok := event.Data.(*whisper.MailServerResponse) resp, ok := event.Data.(*whisper.MailServerResponse)
if !ok { if !ok {

View File

@ -17,6 +17,7 @@ import (
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p/enode" "github.com/ethereum/go-ethereum/p2p/enode"
"github.com/status-im/status-go/services/shhext/chat" "github.com/status-im/status-go/services/shhext/chat"
"github.com/status-im/status-go/services/shhext/mailservers"
whisper "github.com/status-im/whisper/whisperv6" whisper "github.com/status-im/whisper/whisperv6"
) )
@ -150,6 +151,13 @@ func (api *PublicAPI) Post(ctx context.Context, req whisper.NewMessage) (hash he
return hash, err return hash, err
} }
func (api *PublicAPI) getPeer(rawurl string) (*enode.Node, error) {
if len(rawurl) == 0 {
return mailservers.GetFirstConnected(api.service.server, api.service.peerStore)
}
return enode.ParseV4(rawurl)
}
// RequestMessages sends a request for historic messages to a MailServer. // RequestMessages sends a request for historic messages to a MailServer.
func (api *PublicAPI) RequestMessages(_ context.Context, r MessagesRequest) (hexutil.Bytes, error) { func (api *PublicAPI) RequestMessages(_ context.Context, r MessagesRequest) (hexutil.Bytes, error) {
api.log.Info("RequestMessages", "request", r) api.log.Info("RequestMessages", "request", r)
@ -161,9 +169,7 @@ func (api *PublicAPI) RequestMessages(_ context.Context, r MessagesRequest) (hex
return nil, fmt.Errorf("Query range is invalid: from > to (%d > %d)", r.From, r.To) return nil, fmt.Errorf("Query range is invalid: from > to (%d > %d)", r.From, r.To)
} }
var err error mailServerNode, err := api.getPeer(r.MailServerPeer)
mailServerNode, err := enode.ParseV4(r.MailServerPeer)
if err != nil { if err != nil {
return nil, fmt.Errorf("%v: %v", ErrInvalidMailServerPeer, err) return nil, fmt.Errorf("%v: %v", ErrInvalidMailServerPeer, err)
} }
@ -199,13 +205,10 @@ func (api *PublicAPI) RequestMessages(_ context.Context, r MessagesRequest) (hex
return nil, err return nil, err
} }
hash := envelope.Hash() if err := shh.RequestHistoricMessagesWithTimeout(mailServerNode.ID().Bytes(), envelope, r.Timeout*time.Second); err != nil {
if err := shh.RequestHistoricMessages(mailServerNode.ID().Bytes(), envelope); err != nil {
return nil, err return nil, err
} }
hash := envelope.Hash()
api.service.tracker.AddRequest(hash, time.After(r.Timeout*time.Second))
return hash[:], nil return hash[:], nil
} }

View File

@ -0,0 +1,202 @@
package mailservers
import (
"sync"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/status-im/whisper/whisperv6"
)
const (
peerEventsBuffer = 10 // sufficient buffer to avoid blocking a p2p feed.
whisperEventsBuffer = 20 // sufficient buffer to avod blocking a whisper envelopes feed.
)
// PeerAdderRemover is an interface for adding or removing peers.
type PeerAdderRemover interface {
AddPeer(node *enode.Node)
RemovePeer(node *enode.Node)
}
// PeerEventsSubscriber interface to subscribe for p2p.PeerEvent's.
type PeerEventsSubscriber interface {
SubscribeEvents(chan *p2p.PeerEvent) event.Subscription
}
// EnvelopeEventSubscbriber interface to subscribe for whisper.EnvelopeEvent's.
type EnvelopeEventSubscbriber interface {
SubscribeEnvelopeEvents(chan<- whisperv6.EnvelopeEvent) event.Subscription
}
type p2pServer interface {
PeerAdderRemover
PeerEventsSubscriber
}
// NewConnectionManager creates an instance of ConnectionManager.
func NewConnectionManager(server p2pServer, whisper EnvelopeEventSubscbriber, target int) *ConnectionManager {
return &ConnectionManager{
server: server,
whisper: whisper,
connectedTarget: target,
notifications: make(chan []*enode.Node),
}
}
// ConnectionManager manages keeps target of peers connected.
type ConnectionManager struct {
wg sync.WaitGroup
quit chan struct{}
server p2pServer
whisper EnvelopeEventSubscbriber
notifications chan []*enode.Node
connectedTarget int
}
// Notify sends a non-blocking notification about new nodes.
func (ps *ConnectionManager) Notify(nodes []*enode.Node) {
ps.wg.Add(1)
go func() {
select {
case ps.notifications <- nodes:
case <-ps.quit:
}
ps.wg.Done()
}()
}
// Start subscribes to a p2p server and handles new peers and state updates for those peers.
func (ps *ConnectionManager) Start() {
ps.quit = make(chan struct{})
ps.wg.Add(1)
go func() {
current := map[enode.ID]*enode.Node{}
connected := map[enode.ID]struct{}{}
events := make(chan *p2p.PeerEvent, peerEventsBuffer)
sub := ps.server.SubscribeEvents(events)
whisperEvents := make(chan whisperv6.EnvelopeEvent, whisperEventsBuffer)
whisperSub := ps.whisper.SubscribeEnvelopeEvents(whisperEvents)
requests := map[common.Hash]struct{}{}
for {
select {
case <-ps.quit:
sub.Unsubscribe()
whisperSub.Unsubscribe()
ps.wg.Done()
return
case err := <-sub.Err():
log.Error("retry after error subscribing to p2p events", "error", err)
sub = ps.server.SubscribeEvents(events)
case err := <-whisperSub.Err():
log.Error("retry after error suscribing to whisper events", "error", err)
whisperSub = ps.whisper.SubscribeEnvelopeEvents(whisperEvents)
case newNodes := <-ps.notifications:
replacement := map[enode.ID]*enode.Node{}
for _, n := range newNodes {
replacement[n.ID()] = n
}
replaceNodes(ps.server, ps.connectedTarget, connected, current, replacement)
current = replacement
case ev := <-events:
switch ev.Type {
case p2p.PeerEventTypeAdd:
log.Debug("connected to a mailserver", "address", ev.Peer)
nodeAdded(ps.server, ev.Peer, ps.connectedTarget, connected, current)
case p2p.PeerEventTypeDrop:
log.Debug("mailserver disconnected", "address", ev.Peer)
nodeDisconnected(ps.server, ev.Peer, ps.connectedTarget, connected, current)
}
case ev := <-whisperEvents:
// TODO what about completed but with error? what about expired envelopes?
switch ev.Event {
case whisperv6.EventMailServerRequestSent:
requests[ev.Hash] = struct{}{}
case whisperv6.EventMailServerRequestCompleted:
delete(requests, ev.Hash)
case whisperv6.EventMailServerRequestExpired:
_, exist := requests[ev.Hash]
if !exist {
continue
}
log.Debug("request to a mail server expired, disconncet a peer", "address", ev.Peer)
nodeDisconnected(ps.server, ev.Peer, ps.connectedTarget, connected, current)
}
}
}
}()
}
// Stop gracefully closes all background goroutines and waits until they finish.
func (ps *ConnectionManager) Stop() {
if ps.quit == nil {
return
}
select {
case <-ps.quit:
return
default:
}
close(ps.quit)
ps.wg.Wait()
ps.quit = nil
}
func replaceNodes(srv PeerAdderRemover, target int, connected map[enode.ID]struct{}, old, new map[enode.ID]*enode.Node) {
for nid, n := range old {
if _, exist := new[nid]; !exist {
if _, exist := connected[nid]; exist {
delete(connected, nid)
}
srv.RemovePeer(n)
}
}
if len(connected) < target {
for _, n := range new {
srv.AddPeer(n)
}
}
}
func nodeAdded(srv PeerAdderRemover, peer enode.ID, target int, connected map[enode.ID]struct{}, nodes map[enode.ID]*enode.Node) {
n, exist := nodes[peer]
if !exist {
return
}
if len(connected) == target {
srv.RemovePeer(n)
} else {
connected[n.ID()] = struct{}{}
}
}
func nodeDisconnected(srv PeerAdderRemover, peer enode.ID, target int, connected map[enode.ID]struct{}, nodes map[enode.ID]*enode.Node) {
n, exist := nodes[peer] // unrelated event
if !exist {
return
}
_, exist = connected[peer] // check if already disconnected
if !exist {
return
}
if len(nodes) == 1 { // keep node connected if we don't have another choice
return
}
srv.RemovePeer(n) // remove peer permanently, otherwise p2p.Server will try to reconnect
delete(connected, peer)
if len(connected) < target { // try to connect with any other selected (but not connected) node
for nid, n := range nodes {
_, exist := connected[nid]
if exist || peer == nid {
continue
}
srv.AddPeer(n)
}
}
}

View File

@ -0,0 +1,313 @@
package mailservers
import (
"fmt"
"sync"
"testing"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/status-im/status-go/t/utils"
"github.com/status-im/whisper/whisperv6"
"github.com/stretchr/testify/require"
)
type fakePeerEvents struct {
mu sync.Mutex
nodes map[enode.ID]struct{}
input chan *p2p.PeerEvent
}
func (f *fakePeerEvents) Nodes() []enode.ID {
f.mu.Lock()
rst := make([]enode.ID, 0, len(f.nodes))
for n := range f.nodes {
rst = append(rst, n)
}
f.mu.Unlock()
return rst
}
func (f *fakePeerEvents) AddPeer(node *enode.Node) {
f.mu.Lock()
f.nodes[node.ID()] = struct{}{}
f.mu.Unlock()
if f.input == nil {
return
}
f.input <- &p2p.PeerEvent{
Peer: node.ID(),
Type: p2p.PeerEventTypeAdd,
}
}
func (f *fakePeerEvents) RemovePeer(node *enode.Node) {
f.mu.Lock()
delete(f.nodes, node.ID())
f.mu.Unlock()
if f.input == nil {
return
}
f.input <- &p2p.PeerEvent{
Peer: node.ID(),
Type: p2p.PeerEventTypeDrop,
}
}
func newFakePeerAdderRemover() *fakePeerEvents {
return &fakePeerEvents{nodes: map[enode.ID]struct{}{}}
}
func (f *fakePeerEvents) SubscribeEvents(output chan *p2p.PeerEvent) event.Subscription {
return event.NewSubscription(func(quit <-chan struct{}) error {
for {
select {
case <-quit:
return nil
case ev := <-f.input:
// will block the same way as in any feed
output <- ev
}
}
})
}
func newFakeServer() *fakePeerEvents {
srv := newFakePeerAdderRemover()
srv.input = make(chan *p2p.PeerEvent, 20)
return srv
}
type fakeEnvelopeEvents struct {
input chan whisperv6.EnvelopeEvent
}
func (f fakeEnvelopeEvents) SubscribeEnvelopeEvents(output chan<- whisperv6.EnvelopeEvent) event.Subscription {
return event.NewSubscription(func(quit <-chan struct{}) error {
for {
select {
case <-quit:
return nil
case ev := <-f.input:
// will block the same way as in any feed
output <- ev
}
}
})
}
func newFakeEnvelopesEvents() fakeEnvelopeEvents {
return fakeEnvelopeEvents{
input: make(chan whisperv6.EnvelopeEvent),
}
}
func getNRandomNodes(t *testing.T, n int) map[enode.ID]*enode.Node {
rst := map[enode.ID]*enode.Node{}
for i := 0; i < n; i++ {
n, err := RandomeNode()
require.NoError(t, err)
rst[n.ID()] = n
}
return rst
}
func mergeOldIntoNew(old, new map[enode.ID]*enode.Node) {
for n := range old {
new[n] = old[n]
}
}
func TestReplaceNodes(t *testing.T) {
type testCase struct {
description string
old map[enode.ID]*enode.Node
new map[enode.ID]*enode.Node
target int
}
for _, tc := range []testCase{
{
"InitialReplace",
getNRandomNodes(t, 0),
getNRandomNodes(t, 3),
2,
},
{
"FullReplace",
getNRandomNodes(t, 3),
getNRandomNodes(t, 3),
2,
},
} {
t.Run(tc.description, func(t *testing.T) {
peers := newFakePeerAdderRemover()
replaceNodes(peers, tc.target, peers.nodes, nil, tc.old)
require.Len(t, peers.nodes, len(tc.old))
for n := range peers.nodes {
require.Contains(t, tc.old, n)
}
replaceNodes(peers, tc.target, peers.nodes, tc.old, tc.new)
require.Len(t, peers.nodes, len(tc.new))
for n := range peers.nodes {
require.Contains(t, tc.new, n)
}
})
}
}
func TestPartialReplaceNodesBelowTarget(t *testing.T) {
peers := newFakePeerAdderRemover()
old := getNRandomNodes(t, 1)
new := getNRandomNodes(t, 2)
replaceNodes(peers, 2, peers.nodes, nil, old)
mergeOldIntoNew(old, new)
replaceNodes(peers, 2, peers.nodes, old, new)
require.Len(t, peers.nodes, len(new))
}
func TestPartialReplaceNodesAboveTarget(t *testing.T) {
peers := newFakePeerAdderRemover()
old := getNRandomNodes(t, 1)
new := getNRandomNodes(t, 2)
replaceNodes(peers, 1, peers.nodes, nil, old)
mergeOldIntoNew(old, new)
replaceNodes(peers, 1, peers.nodes, old, new)
require.Len(t, peers.nodes, 1)
}
func TestConnectionManagerAddDrop(t *testing.T) {
server := newFakeServer()
whisper := newFakeEnvelopesEvents()
target := 1
connmanager := NewConnectionManager(server, whisper, target)
connmanager.Start()
defer connmanager.Stop()
nodes := []*enode.Node{}
for _, n := range getNRandomNodes(t, 3) {
nodes = append(nodes, n)
}
// Send 3 random nodes to connection manager.
connmanager.Notify(nodes)
var initial enode.ID
// Wait till connection manager establishes connection with 1 peer.
require.NoError(t, utils.Eventually(func() error {
nodes := server.Nodes()
if len(nodes) != target {
return fmt.Errorf("unexpected number of connected servers: %d", len(nodes))
}
initial = nodes[0]
return nil
}, time.Second, 100*time.Millisecond))
// Send an event that peer was dropped.
select {
case server.input <- &p2p.PeerEvent{Peer: initial, Type: p2p.PeerEventTypeDrop}:
case <-time.After(time.Second):
require.FailNow(t, "can't send a drop event")
}
// Connection manager should establish connection with any other peer from initial list.
require.NoError(t, utils.Eventually(func() error {
nodes := server.Nodes()
if len(nodes) != target {
return fmt.Errorf("unexpected number of connected servers: %d", len(nodes))
}
if nodes[0] == initial {
return fmt.Errorf("connected node wasn't changed from %s", initial)
}
return nil
}, time.Second, 100*time.Millisecond))
}
func TestConnectionManagerReplace(t *testing.T) {
server := newFakeServer()
whisper := newFakeEnvelopesEvents()
target := 1
connmanager := NewConnectionManager(server, whisper, target)
connmanager.Start()
defer connmanager.Stop()
nodes := []*enode.Node{}
for _, n := range getNRandomNodes(t, 3) {
nodes = append(nodes, n)
}
// Send a single node to connection manager.
connmanager.Notify(nodes[:1])
// Wait until this node will get connected.
require.NoError(t, utils.Eventually(func() error {
connected := server.Nodes()
if len(connected) != target {
return fmt.Errorf("unexpected number of connected servers: %d", len(connected))
}
if nodes[0].ID() != connected[0] {
return fmt.Errorf("connected with a wrong peer. expected %s, got %s", nodes[0].ID(), connected[0])
}
return nil
}, time.Second, 100*time.Millisecond))
// Replace previously sent node with 2 different nodes.
connmanager.Notify(nodes[1:])
// Wait until connection manager replaces node connected in the first round.
require.NoError(t, utils.Eventually(func() error {
connected := server.Nodes()
if len(connected) != target {
return fmt.Errorf("unexpected number of connected servers: %d", len(connected))
}
switch connected[0] {
case nodes[1].ID():
case nodes[2].ID():
default:
return fmt.Errorf("connected with unexpected peer. got %s, expected %+v", connected[0], nodes[1:])
}
return nil
}, time.Second, 100*time.Millisecond))
}
func TestConnectionChangedAfterExpiry(t *testing.T) {
server := newFakeServer()
whisper := newFakeEnvelopesEvents()
target := 1
connmanager := NewConnectionManager(server, whisper, target)
connmanager.Start()
defer connmanager.Stop()
nodes := []*enode.Node{}
for _, n := range getNRandomNodes(t, 2) {
nodes = append(nodes, n)
}
// Send two random nodes to connection manager.
connmanager.Notify(nodes)
var initial enode.ID
// Wait until connection manager establishes connection with one node.
require.NoError(t, utils.Eventually(func() error {
nodes := server.Nodes()
if len(nodes) != target {
return fmt.Errorf("unexpected number of connected servers: %d", len(nodes))
}
initial = nodes[0]
return nil
}, time.Second, 100*time.Millisecond))
hash := common.Hash{1}
// Send event that history request for connected peer was sent.
select {
case whisper.input <- whisperv6.EnvelopeEvent{
Event: whisperv6.EventMailServerRequestSent, Peer: initial, Hash: hash}:
case <-time.After(time.Second):
require.FailNow(t, "can't send a 'sent' event")
}
// And eventually expired.
select {
case whisper.input <- whisperv6.EnvelopeEvent{
Event: whisperv6.EventMailServerRequestExpired, Peer: initial, Hash: hash}:
case <-time.After(time.Second):
require.FailNow(t, "can't send an 'expiry' event")
}
require.NoError(t, utils.Eventually(func() error {
nodes := server.Nodes()
if len(nodes) != target {
return fmt.Errorf("unexpected number of connected servers: %d", len(nodes))
}
if nodes[0] == initial {
return fmt.Errorf("connected node wasn't changed from %s", initial)
}
return nil
}, time.Second, 100*time.Millisecond))
}

View File

@ -0,0 +1,67 @@
package mailservers
import (
"errors"
"sync"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/enode"
)
var (
// ErrNoConnected returned when mail servers are not connected.
ErrNoConnected = errors.New("no connected mail servers")
)
// PeersProvider is an interface for requesting list of peers.
type PeersProvider interface {
Peers() []*p2p.Peer
}
// NewPeerStore returns an instance of PeerStore.
func NewPeerStore() *PeerStore {
return &PeerStore{nodes: map[enode.ID]*enode.Node{}}
}
// PeerStore stores list of selected mail servers and keeps N of them connected.
type PeerStore struct {
mu sync.RWMutex
nodes map[enode.ID]*enode.Node
}
// Exist confirms that peers was added to a store.
func (ps *PeerStore) Exist(peer enode.ID) bool {
ps.mu.RLock()
defer ps.mu.RUnlock()
_, exist := ps.nodes[peer]
return exist
}
// Get returns instance of the node with requested ID or nil if ID is not found.
func (ps *PeerStore) Get(peer enode.ID) *enode.Node {
ps.mu.RLock()
defer ps.mu.RUnlock()
return ps.nodes[peer]
}
// Update updates peers locally.
func (ps *PeerStore) Update(nodes []*enode.Node) {
ps.mu.Lock()
defer ps.mu.Unlock()
ps.nodes = map[enode.ID]*enode.Node{}
for _, n := range nodes {
ps.nodes[n.ID()] = n
}
}
// GetFirstConnected returns first connected peer that is also added to a peer store.
// Raises ErrNoConnected if no peers are added to a peer store.
func GetFirstConnected(provider PeersProvider, store *PeerStore) (*enode.Node, error) {
peers := provider.Peers()
for _, p := range peers {
if store.Exist(p.ID()) {
return p.Node(), nil
}
}
return nil, ErrNoConnected
}

View File

@ -0,0 +1,77 @@
package mailservers
import (
"testing"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/stretchr/testify/require"
)
func RandomeNode() (*enode.Node, error) {
pkey, err := crypto.GenerateKey()
if err != nil {
return nil, err
}
return enode.NewV4(&pkey.PublicKey, nil, 0, 0), nil
}
func TestUpdateResetsInternalStorage(t *testing.T) {
store := NewPeerStore()
r1, err := RandomeNode()
require.NoError(t, err)
r2, err := RandomeNode()
require.NoError(t, err)
store.Update([]*enode.Node{r1, r2})
require.True(t, store.Exist(r1.ID()))
require.True(t, store.Exist(r2.ID()))
store.Update([]*enode.Node{r2})
require.False(t, store.Exist(r1.ID()))
require.True(t, store.Exist(r2.ID()))
}
func TestGetNodeByID(t *testing.T) {
store := NewPeerStore()
r1, err := RandomeNode()
require.NoError(t, err)
store.Update([]*enode.Node{r1})
require.Equal(t, r1, store.Get(r1.ID()))
require.Nil(t, store.Get(enode.ID{1}))
}
type fakePeerProvider struct {
peers []*p2p.Peer
}
func (f fakePeerProvider) Peers() []*p2p.Peer {
return f.peers
}
func TestNoConnected(t *testing.T) {
provider := fakePeerProvider{}
store := NewPeerStore()
_, err := GetFirstConnected(provider, store)
require.EqualError(t, ErrNoConnected, err.Error())
}
func TestGetFirstConnected(t *testing.T) {
numPeers := 3
nodes := make([]*enode.Node, numPeers)
peers := make([]*p2p.Peer, numPeers)
nodesMap := getNRandomNodes(t, numPeers)
i := 0
for _, node := range nodesMap {
nodes[i] = node
peers[i] = p2p.NewPeer(node.ID(), node.ID().String(), nil)
i++
}
store := NewPeerStore()
provider := fakePeerProvider{peers}
_, err := GetFirstConnected(provider, store)
require.EqualError(t, ErrNoConnected, err.Error())
store.Update(nodes)
node, err := GetFirstConnected(provider, store)
require.NoError(t, err)
require.Contains(t, nodesMap, node.ID())
}

View File

@ -6,35 +6,26 @@ import (
"fmt" "fmt"
"os" "os"
"path/filepath" "path/filepath"
"sync"
"time"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/node" "github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/enode" "github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/rpc" "github.com/ethereum/go-ethereum/rpc"
"github.com/status-im/status-go/services/shhext/chat" "github.com/status-im/status-go/services/shhext/chat"
"github.com/status-im/status-go/services/shhext/dedup" "github.com/status-im/status-go/services/shhext/dedup"
"github.com/status-im/status-go/services/shhext/mailservers"
whisper "github.com/status-im/whisper/whisperv6" whisper "github.com/status-im/whisper/whisperv6"
"github.com/syndtr/goleveldb/leveldb" "github.com/syndtr/goleveldb/leveldb"
) )
var errProtocolNotInitialized = errors.New("procotol is not initialized")
// EnvelopeState in local tracker
type EnvelopeState int
const ( const (
// EnvelopePosted is set when envelope was added to a local whisper queue. // defaultConnectionsTarget used in Service.Start if configured connection target is 0.
EnvelopePosted EnvelopeState = iota defaultConnectionsTarget = 1
// EnvelopeSent is set when envelope is sent to atleast one peer.
EnvelopeSent
// MailServerRequestSent is set when p2p request is sent to the mailserver
MailServerRequestSent
) )
var errProtocolNotInitialized = errors.New("procotol is not initialized")
// EnvelopeEventsHandler used for two different event types. // EnvelopeEventsHandler used for two different event types.
type EnvelopeEventsHandler interface { type EnvelopeEventsHandler interface {
EnvelopeSent(common.Hash) EnvelopeSent(common.Hash)
@ -46,7 +37,9 @@ type EnvelopeEventsHandler interface {
// Service is a service that provides some additional Whisper API. // Service is a service that provides some additional Whisper API.
type Service struct { type Service struct {
w *whisper.Whisper w *whisper.Whisper
config *ServiceConfig
tracker *tracker tracker *tracker
server *p2p.Server
nodeID *ecdsa.PrivateKey nodeID *ecdsa.PrivateKey
deduplicator *dedup.Deduplicator deduplicator *dedup.Deduplicator
protocol *chat.ProtocolService protocol *chat.ProtocolService
@ -54,6 +47,9 @@ type Service struct {
dataDir string dataDir string
installationID string installationID string
pfsEnabled bool pfsEnabled bool
peerStore *mailservers.PeerStore
connManager *mailservers.ConnectionManager
} }
type ServiceConfig struct { type ServiceConfig struct {
@ -62,6 +58,8 @@ type ServiceConfig struct {
Debug bool Debug bool
PFSEnabled bool PFSEnabled bool
MailServerConfirmations bool MailServerConfirmations bool
EnableConnectionManager bool
ConnectionTarget int
} }
// Make sure that Service implements node.Service interface. // Make sure that Service implements node.Service interface.
@ -69,28 +67,34 @@ var _ node.Service = (*Service)(nil)
// New returns a new Service. dataDir is a folder path to a network-independent location // New returns a new Service. dataDir is a folder path to a network-independent location
func New(w *whisper.Whisper, handler EnvelopeEventsHandler, db *leveldb.DB, config *ServiceConfig) *Service { func New(w *whisper.Whisper, handler EnvelopeEventsHandler, db *leveldb.DB, config *ServiceConfig) *Service {
ps := mailservers.NewPeerStore()
track := &tracker{ track := &tracker{
w: w, w: w,
handler: handler, handler: handler,
cache: map[common.Hash]EnvelopeState{}, cache: map[common.Hash]EnvelopeState{},
batches: map[common.Hash]map[common.Hash]struct{}{}, batches: map[common.Hash]map[common.Hash]struct{}{},
mailservers: map[enode.ID]struct{}{}, mailPeers: ps,
mailServerConfirmation: config.MailServerConfirmations, mailServerConfirmation: config.MailServerConfirmations,
} }
return &Service{ return &Service{
w: w, w: w,
config: config,
tracker: track, tracker: track,
deduplicator: dedup.NewDeduplicator(w, db), deduplicator: dedup.NewDeduplicator(w, db),
debug: config.Debug, debug: config.Debug,
dataDir: config.DataDir, dataDir: config.DataDir,
installationID: config.InstallationID, installationID: config.InstallationID,
pfsEnabled: config.PFSEnabled, pfsEnabled: config.PFSEnabled,
peerStore: ps,
} }
} }
// UpdateMailservers updates information about selected mail servers. // UpdateMailservers updates information about selected mail servers.
func (s *Service) UpdateMailservers(nodes []*enode.Node) { func (s *Service) UpdateMailservers(nodes []*enode.Node) {
s.tracker.UpdateMailservers(nodes) s.peerStore.Update(nodes)
if s.connManager != nil {
s.connManager.Notify(nodes)
}
} }
// Protocols returns a new protocols list. In this case, there are none. // Protocols returns a new protocols list. In this case, there are none.
@ -191,237 +195,26 @@ func (s *Service) APIs() []rpc.API {
// Start is run when a service is started. // Start is run when a service is started.
// It does nothing in this case but is required by `node.Service` interface. // It does nothing in this case but is required by `node.Service` interface.
func (s *Service) Start(server *p2p.Server) error { func (s *Service) Start(server *p2p.Server) error {
if s.config.EnableConnectionManager {
connectionsTarget := s.config.ConnectionTarget
if connectionsTarget == 0 {
connectionsTarget = defaultConnectionsTarget
}
s.connManager = mailservers.NewConnectionManager(server, s.w, connectionsTarget)
s.connManager.Start()
}
s.tracker.Start() s.tracker.Start()
s.nodeID = server.PrivateKey s.nodeID = server.PrivateKey
s.server = server
return nil return nil
} }
// Stop is run when a service is stopped. // Stop is run when a service is stopped.
// It does nothing in this case but is required by `node.Service` interface. // It does nothing in this case but is required by `node.Service` interface.
func (s *Service) Stop() error { func (s *Service) Stop() error {
if s.config.EnableConnectionManager {
s.connManager.Stop()
}
s.tracker.Stop() s.tracker.Stop()
return nil return nil
} }
// tracker responsible for processing events for envelopes that we are interested in
// and calling specified handler.
type tracker struct {
w *whisper.Whisper
handler EnvelopeEventsHandler
mailServerConfirmation bool
mu sync.Mutex
cache map[common.Hash]EnvelopeState
batches map[common.Hash]map[common.Hash]struct{}
mailMu sync.Mutex
mailservers map[enode.ID]struct{}
wg sync.WaitGroup
quit chan struct{}
}
// Start processing events.
func (t *tracker) Start() {
t.quit = make(chan struct{})
t.wg.Add(1)
go func() {
t.handleEnvelopeEvents()
t.wg.Done()
}()
}
// Stop process events.
func (t *tracker) Stop() {
close(t.quit)
t.wg.Wait()
}
// Add hash to a tracker.
func (t *tracker) Add(hash common.Hash) {
t.mu.Lock()
defer t.mu.Unlock()
t.cache[hash] = EnvelopePosted
}
func (t *tracker) UpdateMailservers(nodes []*enode.Node) {
t.mailMu.Lock()
defer t.mailMu.Unlock()
t.mailservers = map[enode.ID]struct{}{}
for _, n := range nodes {
t.mailservers[n.ID()] = struct{}{}
}
}
// Add request hash to a tracker.
func (t *tracker) AddRequest(hash common.Hash, timerC <-chan time.Time) {
t.mu.Lock()
defer t.mu.Unlock()
t.cache[hash] = MailServerRequestSent
go t.expireRequest(hash, timerC)
}
func (t *tracker) expireRequest(hash common.Hash, timerC <-chan time.Time) {
select {
case <-t.quit:
return
case <-timerC:
t.handleEvent(whisper.EnvelopeEvent{
Event: whisper.EventMailServerRequestExpired,
Hash: hash,
})
}
}
// handleEnvelopeEvents processes whisper envelope events
func (t *tracker) handleEnvelopeEvents() {
events := make(chan whisper.EnvelopeEvent, 100) // must be buffered to prevent blocking whisper
sub := t.w.SubscribeEnvelopeEvents(events)
defer sub.Unsubscribe()
for {
select {
case <-t.quit:
return
case event := <-events:
t.handleEvent(event)
}
}
}
// handleEvent based on type of the event either triggers
// confirmation handler or removes hash from tracker
func (t *tracker) handleEvent(event whisper.EnvelopeEvent) {
handlers := map[whisper.EventType]func(whisper.EnvelopeEvent){
whisper.EventEnvelopeSent: t.handleEventEnvelopeSent,
whisper.EventEnvelopeExpired: t.handleEventEnvelopeExpired,
whisper.EventBatchAcknowledged: t.handleAcknowledgedBatch,
whisper.EventMailServerRequestCompleted: t.handleEventMailServerRequestCompleted,
whisper.EventMailServerRequestExpired: t.handleEventMailServerRequestExpired,
}
if handler, ok := handlers[event.Event]; ok {
handler(event)
}
}
func (t *tracker) handleEventEnvelopeSent(event whisper.EnvelopeEvent) {
if t.mailServerConfirmation {
if !t.isMailserver(event.Peer) {
return
}
}
t.mu.Lock()
defer t.mu.Unlock()
state, ok := t.cache[event.Hash]
// if we didn't send a message using extension - skip it
// if message was already confirmed - skip it
if !ok || state == EnvelopeSent {
return
}
if event.Batch != (common.Hash{}) {
if _, ok := t.batches[event.Batch]; !ok {
t.batches[event.Batch] = map[common.Hash]struct{}{}
}
t.batches[event.Batch][event.Hash] = struct{}{}
log.Debug("waiting for a confirmation", "batch", event.Batch)
} else {
log.Debug("envelope is sent", "hash", event.Hash, "peer", event.Peer)
t.cache[event.Hash] = EnvelopeSent
if t.handler != nil {
t.handler.EnvelopeSent(event.Hash)
}
}
}
func (t *tracker) isMailserver(peer enode.ID) bool {
t.mailMu.Lock()
defer t.mailMu.Unlock()
if len(t.mailservers) == 0 {
log.Error("mail servers are empty")
return false
}
_, exist := t.mailservers[peer]
if !exist {
log.Debug("confirmation received not from a mail server is skipped", "peer", peer)
return false
}
return true
}
func (t *tracker) handleAcknowledgedBatch(event whisper.EnvelopeEvent) {
if t.mailServerConfirmation {
if !t.isMailserver(event.Peer) {
return
}
}
t.mu.Lock()
defer t.mu.Unlock()
envelopes, ok := t.batches[event.Batch]
if !ok {
log.Debug("batch is not found", "batch", event.Batch)
}
log.Debug("received a confirmation", "batch", event.Batch, "peer", event.Peer)
for hash := range envelopes {
state, ok := t.cache[hash]
if !ok || state == EnvelopeSent {
continue
}
t.cache[hash] = EnvelopeSent
if t.handler != nil {
t.handler.EnvelopeSent(hash)
}
}
delete(t.batches, event.Batch)
}
func (t *tracker) handleEventEnvelopeExpired(event whisper.EnvelopeEvent) {
t.mu.Lock()
defer t.mu.Unlock()
if state, ok := t.cache[event.Hash]; ok {
delete(t.cache, event.Hash)
if state == EnvelopeSent {
return
}
log.Debug("envelope expired", "hash", event.Hash, "state", state)
if t.handler != nil {
t.handler.EnvelopeExpired(event.Hash)
}
}
}
func (t *tracker) handleEventMailServerRequestCompleted(event whisper.EnvelopeEvent) {
t.mu.Lock()
defer t.mu.Unlock()
state, ok := t.cache[event.Hash]
if !ok || state != MailServerRequestSent {
return
}
log.Debug("mailserver response received", "hash", event.Hash)
delete(t.cache, event.Hash)
if t.handler != nil {
if resp, ok := event.Data.(*whisper.MailServerResponse); ok {
t.handler.MailServerRequestCompleted(event.Hash, resp.LastEnvelopeHash, resp.Cursor, resp.Error)
}
}
}
func (t *tracker) handleEventMailServerRequestExpired(event whisper.EnvelopeEvent) {
t.mu.Lock()
defer t.mu.Unlock()
state, ok := t.cache[event.Hash]
if !ok || state != MailServerRequestSent {
return
}
log.Debug("mailserver response expired", "hash", event.Hash)
delete(t.cache, event.Hash)
if t.handler != nil {
t.handler.MailServerRequestExpired(event.Hash)
}
}

View File

@ -94,6 +94,7 @@ func (s *ShhExtSuite) SetupTest() {
Debug: true, Debug: true,
PFSEnabled: false, PFSEnabled: false,
MailServerConfirmations: true, MailServerConfirmations: true,
ConnectionTarget: 10,
} }
s.services[i] = New(s.whisper[i], nil, nil, config) s.services[i] = New(s.whisper[i], nil, nil, config)
s.NoError(stack.Register(func(n *node.ServiceContext) (node.Service, error) { s.NoError(stack.Register(func(n *node.ServiceContext) (node.Service, error) {
@ -236,12 +237,12 @@ func (s *ShhExtSuite) TestRequestMessagesSuccess() {
NoDiscovery: true, NoDiscovery: true,
}, },
}) // in-memory node as no data dir }) // in-memory node as no data dir
s.NoError(err) s.Require().NoError(err)
err = aNode.Register(func(*node.ServiceContext) (node.Service, error) { return shh, nil }) err = aNode.Register(func(*node.ServiceContext) (node.Service, error) { return shh, nil })
s.NoError(err) s.Require().NoError(err)
err = aNode.Start() err = aNode.Start()
s.NoError(err) s.Require().NoError(err)
defer func() { err := aNode.Stop(); s.NoError(err) }() defer func() { err := aNode.Stop(); s.NoError(err) }()
mock := newHandlerMock(1) mock := newHandlerMock(1)
@ -252,6 +253,7 @@ func (s *ShhExtSuite) TestRequestMessagesSuccess() {
PFSEnabled: false, PFSEnabled: false,
} }
service := New(shh, mock, nil, config) service := New(shh, mock, nil, config)
s.Require().NoError(service.Start(aNode.Server()))
api := NewPublicAPI(service) api := NewPublicAPI(service)
// with a peer acting as a mailserver // with a peer acting as a mailserver
@ -263,41 +265,40 @@ func (s *ShhExtSuite) TestRequestMessagesSuccess() {
ListenAddr: ":0", ListenAddr: ":0",
}, },
}) // in-memory node as no data dir }) // in-memory node as no data dir
s.NoError(err) s.Require().NoError(err)
err = mailNode.Register(func(*node.ServiceContext) (node.Service, error) { err = mailNode.Register(func(*node.ServiceContext) (node.Service, error) {
return whisper.New(nil), nil return whisper.New(nil), nil
}) })
s.NoError(err) s.NoError(err)
err = mailNode.Start() err = mailNode.Start()
s.NoError(err) s.Require().NoError(err)
defer func() { s.NoError(mailNode.Stop()) }() defer func() { s.NoError(mailNode.Stop()) }()
// add mailPeer as a peer // add mailPeer as a peer
waitErr := helpers.WaitForPeerAsync(aNode.Server(), mailNode.Server().Self().String(), p2p.PeerEventTypeAdd, time.Second) waitErr := helpers.WaitForPeerAsync(aNode.Server(), mailNode.Server().Self().String(), p2p.PeerEventTypeAdd, time.Second)
aNode.Server().AddPeer(mailNode.Server().Self()) aNode.Server().AddPeer(mailNode.Server().Self())
s.NoError(<-waitErr) s.Require().NoError(<-waitErr)
var hash []byte var hash []byte
// send a request with a symmetric key // send a request with a symmetric key
symKeyID, symKeyErr := shh.AddSymKeyFromPassword("some-pass") symKeyID, symKeyErr := shh.AddSymKeyFromPassword("some-pass")
s.NoError(symKeyErr) s.Require().NoError(symKeyErr)
hash, err = api.RequestMessages(context.TODO(), MessagesRequest{ hash, err = api.RequestMessages(context.TODO(), MessagesRequest{
MailServerPeer: mailNode.Server().Self().String(), MailServerPeer: mailNode.Server().Self().String(),
SymKeyID: symKeyID, SymKeyID: symKeyID,
}) })
s.NoError(err) s.Require().NoError(err)
s.NotNil(hash) s.Require().NotNil(hash)
s.Contains(api.service.tracker.cache, common.BytesToHash(hash)) s.Require().NoError(waitForHashInTracker(api.service.tracker, common.BytesToHash(hash), MailServerRequestSent, time.Second))
// Send a request without a symmetric key. In this case, // Send a request without a symmetric key. In this case,
// a public key extracted from MailServerPeer will be used. // a public key extracted from MailServerPeer will be used.
hash, err = api.RequestMessages(context.TODO(), MessagesRequest{ hash, err = api.RequestMessages(context.TODO(), MessagesRequest{
MailServerPeer: mailNode.Server().Self().String(), MailServerPeer: mailNode.Server().Self().String(),
}) })
s.NoError(err) s.Require().NoError(err)
s.NotNil(hash) s.Require().NotNil(hash)
s.Contains(api.service.tracker.cache, common.BytesToHash(hash)) s.Require().NoError(waitForHashInTracker(api.service.tracker, common.BytesToHash(hash), MailServerRequestSent, time.Second))
} }
func (s *ShhExtSuite) TestDebugPostSync() { func (s *ShhExtSuite) TestDebugPostSync() {
@ -398,141 +399,17 @@ func (s *ShhExtSuite) TearDown() {
} }
} }
var ( func waitForHashInTracker(track *tracker, hash common.Hash, state EnvelopeState, deadline time.Duration) error {
testHash = common.Hash{0x01} after := time.After(deadline)
) ticker := time.Tick(100 * time.Millisecond)
for {
func TestTrackerSuite(t *testing.T) {
suite.Run(t, new(TrackerSuite))
}
type TrackerSuite struct {
suite.Suite
tracker *tracker
}
func (s *TrackerSuite) SetupTest() {
s.tracker = &tracker{
cache: map[common.Hash]EnvelopeState{},
batches: map[common.Hash]map[common.Hash]struct{}{},
mailservers: map[enode.ID]struct{}{},
mailServerConfirmation: true,
}
}
func (s *TrackerSuite) TestConfirmed() {
testPeer := enode.ID{1}
s.tracker.mailservers[testPeer] = struct{}{}
s.tracker.Add(testHash)
s.Contains(s.tracker.cache, testHash)
s.Equal(EnvelopePosted, s.tracker.cache[testHash])
s.tracker.handleEvent(whisper.EnvelopeEvent{
Event: whisper.EventEnvelopeSent,
Hash: testHash,
Peer: testPeer,
})
s.Contains(s.tracker.cache, testHash)
s.Equal(EnvelopeSent, s.tracker.cache[testHash])
}
func (s *TrackerSuite) TestConfirmedWithAcknowledge() {
testBatch := common.Hash{1}
testPeer := enode.ID{1}
s.tracker.Add(testHash)
s.tracker.mailservers[testPeer] = struct{}{}
s.Contains(s.tracker.cache, testHash)
s.Equal(EnvelopePosted, s.tracker.cache[testHash])
s.tracker.handleEvent(whisper.EnvelopeEvent{
Event: whisper.EventEnvelopeSent,
Hash: testHash,
Batch: testBatch,
Peer: testPeer,
})
s.Equal(EnvelopePosted, s.tracker.cache[testHash])
s.tracker.handleEvent(whisper.EnvelopeEvent{
Event: whisper.EventBatchAcknowledged,
Batch: testBatch,
Peer: testPeer,
})
s.Contains(s.tracker.cache, testHash)
s.Equal(EnvelopeSent, s.tracker.cache[testHash])
}
func (s *TrackerSuite) TestIgnored() {
s.tracker.handleEvent(whisper.EnvelopeEvent{
Event: whisper.EventEnvelopeSent,
Hash: testHash,
})
s.NotContains(s.tracker.cache, testHash)
}
func (s *TrackerSuite) TestRemoved() {
s.tracker.Add(testHash)
s.Contains(s.tracker.cache, testHash)
s.tracker.handleEvent(whisper.EnvelopeEvent{
Event: whisper.EventEnvelopeExpired,
Hash: testHash,
})
s.NotContains(s.tracker.cache, testHash)
}
func (s *TrackerSuite) TestRequestCompleted() {
mock := newHandlerMock(1)
s.tracker.handler = mock
s.tracker.AddRequest(testHash, time.After(defaultRequestTimeout*time.Second))
s.Contains(s.tracker.cache, testHash)
s.Equal(MailServerRequestSent, s.tracker.cache[testHash])
s.tracker.handleEvent(whisper.EnvelopeEvent{
Event: whisper.EventMailServerRequestCompleted,
Hash: testHash,
Data: &whisper.MailServerResponse{},
})
select { select {
case requestID := <-mock.requestsCompleted: case <-after:
s.Equal(testHash, requestID) return fmt.Errorf("failed while waiting for %s to get into state %d", hash, state)
s.NotContains(s.tracker.cache, testHash) case <-ticker:
case <-time.After(10 * time.Second): if track.GetState(hash) == state {
s.Fail("timed out while waiting for a request to be completed") return nil
} }
} }
func (s *TrackerSuite) TestRequestFailed() {
mock := newHandlerMock(1)
s.tracker.handler = mock
s.tracker.AddRequest(testHash, time.After(defaultRequestTimeout*time.Second))
s.Contains(s.tracker.cache, testHash)
s.Equal(MailServerRequestSent, s.tracker.cache[testHash])
s.tracker.handleEvent(whisper.EnvelopeEvent{
Event: whisper.EventMailServerRequestCompleted,
Hash: testHash,
Data: &whisper.MailServerResponse{Error: errors.New("test error")},
})
select {
case requestID := <-mock.requestsFailed:
s.Equal(testHash, requestID)
s.NotContains(s.tracker.cache, testHash)
case <-time.After(10 * time.Second):
s.Fail("timed out while waiting for a request to be failed")
}
}
func (s *TrackerSuite) TestRequestExpiration() {
mock := newHandlerMock(1)
s.tracker.handler = mock
c := make(chan time.Time)
s.tracker.AddRequest(testHash, c)
s.Contains(s.tracker.cache, testHash)
s.Equal(MailServerRequestSent, s.tracker.cache[testHash])
s.tracker.handleEvent(whisper.EnvelopeEvent{
Event: whisper.EventMailServerRequestExpired,
Hash: testHash,
})
select {
case requestID := <-mock.requestsExpired:
s.Equal(testHash, requestID)
s.NotContains(s.tracker.cache, testHash)
case <-time.After(10 * time.Second):
s.Fail("timed out while waiting for request expiration")
} }
} }

218
services/shhext/tracker.go Normal file
View File

@ -0,0 +1,218 @@
package shhext
import (
"sync"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/status-im/status-go/services/shhext/mailservers"
whisper "github.com/status-im/whisper/whisperv6"
)
// EnvelopeState in local tracker
type EnvelopeState int
const (
// NotRegistered returned if asked hash wasn't registered in the tracker.
NotRegistered EnvelopeState = -1
// EnvelopePosted is set when envelope was added to a local whisper queue.
EnvelopePosted EnvelopeState = iota
// EnvelopeSent is set when envelope is sent to atleast one peer.
EnvelopeSent
// MailServerRequestSent is set when p2p request is sent to the mailserver
MailServerRequestSent
)
// tracker responsible for processing events for envelopes that we are interested in
// and calling specified handler.
type tracker struct {
w *whisper.Whisper
handler EnvelopeEventsHandler
mailServerConfirmation bool
mu sync.Mutex
cache map[common.Hash]EnvelopeState
batches map[common.Hash]map[common.Hash]struct{}
mailPeers *mailservers.PeerStore
wg sync.WaitGroup
quit chan struct{}
}
// Start processing events.
func (t *tracker) Start() {
t.quit = make(chan struct{})
t.wg.Add(1)
go func() {
t.handleEnvelopeEvents()
t.wg.Done()
}()
}
// Stop process events.
func (t *tracker) Stop() {
close(t.quit)
t.wg.Wait()
}
// Add hash to a tracker.
func (t *tracker) Add(hash common.Hash) {
t.mu.Lock()
defer t.mu.Unlock()
t.cache[hash] = EnvelopePosted
}
func (t *tracker) GetState(hash common.Hash) EnvelopeState {
t.mu.Lock()
defer t.mu.Unlock()
state, exist := t.cache[hash]
if !exist {
return NotRegistered
}
return state
}
// handleEnvelopeEvents processes whisper envelope events
func (t *tracker) handleEnvelopeEvents() {
events := make(chan whisper.EnvelopeEvent, 100) // must be buffered to prevent blocking whisper
sub := t.w.SubscribeEnvelopeEvents(events)
defer sub.Unsubscribe()
for {
select {
case <-t.quit:
return
case event := <-events:
t.handleEvent(event)
}
}
}
// handleEvent based on type of the event either triggers
// confirmation handler or removes hash from tracker
func (t *tracker) handleEvent(event whisper.EnvelopeEvent) {
handlers := map[whisper.EventType]func(whisper.EnvelopeEvent){
whisper.EventEnvelopeSent: t.handleEventEnvelopeSent,
whisper.EventEnvelopeExpired: t.handleEventEnvelopeExpired,
whisper.EventBatchAcknowledged: t.handleAcknowledgedBatch,
whisper.EventMailServerRequestSent: t.handleRequestSent,
whisper.EventMailServerRequestCompleted: t.handleEventMailServerRequestCompleted,
whisper.EventMailServerRequestExpired: t.handleEventMailServerRequestExpired,
}
if handler, ok := handlers[event.Event]; ok {
handler(event)
}
}
func (t *tracker) handleEventEnvelopeSent(event whisper.EnvelopeEvent) {
t.mu.Lock()
defer t.mu.Unlock()
state, ok := t.cache[event.Hash]
// if we didn't send a message using extension - skip it
// if message was already confirmed - skip it
if !ok || state == EnvelopeSent {
return
}
log.Debug("envelope is sent", "hash", event.Hash, "peer", event.Peer)
if event.Batch != (common.Hash{}) {
if _, ok := t.batches[event.Batch]; !ok {
t.batches[event.Batch] = map[common.Hash]struct{}{}
}
t.batches[event.Batch][event.Hash] = struct{}{}
log.Debug("waiting for a confirmation", "batch", event.Batch)
} else {
t.cache[event.Hash] = EnvelopeSent
if t.handler != nil {
t.handler.EnvelopeSent(event.Hash)
}
}
}
func (t *tracker) isMailserver(peer enode.ID) bool {
return t.mailPeers.Exist(peer)
}
func (t *tracker) handleAcknowledgedBatch(event whisper.EnvelopeEvent) {
if t.mailServerConfirmation {
if !t.isMailserver(event.Peer) {
return
}
}
t.mu.Lock()
defer t.mu.Unlock()
envelopes, ok := t.batches[event.Batch]
if !ok {
log.Debug("batch is not found", "batch", event.Batch)
}
log.Debug("received a confirmation", "batch", event.Batch, "peer", event.Peer)
for hash := range envelopes {
state, ok := t.cache[hash]
if !ok || state == EnvelopeSent {
continue
}
t.cache[hash] = EnvelopeSent
if t.handler != nil {
t.handler.EnvelopeSent(hash)
}
}
delete(t.batches, event.Batch)
}
func (t *tracker) handleEventEnvelopeExpired(event whisper.EnvelopeEvent) {
t.mu.Lock()
defer t.mu.Unlock()
if state, ok := t.cache[event.Hash]; ok {
delete(t.cache, event.Hash)
if state == EnvelopeSent {
return
}
log.Debug("envelope expired", "hash", event.Hash, "state", state)
if t.handler != nil {
t.handler.EnvelopeExpired(event.Hash)
}
}
}
func (t *tracker) handleRequestSent(event whisper.EnvelopeEvent) {
t.mu.Lock()
defer t.mu.Unlock()
t.cache[event.Hash] = MailServerRequestSent
}
func (t *tracker) handleEventMailServerRequestCompleted(event whisper.EnvelopeEvent) {
t.mu.Lock()
defer t.mu.Unlock()
state, ok := t.cache[event.Hash]
if !ok || state != MailServerRequestSent {
return
}
log.Debug("mailserver response received", "hash", event.Hash)
delete(t.cache, event.Hash)
if t.handler != nil {
if resp, ok := event.Data.(*whisper.MailServerResponse); ok {
t.handler.MailServerRequestCompleted(event.Hash, resp.LastEnvelopeHash, resp.Cursor, resp.Error)
}
}
}
func (t *tracker) handleEventMailServerRequestExpired(event whisper.EnvelopeEvent) {
t.mu.Lock()
defer t.mu.Unlock()
state, ok := t.cache[event.Hash]
if !ok || state != MailServerRequestSent {
return
}
log.Debug("mailserver response expired", "hash", event.Hash)
delete(t.cache, event.Hash)
if t.handler != nil {
t.handler.MailServerRequestExpired(event.Hash)
}
}

View File

@ -0,0 +1,144 @@
package shhext
import (
"errors"
"testing"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/status-im/status-go/services/shhext/mailservers"
whisper "github.com/status-im/whisper/whisperv6"
"github.com/stretchr/testify/suite"
)
var (
testHash = common.Hash{0x01}
)
func TestTrackerSuite(t *testing.T) {
suite.Run(t, new(TrackerSuite))
}
type TrackerSuite struct {
suite.Suite
tracker *tracker
}
func (s *TrackerSuite) SetupTest() {
s.tracker = &tracker{
cache: map[common.Hash]EnvelopeState{},
batches: map[common.Hash]map[common.Hash]struct{}{},
mailPeers: mailservers.NewPeerStore(),
mailServerConfirmation: true,
}
}
func (s *TrackerSuite) TestConfirmed() {
s.tracker.Add(testHash)
s.Contains(s.tracker.cache, testHash)
s.Equal(EnvelopePosted, s.tracker.cache[testHash])
s.tracker.handleEvent(whisper.EnvelopeEvent{
Event: whisper.EventEnvelopeSent,
Hash: testHash,
})
s.Contains(s.tracker.cache, testHash)
s.Equal(EnvelopeSent, s.tracker.cache[testHash])
}
func (s *TrackerSuite) TestConfirmedWithAcknowledge() {
testBatch := common.Hash{1}
pkey, err := crypto.GenerateKey()
s.Require().NoError(err)
node := enode.NewV4(&pkey.PublicKey, nil, 0, 0)
s.tracker.mailPeers.Update([]*enode.Node{node})
s.tracker.Add(testHash)
s.Contains(s.tracker.cache, testHash)
s.Equal(EnvelopePosted, s.tracker.cache[testHash])
s.tracker.handleEvent(whisper.EnvelopeEvent{
Event: whisper.EventEnvelopeSent,
Hash: testHash,
Batch: testBatch,
})
s.Equal(EnvelopePosted, s.tracker.cache[testHash])
s.tracker.handleEvent(whisper.EnvelopeEvent{
Event: whisper.EventBatchAcknowledged,
Batch: testBatch,
Peer: node.ID(),
})
s.Contains(s.tracker.cache, testHash)
s.Equal(EnvelopeSent, s.tracker.cache[testHash])
}
func (s *TrackerSuite) TestIgnored() {
s.tracker.handleEvent(whisper.EnvelopeEvent{
Event: whisper.EventEnvelopeSent,
Hash: testHash,
})
s.NotContains(s.tracker.cache, testHash)
}
func (s *TrackerSuite) TestRemoved() {
s.tracker.Add(testHash)
s.Contains(s.tracker.cache, testHash)
s.tracker.handleEvent(whisper.EnvelopeEvent{
Event: whisper.EventEnvelopeExpired,
Hash: testHash,
})
s.NotContains(s.tracker.cache, testHash)
}
func (s *TrackerSuite) TestRequestCompleted() {
mock := newHandlerMock(1)
s.tracker.handler = mock
s.tracker.cache[testHash] = MailServerRequestSent
s.tracker.handleEvent(whisper.EnvelopeEvent{
Event: whisper.EventMailServerRequestCompleted,
Hash: testHash,
Data: &whisper.MailServerResponse{},
})
select {
case requestID := <-mock.requestsCompleted:
s.Equal(testHash, requestID)
s.NotContains(s.tracker.cache, testHash)
case <-time.After(10 * time.Second):
s.Fail("timed out while waiting for a request to be completed")
}
}
func (s *TrackerSuite) TestRequestFailed() {
mock := newHandlerMock(1)
s.tracker.handler = mock
s.tracker.cache[testHash] = MailServerRequestSent
s.tracker.handleEvent(whisper.EnvelopeEvent{
Event: whisper.EventMailServerRequestCompleted,
Hash: testHash,
Data: &whisper.MailServerResponse{Error: errors.New("test error")},
})
select {
case requestID := <-mock.requestsFailed:
s.Equal(testHash, requestID)
s.NotContains(s.tracker.cache, testHash)
case <-time.After(10 * time.Second):
s.Fail("timed out while waiting for a request to be failed")
}
}
func (s *TrackerSuite) TestRequestExpiration() {
mock := newHandlerMock(1)
s.tracker.handler = mock
s.tracker.cache[testHash] = MailServerRequestSent
s.tracker.handleEvent(whisper.EnvelopeEvent{
Event: whisper.EventMailServerRequestExpired,
Hash: testHash,
})
select {
case requestID := <-mock.requestsExpired:
s.Equal(testHash, requestID)
s.NotContains(s.tracker.cache, testHash)
case <-time.After(10 * time.Second):
s.Fail("timed out while waiting for request expiration")
}
}

View File

@ -40,7 +40,9 @@ func (s *MailServiceSuite) TestShhextRequestMessagesRPCMethodAvailability() {
// This error means that the method is available through inproc communication // This error means that the method is available through inproc communication
// as the validation of params occurred. // as the validation of params occurred.
err := client.Call(nil, "shhext_requestMessages", map[string]interface{}{}) err := client.Call(nil, "shhext_requestMessages", map[string]interface{}{
"mailServerPeer": "xxx",
})
r.EqualError(err, `invalid mailServerPeer value: invalid URL scheme, want "enode"`) r.EqualError(err, `invalid mailServerPeer value: invalid URL scheme, want "enode"`)
// Do the same but using HTTP interface. // Do the same but using HTTP interface.

View File

@ -17,9 +17,13 @@ const (
EventBatchAcknowledged EventType = "batch.acknowleged" EventBatchAcknowledged EventType = "batch.acknowleged"
// EventEnvelopeAvailable fires when envelop is available for filters // EventEnvelopeAvailable fires when envelop is available for filters
EventEnvelopeAvailable EventType = "envelope.available" EventEnvelopeAvailable EventType = "envelope.available"
// EventMailServerRequestSent fires when such request is sent.
EventMailServerRequestSent EventType = "mailserver.request.sent"
// EventMailServerRequestCompleted fires after mailserver sends all the requested messages // EventMailServerRequestCompleted fires after mailserver sends all the requested messages
EventMailServerRequestCompleted EventType = "mailserver.request.completed" EventMailServerRequestCompleted EventType = "mailserver.request.completed"
// EventMailServerRequestExpired fires after mailserver the request TTL ends // EventMailServerRequestExpired fires after mailserver the request TTL ends.
// This event is independent and concurrent to EventMailServerRequestCompleted.
// Request should be considered as expired only if expiry event was received first.
EventMailServerRequestExpired EventType = "mailserver.request.expired" EventMailServerRequestExpired EventType = "mailserver.request.expired"
// EventMailServerEnvelopeArchived fires after an envelope has been archived // EventMailServerEnvelopeArchived fires after an envelope has been archived
EventMailServerEnvelopeArchived EventType = "mailserver.envelope.archived" EventMailServerEnvelopeArchived EventType = "mailserver.envelope.archived"

View File

@ -407,12 +407,40 @@ func (whisper *Whisper) AllowP2PMessagesFromPeer(peerID []byte) error {
// which are not supposed to be forwarded any further. // which are not supposed to be forwarded any further.
// The whisper protocol is agnostic of the format and contents of envelope. // The whisper protocol is agnostic of the format and contents of envelope.
func (whisper *Whisper) RequestHistoricMessages(peerID []byte, envelope *Envelope) error { func (whisper *Whisper) RequestHistoricMessages(peerID []byte, envelope *Envelope) error {
return whisper.RequestHistoricMessagesWithTimeout(peerID, envelope, 0)
}
func (whisper *Whisper) RequestHistoricMessagesWithTimeout(peerID []byte, envelope *Envelope, timeout time.Duration) error {
p, err := whisper.getPeer(peerID) p, err := whisper.getPeer(peerID)
if err != nil { if err != nil {
return err return err
} }
whisper.envelopeFeed.Send(EnvelopeEvent{
Peer: p.peer.ID(),
Hash: envelope.Hash(),
Event: EventMailServerRequestSent,
})
p.trusted = true p.trusted = true
return p2p.Send(p.ws, p2pRequestCode, envelope) err = p2p.Send(p.ws, p2pRequestCode, envelope)
if timeout != 0 {
go whisper.expireRequestHistoricMessages(p.peer.ID(), envelope.Hash(), timeout)
}
return err
}
func (whisper *Whisper) expireRequestHistoricMessages(peer enode.ID, hash common.Hash, timeout time.Duration) {
timer := time.NewTimer(timeout)
defer timer.Stop()
select {
case <-whisper.quit:
return
case <-timer.C:
whisper.envelopeFeed.Send(EnvelopeEvent{
Peer: peer,
Hash: hash,
Event: EventMailServerRequestExpired,
})
}
} }
func (whisper *Whisper) SendHistoricMessageResponse(peer *Peer, payload []byte) error { func (whisper *Whisper) SendHistoricMessageResponse(peer *Peer, payload []byte) error {