feat: storenode cycle

This commit is contained in:
Richard Ramos 2024-09-19 14:39:40 -04:00
parent 12abd041d6
commit 0194a126e3
No known key found for this signature in database
GPG Key ID: 1CE87DB518195760
9 changed files with 906 additions and 21 deletions

View File

@ -74,6 +74,14 @@ func (t timestamp) String() string {
return time.Unix(0, int64(t)).Format(time.RFC3339)
}
// Timep builds a log field for an optional timestamp. A nil pointer is
// rendered as the placeholder string "-"; otherwise the pointed-to value is
// handed to Time.
func Timep(key string, time *int64) zapcore.Field {
	if time != nil {
		return Time(key, *time)
	}
	return zap.String(key, "-")
}
// Epoch builds a string log field holding the given instant as a decimal
// count of nanoseconds since the Unix epoch.
func Epoch(key string, time time.Time) zap.Field {
	nanos := time.UnixNano()
	return zap.String(key, fmt.Sprintf("%d", nanos))
}

View File

@ -0,0 +1,523 @@
package history
import (
"context"
"crypto/rand"
"errors"
"fmt"
"math"
"math/big"
"net"
"net/http"
"runtime"
"sort"
"sync"
"time"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/p2p/protocol/ping"
"github.com/waku-org/go-waku/waku/v2/protocol/store"
"go.uber.org/zap"
)
const (
	// defaultBackoff is the short reconnection delay recorded when a storenode
	// is connected to, or disconnected after rate-limiting a request.
	defaultBackoff = 10 * time.Second

	// graylistBackoff is the longer reconnection delay applied to a storenode
	// that is cycled away from or penalized for failing requests.
	graylistBackoff = 3 * time.Minute

	// storenodeMaxFailedRequests is the number of failed requests after which
	// the active storenode is treated as not working.
	storenodeMaxFailedRequests uint = 2

	// isAndroidEmulator is true for android/amd64 builds.
	isAndroidEmulator = runtime.GOOS == "android" && runtime.GOARCH == "amd64"

	// findNearestMailServer enables ranking storenodes by ping round-trip
	// time; it is switched off on android/amd64 builds.
	findNearestMailServer = !isAndroidEmulator

	// overrideDNS makes findNewStorenode replace the default resolver on
	// android and ios builds.
	overrideDNS = runtime.GOOS == "android" || runtime.GOOS == "ios"

	// bootstrapDNS is the DNS server address dialed by the overriding resolver.
	bootstrapDNS = "8.8.8.8:53"
)
// connStatus records whether the cycle currently treats a storenode as
// connected or disconnected.
type connStatus int

