From a5e3516e0f4c48f669b5ff628e1b064c7014d020 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?rich=CE=9Brd?= Date: Wed, 23 Oct 2024 16:22:51 -0400 Subject: [PATCH] feat(nwaku)_: message publisher and sent verifier (#5966) --- timesource/timesource.go | 40 ++++--- timesource/timesource_test.go | 6 +- wakuv2/gowaku.go | 16 --- wakuv2/message_publishing.go | 4 +- wakuv2/nwaku.go | 214 ++++++++++++++++++++++++---------- wakuv2/nwaku_test.go | 7 +- 6 files changed, 188 insertions(+), 99 deletions(-) diff --git a/timesource/timesource.go b/timesource/timesource.go index e21e8ea68..78b4e91ab 100644 --- a/timesource/timesource.go +++ b/timesource/timesource.go @@ -2,6 +2,7 @@ package timesource import ( "bytes" + "context" "errors" "sort" "sync" @@ -144,8 +145,8 @@ type NTPTimeSource struct { timeQuery ntpQuery // for ease of testing now func() time.Time - quit chan struct{} started bool + cancel context.CancelFunc mu sync.RWMutex latestOffset time.Duration @@ -175,7 +176,7 @@ func (s *NTPTimeSource) updateOffset() error { // runPeriodically runs periodically the given function based on NTPTimeSource // synchronization limits (fastNTPSyncPeriod / slowNTPSyncPeriod) -func (s *NTPTimeSource) runPeriodically(fn func() error, starWithSlowSyncPeriod bool) { +func (s *NTPTimeSource) runPeriodically(ctx context.Context, fn func() error, starWithSlowSyncPeriod bool) { if s.started { return } @@ -184,7 +185,7 @@ func (s *NTPTimeSource) runPeriodically(fn func() error, starWithSlowSyncPeriod if starWithSlowSyncPeriod { period = s.slowNTPSyncPeriod } - s.quit = make(chan struct{}) + go func() { defer common.LogOnPanic() for { @@ -196,7 +197,7 @@ func (s *NTPTimeSource) runPeriodically(fn func() error, starWithSlowSyncPeriod period = s.fastNTPSyncPeriod } - case <-s.quit: + case <-ctx.Done(): return } } @@ -204,11 +205,13 @@ func (s *NTPTimeSource) runPeriodically(fn func() error, starWithSlowSyncPeriod } // Start initializes the local offset and starts a goroutine that periodically updates the local offset. -func (s *NTPTimeSource) Start() { +func (s *NTPTimeSource) Start(ctx context.Context) error { if s.started { - return + return nil } + ctx, cancel := context.WithCancel(ctx) + // Attempt to update the offset synchronously so that user can have reliable messages right away err := s.updateOffset() if err != nil { @@ -217,23 +220,26 @@ func (s *NTPTimeSource) Start() { logutils.ZapLogger().Error("failed to update offset", zap.Error(err)) } - s.runPeriodically(s.updateOffset, err == nil) + s.runPeriodically(ctx, s.updateOffset, err == nil) s.started = true -} + s.cancel = cancel -// Stop goroutine that updates time source. -func (s *NTPTimeSource) Stop() error { - if s.quit == nil { - return nil - } - close(s.quit) - s.started = false return nil } +// Stop goroutine that updates time source. +func (s *NTPTimeSource) Stop() { + if s.cancel == nil { + return + } + + s.cancel() + s.started = false +} + func (s *NTPTimeSource) GetCurrentTime() time.Time { - s.Start() + s.Start(context.Background()) return s.Now() } @@ -243,7 +249,7 @@ func (s *NTPTimeSource) GetCurrentTimeInMillis() uint64 { func GetCurrentTime() time.Time { ts := Default() - ts.Start() + ts.Start(context.Background()) return ts.Now() } diff --git a/timesource/timesource_test.go b/timesource/timesource_test.go index 51a753657..1721eb1d0 100644 --- a/timesource/timesource_test.go +++ b/timesource/timesource_test.go @@ -277,14 +277,12 @@ func TestGetCurrentTimeInMillis(t *testing.T) { // test repeat invoke GetCurrentTimeInMillis n = ts.GetCurrentTimeInMillis() require.Equal(t, expectedTime, n) - e := ts.Stop() - require.NoError(t, e) + ts.Stop() // test invoke after stop n = ts.GetCurrentTimeInMillis() require.Equal(t, expectedTime, n) - e = ts.Stop() - require.NoError(t, e) + ts.Stop() } func TestGetCurrentTimeOffline(t *testing.T) { diff --git a/wakuv2/gowaku.go b/wakuv2/gowaku.go index 978c0964a..6d9012bd6 100644 --- a/wakuv2/gowaku.go +++ b/wakuv2/gowaku.go @@ -1994,22 +1994,6 @@ func (w *Waku) LegacyStoreNode() legacy_store.Store { return w.node.LegacyStore() } -func (w *Waku) WakuLightpushPublish(message *pb.WakuMessage, pubsubTopic string) (string, error) { - msgHash, err := w.node.Lightpush().Publish(w.ctx, message, lightpush.WithPubSubTopic(pubsubTopic)) - if err != nil { - return "", err - } - return msgHash.String(), nil -} - -func (w *Waku) WakuRelayPublish(message *pb.WakuMessage, pubsubTopic string) (string, error) { - msgHash, err := w.node.Relay().Publish(w.ctx, message, relay.WithPubSubTopic(pubsubTopic)) - if err != nil { - return "", err - } - return msgHash.String(), nil -} - func (w *Waku) ListPeersInMesh(pubsubTopic string) (int, error) { listPeers := w.node.Relay().PubSub().ListPeers(pubsubTopic) return len(listPeers), nil diff --git a/wakuv2/message_publishing.go b/wakuv2/message_publishing.go index 90dce0cae..3be0ea96d 100644 --- a/wakuv2/message_publishing.go +++ b/wakuv2/message_publishing.go @@ -1,6 +1,5 @@ package wakuv2 -/* TODO-nwaku import ( "errors" @@ -93,6 +92,7 @@ func (w *Waku) publishEnvelope(envelope *protocol.Envelope) { err = w.messageSender.Send(publish.NewRequest(w.ctx, envelope)) } + /* TODO-nwaku if w.statusTelemetryClient != nil { if err == nil { w.statusTelemetryClient.PushSentEnvelope(w.ctx, SentEnvelope{Envelope: envelope, PublishMethod: w.messageSender.PublishMethod()}) @@ -100,6 +100,7 @@ func (w *Waku) publishEnvelope(envelope *protocol.Envelope) { w.statusTelemetryClient.PushErrorSendingEnvelope(w.ctx, ErrorSendingEnvelope{Error: err, SentEnvelope: SentEnvelope{Envelope: envelope, PublishMethod: w.messageSender.PublishMethod()}}) } } + */ if err != nil { logger.Error("could not send message", zap.Error(err)) @@ -117,4 +118,3 @@ func (w *Waku) publishEnvelope(envelope *protocol.Envelope) { }) } } -*/ diff --git a/wakuv2/nwaku.go b/wakuv2/nwaku.go index f121343e8..796630a3b 100644 --- a/wakuv2/nwaku.go +++ b/wakuv2/nwaku.go @@ -270,16 +270,20 @@ import ( "crypto/ecdsa" "crypto/sha256" "database/sql" + "encoding/hex" "encoding/json" "errors" "fmt" + "net/http" "runtime" "strconv" "strings" "sync" + "testing" "time" "unsafe" + "github.com/golang/protobuf/proto" "github.com/jellydator/ttlcache/v3" "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/peerstore" @@ -288,6 +292,7 @@ import ( "go.uber.org/zap" "golang.org/x/crypto/pbkdf2" + "golang.org/x/time/rate" gethcommon "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" @@ -307,9 +312,11 @@ import ( "github.com/waku-org/go-waku/waku/v2/peermanager" wps "github.com/waku-org/go-waku/waku/v2/peerstore" "github.com/waku-org/go-waku/waku/v2/protocol" + "github.com/waku-org/go-waku/waku/v2/protocol/legacy_store" "github.com/waku-org/go-waku/waku/v2/protocol/relay" "github.com/waku-org/go-waku/waku/v2/protocol/store" + storepb "github.com/waku-org/go-waku/waku/v2/protocol/store/pb" "github.com/waku-org/go-waku/waku/v2/utils" gocommon "github.com/status-im/status-go/common" @@ -412,7 +419,9 @@ type Waku struct { cancel context.CancelFunc wg sync.WaitGroup - cfg *WakuConfig + cfg *Config + wakuCfg *WakuConfig + options []node.WakuNodeOption envelopeFeed event.Feed @@ -467,7 +476,7 @@ func newTTLCache() *ttlcache.Cache[gethcommon.Hash, *common.ReceivedMessage] { } // New creates a WakuV2 client ready to communicate through the LibP2P network. -func New(nodeKey *ecdsa.PrivateKey, fleet string, cfg *WakuConfig, logger *zap.Logger, appDB *sql.DB, ts *timesource.NTPTimeSource, onHistoricMessagesRequestFailed func([]byte, peer.ID, error), onPeerStats func(types.ConnStatus)) (*Waku, error) { +func New(nodeKey *ecdsa.PrivateKey, fleet string, cfg *Config, nwakuCfg *WakuConfig, logger *zap.Logger, appDB *sql.DB, ts *timesource.NTPTimeSource, onHistoricMessagesRequestFailed func([]byte, peer.ID, error), onPeerStats func(types.ConnStatus)) (*Waku, error) { // Lock the main goroutine to its current OS thread runtime.LockOSThread() @@ -475,7 +484,9 @@ func New(nodeKey *ecdsa.PrivateKey, fleet string, cfg *WakuConfig, logger *zap.L node, err := wakuNew(nodeKey, fleet, - cfg, logger, appDB, ts, onHistoricMessagesRequestFailed, + cfg, + nwakuCfg, + logger, appDB, ts, onHistoricMessagesRequestFailed, onPeerStats) if err != nil { return nil, err @@ -918,6 +929,7 @@ func (w *Waku) runPeerExchangeLoop() { } } } +*/ func (w *Waku) GetPubsubTopic(topic string) string { if topic == "" { @@ -927,6 +939,7 @@ func (w *Waku) GetPubsubTopic(topic string) string { return topic } +/* TODO-nwaku func (w *Waku) unsubscribeFromPubsubTopicWithWakuRelay(topic string) error { topic = w.GetPubsubTopic(topic) @@ -1329,8 +1342,7 @@ func (w *Waku) OnNewEnvelope(env *protocol.Envelope) error { func (w *Waku) Start() error { err := w.WakuStart() if err != nil { - fmt.Println("Error happened:", err.Error()) - return err + return fmt.Errorf("failed to start nwaku node: %v", err) } /* TODO-nwaku @@ -1468,19 +1480,16 @@ func (w *Waku) Start() error { } */ - // w.wg.Add(1) + w.wg.Add(1) - // TODO-nwaku - // go w.broadcast() + go w.broadcast() - // go w.sendQueue.Start(w.ctx) + go w.sendQueue.Start(w.ctx) - /* TODO-nwaku err = w.startMessageSender() if err != nil { return err } - */ /* TODO-nwaku // we should wait `seedBootnodesForDiscV5` shutdown smoothly before set w.ctx to nil within `w.Stop()` @@ -1568,14 +1577,14 @@ func (w *Waku) reportPeerMetrics() { } */ -/* TODO-nwaku func (w *Waku) startMessageSender() error { publishMethod := publish.Relay + /* TODO-nwaku if w.cfg.LightClient { publishMethod = publish.LightPush - } + }*/ - sender, err := publish.NewMessageSender(publishMethod, publish.NewDefaultPublisher(w.node.Lightpush(), w.node.Relay()), w.logger) + sender, err := publish.NewMessageSender(publishMethod, newPublisher(w.wakuCtx), w.logger) if err != nil { w.logger.Error("failed to create message sender", zap.Error(err)) return err @@ -1584,7 +1593,7 @@ func (w *Waku) startMessageSender() error { if w.cfg.EnableStoreConfirmationForMessagesSent { msgStoredChan := make(chan gethcommon.Hash, 1000) msgExpiredChan := make(chan gethcommon.Hash, 1000) - messageSentCheck := publish.NewMessageSentCheck(w.ctx, publish.NewDefaultStorenodeMessageVerifier(w.node.Store()), w.StorenodeCycle, w.node.Timesource(), msgStoredChan, msgExpiredChan, w.logger) + messageSentCheck := publish.NewMessageSentCheck(w.ctx, newStorenodeMessageVerifier(w.wakuCtx), w.StorenodeCycle, w.timesource, msgStoredChan, msgExpiredChan, w.logger) sender.WithMessageSentCheck(messageSentCheck) w.wg.Add(1) @@ -1600,17 +1609,21 @@ func (w *Waku) startMessageSender() error { Hash: hash, Event: common.EventEnvelopeSent, }) - if w.statusTelemetryClient != nil { + + // TODO-nwaku + /*if w.statusTelemetryClient != nil { w.statusTelemetryClient.PushMessageCheckSuccess(w.ctx, hash.Hex()) - } + }*/ case hash := <-msgExpiredChan: w.SendEnvelopeEvent(common.EnvelopeEvent{ Hash: hash, Event: common.EventEnvelopeExpired, }) - if w.statusTelemetryClient != nil { + + // TODO-nwaku + /* if w.statusTelemetryClient != nil { w.statusTelemetryClient.PushMessageCheckFailure(w.ctx, hash.Hex()) - } + }*/ } } }() @@ -1628,7 +1641,6 @@ func (w *Waku) startMessageSender() error { return nil } -*/ func (w *Waku) MessageExists(mh pb.MessageHash) (bool, error) { w.poolMu.Lock() @@ -1790,7 +1802,6 @@ func (w *Waku) postEvent(envelope *common.ReceivedMessage) { w.msgQueue <- envelope } -/* TODO-nwaku // processQueueLoop delivers the messages to the watchers during the lifetime of the waku node. func (w *Waku) processQueueLoop() { defer gocommon.LogOnPanic() @@ -1806,7 +1817,7 @@ func (w *Waku) processQueueLoop() { w.processMessage(e) } } -}*/ +} func (w *Waku) processMessage(e *common.ReceivedMessage) { logger := w.logger.With( @@ -2281,7 +2292,8 @@ func printStackTrace() { func wakuNew(nodeKey *ecdsa.PrivateKey, fleet string, - cfg *WakuConfig, + cfg *Config, // TODO: merge Config and WakuConfig + nwakuCfg *WakuConfig, logger *zap.Logger, appDB *sql.DB, ts *timesource.NTPTimeSource, @@ -2298,15 +2310,17 @@ func wakuNew(nodeKey *ecdsa.PrivateKey, ts = timesource.Default() } - /* TODO-nwaku + nwakuCfg.NodeKey = hex.EncodeToString(crypto.FromECDSA(nodeKey)) + + // TODO-nwaku + // TODO: merge Config and WakuConfig cfg = setDefaults(cfg) if err = cfg.Validate(logger); err != nil { return nil, err - } */ + } + logger.Info("starting wakuv2 with config", zap.Any("nwakuCfg", nwakuCfg), zap.Any("wakuCfg", cfg)) - ctx, cancel := context.WithCancel(context.Background()) - - jsonConfig, err := json.Marshal(cfg) + jsonConfig, err := json.Marshal(nwakuCfg) if err != nil { return nil, err } @@ -2321,9 +2335,10 @@ func wakuNew(nodeKey *ecdsa.PrivateKey, // Notice that the events for self node are handled by the 'MyEventCallback' method if C.getRet(resp) == C.RET_OK { - + ctx, cancel := context.WithCancel(context.Background()) return &Waku{ wakuCtx: wakuCtx, + wakuCfg: nwakuCfg, cfg: cfg, privateKeys: make(map[string]*ecdsa.PrivateKey), symKeys: make(map[string][]byte), @@ -2337,15 +2352,16 @@ func wakuNew(nodeKey *ecdsa.PrivateKey, wg: sync.WaitGroup{}, dnsAddressCache: make(map[string][]dnsdisc.DiscoveredNode), dnsAddressCacheLock: &sync.RWMutex{}, + dnsDiscAsyncRetrievedSignal: make(chan struct{}), storeMsgIDs: make(map[gethcommon.Hash]bool), timesource: ts, storeMsgIDsMu: sync.RWMutex{}, logger: logger, - discV5BootstrapNodes: cfg.Discv5BootstrapNodes, + discV5BootstrapNodes: nwakuCfg.Discv5BootstrapNodes, onHistoricMessagesRequestFailed: onHistoricMessagesRequestFailed, onPeerStats: onPeerStats, onlineChecker: onlinechecker.NewDefaultOnlineChecker(false).(*onlinechecker.DefaultOnlineChecker), - //sendQueue: publish.NewMessageQueue(1000, cfg.UseThrottledPublish), // TODO-nwaku + sendQueue: publish.NewMessageQueue(1000, cfg.UseThrottledPublish), }, nil } @@ -2513,32 +2529,6 @@ func (self *Waku) WakuDefaultPubsubTopic() (WakuPubsubTopic, error) { return "", errors.New(errMsg) } -func (self *Waku) WakuRelayPublish(wakuMsg *pb.WakuMessage, pubsubTopic string) (string, error) { - timeoutMs := 1000 - - message, err := json.Marshal(wakuMsg) - if err != nil { - return "", err - } - - var cPubsubTopic = C.CString(pubsubTopic) - var msg = C.CString(string(message)) - var resp = C.allocResp() - - defer C.freeResp(resp) - defer C.free(unsafe.Pointer(cPubsubTopic)) - defer C.free(unsafe.Pointer(msg)) - - C.cGoWakuRelayPublish(self.wakuCtx, cPubsubTopic, msg, C.int(timeoutMs), resp) - if C.getRet(resp) == C.RET_OK { - msgHash := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) - return msgHash, nil - } - errMsg := "error WakuRelayPublish: " + - C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) - return "", errors.New(errMsg) -} - func (self *Waku) WakuRelaySubscribe(pubsubTopic string) error { var resp = C.allocResp() var cPubsubTopic = C.CString(pubsubTopic) @@ -2605,7 +2595,8 @@ func (self *Waku) WakuLightpushPublish(message *pb.WakuMessage, pubsubTopic stri return "", errors.New(errMsg) } -func (self *Waku) wakuStoreQuery( +func wakuStoreQuery( + wakuCtx unsafe.Pointer, jsonQuery string, peerAddr string, timeoutMs int) (string, error) { @@ -2618,7 +2609,7 @@ func (self *Waku) wakuStoreQuery( defer C.free(unsafe.Pointer(cPeerAddr)) defer C.freeResp(resp) - C.cGoWakuStoreQuery(self.wakuCtx, cJsonQuery, cPeerAddr, C.int(timeoutMs), resp) + C.cGoWakuStoreQuery(wakuCtx, cJsonQuery, cPeerAddr, C.int(timeoutMs), resp) if C.getRet(resp) == C.RET_OK { msg := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) return msg, nil @@ -2923,3 +2914,108 @@ func (self *Waku) DisconnectPeerById(peerId peer.ID) error { func (w *Waku) MaxMessageSize() uint32 { return w.cfg.MaxMessageSize } */ + +func newPublisher(wakuCtx unsafe.Pointer) publish.Publisher { + return &nwakuPublisher{ + wakuCtx: wakuCtx, + } +} + +type nwakuPublisher struct { + wakuCtx unsafe.Pointer +} + +func (p *nwakuPublisher) RelayListPeers(pubsubTopic string) ([]peer.ID, error) { + // TODO-nwaku + return nil, nil +} + +func (p *nwakuPublisher) RelayPublish(ctx context.Context, message *pb.WakuMessage, pubsubTopic string) (pb.MessageHash, error) { + timeoutMs := 1000 + + jsonMsg, err := json.Marshal(message) + if err != nil { + return pb.MessageHash{}, err + } + + var cPubsubTopic = C.CString(pubsubTopic) + var msg = C.CString(string(jsonMsg)) + var resp = C.allocResp() + + defer C.freeResp(resp) + defer C.free(unsafe.Pointer(cPubsubTopic)) + defer C.free(unsafe.Pointer(msg)) + + C.cGoWakuRelayPublish(p.wakuCtx, cPubsubTopic, msg, C.int(timeoutMs), resp) + if C.getRet(resp) == C.RET_OK { + msgHash := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + msgHashBytes, err := hexutil.Decode(msgHash) + if err != nil { + return pb.MessageHash{}, err + } + return pb.ToMessageHash(msgHashBytes), nil + } + errMsg := "error WakuRelayPublish: " + + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + return pb.MessageHash{}, errors.New(errMsg) +} + +// LightpushPublish publishes a message via WakuLightPush +func (p *nwakuPublisher) LightpushPublish(ctx context.Context, message *pb.WakuMessage, pubsubTopic string, maxPeers int) (pb.MessageHash, error) { + // TODO-nwaku + return pb.MessageHash{}, errors.New("not implemented yet") +} + +func newStorenodeMessageVerifier(wakuCtx unsafe.Pointer) publish.StorenodeMessageVerifier { + return &defaultStorenodeMessageVerifier{ + wakuCtx: wakuCtx, + } +} + +type defaultStorenodeMessageVerifier struct { + wakuCtx unsafe.Pointer +} + +func (d *defaultStorenodeMessageVerifier) MessageHashesExist(ctx context.Context, requestID []byte, peerID peer.ID, pageSize uint64, messageHashes []pb.MessageHash) ([]pb.MessageHash, error) { + requestIDStr := hexutil.Encode(requestID) + storeRequest := &storepb.StoreQueryRequest{ + RequestId: requestIDStr, + MessageHashes: make([][]byte, len(messageHashes)), + IncludeData: false, + PaginationCursor: nil, + PaginationForward: false, + PaginationLimit: proto.Uint64(pageSize), + } + + for i, mhash := range messageHashes { + storeRequest.MessageHashes[i] = mhash.Bytes() + } + + jsonQuery, err := json.Marshal(storeRequest) + if err != nil { + return nil, err + } + + // TODO: timeouts need to be managed differently. For now we're using a 1m timeout + jsonResponse, err := wakuStoreQuery(d.wakuCtx, string(jsonQuery), peerID.String(), int(time.Minute.Milliseconds())) + if err != nil { + return nil, err + } + + response := &storepb.StoreQueryResponse{} + err = json.Unmarshal([]byte(jsonResponse), response) + if err != nil { + return nil, err + } + + if response.GetStatusCode() != http.StatusOK { + return nil, fmt.Errorf("could not query storenode: %s %d %s", requestIDStr, response.GetStatusCode(), response.GetStatusDesc()) + } + + result := make([]pb.MessageHash, len(response.Messages)) + for i, msg := range response.Messages { + result[i] = pb.ToMessageHash(msg.GetMessageHash()) + } + + return result, nil +} diff --git a/wakuv2/nwaku_test.go b/wakuv2/nwaku_test.go index a8a5366d3..fdf6f5458 100644 --- a/wakuv2/nwaku_test.go +++ b/wakuv2/nwaku_test.go @@ -163,6 +163,11 @@ func TestBasicWakuV2(t *testing.T) { storeNodeInfo, err := GetNwakuInfo(nil, &extNodeRestPort) require.NoError(t, err) + wakuConfig := Config{ + UseThrottledPublish: true, + ClusterID: 16, + } + nwakuConfig := WakuConfig{ Port: 30303, NodeKey: "11d0dcea28e86f81937a3bd1163473c7fbc0a0db54fd72914849bc47bdf78710", @@ -176,7 +181,7 @@ func TestBasicWakuV2(t *testing.T) { Shards: []uint16{64}, } - w, err := New(nil, "", &nwakuConfig, nil, nil, nil, nil, nil) + w, err := New(nil, "", &wakuConfig, &nwakuConfig, nil, nil, nil, nil, nil) require.NoError(t, err) require.NoError(t, w.Start())