chore: porting fixes for receiving relay messages (#7)

This commit is contained in:
gabrielmer 2024-12-13 13:28:26 +01:00 committed by GitHub
parent 532fc08bb0
commit 81b59bcbeb
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 152 additions and 11 deletions

79
waku/common/envelope.go Normal file
View File

@ -0,0 +1,79 @@
package common
import (
"encoding/json"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
)
// Envelope contains information about the pubsub topic of a WakuMessage
// and a hash used to identify a message based on the bytes of a WakuMessage
// protobuffer
type Envelope interface {
Message() *pb.WakuMessage
PubsubTopic() string
Hash() pb.MessageHash
}
type envelopeImpl struct {
msg *pb.WakuMessage
topic string
hash pb.MessageHash
}
type tmpWakuMessageJson struct {
Payload []byte `json:"payload,omitempty"`
ContentTopic string `json:"contentTopic,omitempty"`
Version *uint32 `json:"version,omitempty"`
Timestamp *int64 `json:"timestamp,omitempty"`
Meta []byte `json:"meta,omitempty"`
Ephemeral *bool `json:"ephemeral,omitempty"`
RateLimitProof []byte `json:"proof,omitempty"`
}
type tmpEnvelopeStruct struct {
WakuMessage tmpWakuMessageJson `json:"wakuMessage"`
PubsubTopic string `json:"pubsubTopic"`
MessageHash string `json:"messageHash"`
}
// NewEnvelope creates a new Envelope from a json string generated in nwaku
func NewEnvelope(jsonEventStr string) (Envelope, error) {
tmpEnvelopeStruct := tmpEnvelopeStruct{}
err := json.Unmarshal([]byte(jsonEventStr), &tmpEnvelopeStruct)
if err != nil {
return nil, err
}
hash, err := hexutil.Decode(tmpEnvelopeStruct.MessageHash)
if err != nil {
return nil, err
}
return &envelopeImpl{
msg: &pb.WakuMessage{
Payload: tmpEnvelopeStruct.WakuMessage.Payload,
ContentTopic: tmpEnvelopeStruct.WakuMessage.ContentTopic,
Version: tmpEnvelopeStruct.WakuMessage.Version,
Timestamp: tmpEnvelopeStruct.WakuMessage.Timestamp,
Meta: tmpEnvelopeStruct.WakuMessage.Meta,
Ephemeral: tmpEnvelopeStruct.WakuMessage.Ephemeral,
RateLimitProof: tmpEnvelopeStruct.WakuMessage.RateLimitProof,
},
topic: tmpEnvelopeStruct.PubsubTopic,
hash: pb.ToMessageHash(hash),
}, nil
}
func (e *envelopeImpl) Message() *pb.WakuMessage {
return e.msg
}
func (e *envelopeImpl) PubsubTopic() string {
return e.topic
}
func (e *envelopeImpl) Hash() pb.MessageHash {
return e.hash
}

View File

@ -314,6 +314,7 @@ import (
"unsafe"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/libp2p/go-libp2p/core/peer"
libp2pproto "github.com/libp2p/go-libp2p/core/protocol"
@ -321,10 +322,12 @@ import (
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
storepb "github.com/waku-org/go-waku/waku/v2/protocol/store/pb"
"github.com/waku-org/go-waku/waku/v2/utils"
"github.com/waku-org/waku-go-bindings/waku/common"
"go.uber.org/zap"
)
const requestTimeout = 30 * time.Second
const MsgChanBufferSize = 100
type WakuConfig struct {
Host string `json:"host,omitempty"`
@ -431,11 +434,6 @@ func (w *Waku) DropPeer(peerID peer.ID) error {
return w.node.DisconnectPeerByID(peerID)
}
type response struct {
err error
value any
}
//export GoCallback
func GoCallback(ret C.int, msg *C.char, len C.size_t, resp unsafe.Pointer) {
if resp != nil {
@ -451,10 +449,12 @@ func GoCallback(ret C.int, msg *C.char, len C.size_t, resp unsafe.Pointer) {
// WakuNode represents an instance of an nwaku node
type WakuNode struct {
wakuCtx unsafe.Pointer
logger *zap.Logger
cancel context.CancelFunc
MsgChan chan common.Envelope
}
func newWakuNode(ctx context.Context, config *WakuConfig) (*WakuNode, error) {
func newWakuNode(ctx context.Context, config *WakuConfig, logger *zap.Logger) (*WakuNode, error) {
ctx, cancel := context.WithCancel(ctx)
n := &WakuNode{
@ -495,6 +495,8 @@ func newWakuNode(ctx context.Context, config *WakuConfig) (*WakuNode, error) {
wg.Add(1)
n.wakuCtx = C.cGoWakuNew(cJsonConfig, resp)
n.MsgChan = make(chan common.Envelope, MsgChanBufferSize)
n.logger = logger.Named("nwaku")
wg.Wait()
// Notice that the events for self node are handled by the 'MyEventCallback' method
@ -516,7 +518,7 @@ func New(nwakuCfg *WakuConfig, logger *zap.Logger) (*Waku, error) {
logger.Info("starting Waku with config", zap.Any("nwakuCfg", nwakuCfg))
ctx, cancel := context.WithCancel(context.Background())
wakunode, err := newWakuNode(ctx, nwakuCfg)
wakunode, err := newWakuNode(ctx, nwakuCfg, logger)
if err != nil {
cancel()
return nil, err
@ -531,12 +533,70 @@ func New(nwakuCfg *WakuConfig, logger *zap.Logger) (*Waku, error) {
}, nil
}
// The event callback sends back the node's ctx to know to which
// node is the event being emited for. Since we only have a global
// callback in the go side, We register all the nodes that we create
// so we can later obtain which instance of `WakuNode` is should
// be invoked depending on the ctx received
var nodeRegistry map[unsafe.Pointer]*WakuNode
func init() {
nodeRegistry = make(map[unsafe.Pointer]*WakuNode)
}
func registerNode(node *WakuNode) {
_, ok := nodeRegistry[node.wakuCtx]
if !ok {
nodeRegistry[node.wakuCtx] = node
}
}
func unregisterNode(node *WakuNode) {
delete(nodeRegistry, node.wakuCtx)
}
//export globalEventCallback
func globalEventCallback(callerRet C.int, msg *C.char, len C.size_t, userData unsafe.Pointer) {
// This is shared among all Golang instances
// TODO-nwaku
// self := Waku{wakuCtx: userData}
// self.MyEventCallback(callerRet, msg, len)
if callerRet == C.RET_OK {
eventStr := C.GoStringN(msg, C.int(len))
node, ok := nodeRegistry[userData] // userData contains node's ctx
if ok {
node.OnEvent(eventStr)
}
} else {
errMsgField := zap.Skip()
if len != 0 {
errMsgField = zap.String("error", C.GoStringN(msg, C.int(len)))
}
log.Error("globalEventCallback retCode not ok", zap.Int("retCode", int(callerRet)), errMsgField)
}
}
type jsonEvent struct {
EventType string `json:"eventType"`
}
func (n *WakuNode) OnEvent(eventStr string) {
jsonEvent := jsonEvent{}
err := json.Unmarshal([]byte(eventStr), &jsonEvent)
if err != nil {
n.logger.Error("could not unmarshal nwaku event string", zap.Error(err))
return
}
switch jsonEvent.EventType {
case "message":
n.parseMessageEvent(eventStr)
}
}
func (n *WakuNode) parseMessageEvent(eventStr string) {
envelope, err := common.NewEnvelope(eventStr)
if err != nil {
n.logger.Error("could not parse message", zap.Error(err))
}
n.MsgChan <- envelope
}
func (n *WakuNode) GetNumConnectedRelayPeers(optPubsubTopic ...string) (int, error) {
@ -900,6 +960,7 @@ func (n *WakuNode) Start() error {
C.cGoWakuStart(n.wakuCtx, resp)
wg.Wait()
if C.getRet(resp) == C.RET_OK {
registerNode(n)
return nil
}
@ -917,6 +978,7 @@ func (n *WakuNode) Stop() error {
C.cGoWakuStop(n.wakuCtx, resp)
wg.Wait()
if C.getRet(resp) == C.RET_OK {
unregisterNode(n)
return nil
}