const (
	// disconnected is the first status value. Numbering starts at iota + 1, so
	// the zero value of connStatus matches neither status.
	disconnected connStatus = iota + 1
	// connected marks a storenode that was selected through connect.
	connected
)
// peerStatus is the bookkeeping the cycle keeps for each known storenode.
type peerStatus struct {
	// status is the last recorded connection state.
	status connStatus
	// canConnectAfter is the earliest time the storenode may be picked again;
	// candidates are skipped while the current time is before it.
	canConnectAfter time.Time
	// lastConnectionAttempt is set to the current time when connect selects
	// the storenode.
	lastConnectionAttempt time.Time
}
// StorenodeConfigProvider supplies the configuration the storenode cycle
// consults every time it looks for a storenode.
type StorenodeConfigProvider interface {
	// UseStorenodes reports whether storenodes should be used at all; when it
	// returns false, Cycle skips the storenode search.
	UseStorenodes() (bool, error)
	// GetPinnedStorenode returns the pinned storenode, or an empty peer ID
	// when none is pinned. A pinned storenode is connected to directly.
	GetPinnedStorenode() (peer.ID, error)
	// Storenodes returns the candidate storenodes to choose from.
	Storenodes() ([]peer.ID, error)
}
// StorenodeCycle keeps track of the active storenode, switches to another one
// when the active storenode accumulates too many failed requests, and
// announces these transitions through its emitters.
type StorenodeCycle struct {
	// The embedded lock is write-locked by DisconnectActiveStorenode and
	// read-locked by GetActiveStorenode and PerformStorenodeTask.
	// NOTE(review): several internal methods read or write activeStorenode and
	// failedRequests without taking it — confirm the intended locking rules.
	sync.RWMutex

	logger *zap.Logger
	// host is the libp2p host used to ping storenodes; it is set by Start.
	host host.Host

	// storenodeConfigProvider is nil until SetStorenodeConfigProvider is called.
	storenodeConfigProvider StorenodeConfigProvider

	// StorenodeAvailableOneshotEmitter and StorenodeAvailableEmitter are
	// emitted by connect when a storenode is newly marked as connected.
	StorenodeAvailableOneshotEmitter *OneShotEmitter[struct{}]
	// StorenodeChangedEmitter is emitted with the new peer ID on connect and
	// with an empty peer ID when the active storenode is disconnected.
	StorenodeChangedEmitter *Emitter[peer.ID]
	// StorenodeNotWorkingEmitter is emitted when the active storenode reaches
	// storenodeMaxFailedRequests failed requests.
	StorenodeNotWorkingEmitter *Emitter[struct{}]
	StorenodeAvailableEmitter  *Emitter[peer.ID]

	// failedRequests counts failed requests per storenode; the counter is
	// reset when a request succeeds and when the storenode is connected to.
	failedRequests map[peer.ID]uint

	// peersMutex guards peers.
	peersMutex sync.RWMutex
	// activeStorenode is the currently selected storenode, or empty when none.
	activeStorenode peer.ID
	peers           map[peer.ID]peerStatus
}
// NewStorenodeCycle creates a cycle with fresh emitters and a logger named
// "storenode-cycle". The host and the internal maps are only set up by Start,
// and the config provider by SetStorenodeConfigProvider.
func NewStorenodeCycle(logger *zap.Logger) *StorenodeCycle {
	cycle := new(StorenodeCycle)
	cycle.StorenodeAvailableOneshotEmitter = NewOneshotEmitter[struct{}]()
	cycle.StorenodeChangedEmitter = NewEmitter[peer.ID]()
	cycle.StorenodeNotWorkingEmitter = NewEmitter[struct{}]()
	cycle.StorenodeAvailableEmitter = NewEmitter[peer.ID]()
	cycle.logger = logger.Named("storenode-cycle")
	return cycle
}
// Start stores the host, (re)initializes the per-peer bookkeeping and launches
// the background verification loop, which runs until ctx is done.
func (m *StorenodeCycle) Start(ctx context.Context, h host.Host) {
	m.logger.Debug("starting storenode cycle")

	m.host = h
	m.failedRequests = map[peer.ID]uint{}
	m.peers = map[peer.ID]peerStatus{}

	go m.verifyStorenodeStatus(ctx)
}
// DisconnectActiveStorenode takes the cycle's write lock and disconnects the
// active storenode, recording backoff as the delay before that storenode may
// be selected again.
func (m *StorenodeCycle) DisconnectActiveStorenode(backoff time.Duration) {
	m.Lock()
	defer m.Unlock()

	m.disconnectActiveStorenode(backoff)
}
// connectToNewStorenodeAndWait moves the cycle onto a storenode chosen by
// findNewStorenode. Unless a storenode is pinned, the active one is first
// disconnected with the graylist backoff.
func (m *StorenodeCycle) connectToNewStorenodeAndWait(ctx context.Context) error {
	m.logger.Info("disconnecting storenode")

	pinned, err := m.storenodeConfigProvider.GetPinnedStorenode()
	if err != nil {
		m.logger.Error("could not obtain the pinned storenode", zap.Error(err))
		return err
	}

	// A pinned storenode is kept as is; only an unpinned setup drops the
	// active storenode before searching again.
	if len(pinned) == 0 {
		m.disconnectActiveStorenode(graylistBackoff)
	}

	return m.findNewStorenode(ctx)
}
// disconnectStorenode marks the active storenode as disconnected, forbids
// reconnecting to it until backoffDuration has elapsed, and clears the active
// storenode. It is a no-op when there is no active storenode and always
// returns nil.
func (m *StorenodeCycle) disconnectStorenode(backoffDuration time.Duration) error {
	if len(m.activeStorenode) == 0 {
		m.logger.Info("no active storenode")
		return nil
	}

	m.logger.Info("disconnecting active storenode", zap.Stringer("peerID", m.activeStorenode))

	m.peersMutex.Lock()
	// A missing entry yields the zero peerStatus, so new and existing entries
	// can be updated the same way.
	entry := m.peers[m.activeStorenode]
	entry.status = disconnected
	entry.canConnectAfter = time.Now().Add(backoffDuration)
	m.peers[m.activeStorenode] = entry
	m.peersMutex.Unlock()

	m.activeStorenode = ""

	return nil
}
// disconnectActiveStorenode disconnects the active storenode, logging any
// failure, and then announces through StorenodeChangedEmitter that there is
// no active storenode.
func (m *StorenodeCycle) disconnectActiveStorenode(backoffDuration time.Duration) {
	if err := m.disconnectStorenode(backoffDuration); err != nil {
		m.logger.Error("failed to disconnect storenode", zap.Error(err))
	}

	var none peer.ID
	m.StorenodeChangedEmitter.Emit(none)
}
// Cycle drops the active storenode (graylisting it) and, when storenodes are
// enabled in the configuration, looks for a new one. It does nothing until a
// config provider has been set; failures are logged rather than returned.
func (m *StorenodeCycle) Cycle(ctx context.Context) {
	if m.storenodeConfigProvider == nil {
		m.logger.Info("storenodeConfigProvider not yet setup")
		return
	}

	m.logger.Info("Automatically switching storenode")

	if hasActive := m.activeStorenode != ""; hasActive {
		m.disconnectActiveStorenode(graylistBackoff)
	}

	enabled, err := m.storenodeConfigProvider.UseStorenodes()
	switch {
	case err != nil:
		m.logger.Error("failed to get use storenodes", zap.Error(err))
		return
	case !enabled:
		m.logger.Info("Skipping storenode search due to useStorenode being false")
		return
	}

	if err := m.findNewStorenode(ctx); err != nil {
		m.logger.Error("Error getting new storenode", zap.Error(err))
	}
}
// poolSize returns a quarter of fleetSize, rounded up.
func poolSize(fleetSize int) int {
	quarter := float64(fleetSize) / 4
	return int(math.Ceil(quarter))
}
// getAvailableStorenodesSortedByRTT pings every storenode concurrently (each
// ping limited to 4 seconds), drops the ones that did not answer or that are
// still in their backoff window, and returns the rest sorted with
// byRTTMsAndCanConnectBefore. It returns nil when no storenode answered.
func (m *StorenodeCycle) getAvailableStorenodesSortedByRTT(ctx context.Context, allStorenodes []peer.ID) []peer.ID {
	// TODO: this can be replaced by peer selector once code is moved to go-waku api
	var (
		rttMu     sync.Mutex
		pingWg    sync.WaitGroup
		rttByPeer = make(map[peer.ID]time.Duration)
	)

	pingWg.Add(len(allStorenodes))
	for _, storenode := range allStorenodes {
		go func(peerID peer.ID) {
			defer pingWg.Done()

			pingCtx, cancel := context.WithTimeout(ctx, 4*time.Second)
			defer cancel()

			rtt, err := m.pingPeer(pingCtx, peerID)
			if err != nil {
				return // pinging storenodes might fail, but we don't care
			}

			rttMu.Lock()
			rttByPeer[peerID] = rtt
			rttMu.Unlock()
		}(storenode)
	}
	pingWg.Wait()

	if len(rttByPeer) == 0 {
		m.logger.Warn("No storenodes available") // Do nothing..
		return nil
	}

	var candidates []SortedStorenode
	for id, rtt := range rttByPeer {
		m.peersMutex.Lock()
		known, seen := m.peers[id]
		m.peersMutex.Unlock()

		if seen && time.Now().Before(known.canConnectAfter) {
			continue // We can't connect to this node yet
		}

		candidates = append(candidates, SortedStorenode{Storenode: id, RTT: rtt})
	}

	sort.Sort(byRTTMsAndCanConnectBefore(candidates))

	result := make([]peer.ID, 0, len(candidates))
	for _, candidate := range candidates {
		result = append(result, candidate.Storenode)
	}

	return result
}
// pingPeer pings peerID through the cycle's host and returns the round-trip
// time of the first result, or an error when the ping fails or ctx is done
// first.
func (m *StorenodeCycle) pingPeer(ctx context.Context, peerID peer.ID) (time.Duration, error) {
	results := ping.Ping(ctx, m.host, peerID)

	select {
	case res := <-results:
		if res.Error == nil {
			return res.RTT, nil
		}
		return 0, res.Error
	case <-ctx.Done():
		return 0, ctx.Err()
	}
}
// findNewStorenode selects a storenode and connects to it. A pinned storenode
// always wins. Otherwise the candidates are (when enabled) ranked by ping
// round-trip time and one of the first quarter is picked at random. It returns
// nil without connecting when there are no candidates.
func (m *StorenodeCycle) findNewStorenode(ctx context.Context) error {
	// we have to override DNS manually because of https://github.com/status-im/status-mobile/issues/19581
	if overrideDNS {
		var dialer net.Dialer
		net.DefaultResolver = &net.Resolver{
			PreferGo: false,
			Dial: func(dialCtx context.Context, _, _ string) (net.Conn, error) {
				conn, dialErr := dialer.DialContext(dialCtx, "udp", bootstrapDNS)
				if dialErr != nil {
					return nil, dialErr
				}
				return conn, nil
			},
		}
	}

	pinned, err := m.storenodeConfigProvider.GetPinnedStorenode()
	if err != nil {
		m.logger.Error("Could not obtain the pinned storenode", zap.Error(err))
		return err
	}
	if len(pinned) != 0 {
		return m.connect(pinned)
	}

	m.logger.Info("Finding a new storenode..")

	allStorenodes, err := m.storenodeConfigProvider.Storenodes()
	if err != nil {
		return err
	}

	// TODO: remove this check once sockets are stable on x86_64 emulators
	if findNearestMailServer {
		allStorenodes = m.getAvailableStorenodesSortedByRTT(ctx, allStorenodes)
	}

	// Pick a random storenode among the ones with the lowest latency.
	// The pool size is 1/4 of the storenodes that were pinged successfully.
	total := len(allStorenodes)
	pSize := poolSize(total - 1)
	if pSize <= 0 {
		if total <= 0 {
			m.logger.Warn("No storenodes available") // Do nothing..
			return nil
		}
		pSize = total
	}

	upperBound := big.NewInt(int64(pSize))
	pick, err := rand.Int(rand.Reader, upperBound)
	if err != nil {
		return err
	}

	return m.connect(allStorenodes[pick.Int64()])
}
// storenodeStatus returns the recorded status of peerID, treating storenodes
// the cycle has never seen as disconnected.
func (m *StorenodeCycle) storenodeStatus(peerID peer.ID) connStatus {
	m.peersMutex.RLock()
	defer m.peersMutex.RUnlock()

	if info, known := m.peers[peerID]; known {
		return info.status
	}
	return disconnected
}
// connect makes peerID the active storenode and announces the change. When
// the storenode was not already marked as connected, it is recorded as
// connected, its failure counter is reset and the availability emitters fire.
// It always returns nil.
func (m *StorenodeCycle) connect(peerID peer.ID) error {
	m.logger.Info("connecting to storenode", zap.Stringer("peerID", peerID))

	m.activeStorenode = peerID
	m.StorenodeChangedEmitter.Emit(m.activeStorenode)

	if m.storenodeStatus(peerID) == connected {
		return nil
	}

	m.peersMutex.Lock()
	m.peers[peerID] = peerStatus{
		status:                connected,
		lastConnectionAttempt: time.Now(),
		canConnectAfter:       time.Now().Add(defaultBackoff),
	}
	m.peersMutex.Unlock()

	m.failedRequests[peerID] = 0
	m.logger.Info("storenode available", zap.Stringer("peerID", m.activeStorenode))

	m.StorenodeAvailableOneshotEmitter.Emit(struct{}{}) // Maybe can be refactored away?
	m.StorenodeAvailableEmitter.Emit(m.activeStorenode)

	return nil
}
// GetActiveStorenode returns the currently active storenode, read under the
// cycle's read lock. The result is an empty peer ID when no storenode is
// active.
func (m *StorenodeCycle) GetActiveStorenode() peer.ID {
	m.RLock()
	defer m.RUnlock()

	return m.activeStorenode
}
// IsStorenodeAvailable reports whether peerID is currently recorded as
// connected.
func (m *StorenodeCycle) IsStorenodeAvailable(peerID peer.ID) bool {
	return m.storenodeStatus(peerID) == connected
}
// penalizeStorenode graylists id: it may not be connected to again until
// graylistBackoff has elapsed. Storenodes seen for the first time are
// recorded as disconnected.
func (m *StorenodeCycle) penalizeStorenode(id peer.ID) {
	m.peersMutex.Lock()
	defer m.peersMutex.Unlock()

	entry, known := m.peers[id]
	if !known {
		entry = peerStatus{status: disconnected}
	}
	entry.canConnectAfter = time.Now().Add(graylistBackoff)

	m.peers[id] = entry
}
// verifyStorenodeStatus runs the storenode health check once per second until
// ctx is done. Errors from a check are logged and the loop keeps going.
func (m *StorenodeCycle) verifyStorenodeStatus(ctx context.Context) {
	tick := time.NewTicker(1 * time.Second)
	defer tick.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-tick.C:
			if err := m.disconnectStorenodeIfRequired(ctx); err != nil {
				m.logger.Error("failed to handle storenode cycle event", zap.Error(err))
			}
		}
	}
}
// disconnectStorenodeIfRequired is the periodic health check. Without an
// active storenode it runs a Cycle to find one. With an active storenode that
// reached storenodeMaxFailedRequests failures it graylists that storenode,
// signals StorenodeNotWorkingEmitter and switches to a new storenode.
func (m *StorenodeCycle) disconnectStorenodeIfRequired(ctx context.Context) error {
	m.logger.Debug("wakuV2 storenode status verification")

	if len(m.activeStorenode) == 0 {
		// No active storenode, find a new one
		m.Cycle(ctx)
		return nil
	}

	// Nothing to do while the active storenode stays below the failure limit
	if m.failedRequests[m.activeStorenode] < storenodeMaxFailedRequests {
		return nil
	}

	m.penalizeStorenode(m.activeStorenode)
	m.StorenodeNotWorkingEmitter.Emit(struct{}{})

	m.logger.Info("too many failed requests", zap.Stringer("storenode", m.activeStorenode))
	m.failedRequests[m.activeStorenode] = 0

	return m.connectToNewStorenodeAndWait(ctx)
}
// SetStorenodeConfigProvider sets the configuration source used by the cycle.
// Cycle does nothing while no provider is set.
func (m *StorenodeCycle) SetStorenodeConfigProvider(provider StorenodeConfigProvider) {
	m.storenodeConfigProvider = provider
}
// WaitForAvailableStoreNode blocks until the active storenode is marked as
// connected, the timeout (plus one extra second) elapses, or ctx is done. It
// returns whether the active storenode is available at the moment of return.
func (m *StorenodeCycle) WaitForAvailableStoreNode(ctx context.Context, timeout time.Duration) bool {
	// Add 1 second to timeout, because the storenode cycle has 1 second ticker, which doesn't tick on start.
	// This can be improved after merging https://github.com/status-im/status-go/pull/4380.
	// NOTE: https://stackoverflow.com/questions/32705582/how-to-get-time-tick-to-tick-immediately
	timeout += time.Second

	ctx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()

	wg := sync.WaitGroup{}
	wg.Add(1)
	go func() {
		defer wg.Done()
		// Availability is re-checked after every wake-up; each iteration takes
		// a fresh subscription on the one-shot emitter.
		// NOTE(review): m.activeStorenode is read here without the cycle lock — confirm this is intended.
		for !m.IsStorenodeAvailable(m.activeStorenode) {
			select {
			case <-m.StorenodeAvailableOneshotEmitter.Subscribe():
			case <-ctx.Done():
				return
			}
		}
	}()

	// Stop waiting as soon as the goroutine finishes or the deadline passes.
	select {
	case <-waitForWaitGroup(&wg):
	case <-ctx.Done():
	}

	return m.IsStorenodeAvailable(m.activeStorenode)
}
// waitForWaitGroup adapts a WaitGroup for use in a select statement: the
// returned channel is closed once wg.Wait returns.
func waitForWaitGroup(wg *sync.WaitGroup) <-chan struct{} {
	done := make(chan struct{})

	go func() {
		defer close(done)
		wg.Wait()
	}()

	return done
}
// storenodeTaskParameters collects the optional settings of
// PerformStorenodeTask.
type storenodeTaskParameters struct {
	// customPeerID, when non-empty, is used instead of the active storenode.
	customPeerID peer.ID
}

