fix: js-waku / nwaku interop (#252)

This commit is contained in:
Richard Ramos 2022-06-13 14:30:35 -04:00 committed by GitHub
parent 73af2002ce
commit 3c0c3c4eeb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
23 changed files with 920 additions and 394 deletions

2
go.mod
View File

@ -36,6 +36,8 @@ require (
golang.org/x/sync v0.0.0-20220513210516-0976fa681c29
)
require golang.org/x/text v0.3.7
require (
github.com/benbjohnson/clock v1.3.0 // indirect
github.com/beorn7/perks v1.0.1 // indirect

10
waku.go
View File

@ -111,6 +111,16 @@ func main() {
Usage: "Use SQLiteDB to persist information",
Destination: &options.UseDB,
},
&cli.BoolFlag{
Name: "persist-messages",
Usage: "Enable message persistence",
Destination: &options.Store.PersistMessages,
},
&cli.StringFlag{
Name: "nat",
Usage: "TODO - Not implemented yet.", // This was added so js-waku test don't fail
Destination: &options.NAT,
},
&cli.StringFlag{
Name: "db-path",
Aliases: []string{"dbpath"},

View File

@ -193,10 +193,14 @@ func Execute(options Options) {
}
if options.Store.Enable {
nodeOpts = append(nodeOpts, node.WithWakuStoreAndRetentionPolicy(options.Store.ShouldResume, options.Store.RetentionMaxDaysDuration(), options.Store.RetentionMaxMessages))
dbStore, err := persistence.NewDBStore(logger, persistence.WithDB(db), persistence.WithRetentionPolicy(options.Store.RetentionMaxMessages, options.Store.RetentionMaxDaysDuration()))
failOnErr(err, "DBStore")
nodeOpts = append(nodeOpts, node.WithMessageProvider(dbStore))
if options.Store.PersistMessages {
nodeOpts = append(nodeOpts, node.WithWakuStoreAndRetentionPolicy(options.Store.ShouldResume, options.Store.RetentionMaxDaysDuration(), options.Store.RetentionMaxMessages))
dbStore, err := persistence.NewDBStore(logger, persistence.WithDB(db), persistence.WithRetentionPolicy(options.Store.RetentionMaxMessages, options.Store.RetentionMaxDaysDuration()))
failOnErr(err, "DBStore")
nodeOpts = append(nodeOpts, node.WithMessageProvider(dbStore))
} else {
nodeOpts = append(nodeOpts, node.WithWakuStore(false, false))
}
}
if options.LightPush.Enable {

View File

@ -53,6 +53,7 @@ type LightpushOptions struct {
// node and provide message history to nodes that ask for it.
type StoreOptions struct {
Enable bool
PersistMessages bool
ShouldResume bool
RetentionMaxDays int
RetentionMaxMessages int
@ -123,6 +124,7 @@ type Options struct {
ShowAddresses bool
LogLevel string
LogEncoding string
NAT string
Websocket WSOptions
Relay RelayOptions

View File

@ -3,11 +3,8 @@ package discv5
import (
"context"
"crypto/ecdsa"
"errors"
"math"
"math/rand"
"net"
"strconv"
"sync"
"time"
@ -17,7 +14,6 @@ import (
"github.com/libp2p/go-libp2p-core/discovery"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/peer"
ma "github.com/multiformats/go-multiaddr"
"github.com/status-im/go-discover/discover"
"github.com/status-im/go-waku/logging"
"github.com/status-im/go-waku/waku/v2/utils"
@ -43,12 +39,6 @@ type DiscoveryV5 struct {
wg *sync.WaitGroup
peerCache peerCache
// Used for those weird cases where updateAddress
// receives the same external address twice both with the original port
// and the nat port. Ideally this attribute should be removed by doing
// hole punching before starting waku
ogTCPPort int
}
type peerCache struct {
@ -101,7 +91,7 @@ func DefaultOptions() []DiscoveryV5Option {
}
}
func NewDiscoveryV5(host host.Host, addresses []ma.Multiaddr, priv *ecdsa.PrivateKey, wakuFlags utils.WakuEnrBitfield, log *zap.Logger, opts ...DiscoveryV5Option) (*DiscoveryV5, error) {
func NewDiscoveryV5(host host.Host, priv *ecdsa.PrivateKey, localnode *enode.LocalNode, log *zap.Logger, opts ...DiscoveryV5Option) (*DiscoveryV5, error) {
params := new(discV5Parameters)
optList := DefaultOptions()
optList = append(optList, opts...)
@ -111,16 +101,6 @@ func NewDiscoveryV5(host host.Host, addresses []ma.Multiaddr, priv *ecdsa.Privat
logger := log.Named("discv5")
ipAddr, err := selectIPAddr(addresses)
if err != nil {
return nil, err
}
localnode, err := newLocalnode(priv, ipAddr, params.udpPort, wakuFlags, params.advertiseAddr, logger)
if err != nil {
return nil, err
}
var NAT nat.Interface = nil
if params.advertiseAddr == nil {
NAT = nat.Any()
@ -151,41 +131,10 @@ func NewDiscoveryV5(host host.Host, addresses []ma.Multiaddr, priv *ecdsa.Privat
IP: net.IPv4zero,
Port: params.udpPort,
},
log: logger,
ogTCPPort: ipAddr.Port,
log: logger,
}, nil
}
func newLocalnode(priv *ecdsa.PrivateKey, ipAddr *net.TCPAddr, udpPort int, wakuFlags utils.WakuEnrBitfield, advertiseAddr *net.IP, log *zap.Logger) (*enode.LocalNode, error) {
db, err := enode.OpenDB("")
if err != nil {
return nil, err
}
localnode := enode.NewLocalNode(db, priv)
localnode.SetFallbackUDP(udpPort)
localnode.Set(enr.WithEntry(utils.WakuENRField, wakuFlags))
localnode.SetFallbackIP(net.IP{127, 0, 0, 1})
localnode.SetStaticIP(ipAddr.IP)
if udpPort > 0 && udpPort <= math.MaxUint16 {
localnode.Set(enr.UDP(uint16(udpPort))) // lgtm [go/incorrect-integer-conversion]
} else {
log.Error("setting udpPort", zap.Int("port", udpPort))
}
if ipAddr.Port > 0 && ipAddr.Port <= math.MaxUint16 {
localnode.Set(enr.TCP(uint16(ipAddr.Port))) // lgtm [go/incorrect-integer-conversion]
} else {
log.Error("setting tcpPort", zap.Int("port", ipAddr.Port))
}
if advertiseAddr != nil {
localnode.SetStaticIP(*advertiseAddr)
}
return localnode, nil
}
func (d *DiscoveryV5) listen() error {
conn, err := net.ListenUDP("udp", d.udpAddr)
if err != nil {
@ -193,7 +142,6 @@ func (d *DiscoveryV5) listen() error {
}
d.udpAddr = conn.LocalAddr().(*net.UDPAddr)
if d.NAT != nil && !d.udpAddr.IP.IsLoopback() {
d.wg.Add(1)
go func() {
@ -250,34 +198,6 @@ func (d *DiscoveryV5) Stop() {
d.wg.Wait()
}
func (d *DiscoveryV5) UpdateAddr(addr *net.TCPAddr) error {
if !d.params.autoUpdate {
return nil
}
d.Lock()
defer d.Unlock()
// TODO: This code is not elegant and should be improved
if !isExternal(addr) && !isExternal(&net.TCPAddr{IP: d.localnode.Node().IP()}) {
if !((d.localnode.Node().IP().IsLoopback() && isPrivate(addr)) || (isPrivate(&net.TCPAddr{IP: d.localnode.Node().IP()}) && isExternal(addr))) {
return nil
}
}
if addr.IP.IsUnspecified() || (d.localnode.Node().IP().Equal(addr.IP) && addr.Port == d.ogTCPPort) {
return nil
}
d.localnode.SetStaticIP(addr.IP)
d.localnode.Set(enr.TCP(uint16(addr.Port))) // lgtm [go/incorrect-integer-conversion]
d.log.Info("updated Discovery V5 node address", logging.TCPAddr("address", d.localnode.Node().IP(), d.localnode.Node().TCP()))
d.log.Info("Discovery V5", logging.ENode("enr", d.localnode.Node()))
return nil
}
/*
func isWakuNode(node *enode.Node) bool {
enrField := new(utils.WakuEnrBitfield)
@ -472,82 +392,3 @@ func (d *DiscoveryV5) FindPeers(ctx context.Context, topic string, opts ...disco
return chPeer, err
}
// IsPrivate reports whether ip is a private address, according to
// RFC 1918 (IPv4 addresses) and RFC 4193 (IPv6 addresses).
// Copied/Adapted from https://go-review.googlesource.com/c/go/+/272668/11/src/net/ip.go
// Copyright (c) The Go Authors. All rights reserved.
// @TODO: once Go 1.17 is released in Q42021, remove this function as it will become part of the language
func isPrivate(ip *net.TCPAddr) bool {
if ip4 := ip.IP.To4(); ip4 != nil {
// Following RFC 4193, Section 3. Local IPv6 Unicast Addresses which says:
// The Internet Assigned Numbers Authority (IANA) has reserved the
// following three blocks of the IPv4 address space for private internets:
// 10.0.0.0 - 10.255.255.255 (10/8 prefix)
// 172.16.0.0 - 172.31.255.255 (172.16/12 prefix)
// 192.168.0.0 - 192.168.255.255 (192.168/16 prefix)
return ip4[0] == 10 ||
(ip4[0] == 172 && ip4[1]&0xf0 == 16) ||
(ip4[0] == 192 && ip4[1] == 168)
}
// Following RFC 4193, Section 3. Private Address Space which says:
// The Internet Assigned Numbers Authority (IANA) has reserved the
// following block of the IPv6 address space for local internets:
// FC00:: - FDFF:FFFF:FFFF:FFFF:FFFF:FFFF:FFFF:FFFF (FC00::/7 prefix)
return len(ip.IP) == net.IPv6len && ip.IP[0]&0xfe == 0xfc
}
func isExternal(ip *net.TCPAddr) bool {
return !isPrivate(ip) && !ip.IP.IsLoopback() && !ip.IP.IsUnspecified()
}
func isLoopback(ip *net.TCPAddr) bool {
return ip.IP.IsLoopback()
}
func filter(ss []*net.TCPAddr, fn func(*net.TCPAddr) bool) (ret []*net.TCPAddr) {
for _, s := range ss {
if fn(s) {
ret = append(ret, s)
}
}
return
}
func selectIPAddr(addresses []ma.Multiaddr) (*net.TCPAddr, error) {
var ipAddrs []*net.TCPAddr
for _, addr := range addresses {
ipStr, err := addr.ValueForProtocol(ma.P_IP4)
if err != nil {
continue
}
portStr, err := addr.ValueForProtocol(ma.P_TCP)
if err != nil {
continue
}
port, err := strconv.Atoi(portStr)
if err != nil {
continue
}
ipAddrs = append(ipAddrs, &net.TCPAddr{
IP: net.ParseIP(ipStr),
Port: port,
})
}
externalIPs := filter(ipAddrs, isExternal)
if len(externalIPs) > 0 {
return externalIPs[0], nil
}
privateIPs := filter(ipAddrs, isPrivate)
if len(privateIPs) > 0 {
return privateIPs[0], nil
}
loopback := filter(ipAddrs, isLoopback)
if len(loopback) > 0 {
return loopback[0], nil
}
return nil, errors.New("could not obtain ip address")
}

View File

@ -4,15 +4,20 @@ import (
"context"
"crypto/ecdsa"
"fmt"
"math"
"net"
"strconv"
"testing"
"time"
gcrypto "github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/enr"
"github.com/multiformats/go-multiaddr"
"github.com/status-im/go-waku/tests"
"github.com/status-im/go-waku/waku/v2/utils"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
"github.com/libp2p/go-libp2p"
libp2pcrypto "github.com/libp2p/go-libp2p-core/crypto"
@ -41,25 +46,87 @@ func createHost(t *testing.T) (host.Host, int, *ecdsa.PrivateKey) {
return host, port, privKey
}
func newLocalnode(priv *ecdsa.PrivateKey, ipAddr *net.TCPAddr, udpPort int, wakuFlags utils.WakuEnrBitfield, advertiseAddr *net.IP, log *zap.Logger) (*enode.LocalNode, error) {
db, err := enode.OpenDB("")
if err != nil {
return nil, err
}
localnode := enode.NewLocalNode(db, priv)
localnode.SetFallbackUDP(udpPort)
localnode.Set(enr.WithEntry(utils.WakuENRField, wakuFlags))
localnode.SetFallbackIP(net.IP{127, 0, 0, 1})
localnode.SetStaticIP(ipAddr.IP)
if udpPort > 0 && udpPort <= math.MaxUint16 {
localnode.Set(enr.UDP(uint16(udpPort))) // lgtm [go/incorrect-integer-conversion]
} else {
log.Error("setting udpPort", zap.Int("port", udpPort))
}
if ipAddr.Port > 0 && ipAddr.Port <= math.MaxUint16 {
localnode.Set(enr.TCP(uint16(ipAddr.Port))) // lgtm [go/incorrect-integer-conversion]
} else {
log.Error("setting tcpPort", zap.Int("port", ipAddr.Port))
}
if advertiseAddr != nil {
localnode.SetStaticIP(*advertiseAddr)
}
return localnode, nil
}
func extractIP(addr multiaddr.Multiaddr) (*net.TCPAddr, error) {
ipStr, err := addr.ValueForProtocol(multiaddr.P_IP4)
if err != nil {
return nil, err
}
portStr, err := addr.ValueForProtocol(multiaddr.P_TCP)
if err != nil {
return nil, err
}
port, err := strconv.Atoi(portStr)
if err != nil {
return nil, err
}
return &net.TCPAddr{
IP: net.ParseIP(ipStr),
Port: port,
}, nil
}
func TestDiscV5(t *testing.T) {
// Host1 <-> Host2 <-> Host3
// H1
host1, _, prvKey1 := createHost(t)
udpPort1, err := tests.FindFreePort(t, "127.0.0.1", 3)
require.NoError(t, err)
d1, err := NewDiscoveryV5(host1, host1.Addrs(), prvKey1, utils.NewWakuEnrBitfield(true, true, true, true), utils.Logger(), WithUDPPort(udpPort1))
ip1, _ := extractIP(host1.Addrs()[0])
l1, err := newLocalnode(prvKey1, ip1, udpPort1, utils.NewWakuEnrBitfield(true, true, true, true), nil, utils.Logger())
require.NoError(t, err)
d1, err := NewDiscoveryV5(host1, prvKey1, l1, utils.Logger(), WithUDPPort(udpPort1))
require.NoError(t, err)
// H2
host2, _, prvKey2 := createHost(t)
ip2, _ := extractIP(host2.Addrs()[0])
udpPort2, err := tests.FindFreePort(t, "127.0.0.1", 3)
require.NoError(t, err)
d2, err := NewDiscoveryV5(host2, host2.Addrs(), prvKey2, utils.NewWakuEnrBitfield(true, true, true, true), utils.Logger(), WithUDPPort(udpPort2), WithBootnodes([]*enode.Node{d1.localnode.Node()}))
l2, err := newLocalnode(prvKey2, ip2, udpPort2, utils.NewWakuEnrBitfield(true, true, true, true), nil, utils.Logger())
require.NoError(t, err)
d2, err := NewDiscoveryV5(host2, prvKey2, l2, utils.Logger(), WithUDPPort(udpPort2), WithBootnodes([]*enode.Node{d1.localnode.Node()}))
require.NoError(t, err)
// H3
host3, _, prvKey3 := createHost(t)
ip3, _ := extractIP(host3.Addrs()[0])
udpPort3, err := tests.FindFreePort(t, "127.0.0.1", 3)
require.NoError(t, err)
d3, err := NewDiscoveryV5(host3, host3.Addrs(), prvKey3, utils.NewWakuEnrBitfield(true, true, true, true), utils.Logger(), WithUDPPort(udpPort3), WithBootnodes([]*enode.Node{d2.localnode.Node()}))
l3, err := newLocalnode(prvKey3, ip3, udpPort3, utils.NewWakuEnrBitfield(true, true, true, true), nil, utils.Logger())
require.NoError(t, err)
d3, err := NewDiscoveryV5(host3, prvKey3, l3, utils.Logger(), WithUDPPort(udpPort3), WithBootnodes([]*enode.Node{d2.localnode.Node()}))
require.NoError(t, err)
defer d1.Stop()
@ -75,7 +142,7 @@ func TestDiscV5(t *testing.T) {
err = d3.Start()
require.NoError(t, err)
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
peerChan, err := d3.FindPeers(ctx, "", discovery.Limit(2))

236
waku/v2/node/localnode.go Normal file
View File

@ -0,0 +1,236 @@
package node
import (
"crypto/ecdsa"
"encoding/binary"
"errors"
"fmt"
"math"
"net"
"strconv"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/enr"
ma "github.com/multiformats/go-multiaddr"
"github.com/status-im/go-waku/logging"
"github.com/status-im/go-waku/waku/v2/utils"
"go.uber.org/zap"
)
func (w *WakuNode) newLocalnode(priv *ecdsa.PrivateKey, wsAddr []ma.Multiaddr, ipAddr *net.TCPAddr, udpPort int, wakuFlags utils.WakuEnrBitfield, advertiseAddr *net.IP, log *zap.Logger) (*enode.LocalNode, error) {
db, err := enode.OpenDB("")
if err != nil {
return nil, err
}
localnode := enode.NewLocalNode(db, priv)
localnode.SetFallbackUDP(udpPort)
localnode.Set(enr.WithEntry(utils.WakuENRField, wakuFlags))
localnode.SetFallbackIP(net.IP{127, 0, 0, 1})
localnode.SetStaticIP(ipAddr.IP)
if udpPort > 0 && udpPort <= math.MaxUint16 {
localnode.Set(enr.UDP(uint16(udpPort))) // lgtm [go/incorrect-integer-conversion]
}
if ipAddr.Port > 0 && ipAddr.Port <= math.MaxUint16 {
localnode.Set(enr.TCP(uint16(ipAddr.Port))) // lgtm [go/incorrect-integer-conversion]
} else {
log.Error("setting tcpPort", zap.Int("port", ipAddr.Port))
}
if advertiseAddr != nil {
localnode.SetStaticIP(*advertiseAddr)
}
// Adding websocket multiaddresses
var fieldRaw []byte
for _, addr := range wsAddr {
p2p, err := addr.ValueForProtocol(ma.P_P2P)
if err != nil {
return nil, err
}
p2pAddr, err := ma.NewMultiaddr("/p2p/" + p2p)
if err != nil {
return nil, fmt.Errorf("could not create p2p addr: %w", err)
}
maRaw := addr.Decapsulate(p2pAddr).Bytes()
maSize := make([]byte, 2)
binary.BigEndian.PutUint16(maSize, uint16(len(maRaw)))
fieldRaw = append(fieldRaw, maSize...)
fieldRaw = append(fieldRaw, maRaw...)
}
if len(fieldRaw) != 0 {
localnode.Set(enr.WithEntry(utils.MultiaddrENRField, fieldRaw))
}
return localnode, nil
}
func isPrivate(addr candidateAddr) bool {
return addr.ip.IP.IsPrivate()
}
func isExternal(addr candidateAddr) bool {
return !isPrivate(addr) && !addr.ip.IP.IsLoopback() && !addr.ip.IP.IsUnspecified()
}
func isLoopback(addr candidateAddr) bool {
return addr.ip.IP.IsLoopback()
}
func filterIP(ss []candidateAddr, fn func(candidateAddr) bool) (ret []candidateAddr) {
for _, s := range ss {
if fn(s) {
ret = append(ret, s)
}
}
return
}
type candidateAddr struct {
ip *net.TCPAddr
maddr ma.Multiaddr
}
func extractIP(addr ma.Multiaddr) (*net.TCPAddr, error) {
var ipStr string
dns4, err := addr.ValueForProtocol(ma.P_DNS4)
if err != nil {
ipStr, err = addr.ValueForProtocol(ma.P_IP4)
if err != nil {
return nil, err
}
} else {
netIP, err := net.ResolveIPAddr("ip4", dns4)
if err != nil {
return nil, err
}
ipStr = netIP.String()
}
portStr, err := addr.ValueForProtocol(ma.P_TCP)
if err != nil {
return nil, err
}
port, err := strconv.Atoi(portStr)
if err != nil {
return nil, err
}
return &net.TCPAddr{
IP: net.ParseIP(ipStr),
Port: port,
}, nil
}
func selectMostExternalAddress(addresses []ma.Multiaddr) (ma.Multiaddr, *net.TCPAddr, error) {
var ipAddrs []candidateAddr
for _, addr := range addresses {
ipAddr, err := extractIP(addr)
if err != nil {
continue
}
ipAddrs = append(ipAddrs, candidateAddr{
ip: ipAddr,
maddr: addr,
})
}
externalIPs := filterIP(ipAddrs, isExternal)
if len(externalIPs) > 0 {
return externalIPs[0].maddr, externalIPs[0].ip, nil
}
privateIPs := filterIP(ipAddrs, isPrivate)
if len(privateIPs) > 0 {
return privateIPs[0].maddr, privateIPs[0].ip, nil
}
loopback := filterIP(ipAddrs, isLoopback)
if len(loopback) > 0 {
return loopback[0].maddr, loopback[0].ip, nil
}
return nil, nil, errors.New("could not obtain ip address")
}
func selectWSListenAddress(addresses []ma.Multiaddr, extAddr ma.Multiaddr) ([]ma.Multiaddr, error) {
extAddrDNS, err := extAddr.ValueForProtocol(ma.P_DNS4)
var extAddrIP string
if err != nil {
extAddrIP, err = extAddr.ValueForProtocol(ma.P_IP4)
if err != nil {
return nil, err
}
}
var result []ma.Multiaddr
for _, addr := range addresses {
// Filter addresses that match the extAddr
if extAddrDNS != "" {
dns4, err := addr.ValueForProtocol(ma.P_DNS4)
if err != nil {
continue
}
if dns4 != extAddrDNS {
continue
}
} else {
ip4, err := addr.ValueForProtocol(ma.P_IP4)
if err != nil {
continue
}
if ip4 != extAddrIP {
continue
}
}
_, err := addr.ValueForProtocol(ma.P_WS)
if err == nil {
result = append(result, addr)
}
_, err = addr.ValueForProtocol(ma.P_WSS)
if err == nil {
result = append(result, addr)
}
}
return result, nil
}
func (w *WakuNode) setupENR(addrs []ma.Multiaddr) error {
extAddr, ipAddr, err := selectMostExternalAddress(addrs)
if err != nil {
w.log.Error("obtaining external address", zap.Error(err))
return err
}
wsAddresses, err := selectWSListenAddress(addrs, extAddr)
if err != nil {
w.log.Error("obtaining websocket addresses", zap.Error(err))
return err
}
// TODO: make this optional depending on DNS Disc being enabled
if w.opts.privKey != nil {
localNode, err := w.newLocalnode(w.opts.privKey, wsAddresses, ipAddr, w.opts.udpPort, w.wakuFlag, w.opts.advertiseAddr, w.log)
if err != nil {
w.log.Error("obtaining ENR record from multiaddress", logging.MultiAddrs("multiaddr", extAddr), zap.Error(err))
return err
} else {
if w.localNode == nil || w.localNode.Node().String() != localNode.Node().String() {
w.localNode = localNode
w.log.Info("enr record", logging.ENode("enr", w.localNode.Node()))
}
}
}
return nil
}

View File

@ -5,10 +5,11 @@ import (
"errors"
"fmt"
"net"
"strconv"
"sync"
"time"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/libp2p/go-libp2p"
"go.uber.org/zap"
@ -59,6 +60,8 @@ type WakuNode struct {
swap *swap.WakuSwap
wakuFlag utils.WakuEnrBitfield
localNode *enode.LocalNode
addrChan chan ma.Multiaddr
discoveryV5 *discv5.DiscoveryV5
@ -93,19 +96,24 @@ func defaultStoreFactory(w *WakuNode) store.Store {
func New(ctx context.Context, opts ...WakuNodeOption) (*WakuNode, error) {
params := new(WakuNodeParameters)
ctx, cancel := context.WithCancel(ctx)
params.libP2POpts = DefaultLibP2POptions
opts = append(DefaultWakuNodeOptions, opts...)
for _, opt := range opts {
err := opt(params)
if err != nil {
cancel()
return nil, err
}
}
if params.privKey == nil {
prvKey, err := crypto.GenerateKey()
if err != nil {
return nil, err
}
params.privKey = prvKey
}
if params.enableWSS {
params.libP2POpts = append(params.libP2POpts, libp2p.Transport(ws.New, ws.WithTLSConfig(params.tlsConfig)))
} else if params.enableWS {
@ -116,7 +124,6 @@ func New(ctx context.Context, opts ...WakuNodeOption) (*WakuNode, error) {
if params.hostAddr == nil {
err := WithHostAddress(&net.TCPAddr{IP: net.ParseIP("0.0.0.0"), Port: 0})(params)
if err != nil {
cancel()
return nil, err
}
}
@ -124,9 +131,7 @@ func New(ctx context.Context, opts ...WakuNodeOption) (*WakuNode, error) {
params.libP2POpts = append(params.libP2POpts, libp2p.ListenAddrs(params.multiAddr...))
}
if params.privKey != nil {
params.libP2POpts = append(params.libP2POpts, params.Identity())
}
params.libP2POpts = append(params.libP2POpts, params.Identity())
if params.addressFactory != nil {
params.libP2POpts = append(params.libP2POpts, libp2p.AddrsFactory(params.addressFactory))
@ -134,10 +139,11 @@ func New(ctx context.Context, opts ...WakuNodeOption) (*WakuNode, error) {
host, err := libp2p.New(params.libP2POpts...)
if err != nil {
cancel()
return nil, err
}
ctx, cancel := context.WithCancel(ctx)
w := new(WakuNode)
w.bcaster = v2.NewBroadcaster(1024)
w.host = host
@ -191,52 +197,8 @@ func New(ctx context.Context, opts ...WakuNodeOption) (*WakuNode, error) {
func (w *WakuNode) onAddrChange() {
for m := range w.addrChan {
ipStr, err := m.ValueForProtocol(ma.P_IP4)
if err != nil {
w.log.Error("extracting ip from ma", logging.MultiAddrs("ma", m), zap.Error(err))
continue
}
portStr, err := m.ValueForProtocol(ma.P_TCP)
if err != nil {
w.log.Error("extracting port from ma", logging.MultiAddrs("ma", m), zap.Error(err))
continue
}
port, err := strconv.Atoi(portStr)
if err != nil {
w.log.Error("converting port to int", zap.Error(err))
continue
}
addr := &net.TCPAddr{
IP: net.ParseIP(ipStr),
Port: port,
}
if !addr.IP.IsLoopback() && !addr.IP.IsUnspecified() {
if w.opts.enableDiscV5 {
err := w.discoveryV5.UpdateAddr(addr)
if err != nil {
w.log.Error("updating DiscV5 address with IP", zap.Stringer("address", addr), zap.Error(err))
continue
}
}
}
}
}
func (w *WakuNode) logAddress(addr ma.Multiaddr) {
logger := w.log.With(logging.MultiAddrs("multiaddr", addr))
// TODO: make this optional depending on DNS Disc being enabled
if w.opts.privKey != nil {
enr, ip, err := utils.GetENRandIP(addr, w.wakuFlag, w.opts.privKey)
if err != nil {
logger.Error("obtaining ENR record from multiaddress", zap.Error(err))
} else {
logger.Info("listening", logging.ENode("enr", enr), zap.Stringer("ip", ip))
}
_ = m
// TODO: determine if still needed. Otherwise remove
}
}
@ -251,29 +213,27 @@ func (w *WakuNode) checkForAddressChanges() {
case <-w.quit:
return
case <-first:
for _, addr := range addrs {
w.logAddress(addr)
}
w.log.Info("listening", logging.MultiAddrs("multiaddr", addrs...))
case <-w.addressChangesSub.Out():
newAddrs := w.ListenAddresses()
print := false
diff := false
if len(addrs) != len(newAddrs) {
print = true
diff = true
} else {
for i := range newAddrs {
if addrs[i].String() != newAddrs[i].String() {
print = true
diff = true
break
}
}
}
if print {
if diff {
addrs = newAddrs
w.log.Warn("Change in host multiaddresses")
for _, addr := range newAddrs {
w.log.Info("listening addresses update received", logging.MultiAddrs("multiaddr", addrs...))
for _, addr := range addrs {
w.addrChan <- addr
w.logAddress(addr)
}
_ = w.setupENR(addrs)
}
}
}
@ -306,6 +266,11 @@ func (w *WakuNode) Start() error {
w.opts.wOpts = append(w.opts.wOpts, pubsub.WithDiscovery(rendezvous, w.opts.rendezvousOpts...))
}
err := w.setupENR(w.ListenAddresses())
if err != nil {
return err
}
if w.opts.enableDiscV5 {
err := w.mountDiscV5()
if err != nil {
@ -317,7 +282,7 @@ func (w *WakuNode) Start() error {
w.opts.wOpts = append(w.opts.wOpts, pubsub.WithDiscovery(w.discoveryV5, w.opts.discV5Opts...))
}
err := w.mountRelay(w.opts.minRelayPeersToPublish, w.opts.wOpts...)
err = w.mountRelay(w.opts.minRelayPeersToPublish, w.opts.wOpts...)
if err != nil {
return err
}
@ -401,6 +366,11 @@ func (w *WakuNode) ListenAddresses() []ma.Multiaddr {
return result
}
// ENR returns the ENR address of the node
func (w *WakuNode) ENR() *enode.Node {
return w.localNode.Node()
}
// Relay is used to access any operation related to Waku Relay protocol
func (w *WakuNode) Relay() *relay.WakuRelay {
return w.relay
@ -492,7 +462,7 @@ func (w *WakuNode) mountDiscV5() error {
}
var err error
w.discoveryV5, err = discv5.NewDiscoveryV5(w.Host(), w.ListenAddresses(), w.opts.privKey, w.wakuFlag, w.log, discV5Options...)
w.discoveryV5, err = discv5.NewDiscoveryV5(w.Host(), w.opts.privKey, w.localNode, w.log, discV5Options...)
return err
}

View File

@ -94,6 +94,7 @@ func (wakuLP *WakuLightPush) onRequest(s network.Stream) {
_, err := wakuLP.relay.PublishToTopic(wakuLP.ctx, message, pubSubTopic)
if err != nil {
logger.Error("publishing message", zap.Error(err))
response.IsSuccess = false
response.Info = "Could not publish message"
} else {

View File

@ -1,59 +1,105 @@
package rpc
import (
"encoding/json"
"errors"
"fmt"
"net/http"
"reflect"
"strings"
"github.com/gorilla/rpc/v2"
"github.com/gorilla/rpc/v2/json"
"golang.org/x/text/cases"
"golang.org/x/text/language"
)
// Based on github.com/gorilla/rpc/v2/json which is governed by a BSD-style license
var null = json.RawMessage([]byte("null"))
// An Error is a wrapper for a JSON interface value. It can be used by either
// a service's handler func to write more complex JSON data to an error field
// of a server's response, or by a client to read it.
type Error struct {
Data interface{}
}
func (e *Error) Error() string {
return fmt.Sprintf("%v", e.Data)
}
// ----------------------------------------------------------------------------
// Request and Response
// ----------------------------------------------------------------------------
// serverRequest represents a JSON-RPC request received by the server.
type serverRequest struct {
// A String containing the name of the method to be invoked.
Method string `json:"method"`
// An Array of objects to pass as arguments to the method.
Params *json.RawMessage `json:"params"`
// The request id. This can be of any type. It is used to match the
// response with the request that it is replying to.
Id *json.RawMessage `json:"id"`
}
// serverResponse represents a JSON-RPC response returned by the server.
type serverResponse struct {
// The Object that was returned by the invoked method. This must be null
// in case there was an error invoking the method.
Result interface{} `json:"result"`
// An Error object if there was an error invoking the method. It must be
// null if there was no error.
Error interface{} `json:"error"`
// This must be the same id as the request it is responding to.
Id *json.RawMessage `json:"id"`
}
// ----------------------------------------------------------------------------
// Codec
// ----------------------------------------------------------------------------
// NewCodec returns a new SnakeCaseCodec Codec.
func NewSnakeCaseCodec() *SnakeCaseCodec {
return &SnakeCaseCodec{}
}
// SnakeCaseCodec creates a CodecRequest to process each request.
type SnakeCaseCodec struct {
}
// NewSnakeCaseCodec returns a new SnakeCaseCodec.
func NewSnakeCaseCodec() *SnakeCaseCodec {
return &SnakeCaseCodec{}
}
// NewRequest returns a new CodecRequest of type SnakeCaseCodecRequest.
// NewRequest returns a CodecRequest.
func (c *SnakeCaseCodec) NewRequest(r *http.Request) rpc.CodecRequest {
outerCR := &SnakeCaseCodecRequest{} // Our custom CR
jsonC := json.NewCodec() // json Codec to create json CR
innerCR := jsonC.NewRequest(r) // create the json CR, sort of.
// NOTE - innerCR is of the interface type rpc.CodecRequest.
// Because innerCR is of the rpc.CR interface type, we need a
// type assertion in order to assign it to our struct field's type.
// We defined the source of the interface implementation here, so
// we can be confident that innerCR will be of the correct underlying type
outerCR.CodecRequest = innerCR.(*json.CodecRequest)
return outerCR
return newCodecRequest(r)
}
// SnakeCaseCodecRequest decodes and encodes a single request. SnakeCaseCodecRequest
// implements gorilla/rpc.CodecRequest interface primarily by embedding
// the CodecRequest from gorilla/rpc/json. By selectively adding
// CodecRequest methods to SnakeCaseCodecRequest, we can modify that behaviour
// while maintaining all the other remaining CodecRequest methods from
// gorilla's rpc/json implementation
type SnakeCaseCodecRequest struct {
*json.CodecRequest
// ----------------------------------------------------------------------------
// CodecRequest
// ----------------------------------------------------------------------------
// newCodecRequest returns a new CodecRequest.
func newCodecRequest(r *http.Request) rpc.CodecRequest {
// Decode the request body and check if RPC method is valid.
req := new(serverRequest)
err := json.NewDecoder(r.Body).Decode(req)
r.Body.Close()
return &CodecRequest{request: req, err: err}
}
// Method returns the decoded method as a string of the form "Service.Method"
// after checking for, and correcting a lowercase method name
// By being of lower depth in the struct , Method will replace the implementation
// of Method() on the embedded CodecRequest. Because the request data is part
// of the embedded json.CodecRequest, and unexported, we have to get the
// requested method name via the embedded CR's own method Method().
// Essentially, this just intercepts the return value from the embedded
// gorilla/rpc/json.CodecRequest.Method(), checks/modifies it, and passes it
// on to the calling rpc server.
func (c *SnakeCaseCodecRequest) Method() (string, error) {
m, err := c.CodecRequest.Method()
return toWakuMethod(m), err
// CodecRequest decodes and encodes a single request.
type CodecRequest struct {
request *serverRequest
err error
}
// Method returns the RPC method for the current request.
//
// The method uses a dotted notation as in "Service.Method".
func (c *CodecRequest) Method() (string, error) {
if c.err == nil {
return toWakuMethod(c.request.Method), nil
}
return "", c.err
}
// toWakuMethod transform get_waku_v2_debug_v1_info to Debug.GetV1Info
@ -69,10 +115,81 @@ func toWakuMethod(input string) string {
cleanedInput := strings.Replace(input, base, "", 1)
splited := strings.Split(cleanedInput, "_")
method := strings.Title(typ)
c := cases.Title(language.AmericanEnglish)
method := c.String(typ)
for _, val := range splited[1:] {
method = method + strings.Title(val)
method = method + c.String(val)
}
return strings.Title(splited[0]) + "." + method
return c.String(splited[0]) + "." + method
}
// ReadRequest fills the request object for the RPC method.
func (c *CodecRequest) ReadRequest(args interface{}) error {
if c.err == nil {
if c.request.Params != nil {
// JSON params is array value. RPC params is struct.
// Attempt to unmarshal into array containing the request struct.
params := [1]interface{}{args}
err := json.Unmarshal(*c.request.Params, &params)
if err != nil {
// This failed so we might have received an array of parameters
// instead of a object
argsValueOf := reflect.Indirect(reflect.ValueOf(args))
if argsValueOf.Kind() == reflect.Struct {
var params []interface{}
for i := 0; i < argsValueOf.NumField(); i++ {
params = append(params, argsValueOf.Field(i).Addr().Interface())
}
c.err = json.Unmarshal(*c.request.Params, &params)
} else {
// Unknown field type...
c.err = err
}
}
} else {
c.err = errors.New("rpc: method request ill-formed: missing params field")
}
}
return c.err
}
// WriteResponse encodes the response and writes it to the ResponseWriter.
func (c *CodecRequest) WriteResponse(w http.ResponseWriter, reply interface{}) {
if c.request.Id != nil {
// Id is null for notifications and they don't have a response.
res := &serverResponse{
Result: reply,
Error: &null,
Id: c.request.Id,
}
c.writeServerResponse(w, 200, res)
}
}
func (c *CodecRequest) WriteError(w http.ResponseWriter, _ int, err error) {
res := &serverResponse{
Result: &null,
Id: c.request.Id,
}
if jsonErr, ok := err.(*Error); ok {
res.Error = jsonErr.Data
} else {
res.Error = err.Error()
}
c.writeServerResponse(w, 400, res)
}
func (c *CodecRequest) writeServerResponse(w http.ResponseWriter, status int, res *serverResponse) {
b, err := json.Marshal(res)
if err == nil {
w.Header().Set("Content-Type", "application/json; charset=utf-8")
w.WriteHeader(status)
w.Write(b)
} else {
// Not sure in which case will this happen. But seems harmless.
rpc.WriteError(w, 400, err.Error())
}
}

View File

@ -14,11 +14,15 @@ type InfoArgs struct {
}
type InfoReply struct {
Version string `json:"version,omitempty"`
ENRUri string `json:"enrUri,omitempty"`
ListenAddresses []string `json:"listenAddresses,omitempty"`
}
func (d *DebugService) GetV1Info(r *http.Request, args *InfoArgs, reply *InfoReply) error {
reply.Version = "2.0"
reply.ENRUri = d.node.ENR().String()
for _, addr := range d.node.ListenAddresses() {
reply.ListenAddresses = append(reply.ListenAddresses, addr.String())
}
return nil
}

View File

@ -2,9 +2,11 @@ package rpc
import (
"bytes"
"context"
"net/http"
"testing"
"github.com/status-im/go-waku/waku/v2/node"
"github.com/stretchr/testify/require"
)
@ -14,9 +16,16 @@ func TestGetV1Info(t *testing.T) {
request, err := http.NewRequest(http.MethodPost, "url", bytes.NewReader([]byte("")))
require.NoError(t, err)
d := &DebugService{nil}
wakuNode1, err := node.New(context.Background())
require.NoError(t, err)
defer wakuNode1.Stop()
err = wakuNode1.Start()
require.NoError(t, err)
d := &DebugService{
node: wakuNode1,
}
err = d.GetV1Info(request, &InfoArgs{}, &reply)
require.NoError(t, err)
require.Equal(t, "2.0", reply.Version)
}

View File

@ -119,7 +119,10 @@ func (f *FilterService) GetV1Messages(req *http.Request, args *ContentTopicArgs,
return fmt.Errorf("topic %s not subscribed", args.ContentTopic)
}
reply.Messages = f.messages[args.ContentTopic]
for i := range f.messages[args.ContentTopic] {
*reply = append(*reply, ProtoWakuMessageToRPCWakuMessage(f.messages[args.ContentTopic][i]))
}
f.messages[args.ContentTopic] = make([]*pb.WakuMessage, 0)
return nil
}

View File

@ -127,20 +127,21 @@ func TestFilterGetV1Messages(t *testing.T) {
// Wait for the message to be received
time.Sleep(1 * time.Second)
var messagesReply MessagesReply
var messagesReply1 MessagesReply
err = serviceB.GetV1Messages(
makeRequest(t),
&ContentTopicArgs{"ct"},
&messagesReply,
&messagesReply1,
)
require.NoError(t, err)
require.Len(t, messagesReply.Messages, 1)
require.Len(t, messagesReply1, 1)
var messagesReply2 MessagesReply
err = serviceB.GetV1Messages(
makeRequest(t),
&ContentTopicArgs{"ct"},
&messagesReply,
&messagesReply2,
)
require.NoError(t, err)
require.Len(t, messagesReply.Messages, 0)
require.Len(t, messagesReply2, 0)
}

View File

@ -3,14 +3,17 @@ package rpc
import (
"crypto/ecdsa"
"crypto/rand"
"encoding/hex"
"fmt"
"net/http"
"strings"
"sync"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/crypto"
"github.com/status-im/go-waku/waku/v2/node"
"github.com/status-im/go-waku/waku/v2/protocol"
"github.com/status-im/go-waku/waku/v2/protocol/pb"
"github.com/status-im/go-waku/waku/v2/protocol/relay"
"go.uber.org/zap"
)
@ -18,31 +21,28 @@ type PrivateService struct {
node *node.WakuNode
log *zap.Logger
symmetricMessages map[string][]*pb.WakuMessage
symmetricMessagesMutex sync.RWMutex
messages map[string][]*pb.WakuMessage
messagesMutex sync.RWMutex
asymmetricMessages map[string][]*pb.WakuMessage
asymmetricMessagesMutex sync.RWMutex
runner *runnerService
}
type SymmetricKeyReply struct {
Key string `json:"key"`
}
type SymmetricKeyReply string
type KeyPairReply struct {
PrivateKey string `json:"privateKey"`
PulicKey string `json:"publicKey"`
PublicKey string `json:"publicKey"`
}
type SymmetricMessageArgs struct {
Topic string `json:"topic"`
Message pb.WakuMessage `json:"message"`
Message RPCWakuMessage `json:"message"`
SymKey string `json:"symkey"`
}
type AsymmetricMessageArgs struct {
Topic string `json:"topic"`
Message pb.WakuMessage `json:"message"`
Message RPCWakuMessage `json:"message"`
PublicKey string `json:"publicKey"`
}
@ -57,12 +57,24 @@ type AsymmetricMessagesArgs struct {
}
func NewPrivateService(node *node.WakuNode, log *zap.Logger) *PrivateService {
return &PrivateService{
node: node,
symmetricMessages: make(map[string][]*pb.WakuMessage),
asymmetricMessages: make(map[string][]*pb.WakuMessage),
log: log.Named("private"),
p := &PrivateService{
node: node,
messages: make(map[string][]*pb.WakuMessage),
log: log.Named("private"),
}
p.runner = newRunnerService(node.Broadcaster(), p.addEnvelope)
return p
}
func (p *PrivateService) addEnvelope(envelope *protocol.Envelope) {
p.messagesMutex.Lock()
defer p.messagesMutex.Unlock()
if _, ok := p.messages[envelope.PubsubTopic()]; !ok {
p.messages[envelope.PubsubTopic()] = make([]*pb.WakuMessage, 0)
}
p.messages[envelope.PubsubTopic()] = append(p.messages[envelope.PubsubTopic()], envelope.Message())
}
func (p *PrivateService) GetV1SymmetricKey(req *http.Request, args *Empty, reply *SymmetricKeyReply) error {
@ -71,7 +83,7 @@ func (p *PrivateService) GetV1SymmetricKey(req *http.Request, args *Empty, reply
if err != nil {
return err
}
reply.Key = hex.EncodeToString(key[:])
*reply = SymmetricKeyReply(hexutil.Encode(key[:]))
return nil
}
@ -89,44 +101,52 @@ func (p *PrivateService) GetV1AsymmetricKeypair(req *http.Request, args *Empty,
}
publicKeyBytes := crypto.FromECDSAPub(publicKeyECDSA)
reply.PrivateKey = hex.EncodeToString(privateKeyBytes[:])
reply.PulicKey = hex.EncodeToString(publicKeyBytes[:])
reply.PrivateKey = hexutil.Encode(privateKeyBytes[:])
reply.PublicKey = hexutil.Encode(publicKeyBytes[:])
return nil
}
func (p *PrivateService) PostV1SymmetricMessage(req *http.Request, args *SymmetricMessageArgs, reply *SuccessReply) error {
symKeyBytes, err := hexutil.Decode(args.SymKey)
if err != nil {
return fmt.Errorf("invalid symmetric key: %w", err)
}
keyInfo := new(node.KeyInfo)
keyInfo.Kind = node.Symmetric
keyInfo.SymKey = []byte(args.SymKey)
keyInfo.SymKey = symKeyBytes
err := node.EncodeWakuMessage(&args.Message, keyInfo)
if err != nil {
reply.Error = err.Error()
reply.Success = false
return nil
}
err = p.node.Publish(req.Context(), &args.Message)
msg := args.Message.toProto()
msg.Version = 1
err = node.EncodeWakuMessage(msg, keyInfo)
if err != nil {
reply.Error = err.Error()
reply.Success = false
return nil
}
p.symmetricMessagesMutex.Lock()
defer p.symmetricMessagesMutex.Unlock()
if _, ok := p.symmetricMessages[args.Topic]; !ok {
p.symmetricMessages[args.Topic] = make([]*pb.WakuMessage, 0)
topic := args.Topic
if topic == "" {
topic = relay.DefaultWakuTopic
}
_, err = p.node.Relay().PublishToTopic(req.Context(), msg, topic)
if err != nil {
reply.Error = err.Error()
reply.Success = false
return nil
}
p.symmetricMessages[args.Topic] = append(p.symmetricMessages[args.Topic], &args.Message)
reply.Success = true
return nil
}
func (p *PrivateService) PostV1AsymmetricMessage(req *http.Request, args *AsymmetricMessageArgs, reply *SuccessReply) error {
func (p *PrivateService) PostV1AsymmetricMessage(req *http.Request, args *AsymmetricMessageArgs, reply *bool) error {
keyInfo := new(node.KeyInfo)
keyInfo.Kind = node.Asymmetric
pubKeyBytes, err := hex.DecodeString(args.PublicKey)
pubKeyBytes, err := hexutil.Decode(args.PublicKey)
if err != nil {
return fmt.Errorf("public key cannot be decoded: %v", err)
}
@ -135,54 +155,108 @@ func (p *PrivateService) PostV1AsymmetricMessage(req *http.Request, args *Asymme
if err != nil {
return fmt.Errorf("public key cannot be unmarshalled: %v", err)
}
keyInfo.PubKey = *pubKey
err = node.EncodeWakuMessage(&args.Message, keyInfo)
msg := args.Message.toProto()
msg.Version = 1
err = node.EncodeWakuMessage(msg, keyInfo)
if err != nil {
reply.Error = err.Error()
reply.Success = false
return nil
}
err = p.node.Publish(req.Context(), &args.Message)
if err != nil {
reply.Error = err.Error()
reply.Success = false
return nil
return err
}
p.asymmetricMessagesMutex.Lock()
defer p.asymmetricMessagesMutex.Unlock()
if _, ok := p.asymmetricMessages[args.Topic]; !ok {
p.asymmetricMessages[args.Topic] = make([]*pb.WakuMessage, 0)
topic := args.Topic
if topic == "" {
topic = relay.DefaultWakuTopic
}
p.asymmetricMessages[args.Topic] = append(p.asymmetricMessages[args.Topic], &args.Message)
reply.Success = true
_, err = p.node.Relay().PublishToTopic(req.Context(), msg, topic)
if err != nil {
return err
}
*reply = true
return nil
}
func (p *PrivateService) GetV1SymmetricMessages(req *http.Request, args *SymmetricMessagesArgs, reply *MessagesReply) error {
p.symmetricMessagesMutex.Lock()
defer p.symmetricMessagesMutex.Unlock()
p.messagesMutex.Lock()
defer p.messagesMutex.Unlock()
if _, ok := p.symmetricMessages[args.Topic]; !ok {
return fmt.Errorf("topic %s not subscribed", args.Topic)
if _, ok := p.messages[args.Topic]; !ok {
p.messages[args.Topic] = make([]*pb.WakuMessage, 0)
}
symKeyBytes, err := hexutil.Decode(args.SymKey)
if err != nil {
return fmt.Errorf("invalid symmetric key: %w", err)
}
messages := make([]*pb.WakuMessage, len(p.messages[args.Topic]))
copy(messages, p.messages[args.Topic])
p.messages[args.Topic] = make([]*pb.WakuMessage, 0)
var decodedMessages []*pb.WakuMessage
for _, msg := range messages {
err := node.DecodeWakuMessage(msg, &node.KeyInfo{
Kind: node.Symmetric,
SymKey: symKeyBytes,
})
if err != nil {
continue
}
decodedMessages = append(decodedMessages, msg)
}
for i := range decodedMessages {
*reply = append(*reply, ProtoWakuMessageToRPCWakuMessage(decodedMessages[i]))
}
reply.Messages = p.symmetricMessages[args.Topic]
p.symmetricMessages[args.Topic] = make([]*pb.WakuMessage, 0)
return nil
}
func (p *PrivateService) GetV1AsymmetricMessages(req *http.Request, args *AsymmetricMessagesArgs, reply *MessagesReply) error {
p.asymmetricMessagesMutex.Lock()
defer p.asymmetricMessagesMutex.Unlock()
p.messagesMutex.Lock()
defer p.messagesMutex.Unlock()
if _, ok := p.asymmetricMessages[args.Topic]; !ok {
return fmt.Errorf("topic %s not subscribed", args.Topic)
if _, ok := p.messages[args.Topic]; !ok {
p.messages[args.Topic] = make([]*pb.WakuMessage, 0)
}
messages := make([]*pb.WakuMessage, len(p.messages[args.Topic]))
copy(messages, p.messages[args.Topic])
p.messages[args.Topic] = make([]*pb.WakuMessage, 0)
privKey, err := crypto.HexToECDSA(strings.TrimPrefix(args.PrivateKey, "0x"))
if err != nil {
return fmt.Errorf("invalid asymmetric key: %w", err)
}
var decodedMessages []*pb.WakuMessage
for _, msg := range messages {
err := node.DecodeWakuMessage(msg, &node.KeyInfo{
Kind: node.Asymmetric,
PrivKey: privKey,
})
if err != nil {
continue
}
decodedMessages = append(decodedMessages, msg)
}
for i := range decodedMessages {
*reply = append(*reply, ProtoWakuMessageToRPCWakuMessage(decodedMessages[i]))
}
reply.Messages = p.asymmetricMessages[args.Topic]
p.asymmetricMessages[args.Topic] = make([]*pb.WakuMessage, 0)
return nil
}
func (p *PrivateService) Start() {
p.runner.Start()
}
func (p *PrivateService) Stop() {
p.runner.Stop()
}

View File

@ -3,9 +3,11 @@ package rpc
import (
"context"
"testing"
"time"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/crypto"
"github.com/status-im/go-waku/waku/v2/node"
"github.com/status-im/go-waku/waku/v2/protocol/pb"
"github.com/status-im/go-waku/waku/v2/utils"
"github.com/stretchr/testify/require"
)
@ -30,7 +32,7 @@ func TestGetV1SymmetricKey(t *testing.T) {
&reply,
)
require.NoError(t, err)
require.NotEmpty(t, reply.Key)
require.NotEmpty(t, reply)
}
func TestGetV1AsymmetricKey(t *testing.T) {
@ -44,7 +46,7 @@ func TestGetV1AsymmetricKey(t *testing.T) {
&reply,
)
require.NoError(t, err)
require.NotEmpty(t, reply.PulicKey)
require.NotEmpty(t, reply.PublicKey)
require.NotEmpty(t, reply.PrivateKey)
}
@ -57,8 +59,8 @@ func TestPostV1SymmetricMessage(t *testing.T) {
makeRequest(t),
&SymmetricMessageArgs{
Topic: "test",
Message: pb.WakuMessage{Payload: []byte("test")},
SymKey: "abc",
Message: RPCWakuMessage{Payload: []byte("test")},
SymKey: "0x1122334455667788991011223344556677889910112233445566778899101122",
},
&reply,
)
@ -70,73 +72,94 @@ func TestPostV1AsymmetricMessage(t *testing.T) {
d := makePrivateService(t)
defer d.node.Stop()
var reply SuccessReply
var reply bool
err := d.PostV1AsymmetricMessage(
makeRequest(t),
&AsymmetricMessageArgs{
Topic: "test",
Message: pb.WakuMessage{Payload: []byte("test")},
PublicKey: "045ded6a56c88173e87a88c55b96956964b1bd3351b5fcb70950a4902fbc1bc0ceabb0ac846c3a4b8f2f6024c0e19f0a7f6a4865035187de5463f34012304fc7c5",
Message: RPCWakuMessage{Payload: []byte("test")},
PublicKey: "0x045ded6a56c88173e87a88c55b96956964b1bd3351b5fcb70950a4902fbc1bc0ceabb0ac846c3a4b8f2f6024c0e19f0a7f6a4865035187de5463f34012304fc7c5",
},
&reply,
)
require.NoError(t, err)
require.True(t, reply.Success)
require.True(t, reply)
}
func TestGetV1SymmetricMessages(t *testing.T) {
d := makePrivateService(t)
go d.Start()
defer d.node.Stop()
// Subscribing topic to test getter
_, err := d.node.Relay().SubscribeToTopic(context.TODO(), "test")
require.NoError(t, err)
var reply SuccessReply
err := d.PostV1SymmetricMessage(
err = d.PostV1SymmetricMessage(
makeRequest(t),
&SymmetricMessageArgs{
Topic: "test",
Message: pb.WakuMessage{Payload: []byte("test")},
SymKey: "abc",
Message: RPCWakuMessage{Payload: []byte("test")},
SymKey: "0x1122334455667788991011223344556677889910112233445566778899101122",
},
&reply,
)
require.NoError(t, err)
require.True(t, reply.Success)
time.Sleep(500 * time.Millisecond)
var getReply MessagesReply
err = d.GetV1SymmetricMessages(
makeRequest(t),
&SymmetricMessagesArgs{Topic: "test", SymKey: "abc"},
&SymmetricMessagesArgs{
Topic: "test",
SymKey: "0x1122334455667788991011223344556677889910112233445566778899101122",
},
&getReply,
)
require.NoError(t, err)
require.Len(t, getReply.Messages, 1)
require.Len(t, getReply, 1)
}
func TestGetV1AsymmetricMessages(t *testing.T) {
d := makePrivateService(t)
go d.Start()
defer d.node.Stop()
var reply SuccessReply
err := d.PostV1AsymmetricMessage(
// Subscribing topic to test getter
_, err := d.node.Relay().SubscribeToTopic(context.TODO(), "test")
require.NoError(t, err)
prvKey, err := crypto.GenerateKey()
require.NoError(t, err)
var reply bool
err = d.PostV1AsymmetricMessage(
makeRequest(t),
&AsymmetricMessageArgs{
Topic: "test",
Message: pb.WakuMessage{Payload: []byte("test")},
PublicKey: "045ded6a56c88173e87a88c55b96956964b1bd3351b5fcb70950a4902fbc1bc0ceabb0ac846c3a4b8f2f6024c0e19f0a7f6a4865035187de5463f34012304fc7c5",
Message: RPCWakuMessage{Payload: []byte("test")},
PublicKey: hexutil.Encode(crypto.FromECDSAPub(&prvKey.PublicKey)),
},
&reply,
)
require.NoError(t, err)
require.True(t, reply.Success)
require.True(t, reply)
time.Sleep(500 * time.Millisecond)
var getReply MessagesReply
err = d.GetV1AsymmetricMessages(
makeRequest(t),
&AsymmetricMessagesArgs{
Topic: "test",
PrivateKey: "045ded6a56c88173e87a88c55b96956964b1bd3351b5fcb70950a4902fbc1bc0ceabb0ac846c3a4b8f2f6024c0e19f0a7f6a4865035187de5463f34012304fc7c5",
PrivateKey: hexutil.Encode(crypto.FromECDSA(prvKey)),
},
&getReply,
)
require.NoError(t, err)
require.Len(t, getReply.Messages, 1)
require.Len(t, getReply, 1)
}

View File

@ -8,6 +8,7 @@ import (
"github.com/status-im/go-waku/waku/v2/node"
"github.com/status-im/go-waku/waku/v2/protocol"
"github.com/status-im/go-waku/waku/v2/protocol/pb"
"github.com/status-im/go-waku/waku/v2/protocol/relay"
"go.uber.org/zap"
)
@ -23,8 +24,8 @@ type RelayService struct {
}
type RelayMessageArgs struct {
Topic string `json:"topic,omitempty"`
Message pb.WakuMessage `json:"message,omitempty"`
Topic string `json:"topic,omitempty"`
Message RPCWakuRelayMessage `json:"message,omitempty"`
}
type TopicsArgs struct {
@ -43,6 +44,7 @@ func NewRelayService(node *node.WakuNode, log *zap.Logger) *RelayService {
}
s.runner = newRunnerService(node.Broadcaster(), s.addEnvelope)
return s
}
@ -58,6 +60,12 @@ func (r *RelayService) addEnvelope(envelope *protocol.Envelope) {
}
func (r *RelayService) Start() {
// Node may already be subscribed to some topics when Relay API handlers are installed. Let's add these
for _, topic := range r.node.Relay().Topics() {
r.log.Info("adding topic handler for existing subscription", zap.String("topic", topic))
r.messages[topic] = make([]*pb.WakuMessage, 0)
}
r.runner.Start()
}
@ -67,10 +75,13 @@ func (r *RelayService) Stop() {
func (r *RelayService) PostV1Message(req *http.Request, args *RelayMessageArgs, reply *SuccessReply) error {
var err error
msg := args.Message.toProto()
if args.Topic == "" {
_, err = r.node.Relay().Publish(req.Context(), &args.Message)
_, err = r.node.Relay().Publish(req.Context(), msg)
} else {
_, err = r.node.Relay().PublishToTopic(req.Context(), &args.Message, args.Topic)
_, err = r.node.Relay().PublishToTopic(req.Context(), msg, args.Topic)
}
if err != nil {
r.log.Error("publishing message", zap.Error(err))
@ -87,10 +98,13 @@ func (r *RelayService) PostV1Subscription(req *http.Request, args *TopicsArgs, r
for _, topic := range args.Topics {
var err error
if topic == "" {
_, err = r.node.Relay().Subscribe(ctx)
var sub *relay.Subscription
sub, err = r.node.Relay().Subscribe(ctx)
r.node.Broadcaster().Unregister(&relay.DefaultWakuTopic, sub.C)
} else {
_, err = r.node.Relay().SubscribeToTopic(ctx, topic)
var sub *relay.Subscription
sub, err = r.node.Relay().SubscribeToTopic(ctx, topic)
r.node.Broadcaster().Unregister(&topic, sub.C)
}
if err != nil {
r.log.Error("subscribing to topic", zap.String("topic", topic), zap.Error(err))
@ -121,7 +135,7 @@ func (r *RelayService) DeleteV1Subscription(req *http.Request, args *TopicsArgs,
return nil
}
func (r *RelayService) GetV1Messages(req *http.Request, args *TopicArgs, reply *MessagesReply) error {
func (r *RelayService) GetV1Messages(req *http.Request, args *TopicArgs, reply *RelayMessagesReply) error {
r.messagesMutex.Lock()
defer r.messagesMutex.Unlock()
@ -129,7 +143,11 @@ func (r *RelayService) GetV1Messages(req *http.Request, args *TopicArgs, reply *
return fmt.Errorf("topic %s not subscribed", args.Topic)
}
reply.Messages = r.messages[args.Topic]
for i := range r.messages[args.Topic] {
*reply = append(*reply, ProtoWakuMessageToRPCWakuRelayMessage(r.messages[args.Topic][i]))
}
r.messages[args.Topic] = make([]*pb.WakuMessage, 0)
return nil
}

View File

@ -8,7 +8,6 @@ import (
"github.com/multiformats/go-multiaddr"
"github.com/status-im/go-waku/waku/v2/node"
"github.com/status-im/go-waku/waku/v2/protocol/pb"
"github.com/status-im/go-waku/waku/v2/utils"
"github.com/stretchr/testify/require"
)
@ -97,7 +96,7 @@ func TestRelayGetV1Messages(t *testing.T) {
makeRequest(t),
&RelayMessageArgs{
Topic: "test",
Message: pb.WakuMessage{
Message: RPCWakuRelayMessage{
Payload: []byte("test"),
},
},
@ -109,20 +108,21 @@ func TestRelayGetV1Messages(t *testing.T) {
// Wait for the message to be received
time.Sleep(1 * time.Second)
var messagesReply MessagesReply
var messagesReply1 RelayMessagesReply
err = serviceB.GetV1Messages(
makeRequest(t),
&TopicArgs{"test"},
&messagesReply,
&messagesReply1,
)
require.NoError(t, err)
require.Len(t, messagesReply.Messages, 1)
require.Len(t, messagesReply1, 1)
var messagesReply2 RelayMessagesReply
err = serviceB.GetV1Messages(
makeRequest(t),
&TopicArgs{"test"},
&messagesReply,
&messagesReply2,
)
require.NoError(t, err)
require.Len(t, messagesReply.Messages, 0)
require.Len(t, messagesReply2, 0)
}

View File

@ -1,7 +1,5 @@
package rpc
import "github.com/status-im/go-waku/waku/v2/protocol/pb"
type SuccessReply struct {
Success bool `json:"success,omitempty"`
Error string `json:"error,omitempty"`
@ -10,6 +8,6 @@ type SuccessReply struct {
type Empty struct {
}
type MessagesReply struct {
Messages []*pb.WakuMessage `json:"messages,omitempty"`
}
type MessagesReply []*RPCWakuMessage
type RelayMessagesReply []*RPCWakuRelayMessage

View File

@ -33,7 +33,7 @@ type StoreMessagesArgs struct {
}
type StoreMessagesReply struct {
Messages []*pb.WakuMessage `json:"messages,omitempty"`
Messages []RPCWakuMessage `json:"messages,omitempty"`
PagingInfo StorePagingOptions `json:"pagingInfo,omitempty"`
Error string `json:"error,omitempty"`
}
@ -60,7 +60,12 @@ func (s *StoreService) GetV1Messages(req *http.Request, args *StoreMessagesArgs,
reply.Error = err.Error()
return nil
}
reply.Messages = res.Messages
reply.Messages = make([]RPCWakuMessage, len(res.Messages))
for i := range res.Messages {
reply.Messages[i] = *ProtoWakuMessageToRPCWakuMessage(res.Messages[i])
}
reply.PagingInfo = StorePagingOptions{
PageSize: args.PagingOptions.PageSize,
Cursor: res.Cursor(),

135
waku/v2/rpc/utils.go Normal file
View File

@ -0,0 +1,135 @@
package rpc
import (
"encoding/hex"
"fmt"
"strings"
"github.com/status-im/go-waku/waku/v2/protocol/pb"
)
// HexBytes is marshalled to a hex string
type HexBytes []byte
// ByteArray is marshalled to a uint8 array
type ByteArray []byte
type RPCWakuMessage struct {
Payload ByteArray `json:"payload,omitempty"`
ContentTopic string `json:"contentTopic,omitempty"`
Version uint32 `json:"version"`
Timestamp int64 `json:"timestamp,omitempty"`
Proof HexBytes `json:"proof,omitempty"`
}
type RPCWakuRelayMessage struct {
Payload HexBytes `json:"payload,omitempty"`
ContentTopic string `json:"contentTopic,omitempty"`
Timestamp int64 `json:"timestamp,omitempty"`
Proof HexBytes `json:"proof,omitempty"`
Version uint32 `json:"version"`
}
func ProtoWakuMessageToRPCWakuMessage(input *pb.WakuMessage) *RPCWakuMessage {
if input == nil {
return nil
}
return &RPCWakuMessage{
Payload: input.Payload,
ContentTopic: input.ContentTopic,
Version: input.Version,
Timestamp: input.Timestamp,
Proof: input.Proof,
}
}
func (r *RPCWakuMessage) toProto() *pb.WakuMessage {
if r == nil {
return nil
}
return &pb.WakuMessage{
Payload: r.Payload,
ContentTopic: r.ContentTopic,
Version: r.Version,
Timestamp: r.Timestamp,
Proof: r.Proof,
}
}
func (u HexBytes) MarshalJSON() ([]byte, error) {
var result string
if u == nil {
result = "null"
} else {
result = strings.Join(strings.Fields(fmt.Sprintf("%d", u)), ",")
}
return []byte(result), nil
}
func (h *HexBytes) UnmarshalText(b []byte) error {
hexString := ""
if b != nil {
hexString = string(b)
}
decoded, err := hex.DecodeString(hexString)
if err != nil {
return err
}
*h = decoded
return nil
}
func ProtoWakuMessageToRPCWakuRelayMessage(input *pb.WakuMessage) *RPCWakuRelayMessage {
if input == nil {
return nil
}
return &RPCWakuRelayMessage{
Payload: input.Payload,
ContentTopic: input.ContentTopic,
Timestamp: input.Timestamp,
Proof: input.Proof,
}
}
func (r *RPCWakuRelayMessage) toProto() *pb.WakuMessage {
if r == nil {
return nil
}
return &pb.WakuMessage{
Payload: r.Payload,
ContentTopic: r.ContentTopic,
Timestamp: r.Timestamp,
Proof: r.Proof,
}
}
func (h ByteArray) MarshalText() ([]byte, error) {
if h == nil {
return []byte{}, nil
}
return []byte(hex.EncodeToString(h)), nil
}
func (h *ByteArray) UnmarshalText(b []byte) error {
hexString := ""
if b != nil {
hexString = string(b)
}
decoded, err := hex.DecodeString(hexString)
if err != nil {
return err
}
*h = decoded
return nil
}

View File

@ -88,6 +88,9 @@ func NewWakuRpc(node *node.WakuNode, address string, port int, enableAdmin bool,
server.RegisterOnShutdown(func() {
filterService.Stop()
relayService.Stop()
if wrpc.privateService != nil {
wrpc.privateService.Stop()
}
})
wrpc.node = node
@ -101,6 +104,9 @@ func NewWakuRpc(node *node.WakuNode, address string, port int, enableAdmin bool,
func (r *WakuRpc) Start() {
go r.relayService.Start()
go r.filterService.Start()
if r.privateService != nil {
go r.privateService.Start()
}
go func() {
_ = r.server.ListenAndServe()
}()

View File

@ -111,7 +111,7 @@ func GetENRandIP(addr ma.Multiaddr, wakuFlags WakuEnrBitfield, privK *ecdsa.Priv
p2pAddr, err := ma.NewMultiaddr("/p2p/" + p2p)
if err != nil {
return nil, nil, fmt.Errorf("Could not create p2p addr: %w", err)
return nil, nil, fmt.Errorf("could not create p2p addr: %w", err)
}
var fieldRaw []byte