feat(nwaku)_: receive messages via relay (#6185)
This commit is contained in:
parent
cc2022b0c8
commit
387d45dc96
|
@ -1 +1 @@
|
|||
Subproject commit 47a6235414c2910ad9f540882bc5193ece84c552
|
||||
Subproject commit 2022f54f5c371b4f389c54451dcda77386959e17
|
|
@ -0,0 +1,79 @@
|
|||
package common
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
|
||||
"github.com/ethereum/go-ethereum/common/hexutil"
|
||||
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
|
||||
)
|
||||
|
||||
// Envelope contains information about the pubsub topic of a WakuMessage
|
||||
// and a hash used to identify a message based on the bytes of a WakuMessage
|
||||
// protobuffer
|
||||
type Envelope interface {
|
||||
Message() *pb.WakuMessage
|
||||
PubsubTopic() string
|
||||
Hash() pb.MessageHash
|
||||
}
|
||||
|
||||
type envelopeImpl struct {
|
||||
msg *pb.WakuMessage
|
||||
topic string
|
||||
hash pb.MessageHash
|
||||
}
|
||||
|
||||
type tmpWakuMessageJson struct {
|
||||
Payload []byte `json:"payload,omitempty"`
|
||||
ContentTopic string `json:"contentTopic,omitempty"`
|
||||
Version *uint32 `json:"version,omitempty"`
|
||||
Timestamp *int64 `json:"timestamp,omitempty"`
|
||||
Meta []byte `json:"meta,omitempty"`
|
||||
Ephemeral *bool `json:"ephemeral,omitempty"`
|
||||
RateLimitProof []byte `json:"proof,omitempty"`
|
||||
}
|
||||
|
||||
type tmpEnvelopeStruct struct {
|
||||
WakuMessage tmpWakuMessageJson `json:"wakuMessage"`
|
||||
PubsubTopic string `json:"pubsubTopic"`
|
||||
MessageHash string `json:"messageHash"`
|
||||
}
|
||||
|
||||
// NewEnvelope creates a new Envelope from a json string generated in nwaku
|
||||
func NewEnvelope(jsonEventStr string) (Envelope, error) {
|
||||
tmpEnvelopeStruct := tmpEnvelopeStruct{}
|
||||
err := json.Unmarshal([]byte(jsonEventStr), &tmpEnvelopeStruct)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
hash, err := hexutil.Decode(tmpEnvelopeStruct.MessageHash)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &envelopeImpl{
|
||||
msg: &pb.WakuMessage{
|
||||
Payload: tmpEnvelopeStruct.WakuMessage.Payload,
|
||||
ContentTopic: tmpEnvelopeStruct.WakuMessage.ContentTopic,
|
||||
Version: tmpEnvelopeStruct.WakuMessage.Version,
|
||||
Timestamp: tmpEnvelopeStruct.WakuMessage.Timestamp,
|
||||
Meta: tmpEnvelopeStruct.WakuMessage.Meta,
|
||||
Ephemeral: tmpEnvelopeStruct.WakuMessage.Ephemeral,
|
||||
RateLimitProof: tmpEnvelopeStruct.WakuMessage.RateLimitProof,
|
||||
},
|
||||
topic: tmpEnvelopeStruct.PubsubTopic,
|
||||
hash: pb.ToMessageHash(hash),
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (e *envelopeImpl) Message() *pb.WakuMessage {
|
||||
return e.msg
|
||||
}
|
||||
|
||||
func (e *envelopeImpl) PubsubTopic() string {
|
||||
return e.topic
|
||||
}
|
||||
|
||||
func (e *envelopeImpl) Hash() pb.MessageHash {
|
||||
return e.hash
|
||||
}
|
|
@ -9,7 +9,6 @@ import (
|
|||
"go.uber.org/zap"
|
||||
|
||||
"github.com/waku-org/go-waku/waku/v2/payload"
|
||||
"github.com/waku-org/go-waku/waku/v2/protocol"
|
||||
|
||||
"github.com/status-im/status-go/logutils"
|
||||
|
||||
|
@ -41,7 +40,7 @@ type MessageParams struct {
|
|||
// ReceivedMessage represents a data packet to be received through the
|
||||
// WakuV2 protocol and successfully decrypted.
|
||||
type ReceivedMessage struct {
|
||||
Envelope *protocol.Envelope // Wrapped Waku Message
|
||||
Envelope Envelope // Wrapped Waku Message
|
||||
|
||||
MsgType MessageType
|
||||
|
||||
|
@ -105,7 +104,7 @@ type MemoryMessageStore struct {
|
|||
messages map[common.Hash]*ReceivedMessage
|
||||
}
|
||||
|
||||
func NewReceivedMessage(env *protocol.Envelope, msgType MessageType) *ReceivedMessage {
|
||||
func NewReceivedMessage(env Envelope, msgType MessageType) *ReceivedMessage {
|
||||
ct, err := ExtractTopicFromContentTopic(env.Message().ContentTopic)
|
||||
if err != nil {
|
||||
logutils.ZapLogger().Debug("failed to extract content topic from message",
|
||||
|
|
|
@ -334,6 +334,7 @@ import (
|
|||
"github.com/ethereum/go-ethereum/common/hexutil"
|
||||
"github.com/ethereum/go-ethereum/crypto"
|
||||
"github.com/ethereum/go-ethereum/event"
|
||||
"github.com/ethereum/go-ethereum/log"
|
||||
"github.com/ethereum/go-ethereum/p2p"
|
||||
"github.com/ethereum/go-ethereum/p2p/enode"
|
||||
"github.com/ethereum/go-ethereum/rpc"
|
||||
|
@ -380,7 +381,7 @@ const randomPeersKeepAliveInterval = 5 * time.Second
|
|||
const allPeersKeepAliveInterval = 5 * time.Minute
|
||||
|
||||
type SentEnvelope struct {
|
||||
Envelope *protocol.Envelope
|
||||
Envelope common.Envelope
|
||||
PublishMethod publish.PublishMethod
|
||||
}
|
||||
|
||||
|
@ -580,7 +581,6 @@ func New(nodeKey *ecdsa.PrivateKey, fleet string, cfg *Config, nwakuCfg *WakuCon
|
|||
sendQueue: publish.NewMessageQueue(1000, cfg.UseThrottledPublish),
|
||||
}
|
||||
|
||||
waku.filters = common.NewFilters(waku.cfg.DefaultShardPubsubTopic, waku.logger)
|
||||
waku.bandwidthCounter = metrics.NewBandwidthCounter()
|
||||
|
||||
if nodeKey == nil {
|
||||
|
@ -1020,12 +1020,12 @@ func (w *Waku) subscribeToPubsubTopicWithWakuRelay(topic string, pubkey *ecdsa.P
|
|||
w.logger.Error("could not unsubscribe", zap.Error(err))
|
||||
}
|
||||
return
|
||||
// TODO-nwaku
|
||||
/*case env := <-sub[0].Ch:
|
||||
|
||||
case env := <-w.node.MsgChan:
|
||||
err := w.OnNewEnvelopes(env, common.RelayedMessageType, false)
|
||||
if err != nil {
|
||||
w.logger.Error("OnNewEnvelopes error", zap.Error(err))
|
||||
}*/
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
@ -1363,7 +1363,7 @@ func (w *Waku) ConfirmMessageDelivered(hashes []gethcommon.Hash) {
|
|||
}
|
||||
|
||||
// OnNewEnvelope is an interface from Waku FilterManager API that gets invoked when any new message is received by Filter.
|
||||
func (w *Waku) OnNewEnvelope(env *protocol.Envelope) error {
|
||||
func (w *Waku) OnNewEnvelope(env common.Envelope) error {
|
||||
return w.OnNewEnvelopes(env, common.RelayedMessageType, false)
|
||||
}
|
||||
|
||||
|
@ -1503,6 +1503,7 @@ func (w *Waku) Start() error {
|
|||
w.node.FilterLightnode(),
|
||||
filterapi.WithBatchInterval(300*time.Millisecond))
|
||||
}
|
||||
*/
|
||||
|
||||
err = w.setupRelaySubscriptions()
|
||||
if err != nil {
|
||||
|
@ -1514,7 +1515,6 @@ func (w *Waku) Start() error {
|
|||
w.wg.Add(1)
|
||||
go w.processQueueLoop()
|
||||
}
|
||||
*/
|
||||
|
||||
w.wg.Add(1)
|
||||
|
||||
|
@ -1748,7 +1748,7 @@ func (w *Waku) Stop() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (w *Waku) OnNewEnvelopes(envelope *protocol.Envelope, msgType common.MessageType, processImmediately bool) error {
|
||||
func (w *Waku) OnNewEnvelopes(envelope common.Envelope, msgType common.MessageType, processImmediately bool) error {
|
||||
if envelope == nil {
|
||||
return nil
|
||||
}
|
||||
|
@ -2326,7 +2326,7 @@ func wakuNew(nodeKey *ecdsa.PrivateKey,
|
|||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
|
||||
wakunode, err := newWakuNode(ctx, nwakuCfg)
|
||||
wakunode, err := newWakuNode(ctx, nwakuCfg, logger)
|
||||
if err != nil {
|
||||
cancel()
|
||||
return nil, err
|
||||
|
@ -2360,20 +2360,49 @@ func wakuNew(nodeKey *ecdsa.PrivateKey,
|
|||
onPeerStats: onPeerStats,
|
||||
onlineChecker: onlinechecker.NewDefaultOnlineChecker(false).(*onlinechecker.DefaultOnlineChecker),
|
||||
sendQueue: publish.NewMessageQueue(1000, cfg.UseThrottledPublish),
|
||||
filters: common.NewFilters(cfg.DefaultShardPubsubTopic, logger),
|
||||
}, nil
|
||||
|
||||
}
|
||||
|
||||
//export globalEventCallback
|
||||
func globalEventCallback(callerRet C.int, msg *C.char, len C.size_t, userData unsafe.Pointer) {
|
||||
// This is shared among all Golang instances
|
||||
// TODO-nwaku
|
||||
// self := Waku{wakuCtx: userData}
|
||||
// self.MyEventCallback(callerRet, msg, len)
|
||||
// The event callback sends back the node's ctx to know to which
|
||||
// node is the event being emited for. Since we only have a global
|
||||
// callback in the go side, We register all the nodes that we create
|
||||
// so we can later obtain which instance of `WakuNode` is should
|
||||
// be invoked depending on the ctx received
|
||||
|
||||
var nodeRegistry map[unsafe.Pointer]*WakuNode
|
||||
|
||||
func init() {
|
||||
nodeRegistry = make(map[unsafe.Pointer]*WakuNode)
|
||||
}
|
||||
|
||||
func (self *Waku) MyEventCallback(callerRet C.int, msg *C.char, len C.size_t) {
|
||||
fmt.Println("Event received:", C.GoStringN(msg, C.int(len)))
|
||||
func registerNode(node *WakuNode) {
|
||||
_, ok := nodeRegistry[node.wakuCtx]
|
||||
if !ok {
|
||||
nodeRegistry[node.wakuCtx] = node
|
||||
}
|
||||
}
|
||||
|
||||
func unregisterNode(node *WakuNode) {
|
||||
delete(nodeRegistry, node.wakuCtx)
|
||||
}
|
||||
|
||||
//export globalEventCallback
|
||||
func globalEventCallback(callerRet C.int, msg *C.char, len C.size_t, userData unsafe.Pointer) {
|
||||
if callerRet == C.RET_OK {
|
||||
eventStr := C.GoStringN(msg, C.int(len))
|
||||
node, ok := nodeRegistry[userData]
|
||||
if ok {
|
||||
node.OnEvent(eventStr)
|
||||
}
|
||||
} else {
|
||||
errMsgField := zap.Skip()
|
||||
if len != 0 {
|
||||
errMsgField = zap.String("error", C.GoStringN(msg, C.int(len)))
|
||||
}
|
||||
log.Error("globalEventCallback retCode not ok", zap.Int("retCode", int(callerRet)), errMsgField)
|
||||
}
|
||||
}
|
||||
|
||||
type response struct {
|
||||
|
@ -2396,10 +2425,12 @@ func GoCallback(ret C.int, msg *C.char, len C.size_t, resp unsafe.Pointer) {
|
|||
// WakuNode represents an instance of an nwaku node
|
||||
type WakuNode struct {
|
||||
wakuCtx unsafe.Pointer
|
||||
logger *zap.Logger
|
||||
cancel context.CancelFunc
|
||||
MsgChan chan common.Envelope
|
||||
}
|
||||
|
||||
func newWakuNode(ctx context.Context, config *WakuConfig) (*WakuNode, error) {
|
||||
func newWakuNode(ctx context.Context, config *WakuConfig, logger *zap.Logger) (*WakuNode, error) {
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
|
||||
n := &WakuNode{
|
||||
|
@ -2440,6 +2471,8 @@ func newWakuNode(ctx context.Context, config *WakuConfig) (*WakuNode, error) {
|
|||
|
||||
wg.Add(1)
|
||||
n.wakuCtx = C.cGoWakuNew(cJsonConfig, resp)
|
||||
n.MsgChan = make(chan common.Envelope, 100)
|
||||
n.logger = logger.Named("nwaku")
|
||||
wg.Wait()
|
||||
|
||||
// Notice that the events for self node are handled by the 'MyEventCallback' method
|
||||
|
@ -2448,6 +2481,32 @@ func newWakuNode(ctx context.Context, config *WakuConfig) (*WakuNode, error) {
|
|||
return n, nil
|
||||
}
|
||||
|
||||
type jsonEvent struct {
|
||||
EventType string `json:"eventType"`
|
||||
}
|
||||
|
||||
func (n *WakuNode) OnEvent(eventStr string) {
|
||||
jsonEvent := jsonEvent{}
|
||||
err := json.Unmarshal([]byte(eventStr), &jsonEvent)
|
||||
if err != nil {
|
||||
n.logger.Error("could not unmarshal nwaku event string", zap.Error(err))
|
||||
return
|
||||
}
|
||||
|
||||
switch jsonEvent.EventType {
|
||||
case "message":
|
||||
n.parseMessageEvent(eventStr)
|
||||
}
|
||||
}
|
||||
|
||||
func (n *WakuNode) parseMessageEvent(eventStr string) {
|
||||
envelope, err := common.NewEnvelope(eventStr)
|
||||
if err != nil {
|
||||
n.logger.Error("could not parse message", zap.Error(err))
|
||||
}
|
||||
n.MsgChan <- envelope
|
||||
}
|
||||
|
||||
func (n *WakuNode) GetNumConnectedRelayPeers(optPubsubTopic ...string) (int, error) {
|
||||
var pubsubTopic string
|
||||
if len(optPubsubTopic) == 0 {
|
||||
|
@ -2811,6 +2870,7 @@ func (n *WakuNode) Start() error {
|
|||
C.cGoWakuStart(n.wakuCtx, resp)
|
||||
wg.Wait()
|
||||
if C.getRet(resp) == C.RET_OK {
|
||||
registerNode(n)
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -2828,6 +2888,7 @@ func (n *WakuNode) Stop() error {
|
|||
C.cGoWakuStop(n.wakuCtx, resp)
|
||||
wg.Wait()
|
||||
if C.getRet(resp) == C.RET_OK {
|
||||
unregisterNode(n)
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue