diff --git a/waku/nwaku.go b/waku/nwaku.go index 1176c65..23552d9 100644 --- a/waku/nwaku.go +++ b/waku/nwaku.go @@ -314,6 +314,7 @@ import ( "unsafe" "github.com/ethereum/go-ethereum/common/hexutil" + "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/p2p/enode" "github.com/libp2p/go-libp2p/core/peer" libp2pproto "github.com/libp2p/go-libp2p/core/protocol" @@ -431,11 +432,6 @@ func (w *Waku) DropPeer(peerID peer.ID) error { return w.node.DisconnectPeerByID(peerID) } -type response struct { - err error - value any -} - //export GoCallback func GoCallback(ret C.int, msg *C.char, len C.size_t, resp unsafe.Pointer) { if resp != nil { @@ -448,13 +444,20 @@ func GoCallback(ret C.int, msg *C.char, len C.size_t, resp unsafe.Pointer) { } } +type MessageData struct { + Msg pb.WakuMessage + PubsubTopic string +} + // WakuNode represents an instance of an nwaku node type WakuNode struct { wakuCtx unsafe.Pointer + logger *zap.Logger cancel context.CancelFunc + MsgChan chan MessageData } -func newWakuNode(ctx context.Context, config *WakuConfig) (*WakuNode, error) { +func newWakuNode(ctx context.Context, config *WakuConfig, logger *zap.Logger) (*WakuNode, error) { ctx, cancel := context.WithCancel(ctx) n := &WakuNode{ @@ -495,6 +498,8 @@ func newWakuNode(ctx context.Context, config *WakuConfig) (*WakuNode, error) { wg.Add(1) n.wakuCtx = C.cGoWakuNew(cJsonConfig, resp) + n.MsgChan = make(chan MessageData, 100) + n.logger = logger.Named("nwaku") wg.Wait() // Notice that the events for self node are handled by the 'MyEventCallback' method @@ -516,7 +521,7 @@ func New(nwakuCfg *WakuConfig, logger *zap.Logger) (*Waku, error) { logger.Info("starting Waku with config", zap.Any("nwakuCfg", nwakuCfg)) ctx, cancel := context.WithCancel(context.Background()) - wakunode, err := newWakuNode(ctx, nwakuCfg) + wakunode, err := newWakuNode(ctx, nwakuCfg, logger) if err != nil { cancel() return nil, err @@ -531,12 +536,70 @@ func New(nwakuCfg *WakuConfig, logger *zap.Logger) (*Waku, error) { }, nil } +// The event callback sends back the node's ctx to know to which +// node is the event being emited for. Since we only have a global +// callback in the go side, We register all the nodes that we create +// so we can later obtain which instance of `WakuNode` is should +// be invoked depending on the ctx received + +var nodeRegistry map[unsafe.Pointer]*WakuNode + +func init() { + nodeRegistry = make(map[unsafe.Pointer]*WakuNode) +} + +func registerNode(node *WakuNode) { + _, ok := nodeRegistry[node.wakuCtx] + if !ok { + nodeRegistry[node.wakuCtx] = node + } +} + +func unregisterNode(node *WakuNode) { + delete(nodeRegistry, node.wakuCtx) +} + //export globalEventCallback func globalEventCallback(callerRet C.int, msg *C.char, len C.size_t, userData unsafe.Pointer) { - // This is shared among all Golang instances - // TODO-nwaku - // self := Waku{wakuCtx: userData} - // self.MyEventCallback(callerRet, msg, len) + if callerRet == C.RET_OK { + eventStr := C.GoStringN(msg, C.int(len)) + node, ok := nodeRegistry[userData] + if ok { + node.OnEvent(eventStr) + } + } else { + errMsgField := zap.Skip() + if len != 0 { + errMsgField = zap.String("error", C.GoStringN(msg, C.int(len))) + } + log.Error("globalEventCallback retCode not ok", zap.Int("retCode", int(callerRet)), errMsgField) + } +} + +type jsonEvent struct { + EventType string `json:"eventType"` +} + +func (n *WakuNode) OnEvent(eventStr string) { + jsonEvent := jsonEvent{} + err := json.Unmarshal([]byte(eventStr), &jsonEvent) + if err != nil { + n.logger.Error("could not unmarshal nwaku event string", zap.Error(err)) + return + } + + switch jsonEvent.EventType { + case "message": + n.parseMessageEvent(eventStr) + } +} + +func (n *WakuNode) parseMessageEvent(eventStr string) { + envelope, err := common.NewEnvelope(eventStr) + if err != nil { + n.logger.Error("could not parse message", zap.Error(err)) + } + n.MsgChan <- envelope } func (n *WakuNode) GetNumConnectedRelayPeers(optPubsubTopic ...string) (int, error) { @@ -900,6 +963,7 @@ func (n *WakuNode) Start() error { C.cGoWakuStart(n.wakuCtx, resp) wg.Wait() if C.getRet(resp) == C.RET_OK { + registerNode(n) return nil } @@ -917,6 +981,7 @@ func (n *WakuNode) Stop() error { C.cGoWakuStop(n.wakuCtx, resp) wg.Wait() if C.getRet(resp) == C.RET_OK { + unregisterNode(n) return nil }