// StorenodeTaskOption customizes a single PerformStorenodeTask call.
type StorenodeTaskOption func(*storenodeTaskParameters)
// WithPeerID makes PerformStorenodeTask run against peerID instead of the
// active storenode.
func WithPeerID(peerID peer.ID) StorenodeTaskOption {
	apply := func(params *storenodeTaskParameters) {
		params.customPeerID = peerID
	}
	return apply
}
// PerformStorenodeTask runs fn against a storenode, retrying up to
// storenodeMaxFailedRequests times with a 2 second pause between attempts.
//
// The target is the peer given through WithPeerID or, by default, the active
// storenode; an error is returned when neither is available. Every failure
// increments the storenode's failed-request counter and a success resets it.
// A rate-limit response (HTTP 429 carried in a *store.StoreError, possibly
// wrapped) disconnects the active storenode with the default backoff and
// aborts immediately. The call also aborts as soon as the counter reaches
// storenodeMaxFailedRequests.
func (m *StorenodeCycle) PerformStorenodeTask(fn func() error, options ...StorenodeTaskOption) error {
	params := storenodeTaskParameters{}
	for _, opt := range options {
		opt(&params)
	}

	peerID := params.customPeerID
	if peerID == "" {
		peerID = m.GetActiveStorenode()
	}

	if peerID == "" {
		return errors.New("storenode not available")
	}

	// NOTE(review): only the read lock is held below, yet failedRequests is
	// written and disconnectActiveStorenode mutates the active storenode —
	// confirm whether this needs the write lock.
	m.RLock()
	defer m.RUnlock()

	var tries uint
	for tries < storenodeMaxFailedRequests {
		// The connection status is only enforced for the active storenode; an
		// explicitly requested peer is always tried.
		if params.customPeerID == "" && m.storenodeStatus(peerID) != connected {
			return errors.New("storenode not available")
		}
		m.logger.Info("trying performing history requests", zap.Uint("try", tries), zap.Stringer("peerID", peerID))

		// Perform request
		err := fn()
		if err == nil {
			// Reset failed requests
			m.logger.Debug("history request performed successfully", zap.Stringer("peerID", peerID))
			m.failedRequests[peerID] = 0
			return nil
		}

		m.logger.Error("failed to perform history request",
			zap.Stringer("peerID", peerID),
			zap.Uint("tries", tries),
			zap.Error(err),
		)

		tries++

		// errors.As also recognizes a StoreError that fn wrapped with
		// additional context, which a plain type assertion would miss.
		var storeErr *store.StoreError
		if errors.As(err, &storeErr) && storeErr.Code == http.StatusTooManyRequests {
			// NOTE(review): this disconnects the active storenode even when a
			// custom peer was requested — confirm that is intended.
			m.disconnectActiveStorenode(defaultBackoff)
			return fmt.Errorf("ratelimited at storenode %s: %w", peerID, err)
		}

		// Increment failed requests
		m.failedRequests[peerID]++

		// Change storenode
		if m.failedRequests[peerID] >= storenodeMaxFailedRequests {
			return errors.New("too many failed requests")
		}
		// Wait a couple of seconds so as not to spam the storenode
		time.Sleep(2 * time.Second)
	}
	return errors.New("failed to perform history request")
}

