feat(nwaku)_: message publisher and sent verifier (#5966)

This commit is contained in:
richΛrd 2024-10-23 16:22:51 -04:00 committed by Richard Ramos
parent 3f8370ff45
commit a5e3516e0f
No known key found for this signature in database
GPG Key ID: 1CE87DB518195760
6 changed files with 188 additions and 99 deletions

View File

@ -2,6 +2,7 @@ package timesource
import (
"bytes"
"context"
"errors"
"sort"
"sync"
@ -144,8 +145,8 @@ type NTPTimeSource struct {
timeQuery ntpQuery // for ease of testing
now func() time.Time
quit chan struct{}
started bool
cancel context.CancelFunc
mu sync.RWMutex
latestOffset time.Duration
@ -175,7 +176,7 @@ func (s *NTPTimeSource) updateOffset() error {
// runPeriodically runs periodically the given function based on NTPTimeSource
// synchronization limits (fastNTPSyncPeriod / slowNTPSyncPeriod)
func (s *NTPTimeSource) runPeriodically(fn func() error, starWithSlowSyncPeriod bool) {
func (s *NTPTimeSource) runPeriodically(ctx context.Context, fn func() error, starWithSlowSyncPeriod bool) {
if s.started {
return
}
@ -184,7 +185,7 @@ func (s *NTPTimeSource) runPeriodically(fn func() error, starWithSlowSyncPeriod
if starWithSlowSyncPeriod {
period = s.slowNTPSyncPeriod
}
s.quit = make(chan struct{})
go func() {
defer common.LogOnPanic()
for {
@ -196,7 +197,7 @@ func (s *NTPTimeSource) runPeriodically(fn func() error, starWithSlowSyncPeriod
period = s.fastNTPSyncPeriod
}
case <-s.quit:
case <-ctx.Done():
return
}
}
@ -204,11 +205,13 @@ func (s *NTPTimeSource) runPeriodically(fn func() error, starWithSlowSyncPeriod
}
// Start initializes the local offset and starts a goroutine that periodically updates the local offset.
func (s *NTPTimeSource) Start() {
func (s *NTPTimeSource) Start(ctx context.Context) error {
if s.started {
return
return nil
}
ctx, cancel := context.WithCancel(ctx)
// Attempt to update the offset synchronously so that user can have reliable messages right away
err := s.updateOffset()
if err != nil {
@ -217,23 +220,26 @@ func (s *NTPTimeSource) Start() {
logutils.ZapLogger().Error("failed to update offset", zap.Error(err))
}
s.runPeriodically(s.updateOffset, err == nil)
s.runPeriodically(ctx, s.updateOffset, err == nil)
s.started = true
}
s.cancel = cancel
// Stop goroutine that updates time source.
func (s *NTPTimeSource) Stop() error {
if s.quit == nil {
return nil
}
close(s.quit)
s.started = false
return nil
}
// Stop goroutine that updates time source.
func (s *NTPTimeSource) Stop() {
if s.cancel == nil {
return
}
s.cancel()
s.started = false
}
func (s *NTPTimeSource) GetCurrentTime() time.Time {
s.Start()
s.Start(context.Background())
return s.Now()
}
@ -243,7 +249,7 @@ func (s *NTPTimeSource) GetCurrentTimeInMillis() uint64 {
func GetCurrentTime() time.Time {
ts := Default()
ts.Start()
ts.Start(context.Background())
return ts.Now()
}

View File

@ -277,14 +277,12 @@ func TestGetCurrentTimeInMillis(t *testing.T) {
// test repeat invoke GetCurrentTimeInMillis
n = ts.GetCurrentTimeInMillis()
require.Equal(t, expectedTime, n)
e := ts.Stop()
require.NoError(t, e)
ts.Stop()
// test invoke after stop
n = ts.GetCurrentTimeInMillis()
require.Equal(t, expectedTime, n)
e = ts.Stop()
require.NoError(t, e)
ts.Stop()
}
func TestGetCurrentTimeOffline(t *testing.T) {

View File

@ -1994,22 +1994,6 @@ func (w *Waku) LegacyStoreNode() legacy_store.Store {
return w.node.LegacyStore()
}
func (w *Waku) WakuLightpushPublish(message *pb.WakuMessage, pubsubTopic string) (string, error) {
msgHash, err := w.node.Lightpush().Publish(w.ctx, message, lightpush.WithPubSubTopic(pubsubTopic))
if err != nil {
return "", err
}
return msgHash.String(), nil
}
func (w *Waku) WakuRelayPublish(message *pb.WakuMessage, pubsubTopic string) (string, error) {
msgHash, err := w.node.Relay().Publish(w.ctx, message, relay.WithPubSubTopic(pubsubTopic))
if err != nil {
return "", err
}
return msgHash.String(), nil
}
func (w *Waku) ListPeersInMesh(pubsubTopic string) (int, error) {
listPeers := w.node.Relay().PubSub().ListPeers(pubsubTopic)
return len(listPeers), nil

View File

@ -1,6 +1,5 @@
package wakuv2
/* TODO-nwaku
import (
"errors"
@ -93,6 +92,7 @@ func (w *Waku) publishEnvelope(envelope *protocol.Envelope) {
err = w.messageSender.Send(publish.NewRequest(w.ctx, envelope))
}
/* TODO-nwaku
if w.statusTelemetryClient != nil {
if err == nil {
w.statusTelemetryClient.PushSentEnvelope(w.ctx, SentEnvelope{Envelope: envelope, PublishMethod: w.messageSender.PublishMethod()})
@ -100,6 +100,7 @@ func (w *Waku) publishEnvelope(envelope *protocol.Envelope) {
w.statusTelemetryClient.PushErrorSendingEnvelope(w.ctx, ErrorSendingEnvelope{Error: err, SentEnvelope: SentEnvelope{Envelope: envelope, PublishMethod: w.messageSender.PublishMethod()}})
}
}
*/
if err != nil {
logger.Error("could not send message", zap.Error(err))
@ -117,4 +118,3 @@ func (w *Waku) publishEnvelope(envelope *protocol.Envelope) {
})
}
}
*/

View File

@ -270,16 +270,20 @@ import (
"crypto/ecdsa"
"crypto/sha256"
"database/sql"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"net/http"
"runtime"
"strconv"
"strings"
"sync"
"testing"
"time"
"unsafe"
"github.com/golang/protobuf/proto"
"github.com/jellydator/ttlcache/v3"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/peerstore"
@ -288,6 +292,7 @@ import (
"go.uber.org/zap"
"golang.org/x/crypto/pbkdf2"
"golang.org/x/time/rate"
gethcommon "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
@ -307,9 +312,11 @@ import (
"github.com/waku-org/go-waku/waku/v2/peermanager"
wps "github.com/waku-org/go-waku/waku/v2/peerstore"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/legacy_store"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
"github.com/waku-org/go-waku/waku/v2/protocol/store"
storepb "github.com/waku-org/go-waku/waku/v2/protocol/store/pb"
"github.com/waku-org/go-waku/waku/v2/utils"
gocommon "github.com/status-im/status-go/common"
@ -412,7 +419,9 @@ type Waku struct {
cancel context.CancelFunc
wg sync.WaitGroup
cfg *WakuConfig
cfg *Config
wakuCfg *WakuConfig
options []node.WakuNodeOption
envelopeFeed event.Feed
@ -467,7 +476,7 @@ func newTTLCache() *ttlcache.Cache[gethcommon.Hash, *common.ReceivedMessage] {
}
// New creates a WakuV2 client ready to communicate through the LibP2P network.
func New(nodeKey *ecdsa.PrivateKey, fleet string, cfg *WakuConfig, logger *zap.Logger, appDB *sql.DB, ts *timesource.NTPTimeSource, onHistoricMessagesRequestFailed func([]byte, peer.ID, error), onPeerStats func(types.ConnStatus)) (*Waku, error) {
func New(nodeKey *ecdsa.PrivateKey, fleet string, cfg *Config, nwakuCfg *WakuConfig, logger *zap.Logger, appDB *sql.DB, ts *timesource.NTPTimeSource, onHistoricMessagesRequestFailed func([]byte, peer.ID, error), onPeerStats func(types.ConnStatus)) (*Waku, error) {
// Lock the main goroutine to its current OS thread
runtime.LockOSThread()
@ -475,7 +484,9 @@ func New(nodeKey *ecdsa.PrivateKey, fleet string, cfg *WakuConfig, logger *zap.L
node, err := wakuNew(nodeKey,
fleet,
cfg, logger, appDB, ts, onHistoricMessagesRequestFailed,
cfg,
nwakuCfg,
logger, appDB, ts, onHistoricMessagesRequestFailed,
onPeerStats)
if err != nil {
return nil, err
@ -918,6 +929,7 @@ func (w *Waku) runPeerExchangeLoop() {
}
}
}
*/
func (w *Waku) GetPubsubTopic(topic string) string {
if topic == "" {
@ -927,6 +939,7 @@ func (w *Waku) GetPubsubTopic(topic string) string {
return topic
}
/* TODO-nwaku
func (w *Waku) unsubscribeFromPubsubTopicWithWakuRelay(topic string) error {
topic = w.GetPubsubTopic(topic)
@ -1329,8 +1342,7 @@ func (w *Waku) OnNewEnvelope(env *protocol.Envelope) error {
func (w *Waku) Start() error {
err := w.WakuStart()
if err != nil {
fmt.Println("Error happened:", err.Error())
return err
return fmt.Errorf("failed to start nwaku node: %v", err)
}
/* TODO-nwaku
@ -1468,19 +1480,16 @@ func (w *Waku) Start() error {
}
*/
// w.wg.Add(1)
w.wg.Add(1)
// TODO-nwaku
// go w.broadcast()
go w.broadcast()
// go w.sendQueue.Start(w.ctx)
go w.sendQueue.Start(w.ctx)
/* TODO-nwaku
err = w.startMessageSender()
if err != nil {
return err
}
*/
/* TODO-nwaku
// we should wait `seedBootnodesForDiscV5` shutdown smoothly before set w.ctx to nil within `w.Stop()`
@ -1568,14 +1577,14 @@ func (w *Waku) reportPeerMetrics() {
}
*/
/* TODO-nwaku
func (w *Waku) startMessageSender() error {
publishMethod := publish.Relay
/* TODO-nwaku
if w.cfg.LightClient {
publishMethod = publish.LightPush
}
}*/
sender, err := publish.NewMessageSender(publishMethod, publish.NewDefaultPublisher(w.node.Lightpush(), w.node.Relay()), w.logger)
sender, err := publish.NewMessageSender(publishMethod, newPublisher(w.wakuCtx), w.logger)
if err != nil {
w.logger.Error("failed to create message sender", zap.Error(err))
return err
@ -1584,7 +1593,7 @@ func (w *Waku) startMessageSender() error {
if w.cfg.EnableStoreConfirmationForMessagesSent {
msgStoredChan := make(chan gethcommon.Hash, 1000)
msgExpiredChan := make(chan gethcommon.Hash, 1000)
messageSentCheck := publish.NewMessageSentCheck(w.ctx, publish.NewDefaultStorenodeMessageVerifier(w.node.Store()), w.StorenodeCycle, w.node.Timesource(), msgStoredChan, msgExpiredChan, w.logger)
messageSentCheck := publish.NewMessageSentCheck(w.ctx, newStorenodeMessageVerifier(w.wakuCtx), w.StorenodeCycle, w.timesource, msgStoredChan, msgExpiredChan, w.logger)
sender.WithMessageSentCheck(messageSentCheck)
w.wg.Add(1)
@ -1600,17 +1609,21 @@ func (w *Waku) startMessageSender() error {
Hash: hash,
Event: common.EventEnvelopeSent,
})
if w.statusTelemetryClient != nil {
// TODO-nwaku
/*if w.statusTelemetryClient != nil {
w.statusTelemetryClient.PushMessageCheckSuccess(w.ctx, hash.Hex())
}
}*/
case hash := <-msgExpiredChan:
w.SendEnvelopeEvent(common.EnvelopeEvent{
Hash: hash,
Event: common.EventEnvelopeExpired,
})
if w.statusTelemetryClient != nil {
// TODO-nwaku
/* if w.statusTelemetryClient != nil {
w.statusTelemetryClient.PushMessageCheckFailure(w.ctx, hash.Hex())
}
}*/
}
}
}()
@ -1628,7 +1641,6 @@ func (w *Waku) startMessageSender() error {
return nil
}
*/
func (w *Waku) MessageExists(mh pb.MessageHash) (bool, error) {
w.poolMu.Lock()
@ -1790,7 +1802,6 @@ func (w *Waku) postEvent(envelope *common.ReceivedMessage) {
w.msgQueue <- envelope
}
/* TODO-nwaku
// processQueueLoop delivers the messages to the watchers during the lifetime of the waku node.
func (w *Waku) processQueueLoop() {
defer gocommon.LogOnPanic()
@ -1806,7 +1817,7 @@ func (w *Waku) processQueueLoop() {
w.processMessage(e)
}
}
}*/
}
func (w *Waku) processMessage(e *common.ReceivedMessage) {
logger := w.logger.With(
@ -2281,7 +2292,8 @@ func printStackTrace() {
func wakuNew(nodeKey *ecdsa.PrivateKey,
fleet string,
cfg *WakuConfig,
cfg *Config, // TODO: merge Config and WakuConfig
nwakuCfg *WakuConfig,
logger *zap.Logger,
appDB *sql.DB,
ts *timesource.NTPTimeSource,
@ -2298,15 +2310,17 @@ func wakuNew(nodeKey *ecdsa.PrivateKey,
ts = timesource.Default()
}
/* TODO-nwaku
nwakuCfg.NodeKey = hex.EncodeToString(crypto.FromECDSA(nodeKey))
// TODO-nwaku
// TODO: merge Config and WakuConfig
cfg = setDefaults(cfg)
if err = cfg.Validate(logger); err != nil {
return nil, err
} */
}
logger.Info("starting wakuv2 with config", zap.Any("nwakuCfg", nwakuCfg), zap.Any("wakuCfg", cfg))
ctx, cancel := context.WithCancel(context.Background())
jsonConfig, err := json.Marshal(cfg)
jsonConfig, err := json.Marshal(nwakuCfg)
if err != nil {
return nil, err
}
@ -2321,9 +2335,10 @@ func wakuNew(nodeKey *ecdsa.PrivateKey,
// Notice that the events for self node are handled by the 'MyEventCallback' method
if C.getRet(resp) == C.RET_OK {
ctx, cancel := context.WithCancel(context.Background())
return &Waku{
wakuCtx: wakuCtx,
wakuCfg: nwakuCfg,
cfg: cfg,
privateKeys: make(map[string]*ecdsa.PrivateKey),
symKeys: make(map[string][]byte),
@ -2337,15 +2352,16 @@ func wakuNew(nodeKey *ecdsa.PrivateKey,
wg: sync.WaitGroup{},
dnsAddressCache: make(map[string][]dnsdisc.DiscoveredNode),
dnsAddressCacheLock: &sync.RWMutex{},
dnsDiscAsyncRetrievedSignal: make(chan struct{}),
storeMsgIDs: make(map[gethcommon.Hash]bool),
timesource: ts,
storeMsgIDsMu: sync.RWMutex{},
logger: logger,
discV5BootstrapNodes: cfg.Discv5BootstrapNodes,
discV5BootstrapNodes: nwakuCfg.Discv5BootstrapNodes,
onHistoricMessagesRequestFailed: onHistoricMessagesRequestFailed,
onPeerStats: onPeerStats,
onlineChecker: onlinechecker.NewDefaultOnlineChecker(false).(*onlinechecker.DefaultOnlineChecker),
//sendQueue: publish.NewMessageQueue(1000, cfg.UseThrottledPublish), // TODO-nwaku
sendQueue: publish.NewMessageQueue(1000, cfg.UseThrottledPublish),
}, nil
}
@ -2513,32 +2529,6 @@ func (self *Waku) WakuDefaultPubsubTopic() (WakuPubsubTopic, error) {
return "", errors.New(errMsg)
}
func (self *Waku) WakuRelayPublish(wakuMsg *pb.WakuMessage, pubsubTopic string) (string, error) {
timeoutMs := 1000
message, err := json.Marshal(wakuMsg)
if err != nil {
return "", err
}
var cPubsubTopic = C.CString(pubsubTopic)
var msg = C.CString(string(message))
var resp = C.allocResp()
defer C.freeResp(resp)
defer C.free(unsafe.Pointer(cPubsubTopic))
defer C.free(unsafe.Pointer(msg))
C.cGoWakuRelayPublish(self.wakuCtx, cPubsubTopic, msg, C.int(timeoutMs), resp)
if C.getRet(resp) == C.RET_OK {
msgHash := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
return msgHash, nil
}
errMsg := "error WakuRelayPublish: " +
C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
return "", errors.New(errMsg)
}
func (self *Waku) WakuRelaySubscribe(pubsubTopic string) error {
var resp = C.allocResp()
var cPubsubTopic = C.CString(pubsubTopic)
@ -2605,7 +2595,8 @@ func (self *Waku) WakuLightpushPublish(message *pb.WakuMessage, pubsubTopic stri
return "", errors.New(errMsg)
}
func (self *Waku) wakuStoreQuery(
func wakuStoreQuery(
wakuCtx unsafe.Pointer,
jsonQuery string,
peerAddr string,
timeoutMs int) (string, error) {
@ -2618,7 +2609,7 @@ func (self *Waku) wakuStoreQuery(
defer C.free(unsafe.Pointer(cPeerAddr))
defer C.freeResp(resp)
C.cGoWakuStoreQuery(self.wakuCtx, cJsonQuery, cPeerAddr, C.int(timeoutMs), resp)
C.cGoWakuStoreQuery(wakuCtx, cJsonQuery, cPeerAddr, C.int(timeoutMs), resp)
if C.getRet(resp) == C.RET_OK {
msg := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
return msg, nil
@ -2923,3 +2914,108 @@ func (self *Waku) DisconnectPeerById(peerId peer.ID) error {
func (w *Waku) MaxMessageSize() uint32 {
return w.cfg.MaxMessageSize
} */
func newPublisher(wakuCtx unsafe.Pointer) publish.Publisher {
return &nwakuPublisher{
wakuCtx: wakuCtx,
}
}
type nwakuPublisher struct {
wakuCtx unsafe.Pointer
}
func (p *nwakuPublisher) RelayListPeers(pubsubTopic string) ([]peer.ID, error) {
// TODO-nwaku
return nil, nil
}
func (p *nwakuPublisher) RelayPublish(ctx context.Context, message *pb.WakuMessage, pubsubTopic string) (pb.MessageHash, error) {
timeoutMs := 1000
jsonMsg, err := json.Marshal(message)
if err != nil {
return pb.MessageHash{}, err
}
var cPubsubTopic = C.CString(pubsubTopic)
var msg = C.CString(string(jsonMsg))
var resp = C.allocResp()
defer C.freeResp(resp)
defer C.free(unsafe.Pointer(cPubsubTopic))
defer C.free(unsafe.Pointer(msg))
C.cGoWakuRelayPublish(p.wakuCtx, cPubsubTopic, msg, C.int(timeoutMs), resp)
if C.getRet(resp) == C.RET_OK {
msgHash := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
msgHashBytes, err := hexutil.Decode(msgHash)
if err != nil {
return pb.MessageHash{}, err
}
return pb.ToMessageHash(msgHashBytes), nil
}
errMsg := "error WakuRelayPublish: " +
C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
return pb.MessageHash{}, errors.New(errMsg)
}
// LightpushPublish publishes a message via WakuLightPush
func (p *nwakuPublisher) LightpushPublish(ctx context.Context, message *pb.WakuMessage, pubsubTopic string, maxPeers int) (pb.MessageHash, error) {
// TODO-nwaku
return pb.MessageHash{}, errors.New("not implemented yet")
}
func newStorenodeMessageVerifier(wakuCtx unsafe.Pointer) publish.StorenodeMessageVerifier {
return &defaultStorenodeMessageVerifier{
wakuCtx: wakuCtx,
}
}
type defaultStorenodeMessageVerifier struct {
wakuCtx unsafe.Pointer
}
func (d *defaultStorenodeMessageVerifier) MessageHashesExist(ctx context.Context, requestID []byte, peerID peer.ID, pageSize uint64, messageHashes []pb.MessageHash) ([]pb.MessageHash, error) {
requestIDStr := hexutil.Encode(requestID)
storeRequest := &storepb.StoreQueryRequest{
RequestId: requestIDStr,
MessageHashes: make([][]byte, len(messageHashes)),
IncludeData: false,
PaginationCursor: nil,
PaginationForward: false,
PaginationLimit: proto.Uint64(pageSize),
}
for i, mhash := range messageHashes {
storeRequest.MessageHashes[i] = mhash.Bytes()
}
jsonQuery, err := json.Marshal(storeRequest)
if err != nil {
return nil, err
}
// TODO: timeouts need to be managed differently. For now we're using a 1m timeout
jsonResponse, err := wakuStoreQuery(d.wakuCtx, string(jsonQuery), peerID.String(), int(time.Minute.Milliseconds()))
if err != nil {
return nil, err
}
response := &storepb.StoreQueryResponse{}
err = json.Unmarshal([]byte(jsonResponse), response)
if err != nil {
return nil, err
}
if response.GetStatusCode() != http.StatusOK {
return nil, fmt.Errorf("could not query storenode: %s %d %s", requestIDStr, response.GetStatusCode(), response.GetStatusDesc())
}
result := make([]pb.MessageHash, len(response.Messages))
for i, msg := range response.Messages {
result[i] = pb.ToMessageHash(msg.GetMessageHash())
}
return result, nil
}

View File

@ -163,6 +163,11 @@ func TestBasicWakuV2(t *testing.T) {
storeNodeInfo, err := GetNwakuInfo(nil, &extNodeRestPort)
require.NoError(t, err)
wakuConfig := Config{
UseThrottledPublish: true,
ClusterID: 16,
}
nwakuConfig := WakuConfig{
Port: 30303,
NodeKey: "11d0dcea28e86f81937a3bd1163473c7fbc0a0db54fd72914849bc47bdf78710",
@ -176,7 +181,7 @@ func TestBasicWakuV2(t *testing.T) {
Shards: []uint16{64},
}
w, err := New(nil, "", &nwakuConfig, nil, nil, nil, nil, nil)
w, err := New(nil, "", &wakuConfig, &nwakuConfig, nil, nil, nil, nil, nil)
require.NoError(t, err)
require.NoError(t, w.Start())