fix: wait until peer is connected to resume history

This commit is contained in:
Richard Ramos 2021-10-10 11:46:00 -04:00
parent 8f87009466
commit 8a28978f83
No known key found for this signature in database
GPG Key ID: 80D4B01265FDFE8F
2 changed files with 35 additions and 11 deletions

View File

@ -30,6 +30,7 @@ import (
"github.com/status-im/go-waku/waku/v2/protocol/pb" "github.com/status-im/go-waku/waku/v2/protocol/pb"
"github.com/status-im/go-waku/waku/v2/protocol/relay" "github.com/status-im/go-waku/waku/v2/protocol/relay"
"github.com/status-im/go-waku/waku/v2/protocol/store" "github.com/status-im/go-waku/waku/v2/protocol/store"
"github.com/status-im/go-waku/waku/v2/utils"
) )
var log = logging.Logger("wakunode") var log = logging.Logger("wakunode")
@ -266,14 +267,39 @@ func (w *WakuNode) startStore() {
w.opts.store.Start(w.ctx, w.host) w.opts.store.Start(w.ctx, w.host)
if w.opts.shouldResume { if w.opts.shouldResume {
if _, err := w.opts.store.Resume(string(relay.GetTopic(nil)), nil); err != nil { // TODO: extract this to a function and run it when you go offline
log.Error("failed to resume", err) // TODO: determine if a store is listening to a topic
} go func() {
for {
t := time.NewTicker(time.Second)
peerVerif:
for {
select {
case <-w.quit:
return
case <-t.C:
_, err := utils.SelectPeer(w.host, string(store.StoreID_v20beta3))
if err == nil {
break peerVerif
}
}
}
ctxWithTimeout, ctxCancel := context.WithTimeout(w.ctx, 20*time.Second)
defer ctxCancel()
if err := w.Resume(ctxWithTimeout, nil); err != nil {
log.Info("Retrying in 10s...")
time.Sleep(10 * time.Second)
} else {
break
}
}
}()
} }
} }
func (w *WakuNode) addPeer(info *peer.AddrInfo, protocolID p2pproto.ID) error { func (w *WakuNode) addPeer(info *peer.AddrInfo, protocolID p2pproto.ID) error {
log.Info(fmt.Sprintf("adding peer %s", info.ID.Pretty())) log.Info(fmt.Sprintf("Adding peer %s to peerstore", info.ID.Pretty()))
w.host.Peerstore().AddAddrs(info.ID, info.Addrs, peerstore.PermanentAddrTTL) w.host.Peerstore().AddAddrs(info.ID, info.Addrs, peerstore.PermanentAddrTTL)
return w.host.Peerstore().AddProtocols(info.ID, string(protocolID)) return w.host.Peerstore().AddProtocols(info.ID, string(protocolID))
@ -314,12 +340,12 @@ func (w *WakuNode) Resume(ctx context.Context, peerList []peer.ID) error {
return errors.New("WakuStore is not set") return errors.New("WakuStore is not set")
} }
result, err := w.opts.store.Resume(string(relay.DefaultWakuTopic), peerList) result, err := w.opts.store.Resume(ctx, string(relay.DefaultWakuTopic), peerList)
if err != nil { if err != nil {
return err return err
} }
log.Info("the number of retrieved messages since the last online time: ", result) log.Info("Retrieved messages since the last online time: ", result)
return nil return nil
} }

View File

@ -468,9 +468,11 @@ func DefaultOptions() []HistoryRequestOption {
} }
func (store *WakuStore) queryFrom(ctx context.Context, q *pb.HistoryQuery, selectedPeer peer.ID, requestId []byte) (*pb.HistoryResponse, error) { func (store *WakuStore) queryFrom(ctx context.Context, q *pb.HistoryQuery, selectedPeer peer.ID, requestId []byte) (*pb.HistoryResponse, error) {
log.Info(fmt.Sprintf("Resuming message history with peer %s", selectedPeer))
connOpt, err := store.h.NewStream(ctx, selectedPeer, StoreID_v20beta3) connOpt, err := store.h.NewStream(ctx, selectedPeer, StoreID_v20beta3)
if err != nil { if err != nil {
log.Info("failed to connect to remote peer", err) log.Error("Failed to connect to remote peer", err)
return nil, err return nil, err
} }
@ -576,8 +578,6 @@ func (store *WakuStore) Resume(ctx context.Context, pubsubTopic string, peerList
currentTime := float64(time.Now().UnixNano()) currentTime := float64(time.Now().UnixNano())
lastSeenTime := store.findLastSeen() lastSeenTime := store.findLastSeen()
log.Info("resuming message history")
var offset float64 = 200000 var offset float64 = 200000
currentTime = currentTime + offset currentTime = currentTime + offset
lastSeenTime = math.Max(lastSeenTime-offset, 0) lastSeenTime = math.Max(lastSeenTime-offset, 0)
@ -614,8 +614,6 @@ func (store *WakuStore) Resume(ctx context.Context, pubsubTopic string, peerList
} }
} }
log.Info(fmt.Sprintf("obtained %d messages...", len(response.Messages)))
for _, msg := range response.Messages { for _, msg := range response.Messages {
store.storeMessage(pubsubTopic, msg) store.storeMessage(pubsubTopic, msg)
} }