View File

@ -0,0 +1,49 @@
package history
import "sync"
// Emitter delivers a value to every channel handed out by Subscribe since the
// previous Emit. A subscription receives at most one value: Emit drops all
// subscriptions once it has sent to them.
type Emitter[T any] struct {
	sync.Mutex
	subscriptions []chan T
}

// NewEmitter creates an Emitter without subscribers.
func NewEmitter[T any]() *Emitter[T] {
	return &Emitter[T]{}
}

// Subscribe registers a new subscription and returns the channel on which the
// next emitted value will be delivered.
func (s *Emitter[T]) Subscribe() <-chan T {
	s.Lock()
	defer s.Unlock()

	// One slot of buffer is enough because a subscription is sent to at most
	// once. It lets Emit complete even if the subscriber has stopped listening
	// (for example after its context was cancelled); with an unbuffered
	// channel such a subscriber would block Emit, and every later Subscribe
	// and Emit, forever.
	c := make(chan T, 1)
	s.subscriptions = append(s.subscriptions, c)
	return c
}

// Emit sends value to every current subscription and then forgets them, so
// subscribers have to call Subscribe again to receive the next value.
func (s *Emitter[T]) Emit(value T) {
	s.Lock()
	defer s.Unlock()

	for _, subs := range s.subscriptions {
		subs <- value
	}
	s.subscriptions = nil
}
// OneShotEmitter is an Emitter that additionally closes every subscription
// channel after delivering the value to it.
type OneShotEmitter[T any] struct {
	Emitter[T]
}

