diff --git a/api/backend.go b/api/backend.go index 942ce9ec7..73672d854 100644 --- a/api/backend.go +++ b/api/backend.go @@ -587,6 +587,5 @@ func (b *StatusBackend) UpdateMailservers(enodes []string) error { } nodes[i] = node } - st.UpdateMailservers(nodes) - return nil + return st.UpdateMailservers(nodes) } diff --git a/db/db.go b/db/db.go index 6f8a728b8..4330053b3 100644 --- a/db/db.go +++ b/db/db.go @@ -18,6 +18,8 @@ const ( // DeduplicatorCache is used for the db entries used for messages // deduplication cache DeduplicatorCache + // MailserversCache is a list of mail servers provided by users. + MailserversCache ) // Key creates a DB key for a specified service with specified data diff --git a/services/shhext/mailservers/cache.go b/services/shhext/mailservers/cache.go new file mode 100644 index 000000000..3fd314e97 --- /dev/null +++ b/services/shhext/mailservers/cache.go @@ -0,0 +1,142 @@ +package mailservers + +import ( + "encoding/json" + "time" + + "github.com/ethereum/go-ethereum/p2p/enode" + "github.com/status-im/status-go/db" + "github.com/syndtr/goleveldb/leveldb" + "github.com/syndtr/goleveldb/leveldb/iterator" + "github.com/syndtr/goleveldb/leveldb/util" +) + +// NewPeerRecord returns instance of the peer record. +func NewPeerRecord(node *enode.Node) PeerRecord { + return PeerRecord{node: node} +} + +// PeerRecord is set data associated with each peer that is stored on disk. +// PeerRecord stored with a enode as a key in leveldb, and body marshalled as json. +type PeerRecord struct { + node *enode.Node + + // last time it was used. + LastUsed time.Time +} + +// Encode encodes PeerRecords to bytes. +func (r PeerRecord) Encode() ([]byte, error) { + return json.Marshal(r) +} + +// ID returns enode identity of the node. +func (r PeerRecord) ID() enode.ID { + return r.node.ID() +} + +// Node returs pointer to original object. +// enode.Node doensn't allow modification on the object. +func (r PeerRecord) Node() *enode.Node { + return r.node +} + +// EncodeKey returns bytes that will should be used as a key in persistent storage. +func (r PeerRecord) EncodeKey() ([]byte, error) { + return r.Node().MarshalText() +} + +// NewCache returns pointer to a Cache instance. +func NewCache(db *leveldb.DB) *Cache { + return &Cache{db: db} +} + +// Cache is wrapper for operations on disk with leveldb. +type Cache struct { + db *leveldb.DB +} + +// Replace deletes old and adds new records in the persistent cache. +func (c *Cache) Replace(nodes []*enode.Node) error { + batch := new(leveldb.Batch) + iter := createPeersIterator(c.db) + defer iter.Release() + newNodes := nodesToMap(nodes) + for iter.Next() { + record, err := unmarshalKeyValue(keyWithoutPrefix(iter.Key()), iter.Value()) + if err != nil { + return err + } + if _, exist := newNodes[record.ID()]; exist { + delete(newNodes, record.ID()) + } else { + batch.Delete(iter.Key()) + } + } + for _, n := range newNodes { + enodeKey, err := n.MarshalText() + if err != nil { + return err + } + // we put nil as default value doesn't have any state associated with them. + batch.Put(db.Key(db.MailserversCache, enodeKey), nil) + } + return c.db.Write(batch, nil) +} + +// LoadAll loads all records from persistent database. +func (c *Cache) LoadAll() (rst []PeerRecord, err error) { + iter := createPeersIterator(c.db) + for iter.Next() { + record, err := unmarshalKeyValue(keyWithoutPrefix(iter.Key()), iter.Value()) + if err != nil { + return nil, err + } + rst = append(rst, record) + } + return rst, nil +} + +// UpdateRecord updates single record. +func (c *Cache) UpdateRecord(record PeerRecord) error { + enodeKey, err := record.EncodeKey() + if err != nil { + return err + } + value, err := record.Encode() + if err != nil { + return err + } + return c.db.Put(db.Key(db.MailserversCache, enodeKey), value, nil) +} + +func unmarshalKeyValue(key, value []byte) (record PeerRecord, err error) { + enodeKey := key + node := new(enode.Node) + err = node.UnmarshalText(enodeKey) + if err != nil { + return record, err + } + record = PeerRecord{node: node} + if len(value) != 0 { + err = json.Unmarshal(value, &record) + } + return record, err +} + +func nodesToMap(nodes []*enode.Node) map[enode.ID]*enode.Node { + rst := map[enode.ID]*enode.Node{} + for _, n := range nodes { + rst[n.ID()] = n + } + return rst +} + +func createPeersIterator(level *leveldb.DB) iterator.Iterator { + return level.NewIterator(util.BytesPrefix([]byte{byte(db.MailserversCache)}), nil) +} + +// keyWithoutPrefix removes first byte from key. +func keyWithoutPrefix(key []byte) []byte { + return key[1:] +} diff --git a/services/shhext/mailservers/cache_test.go b/services/shhext/mailservers/cache_test.go new file mode 100644 index 000000000..36d3470d6 --- /dev/null +++ b/services/shhext/mailservers/cache_test.go @@ -0,0 +1,80 @@ +package mailservers + +import ( + "fmt" + "testing" + "time" + + "github.com/ethereum/go-ethereum/p2p/enode" + "github.com/stretchr/testify/require" + "github.com/syndtr/goleveldb/leveldb" + "github.com/syndtr/goleveldb/leveldb/storage" +) + +func newInMemCache(t *testing.T) *Cache { + db, err := leveldb.Open(storage.NewMemStorage(), nil) + require.NoError(t, err) + return NewCache(db) +} + +func containsNode(nodes []*enode.Node, node *enode.Node) error { + for _, n := range nodes { + if n.ID() == node.ID() { + return nil + } + } + return fmt.Errorf("nodes %+s do not contain %s", nodes, node) +} + +func TestReplaceRecords(t *testing.T) { + nodesNumber := 3 + cache := newInMemCache(t) + nodes := make([]*enode.Node, nodesNumber) + // First round is a sanity check that records were written. + fillWithRandomNodes(t, nodes) + require.NoError(t, cache.Replace(nodes)) + records, err := cache.LoadAll() + require.NoError(t, err) + require.Len(t, records, nodesNumber) + for i := range records { + require.NoError(t, containsNode(nodes, records[i].Node())) + } + // Replace all nodes and verify that length is the same and loaded records are found. + fillWithRandomNodes(t, nodes) + require.NoError(t, cache.Replace(nodes)) + records, err = cache.LoadAll() + require.NoError(t, err) + require.Len(t, records, nodesNumber) + for i := range records { + require.NoError(t, containsNode(nodes, records[i].Node())) + } +} + +func TestUsedRecord(t *testing.T) { + cache := newInMemCache(t) + node, err := RandomNode() + require.NoError(t, err) + record := PeerRecord{node: node} + require.NoError(t, cache.UpdateRecord(record)) + record.LastUsed = time.Now() + require.NoError(t, cache.UpdateRecord(record)) + records, err := cache.LoadAll() + require.NoError(t, err) + require.Len(t, records, 1) + require.True(t, record.LastUsed.Equal(records[0].LastUsed)) +} + +func TestTimestampPreservedOnReplace(t *testing.T) { + cache := newInMemCache(t) + node, err := RandomNode() + require.NoError(t, err) + record := PeerRecord{node: node, LastUsed: time.Now()} + require.NoError(t, cache.UpdateRecord(record)) + require.NoError(t, cache.Replace([]*enode.Node{node})) + records, err := cache.LoadAll() + require.NoError(t, err) + require.Len(t, records, 1) + require.Equal(t, node.ID(), records[0].Node().ID()) + require.False(t, records[0].LastUsed.IsZero(), "timestamp should be preserved and not equal to zero") + +} diff --git a/services/shhext/mailservers/connmanager.go b/services/shhext/mailservers/connmanager.go index 905d0c01f..2f93f9a50 100644 --- a/services/shhext/mailservers/connmanager.go +++ b/services/shhext/mailservers/connmanager.go @@ -2,13 +2,14 @@ package mailservers import ( "sync" + "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/p2p/enode" - "github.com/status-im/whisper/whisperv6" + whisper "github.com/status-im/whisper/whisperv6" ) const ( @@ -29,7 +30,7 @@ type PeerEventsSubscriber interface { // EnvelopeEventSubscbriber interface to subscribe for whisper.EnvelopeEvent's. type EnvelopeEventSubscbriber interface { - SubscribeEnvelopeEvents(chan<- whisperv6.EnvelopeEvent) event.Subscription + SubscribeEnvelopeEvents(chan<- whisper.EnvelopeEvent) event.Subscription } type p2pServer interface { @@ -38,12 +39,13 @@ type p2pServer interface { } // NewConnectionManager creates an instance of ConnectionManager. -func NewConnectionManager(server p2pServer, whisper EnvelopeEventSubscbriber, target int) *ConnectionManager { +func NewConnectionManager(server p2pServer, whisper EnvelopeEventSubscbriber, target int, timeout time.Duration) *ConnectionManager { return &ConnectionManager{ - server: server, - whisper: whisper, - connectedTarget: target, - notifications: make(chan []*enode.Node), + server: server, + whisper: whisper, + connectedTarget: target, + notifications: make(chan []*enode.Node), + timeoutWaitAdded: timeout, } } @@ -55,8 +57,9 @@ type ConnectionManager struct { server p2pServer whisper EnvelopeEventSubscbriber - notifications chan []*enode.Node - connectedTarget int + notifications chan []*enode.Node + connectedTarget int + timeoutWaitAdded time.Duration } // Notify sends a non-blocking notification about new nodes. @@ -77,56 +80,45 @@ func (ps *ConnectionManager) Start() { ps.quit = make(chan struct{}) ps.wg.Add(1) go func() { - current := map[enode.ID]*enode.Node{} - connected := map[enode.ID]struct{}{} + state := newInternalState(ps.server, ps.connectedTarget, ps.timeoutWaitAdded) + events := make(chan *p2p.PeerEvent, peerEventsBuffer) sub := ps.server.SubscribeEvents(events) - whisperEvents := make(chan whisperv6.EnvelopeEvent, whisperEventsBuffer) + whisperEvents := make(chan whisper.EnvelopeEvent, whisperEventsBuffer) whisperSub := ps.whisper.SubscribeEnvelopeEvents(whisperEvents) requests := map[common.Hash]struct{}{} + + defer sub.Unsubscribe() + defer whisperSub.Unsubscribe() + defer ps.wg.Done() for { select { case <-ps.quit: - sub.Unsubscribe() - whisperSub.Unsubscribe() - ps.wg.Done() return case err := <-sub.Err(): log.Error("retry after error subscribing to p2p events", "error", err) - sub = ps.server.SubscribeEvents(events) + return case err := <-whisperSub.Err(): log.Error("retry after error suscribing to whisper events", "error", err) - whisperSub = ps.whisper.SubscribeEnvelopeEvents(whisperEvents) + return case newNodes := <-ps.notifications: - replacement := map[enode.ID]*enode.Node{} - for _, n := range newNodes { - replacement[n.ID()] = n - } - replaceNodes(ps.server, ps.connectedTarget, connected, current, replacement) - current = replacement + state.processReplacement(newNodes, events) case ev := <-events: - switch ev.Type { - case p2p.PeerEventTypeAdd: - log.Debug("connected to a mailserver", "address", ev.Peer) - nodeAdded(ps.server, ev.Peer, ps.connectedTarget, connected, current) - case p2p.PeerEventTypeDrop: - log.Debug("mailserver disconnected", "address", ev.Peer) - nodeDisconnected(ps.server, ev.Peer, ps.connectedTarget, connected, current) - } + processPeerEvent(state, ev) case ev := <-whisperEvents: // TODO what about completed but with error? what about expired envelopes? switch ev.Event { - case whisperv6.EventMailServerRequestSent: + case whisper.EventMailServerRequestSent: requests[ev.Hash] = struct{}{} - case whisperv6.EventMailServerRequestCompleted: + case whisper.EventMailServerRequestCompleted: delete(requests, ev.Hash) - case whisperv6.EventMailServerRequestExpired: + case whisper.EventMailServerRequestExpired: _, exist := requests[ev.Hash] if !exist { continue } log.Debug("request to a mail server expired, disconncet a peer", "address", ev.Peer) - nodeDisconnected(ps.server, ev.Peer, ps.connectedTarget, connected, current) + state.nodeDisconnected(ev.Peer) } } } @@ -148,55 +140,127 @@ func (ps *ConnectionManager) Stop() { ps.quit = nil } -func replaceNodes(srv PeerAdderRemover, target int, connected map[enode.ID]struct{}, old, new map[enode.ID]*enode.Node) { - for nid, n := range old { +func (state *internalState) processReplacement(newNodes []*enode.Node, events <-chan *p2p.PeerEvent) { + replacement := map[enode.ID]*enode.Node{} + for _, n := range newNodes { + replacement[n.ID()] = n + } + state.replaceNodes(replacement) + if state.ReachedTarget() { + log.Debug("already connected with required target", "target", state.target) + return + } + if state.timeout != 0 { + log.Debug("waiting defined timeout to establish connections", + "timeout", state.timeout, "target", state.target) + timer := time.NewTimer(state.timeout) + waitForConnections(state, timer.C, events) + timer.Stop() + } +} + +func newInternalState(srv PeerAdderRemover, target int, timeout time.Duration) *internalState { + return &internalState{ + options: options{target: target, timeout: timeout}, + srv: srv, + connected: map[enode.ID]struct{}{}, + currentNodes: map[enode.ID]*enode.Node{}, + } +} + +type options struct { + target int + timeout time.Duration +} + +type internalState struct { + options + srv PeerAdderRemover + + connected map[enode.ID]struct{} + currentNodes map[enode.ID]*enode.Node +} + +func (state *internalState) ReachedTarget() bool { + return len(state.connected) >= state.target +} + +func (state *internalState) replaceNodes(new map[enode.ID]*enode.Node) { + for nid, n := range state.currentNodes { if _, exist := new[nid]; !exist { - if _, exist := connected[nid]; exist { - delete(connected, nid) + if _, exist := state.connected[nid]; exist { + delete(state.connected, nid) } - srv.RemovePeer(n) + state.srv.RemovePeer(n) } } - if len(connected) < target { + if !state.ReachedTarget() { for _, n := range new { - srv.AddPeer(n) + state.srv.AddPeer(n) } } + state.currentNodes = new } -func nodeAdded(srv PeerAdderRemover, peer enode.ID, target int, connected map[enode.ID]struct{}, nodes map[enode.ID]*enode.Node) { - n, exist := nodes[peer] +func (state *internalState) nodeAdded(peer enode.ID) { + n, exist := state.currentNodes[peer] if !exist { return } - if len(connected) == target { - srv.RemovePeer(n) + if state.ReachedTarget() { + state.srv.RemovePeer(n) } else { - connected[n.ID()] = struct{}{} + state.connected[n.ID()] = struct{}{} } } -func nodeDisconnected(srv PeerAdderRemover, peer enode.ID, target int, connected map[enode.ID]struct{}, nodes map[enode.ID]*enode.Node) { - n, exist := nodes[peer] // unrelated event +func (state *internalState) nodeDisconnected(peer enode.ID) { + n, exist := state.currentNodes[peer] // unrelated event if !exist { return } - _, exist = connected[peer] // check if already disconnected + _, exist = state.connected[peer] // check if already disconnected if !exist { return } - if len(nodes) == 1 { // keep node connected if we don't have another choice + if len(state.currentNodes) == 1 { // keep node connected if we don't have another choice return } - srv.RemovePeer(n) // remove peer permanently, otherwise p2p.Server will try to reconnect - delete(connected, peer) - if len(connected) < target { // try to connect with any other selected (but not connected) node - for nid, n := range nodes { - _, exist := connected[nid] + state.srv.RemovePeer(n) // remove peer permanently, otherwise p2p.Server will try to reconnect + delete(state.connected, peer) + if !state.ReachedTarget() { // try to connect with any other selected (but not connected) node + for nid, n := range state.currentNodes { + _, exist := state.connected[nid] if exist || peer == nid { continue } - srv.AddPeer(n) + state.srv.AddPeer(n) } } } + +func processPeerEvent(state *internalState, ev *p2p.PeerEvent) { + switch ev.Type { + case p2p.PeerEventTypeAdd: + log.Debug("connected to a mailserver", "address", ev.Peer) + state.nodeAdded(ev.Peer) + case p2p.PeerEventTypeDrop: + log.Debug("mailserver disconnected", "address", ev.Peer) + state.nodeDisconnected(ev.Peer) + } +} + +func waitForConnections(state *internalState, timeout <-chan time.Time, events <-chan *p2p.PeerEvent) { + for { + select { + case ev := <-events: + processPeerEvent(state, ev) + if state.ReachedTarget() { + return + } + case <-timeout: + return + } + } + +} diff --git a/services/shhext/mailservers/connmanager_test.go b/services/shhext/mailservers/connmanager_test.go index 0e8f45144..ad2eb64ca 100644 --- a/services/shhext/mailservers/connmanager_test.go +++ b/services/shhext/mailservers/connmanager_test.go @@ -11,7 +11,8 @@ import ( "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/p2p/enode" "github.com/status-im/status-go/t/utils" - "github.com/status-im/whisper/whisperv6" + whisper "github.com/status-im/whisper/whisperv6" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -82,10 +83,10 @@ func newFakeServer() *fakePeerEvents { } type fakeEnvelopeEvents struct { - input chan whisperv6.EnvelopeEvent + input chan whisper.EnvelopeEvent } -func (f fakeEnvelopeEvents) SubscribeEnvelopeEvents(output chan<- whisperv6.EnvelopeEvent) event.Subscription { +func (f fakeEnvelopeEvents) SubscribeEnvelopeEvents(output chan<- whisper.EnvelopeEvent) event.Subscription { return event.NewSubscription(func(quit <-chan struct{}) error { for { select { @@ -101,18 +102,22 @@ func (f fakeEnvelopeEvents) SubscribeEnvelopeEvents(output chan<- whisperv6.Enve func newFakeEnvelopesEvents() fakeEnvelopeEvents { return fakeEnvelopeEvents{ - input: make(chan whisperv6.EnvelopeEvent), + input: make(chan whisper.EnvelopeEvent), } } -func getNRandomNodes(t *testing.T, n int) map[enode.ID]*enode.Node { - rst := map[enode.ID]*enode.Node{} - for i := 0; i < n; i++ { - n, err := RandomeNode() +func fillWithRandomNodes(t *testing.T, nodes []*enode.Node) { + var err error + for i := range nodes { + nodes[i], err = RandomNode() require.NoError(t, err) - rst[n.ID()] = n } - return rst +} + +func getMapWithRandomNodes(t *testing.T, n int) map[enode.ID]*enode.Node { + nodes := make([]*enode.Node, n) + fillWithRandomNodes(t, nodes) + return nodesToMap(nodes) } func mergeOldIntoNew(old, new map[enode.ID]*enode.Node) { @@ -131,25 +136,26 @@ func TestReplaceNodes(t *testing.T) { for _, tc := range []testCase{ { "InitialReplace", - getNRandomNodes(t, 0), - getNRandomNodes(t, 3), + getMapWithRandomNodes(t, 0), + getMapWithRandomNodes(t, 3), 2, }, { "FullReplace", - getNRandomNodes(t, 3), - getNRandomNodes(t, 3), + getMapWithRandomNodes(t, 3), + getMapWithRandomNodes(t, 3), 2, }, } { t.Run(tc.description, func(t *testing.T) { peers := newFakePeerAdderRemover() - replaceNodes(peers, tc.target, peers.nodes, nil, tc.old) + state := newInternalState(peers, tc.target, 0) + state.replaceNodes(tc.old) require.Len(t, peers.nodes, len(tc.old)) for n := range peers.nodes { require.Contains(t, tc.old, n) } - replaceNodes(peers, tc.target, peers.nodes, tc.old, tc.new) + state.replaceNodes(tc.new) require.Len(t, peers.nodes, len(tc.new)) for n := range peers.nodes { require.Contains(t, tc.new, n) @@ -160,21 +166,26 @@ func TestReplaceNodes(t *testing.T) { func TestPartialReplaceNodesBelowTarget(t *testing.T) { peers := newFakePeerAdderRemover() - old := getNRandomNodes(t, 1) - new := getNRandomNodes(t, 2) - replaceNodes(peers, 2, peers.nodes, nil, old) + old := getMapWithRandomNodes(t, 1) + new := getMapWithRandomNodes(t, 2) + state := newInternalState(peers, 2, 0) + state.replaceNodes(old) mergeOldIntoNew(old, new) - replaceNodes(peers, 2, peers.nodes, old, new) + state.replaceNodes(new) require.Len(t, peers.nodes, len(new)) } func TestPartialReplaceNodesAboveTarget(t *testing.T) { peers := newFakePeerAdderRemover() - old := getNRandomNodes(t, 1) - new := getNRandomNodes(t, 2) - replaceNodes(peers, 1, peers.nodes, nil, old) + initial, err := RandomNode() + require.NoError(t, err) + old := nodesToMap([]*enode.Node{initial}) + new := getMapWithRandomNodes(t, 2) + state := newInternalState(peers, 1, 0) + state.replaceNodes(old) + state.nodeAdded(initial.ID()) mergeOldIntoNew(old, new) - replaceNodes(peers, 1, peers.nodes, old, new) + state.replaceNodes(new) require.Len(t, peers.nodes, 1) } @@ -182,11 +193,11 @@ func TestConnectionManagerAddDrop(t *testing.T) { server := newFakeServer() whisper := newFakeEnvelopesEvents() target := 1 - connmanager := NewConnectionManager(server, whisper, target) + connmanager := NewConnectionManager(server, whisper, target, 0) connmanager.Start() defer connmanager.Stop() nodes := []*enode.Node{} - for _, n := range getNRandomNodes(t, 3) { + for _, n := range getMapWithRandomNodes(t, 3) { nodes = append(nodes, n) } // Send 3 random nodes to connection manager. @@ -224,11 +235,11 @@ func TestConnectionManagerReplace(t *testing.T) { server := newFakeServer() whisper := newFakeEnvelopesEvents() target := 1 - connmanager := NewConnectionManager(server, whisper, target) + connmanager := NewConnectionManager(server, whisper, target, 0) connmanager.Start() defer connmanager.Stop() nodes := []*enode.Node{} - for _, n := range getNRandomNodes(t, 3) { + for _, n := range getMapWithRandomNodes(t, 3) { nodes = append(nodes, n) } // Send a single node to connection manager. @@ -264,13 +275,13 @@ func TestConnectionManagerReplace(t *testing.T) { func TestConnectionChangedAfterExpiry(t *testing.T) { server := newFakeServer() - whisper := newFakeEnvelopesEvents() + whisperMock := newFakeEnvelopesEvents() target := 1 - connmanager := NewConnectionManager(server, whisper, target) + connmanager := NewConnectionManager(server, whisperMock, target, 0) connmanager.Start() defer connmanager.Stop() nodes := []*enode.Node{} - for _, n := range getNRandomNodes(t, 2) { + for _, n := range getMapWithRandomNodes(t, 2) { nodes = append(nodes, n) } // Send two random nodes to connection manager. @@ -288,15 +299,15 @@ func TestConnectionChangedAfterExpiry(t *testing.T) { hash := common.Hash{1} // Send event that history request for connected peer was sent. select { - case whisper.input <- whisperv6.EnvelopeEvent{ - Event: whisperv6.EventMailServerRequestSent, Peer: initial, Hash: hash}: + case whisperMock.input <- whisper.EnvelopeEvent{ + Event: whisper.EventMailServerRequestSent, Peer: initial, Hash: hash}: case <-time.After(time.Second): require.FailNow(t, "can't send a 'sent' event") } // And eventually expired. select { - case whisper.input <- whisperv6.EnvelopeEvent{ - Event: whisperv6.EventMailServerRequestExpired, Peer: initial, Hash: hash}: + case whisperMock.input <- whisper.EnvelopeEvent{ + Event: whisper.EventMailServerRequestExpired, Peer: initial, Hash: hash}: case <-time.After(time.Second): require.FailNow(t, "can't send an 'expiry' event") } @@ -311,3 +322,23 @@ func TestConnectionChangedAfterExpiry(t *testing.T) { return nil }, time.Second, 100*time.Millisecond)) } + +func TestProcessReplacementWaitsForConnections(t *testing.T) { + srv := newFakePeerAdderRemover() + target := 1 + timeout := time.Second + nodes := make([]*enode.Node, 2) + fillWithRandomNodes(t, nodes) + events := make(chan *p2p.PeerEvent) + state := newInternalState(srv, target, timeout) + state.currentNodes = nodesToMap(nodes) + go func() { + select { + case events <- &p2p.PeerEvent{Peer: nodes[0].ID(), Type: p2p.PeerEventTypeAdd}: + case <-time.After(time.Second): + assert.FailNow(t, "can't send a drop event") + } + }() + state.processReplacement(nodes, events) + require.Len(t, state.connected, 1) +} diff --git a/services/shhext/mailservers/connmonitor.go b/services/shhext/mailservers/connmonitor.go new file mode 100644 index 000000000..3f9612f11 --- /dev/null +++ b/services/shhext/mailservers/connmonitor.go @@ -0,0 +1,85 @@ +package mailservers + +import ( + "sync" + "time" + + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/p2p/enode" + whisper "github.com/status-im/whisper/whisperv6" +) + +// NewLastUsedConnectionMonitor returns pointer to the instance of LastUsedConnectionMonitor. +func NewLastUsedConnectionMonitor(ps *PeerStore, cache *Cache, whisper EnvelopeEventSubscbriber) *LastUsedConnectionMonitor { + return &LastUsedConnectionMonitor{ + ps: ps, + cache: cache, + whisper: whisper, + } +} + +// LastUsedConnectionMonitor watches relevant events and reflects it in cache. +type LastUsedConnectionMonitor struct { + ps *PeerStore + cache *Cache + + whisper EnvelopeEventSubscbriber + + quit chan struct{} + wg sync.WaitGroup +} + +// Start spins a separate goroutine to watch connections. +func (mon *LastUsedConnectionMonitor) Start() { + mon.quit = make(chan struct{}) + mon.wg.Add(1) + go func() { + events := make(chan whisper.EnvelopeEvent, whisperEventsBuffer) + sub := mon.whisper.SubscribeEnvelopeEvents(events) + defer sub.Unsubscribe() + defer mon.wg.Done() + for { + select { + case <-mon.quit: + return + case err := <-sub.Err(): + log.Error("retry after error suscribing to whisper events", "error", err) + return + case ev := <-events: + node := mon.ps.Get(ev.Peer) + if node == nil { + continue + } + if ev.Event == whisper.EventMailServerRequestCompleted { + err := mon.updateRecord(ev.Peer) + if err != nil { + log.Error("unable to update storage", "peer", ev.Peer, "error", err) + } + } + } + } + }() +} + +func (mon *LastUsedConnectionMonitor) updateRecord(nodeID enode.ID) error { + node := mon.ps.Get(nodeID) + if node == nil { + return nil + } + return mon.cache.UpdateRecord(PeerRecord{node: node, LastUsed: time.Now()}) +} + +// Stop closes channel to signal a quit and waits until all goroutines are stoppped. +func (mon *LastUsedConnectionMonitor) Stop() { + if mon.quit == nil { + return + } + select { + case <-mon.quit: + return + default: + } + close(mon.quit) + mon.wg.Wait() + mon.quit = nil +} diff --git a/services/shhext/mailservers/connmonitor_test.go b/services/shhext/mailservers/connmonitor_test.go new file mode 100644 index 000000000..3fa640f75 --- /dev/null +++ b/services/shhext/mailservers/connmonitor_test.go @@ -0,0 +1,79 @@ +package mailservers + +import ( + "fmt" + "sort" + "testing" + "time" + + "github.com/ethereum/go-ethereum/p2p/enode" + "github.com/status-im/status-go/t/utils" + whisper "github.com/status-im/whisper/whisperv6" + "github.com/stretchr/testify/require" +) + +func TestUsedConnectionPersisted(t *testing.T) { + nodes := make([]*enode.Node, 2) + fillWithRandomNodes(t, nodes) + + cache := newInMemCache(t) + store := NewPeerStore(cache) + require.NoError(t, store.Update(nodes)) + whisperMock := newFakeEnvelopesEvents() + monitor := NewLastUsedConnectionMonitor(store, cache, whisperMock) + monitor.Start() + + // Send a confirmation that we received history from one of the peers. + select { + case whisperMock.input <- whisper.EnvelopeEvent{ + Event: whisper.EventMailServerRequestCompleted, Peer: nodes[0].ID()}: + case <-time.After(time.Second): + require.FailNow(t, "can't send a 'completed' event") + } + + // Wait until records will be updated in the cache. + require.NoError(t, utils.Eventually(func() error { + records, err := cache.LoadAll() + if err != nil { + return err + } + if lth := len(records); lth != 2 { + return fmt.Errorf("unexpected length of all records stored in the cache. expected %d got %d", 2, lth) + } + var used bool + for _, r := range records { + if r.Node().ID() == nodes[0].ID() { + used = !r.LastUsed.IsZero() + } + } + if !used { + return fmt.Errorf("record %s is not marked as used", nodes[0].ID()) + } + return nil + }, time.Second, 100*time.Millisecond)) + + // Use different peer, first will be marked as unused. + select { + case whisperMock.input <- whisper.EnvelopeEvent{ + Event: whisper.EventMailServerRequestCompleted, Peer: nodes[1].ID()}: + case <-time.After(time.Second): + require.FailNow(t, "can't send a 'completed' event") + } + + require.NoError(t, utils.Eventually(func() error { + records, err := cache.LoadAll() + if err != nil { + return err + } + if lth := len(records); lth != 2 { + return fmt.Errorf("unexpected length of all records stored in the cache. expected %d got %d", 2, lth) + } + sort.Slice(records, func(i, j int) bool { + return records[i].LastUsed.After(records[j].LastUsed) + }) + if records[0].Node().ID() != nodes[1].ID() { + return fmt.Errorf("record wasn't updated after previous event") + } + return nil + }, time.Second, 100*time.Millisecond)) +} diff --git a/services/shhext/mailservers/peerstore.go b/services/shhext/mailservers/peerstore.go index e545d9f79..fdfde7ef3 100644 --- a/services/shhext/mailservers/peerstore.go +++ b/services/shhext/mailservers/peerstore.go @@ -19,49 +19,43 @@ type PeersProvider interface { } // NewPeerStore returns an instance of PeerStore. -func NewPeerStore() *PeerStore { - return &PeerStore{nodes: map[enode.ID]*enode.Node{}} +func NewPeerStore(cache *Cache) *PeerStore { + return &PeerStore{ + nodes: map[enode.ID]*enode.Node{}, + cache: cache, + } } // PeerStore stores list of selected mail servers and keeps N of them connected. type PeerStore struct { mu sync.RWMutex nodes map[enode.ID]*enode.Node + + cache *Cache } // Exist confirms that peers was added to a store. -func (ps *PeerStore) Exist(peer enode.ID) bool { +func (ps *PeerStore) Exist(nodeID enode.ID) bool { ps.mu.RLock() defer ps.mu.RUnlock() - _, exist := ps.nodes[peer] + _, exist := ps.nodes[nodeID] return exist } // Get returns instance of the node with requested ID or nil if ID is not found. -func (ps *PeerStore) Get(peer enode.ID) *enode.Node { +func (ps *PeerStore) Get(nodeID enode.ID) *enode.Node { ps.mu.RLock() defer ps.mu.RUnlock() - return ps.nodes[peer] + return ps.nodes[nodeID] } // Update updates peers locally. -func (ps *PeerStore) Update(nodes []*enode.Node) { +func (ps *PeerStore) Update(nodes []*enode.Node) error { ps.mu.Lock() - defer ps.mu.Unlock() ps.nodes = map[enode.ID]*enode.Node{} for _, n := range nodes { ps.nodes[n.ID()] = n } -} - -// GetFirstConnected returns first connected peer that is also added to a peer store. -// Raises ErrNoConnected if no peers are added to a peer store. -func GetFirstConnected(provider PeersProvider, store *PeerStore) (*enode.Node, error) { - peers := provider.Peers() - for _, p := range peers { - if store.Exist(p.ID()) { - return p.Node(), nil - } - } - return nil, ErrNoConnected + ps.mu.Unlock() + return ps.cache.Replace(nodes) } diff --git a/services/shhext/mailservers/peerstore_test.go b/services/shhext/mailservers/peerstore_test.go index 111b826c3..48d7bdbaf 100644 --- a/services/shhext/mailservers/peerstore_test.go +++ b/services/shhext/mailservers/peerstore_test.go @@ -9,7 +9,7 @@ import ( "github.com/stretchr/testify/require" ) -func RandomeNode() (*enode.Node, error) { +func RandomNode() (*enode.Node, error) { pkey, err := crypto.GenerateKey() if err != nil { return nil, err @@ -18,24 +18,24 @@ func RandomeNode() (*enode.Node, error) { } func TestUpdateResetsInternalStorage(t *testing.T) { - store := NewPeerStore() - r1, err := RandomeNode() + store := NewPeerStore(newInMemCache(t)) + r1, err := RandomNode() require.NoError(t, err) - r2, err := RandomeNode() + r2, err := RandomNode() require.NoError(t, err) - store.Update([]*enode.Node{r1, r2}) + require.NoError(t, store.Update([]*enode.Node{r1, r2})) require.True(t, store.Exist(r1.ID())) require.True(t, store.Exist(r2.ID())) - store.Update([]*enode.Node{r2}) + require.NoError(t, store.Update([]*enode.Node{r2})) require.False(t, store.Exist(r1.ID())) require.True(t, store.Exist(r2.ID())) } func TestGetNodeByID(t *testing.T) { - store := NewPeerStore() - r1, err := RandomeNode() + store := NewPeerStore(newInMemCache(t)) + r1, err := RandomNode() require.NoError(t, err) - store.Update([]*enode.Node{r1}) + require.NoError(t, store.Update([]*enode.Node{r1})) require.Equal(t, r1, store.Get(r1.ID())) require.Nil(t, store.Get(enode.ID{1})) } @@ -50,28 +50,7 @@ func (f fakePeerProvider) Peers() []*p2p.Peer { func TestNoConnected(t *testing.T) { provider := fakePeerProvider{} - store := NewPeerStore() + store := NewPeerStore(newInMemCache(t)) _, err := GetFirstConnected(provider, store) require.EqualError(t, ErrNoConnected, err.Error()) } - -func TestGetFirstConnected(t *testing.T) { - numPeers := 3 - nodes := make([]*enode.Node, numPeers) - peers := make([]*p2p.Peer, numPeers) - nodesMap := getNRandomNodes(t, numPeers) - i := 0 - for _, node := range nodesMap { - nodes[i] = node - peers[i] = p2p.NewPeer(node.ID(), node.ID().String(), nil) - i++ - } - store := NewPeerStore() - provider := fakePeerProvider{peers} - _, err := GetFirstConnected(provider, store) - require.EqualError(t, ErrNoConnected, err.Error()) - store.Update(nodes) - node, err := GetFirstConnected(provider, store) - require.NoError(t, err) - require.Contains(t, nodesMap, node.ID()) -} diff --git a/services/shhext/mailservers/utils.go b/services/shhext/mailservers/utils.go new file mode 100644 index 000000000..c8e3557af --- /dev/null +++ b/services/shhext/mailservers/utils.go @@ -0,0 +1,52 @@ +package mailservers + +import ( + "sort" + + "github.com/ethereum/go-ethereum/p2p/enode" +) + +// GetFirstConnected returns first connected peer that is also added to a peer store. +// Raises ErrNoConnected if no peers are added to a peer store. +func GetFirstConnected(provider PeersProvider, store *PeerStore) (*enode.Node, error) { + peers := provider.Peers() + for _, p := range peers { + if store.Exist(p.ID()) { + return p.Node(), nil + } + } + return nil, ErrNoConnected +} + +// NodesNotifee interface to be notified when new nodes are received. +type NodesNotifee interface { + Notify([]*enode.Node) +} + +// EnsureUsedRecordsAddedFirst checks if any nodes were marked as connected before app went offline. +func EnsureUsedRecordsAddedFirst(ps *PeerStore, conn NodesNotifee) error { + records, err := ps.cache.LoadAll() + if err != nil { + return err + } + if len(records) == 0 { + return nil + } + sort.Slice(records, func(i, j int) bool { + return records[i].LastUsed.After(records[j].LastUsed) + }) + all := recordsToNodes(records) + if !records[0].LastUsed.IsZero() { + conn.Notify(all[:1]) + } + conn.Notify(all) + return nil +} + +func recordsToNodes(records []PeerRecord) []*enode.Node { + nodes := make([]*enode.Node, len(records)) + for i := range records { + nodes[i] = records[i].Node() + } + return nodes +} diff --git a/services/shhext/mailservers/utils_test.go b/services/shhext/mailservers/utils_test.go new file mode 100644 index 000000000..8290af27d --- /dev/null +++ b/services/shhext/mailservers/utils_test.go @@ -0,0 +1,55 @@ +package mailservers + +import ( + "testing" + "time" + + "github.com/ethereum/go-ethereum/p2p" + "github.com/ethereum/go-ethereum/p2p/enode" + "github.com/stretchr/testify/require" +) + +func TestGetFirstConnected(t *testing.T) { + numPeers := 3 + nodes := make([]*enode.Node, numPeers) + peers := make([]*p2p.Peer, numPeers) + nodesMap := getMapWithRandomNodes(t, numPeers) + i := 0 + for _, node := range nodesMap { + nodes[i] = node + peers[i] = p2p.NewPeer(node.ID(), node.ID().String(), nil) + i++ + } + store := NewPeerStore(newInMemCache(t)) + provider := fakePeerProvider{peers} + _, err := GetFirstConnected(provider, store) + require.EqualError(t, ErrNoConnected, err.Error()) + require.NoError(t, store.Update(nodes)) + node, err := GetFirstConnected(provider, store) + require.NoError(t, err) + require.Contains(t, nodesMap, node.ID()) +} + +type trackingNodeNotifee struct { + calls [][]*enode.Node +} + +func (t *trackingNodeNotifee) Notify(nodes []*enode.Node) { + t.calls = append(t.calls, nodes) +} + +func TestEnsureNewRecordsAddedFirst(t *testing.T) { + notifee := new(trackingNodeNotifee) + store := NewPeerStore(newInMemCache(t)) + nodes := make([]*enode.Node, 3) + fillWithRandomNodes(t, nodes) + require.NoError(t, store.Update(nodes)) + record := NewPeerRecord(nodes[0]) + record.LastUsed = time.Now() + require.NoError(t, store.cache.UpdateRecord(record)) + require.NoError(t, EnsureUsedRecordsAddedFirst(store, notifee)) + require.Len(t, notifee.calls, 2) + require.Len(t, notifee.calls[0], 1) + require.Equal(t, nodes[0].ID(), notifee.calls[0][0].ID()) + require.Len(t, notifee.calls[1], 3) +} diff --git a/services/shhext/service.go b/services/shhext/service.go index 6bb9ee8f9..92cb0b2c0 100644 --- a/services/shhext/service.go +++ b/services/shhext/service.go @@ -6,6 +6,7 @@ import ( "fmt" "os" "path/filepath" + "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/crypto/sha3" @@ -23,6 +24,8 @@ import ( const ( // defaultConnectionsTarget used in Service.Start if configured connection target is 0. defaultConnectionsTarget = 1 + // defaultTimeoutWaitAdded is a timeout to use to establish initial connections. + defaultTimeoutWaitAdded = 5 * time.Second ) var errProtocolNotInitialized = errors.New("procotol is not initialized") @@ -49,8 +52,10 @@ type Service struct { installationID string pfsEnabled bool - peerStore *mailservers.PeerStore - connManager *mailservers.ConnectionManager + peerStore *mailservers.PeerStore + cache *mailservers.Cache + connManager *mailservers.ConnectionManager + lastUsedMonitor *mailservers.LastUsedConnectionMonitor } type ServiceConfig struct { @@ -60,6 +65,7 @@ type ServiceConfig struct { PFSEnabled bool MailServerConfirmations bool EnableConnectionManager bool + EnableLastUsedMonitor bool ConnectionTarget int } @@ -68,7 +74,8 @@ var _ node.Service = (*Service)(nil) // New returns a new Service. dataDir is a folder path to a network-independent location func New(w *whisper.Whisper, handler EnvelopeEventsHandler, db *leveldb.DB, config *ServiceConfig) *Service { - ps := mailservers.NewPeerStore() + cache := mailservers.NewCache(db) + ps := mailservers.NewPeerStore(cache) track := &tracker{ w: w, handler: handler, @@ -87,15 +94,19 @@ func New(w *whisper.Whisper, handler EnvelopeEventsHandler, db *leveldb.DB, conf installationID: config.InstallationID, pfsEnabled: config.PFSEnabled, peerStore: ps, + cache: cache, } } // UpdateMailservers updates information about selected mail servers. -func (s *Service) UpdateMailservers(nodes []*enode.Node) { - s.peerStore.Update(nodes) +func (s *Service) UpdateMailservers(nodes []*enode.Node) error { + if err := s.peerStore.Update(nodes); err != nil { + return err + } if s.connManager != nil { s.connManager.Notify(nodes) } + return nil } // Protocols returns a new protocols list. In this case, there are none. @@ -212,8 +223,15 @@ func (s *Service) Start(server *p2p.Server) error { if connectionsTarget == 0 { connectionsTarget = defaultConnectionsTarget } - s.connManager = mailservers.NewConnectionManager(server, s.w, connectionsTarget) + s.connManager = mailservers.NewConnectionManager(server, s.w, connectionsTarget, defaultTimeoutWaitAdded) s.connManager.Start() + if err := mailservers.EnsureUsedRecordsAddedFirst(s.peerStore, s.connManager); err != nil { + return err + } + } + if s.config.EnableLastUsedMonitor { + s.lastUsedMonitor = mailservers.NewLastUsedConnectionMonitor(s.peerStore, s.cache, s.w) + s.lastUsedMonitor.Start() } s.tracker.Start() s.nodeID = server.PrivateKey @@ -227,6 +245,9 @@ func (s *Service) Stop() error { if s.config.EnableConnectionManager { s.connManager.Stop() } + if s.config.EnableLastUsedMonitor { + s.lastUsedMonitor.Stop() + } s.tracker.Stop() return nil } diff --git a/services/shhext/service_test.go b/services/shhext/service_test.go index 0b39dd882..b48b8bba5 100644 --- a/services/shhext/service_test.go +++ b/services/shhext/service_test.go @@ -17,6 +17,8 @@ import ( "github.com/status-im/status-go/t/helpers" whisper "github.com/status-im/whisper/whisperv6" "github.com/stretchr/testify/suite" + "github.com/syndtr/goleveldb/leveldb" + "github.com/syndtr/goleveldb/leveldb/storage" ) func newHandlerMock(buf int) handlerMock { @@ -102,7 +104,9 @@ func (s *ShhExtSuite) SetupTest() { MailServerConfirmations: true, ConnectionTarget: 10, } - s.services[i] = New(s.whisper[i], nil, nil, config) + db, err := leveldb.Open(storage.NewMemStorage(), nil) + s.Require().NoError(err) + s.services[i] = New(s.whisper[i], nil, db, config) s.NoError(stack.Register(func(n *node.ServiceContext) (node.Service, error) { return s.services[i], nil })) @@ -120,7 +124,7 @@ func (s *ShhExtSuite) TestInitProtocol() { func (s *ShhExtSuite) TestPostMessageWithConfirmation() { mock := newHandlerMock(1) s.services[0].tracker.handler = mock - s.services[0].UpdateMailservers([]*enode.Node{s.nodes[1].Server().Self()}) + s.Require().NoError(s.services[0].UpdateMailservers([]*enode.Node{s.nodes[1].Server().Self()})) s.nodes[0].Server().AddPeer(s.nodes[1].Server().Self()) symID, err := s.whisper[0].GenerateSymKey() s.NoError(err) diff --git a/services/shhext/tracker_test.go b/services/shhext/tracker_test.go index ad530aaf3..5150b9d1e 100644 --- a/services/shhext/tracker_test.go +++ b/services/shhext/tracker_test.go @@ -11,6 +11,8 @@ import ( "github.com/status-im/status-go/services/shhext/mailservers" whisper "github.com/status-im/whisper/whisperv6" "github.com/stretchr/testify/suite" + "github.com/syndtr/goleveldb/leveldb" + "github.com/syndtr/goleveldb/leveldb/storage" ) var ( @@ -28,10 +30,12 @@ type TrackerSuite struct { } func (s *TrackerSuite) SetupTest() { + db, err := leveldb.Open(storage.NewMemStorage(), nil) + s.Require().NoError(err) s.tracker = &tracker{ cache: map[common.Hash]EnvelopeState{}, batches: map[common.Hash]map[common.Hash]struct{}{}, - mailPeers: mailservers.NewPeerStore(), + mailPeers: mailservers.NewPeerStore(mailservers.NewCache(db)), } } @@ -52,7 +56,7 @@ func (s *TrackerSuite) TestConfirmedWithAcknowledge() { pkey, err := crypto.GenerateKey() s.Require().NoError(err) node := enode.NewV4(&pkey.PublicKey, nil, 0, 0) - s.tracker.mailPeers.Update([]*enode.Node{node}) + s.Require().NoError(s.tracker.mailPeers.Update([]*enode.Node{node})) s.tracker.Add(testHash) s.Contains(s.tracker.cache, testHash) s.Equal(EnvelopePosted, s.tracker.cache[testHash])