feat: storenode cycle (#1223)

parent 0ed94ce0b1
commit 76275f6fb8
@@ -74,6 +74,14 @@ func (t timestamp) String() string {
	return time.Unix(0, int64(t)).Format(time.RFC3339)
}

func Timep(key string, time *int64) zapcore.Field {
	if time == nil {
		return zap.String(key, "-")
	} else {
		return Time(key, *time)
	}
}

func Epoch(key string, time time.Time) zap.Field {
	return zap.String(key, fmt.Sprintf("%d", time.UnixNano()))
}
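For orientation, here is a minimal sketch of how these logging helpers might be used at a call site. The package name main, the field names, and the values are illustrative assumptions, not part of this commit; only logging.Timep and logging.Epoch come from the diff above.

// Hypothetical call site for the new helpers (sketch only).
package main

import (
	"time"

	"github.com/waku-org/go-waku/logging"
	"go.uber.org/zap"
)

func main() {
	logger, _ := zap.NewDevelopment()

	var from *int64             // nil pointers are logged as "-"
	to := time.Now().UnixNano() // non-nil pointers are formatted via Time()

	logger.Info("history query window",
		logging.Timep("from", from),
		logging.Timep("to", &to),
		logging.Epoch("now", time.Now()), // Epoch logs the raw nanosecond timestamp as a string
	)
}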
waku/v2/api/history/cycle.go (new file, 524 lines)
@@ -0,0 +1,524 @@
package history

import (
	"context"
	"crypto/rand"
	"errors"
	"fmt"
	"math"
	"math/big"
	"net"
	"net/http"
	"runtime"
	"sort"
	"sync"
	"time"

	"github.com/libp2p/go-libp2p/core/host"
	"github.com/libp2p/go-libp2p/core/peer"
	"github.com/libp2p/go-libp2p/p2p/protocol/ping"
	"github.com/waku-org/go-waku/waku/v2/protocol/store"
	"go.uber.org/zap"
)

const defaultBackoff = 10 * time.Second
const graylistBackoff = 3 * time.Minute
const storenodeVerificationInterval = time.Second
const storenodeMaxFailedRequests uint = 2
const minStorenodesToChooseFrom = 3
const isAndroidEmulator = runtime.GOOS == "android" && runtime.GOARCH == "amd64"
const findNearestMailServer = !isAndroidEmulator
const overrideDNS = runtime.GOOS == "android" || runtime.GOOS == "ios"
const bootstrapDNS = "8.8.8.8:53"

type connStatus int

const (
	disconnected connStatus = iota + 1
	connected
)

type peerStatus struct {
	status                connStatus
	canConnectAfter       time.Time
	lastConnectionAttempt time.Time
}

type StorenodeConfigProvider interface {
	UseStorenodes() (bool, error)
	GetPinnedStorenode() (peer.ID, error)
	Storenodes() ([]peer.ID, error)
}

type StorenodeCycle struct {
	sync.RWMutex

	logger *zap.Logger

	host host.Host

	storenodeConfigProvider StorenodeConfigProvider

	StorenodeAvailableOneshotEmitter *OneShotEmitter[struct{}]
	StorenodeChangedEmitter          *Emitter[peer.ID]
	StorenodeNotWorkingEmitter       *Emitter[struct{}]
	StorenodeAvailableEmitter        *Emitter[peer.ID]

	failedRequests map[peer.ID]uint

	peersMutex      sync.RWMutex
	activeStorenode peer.ID
	peers           map[peer.ID]peerStatus
}

func NewStorenodeCycle(logger *zap.Logger) *StorenodeCycle {
	return &StorenodeCycle{
		StorenodeAvailableOneshotEmitter: NewOneshotEmitter[struct{}](),
		StorenodeChangedEmitter:          NewEmitter[peer.ID](),
		StorenodeNotWorkingEmitter:       NewEmitter[struct{}](),
		StorenodeAvailableEmitter:        NewEmitter[peer.ID](),
		logger:                           logger.Named("storenode-cycle"),
	}
}

func (m *StorenodeCycle) Start(ctx context.Context, h host.Host) {
	m.logger.Debug("starting storenode cycle")
	m.host = h
	m.failedRequests = make(map[peer.ID]uint)
	m.peers = make(map[peer.ID]peerStatus)

	go m.verifyStorenodeStatus(ctx)
}

func (m *StorenodeCycle) DisconnectActiveStorenode(backoff time.Duration) {
	m.Lock()
	defer m.Unlock()

	m.disconnectActiveStorenode(backoff)
}

func (m *StorenodeCycle) connectToNewStorenodeAndWait(ctx context.Context) error {
	// Handle pinned storenodes
	m.logger.Info("disconnecting storenode")
	pinnedStorenode, err := m.storenodeConfigProvider.GetPinnedStorenode()
	if err != nil {
		m.logger.Error("could not obtain the pinned storenode", zap.Error(err))
		return err
	}

	// If no pinned storenode, no need to disconnect and wait for it to be available
	if pinnedStorenode == "" {
		m.disconnectActiveStorenode(graylistBackoff)
	}

	return m.findNewStorenode(ctx)
}

func (m *StorenodeCycle) disconnectStorenode(backoffDuration time.Duration) error {
	if m.activeStorenode == "" {
		m.logger.Info("no active storenode")
		return nil
	}

	m.logger.Info("disconnecting active storenode", zap.Stringer("peerID", m.activeStorenode))

	m.peersMutex.Lock()
	pInfo, ok := m.peers[m.activeStorenode]
	if ok {
		pInfo.status = disconnected
		pInfo.canConnectAfter = time.Now().Add(backoffDuration)
		m.peers[m.activeStorenode] = pInfo
	} else {
		m.peers[m.activeStorenode] = peerStatus{
			status:          disconnected,
			canConnectAfter: time.Now().Add(backoffDuration),
		}
	}
	m.peersMutex.Unlock()

	m.activeStorenode = ""

	return nil
}

func (m *StorenodeCycle) disconnectActiveStorenode(backoffDuration time.Duration) {
	err := m.disconnectStorenode(backoffDuration)
	if err != nil {
		m.logger.Error("failed to disconnect storenode", zap.Error(err))
	}

	m.StorenodeChangedEmitter.Emit("")
}

func (m *StorenodeCycle) Cycle(ctx context.Context) {
	if m.storenodeConfigProvider == nil {
		m.logger.Debug("storenodeConfigProvider not yet setup")
		return
	}

	m.logger.Info("Automatically switching storenode")

	if m.activeStorenode != "" {
		m.disconnectActiveStorenode(graylistBackoff)
	}

	useStorenode, err := m.storenodeConfigProvider.UseStorenodes()
	if err != nil {
		m.logger.Error("failed to get use storenodes", zap.Error(err))
		return
	}

	if !useStorenode {
		m.logger.Info("Skipping storenode search due to useStorenode being false")
		return
	}

	err = m.findNewStorenode(ctx)
	if err != nil {
		m.logger.Error("Error getting new storenode", zap.Error(err))
	}
}

func poolSize(fleetSize int) int {
	return int(math.Ceil(float64(fleetSize) / 4))
}

func (m *StorenodeCycle) getAvailableStorenodesSortedByRTT(ctx context.Context, allStorenodes []peer.ID) []peer.ID {
	availableStorenodes := make(map[peer.ID]time.Duration)
	availableStorenodesMutex := sync.Mutex{}
	availableStorenodesWg := sync.WaitGroup{}
	for _, storenode := range allStorenodes {
		availableStorenodesWg.Add(1)
		go func(peerID peer.ID) {
			defer availableStorenodesWg.Done()
			ctx, cancel := context.WithTimeout(ctx, 4*time.Second)
			defer cancel()

			rtt, err := m.pingPeer(ctx, peerID)
			if err == nil { // pinging storenodes might fail, but we don't care
				availableStorenodesMutex.Lock()
				availableStorenodes[peerID] = rtt
				availableStorenodesMutex.Unlock()
			}
		}(storenode)
	}
	availableStorenodesWg.Wait()

	if len(availableStorenodes) == 0 {
		m.logger.Warn("No storenodes available") // Do nothing..
		return nil
	}

	var sortedStorenodes []SortedStorenode
	for storenodeID, rtt := range availableStorenodes {
		sortedStorenode := SortedStorenode{
			Storenode: storenodeID,
			RTT:       rtt,
		}
		m.peersMutex.Lock()
		pInfo, ok := m.peers[storenodeID]
		m.peersMutex.Unlock()
		if ok && time.Now().Before(pInfo.canConnectAfter) {
			continue // We can't connect to this node yet
		}
		sortedStorenodes = append(sortedStorenodes, sortedStorenode)
	}
	sort.Sort(byRTTMsAndCanConnectBefore(sortedStorenodes))

	result := make([]peer.ID, len(sortedStorenodes))
	for i, s := range sortedStorenodes {
		result[i] = s.Storenode
	}

	return result
}

func (m *StorenodeCycle) pingPeer(ctx context.Context, peerID peer.ID) (time.Duration, error) {
	pingResultCh := ping.Ping(ctx, m.host, peerID)
	select {
	case <-ctx.Done():
		return 0, ctx.Err()
	case r := <-pingResultCh:
		if r.Error != nil {
			return 0, r.Error
		}
		return r.RTT, nil
	}
}

func (m *StorenodeCycle) findNewStorenode(ctx context.Context) error {
	// we have to override DNS manually because of https://github.com/status-im/status-mobile/issues/19581
	if overrideDNS {
		var dialer net.Dialer
		net.DefaultResolver = &net.Resolver{
			PreferGo: false,
			Dial: func(context context.Context, _, _ string) (net.Conn, error) {
				conn, err := dialer.DialContext(context, "udp", bootstrapDNS)
				if err != nil {
					return nil, err
				}
				return conn, nil
			},
		}
	}

	pinnedStorenode, err := m.storenodeConfigProvider.GetPinnedStorenode()
	if err != nil {
		m.logger.Error("Could not obtain the pinned storenode", zap.Error(err))
		return err
	}

	if pinnedStorenode != "" {
		return m.setActiveStorenode(pinnedStorenode)
	}

	m.logger.Info("Finding a new storenode..")

	allStorenodes, err := m.storenodeConfigProvider.Storenodes()
	if err != nil {
		return err
	}

	// TODO: remove this check once sockets are stable on x86_64 emulators
	if findNearestMailServer {
		allStorenodes = m.getAvailableStorenodesSortedByRTT(ctx, allStorenodes)
	}

	// Picks a random storenode among the ones with the lowest latency.
	// The pool size is 1/4 of the storenodes that were pinged successfully.
	// If the pool size is less than `minStorenodesToChooseFrom`, it will
	// pick a storenode from all the available storenodes
	pSize := poolSize(len(allStorenodes) - 1)
	if pSize <= minStorenodesToChooseFrom {
		pSize = len(allStorenodes)
		if pSize <= 0 {
			m.logger.Warn("No storenodes available") // Do nothing..
			return nil
		}
	}

	r, err := rand.Int(rand.Reader, big.NewInt(int64(pSize)))
	if err != nil {
		return err
	}

	ms := allStorenodes[r.Int64()]
	return m.setActiveStorenode(ms)
}

func (m *StorenodeCycle) storenodeStatus(peerID peer.ID) connStatus {
	m.peersMutex.RLock()
	defer m.peersMutex.RUnlock()

	peer, ok := m.peers[peerID]
	if !ok {
		return disconnected
	}
	return peer.status
}

func (m *StorenodeCycle) setActiveStorenode(peerID peer.ID) error {
	m.activeStorenode = peerID

	m.StorenodeChangedEmitter.Emit(m.activeStorenode)

	storenodeStatus := m.storenodeStatus(peerID)
	if storenodeStatus != connected {
		m.peersMutex.Lock()
		m.peers[peerID] = peerStatus{
			status:                connected,
			lastConnectionAttempt: time.Now(),
			canConnectAfter:       time.Now().Add(defaultBackoff),
		}
		m.peersMutex.Unlock()

		m.failedRequests[peerID] = 0
		m.logger.Info("storenode available", zap.Stringer("peerID", m.activeStorenode))

		m.StorenodeAvailableOneshotEmitter.Emit(struct{}{}) // Maybe can be refactored away?
		m.StorenodeAvailableEmitter.Emit(m.activeStorenode)
	}
	return nil
}

func (m *StorenodeCycle) GetActiveStorenode() peer.ID {
	m.RLock()
	defer m.RUnlock()

	return m.activeStorenode
}

func (m *StorenodeCycle) IsStorenodeAvailable(peerID peer.ID) bool {
	return m.storenodeStatus(peerID) == connected
}

func (m *StorenodeCycle) penalizeStorenode(id peer.ID) {
	m.peersMutex.Lock()
	defer m.peersMutex.Unlock()
	pInfo, ok := m.peers[id]
	if !ok {
		pInfo.status = disconnected
	}

	pInfo.canConnectAfter = time.Now().Add(graylistBackoff)
	m.peers[id] = pInfo
}

func (m *StorenodeCycle) verifyStorenodeStatus(ctx context.Context) {
	ticker := time.NewTicker(storenodeVerificationInterval)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			err := m.disconnectStorenodeIfRequired(ctx)
			if err != nil {
				m.logger.Error("failed to handle storenode cycle event", zap.Error(err))
				continue
			}

		case <-ctx.Done():
			return
		}
	}
}

func (m *StorenodeCycle) disconnectStorenodeIfRequired(ctx context.Context) error {
	m.logger.Debug("wakuV2 storenode status verification")

	if m.activeStorenode == "" {
		// No active storenode, find a new one
		m.Cycle(ctx)
		return nil
	}

	// Check whether we want to disconnect the active storenode
	if m.failedRequests[m.activeStorenode] >= storenodeMaxFailedRequests {
		m.penalizeStorenode(m.activeStorenode)
		m.StorenodeNotWorkingEmitter.Emit(struct{}{})

		m.logger.Info("too many failed requests", zap.Stringer("storenode", m.activeStorenode))
		m.failedRequests[m.activeStorenode] = 0
		return m.connectToNewStorenodeAndWait(ctx)
	}

	return nil
}

func (m *StorenodeCycle) SetStorenodeConfigProvider(provider StorenodeConfigProvider) {
	m.storenodeConfigProvider = provider
}

func (m *StorenodeCycle) WaitForAvailableStoreNode(ctx context.Context, timeout time.Duration) bool {
	// Add 1 second to timeout, because the storenode cycle has 1 second ticker, which doesn't tick on start.
	// This can be improved after merging https://github.com/status-im/status-go/pull/4380.
	// NOTE: https://stackoverflow.com/questions/32705582/how-to-get-time-tick-to-tick-immediately
	timeout += time.Second

	ctx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()

	wg := sync.WaitGroup{}
	wg.Add(1)
	go func() {
		defer wg.Done()
		for !m.IsStorenodeAvailable(m.activeStorenode) {
			select {
			case <-m.StorenodeAvailableOneshotEmitter.Subscribe():
			case <-ctx.Done():
				return
			}
		}
	}()

	select {
	case <-waitForWaitGroup(&wg):
	case <-ctx.Done():
	}

	return m.IsStorenodeAvailable(m.activeStorenode)
}

func waitForWaitGroup(wg *sync.WaitGroup) <-chan struct{} {
	ch := make(chan struct{})
	go func() {
		wg.Wait()
		close(ch)
	}()
	return ch
}

type storenodeTaskParameters struct {
	customPeerID peer.ID
}

type StorenodeTaskOption func(*storenodeTaskParameters)

func WithPeerID(peerID peer.ID) StorenodeTaskOption {
	return func(stp *storenodeTaskParameters) {
		stp.customPeerID = peerID
	}
}

func (m *StorenodeCycle) PerformStorenodeTask(fn func() error, options ...StorenodeTaskOption) error {
	params := storenodeTaskParameters{}
	for _, opt := range options {
		opt(&params)
	}

	peerID := params.customPeerID
	if peerID == "" {
		peerID = m.GetActiveStorenode()
	}

	if peerID == "" {
		return errors.New("storenode not available")
	}

	m.RLock()
	defer m.RUnlock()

	var tries uint = 0
	for tries < storenodeMaxFailedRequests {
		if params.customPeerID == "" && m.storenodeStatus(peerID) != connected {
			return errors.New("storenode not available")
		}
		m.logger.Info("trying performing history requests", zap.Uint("try", tries), zap.Stringer("peerID", peerID))

		// Perform request
		err := fn()
		if err == nil {
			// Reset failed requests
			m.logger.Debug("history request performed successfully", zap.Stringer("peerID", peerID))
			m.failedRequests[peerID] = 0
			return nil
		}

		m.logger.Error("failed to perform history request",
			zap.Stringer("peerID", peerID),
			zap.Uint("tries", tries),
			zap.Error(err),
		)

		tries++

		if storeErr, ok := err.(*store.StoreError); ok {
			if storeErr.Code == http.StatusTooManyRequests {
				m.disconnectActiveStorenode(defaultBackoff)
				return fmt.Errorf("ratelimited at storenode %s: %w", peerID, err)
			}
		}

		// Increment failed requests
		m.failedRequests[peerID]++

		// Change storenode
		if m.failedRequests[peerID] >= storenodeMaxFailedRequests {
			return errors.New("too many failed requests")
		}
		// Wait a couple of seconds to avoid spamming
		time.Sleep(2 * time.Second)

	}
	return errors.New("failed to perform history request")
}
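For orientation, a rough sketch of how an application might wire up the new cycle, based only on the API shown above. The package name app, the staticProvider helper, runCycle, and the 30-second timeout are illustrative assumptions, not part of this commit.

// Hypothetical wiring of the storenode cycle from an application (sketch only).
package app

import (
	"context"
	"errors"
	"time"

	"github.com/libp2p/go-libp2p/core/host"
	"github.com/libp2p/go-libp2p/core/peer"
	"github.com/waku-org/go-waku/waku/v2/api/history"
	"github.com/waku-org/go-waku/waku/v2/protocol/store"
	"go.uber.org/zap"
)

// staticProvider satisfies history.StorenodeConfigProvider with a fixed node list.
type staticProvider struct{ nodes []peer.ID }

func (p staticProvider) UseStorenodes() (bool, error)         { return true, nil }
func (p staticProvider) GetPinnedStorenode() (peer.ID, error) { return "", nil }
func (p staticProvider) Storenodes() ([]peer.ID, error)       { return p.nodes, nil }

func runCycle(ctx context.Context, h host.Host, ws *store.WakuStore, nodes []peer.ID, logger *zap.Logger) error {
	cycle := history.NewStorenodeCycle(logger)
	cycle.SetStorenodeConfigProvider(staticProvider{nodes: nodes})
	cycle.Start(ctx, h)

	// Trigger an initial selection and wait for a storenode to become available.
	cycle.Cycle(ctx)
	if !cycle.WaitForAvailableStoreNode(ctx, 30*time.Second) {
		return errors.New("no storenode available")
	}

	// Run a store request against the active storenode, reusing the cycle's
	// failure accounting and backoff.
	return cycle.PerformStorenodeTask(func() error {
		_, err := ws.Query(ctx, store.FilterCriteria{}, store.WithPeer(cycle.GetActiveStorenode()))
		return err
	})
}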
waku/v2/api/history/emitters.go (new file, 48 lines)
@@ -0,0 +1,48 @@
package history

import "sync"

type Emitter[T any] struct {
	sync.Mutex
	subscriptions []chan T
}

func NewEmitter[T any]() *Emitter[T] {
	return &Emitter[T]{}
}

func (s *Emitter[T]) Subscribe() <-chan T {
	s.Lock()
	defer s.Unlock()
	c := make(chan T)
	s.subscriptions = append(s.subscriptions, c)
	return c
}

func (s *Emitter[T]) Emit(value T) {
	s.Lock()
	defer s.Unlock()

	for _, sub := range s.subscriptions {
		sub <- value
	}
}

type OneShotEmitter[T any] struct {
	Emitter[T]
}

func NewOneshotEmitter[T any]() *OneShotEmitter[T] {
	return &OneShotEmitter[T]{}
}

func (s *OneShotEmitter[T]) Emit(value T) {
	s.Lock()
	defer s.Unlock()

	for _, subs := range s.subscriptions {
		subs <- value
		close(subs)
	}
	s.subscriptions = nil
}
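A minimal illustration of the two emitters, written as it might appear inside the history package (a sketch only; the ExampleEmitter function below is not part of the commit).

package history

import "fmt"

func ExampleEmitter() {
	e := NewEmitter[string]()
	sub := e.Subscribe()

	go e.Emit("hello") // Emit blocks until every subscriber has received the value
	fmt.Println(<-sub)

	once := NewOneshotEmitter[struct{}]()
	done := once.Subscribe()
	go once.Emit(struct{}{}) // the one-shot variant closes and drops its subscriptions after emitting
	<-done

	// Output: hello
}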
waku/v2/api/history/emitters_test.go (new file, 67 lines)
@@ -0,0 +1,67 @@
package history

import (
	"sync"
	"testing"

	"github.com/stretchr/testify/require"
)

func TestEmitter(t *testing.T) {
	emitter := NewEmitter[int]()

	subscr1 := emitter.Subscribe()
	subscr2 := emitter.Subscribe()

	wg := sync.WaitGroup{}
	wg.Add(3)

	go func() {
		defer wg.Done()
		emitter.Emit(1)
		emitter.Emit(2)
	}()

	go func() {
		defer wg.Done()
		require.Equal(t, 1, <-subscr1)
		require.Equal(t, 2, <-subscr1)
	}()

	go func() {
		defer wg.Done()
		require.Equal(t, 1, <-subscr2)
		require.Equal(t, 2, <-subscr2)
	}()

	wg.Wait()
}

func TestOneShotEmitter(t *testing.T) {
	emitter := NewOneshotEmitter[struct{}]()

	subscr1 := emitter.Subscribe()
	subscr2 := emitter.Subscribe()

	wg := sync.WaitGroup{}
	wg.Add(3)

	go func() {
		defer wg.Done()
		emitter.Emit(struct{}{})
	}()

	go func() {
		defer wg.Done()
		for range subscr1 {
		}
	}()

	go func() {
		defer wg.Done()
		for range subscr2 {
		}
	}()

	wg.Wait()
}
waku/v2/api/history/history.go (new file, 296 lines)
@@ -0,0 +1,296 @@
package history

import (
	"context"
	"errors"
	"math"
	"sync"
	"time"

	"github.com/ethereum/go-ethereum/common/hexutil"
	"github.com/libp2p/go-libp2p/core/peer"
	"github.com/waku-org/go-waku/logging"
	"github.com/waku-org/go-waku/waku/v2/protocol"
	"github.com/waku-org/go-waku/waku/v2/protocol/store"
	"go.uber.org/zap"
)

const maxTopicsPerRequest int = 10
const mailserverRequestTimeout = 30 * time.Second

type work struct {
	criteria store.FilterCriteria
	cursor   []byte
	limit    uint64
}

type HistoryRetriever struct {
	store            Store
	logger           *zap.Logger
	historyProcessor HistoryProcessor
}

type HistoryProcessor interface {
	OnEnvelope(env *protocol.Envelope, processEnvelopes bool) error
	OnRequestFailed(requestID []byte, peerID peer.ID, err error)
}

type Store interface {
	Query(ctx context.Context, criteria store.FilterCriteria, opts ...store.RequestOption) (store.Result, error)
}

func NewHistoryRetriever(store Store, historyProcessor HistoryProcessor, logger *zap.Logger) *HistoryRetriever {
	return &HistoryRetriever{
		store:            store,
		logger:           logger.Named("history-retriever"),
		historyProcessor: historyProcessor,
	}
}

func (hr *HistoryRetriever) Query(
	ctx context.Context,
	criteria store.FilterCriteria,
	storenodeID peer.ID,
	pageLimit uint64,
	shouldProcessNextPage func(int) (bool, uint64),
	processEnvelopes bool,
) error {
	logger := hr.logger.With(
		logging.Timep("fromString", criteria.TimeStart),
		logging.Timep("toString", criteria.TimeEnd),
		zap.String("pubsubTopic", criteria.PubsubTopic),
		zap.Strings("contentTopics", criteria.ContentTopicsList()),
		zap.Int64p("from", criteria.TimeStart),
		zap.Int64p("to", criteria.TimeEnd),
	)

	logger.Info("syncing")

	wg := sync.WaitGroup{}
	workWg := sync.WaitGroup{}
	workCh := make(chan work, 1000)       // each batch item is split into bunches of 10 topics and sent to this channel
	workCompleteCh := make(chan struct{}) // once all batch items are processed, this channel is triggered
	semaphore := make(chan struct{}, 3)   // limit the number of concurrent queries
	errCh := make(chan error)

	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	// TODO: refactor this by extracting the consumer into a separate go routine.

	// Producer
	wg.Add(1)
	go func() {
		defer func() {
			logger.Debug("mailserver batch producer complete")
			wg.Done()
		}()

		contentTopicList := criteria.ContentTopics.ToList()

		// TODO: split into 24h batches

		allWorks := int(math.Ceil(float64(len(contentTopicList)) / float64(maxTopicsPerRequest)))
		workWg.Add(allWorks)

		for i := 0; i < len(contentTopicList); i += maxTopicsPerRequest {
			j := i + maxTopicsPerRequest
			if j > len(contentTopicList) {
				j = len(contentTopicList)
			}

			select {
			case <-ctx.Done():
				logger.Debug("processBatch producer - context done")
				return
			default:
				logger.Debug("processBatch producer - creating work")
				workCh <- work{
					criteria: store.FilterCriteria{
						ContentFilter: protocol.NewContentFilter(criteria.PubsubTopic, contentTopicList[i:j]...),
						TimeStart:     criteria.TimeStart,
						TimeEnd:       criteria.TimeEnd,
					},
					limit: pageLimit,
				}
			}
		}

		go func() {
			workWg.Wait()
			workCompleteCh <- struct{}{}
		}()

		logger.Debug("processBatch producer complete")
	}()

	var result error

loop:
	for {
		select {
		case <-ctx.Done():
			logger.Debug("processBatch cleanup - context done")
			result = ctx.Err()
			if errors.Is(result, context.Canceled) {
				result = nil
			}
			break loop
		case w, ok := <-workCh:
			if !ok {
				continue
			}

			select {
			case <-ctx.Done():
				return ctx.Err()
			default:
				// continue...
			}

			logger.Debug("processBatch - received work")

			semaphore <- struct{}{}
			go func(w work) { // Consumer
				defer func() {
					workWg.Done()
					<-semaphore
				}()

				queryCtx, queryCancel := context.WithTimeout(ctx, mailserverRequestTimeout)
				cursor, envelopesCount, err := hr.createMessagesRequest(queryCtx, storenodeID, w.criteria, w.cursor, w.limit, true, processEnvelopes, logger)
				queryCancel()

				if err != nil {
					logger.Debug("failed to send request", zap.Error(err))
					errCh <- err
					return
				}

				processNextPage := true
				nextPageLimit := pageLimit
				if shouldProcessNextPage != nil {
					processNextPage, nextPageLimit = shouldProcessNextPage(envelopesCount)
				}

				if !processNextPage {
					return
				}

				// Check the cursor after calling `shouldProcessNextPage`.
				// The app might process the fetched envelopes in the callback for its own needs.
				if cursor == nil {
					return
				}

				logger.Debug("processBatch producer - creating work (cursor)")

				workWg.Add(1)
				workCh <- work{
					criteria: w.criteria,
					cursor:   cursor,
					limit:    nextPageLimit,
				}
			}(w)
		case err := <-errCh:
			logger.Debug("processBatch - received error", zap.Error(err))
			cancel() // Kill go routines
			return err
		case <-workCompleteCh:
			logger.Debug("processBatch - all jobs complete")
			cancel() // Kill go routines
		}
	}

	wg.Wait()

	logger.Info("synced topic", zap.NamedError("hasError", result))

	return result
}

func (hr *HistoryRetriever) createMessagesRequest(
	ctx context.Context,
	peerID peer.ID,
	criteria store.FilterCriteria,
	cursor []byte,
	limit uint64,
	waitForResponse bool,
	processEnvelopes bool,
	logger *zap.Logger,
) (storeCursor []byte, envelopesCount int, err error) {
	if waitForResponse {
		resultCh := make(chan struct {
			storeCursor    []byte
			envelopesCount int
			err            error
		})

		go func() {
			storeCursor, envelopesCount, err = hr.requestStoreMessages(ctx, peerID, criteria, cursor, limit, processEnvelopes)
			resultCh <- struct {
				storeCursor    []byte
				envelopesCount int
				err            error
			}{storeCursor, envelopesCount, err}
		}()

		select {
		case result := <-resultCh:
			return result.storeCursor, result.envelopesCount, result.err
		case <-ctx.Done():
			return nil, 0, ctx.Err()
		}
	} else {
		go func() {
			_, _, err = hr.requestStoreMessages(ctx, peerID, criteria, cursor, limit, false)
			if err != nil {
				logger.Error("failed to request store messages", zap.Error(err))
			}
		}()
	}

	return
}

func (hr *HistoryRetriever) requestStoreMessages(ctx context.Context, peerID peer.ID, criteria store.FilterCriteria, cursor []byte, limit uint64, processEnvelopes bool) ([]byte, int, error) {
	requestID := protocol.GenerateRequestID()
	logger := hr.logger.With(zap.String("requestID", hexutil.Encode(requestID)), zap.Stringer("peerID", peerID))

	opts := []store.RequestOption{
		store.WithPaging(false, limit),
		store.WithRequestID(requestID),
		store.WithPeer(peerID),
		store.WithCursor(cursor)}

	logger.Debug("store.query",
		logging.Timep("startTime", criteria.TimeStart),
		logging.Timep("endTime", criteria.TimeEnd),
		zap.Strings("contentTopics", criteria.ContentTopics.ToList()),
		zap.String("pubsubTopic", criteria.PubsubTopic),
		zap.String("cursor", hexutil.Encode(cursor)),
	)

	queryStart := time.Now()
	result, err := hr.store.Query(ctx, criteria, opts...)
	queryDuration := time.Since(queryStart)
	if err != nil {
		logger.Error("error querying storenode", zap.Error(err))

		hr.historyProcessor.OnRequestFailed(requestID, peerID, err)

		return nil, 0, err
	}

	messages := result.Messages()
	envelopesCount := len(messages)
	logger.Debug("store.query response", zap.Duration("queryDuration", queryDuration), zap.Int("numMessages", envelopesCount), zap.Bool("hasCursor", result.IsComplete() && result.Cursor() != nil))
	for _, mkv := range messages {
		envelope := protocol.NewEnvelope(mkv.Message, mkv.Message.GetTimestamp(), mkv.GetPubsubTopic())
		err := hr.historyProcessor.OnEnvelope(envelope, processEnvelopes)
		if err != nil {
			return nil, 0, err
		}
	}
	return result.Cursor(), envelopesCount, nil
}
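As a usage sketch, the retriever could be driven like this from an application. The envelopeCollector type, the fetchLastHour function, and the page limit of 20 are assumptions for illustration; only the history, store, and protocol APIs come from this commit.

// Hypothetical driver for HistoryRetriever (sketch only).
package app

import (
	"context"
	"time"

	"github.com/libp2p/go-libp2p/core/peer"
	"github.com/waku-org/go-waku/waku/v2/api/history"
	"github.com/waku-org/go-waku/waku/v2/protocol"
	"github.com/waku-org/go-waku/waku/v2/protocol/store"
	"go.uber.org/zap"
	"google.golang.org/protobuf/proto"
)

// envelopeCollector implements history.HistoryProcessor by collecting envelopes.
type envelopeCollector struct {
	envelopes []*protocol.Envelope
}

func (c *envelopeCollector) OnEnvelope(env *protocol.Envelope, processEnvelopes bool) error {
	c.envelopes = append(c.envelopes, env)
	return nil
}

func (c *envelopeCollector) OnRequestFailed(requestID []byte, peerID peer.ID, err error) {}

func fetchLastHour(ctx context.Context, ws *store.WakuStore, storenodeID peer.ID, pubsubTopic string, contentTopics []string, logger *zap.Logger) ([]*protocol.Envelope, error) {
	collector := &envelopeCollector{}
	retriever := history.NewHistoryRetriever(ws, collector, logger)

	now := time.Now()
	criteria := store.FilterCriteria{
		ContentFilter: protocol.NewContentFilter(pubsubTopic, contentTopics...),
		TimeStart:     proto.Int64(now.Add(-time.Hour).UnixNano()),
		TimeEnd:       proto.Int64(now.UnixNano()),
	}

	// Page size of 20; always continue to the next page with the same limit.
	err := retriever.Query(ctx, criteria, storenodeID, 20, func(envelopesCount int) (bool, uint64) {
		return true, 20
	}, true)

	return collector.envelopes, err
}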
waku/v2/api/history/history_test.go (new file, 254 lines)
@@ -0,0 +1,254 @@
package history

import (
	"context"
	"crypto/rand"
	"encoding/hex"
	"errors"
	"math/big"
	"sort"
	"testing"
	"time"

	"github.com/google/uuid"
	"github.com/libp2p/go-libp2p/core/peer"
	"github.com/stretchr/testify/require"
	"github.com/waku-org/go-waku/waku/v2/protocol"
	proto_pb "github.com/waku-org/go-waku/waku/v2/protocol/pb"
	"github.com/waku-org/go-waku/waku/v2/protocol/store"
	"github.com/waku-org/go-waku/waku/v2/protocol/store/pb"
	"google.golang.org/protobuf/proto"

	"github.com/waku-org/go-waku/waku/v2/utils"
)

type queryResponse struct {
	contentTopics []string
	messages      []*pb.WakuMessageKeyValue
	err           error // Indicates if this response will simulate an error returned by SendMessagesRequestForTopics
	cursor        []byte
}

type mockResult struct {
	cursor   []byte
	messages []*pb.WakuMessageKeyValue
}

func (r *mockResult) Cursor() []byte {
	return r.cursor
}

func (r *mockResult) Messages() []*pb.WakuMessageKeyValue {
	return r.messages
}

func (r *mockResult) IsComplete() bool {
	return false
}

func (r *mockResult) PeerID() peer.ID {
	return ""
}

func (r *mockResult) Query() *pb.StoreQueryRequest {
	return nil
}

func (r *mockResult) Response() *pb.StoreQueryResponse {
	return nil
}

func (r *mockResult) Next(ctx context.Context, opts ...store.RequestOption) error {
	return nil
}

type mockHistoryProcessor struct {
}

func (h *mockHistoryProcessor) OnEnvelope(env *protocol.Envelope, processEnvelopes bool) error {
	return nil
}

func (h *mockHistoryProcessor) OnRequestFailed(requestID []byte, peerID peer.ID, err error) {
}

func newMockHistoryProcessor() *mockHistoryProcessor {
	return &mockHistoryProcessor{}
}

type mockStore struct {
	queryResponses map[string]queryResponse
}

func newMockStore() *mockStore {
	return &mockStore{
		queryResponses: make(map[string]queryResponse),
	}
}

func getInitialResponseKey(contentTopics []string) string {
	sort.Strings(contentTopics)
	return hex.EncodeToString(append([]byte("start"), []byte(contentTopics[0])...))
}

func (t *mockStore) Query(ctx context.Context, criteria store.FilterCriteria, opts ...store.RequestOption) (store.Result, error) {
	params := store.Parameters{}
	for _, opt := range opts {
		_ = opt(&params)
	}
	result := &mockResult{}
	if params.Cursor() == nil {
		initialResponse := getInitialResponseKey(criteria.ContentTopicsList())
		response := t.queryResponses[initialResponse]
		if response.err != nil {
			return nil, response.err
		}
		result.cursor = response.cursor
		result.messages = response.messages
	} else {
		response := t.queryResponses[hex.EncodeToString(params.Cursor())]
		if response.err != nil {
			return nil, response.err
		}
		result.cursor = response.cursor
		result.messages = response.messages
	}

	return result, nil
}

func (t *mockStore) Populate(topics []string, responses int, includeRandomError bool) error {
	if responses <= 0 || len(topics) == 0 {
		return errors.New("invalid input parameters")
	}

	var topicBatches [][]string

	for i := 0; i < len(topics); i += maxTopicsPerRequest {
		// Split the batch into sub-batches of 10 content topics
		j := i + maxTopicsPerRequest
		if j > len(topics) {
			j = len(topics)
		}
		topicBatches = append(topicBatches, topics[i:j])
	}

	randomErrIdx, err := rand.Int(rand.Reader, big.NewInt(int64(len(topicBatches))))
	if err != nil {
		return err
	}
	randomErrIdxInt := int(randomErrIdx.Int64())

	for i, topicBatch := range topicBatches {
		// Setup initial response
		initialResponseKey := getInitialResponseKey(topicBatch)
		t.queryResponses[initialResponseKey] = queryResponse{
			contentTopics: topicBatch,
			messages: []*pb.WakuMessageKeyValue{
				{
					MessageHash: protocol.GenerateRequestID(),
					Message: &proto_pb.WakuMessage{
						Payload:      []byte{1, 2, 3},
						ContentTopic: "abc",
						Timestamp:    proto.Int64(time.Now().UnixNano()),
					},
					PubsubTopic: proto.String("test"),
				},
			},
			err: nil,
		}

		prevKey := initialResponseKey
		for x := 0; x < responses-1; x++ {
			newResponseCursor := []byte(uuid.New().String())
			newResponseKey := hex.EncodeToString(newResponseCursor)

			var err error
			if includeRandomError && i == randomErrIdxInt && x == responses-2 { // Include an error in the last request
				err = errors.New("random error")
			}

			t.queryResponses[newResponseKey] = queryResponse{
				contentTopics: topicBatch,
				messages: []*pb.WakuMessageKeyValue{
					{
						MessageHash: protocol.GenerateRequestID(),
						Message: &proto_pb.WakuMessage{
							Payload:      []byte{1, 2, 3},
							ContentTopic: "abc",
							Timestamp:    proto.Int64(time.Now().UnixNano()),
						},
						PubsubTopic: proto.String("test"),
					},
				},
				err: err,
			}

			// Update the previous response's cursor to point to the new response
			prevResponse := t.queryResponses[prevKey]
			prevResponse.cursor = newResponseCursor
			t.queryResponses[prevKey] = prevResponse

			prevKey = newResponseKey
		}

	}

	return nil
}

func TestSuccessBatchExecution(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second)
	defer cancel()

	storenodeID, err := peer.Decode("16Uiu2HAkw3x97MbbZSWHbdF5bob45vcZvPPK4s4Mjyv2mxyB9GS3")
	require.NoError(t, err)

	topics := []string{}
	for i := 0; i < 50; i++ {
		topics = append(topics, uuid.NewString())
	}

	testStore := newMockStore()
	err = testStore.Populate(topics, 10, false)
	require.NoError(t, err)

	historyProcessor := newMockHistoryProcessor()

	historyRetriever := NewHistoryRetriever(testStore, historyProcessor, utils.Logger())

	criteria := store.FilterCriteria{
		ContentFilter: protocol.NewContentFilter("test", topics...),
	}

	err = historyRetriever.Query(ctx, criteria, storenodeID, 10, func(i int) (bool, uint64) { return true, 10 }, true)
	require.NoError(t, err)
}

func TestFailedBatchExecution(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second)
	defer cancel()

	storenodeID, err := peer.Decode("16Uiu2HAkw3x97MbbZSWHbdF5bob45vcZvPPK4s4Mjyv2mxyB9GS3")
	require.NoError(t, err)

	topics := []string{}
	for i := 0; i < 2; i++ {
		topics = append(topics, uuid.NewString())
	}

	testStore := newMockStore()
	err = testStore.Populate(topics, 10, true)
	require.NoError(t, err)

	historyProcessor := newMockHistoryProcessor()

	historyRetriever := NewHistoryRetriever(testStore, historyProcessor, utils.Logger())

	criteria := store.FilterCriteria{
		ContentFilter: protocol.NewContentFilter("test", topics...),
	}

	err = historyRetriever.Query(ctx, criteria, storenodeID, 10, func(i int) (bool, uint64) { return true, 10 }, true)
	require.Error(t, err)
}
waku/v2/api/history/sort.go (new file, 32 lines)
@@ -0,0 +1,32 @@
package history

import (
	"time"

	"github.com/libp2p/go-libp2p/core/peer"
)

type SortedStorenode struct {
	Storenode       peer.ID
	RTT             time.Duration
	CanConnectAfter time.Time
}

type byRTTMsAndCanConnectBefore []SortedStorenode

func (s byRTTMsAndCanConnectBefore) Len() int {
	return len(s)
}

func (s byRTTMsAndCanConnectBefore) Swap(i, j int) {
	s[i], s[j] = s[j], s[i]
}

func (s byRTTMsAndCanConnectBefore) Less(i, j int) bool {
	// Slightly inaccurate, since the sorting is time sensitive, but it does not matter much
	now := time.Now()
	if s[i].CanConnectAfter.Before(now) && s[j].CanConnectAfter.Before(now) {
		return s[i].RTT < s[j].RTT
	}
	return s[i].CanConnectAfter.Before(s[j].CanConnectAfter)
}
@@ -178,7 +178,7 @@ func (m *MissingMessageVerifier) fetchHistory(c chan<- *protocol.Envelope, inter
	}
}

-func (m *MissingMessageVerifier) storeQueryWithRetry(ctx context.Context, queryFunc func(ctx context.Context) (*store.Result, error), logger *zap.Logger, logMsg string) (*store.Result, error) {
+func (m *MissingMessageVerifier) storeQueryWithRetry(ctx context.Context, queryFunc func(ctx context.Context) (store.Result, error), logger *zap.Logger, logMsg string) (store.Result, error) {
	retry := true
	count := 1
	for retry && count <= m.params.maxAttemptsToRetrieveHistory {
@@ -212,7 +212,7 @@ func (m *MissingMessageVerifier) fetchMessagesBatch(c chan<- *protocol.Envelope,
		logging.Epoch("to", now),
	)

-	result, err := m.storeQueryWithRetry(interest.ctx, func(ctx context.Context) (*store.Result, error) {
+	result, err := m.storeQueryWithRetry(interest.ctx, func(ctx context.Context) (store.Result, error) {
		return m.store.Query(ctx, store.FilterCriteria{
			ContentFilter: protocol.NewContentFilter(interest.contentFilter.PubsubTopic, contentTopics[batchFrom:batchTo]...),
			TimeStart:     proto.Int64(interest.lastChecked.Add(-m.params.delay).UnixNano()),
@@ -243,7 +243,7 @@ func (m *MissingMessageVerifier) fetchMessagesBatch(c chan<- *protocol.Envelope,
			missingHashes = append(missingHashes, hash)
		}

-		result, err = m.storeQueryWithRetry(interest.ctx, func(ctx context.Context) (*store.Result, error) {
+		result, err = m.storeQueryWithRetry(interest.ctx, func(ctx context.Context) (store.Result, error) {
			if err = result.Next(ctx); err != nil {
				return nil, err
			}
@@ -282,7 +282,7 @@ func (m *MissingMessageVerifier) fetchMessagesBatch(c chan<- *protocol.Envelope,
		defer utils.LogOnPanic()
		defer wg.Wait()

-		result, err := m.storeQueryWithRetry(interest.ctx, func(ctx context.Context) (*store.Result, error) {
+		result, err := m.storeQueryWithRetry(interest.ctx, func(ctx context.Context) (store.Result, error) {
			queryCtx, cancel := context.WithTimeout(ctx, m.params.storeQueryTimeout)
			defer cancel()
			return m.store.QueryByHash(queryCtx, messageHashes, store.WithPeer(interest.peerID), store.WithPaging(false, maxMsgHashesPerRequest))
@@ -303,7 +303,7 @@ func (m *MissingMessageVerifier) fetchMessagesBatch(c chan<- *protocol.Envelope,
			}
		}

-		result, err = m.storeQueryWithRetry(interest.ctx, func(ctx context.Context) (*store.Result, error) {
+		result, err = m.storeQueryWithRetry(interest.ctx, func(ctx context.Context) (store.Result, error) {
			if err = result.Next(ctx); err != nil {
				return nil, err
			}
@@ -8,8 +8,8 @@ import (

	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/common/hexutil"
	"github.com/libp2p/go-libp2p/core/peer"
	apicommon "github.com/waku-org/go-waku/waku/v2/api/common"
	"github.com/waku-org/go-waku/waku/v2/api/history"
	"github.com/waku-org/go-waku/waku/v2/protocol"
	"github.com/waku-org/go-waku/waku/v2/protocol/pb"
	"github.com/waku-org/go-waku/waku/v2/protocol/store"
@@ -29,7 +29,6 @@ type ISentCheck interface {
	Start()
	Add(topic string, messageID common.Hash, sentTime uint32)
	DeleteByMessageIDs(messageIDs []common.Hash)
-	SetStorePeerID(peerID peer.ID)
}

// MessageSentCheck tracks the outgoing messages and check against store node
@@ -38,11 +37,11 @@ type ISentCheck interface {
type MessageSentCheck struct {
	messageIDs         map[string]map[common.Hash]uint32
	messageIDsMu       sync.RWMutex
-	storePeerID        peer.ID
	messageStoredChan  chan common.Hash
	messageExpiredChan chan common.Hash
	ctx                context.Context
	store              *store.WakuStore
+	storenodeCycle     *history.StorenodeCycle
	timesource         timesource.Timesource
	logger             *zap.Logger
	maxHashQueryLength uint64
@@ -53,7 +52,7 @@ type MessageSentCheck struct {
}

// NewMessageSentCheck creates a new instance of MessageSentCheck with default parameters
-func NewMessageSentCheck(ctx context.Context, store *store.WakuStore, timesource timesource.Timesource, msgStoredChan chan common.Hash, msgExpiredChan chan common.Hash, logger *zap.Logger) *MessageSentCheck {
+func NewMessageSentCheck(ctx context.Context, store *store.WakuStore, cycle *history.StorenodeCycle, timesource timesource.Timesource, msgStoredChan chan common.Hash, msgExpiredChan chan common.Hash, logger *zap.Logger) *MessageSentCheck {
	return &MessageSentCheck{
		messageIDs:   make(map[string]map[common.Hash]uint32),
		messageIDsMu: sync.RWMutex{},
@@ -61,6 +60,7 @@ func NewMessageSentCheck(ctx context.Context, store *store.WakuStore, timesource
		messageExpiredChan: msgExpiredChan,
		ctx:                ctx,
		store:              store,
+		storenodeCycle:     cycle,
		timesource:         timesource,
		logger:             logger,
		maxHashQueryLength: DefaultMaxHashQueryLength,
@@ -139,11 +139,6 @@ func (m *MessageSentCheck) DeleteByMessageIDs(messageIDs []common.Hash) {
	}
}

-// SetStorePeerID sets the peer id of store node
-func (m *MessageSentCheck) SetStorePeerID(peerID peer.ID) {
-	m.storePeerID = peerID
-}
-
// Start checks if the tracked outgoing messages are stored periodically
func (m *MessageSentCheck) Start() {
	defer utils.LogOnPanic()
@@ -211,7 +206,7 @@ func (m *MessageSentCheck) Start() {
}

func (m *MessageSentCheck) messageHashBasedQuery(ctx context.Context, hashes []common.Hash, relayTime []uint32, pubsubTopic string) []common.Hash {
-	selectedPeer := m.storePeerID
+	selectedPeer := m.storenodeCycle.GetActiveStorenode()
	if selectedPeer == "" {
		m.logger.Error("no store peer id available", zap.String("pubsubTopic", pubsubTopic))
		return []common.Hash{}
@@ -10,7 +10,7 @@ import (

func TestAddAndDelete(t *testing.T) {
	ctx := context.TODO()
-	messageSentCheck := NewMessageSentCheck(ctx, nil, nil, nil, nil, nil)
+	messageSentCheck := NewMessageSentCheck(ctx, nil, nil, nil, nil, nil, nil)

	messageSentCheck.Add("topic", [32]byte{1}, 1)
	messageSentCheck.Add("topic", [32]byte{2}, 2)
@@ -6,7 +6,6 @@ import (
	"time"

	"github.com/ethereum/go-ethereum/common"
-	"github.com/libp2p/go-libp2p/core/peer"
	"github.com/waku-org/go-waku/waku/v2/protocol"
	"github.com/waku-org/go-waku/waku/v2/protocol/lightpush"
	"github.com/waku-org/go-waku/waku/v2/protocol/relay"
@@ -162,9 +161,3 @@ func (ms *MessageSender) MessagesDelivered(messageIDs []common.Hash) {
		ms.messageSentCheck.DeleteByMessageIDs(messageIDs)
	}
}
-
-func (ms *MessageSender) SetStorePeerID(peerID peer.ID) {
-	if ms.messageSentCheck != nil {
-		ms.messageSentCheck.SetStorePeerID(peerID)
-	}
-}
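With SetStorePeerID removed, callers now hand the message-sent check a StorenodeCycle at construction time and the check asks the cycle for the active storenode on every query. A hedged sketch of the new wiring follows; the caller-side package name app, the import path for the publish and timesource packages, and the channel sizes are assumptions, not confirmed by this diff.

// Hypothetical caller-side construction after this change (sketch only).
package app

import (
	"context"

	"github.com/ethereum/go-ethereum/common"
	"github.com/waku-org/go-waku/waku/v2/api/history"
	"github.com/waku-org/go-waku/waku/v2/api/publish"    // assumed package path for MessageSentCheck
	"github.com/waku-org/go-waku/waku/v2/protocol/store"
	"github.com/waku-org/go-waku/waku/v2/timesource"     // assumed package path for Timesource
	"go.uber.org/zap"
)

func newSentCheck(ctx context.Context, ws *store.WakuStore, cycle *history.StorenodeCycle, ts timesource.Timesource, logger *zap.Logger) *publish.MessageSentCheck {
	storedCh := make(chan common.Hash, 1024)
	expiredCh := make(chan common.Hash, 1024)
	// The store peer is no longer set via SetStorePeerID; the cycle is injected instead.
	return publish.NewMessageSentCheck(ctx, ws, cycle, ts, storedCh, expiredCh, logger)
}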
@@ -50,8 +50,8 @@ type StoreError struct {
}

// NewStoreError creates a new instance of StoreError
-func NewStoreError(code int, message string) StoreError {
-	return StoreError{
+func NewStoreError(code int, message string) *StoreError {
+	return &StoreError{
		Code:    code,
		Message: message,
	}
@@ -99,7 +99,7 @@ func (s *WakuStore) SetHost(h host.Host) {
// Request is used to send a store query. This function requires understanding how to prepare a store query
// and most of the time you can use `Query`, `QueryByHash` and `Exists` instead, as they provide
// a simpler API
-func (s *WakuStore) Request(ctx context.Context, criteria Criteria, opts ...RequestOption) (*Result, error) {
+func (s *WakuStore) Request(ctx context.Context, criteria Criteria, opts ...RequestOption) (Result, error) {
	params := new(Parameters)

	optList := DefaultOptions()
@@ -182,7 +182,7 @@ func (s *WakuStore) Request(ctx context.Context, criteria Criteria, opts ...Requ
		return nil, err
	}

-	result := &Result{
+	result := &resultImpl{
		store:        s,
		messages:     response.Messages,
		storeRequest: storeRequest,
@@ -195,12 +195,12 @@ func (s *WakuStore) Request(ctx context.Context, criteria Criteria, opts ...Requ
}

// Query retrieves all the messages that match a criteria. Use the options to indicate whether to return the message themselves or not.
-func (s *WakuStore) Query(ctx context.Context, criteria FilterCriteria, opts ...RequestOption) (*Result, error) {
+func (s *WakuStore) Query(ctx context.Context, criteria FilterCriteria, opts ...RequestOption) (Result, error) {
	return s.Request(ctx, criteria, opts...)
}

// Query retrieves all the messages with specific message hashes
-func (s *WakuStore) QueryByHash(ctx context.Context, messageHashes []wpb.MessageHash, opts ...RequestOption) (*Result, error) {
+func (s *WakuStore) QueryByHash(ctx context.Context, messageHashes []wpb.MessageHash, opts ...RequestOption) (Result, error) {
	return s.Request(ctx, MessageHashCriteria{messageHashes}, opts...)
}

@@ -214,17 +214,17 @@ func (s *WakuStore) Exists(ctx context.Context, messageHash wpb.MessageHash, opt
		return false, err
	}

-	return len(result.messages) != 0, nil
+	return len(result.Messages()) != 0, nil
}

-func (s *WakuStore) next(ctx context.Context, r *Result, opts ...RequestOption) (*Result, error) {
+func (s *WakuStore) next(ctx context.Context, r Result, opts ...RequestOption) (*resultImpl, error) {
	if r.IsComplete() {
-		return &Result{
+		return &resultImpl{
			store:         s,
			messages:      nil,
			cursor:        nil,
-			storeRequest:  r.storeRequest,
-			storeResponse: r.storeResponse,
+			storeRequest:  r.Query(),
+			storeResponse: r.Response(),
			peerID:        r.PeerID(),
		}, nil
	}
@@ -240,7 +240,7 @@ func (s *WakuStore) next(ctx context.Context, r *Result, opts ...RequestOption)
		}
	}

-	storeRequest := proto.Clone(r.storeRequest).(*pb.StoreQueryRequest)
+	storeRequest := proto.Clone(r.Query()).(*pb.StoreQueryRequest)
	storeRequest.RequestId = hex.EncodeToString(protocol.GenerateRequestID())
	storeRequest.PaginationCursor = r.Cursor()

@@ -249,7 +249,7 @@ func (s *WakuStore) next(ctx context.Context, r *Result, opts ...RequestOption)
		return nil, err
	}

-	result := &Result{
+	result := &resultImpl{
		store:        s,
		messages:     response.Messages,
		storeRequest: storeRequest,
@@ -317,7 +317,7 @@ func (s *WakuStore) queryFrom(ctx context.Context, storeRequest *pb.StoreQueryRe

	if storeResponse.GetStatusCode() != ok {
		err := NewStoreError(int(storeResponse.GetStatusCode()), storeResponse.GetStatusDesc())
-		return nil, &err
+		return nil, err
	}
	return storeResponse, nil
}
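Since Result is now an interface, client code pages through responses with Messages(), Cursor(), IsComplete(), and Next() instead of reading the struct fields directly. A hedged sketch of the pattern, assuming a caller package app and placeholder query criteria:

// Paging through store results via the Result interface (sketch only).
package app

import (
	"context"

	"github.com/waku-org/go-waku/waku/v2/protocol"
	"github.com/waku-org/go-waku/waku/v2/protocol/store"
	"github.com/waku-org/go-waku/waku/v2/protocol/store/pb"
)

func collectAll(ctx context.Context, ws *store.WakuStore, pubsubTopic string, contentTopic string) ([]*pb.WakuMessageKeyValue, error) {
	var all []*pb.WakuMessageKeyValue

	result, err := ws.Query(ctx, store.FilterCriteria{
		ContentFilter: protocol.NewContentFilter(pubsubTopic, contentTopic),
	}, store.WithPaging(false, 20))
	if err != nil {
		return nil, err
	}

	// Next() advances the cursor; IsComplete() becomes true once there are no more pages.
	for !result.IsComplete() {
		all = append(all, result.Messages()...)
		if err := result.Next(ctx); err != nil {
			return nil, err
		}
	}
	return all, nil
}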
@@ -128,33 +128,33 @@ func TestStoreClient(t *testing.T) {

	// -- First page:
	require.False(t, response.IsComplete())
-	require.Len(t, response.messages, 2)
-	require.Equal(t, response.messages[0].Message.GetTimestamp(), messages[0].GetTimestamp())
-	require.Equal(t, response.messages[1].Message.GetTimestamp(), messages[1].GetTimestamp())
+	require.Len(t, response.Messages(), 2)
+	require.Equal(t, response.Messages()[0].Message.GetTimestamp(), messages[0].GetTimestamp())
+	require.Equal(t, response.Messages()[1].Message.GetTimestamp(), messages[1].GetTimestamp())

	err = response.Next(ctx)
	require.NoError(t, err)

	// -- Second page:
	require.False(t, response.IsComplete())
-	require.Len(t, response.messages, 2)
-	require.Equal(t, response.messages[0].Message.GetTimestamp(), messages[2].GetTimestamp())
-	require.Equal(t, response.messages[1].Message.GetTimestamp(), messages[3].GetTimestamp())
+	require.Len(t, response.Messages(), 2)
+	require.Equal(t, response.Messages()[0].Message.GetTimestamp(), messages[2].GetTimestamp())
+	require.Equal(t, response.Messages()[1].Message.GetTimestamp(), messages[3].GetTimestamp())

	err = response.Next(ctx)
	require.NoError(t, err)

	// -- Third page:
	require.False(t, response.IsComplete())
-	require.Len(t, response.messages, 1)
-	require.Equal(t, response.messages[0].Message.GetTimestamp(), messages[4].GetTimestamp())
+	require.Len(t, response.Messages(), 1)
+	require.Equal(t, response.Messages()[0].Message.GetTimestamp(), messages[4].GetTimestamp())

	err = response.Next(ctx)
	require.NoError(t, err)

	// -- Trying to continue a completed cursor
	require.True(t, response.IsComplete())
-	require.Len(t, response.messages, 0)
+	require.Len(t, response.Messages(), 0)

	err = response.Next(ctx)
	require.NoError(t, err)
@@ -165,26 +165,26 @@ func TestStoreClient(t *testing.T) {

	// -- First page:
	require.False(t, response.IsComplete())
-	require.Len(t, response.messages, 2)
-	require.Equal(t, response.messages[0].Message.GetTimestamp(), messages[3].GetTimestamp())
-	require.Equal(t, response.messages[1].Message.GetTimestamp(), messages[4].GetTimestamp())
+	require.Len(t, response.Messages(), 2)
+	require.Equal(t, response.Messages()[0].Message.GetTimestamp(), messages[3].GetTimestamp())
+	require.Equal(t, response.Messages()[1].Message.GetTimestamp(), messages[4].GetTimestamp())

	err = response.Next(ctx)
	require.NoError(t, err)

	// -- Second page:
	require.False(t, response.IsComplete())
-	require.Len(t, response.messages, 2)
-	require.Equal(t, response.messages[0].Message.GetTimestamp(), messages[1].GetTimestamp())
-	require.Equal(t, response.messages[1].Message.GetTimestamp(), messages[2].GetTimestamp())
+	require.Len(t, response.Messages(), 2)
+	require.Equal(t, response.Messages()[0].Message.GetTimestamp(), messages[1].GetTimestamp())
+	require.Equal(t, response.Messages()[1].Message.GetTimestamp(), messages[2].GetTimestamp())

	err = response.Next(ctx)
	require.NoError(t, err)

	// -- Third page:
	require.False(t, response.IsComplete())
-	require.Len(t, response.messages, 1)
-	require.Equal(t, response.messages[0].Message.GetTimestamp(), messages[0].GetTimestamp())
+	require.Len(t, response.Messages(), 1)
+	require.Equal(t, response.Messages()[0].Message.GetTimestamp(), messages[0].GetTimestamp())

	err = response.Next(ctx)
	require.NoError(t, err)
@@ -197,13 +197,13 @@ func TestStoreClient(t *testing.T) {
	// No cursor should be returned if there are no messages that match the criteria
	response, err = wakuStore.Query(ctx, FilterCriteria{ContentFilter: protocol.NewContentFilter(pubsubTopic, "no-messages"), TimeStart: startTime, TimeEnd: endTime}, WithPaging(true, 2))
	require.NoError(t, err)
-	require.Len(t, response.messages, 0)
+	require.Len(t, response.Messages(), 0)
	require.Empty(t, response.Cursor())

	// If the page size is larger than the number of existing messages, it should not return a cursor
	response, err = wakuStore.Query(ctx, FilterCriteria{ContentFilter: protocol.NewContentFilter(pubsubTopic, "test"), TimeStart: startTime, TimeEnd: endTime}, WithPaging(true, 100))
	require.NoError(t, err)
-	require.Len(t, response.messages, 5)
+	require.Len(t, response.Messages(), 5)
	require.Empty(t, response.Cursor())

	// Invalid cursors should fail
@@ -225,17 +225,17 @@ func TestStoreClient(t *testing.T) {
	// Handle temporal history query with a zero-size time window
	response, err = wakuStore.Query(ctx, FilterCriteria{ContentFilter: protocol.NewContentFilter(pubsubTopic, "test"), TimeStart: startTime, TimeEnd: startTime})
	require.NoError(t, err)
-	require.Len(t, response.messages, 0)
+	require.Len(t, response.Messages(), 0)
	require.Empty(t, response.Cursor())

	// Should not include data
	response, err = wakuStore.Request(ctx, MessageHashCriteria{MessageHashes: []pb.MessageHash{messages[0].Hash(pubsubTopic)}}, IncludeData(false), WithPeer(storenode.ID))
	require.NoError(t, err)
-	require.Len(t, response.messages, 1)
-	require.Nil(t, response.messages[0].Message)
+	require.Len(t, response.Messages(), 1)
+	require.Nil(t, response.Messages()[0].Message)

	response, err = wakuStore.Request(ctx, FilterCriteria{ContentFilter: protocol.NewContentFilter(pubsubTopic, "test")}, IncludeData(false))
	require.NoError(t, err)
-	require.GreaterOrEqual(t, len(response.messages), 1)
-	require.Nil(t, response.messages[0].Message)
+	require.GreaterOrEqual(t, len(response.Messages()), 1)
+	require.Nil(t, response.Messages()[0].Message)
}
@@ -22,6 +22,10 @@ type Parameters struct {
	skipRatelimit bool
}

+func (p *Parameters) Cursor() []byte {
+	return p.cursor
+}
+
type RequestOption func(*Parameters) error

// WithPeer is an option used to specify the peerID to request the message history.
@@ -8,7 +8,17 @@ import (
)

// Result represents a valid response from a store node
-type Result struct {
+type Result interface {
+	Cursor() []byte
+	IsComplete() bool
+	PeerID() peer.ID
+	Query() *pb.StoreQueryRequest
+	Response() *pb.StoreQueryResponse
+	Next(ctx context.Context, opts ...RequestOption) error
+	Messages() []*pb.WakuMessageKeyValue
+}
+
+type resultImpl struct {
	done bool

	messages []*pb.WakuMessageKeyValue
@@ -19,27 +29,27 @@ type Result struct {
	peerID peer.ID
}

-func (r *Result) Cursor() []byte {
+func (r *resultImpl) Cursor() []byte {
	return r.cursor
}

-func (r *Result) IsComplete() bool {
+func (r *resultImpl) IsComplete() bool {
	return r.done
}

-func (r *Result) PeerID() peer.ID {
+func (r *resultImpl) PeerID() peer.ID {
	return r.peerID
}

-func (r *Result) Query() *pb.StoreQueryRequest {
+func (r *resultImpl) Query() *pb.StoreQueryRequest {
	return r.storeRequest
}

-func (r *Result) Response() *pb.StoreQueryResponse {
+func (r *resultImpl) Response() *pb.StoreQueryResponse {
	return r.storeResponse
}

-func (r *Result) Next(ctx context.Context, opts ...RequestOption) error {
+func (r *resultImpl) Next(ctx context.Context, opts ...RequestOption) error {
	if r.cursor == nil {
		r.done = true
		r.messages = nil
@@ -57,6 +67,6 @@ func (r *Result) Next(ctx context.Context, opts ...RequestOption) error {
	return nil
}

-func (r *Result) Messages() []*pb.WakuMessageKeyValue {
+func (r *resultImpl) Messages() []*pb.WakuMessageKeyValue {
	return r.messages
}