// NewOneshotEmitter creates a OneShotEmitter without subscribers.
func NewOneshotEmitter[T any]() *OneShotEmitter[T] {
	return new(OneShotEmitter[T])
}

// Emit sends value to each current subscription, closes it, and then forgets
// all subscriptions.
func (s *OneShotEmitter[T]) Emit(value T) {
	s.Lock()
	defer s.Unlock()

	for i := range s.subscriptions {
		ch := s.subscriptions[i]
		ch <- value
		close(ch)
	}
	s.subscriptions = nil
}

View File

@ -0,0 +1,285 @@
package history
import (
"context"
"errors"
"math"
"sync"
"time"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/waku-org/go-waku/logging"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/store"
"go.uber.org/zap"
)
const (
	// maxTopicsPerRequest caps how many content topics go into one store query.
	maxTopicsPerRequest int = 10

	// mailserverRequestTimeout bounds each individual store query.
	mailserverRequestTimeout = 30 * time.Second
)
// work describes a single store query to execute.
type work struct {
	// criteria is the filter sent with the query.
	criteria store.FilterCriteria
	// cursor is the pagination cursor to resume from; it is left unset for
	// the first page.
	cursor []byte
	// limit is the page size requested from the storenode.
	limit uint64
}
// HistoryRetriever queries a storenode for past messages and hands every
// retrieved envelope to a HistoryProcessor.
type HistoryRetriever struct {
	// store is the client used to run the store queries.
	store *store.WakuStore
	logger *zap.Logger
	// historyProcessor receives the retrieved envelopes and failure notifications.
	historyProcessor HistoryProcessor
}
// HistoryProcessor consumes the outcome of history queries.
type HistoryProcessor interface {
	// OnEnvelope is called for every message in a store response. Returning
	// an error stops the processing of that response.
	OnEnvelope(env *protocol.Envelope, processEnvelopes bool) error
	// OnRequestFailed is called when a store query returns an error.
	OnRequestFailed(requestID []byte, peerID peer.ID, err error)
}
// NewHistoryRetriever creates a HistoryRetriever that queries through store,
// reports to historyProcessor and logs under the "history-retriever" name.
func NewHistoryRetriever(store *store.WakuStore, historyProcessor HistoryProcessor, logger *zap.Logger) *HistoryRetriever {
	retriever := new(HistoryRetriever)
	retriever.store = store
	retriever.logger = logger.Named("history-retriever")
	retriever.historyProcessor = historyProcessor
	return retriever
}
// Query retrieves the messages matching criteria from storenodeID.
//
// The content topics are split into batches of maxTopicsPerRequest, each batch
// is queried with pageLimit as its page size, and at most 3 queries run at the
// same time. After every page, shouldProcessNextPage (when non-nil) receives
// the number of envelopes of that page and decides whether the next page is
// fetched and with which limit. When it is nil, all pages are fetched with
// pageLimit.
//
// Query returns the first query error, the context error when ctx expires, and
// nil when all work completed or the context was cancelled.
func (hr *HistoryRetriever) Query(
	ctx context.Context,
	criteria store.FilterCriteria,
	storenodeID peer.ID,
	pageLimit uint64,
	shouldProcessNextPage func(int) (bool, uint64),
	processEnvelopes bool,
) error {
	logger := hr.logger.With(
		logging.Timep("fromString", criteria.TimeStart),
		logging.Timep("toString", criteria.TimeEnd),
		zap.String("pubsubTopic", criteria.PubsubTopic),
		zap.Strings("contentTopics", criteria.ContentTopicsList()),
		zap.Int64p("from", criteria.TimeStart),
		zap.Int64p("to", criteria.TimeEnd),
	)

	logger.Info("syncing")

	wg := sync.WaitGroup{}
	workWg := sync.WaitGroup{}
	workCh := make(chan work, 1000)       // each batch item is split in 10 topics bunch and sent to this channel
	workCompleteCh := make(chan struct{}) // once all batch items are processed, this channel is triggered
	semaphore := make(chan struct{}, 3)   // limit the number of concurrent queries
	errCh := make(chan error)

	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	// TODO: refactor this by extracting the consumer into a separate go routine.

	// Producer
	wg.Add(1)
	go func() {
		defer func() {
			logger.Debug("mailserver batch producer complete")
			wg.Done()
		}()

		contentTopicList := criteria.ContentTopics.ToList()

		// TODO: split into 24h batches

		allWorks := int(math.Ceil(float64(len(contentTopicList)) / float64(maxTopicsPerRequest)))
		workWg.Add(allWorks)

		for i := 0; i < len(contentTopicList); i += maxTopicsPerRequest {
			j := i + maxTopicsPerRequest
			if j > len(contentTopicList) {
				j = len(contentTopicList)
			}

			select {
			case <-ctx.Done():
				logger.Debug("processBatch producer - context done")
				return
			default:
				logger.Debug("processBatch producer - creating work")
				workCh <- work{
					criteria: store.FilterCriteria{
						ContentFilter: protocol.NewContentFilter(criteria.PubsubTopic, contentTopicList[i:j]...),
						TimeStart:     criteria.TimeStart,
						TimeEnd:       criteria.TimeEnd,
					},
					limit: pageLimit,
				}
			}
		}

		go func() {
			workWg.Wait()
			// Do not block forever when Query already returned and nobody
			// listens for the completion signal anymore.
			select {
			case workCompleteCh <- struct{}{}:
			case <-ctx.Done():
			}
		}()

		logger.Debug("processBatch producer complete")
	}()

	var result error

loop:
	for {
		select {
		case <-ctx.Done():
			logger.Debug("processBatch cleanup - context done")
			result = ctx.Err()
			if errors.Is(result, context.Canceled) {
				result = nil
			}
			break loop
		case w, ok := <-workCh:
			if !ok {
				continue
			}

			logger.Debug("processBatch - received work")

			// Wait for a free query slot, but keep listening for failures
			// while waiting: the running consumers hold their slot until
			// their error has been received, so blocking on the semaphore
			// alone deadlocks when all of them fail at the same time.
			select {
			case semaphore <- struct{}{}:
			case err := <-errCh:
				logger.Debug("processBatch - received error", zap.Error(err))
				cancel() // Kill go routines
				return err
			case <-ctx.Done():
				// The outer select handles the shutdown
				continue
			}

			go func(w work) { // Consumer
				defer func() {
					workWg.Done()
					<-semaphore
				}()

				queryCtx, queryCancel := context.WithTimeout(ctx, mailserverRequestTimeout)
				cursor, envelopesCount, err := hr.createMessagesRequest(queryCtx, storenodeID, w.criteria, w.cursor, w.limit, true, processEnvelopes, logger)
				queryCancel()

				if err != nil {
					logger.Debug("failed to send request", zap.Error(err))
					// Once Query is unwinding nobody receives from errCh
					// anymore, so give up instead of leaking this goroutine.
					select {
					case errCh <- err:
					case <-ctx.Done():
					}
					return
				}

				processNextPage := true
				nextPageLimit := pageLimit
				if shouldProcessNextPage != nil {
					processNextPage, nextPageLimit = shouldProcessNextPage(envelopesCount)
				}

				if !processNextPage {
					return
				}

				// Check the cursor after calling `shouldProcessNextPage`.
				// The app might process the fetched envelopes in the callback for its own needs.
				if cursor == nil {
					return
				}

				logger.Debug("processBatch producer - creating work (cursor)")

				workWg.Add(1)
				select {
				case workCh <- work{
					criteria: w.criteria,
					cursor:   cursor,
					limit:    nextPageLimit,
				}:
				case <-ctx.Done():
					// The follow-up page will never be queried
					workWg.Done()
				}
			}(w)
		case err := <-errCh:
			logger.Debug("processBatch - received error", zap.Error(err))
			cancel() // Kill go routines
			return err
		case <-workCompleteCh:
			logger.Debug("processBatch - all jobs complete")
			cancel() // Kill go routines
		}
	}

	wg.Wait()

	logger.Info("synced topic", zap.NamedError("hasError", result))

	return result
}
// createMessagesRequest runs a store query against peerID.
//
// With waitForResponse set, it blocks until the query finishes or ctx is done
// and returns the cursor for the next page, the number of envelopes received
// and the query error (or the context error). Otherwise the query is started
// in the background with envelope processing disabled, its failure is only
// logged, and zero values are returned immediately.
func (hr *HistoryRetriever) createMessagesRequest(
	ctx context.Context,
	peerID peer.ID,
	criteria store.FilterCriteria,
	cursor []byte,
	limit uint64,
	waitForResponse bool,
	processEnvelopes bool,
	logger *zap.Logger,
) (storeCursor []byte, envelopesCount int, err error) {
	if !waitForResponse {
		go func() {
			// The error stays local to the goroutine: writing the named
			// results here would race with the return below.
			if _, _, reqErr := hr.requestStoreMessages(ctx, peerID, criteria, cursor, limit, false); reqErr != nil {
				logger.Error("failed to request store messages", zap.Error(reqErr))
			}
		}()
		return nil, 0, nil
	}

	type queryResult struct {
		storeCursor    []byte
		envelopesCount int
		err            error
	}

	// Buffered so that the goroutine can always deliver its result and exit,
	// even when the caller already left because ctx was done.
	resultCh := make(chan queryResult, 1)

	go func() {
		nextCursor, count, reqErr := hr.requestStoreMessages(ctx, peerID, criteria, cursor, limit, processEnvelopes)
		resultCh <- queryResult{storeCursor: nextCursor, envelopesCount: count, err: reqErr}
	}()

	select {
	case result := <-resultCh:
		return result.storeCursor, result.envelopesCount, result.err
	case <-ctx.Done():
		return nil, 0, ctx.Err()
	}
}
// requestStoreMessages sends one store query to peerID and passes every
// message of the response to the history processor. It returns the cursor of
// the response and the number of messages received. A failed query is
// reported through OnRequestFailed; an error from OnEnvelope aborts the
// processing and is returned.
func (hr *HistoryRetriever) requestStoreMessages(ctx context.Context, peerID peer.ID, criteria store.FilterCriteria, cursor []byte, limit uint64, processEnvelopes bool) ([]byte, int, error) {
	requestID := protocol.GenerateRequestID()
	logger := hr.logger.With(
		zap.String("requestID", hexutil.Encode(requestID)),
		zap.Stringer("peerID", peerID),
	)

	requestOpts := []store.RequestOption{
		store.WithPaging(false, limit),
		store.WithRequestID(requestID),
		store.WithPeer(peerID),
		store.WithCursor(cursor),
	}

	logger.Debug("store.query",
		logging.Timep("startTime", criteria.TimeStart),
		logging.Timep("endTime", criteria.TimeEnd),
		zap.Strings("contentTopics", criteria.ContentTopics.ToList()),
		zap.String("pubsubTopic", criteria.PubsubTopic),
		zap.String("cursor", hexutil.Encode(cursor)),
	)

	// Measure only the round trip of the query itself
	startedAt := time.Now()
	result, err := hr.store.Query(ctx, criteria, requestOpts...)
	elapsed := time.Since(startedAt)
	if err != nil {
		logger.Error("error querying storenode", zap.Error(err))
		hr.historyProcessor.OnRequestFailed(requestID, peerID, err)
		return nil, 0, err
	}

	messages := result.Messages()
	received := len(messages)
	logger.Debug("store.query response",
		zap.Duration("queryDuration", elapsed),
		zap.Int("numMessages", received),
		zap.Bool("hasCursor", result.IsComplete() && result.Cursor() != nil),
	)

	for i := range messages {
		mkv := messages[i]
		envelope := protocol.NewEnvelope(mkv.Message, mkv.Message.GetTimestamp(), mkv.GetPubsubTopic())
		if err := hr.historyProcessor.OnEnvelope(envelope, processEnvelopes); err != nil {
			return nil, 0, err
		}
	}

	return result.Cursor(), received, nil
}

