diff --git a/timesource/timesource.go b/timesource/timesource.go index e6f5510c8..57cf9a5ba 100644 --- a/timesource/timesource.go +++ b/timesource/timesource.go @@ -239,7 +239,11 @@ func (s *NTPTimeSource) Stop() { } func (s *NTPTimeSource) GetCurrentTime() time.Time { - s.Start(context.Background()) + err := s.Start(context.Background()) + if err != nil { + panic("could not obtain timesource") + } + return s.Now() } @@ -249,7 +253,11 @@ func (s *NTPTimeSource) GetCurrentTimeInMillis() uint64 { func GetCurrentTime() time.Time { ts := Default() - ts.Start(context.Background()) + err := ts.Start(context.Background()) + if err != nil { + panic("could not obtain timesource") + } + return ts.Now() } diff --git a/timesource/timesource_test.go b/timesource/timesource_test.go index 1721eb1d0..35840586a 100644 --- a/timesource/timesource_test.go +++ b/timesource/timesource_test.go @@ -1,6 +1,7 @@ package timesource import ( + "context" "errors" "sync" "testing" @@ -214,7 +215,7 @@ func TestRunningPeriodically(t *testing.T) { // on NTPTimeSource specified periods (fastNTPSyncPeriod & slowNTPSyncPeriod) wg := sync.WaitGroup{} wg.Add(1) - source.runPeriodically(func() error { + source.runPeriodically(context.TODO(), func() error { mu.Lock() periods = append(periods, time.Since(lastCall)) mu.Unlock() diff --git a/wakuv2/api.go b/wakuv2/api.go index c129abdc0..49be14a32 100644 --- a/wakuv2/api.go +++ b/wakuv2/api.go @@ -18,7 +18,6 @@ package wakuv2 -/* TODO-nwaku import ( "context" "crypto/ecdsa" @@ -513,4 +512,4 @@ func (api *PublicWakuAPI) NewMessageFilter(req Criteria) (string, error) { api.mu.Unlock() return id, nil -} */ +} diff --git a/wakuv2/api_test.go b/wakuv2/api_test.go index 10d16a4c3..d3353ffa3 100644 --- a/wakuv2/api_test.go +++ b/wakuv2/api_test.go @@ -68,4 +68,5 @@ func TestMultipleTopicCopyInNewMessageFilter(t *testing.T) { if !found { t.Fatalf("Could not find filter with both topics") } -} */ \ No newline at end of file +} +*/ diff --git a/wakuv2/history_processor_wrapper.go b/wakuv2/history_processor_wrapper.go index bdb72745c..56c3f8cf4 100644 --- a/wakuv2/history_processor_wrapper.go +++ b/wakuv2/history_processor_wrapper.go @@ -3,6 +3,8 @@ package wakuv2 import ( "github.com/libp2p/go-libp2p/core/peer" + "github.com/status-im/status-go/wakuv2/common" + "github.com/waku-org/go-waku/waku/v2/api/history" "github.com/waku-org/go-waku/waku/v2/protocol" ) @@ -16,9 +18,7 @@ func NewHistoryProcessorWrapper(waku *Waku) history.HistoryProcessor { } func (hr *HistoryProcessorWrapper) OnEnvelope(env *protocol.Envelope, processEnvelopes bool) error { - // TODO-nwaku - // return hr.waku.OnNewEnvelopes(env, common.StoreMessageType, processEnvelopes) - return nil + return hr.waku.OnNewEnvelopes(env, common.StoreMessageType, processEnvelopes) } func (hr *HistoryProcessorWrapper) OnRequestFailed(requestID []byte, peerID peer.ID, err error) { diff --git a/wakuv2/nwaku.go b/wakuv2/nwaku.go index 6deda9317..baa4e2e49 100644 --- a/wakuv2/nwaku.go +++ b/wakuv2/nwaku.go @@ -286,7 +286,6 @@ import ( "github.com/golang/protobuf/proto" "github.com/jellydator/ttlcache/v3" "github.com/libp2p/go-libp2p/core/peer" - "github.com/libp2p/go-libp2p/core/peerstore" "github.com/multiformats/go-multiaddr" "go.uber.org/zap" @@ -300,6 +299,7 @@ import ( "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/p2p/enode" + "github.com/ethereum/go-ethereum/rpc" "github.com/libp2p/go-libp2p/core/metrics" @@ -342,7 +342,6 @@ const maxRelayPeers = 300 const randomPeersKeepAliveInterval = 5 * time.Second const allPeersKeepAliveInterval = 5 * time.Minute -/* TODO-nwaku type SentEnvelope struct { Envelope *protocol.Envelope PublishMethod publish.PublishMethod @@ -364,7 +363,6 @@ type ITelemetryClient interface { PushPeerCountByShard(ctx context.Context, peerCountByShard map[uint16]uint) PushPeerCountByOrigin(ctx context.Context, peerCountByOrigin map[wps.Origin]uint) } -*/ type WakuMessageHash = string type WakuPubsubTopic = string @@ -462,15 +460,14 @@ type Waku struct { onHistoricMessagesRequestFailed func([]byte, peer.ID, error) onPeerStats func(types.ConnStatus) - // statusTelemetryClient ITelemetryClient // TODO-nwaku + statusTelemetryClient ITelemetryClient defaultShardInfo protocol.RelayShards } -/* TODO-nwaku func (w *Waku) SetStatusTelemetryClient(client ITelemetryClient) { w.statusTelemetryClient = client -} */ +} func newTTLCache() *ttlcache.Cache[gethcommon.Hash, *common.ReceivedMessage] { cache := ttlcache.New[gethcommon.Hash, *common.ReceivedMessage](ttlcache.WithTTL[gethcommon.Hash, *common.ReceivedMessage](cacheTTL)) @@ -497,12 +494,12 @@ func New(nodeKey *ecdsa.PrivateKey, fleet string, cfg *Config, nwakuCfg *WakuCon defaultPubsubTopic, err := node.WakuDefaultPubsubTopic() if err != nil { - fmt.Println("Error happened:", err.Error()) + return nil, err } err = node.WakuRelaySubscribe(defaultPubsubTopic) if err != nil { - fmt.Println("Error happened:", err.Error()) + return nil, err } node.WakuSetEventCallback() @@ -1003,6 +1000,7 @@ func (w *Waku) subscribeToPubsubTopicWithWakuRelay(topic string, pubkey *ecdsa.P return nil } +*/ // MaxMessageSize returns the maximum accepted message size. func (w *Waku) MaxMessageSize() uint32 { @@ -1024,7 +1022,7 @@ func (w *Waku) APIs() []rpc.API { Public: false, }, } -} */ +} // Protocols returns the waku sub-protocols ran by this particular client. func (w *Waku) Protocols() []p2p.Protocol { @@ -1277,7 +1275,6 @@ func (w *Waku) GetSymKey(id string) ([]byte, error) { return nil, fmt.Errorf("non-existent key ID") } -/* TODO-nwaku // Subscribe installs a new message handler used for filtering, decrypting // and subsequent storing of incoming messages. func (w *Waku) Subscribe(f *common.Filter) (string, error) { @@ -1332,13 +1329,12 @@ func (w *Waku) SkipPublishToTopic(value bool) { func (w *Waku) ConfirmMessageDelivered(hashes []gethcommon.Hash) { w.messageSender.MessagesDelivered(hashes) -} */ +} -/* TODO-nwaku // OnNewEnvelope is an interface from Waku FilterManager API that gets invoked when any new message is received by Filter. func (w *Waku) OnNewEnvelope(env *protocol.Envelope) error { return w.OnNewEnvelopes(env, common.RelayedMessageType, false) -} */ +} // Start implements node.Service, starting the background data propagation thread // of the Waku protocol. @@ -1614,20 +1610,18 @@ func (w *Waku) startMessageSender() error { Event: common.EventEnvelopeSent, }) - // TODO-nwaku - /*if w.statusTelemetryClient != nil { + if w.statusTelemetryClient != nil { w.statusTelemetryClient.PushMessageCheckSuccess(w.ctx, hash.Hex()) - }*/ + } case hash := <-msgExpiredChan: w.SendEnvelopeEvent(common.EnvelopeEvent{ Hash: hash, Event: common.EventEnvelopeExpired, }) - // TODO-nwaku - /* if w.statusTelemetryClient != nil { + if w.statusTelemetryClient != nil { w.statusTelemetryClient.PushMessageCheckFailure(w.ctx, hash.Hex()) - }*/ + } } } }() @@ -2203,12 +2197,6 @@ func (w *Waku) PeerID() peer.ID { return "" } -// TODO-nwaku -func (w *Waku) Peerstore() peerstore.Peerstore { - // return w.node.Host().Peerstore() - return nil -} - // validatePrivateKey checks the format of the given private key. func validatePrivateKey(k *ecdsa.PrivateKey) bool { if k == nil || k.D == nil || k.D.Sign() == 0 { @@ -2539,7 +2527,7 @@ func (self *Waku) WakuRelaySubscribe(pubsubTopic string) error { defer C.free(unsafe.Pointer(cPubsubTopic)) if self.wakuCtx == nil { - fmt.Println("ctx is nil") + return errors.New("wakuCtx is nil") } // if self.cPubsubTopic == nil { // fmt.Println("cPubsubTopic is nil") @@ -2688,8 +2676,6 @@ func (self *Waku) ListenAddresses() ([]multiaddr.Multiaddr, error) { addrsRet = append(addrsRet, addr) } - fmt.Println("AAAAAA listen addresses: ", listenAddresses) - return addrsRet, nil } errMsg := "error WakuListenAddresses: " + @@ -2728,7 +2714,6 @@ func (self *Waku) ListPeersInMesh(pubsubTopic string) (int, error) { numPeersStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) numPeers, err := strconv.Atoi(numPeersStr) if err != nil { - fmt.Println(":", err) errMsg := "ListPeersInMesh - error converting string to int: " + err.Error() return 0, errors.New(errMsg) } @@ -2758,7 +2743,6 @@ func (self *Waku) GetNumConnectedPeers(paramPubsubTopic ...string) (int, error) numPeersStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) numPeers, err := strconv.Atoi(numPeersStr) if err != nil { - fmt.Println(":", err) errMsg := "GetNumConnectedPeers - error converting string to int: " + err.Error() return 0, errors.New(errMsg) }