mirror of
https://github.com/logos-messaging/logos-messaging-go.git
synced 2026-01-02 14:03:06 +00:00
feat: add admin rpc methods
This commit is contained in:
parent
6ae4d4fce2
commit
978bedfafa
@ -77,7 +77,7 @@ func (c ConnectionNotifier) Close() {
|
||||
func (w *WakuNode) sendConnStatus() {
|
||||
isOnline, hasHistory := w.Status()
|
||||
if w.connStatusChan != nil {
|
||||
connStatus := ConnStatus{IsOnline: isOnline, HasHistory: hasHistory, Peers: w.Peers()}
|
||||
connStatus := ConnStatus{IsOnline: isOnline, HasHistory: hasHistory, Peers: w.PeerStats()}
|
||||
w.connStatusChan <- connStatus
|
||||
}
|
||||
|
||||
|
||||
@ -10,6 +10,7 @@ import (
|
||||
|
||||
"github.com/libp2p/go-libp2p-core/event"
|
||||
"github.com/libp2p/go-libp2p-core/host"
|
||||
"github.com/libp2p/go-libp2p-core/network"
|
||||
"github.com/libp2p/go-libp2p-core/peer"
|
||||
"github.com/libp2p/go-libp2p-core/peerstore"
|
||||
p2pproto "github.com/libp2p/go-libp2p-core/protocol"
|
||||
@ -32,6 +33,13 @@ var log = logging.Logger("wakunode")
|
||||
|
||||
type Message []byte
|
||||
|
||||
type Peer struct {
|
||||
ID peer.ID
|
||||
Protocols []string
|
||||
Addrs []ma.Multiaddr
|
||||
Connected bool
|
||||
}
|
||||
|
||||
type WakuNode struct {
|
||||
host host.Host
|
||||
opts *WakuNodeParameters
|
||||
@ -383,7 +391,7 @@ func (w *WakuNode) PeerCount() int {
|
||||
return len(w.host.Network().Peers())
|
||||
}
|
||||
|
||||
func (w *WakuNode) Peers() PeerStats {
|
||||
func (w *WakuNode) PeerStats() PeerStats {
|
||||
p := make(PeerStats)
|
||||
for _, peerID := range w.host.Network().Peers() {
|
||||
protocols, err := w.host.Peerstore().GetProtocols(peerID)
|
||||
@ -395,6 +403,26 @@ func (w *WakuNode) Peers() PeerStats {
|
||||
return p
|
||||
}
|
||||
|
||||
func (w *WakuNode) Peers() ([]*Peer, error) {
|
||||
var peers []*Peer
|
||||
for _, peerId := range w.host.Peerstore().Peers() {
|
||||
connected := w.host.Network().Connectedness(peerId) == network.Connected
|
||||
protocols, err := w.host.Peerstore().GetProtocols(peerId)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
addrs := w.host.Peerstore().Addrs(peerId)
|
||||
peers = append(peers, &Peer{
|
||||
ID: peerId,
|
||||
Protocols: protocols,
|
||||
Connected: connected,
|
||||
Addrs: addrs,
|
||||
})
|
||||
}
|
||||
return peers, nil
|
||||
}
|
||||
|
||||
// startKeepAlive creates a go routine that periodically pings connected peers.
|
||||
// This is necessary because TCP connections are automatically closed due to inactivity,
|
||||
// and doing a ping will avoid this (with a small bandwidth cost)
|
||||
|
||||
71
waku/v2/rpc/admin.go
Normal file
71
waku/v2/rpc/admin.go
Normal file
@ -0,0 +1,71 @@
|
||||
package rpc
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
|
||||
ma "github.com/multiformats/go-multiaddr"
|
||||
|
||||
"github.com/status-im/go-waku/waku/v2/node"
|
||||
)
|
||||
|
||||
type AdminService struct {
|
||||
node *node.WakuNode
|
||||
}
|
||||
|
||||
type GetPeersArgs struct {
|
||||
}
|
||||
|
||||
type PeersArgs struct {
|
||||
Peers []string `json:"peers,omitempty"`
|
||||
}
|
||||
|
||||
type PeerReply struct {
|
||||
Multiaddr string `json:"mutliaddr,omitempty"`
|
||||
Protocol string `json:"protocol,omitempty"`
|
||||
Connected bool `json:"connected,omitempty"`
|
||||
}
|
||||
|
||||
type PeersReply struct {
|
||||
Peers []PeerReply `json:"peers,omitempty"`
|
||||
}
|
||||
|
||||
func (a *AdminService) PostV1Peers(req *http.Request, args *PeersArgs, reply *SuccessReply) error {
|
||||
for _, peer := range args.Peers {
|
||||
addr, err := ma.NewMultiaddr(peer)
|
||||
if err != nil {
|
||||
log.Error("Error building multiaddr", err)
|
||||
reply.Success = false
|
||||
reply.Error = err.Error()
|
||||
return nil
|
||||
}
|
||||
|
||||
err = a.node.DialPeerWithMultiAddress(req.Context(), addr)
|
||||
if err != nil {
|
||||
log.Error("Error dialing peers", err)
|
||||
reply.Success = false
|
||||
reply.Error = err.Error()
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
reply.Success = true
|
||||
return nil
|
||||
}
|
||||
|
||||
func (a *AdminService) GetV1Peers(req *http.Request, args *GetPeersArgs, reply *PeersReply) error {
|
||||
peers, err := a.node.Peers()
|
||||
if err != nil {
|
||||
log.Error("Error getting peers", err)
|
||||
return nil
|
||||
}
|
||||
for _, peer := range peers {
|
||||
for idx, addr := range peer.Addrs {
|
||||
reply.Peers = append(reply.Peers, PeerReply{
|
||||
Multiaddr: addr.String(),
|
||||
Protocol: peer.Protocols[idx],
|
||||
Connected: peer.Connected,
|
||||
})
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
62
waku/v2/rpc/admin_test.go
Normal file
62
waku/v2/rpc/admin_test.go
Normal file
@ -0,0 +1,62 @@
|
||||
package rpc
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"crypto/rand"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"testing"
|
||||
|
||||
"github.com/multiformats/go-multiaddr"
|
||||
|
||||
"github.com/status-im/go-waku/tests"
|
||||
"github.com/status-im/go-waku/waku/v2/node"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func makeAdminService(t *testing.T) *AdminService {
|
||||
options := node.WithWakuRelay()
|
||||
n, err := node.New(context.Background(), options)
|
||||
require.NoError(t, err)
|
||||
err = n.Start()
|
||||
require.NoError(t, err)
|
||||
return &AdminService{n}
|
||||
}
|
||||
|
||||
func TestV1Peers(t *testing.T) {
|
||||
port, err := tests.FindFreePort(t, "", 5)
|
||||
require.NoError(t, err)
|
||||
|
||||
host, err := tests.MakeHost(context.Background(), port, rand.Reader)
|
||||
require.NoError(t, err)
|
||||
|
||||
var reply PeersReply
|
||||
|
||||
request, err := http.NewRequest(http.MethodPost, "url", bytes.NewReader([]byte("")))
|
||||
require.NoError(t, err)
|
||||
|
||||
a := makeAdminService(t)
|
||||
|
||||
err = a.GetV1Peers(request, &GetPeersArgs{}, &reply)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, reply.Peers, 0)
|
||||
|
||||
var reply2 SuccessReply
|
||||
|
||||
hostInfo, err := multiaddr.NewMultiaddr(fmt.Sprintf("/p2p/%s", host.ID().Pretty()))
|
||||
require.NoError(t, err)
|
||||
|
||||
var addr multiaddr.Multiaddr
|
||||
for _, a := range host.Addrs() {
|
||||
addr = a.Encapsulate(hostInfo)
|
||||
break
|
||||
}
|
||||
err = a.PostV1Peers(request, &PeersArgs{Peers: []string{addr.String()}}, &reply2)
|
||||
require.NoError(t, err)
|
||||
require.True(t, reply2.Success)
|
||||
|
||||
err = a.GetV1Peers(request, &GetPeersArgs{}, &reply)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, reply.Peers, 2)
|
||||
}
|
||||
@ -21,11 +21,6 @@ type TopicsArgs struct {
|
||||
Topics []string `json:"topics,omitempty"`
|
||||
}
|
||||
|
||||
type SuccessReply struct {
|
||||
Success bool `json:"success,omitempty"`
|
||||
Error string `json:"error,omitempty"`
|
||||
}
|
||||
|
||||
func (r *RelayService) PostV1Message(req *http.Request, args *RelayMessageArgs, reply *SuccessReply) error {
|
||||
_, err := r.node.Relay().Publish(req.Context(), &args.Message, (*relay.Topic)(&args.Topic))
|
||||
if err != nil {
|
||||
|
||||
6
waku/v2/rpc/rpc_type.go
Normal file
6
waku/v2/rpc/rpc_type.go
Normal file
@ -0,0 +1,6 @@
|
||||
package rpc
|
||||
|
||||
type SuccessReply struct {
|
||||
Success bool `json:"success,omitempty"`
|
||||
Error string `json:"error,omitempty"`
|
||||
}
|
||||
@ -38,6 +38,11 @@ func NewWakuRpc(node *node.WakuNode, address string, port int) *WakuRpc {
|
||||
log.Error(err)
|
||||
}
|
||||
|
||||
err = s.RegisterService(&AdminService{node}, "Admin")
|
||||
if err != nil {
|
||||
log.Error(err)
|
||||
}
|
||||
|
||||
mux := http.NewServeMux()
|
||||
mux.HandleFunc("/jsonrpc", func(w http.ResponseWriter, r *http.Request) {
|
||||
t := time.Now()
|
||||
|
||||
@ -3,7 +3,6 @@ package utils
|
||||
import (
|
||||
"context"
|
||||
"crypto/rand"
|
||||
"fmt"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
@ -71,7 +70,6 @@ func TestSelectPeerWithLowestRTT(t *testing.T) {
|
||||
|
||||
// No peers with selected protocol
|
||||
_, err = SelectPeerWithLowestRTT(ctx, h1, proto)
|
||||
fmt.Println(err)
|
||||
require.Error(t, ErrNoPeersAvailable, err)
|
||||
|
||||
// Peers with selected protocol
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user