mirror of
https://github.com/logos-messaging/logos-messaging-go-bindings.git
synced 2026-01-14 03:43:08 +00:00
initial changes (not yet working)
This commit is contained in:
parent
532fc08bb0
commit
494018937d
@ -314,6 +314,7 @@ import (
|
||||
"unsafe"
|
||||
|
||||
"github.com/ethereum/go-ethereum/common/hexutil"
|
||||
"github.com/ethereum/go-ethereum/log"
|
||||
"github.com/ethereum/go-ethereum/p2p/enode"
|
||||
"github.com/libp2p/go-libp2p/core/peer"
|
||||
libp2pproto "github.com/libp2p/go-libp2p/core/protocol"
|
||||
@ -431,11 +432,6 @@ func (w *Waku) DropPeer(peerID peer.ID) error {
|
||||
return w.node.DisconnectPeerByID(peerID)
|
||||
}
|
||||
|
||||
type response struct {
|
||||
err error
|
||||
value any
|
||||
}
|
||||
|
||||
//export GoCallback
|
||||
func GoCallback(ret C.int, msg *C.char, len C.size_t, resp unsafe.Pointer) {
|
||||
if resp != nil {
|
||||
@ -448,13 +444,20 @@ func GoCallback(ret C.int, msg *C.char, len C.size_t, resp unsafe.Pointer) {
|
||||
}
|
||||
}
|
||||
|
||||
type MessageData struct {
|
||||
Msg pb.WakuMessage
|
||||
PubsubTopic string
|
||||
}
|
||||
|
||||
// WakuNode represents an instance of an nwaku node
|
||||
type WakuNode struct {
|
||||
wakuCtx unsafe.Pointer
|
||||
logger *zap.Logger
|
||||
cancel context.CancelFunc
|
||||
MsgChan chan MessageData
|
||||
}
|
||||
|
||||
func newWakuNode(ctx context.Context, config *WakuConfig) (*WakuNode, error) {
|
||||
func newWakuNode(ctx context.Context, config *WakuConfig, logger *zap.Logger) (*WakuNode, error) {
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
|
||||
n := &WakuNode{
|
||||
@ -495,6 +498,8 @@ func newWakuNode(ctx context.Context, config *WakuConfig) (*WakuNode, error) {
|
||||
|
||||
wg.Add(1)
|
||||
n.wakuCtx = C.cGoWakuNew(cJsonConfig, resp)
|
||||
n.MsgChan = make(chan MessageData, 100)
|
||||
n.logger = logger.Named("nwaku")
|
||||
wg.Wait()
|
||||
|
||||
// Notice that the events for self node are handled by the 'MyEventCallback' method
|
||||
@ -516,7 +521,7 @@ func New(nwakuCfg *WakuConfig, logger *zap.Logger) (*Waku, error) {
|
||||
logger.Info("starting Waku with config", zap.Any("nwakuCfg", nwakuCfg))
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
wakunode, err := newWakuNode(ctx, nwakuCfg)
|
||||
wakunode, err := newWakuNode(ctx, nwakuCfg, logger)
|
||||
if err != nil {
|
||||
cancel()
|
||||
return nil, err
|
||||
@ -531,12 +536,70 @@ func New(nwakuCfg *WakuConfig, logger *zap.Logger) (*Waku, error) {
|
||||
}, nil
|
||||
}
|
||||
|
||||
// The event callback sends back the node's ctx to know to which
|
||||
// node is the event being emited for. Since we only have a global
|
||||
// callback in the go side, We register all the nodes that we create
|
||||
// so we can later obtain which instance of `WakuNode` is should
|
||||
// be invoked depending on the ctx received
|
||||
|
||||
var nodeRegistry map[unsafe.Pointer]*WakuNode
|
||||
|
||||
func init() {
|
||||
nodeRegistry = make(map[unsafe.Pointer]*WakuNode)
|
||||
}
|
||||
|
||||
func registerNode(node *WakuNode) {
|
||||
_, ok := nodeRegistry[node.wakuCtx]
|
||||
if !ok {
|
||||
nodeRegistry[node.wakuCtx] = node
|
||||
}
|
||||
}
|
||||
|
||||
func unregisterNode(node *WakuNode) {
|
||||
delete(nodeRegistry, node.wakuCtx)
|
||||
}
|
||||
|
||||
//export globalEventCallback
|
||||
func globalEventCallback(callerRet C.int, msg *C.char, len C.size_t, userData unsafe.Pointer) {
|
||||
// This is shared among all Golang instances
|
||||
// TODO-nwaku
|
||||
// self := Waku{wakuCtx: userData}
|
||||
// self.MyEventCallback(callerRet, msg, len)
|
||||
if callerRet == C.RET_OK {
|
||||
eventStr := C.GoStringN(msg, C.int(len))
|
||||
node, ok := nodeRegistry[userData]
|
||||
if ok {
|
||||
node.OnEvent(eventStr)
|
||||
}
|
||||
} else {
|
||||
errMsgField := zap.Skip()
|
||||
if len != 0 {
|
||||
errMsgField = zap.String("error", C.GoStringN(msg, C.int(len)))
|
||||
}
|
||||
log.Error("globalEventCallback retCode not ok", zap.Int("retCode", int(callerRet)), errMsgField)
|
||||
}
|
||||
}
|
||||
|
||||
type jsonEvent struct {
|
||||
EventType string `json:"eventType"`
|
||||
}
|
||||
|
||||
func (n *WakuNode) OnEvent(eventStr string) {
|
||||
jsonEvent := jsonEvent{}
|
||||
err := json.Unmarshal([]byte(eventStr), &jsonEvent)
|
||||
if err != nil {
|
||||
n.logger.Error("could not unmarshal nwaku event string", zap.Error(err))
|
||||
return
|
||||
}
|
||||
|
||||
switch jsonEvent.EventType {
|
||||
case "message":
|
||||
n.parseMessageEvent(eventStr)
|
||||
}
|
||||
}
|
||||
|
||||
func (n *WakuNode) parseMessageEvent(eventStr string) {
|
||||
envelope, err := common.NewEnvelope(eventStr)
|
||||
if err != nil {
|
||||
n.logger.Error("could not parse message", zap.Error(err))
|
||||
}
|
||||
n.MsgChan <- envelope
|
||||
}
|
||||
|
||||
func (n *WakuNode) GetNumConnectedRelayPeers(optPubsubTopic ...string) (int, error) {
|
||||
@ -900,6 +963,7 @@ func (n *WakuNode) Start() error {
|
||||
C.cGoWakuStart(n.wakuCtx, resp)
|
||||
wg.Wait()
|
||||
if C.getRet(resp) == C.RET_OK {
|
||||
registerNode(n)
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -917,6 +981,7 @@ func (n *WakuNode) Stop() error {
|
||||
C.cGoWakuStop(n.wakuCtx, resp)
|
||||
wg.Wait()
|
||||
if C.getRet(resp) == C.RET_OK {
|
||||
unregisterNode(n)
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user