diff --git a/third_party/nwaku b/third_party/nwaku index 47a623541..2022f54f5 160000 --- a/third_party/nwaku +++ b/third_party/nwaku @@ -1 +1 @@ -Subproject commit 47a6235414c2910ad9f540882bc5193ece84c552 +Subproject commit 2022f54f5c371b4f389c54451dcda77386959e17 diff --git a/wakuv2/common/envelope.go b/wakuv2/common/envelope.go new file mode 100644 index 000000000..dbf80cb24 --- /dev/null +++ b/wakuv2/common/envelope.go @@ -0,0 +1,79 @@ +package common + +import ( + "encoding/json" + + "github.com/ethereum/go-ethereum/common/hexutil" + "github.com/waku-org/go-waku/waku/v2/protocol/pb" +) + +// Envelope contains information about the pubsub topic of a WakuMessage +// and a hash used to identify a message based on the bytes of a WakuMessage +// protobuffer +type Envelope interface { + Message() *pb.WakuMessage + PubsubTopic() string + Hash() pb.MessageHash +} + +type envelopeImpl struct { + msg *pb.WakuMessage + topic string + hash pb.MessageHash +} + +type tmpWakuMessageJson struct { + Payload []byte `json:"payload,omitempty"` + ContentTopic string `json:"contentTopic,omitempty"` + Version *uint32 `json:"version,omitempty"` + Timestamp *int64 `json:"timestamp,omitempty"` + Meta []byte `json:"meta,omitempty"` + Ephemeral *bool `json:"ephemeral,omitempty"` + RateLimitProof []byte `json:"proof,omitempty"` +} + +type tmpEnvelopeStruct struct { + WakuMessage tmpWakuMessageJson `json:"wakuMessage"` + PubsubTopic string `json:"pubsubTopic"` + MessageHash string `json:"messageHash"` +} + +// NewEnvelope creates a new Envelope from a json string generated in nwaku +func NewEnvelope(jsonEventStr string) (Envelope, error) { + tmpEnvelopeStruct := tmpEnvelopeStruct{} + err := json.Unmarshal([]byte(jsonEventStr), &tmpEnvelopeStruct) + if err != nil { + return nil, err + } + + hash, err := hexutil.Decode(tmpEnvelopeStruct.MessageHash) + if err != nil { + return nil, err + } + + return &envelopeImpl{ + msg: &pb.WakuMessage{ + Payload: tmpEnvelopeStruct.WakuMessage.Payload, + ContentTopic: tmpEnvelopeStruct.WakuMessage.ContentTopic, + Version: tmpEnvelopeStruct.WakuMessage.Version, + Timestamp: tmpEnvelopeStruct.WakuMessage.Timestamp, + Meta: tmpEnvelopeStruct.WakuMessage.Meta, + Ephemeral: tmpEnvelopeStruct.WakuMessage.Ephemeral, + RateLimitProof: tmpEnvelopeStruct.WakuMessage.RateLimitProof, + }, + topic: tmpEnvelopeStruct.PubsubTopic, + hash: pb.ToMessageHash(hash), + }, nil +} + +func (e *envelopeImpl) Message() *pb.WakuMessage { + return e.msg +} + +func (e *envelopeImpl) PubsubTopic() string { + return e.topic +} + +func (e *envelopeImpl) Hash() pb.MessageHash { + return e.hash +} diff --git a/wakuv2/common/message.go b/wakuv2/common/message.go index 58521ae75..977bea6d8 100644 --- a/wakuv2/common/message.go +++ b/wakuv2/common/message.go @@ -9,7 +9,6 @@ import ( "go.uber.org/zap" "github.com/waku-org/go-waku/waku/v2/payload" - "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/status-im/status-go/logutils" @@ -41,7 +40,7 @@ type MessageParams struct { // ReceivedMessage represents a data packet to be received through the // WakuV2 protocol and successfully decrypted. type ReceivedMessage struct { - Envelope *protocol.Envelope // Wrapped Waku Message + Envelope Envelope // Wrapped Waku Message MsgType MessageType @@ -105,7 +104,7 @@ type MemoryMessageStore struct { messages map[common.Hash]*ReceivedMessage } -func NewReceivedMessage(env *protocol.Envelope, msgType MessageType) *ReceivedMessage { +func NewReceivedMessage(env Envelope, msgType MessageType) *ReceivedMessage { ct, err := ExtractTopicFromContentTopic(env.Message().ContentTopic) if err != nil { logutils.ZapLogger().Debug("failed to extract content topic from message", diff --git a/wakuv2/nwaku.go b/wakuv2/nwaku.go index c0f9a298a..e19b5eb36 100644 --- a/wakuv2/nwaku.go +++ b/wakuv2/nwaku.go @@ -334,6 +334,7 @@ import ( "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/event" + "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/p2p/enode" "github.com/ethereum/go-ethereum/rpc" @@ -380,7 +381,7 @@ const randomPeersKeepAliveInterval = 5 * time.Second const allPeersKeepAliveInterval = 5 * time.Minute type SentEnvelope struct { - Envelope *protocol.Envelope + Envelope common.Envelope PublishMethod publish.PublishMethod } @@ -580,7 +581,6 @@ func New(nodeKey *ecdsa.PrivateKey, fleet string, cfg *Config, nwakuCfg *WakuCon sendQueue: publish.NewMessageQueue(1000, cfg.UseThrottledPublish), } - waku.filters = common.NewFilters(waku.cfg.DefaultShardPubsubTopic, waku.logger) waku.bandwidthCounter = metrics.NewBandwidthCounter() if nodeKey == nil { @@ -1020,12 +1020,12 @@ func (w *Waku) subscribeToPubsubTopicWithWakuRelay(topic string, pubkey *ecdsa.P w.logger.Error("could not unsubscribe", zap.Error(err)) } return - // TODO-nwaku - /*case env := <-sub[0].Ch: + + case env := <-w.node.MsgChan: err := w.OnNewEnvelopes(env, common.RelayedMessageType, false) if err != nil { w.logger.Error("OnNewEnvelopes error", zap.Error(err)) - }*/ + } } } }() @@ -1363,7 +1363,7 @@ func (w *Waku) ConfirmMessageDelivered(hashes []gethcommon.Hash) { } // OnNewEnvelope is an interface from Waku FilterManager API that gets invoked when any new message is received by Filter. -func (w *Waku) OnNewEnvelope(env *protocol.Envelope) error { +func (w *Waku) OnNewEnvelope(env common.Envelope) error { return w.OnNewEnvelopes(env, common.RelayedMessageType, false) } @@ -1503,6 +1503,7 @@ func (w *Waku) Start() error { w.node.FilterLightnode(), filterapi.WithBatchInterval(300*time.Millisecond)) } + */ err = w.setupRelaySubscriptions() if err != nil { @@ -1514,7 +1515,6 @@ func (w *Waku) Start() error { w.wg.Add(1) go w.processQueueLoop() } - */ w.wg.Add(1) @@ -1748,7 +1748,7 @@ func (w *Waku) Stop() error { return nil } -func (w *Waku) OnNewEnvelopes(envelope *protocol.Envelope, msgType common.MessageType, processImmediately bool) error { +func (w *Waku) OnNewEnvelopes(envelope common.Envelope, msgType common.MessageType, processImmediately bool) error { if envelope == nil { return nil } @@ -2326,7 +2326,7 @@ func wakuNew(nodeKey *ecdsa.PrivateKey, ctx, cancel := context.WithCancel(context.Background()) - wakunode, err := newWakuNode(ctx, nwakuCfg) + wakunode, err := newWakuNode(ctx, nwakuCfg, logger) if err != nil { cancel() return nil, err @@ -2360,20 +2360,49 @@ func wakuNew(nodeKey *ecdsa.PrivateKey, onPeerStats: onPeerStats, onlineChecker: onlinechecker.NewDefaultOnlineChecker(false).(*onlinechecker.DefaultOnlineChecker), sendQueue: publish.NewMessageQueue(1000, cfg.UseThrottledPublish), + filters: common.NewFilters(cfg.DefaultShardPubsubTopic, logger), }, nil } -//export globalEventCallback -func globalEventCallback(callerRet C.int, msg *C.char, len C.size_t, userData unsafe.Pointer) { - // This is shared among all Golang instances - // TODO-nwaku - // self := Waku{wakuCtx: userData} - // self.MyEventCallback(callerRet, msg, len) +// The event callback sends back the node's ctx to know to which +// node is the event being emited for. Since we only have a global +// callback in the go side, We register all the nodes that we create +// so we can later obtain which instance of `WakuNode` is should +// be invoked depending on the ctx received + +var nodeRegistry map[unsafe.Pointer]*WakuNode + +func init() { + nodeRegistry = make(map[unsafe.Pointer]*WakuNode) } -func (self *Waku) MyEventCallback(callerRet C.int, msg *C.char, len C.size_t) { - fmt.Println("Event received:", C.GoStringN(msg, C.int(len))) +func registerNode(node *WakuNode) { + _, ok := nodeRegistry[node.wakuCtx] + if !ok { + nodeRegistry[node.wakuCtx] = node + } +} + +func unregisterNode(node *WakuNode) { + delete(nodeRegistry, node.wakuCtx) +} + +//export globalEventCallback +func globalEventCallback(callerRet C.int, msg *C.char, len C.size_t, userData unsafe.Pointer) { + if callerRet == C.RET_OK { + eventStr := C.GoStringN(msg, C.int(len)) + node, ok := nodeRegistry[userData] + if ok { + node.OnEvent(eventStr) + } + } else { + errMsgField := zap.Skip() + if len != 0 { + errMsgField = zap.String("error", C.GoStringN(msg, C.int(len))) + } + log.Error("globalEventCallback retCode not ok", zap.Int("retCode", int(callerRet)), errMsgField) + } } type response struct { @@ -2396,10 +2425,12 @@ func GoCallback(ret C.int, msg *C.char, len C.size_t, resp unsafe.Pointer) { // WakuNode represents an instance of an nwaku node type WakuNode struct { wakuCtx unsafe.Pointer + logger *zap.Logger cancel context.CancelFunc + MsgChan chan common.Envelope } -func newWakuNode(ctx context.Context, config *WakuConfig) (*WakuNode, error) { +func newWakuNode(ctx context.Context, config *WakuConfig, logger *zap.Logger) (*WakuNode, error) { ctx, cancel := context.WithCancel(ctx) n := &WakuNode{ @@ -2440,6 +2471,8 @@ func newWakuNode(ctx context.Context, config *WakuConfig) (*WakuNode, error) { wg.Add(1) n.wakuCtx = C.cGoWakuNew(cJsonConfig, resp) + n.MsgChan = make(chan common.Envelope, 100) + n.logger = logger.Named("nwaku") wg.Wait() // Notice that the events for self node are handled by the 'MyEventCallback' method @@ -2448,6 +2481,32 @@ func newWakuNode(ctx context.Context, config *WakuConfig) (*WakuNode, error) { return n, nil } +type jsonEvent struct { + EventType string `json:"eventType"` +} + +func (n *WakuNode) OnEvent(eventStr string) { + jsonEvent := jsonEvent{} + err := json.Unmarshal([]byte(eventStr), &jsonEvent) + if err != nil { + n.logger.Error("could not unmarshal nwaku event string", zap.Error(err)) + return + } + + switch jsonEvent.EventType { + case "message": + n.parseMessageEvent(eventStr) + } +} + +func (n *WakuNode) parseMessageEvent(eventStr string) { + envelope, err := common.NewEnvelope(eventStr) + if err != nil { + n.logger.Error("could not parse message", zap.Error(err)) + } + n.MsgChan <- envelope +} + func (n *WakuNode) GetNumConnectedRelayPeers(optPubsubTopic ...string) (int, error) { var pubsubTopic string if len(optPubsubTopic) == 0 { @@ -2811,6 +2870,7 @@ func (n *WakuNode) Start() error { C.cGoWakuStart(n.wakuCtx, resp) wg.Wait() if C.getRet(resp) == C.RET_OK { + registerNode(n) return nil } @@ -2828,6 +2888,7 @@ func (n *WakuNode) Stop() error { C.cGoWakuStop(n.wakuCtx, resp) wg.Wait() if C.getRet(resp) == C.RET_OK { + unregisterNode(n) return nil }