From 8a28978f83ab1b711f2e4bd295b4934cf021baab Mon Sep 17 00:00:00 2001 From: Richard Ramos Date: Sun, 10 Oct 2021 11:46:00 -0400 Subject: [PATCH] fix: wait until peer is connected to resume history --- waku/v2/node/wakunode2.go | 38 +++++++++++++++++++++++----- waku/v2/protocol/store/waku_store.go | 8 +++--- 2 files changed, 35 insertions(+), 11 deletions(-) diff --git a/waku/v2/node/wakunode2.go b/waku/v2/node/wakunode2.go index 76fcd8ce..e22383de 100644 --- a/waku/v2/node/wakunode2.go +++ b/waku/v2/node/wakunode2.go @@ -30,6 +30,7 @@ import ( "github.com/status-im/go-waku/waku/v2/protocol/pb" "github.com/status-im/go-waku/waku/v2/protocol/relay" "github.com/status-im/go-waku/waku/v2/protocol/store" + "github.com/status-im/go-waku/waku/v2/utils" ) var log = logging.Logger("wakunode") @@ -266,14 +267,39 @@ func (w *WakuNode) startStore() { w.opts.store.Start(w.ctx, w.host) if w.opts.shouldResume { - if _, err := w.opts.store.Resume(string(relay.GetTopic(nil)), nil); err != nil { - log.Error("failed to resume", err) - } + // TODO: extract this to a function and run it when you go offline + // TODO: determine if a store is listening to a topic + go func() { + for { + t := time.NewTicker(time.Second) + peerVerif: + for { + select { + case <-w.quit: + return + case <-t.C: + _, err := utils.SelectPeer(w.host, string(store.StoreID_v20beta3)) + if err == nil { + break peerVerif + } + } + } + + ctxWithTimeout, ctxCancel := context.WithTimeout(w.ctx, 20*time.Second) + defer ctxCancel() + if err := w.Resume(ctxWithTimeout, nil); err != nil { + log.Info("Retrying in 10s...") + time.Sleep(10 * time.Second) + } else { + break + } + } + }() } } func (w *WakuNode) addPeer(info *peer.AddrInfo, protocolID p2pproto.ID) error { - log.Info(fmt.Sprintf("adding peer %s", info.ID.Pretty())) + log.Info(fmt.Sprintf("Adding peer %s to peerstore", info.ID.Pretty())) w.host.Peerstore().AddAddrs(info.ID, info.Addrs, peerstore.PermanentAddrTTL) return w.host.Peerstore().AddProtocols(info.ID, string(protocolID)) @@ -314,12 +340,12 @@ func (w *WakuNode) Resume(ctx context.Context, peerList []peer.ID) error { return errors.New("WakuStore is not set") } - result, err := w.opts.store.Resume(string(relay.DefaultWakuTopic), peerList) + result, err := w.opts.store.Resume(ctx, string(relay.DefaultWakuTopic), peerList) if err != nil { return err } - log.Info("the number of retrieved messages since the last online time: ", result) + log.Info("Retrieved messages since the last online time: ", result) return nil } diff --git a/waku/v2/protocol/store/waku_store.go b/waku/v2/protocol/store/waku_store.go index be184d29..5a2d63c0 100644 --- a/waku/v2/protocol/store/waku_store.go +++ b/waku/v2/protocol/store/waku_store.go @@ -468,9 +468,11 @@ func DefaultOptions() []HistoryRequestOption { } func (store *WakuStore) queryFrom(ctx context.Context, q *pb.HistoryQuery, selectedPeer peer.ID, requestId []byte) (*pb.HistoryResponse, error) { + log.Info(fmt.Sprintf("Resuming message history with peer %s", selectedPeer)) + connOpt, err := store.h.NewStream(ctx, selectedPeer, StoreID_v20beta3) if err != nil { - log.Info("failed to connect to remote peer", err) + log.Error("Failed to connect to remote peer", err) return nil, err } @@ -576,8 +578,6 @@ func (store *WakuStore) Resume(ctx context.Context, pubsubTopic string, peerList currentTime := float64(time.Now().UnixNano()) lastSeenTime := store.findLastSeen() - log.Info("resuming message history") - var offset float64 = 200000 currentTime = currentTime + offset lastSeenTime = math.Max(lastSeenTime-offset, 0) @@ -614,8 +614,6 @@ func (store *WakuStore) Resume(ctx context.Context, pubsubTopic string, peerList } } - log.Info(fmt.Sprintf("obtained %d messages...", len(response.Messages))) - for _, msg := range response.Messages { store.storeMessage(pubsubTopic, msg) }