View File

@ -0,0 +1,32 @@
package history
import (
"time"
"github.com/libp2p/go-libp2p/core/peer"
)
// SortedStorenode is a storenode candidate together with the data used to
// rank it.
type SortedStorenode struct {
	// Storenode is the candidate's peer ID.
	Storenode peer.ID
	// RTT is the round-trip time used to rank the candidate.
	RTT time.Duration
	// CanConnectAfter is the earliest time the candidate may be connected to.
	CanConnectAfter time.Time
}
// byRTTMsAndCanConnectBefore implements sort.Interface. Storenodes whose
// backoff has already expired are ordered by ascending RTT; in every other
// case the one that becomes connectable earlier comes first.
type byRTTMsAndCanConnectBefore []SortedStorenode

// Len returns the number of storenodes.
func (s byRTTMsAndCanConnectBefore) Len() int {
	return len(s)
}

// Swap exchanges the storenodes at positions i and j.
func (s byRTTMsAndCanConnectBefore) Swap(i, j int) {
	tmp := s[i]
	s[i] = s[j]
	s[j] = tmp
}

// Less reports whether the storenode at position i sorts before the one at
// position j.
func (s byRTTMsAndCanConnectBefore) Less(i, j int) bool {
	// Slightly inaccurate as time sensitive sorting, but it does not matter so much
	now := time.Now()
	left, right := s[i], s[j]

	bothConnectable := left.CanConnectAfter.Before(now) && right.CanConnectAfter.Before(now)
	if !bothConnectable {
		return left.CanConnectAfter.Before(right.CanConnectAfter)
	}
	return left.RTT < right.RTT
}

