fix_: ensure storenode requests do not exceed 24h (#6115)
Parent: 581e4776c9 · Commit: 35e4c9e11c
go.mod (+1 −1)
@@ -97,7 +97,7 @@ require (
 	github.com/schollz/peerdiscovery v1.7.0
 	github.com/siphiuel/lc-proxy-wrapper v0.0.0-20230516150924-246507cee8c7
 	github.com/urfave/cli/v2 v2.27.2
-	github.com/waku-org/go-waku v0.8.1-0.20241021202955-3c4e40c729a0
+	github.com/waku-org/go-waku v0.8.1-0.20241128183857-1608cf2b0b90
 	github.com/wk8/go-ordered-map/v2 v2.1.7
 	github.com/yeqown/go-qrcode/v2 v2.2.1
 	github.com/yeqown/go-qrcode/writer/standard v1.2.1
go.sum (+2 −2)
@@ -2152,8 +2152,8 @@
 github.com/waku-org/go-libp2p-pubsub v0.12.0-gowaku.0.20240823143342-b0f2429ca27f/go.mod h1:Oi0zw9aw8/Y5GC99zt+Ef2gYAl+0nZlwdJonDyOz/sE=
 github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0 h1:R4YYx2QamhBRl/moIxkDCNW+OP7AHbyWLBygDc/xIMo=
 github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0/go.mod h1:EhZP9fee0DYjKH/IOQvoNSy1tSHp2iZadsHGphcAJgY=
-github.com/waku-org/go-waku v0.8.1-0.20241021202955-3c4e40c729a0 h1:PNKcOPMn0yoC2NQaJPPB8FvHT/YtaU8hZAoovSl42KM=
+github.com/waku-org/go-waku v0.8.1-0.20241128183857-1608cf2b0b90 h1:p7tehUW7f+D6pvMJYop2yJV03SJU2fFUusmSnKL3uow=
-github.com/waku-org/go-waku v0.8.1-0.20241021202955-3c4e40c729a0/go.mod h1:1BRnyg2mQ2aBNLTBaPq6vEvobzywGykPOhGQFbHGf74=
+github.com/waku-org/go-waku v0.8.1-0.20241128183857-1608cf2b0b90/go.mod h1:1BRnyg2mQ2aBNLTBaPq6vEvobzywGykPOhGQFbHGf74=
 github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59 h1:jisj+OCI6QydLtFq3Pyhu49wl9ytPN7oAHjMfepHDrA=
 github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59/go.mod h1:1PdBdPzyTaKt3VnpAHk3zj+r9dXPFOr3IHZP9nFle6E=
 github.com/waku-org/go-zerokit-rln-apple v0.0.0-20230916172309-ee0ee61dde2b h1:KgZVhsLkxsj5gb/FfndSCQu6VYwALrCOgYI3poR95yE=
@@ -8,6 +8,7 @@ import (
 	"github.com/libp2p/go-libp2p/core/peer"
 	"github.com/pkg/errors"
 	"go.uber.org/zap"
+	"golang.org/x/exp/maps"
 
 	"github.com/waku-org/go-waku/waku/v2/api/history"

@@ -542,55 +543,15 @@ func (m *Messenger) syncFiltersFrom(peerID peer.ID, filters []*transport.Filter,
 		m.config.messengerSignalsHandler.HistoryRequestStarted(len(batches))
 	}
 
-	var batches24h []types.MailserverBatch
 	for pubsubTopic := range batches {
-		batchKeys := make([]int, 0, len(batches[pubsubTopic]))
-		for k := range batches[pubsubTopic] {
-			batchKeys = append(batchKeys, k)
-		}
+		batchKeys := maps.Keys(batches[pubsubTopic])
 		sort.Ints(batchKeys)
-
-		keysToIterate := append([]int{}, batchKeys...)
-		for {
-			// For all batches
-			var tmpKeysToIterate []int
-			for _, k := range keysToIterate {
-				batch := batches[pubsubTopic][k]
-
-				dayBatch := types.MailserverBatch{
-					To:          batch.To,
-					Cursor:      batch.Cursor,
-					PubsubTopic: batch.PubsubTopic,
-					Topics:      batch.Topics,
-					ChatIDs:     batch.ChatIDs,
-				}
-
-				from := batch.To.Add(-oneDayDuration)
-				if from.After(batch.From) {
-					dayBatch.From = from
-					batches24h = append(batches24h, dayBatch)
-
-					// Replace og batch with new dates
-					batch.To = from
-					batches[pubsubTopic][k] = batch
-					tmpKeysToIterate = append(tmpKeysToIterate, k)
-				} else {
-					batches24h = append(batches24h, batch)
-				}
-			}
+		for _, k := range batchKeys {
+			err := m.processMailserverBatch(peerID, batches[pubsubTopic][k])
+			if err != nil {
+				m.logger.Error("error syncing topics", zap.Error(err))
+				return nil, err
 			}
-
-			if len(tmpKeysToIterate) == 0 {
-				break
-			}
-			keysToIterate = tmpKeysToIterate
-		}
-	}
-
-	for _, batch := range batches24h {
-		err := m.processMailserverBatch(peerID, batch)
-		if err != nil {
-			m.logger.Error("error syncing topics", zap.Error(err))
-			return nil, err
 		}
 	}
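The status-go side of this change is mostly deletion: the per-topic loop no longer splits each batch into 24-hour day-batches, because the vendored go-waku HistoryRetriever now enforces the limit itself (see the history.go producer hunk further down). What remains is a small cleanup swapping the manual key-collection loop for maps.Keys. A runnable sketch of that idiom on placeholder data, independent of the PR's types:

package main

import (
	"fmt"
	"sort"

	"golang.org/x/exp/maps"
)

func main() {
	batches := map[int]string{3: "c", 1: "a", 2: "b"}

	// maps.Keys replaces the make/append loop; map iteration order is
	// random, so the keys still need sorting before use.
	keys := maps.Keys(batches)
	sort.Ints(keys)

	for _, k := range keys {
		fmt.Println(k, batches[k])
	}
}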
@@ -93,63 +93,64 @@ func (m *Messenger) asyncRequestAllHistoricMessages() {
 	}()
 }
 
-func (m *Messenger) GetPinnedStorenode() (peer.ID, error) {
+func (m *Messenger) GetPinnedStorenode() (peer.AddrInfo, error) {
 	fleet, err := m.getFleet()
 	if err != nil {
-		return "", err
+		return peer.AddrInfo{}, err
 	}
 
 	pinnedMailservers, err := m.settings.GetPinnedMailservers()
 	if err != nil {
-		return "", err
+		return peer.AddrInfo{}, err
 	}
 
 	pinnedMailserver, ok := pinnedMailservers[fleet]
 	if !ok {
-		return "", nil
+		return peer.AddrInfo{}, nil
 	}
 
 	fleetMailservers := mailservers.DefaultMailservers()
 
 	for _, c := range fleetMailservers {
 		if c.Fleet == fleet && c.ID == pinnedMailserver {
-			return c.PeerID()
+			return c.PeerInfo()
 		}
 	}
 
 	if m.mailserversDatabase != nil {
 		customMailservers, err := m.mailserversDatabase.Mailservers()
 		if err != nil {
-			return "", err
+			return peer.AddrInfo{}, err
 		}
 
 		for _, c := range customMailservers {
 			if c.Fleet == fleet && c.ID == pinnedMailserver {
-				return c.PeerID()
+				return c.PeerInfo()
 			}
 		}
 	}
 
-	return "", nil
+	return peer.AddrInfo{}, nil
 }
 
 func (m *Messenger) UseStorenodes() (bool, error) {
 	return m.settings.CanUseMailservers()
 }
 
-func (m *Messenger) Storenodes() ([]peer.ID, error) {
+func (m *Messenger) Storenodes() ([]peer.AddrInfo, error) {
 	mailservers, err := m.AllMailservers()
 	if err != nil {
 		return nil, err
 	}
 
-	var result []peer.ID
+	var result []peer.AddrInfo
 	for _, m := range mailservers {
-		peerID, err := m.PeerID()
+		peerInfo, err := m.PeerInfo()
 		if err != nil {
 			return nil, err
 		}
-		result = append(result, peerID)
+		result = append(result, peerInfo)
 	}
 
 	return result, nil
@@ -49,13 +49,13 @@ type Mailserver struct {
 	FailedRequests uint `json:"-"`
 }
 
-func (m Mailserver) PeerInfo() (*peer.AddrInfo, error) {
+func (m Mailserver) PeerInfo() (peer.AddrInfo, error) {
 	var maddrs []multiaddr.Multiaddr
 
 	if m.ENR != nil {
 		addrInfo, err := enr.EnodeToPeerInfo(m.ENR)
 		if err != nil {
-			return nil, err
+			return peer.AddrInfo{}, err
 		}
 		addrInfo.Addrs = utils.EncapsulatePeerID(addrInfo.ID, addrInfo.Addrs...)
 		maddrs = append(maddrs, addrInfo.Addrs...)

@@ -67,14 +67,14 @@ func (m Mailserver) PeerInfo() (*peer.AddrInfo, error) {
 
 	p, err := peer.AddrInfosFromP2pAddrs(maddrs...)
 	if err != nil {
-		return nil, err
+		return peer.AddrInfo{}, err
 	}
 
 	if len(p) != 1 {
-		return nil, errors.New("invalid mailserver setup")
+		return peer.AddrInfo{}, errors.New("invalid mailserver setup")
 	}
 
-	return &p[0], nil
+	return p[0], nil
 }
 
 func (m Mailserver) PeerID() (peer.ID, error) {
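Returning peer.AddrInfo by value removes the nil-pointer case: callers now test the zero value (an empty peer ID) instead, exactly as the cycle code below does with pinnedStorenode.ID == "". A small illustration of the pattern, with a hypothetical lookup helper standing in for GetPinnedStorenode:

package main

import (
	"fmt"

	"github.com/libp2p/go-libp2p/core/peer"
)

// findPinned is a hypothetical stand-in: a missing entry yields the zero
// AddrInfo rather than a nil pointer, so callers need no nil check.
func findPinned(pinned map[string]peer.AddrInfo, fleet string) peer.AddrInfo {
	return pinned[fleet] // missing key returns peer.AddrInfo{}
}

func main() {
	info := findPinned(map[string]peer.AddrInfo{}, "status.prod")
	if info.ID == "" { // the zero-value check used throughout this PR
		fmt.Println("no pinned storenode")
	}
}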
vendor/github.com/waku-org/go-waku/waku/v2/api/common/pinger.go (generated, vendored, new file; 39 lines)
@@ -0,0 +1,39 @@
+package common
+
+import (
+	"context"
+	"time"
+
+	"github.com/libp2p/go-libp2p/core/host"
+	"github.com/libp2p/go-libp2p/core/peer"
+	"github.com/libp2p/go-libp2p/core/peerstore"
+	"github.com/libp2p/go-libp2p/p2p/protocol/ping"
+)
+
+type Pinger interface {
+	PingPeer(ctx context.Context, peerInfo peer.AddrInfo) (time.Duration, error)
+}
+
+type defaultPingImpl struct {
+	host host.Host
+}
+
+func NewDefaultPinger(host host.Host) Pinger {
+	return &defaultPingImpl{
+		host: host,
+	}
+}
+
+func (d *defaultPingImpl) PingPeer(ctx context.Context, peerInfo peer.AddrInfo) (time.Duration, error) {
+	d.host.Peerstore().AddAddrs(peerInfo.ID, peerInfo.Addrs, peerstore.AddressTTL)
+	pingResultCh := ping.Ping(ctx, d.host, peerInfo.ID)
+	select {
+	case <-ctx.Done():
+		return 0, ctx.Err()
+	case r := <-pingResultCh:
+		if r.Error != nil {
+			return 0, r.Error
+		}
+		return r.RTT, nil
+	}
+}
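The Pinger seam lets the storenode cycle measure RTTs without holding a host reference of its own, and because PingPeer adds the peer's addresses to the peerstore first, callers can ping nodes the host has never seen. A minimal usage sketch under stated assumptions (the multiaddr and its peer ID are hypothetical placeholders):

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/libp2p/go-libp2p"
	"github.com/libp2p/go-libp2p/core/peer"

	"github.com/waku-org/go-waku/waku/v2/api/common"
)

func main() {
	h, err := libp2p.New() // ephemeral host with default options
	if err != nil {
		panic(err)
	}
	defer h.Close()

	// Hypothetical storenode address; PingPeer pre-populates the peerstore,
	// so no prior connection or AddAddrs call is needed here.
	info, err := peer.AddrInfoFromString("/ip4/127.0.0.1/tcp/60000/p2p/16Uiu2HAm4v86W3bmT1BiH6oSPzcsSr24iDQpSN5Qa992BCjjwgrD")
	if err != nil {
		panic(err)
	}

	ctx, cancel := context.WithTimeout(context.Background(), 4*time.Second)
	defer cancel()

	rtt, err := common.NewDefaultPinger(h).PingPeer(ctx, *info)
	if err != nil {
		fmt.Println("ping failed:", err)
		return
	}
	fmt.Println("rtt:", rtt)
}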
vendor/github.com/waku-org/go-waku/waku/v2/api/common/storenode_requestor.go (generated, vendored, new file; 12 lines)
@@ -0,0 +1,12 @@
+package common
+
+import (
+	"context"
+
+	"github.com/libp2p/go-libp2p/core/peer"
+	"github.com/waku-org/go-waku/waku/v2/protocol/store/pb"
+)
+
+type StorenodeRequestor interface {
+	Query(ctx context.Context, peerID peer.ID, query *pb.StoreQueryRequest) (StoreRequestResult, error)
+}
@@ -230,6 +230,7 @@ func (mgr *FilterManager) UnsubscribeFilter(filterID string) {
 	}
 	if len(af.sub.ContentFilter.ContentTopics) == 0 {
 		af.cancel()
+		delete(mgr.filterSubscriptions, filterConfig.ID)
 	} else {
 		go af.sub.Unsubscribe(filterConfig.contentFilter)
 	}
@@ -14,9 +14,8 @@ import (
 	"sync"
 	"time"
 
-	"github.com/libp2p/go-libp2p/core/host"
 	"github.com/libp2p/go-libp2p/core/peer"
-	"github.com/libp2p/go-libp2p/p2p/protocol/ping"
+	"github.com/waku-org/go-waku/waku/v2/api/common"
 	"github.com/waku-org/go-waku/waku/v2/protocol/store"
 	"go.uber.org/zap"
 )

@@ -46,8 +45,8 @@ type peerStatus struct {
 
 type StorenodeConfigProvider interface {
 	UseStorenodes() (bool, error)
-	GetPinnedStorenode() (peer.ID, error)
-	Storenodes() ([]peer.ID, error)
+	GetPinnedStorenode() (peer.AddrInfo, error)
+	Storenodes() ([]peer.AddrInfo, error)
 }
 
 type StorenodeCycle struct {

@@ -55,9 +54,8 @@ type StorenodeCycle struct {
 
 	logger *zap.Logger
 
-	host host.Host
-
 	storenodeConfigProvider StorenodeConfigProvider
+	pinger                  common.Pinger
 
 	StorenodeAvailableOneshotEmitter *OneShotEmitter[struct{}]
 	StorenodeChangedEmitter          *Emitter[peer.ID]

@@ -71,19 +69,19 @@ type StorenodeCycle struct {
 	peers map[peer.ID]peerStatus
 }
 
-func NewStorenodeCycle(logger *zap.Logger) *StorenodeCycle {
+func NewStorenodeCycle(logger *zap.Logger, pinger common.Pinger) *StorenodeCycle {
 	return &StorenodeCycle{
 		StorenodeAvailableOneshotEmitter: NewOneshotEmitter[struct{}](),
 		StorenodeChangedEmitter:          NewEmitter[peer.ID](),
 		StorenodeNotWorkingEmitter:       NewEmitter[struct{}](),
 		StorenodeAvailableEmitter:        NewEmitter[peer.ID](),
+		pinger:                           pinger,
 		logger:                           logger.Named("storenode-cycle"),
 	}
 }
 
-func (m *StorenodeCycle) Start(ctx context.Context, h host.Host) {
+func (m *StorenodeCycle) Start(ctx context.Context) {
 	m.logger.Debug("starting storenode cycle")
-	m.host = h
 	m.failedRequests = make(map[peer.ID]uint)
 	m.peers = make(map[peer.ID]peerStatus)

@@ -107,7 +105,7 @@ func (m *StorenodeCycle) connectToNewStorenodeAndWait(ctx context.Context) error
 	}
 
 	// If no pinned storenode, no need to disconnect and wait for it to be available
-	if pinnedStorenode == "" {
+	if pinnedStorenode.ID == "" {
 		m.disconnectActiveStorenode(graylistBackoff)
 	}

@@ -183,21 +181,26 @@ func poolSize(fleetSize int) int {
 	return int(math.Ceil(float64(fleetSize) / 4))
 }
 
-func (m *StorenodeCycle) getAvailableStorenodesSortedByRTT(ctx context.Context, allStorenodes []peer.ID) []peer.ID {
+func (m *StorenodeCycle) getAvailableStorenodesSortedByRTT(ctx context.Context, allStorenodes []peer.AddrInfo) []peer.AddrInfo {
+	peerIDToInfo := make(map[peer.ID]peer.AddrInfo)
+	for _, p := range allStorenodes {
+		peerIDToInfo[p.ID] = p
+	}
+
 	availableStorenodes := make(map[peer.ID]time.Duration)
 	availableStorenodesMutex := sync.Mutex{}
 	availableStorenodesWg := sync.WaitGroup{}
 	for _, storenode := range allStorenodes {
 		availableStorenodesWg.Add(1)
-		go func(peerID peer.ID) {
+		go func(peerInfo peer.AddrInfo) {
 			defer availableStorenodesWg.Done()
 			ctx, cancel := context.WithTimeout(ctx, 4*time.Second)
 			defer cancel()
 
-			rtt, err := m.pingPeer(ctx, peerID)
+			rtt, err := m.pinger.PingPeer(ctx, peerInfo)
 			if err == nil { // pinging storenodes might fail, but we don't care
 				availableStorenodesMutex.Lock()
-				availableStorenodes[peerID] = rtt
+				availableStorenodes[peerInfo.ID] = rtt
 				availableStorenodesMutex.Unlock()
 			}
 		}(storenode)

@@ -212,7 +215,7 @@ func (m *StorenodeCycle) getAvailableStorenodesSortedByRTT(ctx context.Context,
 	var sortedStorenodes []SortedStorenode
 	for storenodeID, rtt := range availableStorenodes {
 		sortedStorenode := SortedStorenode{
-			Storenode: storenodeID,
+			Storenode: peerIDToInfo[storenodeID],
 			RTT:       rtt,
 		}
 		m.peersMutex.Lock()

@@ -225,7 +228,7 @@ func (m *StorenodeCycle) getAvailableStorenodesSortedByRTT(ctx context.Context,
 	}
 	sort.Sort(byRTTMsAndCanConnectBefore(sortedStorenodes))
 
-	result := make([]peer.ID, len(sortedStorenodes))
+	result := make([]peer.AddrInfo, len(sortedStorenodes))
 	for i, s := range sortedStorenodes {
 		result[i] = s.Storenode
 	}

@@ -233,19 +236,6 @@ func (m *StorenodeCycle) getAvailableStorenodesSortedByRTT(ctx context.Context,
 	return result
 }
 
-func (m *StorenodeCycle) pingPeer(ctx context.Context, peerID peer.ID) (time.Duration, error) {
-	pingResultCh := ping.Ping(ctx, m.host, peerID)
-	select {
-	case <-ctx.Done():
-		return 0, ctx.Err()
-	case r := <-pingResultCh:
-		if r.Error != nil {
-			return 0, r.Error
-		}
-		return r.RTT, nil
-	}
-}
-
 func (m *StorenodeCycle) findNewStorenode(ctx context.Context) error {
 	// we have to override DNS manually because of https://github.com/status-im/status-mobile/issues/19581
 	if overrideDNS {

@@ -268,8 +258,8 @@ func (m *StorenodeCycle) findNewStorenode(ctx context.Context) error {
 		return err
 	}
 
-	if pinnedStorenode != "" {
-		return m.setActiveStorenode(pinnedStorenode)
+	if pinnedStorenode.ID != "" {
+		return m.setActiveStorenode(pinnedStorenode.ID)
 	}
 
 	m.logger.Info("Finding a new storenode..")

@@ -303,7 +293,7 @@ func (m *StorenodeCycle) findNewStorenode(ctx context.Context) error {
 	}
 
 	ms := allStorenodes[r.Int64()]
-	return m.setActiveStorenode(ms)
+	return m.setActiveStorenode(ms.ID)
 }
 
 func (m *StorenodeCycle) storenodeStatus(peerID peer.ID) connStatus {
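getAvailableStorenodesSortedByRTT keeps its fan-out shape — one goroutine per candidate with a 4-second budget, results collected under a mutex — but now carries peer.AddrInfo through to the sorted output and delegates the actual ping to the injected Pinger. A stripped-down, runnable sketch of that measurement pattern, with a plain function standing in for Pinger.PingPeer:

package main

import (
	"context"
	"fmt"
	"sort"
	"sync"
	"time"
)

type measurement struct {
	peer string
	rtt  time.Duration
}

// rankByRTT pings every candidate concurrently and returns the reachable
// ones ordered by round-trip time; failures are silently dropped, matching
// the cycle's "pinging storenodes might fail, but we don't care" stance.
func rankByRTT(ctx context.Context, peers []string,
	pingFn func(context.Context, string) (time.Duration, error)) []measurement {

	var mu sync.Mutex
	var wg sync.WaitGroup
	var results []measurement

	for _, p := range peers {
		wg.Add(1)
		go func(p string) {
			defer wg.Done()
			ctx, cancel := context.WithTimeout(ctx, 4*time.Second)
			defer cancel()
			if rtt, err := pingFn(ctx, p); err == nil {
				mu.Lock()
				results = append(results, measurement{p, rtt})
				mu.Unlock()
			}
		}(p)
	}
	wg.Wait()

	sort.Slice(results, func(i, j int) bool { return results[i].rtt < results[j].rtt })
	return results
}

func main() {
	fake := func(ctx context.Context, p string) (time.Duration, error) {
		return time.Duration(len(p)) * time.Millisecond, nil // fake RTT
	}
	for _, m := range rankByRTT(context.Background(), []string{"storenode-a", "node-b"}, fake) {
		fmt.Println(m.peer, m.rtt)
	}
}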
@@ -2,6 +2,7 @@ package history
 
 import (
 	"context"
+	"encoding/hex"
 	"errors"
 	"math"
 	"sync"

@@ -10,8 +11,12 @@ import (
 	"github.com/ethereum/go-ethereum/common/hexutil"
 	"github.com/libp2p/go-libp2p/core/peer"
 	"github.com/waku-org/go-waku/logging"
+	"github.com/waku-org/go-waku/waku/v2/api/common"
 	"github.com/waku-org/go-waku/waku/v2/protocol"
 	"github.com/waku-org/go-waku/waku/v2/protocol/store"
+	"github.com/waku-org/go-waku/waku/v2/protocol/store/pb"
+	"google.golang.org/protobuf/proto"
 
 	"go.uber.org/zap"
 )

@@ -25,7 +30,7 @@ type work struct {
 }
 
 type HistoryRetriever struct {
-	store            Store
+	store            common.StorenodeRequestor
 	logger           *zap.Logger
 	historyProcessor HistoryProcessor
 }

@@ -35,11 +40,7 @@ type HistoryProcessor interface {
 	OnRequestFailed(requestID []byte, peerID peer.ID, err error)
 }
 
-type Store interface {
-	Query(ctx context.Context, criteria store.FilterCriteria, opts ...store.RequestOption) (store.Result, error)
-}
-
-func NewHistoryRetriever(store Store, historyProcessor HistoryProcessor, logger *zap.Logger) *HistoryRetriever {
+func NewHistoryRetriever(store common.StorenodeRequestor, historyProcessor HistoryProcessor, logger *zap.Logger) *HistoryRetriever {
 	return &HistoryRetriever{
 		store:            store,
 		logger:           logger.Named("history-retriever"),

@@ -158,7 +159,26 @@ loop:
 			}()
 
 			queryCtx, queryCancel := context.WithTimeout(ctx, mailserverRequestTimeout)
-			cursor, envelopesCount, err := hr.createMessagesRequest(queryCtx, storenodeID, w.criteria, w.cursor, w.limit, true, processEnvelopes, logger)
+
+			// If time range is greater than 24 hours, limit the range: to - (to-24h)
+			// TODO: handle cases in which TimeStart/TimeEnd could be nil
+			// (this type of query does not happen in status-go, though, and
+			// nwaku might limit query duration to 24h anyway, so perhaps
+			// it's not worth adding such logic)
+			timeStart := w.criteria.TimeStart
+			timeEnd := w.criteria.TimeEnd
+			exceeds24h := false
+			if timeStart != nil && timeEnd != nil && *timeEnd-*timeStart > (24*time.Hour).Nanoseconds() {
+				newTimeStart := *timeEnd - (24 * time.Hour).Nanoseconds()
+				timeStart = &newTimeStart
+				exceeds24h = true
+			}
+
+			newCriteria := w.criteria
+			newCriteria.TimeStart = timeStart
+			newCriteria.TimeEnd = timeEnd
+
+			cursor, envelopesCount, err := hr.createMessagesRequest(queryCtx, storenodeID, newCriteria, w.cursor, w.limit, true, processEnvelopes, logger)
 			queryCancel()
 
 			if err != nil {

@@ -179,18 +199,28 @@ loop:
 
 				// Check the cursor after calling `shouldProcessNextPage`.
 				// The app might use process the fetched envelopes in the callback for own needs.
-				if cursor == nil {
+				// If from/to does not exceed 24h and no cursor was returned, we have already
+				// requested the entire time range
+				if cursor == nil && !exceeds24h {
 					return
 				}
 
 				logger.Debug("processBatch producer - creating work (cursor)")
 
-				workWg.Add(1)
-				workCh <- work{
+				newWork := work{
 					criteria: w.criteria,
 					cursor:   cursor,
 					limit:    nextPageLimit,
 				}
+
+				// If from/to has exceeded the 24h, but there are no more records within the current
+				// 24h range, then we update the `to` for the new work to not include it.
+				if cursor == nil && exceeds24h {
+					newWork.criteria.TimeEnd = timeStart
+				}
+
+				workWg.Add(1)
+				workCh <- newWork
 			}(w)
 		case err := <-errCh:
 			logger.Debug("processBatch - received error", zap.Error(err))
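This producer change is the core of the fix: every store query is clamped to the most recent 24 hours of the requested range, and when a clamped query drains (nil cursor) the work is re-enqueued with TimeEnd moved back to the clamp boundary, so the full range is walked one day at a time. A self-contained sketch of that walk over nanosecond timestamps (pagination within each window is elided):

package main

import (
	"fmt"
	"time"
)

var day = (24 * time.Hour).Nanoseconds()

// nextWindow clamps [start, end] to its trailing 24h slice, mirroring the
// producer: query [max(start, end-24h), end], then move end back to the
// clamp boundary once the slice is exhausted.
func nextWindow(start, end int64) (winStart, winEnd int64, exceeds24h bool) {
	if end-start > day {
		return end - day, end, true
	}
	return start, end, false
}

func main() {
	end := time.Now().UnixNano()
	start := end - 3*day - day/2 // a 3.5-day range, as an example

	for start < end {
		winStart, winEnd, exceeds := nextWindow(start, end)
		fmt.Printf("query %s .. %s\n",
			time.Unix(0, winStart).Format(time.RFC3339),
			time.Unix(0, winEnd).Format(time.RFC3339))
		if !exceeds {
			break // the entire remaining range fit in one window
		}
		end = winStart // window exhausted: exclude it and keep walking back
	}
}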
@@ -257,12 +287,6 @@ func (hr *HistoryRetriever) requestStoreMessages(ctx context.Context, peerID pee
 	requestID := protocol.GenerateRequestID()
 	logger := hr.logger.With(zap.String("requestID", hexutil.Encode(requestID)), zap.Stringer("peerID", peerID))
 
-	opts := []store.RequestOption{
-		store.WithPaging(false, limit),
-		store.WithRequestID(requestID),
-		store.WithPeer(peerID),
-		store.WithCursor(cursor)}
-
 	logger.Debug("store.query",
 		logging.Timep("startTime", criteria.TimeStart),
 		logging.Timep("endTime", criteria.TimeEnd),

@@ -271,8 +295,19 @@ func (hr *HistoryRetriever) requestStoreMessages(ctx context.Context, peerID pee
 		zap.String("cursor", hexutil.Encode(cursor)),
 	)
 
+	storeQueryRequest := &pb.StoreQueryRequest{
+		RequestId:        hex.EncodeToString(requestID),
+		IncludeData:      true,
+		PubsubTopic:      &criteria.PubsubTopic,
+		ContentTopics:    criteria.ContentTopicsList(),
+		TimeStart:        criteria.TimeStart,
+		TimeEnd:          criteria.TimeEnd,
+		PaginationCursor: cursor,
+		PaginationLimit:  proto.Uint64(limit),
+	}
+
 	queryStart := time.Now()
-	result, err := hr.store.Query(ctx, criteria, opts...)
+	result, err := hr.store.Query(ctx, peerID, storeQueryRequest)
 	queryDuration := time.Since(queryStart)
 	if err != nil {
 		logger.Error("error querying storenode", zap.Error(err))
@@ -7,7 +7,7 @@ import (
 )
 
 type SortedStorenode struct {
-	Storenode       peer.ID
+	Storenode       peer.AddrInfo
 	RTT             time.Duration
 	CanConnectAfter time.Time
 }
@@ -5,12 +5,12 @@ import (
 
 	"github.com/libp2p/go-libp2p/core/peer"
 	"github.com/waku-org/go-waku/waku/v2/api/common"
-	"github.com/waku-org/go-waku/waku/v2/protocol"
 	"github.com/waku-org/go-waku/waku/v2/protocol/pb"
 	"github.com/waku-org/go-waku/waku/v2/protocol/store"
+	storepb "github.com/waku-org/go-waku/waku/v2/protocol/store/pb"
 )
 
-func NewDefaultStorenodeRequestor(store *store.WakuStore) StorenodeRequestor {
+func NewDefaultStorenodeRequestor(store *store.WakuStore) common.StorenodeRequestor {
 	return &defaultStorenodeRequestor{
 		store: store,
 	}

@@ -24,10 +24,6 @@ func (d *defaultStorenodeRequestor) GetMessagesByHash(ctx context.Context, peerI
 	return d.store.QueryByHash(ctx, messageHashes, store.WithPeer(peerID), store.WithPaging(false, pageSize))
 }
 
-func (d *defaultStorenodeRequestor) QueryWithCriteria(ctx context.Context, peerID peer.ID, pageSize uint64, pubsubTopic string, contentTopics []string, from *int64, to *int64) (common.StoreRequestResult, error) {
-	return d.store.Query(ctx, store.FilterCriteria{
-		ContentFilter: protocol.NewContentFilter(pubsubTopic, contentTopics...),
-		TimeStart:     from,
-		TimeEnd:       to,
-	}, store.WithPeer(peerID), store.WithPaging(false, pageSize), store.IncludeData(false))
+func (d *defaultStorenodeRequestor) Query(ctx context.Context, peerID peer.ID, storeQueryRequest *storepb.StoreQueryRequest) (common.StoreRequestResult, error) {
+	return d.store.RequestRaw(ctx, peerID, storeQueryRequest)
 }
@@ -14,6 +14,7 @@ import (
 	"github.com/waku-org/go-waku/waku/v2/api/common"
 	"github.com/waku-org/go-waku/waku/v2/protocol"
 	"github.com/waku-org/go-waku/waku/v2/protocol/pb"
+	storepb "github.com/waku-org/go-waku/waku/v2/protocol/store/pb"
 	"github.com/waku-org/go-waku/waku/v2/timesource"
 	"github.com/waku-org/go-waku/waku/v2/utils"
 	"go.uber.org/zap"

@@ -31,17 +32,12 @@ type MessageTracker interface {
 	MessageExists(pb.MessageHash) (bool, error)
 }
 
-type StorenodeRequestor interface {
-	GetMessagesByHash(ctx context.Context, peerID peer.ID, pageSize uint64, messageHashes []pb.MessageHash) (common.StoreRequestResult, error)
-	QueryWithCriteria(ctx context.Context, peerID peer.ID, pageSize uint64, pubsubTopic string, contentTopics []string, from *int64, to *int64) (common.StoreRequestResult, error)
-}
-
 // MissingMessageVerifier is used to periodically retrieve missing messages from store nodes that have some specific criteria
 type MissingMessageVerifier struct {
 	ctx    context.Context
 	params missingMessageVerifierParams
 
-	storenodeRequestor StorenodeRequestor
+	storenodeRequestor common.StorenodeRequestor
 	messageTracker     MessageTracker
 
 	criteriaInterest map[string]criteriaInterest // Track message verification requests and when was the last time a pubsub topic was verified for missing messages

@@ -54,7 +50,7 @@ type MissingMessageVerifier struct {
 }
 
 // NewMissingMessageVerifier creates an instance of a MissingMessageVerifier
-func NewMissingMessageVerifier(storenodeRequester StorenodeRequestor, messageTracker MessageTracker, timesource timesource.Timesource, logger *zap.Logger, options ...MissingMessageVerifierOption) *MissingMessageVerifier {
+func NewMissingMessageVerifier(storenodeRequester common.StorenodeRequestor, messageTracker MessageTracker, timesource timesource.Timesource, logger *zap.Logger, options ...MissingMessageVerifierOption) *MissingMessageVerifier {
 	options = append(defaultMissingMessagesVerifierOptions, options...)
 	params := missingMessageVerifierParams{}
 	for _, opt := range options {

@@ -219,14 +215,19 @@ func (m *MissingMessageVerifier) fetchMessagesBatch(c chan<- *protocol.Envelope,
 	)
 
 	result, err := m.storeQueryWithRetry(interest.ctx, func(ctx context.Context) (common.StoreRequestResult, error) {
-		return m.storenodeRequestor.QueryWithCriteria(
+		storeQueryRequest := &storepb.StoreQueryRequest{
+			RequestId:       hex.EncodeToString(protocol.GenerateRequestID()),
+			PubsubTopic:     &interest.contentFilter.PubsubTopic,
+			ContentTopics:   contentTopics[batchFrom:batchTo],
+			TimeStart:       proto.Int64(interest.lastChecked.Add(-m.params.delay).UnixNano()),
+			TimeEnd:         proto.Int64(now.Add(-m.params.delay).UnixNano()),
+			PaginationLimit: proto.Uint64(messageFetchPageSize),
+		}
+
+		return m.storenodeRequestor.Query(
 			ctx,
 			interest.peerID,
-			messageFetchPageSize,
-			interest.contentFilter.PubsubTopic,
-			contentTopics[batchFrom:batchTo],
-			proto.Int64(interest.lastChecked.Add(-m.params.delay).UnixNano()),
-			proto.Int64(now.Add(-m.params.delay).UnixNano()),
+			storeQueryRequest,
 		)
 	}, logger, "retrieving history to check for missing messages")
 	if err != nil {

@@ -295,7 +296,20 @@ func (m *MissingMessageVerifier) fetchMessagesBatch(c chan<- *protocol.Envelope,
 	result, err := m.storeQueryWithRetry(interest.ctx, func(ctx context.Context) (common.StoreRequestResult, error) {
 		queryCtx, cancel := context.WithTimeout(ctx, m.params.storeQueryTimeout)
 		defer cancel()
-		return m.storenodeRequestor.GetMessagesByHash(queryCtx, interest.peerID, maxMsgHashesPerRequest, messageHashes)
+
+		var messageHashesBytes [][]byte
+		for _, m := range messageHashes {
+			messageHashesBytes = append(messageHashesBytes, m.Bytes())
+		}
+
+		storeQueryRequest := &storepb.StoreQueryRequest{
+			RequestId:       hex.EncodeToString(protocol.GenerateRequestID()),
+			IncludeData:     true,
+			MessageHashes:   messageHashesBytes,
+			PaginationLimit: proto.Uint64(maxMsgHashesPerRequest),
+		}
+
+		return m.storenodeRequestor.Query(queryCtx, interest.peerID, storeQueryRequest)
 	}, logger, "retrieving missing messages")
 	if err != nil {
 		if !errors.Is(err, context.Canceled) {
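With the requestor interface collapsed to a single Query method, every caller now assembles the protobuf StoreQueryRequest itself. A sketch of the hash-lookup variant against any common.StorenodeRequestor, built only from fields shown in the hunks above (the page size is a placeholder):

package example

import (
	"context"
	"encoding/hex"

	"github.com/libp2p/go-libp2p/core/peer"
	"google.golang.org/protobuf/proto"

	"github.com/waku-org/go-waku/waku/v2/api/common"
	"github.com/waku-org/go-waku/waku/v2/protocol"
	storepb "github.com/waku-org/go-waku/waku/v2/protocol/store/pb"
)

// fetchByHashes asks a storenode for the full messages matching the given
// hashes, mirroring how the missing-message verifier now builds its request.
func fetchByHashes(ctx context.Context, requestor common.StorenodeRequestor,
	peerID peer.ID, hashes [][]byte) (common.StoreRequestResult, error) {

	req := &storepb.StoreQueryRequest{
		RequestId:       hex.EncodeToString(protocol.GenerateRequestID()),
		IncludeData:     true, // we want message bodies, not just hashes
		MessageHashes:   hashes,
		PaginationLimit: proto.Uint64(100), // placeholder page size
	}
	return requestor.Query(ctx, peerID, req)
}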
@@ -464,13 +464,17 @@ func (w *WakuNode) Start(ctx context.Context) error {
 		if err != nil {
 			return err
 		}
-		//TODO: setting this up temporarily to improve connectivity success for lightNode in status.
-		//This will have to be removed or changed with community sharding will be implemented.
-		if w.opts.shards != nil {
-			err = w.SetRelayShards(*w.opts.shards)
-			if err != nil {
-				return err
-			}
+	}
+
+	//TODO: setting this up temporarily to improve connectivity success for lightNode
+	// in status. Also, when executing go-waku service-node as a lightclient
+	// (using --pubsub-topic and --relay=false)
+	// This will have to be removed or changed with community sharding will be
+	// implemented.
+	if w.opts.shards != nil {
+		err = w.SetRelayShards(*w.opts.shards)
+		if err != nil {
+			return err
 		}
 	}
@@ -101,7 +101,9 @@ const (
 const maxFailedAttempts = 5
 const prunePeerStoreInterval = 10 * time.Minute
 const peerConnectivityLoopSecs = 15
-const maxConnsToPeerRatio = 5
+const maxConnsToPeerRatio = 3
+const badPeersCleanupInterval = 1 * time.Minute
+const maxDialFailures = 2
 
 // 80% relay peers 20% service peers
 func relayAndServicePeers(maxConnections int) (int, int) {

@@ -256,16 +258,32 @@ func (pm *PeerManager) Start(ctx context.Context) {
 	}
 }
 
+func (pm *PeerManager) removeBadPeers() {
+	if !pm.RelayEnabled {
+		for _, peerID := range pm.host.Peerstore().Peers() {
+			if pm.host.Peerstore().(wps.WakuPeerstore).ConnFailures(peerID) > maxDialFailures {
+				//delete peer from peerStore
+				pm.logger.Debug("removing bad peer due to recurring dial failures", zap.Stringer("peerID", peerID))
+				pm.RemovePeer(peerID)
+			}
+		}
+	}
+}
+
 func (pm *PeerManager) peerStoreLoop(ctx context.Context) {
 	defer utils.LogOnPanic()
 	t := time.NewTicker(prunePeerStoreInterval)
+	t1 := time.NewTicker(badPeersCleanupInterval)
 	defer t.Stop()
+	defer t1.Stop()
 	for {
 		select {
 		case <-ctx.Done():
 			return
 		case <-t.C:
 			pm.prunePeerStore()
+		case <-t1.C:
+			pm.removeBadPeers()
 		}
 	}
 }

@@ -744,4 +762,9 @@ func (pm *PeerManager) HandleDialError(err error, peerID peer.ID) {
 			pm.logger.Error("failed to emit DialError", zap.Error(emitterErr))
 		}
 	}
+
+	if !pm.RelayEnabled && pm.host.Peerstore().(wps.WakuPeerstore).ConnFailures(peerID) >= maxDialFailures {
+		//delete peer from peerStore
+		pm.logger.Debug("removing bad peer due to recurring dial failures", zap.Stringer("peerID", peerID))
+		pm.RemovePeer(peerID)
+	}
 }
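The peer-store loop now multiplexes two tickers: the existing 10-minute prune and a new 1-minute bad-peer sweep. The select-over-tickers pattern in isolation, with prints standing in for the manager's methods:

package main

import (
	"context"
	"fmt"
	"time"
)

func maintenanceLoop(ctx context.Context) {
	prune := time.NewTicker(10 * time.Minute)
	cleanup := time.NewTicker(1 * time.Minute)
	defer prune.Stop()
	defer cleanup.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-prune.C:
			fmt.Println("prune peer store") // stands in for pm.prunePeerStore()
		case <-cleanup.C:
			fmt.Println("remove bad peers") // stands in for pm.removeBadPeers()
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute)
	defer cancel()
	maintenanceLoop(ctx)
}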
@@ -194,6 +194,35 @@ func (s *WakuStore) Request(ctx context.Context, criteria Criteria, opts ...Requ
 	return result, nil
 }
 
+func (s *WakuStore) RequestRaw(ctx context.Context, peerID peer.ID, storeRequest *pb.StoreQueryRequest) (Result, error) {
+	err := storeRequest.Validate()
+	if err != nil {
+		return nil, err
+	}
+
+	var params Parameters
+	params.selectedPeer = peerID
+	if params.selectedPeer == "" {
+		return nil, ErrMustSelectPeer
+	}
+
+	response, err := s.queryFrom(ctx, storeRequest, &params)
+	if err != nil {
+		return nil, err
+	}
+
+	result := &resultImpl{
+		store:         s,
+		messages:      response.Messages,
+		storeRequest:  storeRequest,
+		storeResponse: response,
+		peerID:        params.selectedPeer,
+		cursor:        response.PaginationCursor,
+	}
+
+	return result, nil
+}
+
 // Query retrieves all the messages that match a criteria. Use the options to indicate whether to return the message themselves or not.
 func (s *WakuStore) Query(ctx context.Context, criteria FilterCriteria, opts ...RequestOption) (Result, error) {
 	return s.Request(ctx, criteria, opts...)
@@ -3,21 +3,21 @@ package utils
 
 import (
 	"crypto/ecdsa"
 
-	"github.com/btcsuite/btcd/btcec/v2"
+	"github.com/decred/dcrd/dcrec/secp256k1/v4"
 	"github.com/libp2p/go-libp2p/core/crypto"
 )
 
 // EcdsaPubKeyToSecp256k1PublicKey converts an `ecdsa.PublicKey` into a libp2p `crypto.Secp256k1PublicKey`
 func EcdsaPubKeyToSecp256k1PublicKey(pubKey *ecdsa.PublicKey) *crypto.Secp256k1PublicKey {
-	xFieldVal := &btcec.FieldVal{}
-	yFieldVal := &btcec.FieldVal{}
+	xFieldVal := &secp256k1.FieldVal{}
+	yFieldVal := &secp256k1.FieldVal{}
 	xFieldVal.SetByteSlice(pubKey.X.Bytes())
 	yFieldVal.SetByteSlice(pubKey.Y.Bytes())
-	return (*crypto.Secp256k1PublicKey)(btcec.NewPublicKey(xFieldVal, yFieldVal))
+	return (*crypto.Secp256k1PublicKey)(secp256k1.NewPublicKey(xFieldVal, yFieldVal))
 }
 
 // EcdsaPrivKeyToSecp256k1PrivKey converts an `ecdsa.PrivateKey` into a libp2p `crypto.Secp256k1PrivateKey`
 func EcdsaPrivKeyToSecp256k1PrivKey(privKey *ecdsa.PrivateKey) *crypto.Secp256k1PrivateKey {
-	privK, _ := btcec.PrivKeyFromBytes(privKey.D.Bytes())
+	privK := secp256k1.PrivKeyFromBytes(privKey.D.Bytes())
 	return (*crypto.Secp256k1PrivateKey)(privK)
 }
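The conversion helpers keep their behavior; only the backing library moves from btcec to decred's secp256k1 (btcec/v2 is itself a thin wrapper around it, which is why the FieldVal and key APIs line up almost one-to-one). A sketch of feeding such a key into a libp2p identity; using go-ethereum's crypto.GenerateKey here is an assumption of convenience, any secp256k1 ecdsa.PrivateKey works:

package main

import (
	"fmt"

	ethcrypto "github.com/ethereum/go-ethereum/crypto"
	"github.com/libp2p/go-libp2p/core/peer"

	"github.com/waku-org/go-waku/waku/v2/utils"
)

func main() {
	// Any secp256k1 ECDSA key works; geth's helper is just a convenient source.
	ecdsaKey, err := ethcrypto.GenerateKey()
	if err != nil {
		panic(err)
	}

	// Convert to the libp2p key type and derive the peer ID a host would use.
	privKey := utils.EcdsaPrivKeyToSecp256k1PrivKey(ecdsaKey)
	id, err := peer.IDFromPrivateKey(privKey)
	if err != nil {
		panic(err)
	}
	fmt.Println("libp2p peer ID:", id)
}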
@@ -1044,7 +1044,7 @@ github.com/waku-org/go-discover/discover/v5wire
 github.com/waku-org/go-libp2p-rendezvous
 github.com/waku-org/go-libp2p-rendezvous/db
 github.com/waku-org/go-libp2p-rendezvous/pb
-# github.com/waku-org/go-waku v0.8.1-0.20241021202955-3c4e40c729a0
+# github.com/waku-org/go-waku v0.8.1-0.20241128183857-1608cf2b0b90
 ## explicit; go 1.21
 github.com/waku-org/go-waku/logging
 github.com/waku-org/go-waku/tests
@@ -72,6 +72,8 @@ import (
 	"github.com/waku-org/go-waku/waku/v2/protocol/store"
 	"github.com/waku-org/go-waku/waku/v2/utils"
 
+	commonapi "github.com/waku-org/go-waku/waku/v2/api/common"
+
 	gocommon "github.com/status-im/status-go/common"
 	"github.com/status-im/status-go/connection"
 	"github.com/status-im/status-go/eth-node/types"

@@ -1064,10 +1066,10 @@ func (w *Waku) Start() error {
 		return fmt.Errorf("failed to start go-waku node: %v", err)
 	}
 
-	w.StorenodeCycle = history.NewStorenodeCycle(w.logger)
-	w.HistoryRetriever = history.NewHistoryRetriever(w.node.Store(), NewHistoryProcessorWrapper(w), w.logger)
+	w.StorenodeCycle = history.NewStorenodeCycle(w.logger, commonapi.NewDefaultPinger(w.node.Host()))
+	w.HistoryRetriever = history.NewHistoryRetriever(missing.NewDefaultStorenodeRequestor(w.node.Store()), NewHistoryProcessorWrapper(w), w.logger)
 
-	w.StorenodeCycle.Start(w.ctx, w.node.Host())
+	w.StorenodeCycle.Start(w.ctx)
 
 	w.logger.Info("WakuV2 PeerID", zap.Stringer("id", w.node.Host().ID()))