diff --git a/logging/logging.go b/logging/logging.go index ac895e9d..f8742ac5 100644 --- a/logging/logging.go +++ b/logging/logging.go @@ -74,6 +74,14 @@ func (t timestamp) String() string { return time.Unix(0, int64(t)).Format(time.RFC3339) } +func Timep(key string, time *int64) zapcore.Field { + if time == nil { + return zap.String(key, "-") + } else { + return Time(key, *time) + } +} + func Epoch(key string, time time.Time) zap.Field { return zap.String(key, fmt.Sprintf("%d", time.UnixNano())) } diff --git a/waku/v2/api/history/cycle.go b/waku/v2/api/history/cycle.go new file mode 100644 index 00000000..e09b2530 --- /dev/null +++ b/waku/v2/api/history/cycle.go @@ -0,0 +1,523 @@ +package history + +import ( + "context" + "crypto/rand" + "errors" + "fmt" + "math" + "math/big" + "net" + "net/http" + "runtime" + "sort" + "sync" + "time" + + "github.com/libp2p/go-libp2p/core/host" + "github.com/libp2p/go-libp2p/core/peer" + "github.com/libp2p/go-libp2p/p2p/protocol/ping" + "github.com/waku-org/go-waku/waku/v2/protocol/store" + "go.uber.org/zap" +) + +const defaultBackoff = 10 * time.Second +const graylistBackoff = 3 * time.Minute +const storenodeMaxFailedRequests uint = 2 +const isAndroidEmulator = runtime.GOOS == "android" && runtime.GOARCH == "amd64" +const findNearestMailServer = !isAndroidEmulator +const overrideDNS = runtime.GOOS == "android" || runtime.GOOS == "ios" +const bootstrapDNS = "8.8.8.8:53" + +type connStatus int + +const ( + disconnected connStatus = iota + 1 + connected +) + +type peerStatus struct { + status connStatus + canConnectAfter time.Time + lastConnectionAttempt time.Time +} + +type StorenodeConfigProvider interface { + UseStorenodes() (bool, error) + GetPinnedStorenode() (peer.ID, error) + Storenodes() ([]peer.ID, error) +} + +type StorenodeCycle struct { + sync.RWMutex + + logger *zap.Logger + + host host.Host + + storenodeConfigProvider StorenodeConfigProvider + + StorenodeAvailableOneshotEmitter *OneShotEmitter[struct{}] + StorenodeChangedEmitter *Emitter[peer.ID] + StorenodeNotWorkingEmitter *Emitter[struct{}] + StorenodeAvailableEmitter *Emitter[peer.ID] + + failedRequests map[peer.ID]uint + + peersMutex sync.RWMutex + activeStorenode peer.ID + peers map[peer.ID]peerStatus +} + +func NewStorenodeCycle(logger *zap.Logger) *StorenodeCycle { + return &StorenodeCycle{ + StorenodeAvailableOneshotEmitter: NewOneshotEmitter[struct{}](), + StorenodeChangedEmitter: NewEmitter[peer.ID](), + StorenodeNotWorkingEmitter: NewEmitter[struct{}](), + StorenodeAvailableEmitter: NewEmitter[peer.ID](), + logger: logger.Named("storenode-cycle"), + } +} + +func (m *StorenodeCycle) Start(ctx context.Context, h host.Host) { + m.logger.Debug("starting storenode cycle") + m.host = h + m.failedRequests = make(map[peer.ID]uint) + m.peers = make(map[peer.ID]peerStatus) + + go m.verifyStorenodeStatus(ctx) +} + +func (m *StorenodeCycle) DisconnectActiveStorenode(backoff time.Duration) { + m.Lock() + defer m.Unlock() + + m.disconnectActiveStorenode(backoff) +} + +func (m *StorenodeCycle) connectToNewStorenodeAndWait(ctx context.Context) error { + // Handle pinned storenodes + m.logger.Info("disconnecting storenode") + pinnedStorenode, err := m.storenodeConfigProvider.GetPinnedStorenode() + if err != nil { + m.logger.Error("could not obtain the pinned storenode", zap.Error(err)) + return err + } + + // If no pinned storenode, no need to disconnect and wait for it to be available + if pinnedStorenode == "" { + m.disconnectActiveStorenode(graylistBackoff) + } + + return m.findNewStorenode(ctx) 
+} + +func (m *StorenodeCycle) disconnectStorenode(backoffDuration time.Duration) error { + if m.activeStorenode == "" { + m.logger.Info("no active storenode") + return nil + } + + m.logger.Info("disconnecting active storenode", zap.Stringer("peerID", m.activeStorenode)) + + m.peersMutex.Lock() + pInfo, ok := m.peers[m.activeStorenode] + if ok { + pInfo.status = disconnected + pInfo.canConnectAfter = time.Now().Add(backoffDuration) + m.peers[m.activeStorenode] = pInfo + } else { + m.peers[m.activeStorenode] = peerStatus{ + status: disconnected, + canConnectAfter: time.Now().Add(backoffDuration), + } + } + m.peersMutex.Unlock() + + m.activeStorenode = "" + + return nil +} + +func (m *StorenodeCycle) disconnectActiveStorenode(backoffDuration time.Duration) { + err := m.disconnectStorenode(backoffDuration) + if err != nil { + m.logger.Error("failed to disconnect storenode", zap.Error(err)) + } + + m.StorenodeChangedEmitter.Emit("") +} + +func (m *StorenodeCycle) Cycle(ctx context.Context) { + if m.storenodeConfigProvider == nil { + m.logger.Info("storenodeConfigProvider not yet setup") + return + } + + m.logger.Info("Automatically switching storenode") + + if m.activeStorenode != "" { + m.disconnectActiveStorenode(graylistBackoff) + } + + useStorenode, err := m.storenodeConfigProvider.UseStorenodes() + if err != nil { + m.logger.Error("failed to get use storenodes", zap.Error(err)) + return + } + + if !useStorenode { + m.logger.Info("Skipping storenode search due to useStorenode being false") + return + } + + err = m.findNewStorenode(ctx) + if err != nil { + m.logger.Error("Error getting new storenode", zap.Error(err)) + } +} + +func poolSize(fleetSize int) int { + return int(math.Ceil(float64(fleetSize) / 4)) +} + +func (m *StorenodeCycle) getAvailableStorenodesSortedByRTT(ctx context.Context, allStorenodes []peer.ID) []peer.ID { + // TODO: this can be replaced by peer selector once code is moved to go-waku api + availableStorenodes := make(map[peer.ID]time.Duration) + availableStorenodesMutex := sync.Mutex{} + availableStorenodesWg := sync.WaitGroup{} + for _, storenode := range allStorenodes { + availableStorenodesWg.Add(1) + go func(peerID peer.ID) { + defer availableStorenodesWg.Done() + ctx, cancel := context.WithTimeout(ctx, 4*time.Second) + defer cancel() + + rtt, err := m.pingPeer(ctx, peerID) + if err == nil { // pinging storenodes might fail, but we don't care + availableStorenodesMutex.Lock() + availableStorenodes[peerID] = rtt + availableStorenodesMutex.Unlock() + } + }(storenode) + } + availableStorenodesWg.Wait() + + if len(availableStorenodes) == 0 { + m.logger.Warn("No storenodes available") // Do nothing.. 
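+ // Returning nil leaves findNewStorenode without candidates, so no connection
+ // is attempted until the next tick of the cycle retries the search.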
+ return nil + } + + var sortedStorenodes []SortedStorenode + for storenodeID, rtt := range availableStorenodes { + sortedStorenode := SortedStorenode{ + Storenode: storenodeID, + RTT: rtt, + } + m.peersMutex.Lock() + pInfo, ok := m.peers[storenodeID] + m.peersMutex.Unlock() + if ok && time.Now().Before(pInfo.canConnectAfter) { + continue // We can't connect to this node yet + } + sortedStorenodes = append(sortedStorenodes, sortedStorenode) + } + sort.Sort(byRTTMsAndCanConnectBefore(sortedStorenodes)) + + result := make([]peer.ID, len(sortedStorenodes)) + for i, s := range sortedStorenodes { + result[i] = s.Storenode + } + + return result +} + +func (m *StorenodeCycle) pingPeer(ctx context.Context, peerID peer.ID) (time.Duration, error) { + pingResultCh := ping.Ping(ctx, m.host, peerID) + select { + case <-ctx.Done(): + return 0, ctx.Err() + case r := <-pingResultCh: + if r.Error != nil { + return 0, r.Error + } + return r.RTT, nil + } +} + +func (m *StorenodeCycle) findNewStorenode(ctx context.Context) error { + // we have to override DNS manually because of https://github.com/status-im/status-mobile/issues/19581 + if overrideDNS { + var dialer net.Dialer + net.DefaultResolver = &net.Resolver{ + PreferGo: false, + Dial: func(context context.Context, _, _ string) (net.Conn, error) { + conn, err := dialer.DialContext(context, "udp", bootstrapDNS) + if err != nil { + return nil, err + } + return conn, nil + }, + } + } + + pinnedStorenode, err := m.storenodeConfigProvider.GetPinnedStorenode() + if err != nil { + m.logger.Error("Could not obtain the pinned storenode", zap.Error(err)) + return err + } + + if pinnedStorenode != "" { + return m.connect(pinnedStorenode) + } + + m.logger.Info("Finding a new storenode..") + + allStorenodes, err := m.storenodeConfigProvider.Storenodes() + if err != nil { + return err + } + + // TODO: remove this check once sockets are stable on x86_64 emulators + if findNearestMailServer { + allStorenodes = m.getAvailableStorenodesSortedByRTT(ctx, allStorenodes) + } + + // Picks a random storenode amongs the ones with the lowest latency + // The pool size is 1/4 of the storenodes were pinged successfully + pSize := poolSize(len(allStorenodes) - 1) + if pSize <= 0 { + pSize = len(allStorenodes) + if pSize <= 0 { + m.logger.Warn("No storenodes available") // Do nothing.. + return nil + } + } + + r, err := rand.Int(rand.Reader, big.NewInt(int64(pSize))) + if err != nil { + return err + } + + ms := allStorenodes[r.Int64()] + return m.connect(ms) +} + +func (m *StorenodeCycle) storenodeStatus(peerID peer.ID) connStatus { + m.peersMutex.RLock() + defer m.peersMutex.RUnlock() + + peer, ok := m.peers[peerID] + if !ok { + return disconnected + } + return peer.status +} + +func (m *StorenodeCycle) connect(peerID peer.ID) error { + m.logger.Info("connecting to storenode", zap.Stringer("peerID", peerID)) + + m.activeStorenode = peerID + + m.StorenodeChangedEmitter.Emit(m.activeStorenode) + + storenodeStatus := m.storenodeStatus(peerID) + if storenodeStatus != connected { + m.peersMutex.Lock() + m.peers[peerID] = peerStatus{ + status: connected, + lastConnectionAttempt: time.Now(), + canConnectAfter: time.Now().Add(defaultBackoff), + } + m.peersMutex.Unlock() + + m.failedRequests[peerID] = 0 + m.logger.Info("storenode available", zap.Stringer("peerID", m.activeStorenode)) + + m.StorenodeAvailableOneshotEmitter.Emit(struct{}{}) // Maybe can be refactored away? 
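+ // The oneshot emitter delivers once and closes its channels, unblocking any
+ // WaitForAvailableStoreNode callers; the regular emitter below notifies
+ // subscribers of the newly available peer ID.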
+ m.StorenodeAvailableEmitter.Emit(m.activeStorenode) + } + return nil +} + +func (m *StorenodeCycle) GetActiveStorenode() peer.ID { + m.RLock() + defer m.RUnlock() + + return m.activeStorenode +} + +func (m *StorenodeCycle) IsStorenodeAvailable(peerID peer.ID) bool { + return m.storenodeStatus(peerID) == connected +} + +func (m *StorenodeCycle) penalizeStorenode(id peer.ID) { + m.peersMutex.Lock() + defer m.peersMutex.Unlock() + pInfo, ok := m.peers[id] + if !ok { + pInfo.status = disconnected + } + + pInfo.canConnectAfter = time.Now().Add(graylistBackoff) + m.peers[id] = pInfo +} + +func (m *StorenodeCycle) verifyStorenodeStatus(ctx context.Context) { + ticker := time.NewTicker(1 * time.Second) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + err := m.disconnectStorenodeIfRequired(ctx) + if err != nil { + m.logger.Error("failed to handle storenode cycle event", zap.Error(err)) + continue + } + + case <-ctx.Done(): + return + } + } +} + +func (m *StorenodeCycle) disconnectStorenodeIfRequired(ctx context.Context) error { + m.logger.Debug("wakuV2 storenode status verification") + + if m.activeStorenode == "" { + // No active storenode, find a new one + m.Cycle(ctx) + return nil + } + + // Check whether we want to disconnect the active storenode + if m.failedRequests[m.activeStorenode] >= storenodeMaxFailedRequests { + m.penalizeStorenode(m.activeStorenode) + m.StorenodeNotWorkingEmitter.Emit(struct{}{}) + + m.logger.Info("too many failed requests", zap.Stringer("storenode", m.activeStorenode)) + m.failedRequests[m.activeStorenode] = 0 + return m.connectToNewStorenodeAndWait(ctx) + } + + return nil +} + +func (m *StorenodeCycle) SetStorenodeConfigProvider(provider StorenodeConfigProvider) { + m.storenodeConfigProvider = provider +} + +func (m *StorenodeCycle) WaitForAvailableStoreNode(ctx context.Context, timeout time.Duration) bool { + // Add 1 second to timeout, because the storenode cycle has 1 second ticker, which doesn't tick on start. + // This can be improved after merging https://github.com/status-im/status-go/pull/4380. 
+ // NOTE: https://stackoverflow.com/questions/32705582/how-to-get-time-tick-to-tick-immediately + timeout += time.Second + + ctx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + defer wg.Done() + for !m.IsStorenodeAvailable(m.activeStorenode) { + select { + case <-m.StorenodeAvailableOneshotEmitter.Subscribe(): + case <-ctx.Done(): + return + } + } + }() + + select { + case <-waitForWaitGroup(&wg): + case <-ctx.Done(): + } + + return m.IsStorenodeAvailable(m.activeStorenode) +} + +func waitForWaitGroup(wg *sync.WaitGroup) <-chan struct{} { + ch := make(chan struct{}) + go func() { + wg.Wait() + close(ch) + }() + return ch +} + +type storenodeTaskParameters struct { + customPeerID peer.ID +} + +type StorenodeTaskOption func(*storenodeTaskParameters) + +func WithPeerID(peerID peer.ID) StorenodeTaskOption { + return func(stp *storenodeTaskParameters) { + stp.customPeerID = peerID + } +} + +func (m *StorenodeCycle) PerformStorenodeTask(fn func() error, options ...StorenodeTaskOption) error { + params := storenodeTaskParameters{} + for _, opt := range options { + opt(¶ms) + } + + peerID := params.customPeerID + if peerID == "" { + peerID = m.GetActiveStorenode() + } + + if peerID == "" { + return errors.New("storenode not available") + } + + m.RLock() + defer m.RUnlock() + + var tries uint = 0 + for tries < storenodeMaxFailedRequests { + if params.customPeerID == "" && m.storenodeStatus(peerID) != connected { + return errors.New("storenode not available") + } + m.logger.Info("trying performing history requests", zap.Uint("try", tries), zap.Stringer("peerID", peerID)) + + // Peform request + err := fn() + if err == nil { + // Reset failed requests + m.logger.Debug("history request performed successfully", zap.Stringer("peerID", peerID)) + m.failedRequests[peerID] = 0 + return nil + } + + m.logger.Error("failed to perform history request", + zap.Stringer("peerID", peerID), + zap.Uint("tries", tries), + zap.Error(err), + ) + + tries++ + + if storeErr, ok := err.(*store.StoreError); ok { + if storeErr.Code == http.StatusTooManyRequests { + m.disconnectActiveStorenode(defaultBackoff) + return fmt.Errorf("ratelimited at storenode %s: %w", peerID, err) + } + } + + // Increment failed requests + m.failedRequests[peerID]++ + + // Change storenode + if m.failedRequests[peerID] >= storenodeMaxFailedRequests { + return errors.New("too many failed requests") + } + // Wait a couple of second not to spam + time.Sleep(2 * time.Second) + + } + return errors.New("failed to perform history request") +} diff --git a/waku/v2/api/history/emitters.go b/waku/v2/api/history/emitters.go new file mode 100644 index 00000000..2aee5476 --- /dev/null +++ b/waku/v2/api/history/emitters.go @@ -0,0 +1,49 @@ +package history + +import "sync" + +type Emitter[T any] struct { + sync.Mutex + subscriptions []chan T +} + +func NewEmitter[T any]() *Emitter[T] { + return &Emitter[T]{} +} + +func (s *Emitter[T]) Subscribe() <-chan T { + s.Lock() + defer s.Unlock() + c := make(chan T) + s.subscriptions = append(s.subscriptions, c) + return c +} + +func (s *Emitter[T]) Emit(value T) { + s.Lock() + defer s.Unlock() + + for _, subs := range s.subscriptions { + subs <- value + } + s.subscriptions = nil +} + +type OneShotEmitter[T any] struct { + Emitter[T] +} + +func NewOneshotEmitter[T any]() *OneShotEmitter[T] { + return &OneShotEmitter[T]{} +} + +func (s *OneShotEmitter[T]) Emit(value T) { + s.Lock() + defer s.Unlock() + + for _, subs := range s.subscriptions { + subs <- 
value + close(subs) + } + s.subscriptions = nil +} diff --git a/waku/v2/api/history/history.go b/waku/v2/api/history/history.go new file mode 100644 index 00000000..3db71a50 --- /dev/null +++ b/waku/v2/api/history/history.go @@ -0,0 +1,285 @@ +package history + +import ( + "context" + "errors" + "math" + "sync" + "time" + + "github.com/ethereum/go-ethereum/common/hexutil" + "github.com/libp2p/go-libp2p/core/peer" + "github.com/waku-org/go-waku/logging" + "github.com/waku-org/go-waku/waku/v2/protocol" + "github.com/waku-org/go-waku/waku/v2/protocol/store" + "go.uber.org/zap" +) + +const maxTopicsPerRequest int = 10 +const mailserverRequestTimeout = 30 * time.Second + +type work struct { + criteria store.FilterCriteria + cursor []byte + limit uint64 +} + +type HistoryRetriever struct { + store *store.WakuStore + logger *zap.Logger + historyProcessor HistoryProcessor +} + +type HistoryProcessor interface { + OnEnvelope(env *protocol.Envelope, processEnvelopes bool) error + OnRequestFailed(requestID []byte, peerID peer.ID, err error) +} + +func NewHistoryRetriever(store *store.WakuStore, historyProcessor HistoryProcessor, logger *zap.Logger) *HistoryRetriever { + return &HistoryRetriever{ + store: store, + logger: logger.Named("history-retriever"), + historyProcessor: historyProcessor, + } +} + +func (hr *HistoryRetriever) Query( + ctx context.Context, + criteria store.FilterCriteria, + storenodeID peer.ID, + pageLimit uint64, + shouldProcessNextPage func(int) (bool, uint64), + processEnvelopes bool, +) error { + logger := hr.logger.With( + logging.Timep("fromString", criteria.TimeStart), + logging.Timep("toString", criteria.TimeEnd), + zap.String("pubsubTopic", criteria.PubsubTopic), + zap.Strings("contentTopics", criteria.ContentTopicsList()), + zap.Int64p("from", criteria.TimeStart), + zap.Int64p("to", criteria.TimeEnd), + ) + + logger.Info("syncing") + + wg := sync.WaitGroup{} + workWg := sync.WaitGroup{} + workCh := make(chan work, 1000) // each batch item is split in 10 topics bunch and sent to this channel + workCompleteCh := make(chan struct{}) // once all batch items are processed, this channel is triggered + semaphore := make(chan struct{}, 3) // limit the number of concurrent queries + errCh := make(chan error) + + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + // TODO: refactor this by extracting the consumer into a separate go routine. 
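+ // Pipeline overview: the producer below splits the content topics into
+ // chunks of maxTopicsPerRequest and feeds them to workCh; up to three
+ // consumers (bounded by the semaphore) query the storenode concurrently and
+ // re-enqueue work with the returned cursor until every page has been fetched,
+ // at which point workCompleteCh is signalled.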
+ + // Producer + wg.Add(1) + go func() { + defer func() { + logger.Debug("mailserver batch producer complete") + wg.Done() + }() + + contentTopicList := criteria.ContentTopics.ToList() + + // TODO: split into 24h batches + + allWorks := int(math.Ceil(float64(len(contentTopicList)) / float64(maxTopicsPerRequest))) + workWg.Add(allWorks) + + for i := 0; i < len(contentTopicList); i += maxTopicsPerRequest { + j := i + maxTopicsPerRequest + if j > len(contentTopicList) { + j = len(contentTopicList) + } + + select { + case <-ctx.Done(): + logger.Debug("processBatch producer - context done") + return + default: + logger.Debug("processBatch producer - creating work") + workCh <- work{ + criteria: store.FilterCriteria{ + ContentFilter: protocol.NewContentFilter(criteria.PubsubTopic, contentTopicList[i:j]...), + TimeStart: criteria.TimeStart, + TimeEnd: criteria.TimeEnd, + }, + limit: pageLimit, + } + } + } + + go func() { + workWg.Wait() + workCompleteCh <- struct{}{} + }() + + logger.Debug("processBatch producer complete") + }() + + var result error + +loop: + for { + select { + case <-ctx.Done(): + logger.Debug("processBatch cleanup - context done") + result = ctx.Err() + if errors.Is(result, context.Canceled) { + result = nil + } + break loop + case w, ok := <-workCh: + if !ok { + continue + } + + logger.Debug("processBatch - received work") + + semaphore <- struct{}{} + go func(w work) { // Consumer + defer func() { + workWg.Done() + <-semaphore + }() + + queryCtx, queryCancel := context.WithTimeout(ctx, mailserverRequestTimeout) + cursor, envelopesCount, err := hr.createMessagesRequest(queryCtx, storenodeID, w.criteria, w.cursor, w.limit, true, processEnvelopes, logger) + queryCancel() + + if err != nil { + logger.Debug("failed to send request", zap.Error(err)) + errCh <- err + return + } + + processNextPage := true + nextPageLimit := pageLimit + if shouldProcessNextPage != nil { + processNextPage, nextPageLimit = shouldProcessNextPage(envelopesCount) + } + + if !processNextPage { + return + } + + // Check the cursor after calling `shouldProcessNextPage`. + // The app might use process the fetched envelopes in the callback for own needs. 
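+ // A nil cursor means the storenode returned the final page for this
+ // criteria, so no follow-up work item is queued.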
+ if cursor == nil { + return + } + + logger.Debug("processBatch producer - creating work (cursor)") + + workWg.Add(1) + workCh <- work{ + criteria: w.criteria, + cursor: cursor, + limit: nextPageLimit, + } + }(w) + case err := <-errCh: + logger.Debug("processBatch - received error", zap.Error(err)) + cancel() // Kill go routines + return err + case <-workCompleteCh: + logger.Debug("processBatch - all jobs complete") + cancel() // Kill go routines + } + } + + wg.Wait() + + logger.Info("synced topic", zap.NamedError("hasError", result)) + + return result +} + +func (hr *HistoryRetriever) createMessagesRequest( + ctx context.Context, + peerID peer.ID, + criteria store.FilterCriteria, + cursor []byte, + limit uint64, + waitForResponse bool, + processEnvelopes bool, + logger *zap.Logger, +) (storeCursor []byte, envelopesCount int, err error) { + if waitForResponse { + resultCh := make(chan struct { + storeCursor []byte + envelopesCount int + err error + }) + + go func() { + storeCursor, envelopesCount, err = hr.requestStoreMessages(ctx, peerID, criteria, cursor, limit, processEnvelopes) + resultCh <- struct { + storeCursor []byte + envelopesCount int + err error + }{storeCursor, envelopesCount, err} + }() + + select { + case result := <-resultCh: + return result.storeCursor, result.envelopesCount, result.err + case <-ctx.Done(): + return nil, 0, ctx.Err() + } + } else { + go func() { + _, _, err = hr.requestStoreMessages(ctx, peerID, criteria, cursor, limit, false) + if err != nil { + logger.Error("failed to request store messages", zap.Error(err)) + } + }() + } + + return +} + +func (hr *HistoryRetriever) requestStoreMessages(ctx context.Context, peerID peer.ID, criteria store.FilterCriteria, cursor []byte, limit uint64, processEnvelopes bool) ([]byte, int, error) { + requestID := protocol.GenerateRequestID() + logger := hr.logger.With(zap.String("requestID", hexutil.Encode(requestID)), zap.Stringer("peerID", peerID)) + + opts := []store.RequestOption{ + store.WithPaging(false, limit), + store.WithRequestID(requestID), + store.WithPeer(peerID), + store.WithCursor(cursor)} + + logger.Debug("store.query", + logging.Timep("startTime", criteria.TimeStart), + logging.Timep("endTime", criteria.TimeEnd), + zap.Strings("contentTopics", criteria.ContentTopics.ToList()), + zap.String("pubsubTopic", criteria.PubsubTopic), + zap.String("cursor", hexutil.Encode(cursor)), + ) + + queryStart := time.Now() + result, err := hr.store.Query(ctx, criteria, opts...) 
+ queryDuration := time.Since(queryStart) + if err != nil { + logger.Error("error querying storenode", zap.Error(err)) + + hr.historyProcessor.OnRequestFailed(requestID, peerID, err) + + return nil, 0, err + } + + messages := result.Messages() + envelopesCount := len(messages) + logger.Debug("store.query response", zap.Duration("queryDuration", queryDuration), zap.Int("numMessages", envelopesCount), zap.Bool("hasCursor", result.IsComplete() && result.Cursor() != nil)) + for _, mkv := range messages { + envelope := protocol.NewEnvelope(mkv.Message, mkv.Message.GetTimestamp(), mkv.GetPubsubTopic()) + err := hr.historyProcessor.OnEnvelope(envelope, processEnvelopes) + if err != nil { + return nil, 0, err + } + } + return result.Cursor(), envelopesCount, nil +} diff --git a/waku/v2/api/history/sort.go b/waku/v2/api/history/sort.go new file mode 100644 index 00000000..22e94c57 --- /dev/null +++ b/waku/v2/api/history/sort.go @@ -0,0 +1,32 @@ +package history + +import ( + "time" + + "github.com/libp2p/go-libp2p/core/peer" +) + +type SortedStorenode struct { + Storenode peer.ID + RTT time.Duration + CanConnectAfter time.Time +} + +type byRTTMsAndCanConnectBefore []SortedStorenode + +func (s byRTTMsAndCanConnectBefore) Len() int { + return len(s) +} + +func (s byRTTMsAndCanConnectBefore) Swap(i, j int) { + s[i], s[j] = s[j], s[i] +} + +func (s byRTTMsAndCanConnectBefore) Less(i, j int) bool { + // Slightly inaccurate as time sensitive sorting, but it does not matter so much + now := time.Now() + if s[i].CanConnectAfter.Before(now) && s[j].CanConnectAfter.Before(now) { + return s[i].RTT < s[j].RTT + } + return s[i].CanConnectAfter.Before(s[j].CanConnectAfter) +} diff --git a/waku/v2/api/publish/message_check.go b/waku/v2/api/publish/message_check.go index c6df0f29..05916574 100644 --- a/waku/v2/api/publish/message_check.go +++ b/waku/v2/api/publish/message_check.go @@ -8,8 +8,8 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" - "github.com/libp2p/go-libp2p/core/peer" apicommon "github.com/waku-org/go-waku/waku/v2/api/common" + "github.com/waku-org/go-waku/waku/v2/api/history" "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/pb" "github.com/waku-org/go-waku/waku/v2/protocol/store" @@ -29,7 +29,6 @@ type ISentCheck interface { Start() Add(topic string, messageID common.Hash, sentTime uint32) DeleteByMessageIDs(messageIDs []common.Hash) - SetStorePeerID(peerID peer.ID) } // MessageSentCheck tracks the outgoing messages and check against store node @@ -38,11 +37,11 @@ type ISentCheck interface { type MessageSentCheck struct { messageIDs map[string]map[common.Hash]uint32 messageIDsMu sync.RWMutex - storePeerID peer.ID messageStoredChan chan common.Hash messageExpiredChan chan common.Hash ctx context.Context store *store.WakuStore + storenodeCycle *history.StorenodeCycle timesource timesource.Timesource logger *zap.Logger maxHashQueryLength uint64 @@ -53,7 +52,7 @@ type MessageSentCheck struct { } // NewMessageSentCheck creates a new instance of MessageSentCheck with default parameters -func NewMessageSentCheck(ctx context.Context, store *store.WakuStore, timesource timesource.Timesource, msgStoredChan chan common.Hash, msgExpiredChan chan common.Hash, logger *zap.Logger) *MessageSentCheck { +func NewMessageSentCheck(ctx context.Context, store *store.WakuStore, cycle *history.StorenodeCycle, timesource timesource.Timesource, msgStoredChan chan common.Hash, msgExpiredChan chan common.Hash, logger *zap.Logger) 
*MessageSentCheck { return &MessageSentCheck{ messageIDs: make(map[string]map[common.Hash]uint32), messageIDsMu: sync.RWMutex{}, @@ -61,6 +60,7 @@ func NewMessageSentCheck(ctx context.Context, store *store.WakuStore, timesource messageExpiredChan: msgExpiredChan, ctx: ctx, store: store, + storenodeCycle: cycle, timesource: timesource, logger: logger, maxHashQueryLength: DefaultMaxHashQueryLength, @@ -139,11 +139,6 @@ func (m *MessageSentCheck) DeleteByMessageIDs(messageIDs []common.Hash) { } } -// SetStorePeerID sets the peer id of store node -func (m *MessageSentCheck) SetStorePeerID(peerID peer.ID) { - m.storePeerID = peerID -} - // Start checks if the tracked outgoing messages are stored periodically func (m *MessageSentCheck) Start() { defer utils.LogOnPanic() @@ -211,7 +206,7 @@ func (m *MessageSentCheck) Start() { } func (m *MessageSentCheck) messageHashBasedQuery(ctx context.Context, hashes []common.Hash, relayTime []uint32, pubsubTopic string) []common.Hash { - selectedPeer := m.storePeerID + selectedPeer := m.storenodeCycle.GetActiveStorenode() if selectedPeer == "" { m.logger.Error("no store peer id available", zap.String("pubsubTopic", pubsubTopic)) return []common.Hash{} diff --git a/waku/v2/api/publish/message_check_test.go b/waku/v2/api/publish/message_check_test.go index ef53f4d3..4fe7b6ee 100644 --- a/waku/v2/api/publish/message_check_test.go +++ b/waku/v2/api/publish/message_check_test.go @@ -10,7 +10,7 @@ import ( func TestAddAndDelete(t *testing.T) { ctx := context.TODO() - messageSentCheck := NewMessageSentCheck(ctx, nil, nil, nil, nil, nil) + messageSentCheck := NewMessageSentCheck(ctx, nil, nil, nil, nil, nil, nil) messageSentCheck.Add("topic", [32]byte{1}, 1) messageSentCheck.Add("topic", [32]byte{2}, 2) diff --git a/waku/v2/api/publish/message_sender.go b/waku/v2/api/publish/message_sender.go index 479d894a..c1e9a4ca 100644 --- a/waku/v2/api/publish/message_sender.go +++ b/waku/v2/api/publish/message_sender.go @@ -6,7 +6,6 @@ import ( "time" "github.com/ethereum/go-ethereum/common" - "github.com/libp2p/go-libp2p/core/peer" "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/lightpush" "github.com/waku-org/go-waku/waku/v2/protocol/relay" @@ -162,9 +161,3 @@ func (ms *MessageSender) MessagesDelivered(messageIDs []common.Hash) { ms.messageSentCheck.DeleteByMessageIDs(messageIDs) } } - -func (ms *MessageSender) SetStorePeerID(peerID peer.ID) { - if ms.messageSentCheck != nil { - ms.messageSentCheck.SetStorePeerID(peerID) - } -} diff --git a/waku/v2/protocol/store/client.go b/waku/v2/protocol/store/client.go index f7427b97..adee966e 100644 --- a/waku/v2/protocol/store/client.go +++ b/waku/v2/protocol/store/client.go @@ -50,8 +50,8 @@ type StoreError struct { } // NewStoreError creates a new instance of StoreError -func NewStoreError(code int, message string) StoreError { - return StoreError{ +func NewStoreError(code int, message string) *StoreError { + return &StoreError{ Code: code, Message: message, } @@ -317,7 +317,7 @@ func (s *WakuStore) queryFrom(ctx context.Context, storeRequest *pb.StoreQueryRe if storeResponse.GetStatusCode() != ok { err := NewStoreError(int(storeResponse.GetStatusCode()), storeResponse.GetStatusDesc()) - return nil, &err + return nil, err } return storeResponse, nil }
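
For orientation, below is a minimal wiring sketch (not part of this change) showing how the new history package could be used, assuming a libp2p host and a *store.WakuStore are already set up; staticProvider, loggingProcessor, and syncHistory are hypothetical names introduced only for illustration.

package example

import (
	"context"
	"errors"
	"time"

	"github.com/libp2p/go-libp2p/core/host"
	"github.com/libp2p/go-libp2p/core/peer"
	"github.com/waku-org/go-waku/waku/v2/api/history"
	"github.com/waku-org/go-waku/waku/v2/protocol"
	"github.com/waku-org/go-waku/waku/v2/protocol/store"
	"go.uber.org/zap"
)

// staticProvider satisfies history.StorenodeConfigProvider with a fixed peer list.
type staticProvider struct{ nodes []peer.ID }

func (p *staticProvider) UseStorenodes() (bool, error)         { return true, nil }
func (p *staticProvider) GetPinnedStorenode() (peer.ID, error) { return "", nil }
func (p *staticProvider) Storenodes() ([]peer.ID, error)       { return p.nodes, nil }

// loggingProcessor satisfies history.HistoryProcessor by logging what it receives.
type loggingProcessor struct{ logger *zap.Logger }

func (p *loggingProcessor) OnEnvelope(env *protocol.Envelope, processEnvelopes bool) error {
	p.logger.Info("envelope received", zap.Int64("timestamp", env.Message().GetTimestamp()))
	return nil
}

func (p *loggingProcessor) OnRequestFailed(requestID []byte, peerID peer.ID, err error) {
	p.logger.Warn("history request failed", zap.Stringer("peerID", peerID), zap.Error(err))
}

func syncHistory(ctx context.Context, h host.Host, wakuStore *store.WakuStore, logger *zap.Logger, storenodes []peer.ID, criteria store.FilterCriteria) error {
	cycle := history.NewStorenodeCycle(logger)
	cycle.SetStorenodeConfigProvider(&staticProvider{nodes: storenodes})
	cycle.Start(ctx, h)

	// Block until the cycle has selected a reachable storenode (or time out).
	if !cycle.WaitForAvailableStoreNode(ctx, 30*time.Second) {
		return errors.New("no storenode became available")
	}

	retriever := history.NewHistoryRetriever(wakuStore, &loggingProcessor{logger: logger}, logger)

	// PerformStorenodeTask handles retries and failure accounting; the retriever
	// pages through results for the currently selected storenode.
	storenodeID := cycle.GetActiveStorenode()
	return cycle.PerformStorenodeTask(func() error {
		return retriever.Query(ctx, criteria, storenodeID, 20, nil, true)
	})
}

With this wiring, publish.NewMessageSentCheck can also be handed the same *StorenodeCycle, so the sent-message check follows whichever storenode the cycle selects instead of requiring a separate SetStorePeerID call.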