View File

@ -8,8 +8,8 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/libp2p/go-libp2p/core/peer"
apicommon "github.com/waku-org/go-waku/waku/v2/api/common"
"github.com/waku-org/go-waku/waku/v2/api/history"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/protocol/store"
@ -29,7 +29,6 @@ type ISentCheck interface {
Start()
Add(topic string, messageID common.Hash, sentTime uint32)
DeleteByMessageIDs(messageIDs []common.Hash)
SetStorePeerID(peerID peer.ID)
}
// MessageSentCheck tracks the outgoing messages and check against store node
@ -38,11 +37,11 @@ type ISentCheck interface {
type MessageSentCheck struct {
messageIDs map[string]map[common.Hash]uint32
messageIDsMu sync.RWMutex
storePeerID peer.ID
messageStoredChan chan common.Hash
messageExpiredChan chan common.Hash
ctx context.Context
store *store.WakuStore
storenodeCycle *history.StorenodeCycle
timesource timesource.Timesource
logger *zap.Logger
maxHashQueryLength uint64
@ -53,7 +52,7 @@ type MessageSentCheck struct {
}
// NewMessageSentCheck creates a new instance of MessageSentCheck with default parameters
func NewMessageSentCheck(ctx context.Context, store *store.WakuStore, timesource timesource.Timesource, msgStoredChan chan common.Hash, msgExpiredChan chan common.Hash, logger *zap.Logger) *MessageSentCheck {
func NewMessageSentCheck(ctx context.Context, store *store.WakuStore, cycle *history.StorenodeCycle, timesource timesource.Timesource, msgStoredChan chan common.Hash, msgExpiredChan chan common.Hash, logger *zap.Logger) *MessageSentCheck {
return &MessageSentCheck{
messageIDs: make(map[string]map[common.Hash]uint32),
messageIDsMu: sync.RWMutex{},
@ -61,6 +60,7 @@ func NewMessageSentCheck(ctx context.Context, store *store.WakuStore, timesource
messageExpiredChan: msgExpiredChan,
ctx: ctx,
store: store,
storenodeCycle: cycle,
timesource: timesource,
logger: logger,
maxHashQueryLength: DefaultMaxHashQueryLength,
@ -139,11 +139,6 @@ func (m *MessageSentCheck) DeleteByMessageIDs(messageIDs []common.Hash) {
}
}
// SetStorePeerID sets the peer id of store node
func (m *MessageSentCheck) SetStorePeerID(peerID peer.ID) {
m.storePeerID = peerID
}
// Start checks if the tracked outgoing messages are stored periodically
func (m *MessageSentCheck) Start() {
defer utils.LogOnPanic()
@ -211,7 +206,7 @@ func (m *MessageSentCheck) Start() {
}
func (m *MessageSentCheck) messageHashBasedQuery(ctx context.Context, hashes []common.Hash, relayTime []uint32, pubsubTopic string) []common.Hash {
selectedPeer := m.storePeerID
selectedPeer := m.storenodeCycle.GetActiveStorenode()
if selectedPeer == "" {
m.logger.Error("no store peer id available", zap.String("pubsubTopic", pubsubTopic))
return []common.Hash{}

View File

@ -10,7 +10,7 @@ import (
func TestAddAndDelete(t *testing.T) {
ctx := context.TODO()
messageSentCheck := NewMessageSentCheck(ctx, nil, nil, nil, nil, nil)
messageSentCheck := NewMessageSentCheck(ctx, nil, nil, nil, nil, nil, nil)
messageSentCheck.Add("topic", [32]byte{1}, 1)
messageSentCheck.Add("topic", [32]byte{2}, 2)

View File

@ -6,7 +6,6 @@ import (
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/lightpush"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
@ -162,9 +161,3 @@ func (ms *MessageSender) MessagesDelivered(messageIDs []common.Hash) {
ms.messageSentCheck.DeleteByMessageIDs(messageIDs)
}
}
func (ms *MessageSender) SetStorePeerID(peerID peer.ID) {
if ms.messageSentCheck != nil {
ms.messageSentCheck.SetStorePeerID(peerID)
}
}

View File

@ -50,8 +50,8 @@ type StoreError struct {
}
// NewStoreError creates a new instance of StoreError
func NewStoreError(code int, message string) StoreError {
return StoreError{
func NewStoreError(code int, message string) *StoreError {
return &StoreError{
Code: code,
Message: message,
}
@ -317,7 +317,7 @@ func (s *WakuStore) queryFrom(ctx context.Context, storeRequest *pb.StoreQueryRe
if storeResponse.GetStatusCode() != ok {
err := NewStoreError(int(storeResponse.GetStatusCode()), storeResponse.GetStatusDesc())
return nil, &err
return nil, err
}
return storeResponse, nil
}