From 3c0c3c4eeb5059f607a062baef34b994f907b856 Mon Sep 17 00:00:00 2001 From: Richard Ramos Date: Mon, 13 Jun 2022 14:30:35 -0400 Subject: [PATCH] fix: js-waku / nwaku interop (#252) --- go.mod | 2 + waku.go | 10 + waku/node.go | 12 +- waku/options.go | 2 + waku/v2/discv5/discover.go | 163 +------------ waku/v2/discv5/discover_test.go | 75 +++++- waku/v2/node/localnode.go | 236 +++++++++++++++++++ waku/v2/node/wakunode2.go | 104 +++----- waku/v2/protocol/lightpush/waku_lightpush.go | 1 + waku/v2/rpc/codec.go | 199 ++++++++++++---- waku/v2/rpc/debug.go | 8 +- waku/v2/rpc/debug_test.go | 13 +- waku/v2/rpc/filter.go | 5 +- waku/v2/rpc/filter_test.go | 11 +- waku/v2/rpc/private.go | 198 +++++++++++----- waku/v2/rpc/private_test.go | 65 +++-- waku/v2/rpc/relay.go | 36 ++- waku/v2/rpc/relay_test.go | 14 +- waku/v2/rpc/rpc_type.go | 8 +- waku/v2/rpc/store.go | 9 +- waku/v2/rpc/utils.go | 135 +++++++++++ waku/v2/rpc/waku_rpc.go | 6 + waku/v2/utils/enr.go | 2 +- 23 files changed, 920 insertions(+), 394 deletions(-) create mode 100644 waku/v2/node/localnode.go create mode 100644 waku/v2/rpc/utils.go diff --git a/go.mod b/go.mod index a5cf7fb2..cd04b6e3 100644 --- a/go.mod +++ b/go.mod @@ -36,6 +36,8 @@ require ( golang.org/x/sync v0.0.0-20220513210516-0976fa681c29 ) +require golang.org/x/text v0.3.7 + require ( github.com/benbjohnson/clock v1.3.0 // indirect github.com/beorn7/perks v1.0.1 // indirect diff --git a/waku.go b/waku.go index 8dad165a..43fd158d 100644 --- a/waku.go +++ b/waku.go @@ -111,6 +111,16 @@ func main() { Usage: "Use SQLiteDB to persist information", Destination: &options.UseDB, }, + &cli.BoolFlag{ + Name: "persist-messages", + Usage: "Enable message persistence", + Destination: &options.Store.PersistMessages, + }, + &cli.StringFlag{ + Name: "nat", + Usage: "TODO - Not implemented yet.", // This was added so js-waku test don't fail + Destination: &options.NAT, + }, &cli.StringFlag{ Name: "db-path", Aliases: []string{"dbpath"}, diff --git a/waku/node.go b/waku/node.go index 911d9eb1..0b023993 100644 --- a/waku/node.go +++ b/waku/node.go @@ -193,10 +193,14 @@ func Execute(options Options) { } if options.Store.Enable { - nodeOpts = append(nodeOpts, node.WithWakuStoreAndRetentionPolicy(options.Store.ShouldResume, options.Store.RetentionMaxDaysDuration(), options.Store.RetentionMaxMessages)) - dbStore, err := persistence.NewDBStore(logger, persistence.WithDB(db), persistence.WithRetentionPolicy(options.Store.RetentionMaxMessages, options.Store.RetentionMaxDaysDuration())) - failOnErr(err, "DBStore") - nodeOpts = append(nodeOpts, node.WithMessageProvider(dbStore)) + if options.Store.PersistMessages { + nodeOpts = append(nodeOpts, node.WithWakuStoreAndRetentionPolicy(options.Store.ShouldResume, options.Store.RetentionMaxDaysDuration(), options.Store.RetentionMaxMessages)) + dbStore, err := persistence.NewDBStore(logger, persistence.WithDB(db), persistence.WithRetentionPolicy(options.Store.RetentionMaxMessages, options.Store.RetentionMaxDaysDuration())) + failOnErr(err, "DBStore") + nodeOpts = append(nodeOpts, node.WithMessageProvider(dbStore)) + } else { + nodeOpts = append(nodeOpts, node.WithWakuStore(false, false)) + } } if options.LightPush.Enable { diff --git a/waku/options.go b/waku/options.go index 94b0d0d9..3f5b6426 100644 --- a/waku/options.go +++ b/waku/options.go @@ -53,6 +53,7 @@ type LightpushOptions struct { // node and provide message history to nodes that ask for it. type StoreOptions struct { Enable bool + PersistMessages bool ShouldResume bool RetentionMaxDays int RetentionMaxMessages int @@ -123,6 +124,7 @@ type Options struct { ShowAddresses bool LogLevel string LogEncoding string + NAT string Websocket WSOptions Relay RelayOptions diff --git a/waku/v2/discv5/discover.go b/waku/v2/discv5/discover.go index 60d3acaa..9912bc72 100644 --- a/waku/v2/discv5/discover.go +++ b/waku/v2/discv5/discover.go @@ -3,11 +3,8 @@ package discv5 import ( "context" "crypto/ecdsa" - "errors" - "math" "math/rand" "net" - "strconv" "sync" "time" @@ -17,7 +14,6 @@ import ( "github.com/libp2p/go-libp2p-core/discovery" "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/peer" - ma "github.com/multiformats/go-multiaddr" "github.com/status-im/go-discover/discover" "github.com/status-im/go-waku/logging" "github.com/status-im/go-waku/waku/v2/utils" @@ -43,12 +39,6 @@ type DiscoveryV5 struct { wg *sync.WaitGroup peerCache peerCache - - // Used for those weird cases where updateAddress - // receives the same external address twice both with the original port - // and the nat port. Ideally this attribute should be removed by doing - // hole punching before starting waku - ogTCPPort int } type peerCache struct { @@ -101,7 +91,7 @@ func DefaultOptions() []DiscoveryV5Option { } } -func NewDiscoveryV5(host host.Host, addresses []ma.Multiaddr, priv *ecdsa.PrivateKey, wakuFlags utils.WakuEnrBitfield, log *zap.Logger, opts ...DiscoveryV5Option) (*DiscoveryV5, error) { +func NewDiscoveryV5(host host.Host, priv *ecdsa.PrivateKey, localnode *enode.LocalNode, log *zap.Logger, opts ...DiscoveryV5Option) (*DiscoveryV5, error) { params := new(discV5Parameters) optList := DefaultOptions() optList = append(optList, opts...) @@ -111,16 +101,6 @@ func NewDiscoveryV5(host host.Host, addresses []ma.Multiaddr, priv *ecdsa.Privat logger := log.Named("discv5") - ipAddr, err := selectIPAddr(addresses) - if err != nil { - return nil, err - } - - localnode, err := newLocalnode(priv, ipAddr, params.udpPort, wakuFlags, params.advertiseAddr, logger) - if err != nil { - return nil, err - } - var NAT nat.Interface = nil if params.advertiseAddr == nil { NAT = nat.Any() @@ -151,41 +131,10 @@ func NewDiscoveryV5(host host.Host, addresses []ma.Multiaddr, priv *ecdsa.Privat IP: net.IPv4zero, Port: params.udpPort, }, - log: logger, - ogTCPPort: ipAddr.Port, + log: logger, }, nil } -func newLocalnode(priv *ecdsa.PrivateKey, ipAddr *net.TCPAddr, udpPort int, wakuFlags utils.WakuEnrBitfield, advertiseAddr *net.IP, log *zap.Logger) (*enode.LocalNode, error) { - db, err := enode.OpenDB("") - if err != nil { - return nil, err - } - localnode := enode.NewLocalNode(db, priv) - localnode.SetFallbackUDP(udpPort) - localnode.Set(enr.WithEntry(utils.WakuENRField, wakuFlags)) - localnode.SetFallbackIP(net.IP{127, 0, 0, 1}) - localnode.SetStaticIP(ipAddr.IP) - - if udpPort > 0 && udpPort <= math.MaxUint16 { - localnode.Set(enr.UDP(uint16(udpPort))) // lgtm [go/incorrect-integer-conversion] - } else { - log.Error("setting udpPort", zap.Int("port", udpPort)) - } - - if ipAddr.Port > 0 && ipAddr.Port <= math.MaxUint16 { - localnode.Set(enr.TCP(uint16(ipAddr.Port))) // lgtm [go/incorrect-integer-conversion] - } else { - log.Error("setting tcpPort", zap.Int("port", ipAddr.Port)) - } - - if advertiseAddr != nil { - localnode.SetStaticIP(*advertiseAddr) - } - - return localnode, nil -} - func (d *DiscoveryV5) listen() error { conn, err := net.ListenUDP("udp", d.udpAddr) if err != nil { @@ -193,7 +142,6 @@ func (d *DiscoveryV5) listen() error { } d.udpAddr = conn.LocalAddr().(*net.UDPAddr) - if d.NAT != nil && !d.udpAddr.IP.IsLoopback() { d.wg.Add(1) go func() { @@ -250,34 +198,6 @@ func (d *DiscoveryV5) Stop() { d.wg.Wait() } -func (d *DiscoveryV5) UpdateAddr(addr *net.TCPAddr) error { - if !d.params.autoUpdate { - return nil - } - - d.Lock() - defer d.Unlock() - - // TODO: This code is not elegant and should be improved - - if !isExternal(addr) && !isExternal(&net.TCPAddr{IP: d.localnode.Node().IP()}) { - if !((d.localnode.Node().IP().IsLoopback() && isPrivate(addr)) || (isPrivate(&net.TCPAddr{IP: d.localnode.Node().IP()}) && isExternal(addr))) { - return nil - } - } - - if addr.IP.IsUnspecified() || (d.localnode.Node().IP().Equal(addr.IP) && addr.Port == d.ogTCPPort) { - return nil - } - - d.localnode.SetStaticIP(addr.IP) - d.localnode.Set(enr.TCP(uint16(addr.Port))) // lgtm [go/incorrect-integer-conversion] - d.log.Info("updated Discovery V5 node address", logging.TCPAddr("address", d.localnode.Node().IP(), d.localnode.Node().TCP())) - d.log.Info("Discovery V5", logging.ENode("enr", d.localnode.Node())) - - return nil -} - /* func isWakuNode(node *enode.Node) bool { enrField := new(utils.WakuEnrBitfield) @@ -472,82 +392,3 @@ func (d *DiscoveryV5) FindPeers(ctx context.Context, topic string, opts ...disco return chPeer, err } - -// IsPrivate reports whether ip is a private address, according to -// RFC 1918 (IPv4 addresses) and RFC 4193 (IPv6 addresses). -// Copied/Adapted from https://go-review.googlesource.com/c/go/+/272668/11/src/net/ip.go -// Copyright (c) The Go Authors. All rights reserved. -// @TODO: once Go 1.17 is released in Q42021, remove this function as it will become part of the language -func isPrivate(ip *net.TCPAddr) bool { - if ip4 := ip.IP.To4(); ip4 != nil { - // Following RFC 4193, Section 3. Local IPv6 Unicast Addresses which says: - // The Internet Assigned Numbers Authority (IANA) has reserved the - // following three blocks of the IPv4 address space for private internets: - // 10.0.0.0 - 10.255.255.255 (10/8 prefix) - // 172.16.0.0 - 172.31.255.255 (172.16/12 prefix) - // 192.168.0.0 - 192.168.255.255 (192.168/16 prefix) - return ip4[0] == 10 || - (ip4[0] == 172 && ip4[1]&0xf0 == 16) || - (ip4[0] == 192 && ip4[1] == 168) - } - // Following RFC 4193, Section 3. Private Address Space which says: - // The Internet Assigned Numbers Authority (IANA) has reserved the - // following block of the IPv6 address space for local internets: - // FC00:: - FDFF:FFFF:FFFF:FFFF:FFFF:FFFF:FFFF:FFFF (FC00::/7 prefix) - return len(ip.IP) == net.IPv6len && ip.IP[0]&0xfe == 0xfc -} - -func isExternal(ip *net.TCPAddr) bool { - return !isPrivate(ip) && !ip.IP.IsLoopback() && !ip.IP.IsUnspecified() -} - -func isLoopback(ip *net.TCPAddr) bool { - return ip.IP.IsLoopback() -} -func filter(ss []*net.TCPAddr, fn func(*net.TCPAddr) bool) (ret []*net.TCPAddr) { - for _, s := range ss { - if fn(s) { - ret = append(ret, s) - } - } - return -} - -func selectIPAddr(addresses []ma.Multiaddr) (*net.TCPAddr, error) { - var ipAddrs []*net.TCPAddr - for _, addr := range addresses { - ipStr, err := addr.ValueForProtocol(ma.P_IP4) - if err != nil { - continue - } - portStr, err := addr.ValueForProtocol(ma.P_TCP) - if err != nil { - continue - } - port, err := strconv.Atoi(portStr) - if err != nil { - continue - } - ipAddrs = append(ipAddrs, &net.TCPAddr{ - IP: net.ParseIP(ipStr), - Port: port, - }) - } - - externalIPs := filter(ipAddrs, isExternal) - if len(externalIPs) > 0 { - return externalIPs[0], nil - } - - privateIPs := filter(ipAddrs, isPrivate) - if len(privateIPs) > 0 { - return privateIPs[0], nil - } - - loopback := filter(ipAddrs, isLoopback) - if len(loopback) > 0 { - return loopback[0], nil - } - - return nil, errors.New("could not obtain ip address") -} diff --git a/waku/v2/discv5/discover_test.go b/waku/v2/discv5/discover_test.go index 5f894d9f..f25fccde 100644 --- a/waku/v2/discv5/discover_test.go +++ b/waku/v2/discv5/discover_test.go @@ -4,15 +4,20 @@ import ( "context" "crypto/ecdsa" "fmt" + "math" + "net" + "strconv" "testing" "time" gcrypto "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/p2p/enode" + "github.com/ethereum/go-ethereum/p2p/enr" "github.com/multiformats/go-multiaddr" "github.com/status-im/go-waku/tests" "github.com/status-im/go-waku/waku/v2/utils" "github.com/stretchr/testify/require" + "go.uber.org/zap" "github.com/libp2p/go-libp2p" libp2pcrypto "github.com/libp2p/go-libp2p-core/crypto" @@ -41,25 +46,87 @@ func createHost(t *testing.T) (host.Host, int, *ecdsa.PrivateKey) { return host, port, privKey } +func newLocalnode(priv *ecdsa.PrivateKey, ipAddr *net.TCPAddr, udpPort int, wakuFlags utils.WakuEnrBitfield, advertiseAddr *net.IP, log *zap.Logger) (*enode.LocalNode, error) { + db, err := enode.OpenDB("") + if err != nil { + return nil, err + } + localnode := enode.NewLocalNode(db, priv) + localnode.SetFallbackUDP(udpPort) + localnode.Set(enr.WithEntry(utils.WakuENRField, wakuFlags)) + localnode.SetFallbackIP(net.IP{127, 0, 0, 1}) + localnode.SetStaticIP(ipAddr.IP) + + if udpPort > 0 && udpPort <= math.MaxUint16 { + localnode.Set(enr.UDP(uint16(udpPort))) // lgtm [go/incorrect-integer-conversion] + } else { + log.Error("setting udpPort", zap.Int("port", udpPort)) + } + + if ipAddr.Port > 0 && ipAddr.Port <= math.MaxUint16 { + localnode.Set(enr.TCP(uint16(ipAddr.Port))) // lgtm [go/incorrect-integer-conversion] + } else { + log.Error("setting tcpPort", zap.Int("port", ipAddr.Port)) + } + + if advertiseAddr != nil { + localnode.SetStaticIP(*advertiseAddr) + } + + return localnode, nil +} + +func extractIP(addr multiaddr.Multiaddr) (*net.TCPAddr, error) { + ipStr, err := addr.ValueForProtocol(multiaddr.P_IP4) + if err != nil { + return nil, err + } + + portStr, err := addr.ValueForProtocol(multiaddr.P_TCP) + if err != nil { + return nil, err + } + port, err := strconv.Atoi(portStr) + if err != nil { + return nil, err + } + return &net.TCPAddr{ + IP: net.ParseIP(ipStr), + Port: port, + }, nil +} + func TestDiscV5(t *testing.T) { // Host1 <-> Host2 <-> Host3 + // H1 host1, _, prvKey1 := createHost(t) udpPort1, err := tests.FindFreePort(t, "127.0.0.1", 3) require.NoError(t, err) - d1, err := NewDiscoveryV5(host1, host1.Addrs(), prvKey1, utils.NewWakuEnrBitfield(true, true, true, true), utils.Logger(), WithUDPPort(udpPort1)) + ip1, _ := extractIP(host1.Addrs()[0]) + l1, err := newLocalnode(prvKey1, ip1, udpPort1, utils.NewWakuEnrBitfield(true, true, true, true), nil, utils.Logger()) + require.NoError(t, err) + d1, err := NewDiscoveryV5(host1, prvKey1, l1, utils.Logger(), WithUDPPort(udpPort1)) require.NoError(t, err) + // H2 host2, _, prvKey2 := createHost(t) + ip2, _ := extractIP(host2.Addrs()[0]) udpPort2, err := tests.FindFreePort(t, "127.0.0.1", 3) require.NoError(t, err) - d2, err := NewDiscoveryV5(host2, host2.Addrs(), prvKey2, utils.NewWakuEnrBitfield(true, true, true, true), utils.Logger(), WithUDPPort(udpPort2), WithBootnodes([]*enode.Node{d1.localnode.Node()})) + l2, err := newLocalnode(prvKey2, ip2, udpPort2, utils.NewWakuEnrBitfield(true, true, true, true), nil, utils.Logger()) + require.NoError(t, err) + d2, err := NewDiscoveryV5(host2, prvKey2, l2, utils.Logger(), WithUDPPort(udpPort2), WithBootnodes([]*enode.Node{d1.localnode.Node()})) require.NoError(t, err) + // H3 host3, _, prvKey3 := createHost(t) + ip3, _ := extractIP(host3.Addrs()[0]) udpPort3, err := tests.FindFreePort(t, "127.0.0.1", 3) require.NoError(t, err) - d3, err := NewDiscoveryV5(host3, host3.Addrs(), prvKey3, utils.NewWakuEnrBitfield(true, true, true, true), utils.Logger(), WithUDPPort(udpPort3), WithBootnodes([]*enode.Node{d2.localnode.Node()})) + l3, err := newLocalnode(prvKey3, ip3, udpPort3, utils.NewWakuEnrBitfield(true, true, true, true), nil, utils.Logger()) + require.NoError(t, err) + d3, err := NewDiscoveryV5(host3, prvKey3, l3, utils.Logger(), WithUDPPort(udpPort3), WithBootnodes([]*enode.Node{d2.localnode.Node()})) require.NoError(t, err) defer d1.Stop() @@ -75,7 +142,7 @@ func TestDiscV5(t *testing.T) { err = d3.Start() require.NoError(t, err) - ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() peerChan, err := d3.FindPeers(ctx, "", discovery.Limit(2)) diff --git a/waku/v2/node/localnode.go b/waku/v2/node/localnode.go new file mode 100644 index 00000000..44de6d43 --- /dev/null +++ b/waku/v2/node/localnode.go @@ -0,0 +1,236 @@ +package node + +import ( + "crypto/ecdsa" + "encoding/binary" + "errors" + "fmt" + "math" + "net" + "strconv" + + "github.com/ethereum/go-ethereum/p2p/enode" + "github.com/ethereum/go-ethereum/p2p/enr" + ma "github.com/multiformats/go-multiaddr" + "github.com/status-im/go-waku/logging" + "github.com/status-im/go-waku/waku/v2/utils" + "go.uber.org/zap" +) + +func (w *WakuNode) newLocalnode(priv *ecdsa.PrivateKey, wsAddr []ma.Multiaddr, ipAddr *net.TCPAddr, udpPort int, wakuFlags utils.WakuEnrBitfield, advertiseAddr *net.IP, log *zap.Logger) (*enode.LocalNode, error) { + db, err := enode.OpenDB("") + if err != nil { + return nil, err + } + localnode := enode.NewLocalNode(db, priv) + localnode.SetFallbackUDP(udpPort) + localnode.Set(enr.WithEntry(utils.WakuENRField, wakuFlags)) + localnode.SetFallbackIP(net.IP{127, 0, 0, 1}) + localnode.SetStaticIP(ipAddr.IP) + + if udpPort > 0 && udpPort <= math.MaxUint16 { + localnode.Set(enr.UDP(uint16(udpPort))) // lgtm [go/incorrect-integer-conversion] + } + + if ipAddr.Port > 0 && ipAddr.Port <= math.MaxUint16 { + localnode.Set(enr.TCP(uint16(ipAddr.Port))) // lgtm [go/incorrect-integer-conversion] + } else { + log.Error("setting tcpPort", zap.Int("port", ipAddr.Port)) + } + + if advertiseAddr != nil { + localnode.SetStaticIP(*advertiseAddr) + } + + // Adding websocket multiaddresses + var fieldRaw []byte + + for _, addr := range wsAddr { + p2p, err := addr.ValueForProtocol(ma.P_P2P) + if err != nil { + return nil, err + } + + p2pAddr, err := ma.NewMultiaddr("/p2p/" + p2p) + if err != nil { + return nil, fmt.Errorf("could not create p2p addr: %w", err) + } + + maRaw := addr.Decapsulate(p2pAddr).Bytes() + maSize := make([]byte, 2) + binary.BigEndian.PutUint16(maSize, uint16(len(maRaw))) + + fieldRaw = append(fieldRaw, maSize...) + fieldRaw = append(fieldRaw, maRaw...) + } + + if len(fieldRaw) != 0 { + localnode.Set(enr.WithEntry(utils.MultiaddrENRField, fieldRaw)) + } + + return localnode, nil +} + +func isPrivate(addr candidateAddr) bool { + return addr.ip.IP.IsPrivate() +} + +func isExternal(addr candidateAddr) bool { + return !isPrivate(addr) && !addr.ip.IP.IsLoopback() && !addr.ip.IP.IsUnspecified() +} + +func isLoopback(addr candidateAddr) bool { + return addr.ip.IP.IsLoopback() +} + +func filterIP(ss []candidateAddr, fn func(candidateAddr) bool) (ret []candidateAddr) { + for _, s := range ss { + if fn(s) { + ret = append(ret, s) + } + } + return +} + +type candidateAddr struct { + ip *net.TCPAddr + maddr ma.Multiaddr +} + +func extractIP(addr ma.Multiaddr) (*net.TCPAddr, error) { + var ipStr string + dns4, err := addr.ValueForProtocol(ma.P_DNS4) + if err != nil { + ipStr, err = addr.ValueForProtocol(ma.P_IP4) + if err != nil { + return nil, err + } + } else { + netIP, err := net.ResolveIPAddr("ip4", dns4) + if err != nil { + return nil, err + } + ipStr = netIP.String() + } + + portStr, err := addr.ValueForProtocol(ma.P_TCP) + if err != nil { + return nil, err + } + port, err := strconv.Atoi(portStr) + if err != nil { + return nil, err + } + return &net.TCPAddr{ + IP: net.ParseIP(ipStr), + Port: port, + }, nil +} + +func selectMostExternalAddress(addresses []ma.Multiaddr) (ma.Multiaddr, *net.TCPAddr, error) { + var ipAddrs []candidateAddr + + for _, addr := range addresses { + ipAddr, err := extractIP(addr) + if err != nil { + continue + } + + ipAddrs = append(ipAddrs, candidateAddr{ + ip: ipAddr, + maddr: addr, + }) + } + + externalIPs := filterIP(ipAddrs, isExternal) + if len(externalIPs) > 0 { + return externalIPs[0].maddr, externalIPs[0].ip, nil + } + + privateIPs := filterIP(ipAddrs, isPrivate) + if len(privateIPs) > 0 { + return privateIPs[0].maddr, privateIPs[0].ip, nil + } + + loopback := filterIP(ipAddrs, isLoopback) + if len(loopback) > 0 { + return loopback[0].maddr, loopback[0].ip, nil + } + + return nil, nil, errors.New("could not obtain ip address") +} + +func selectWSListenAddress(addresses []ma.Multiaddr, extAddr ma.Multiaddr) ([]ma.Multiaddr, error) { + extAddrDNS, err := extAddr.ValueForProtocol(ma.P_DNS4) + var extAddrIP string + if err != nil { + extAddrIP, err = extAddr.ValueForProtocol(ma.P_IP4) + if err != nil { + return nil, err + } + } + + var result []ma.Multiaddr + for _, addr := range addresses { + // Filter addresses that match the extAddr + if extAddrDNS != "" { + dns4, err := addr.ValueForProtocol(ma.P_DNS4) + if err != nil { + continue + } + if dns4 != extAddrDNS { + continue + } + } else { + ip4, err := addr.ValueForProtocol(ma.P_IP4) + if err != nil { + continue + } + if ip4 != extAddrIP { + continue + } + } + + _, err := addr.ValueForProtocol(ma.P_WS) + if err == nil { + result = append(result, addr) + } + + _, err = addr.ValueForProtocol(ma.P_WSS) + if err == nil { + result = append(result, addr) + } + } + + return result, nil +} + +func (w *WakuNode) setupENR(addrs []ma.Multiaddr) error { + extAddr, ipAddr, err := selectMostExternalAddress(addrs) + if err != nil { + w.log.Error("obtaining external address", zap.Error(err)) + return err + } + + wsAddresses, err := selectWSListenAddress(addrs, extAddr) + if err != nil { + w.log.Error("obtaining websocket addresses", zap.Error(err)) + return err + } + + // TODO: make this optional depending on DNS Disc being enabled + if w.opts.privKey != nil { + localNode, err := w.newLocalnode(w.opts.privKey, wsAddresses, ipAddr, w.opts.udpPort, w.wakuFlag, w.opts.advertiseAddr, w.log) + if err != nil { + w.log.Error("obtaining ENR record from multiaddress", logging.MultiAddrs("multiaddr", extAddr), zap.Error(err)) + return err + } else { + if w.localNode == nil || w.localNode.Node().String() != localNode.Node().String() { + w.localNode = localNode + w.log.Info("enr record", logging.ENode("enr", w.localNode.Node())) + } + } + } + + return nil +} diff --git a/waku/v2/node/wakunode2.go b/waku/v2/node/wakunode2.go index 65467e7c..e1b64c7a 100644 --- a/waku/v2/node/wakunode2.go +++ b/waku/v2/node/wakunode2.go @@ -5,10 +5,11 @@ import ( "errors" "fmt" "net" - "strconv" "sync" "time" + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/p2p/enode" "github.com/libp2p/go-libp2p" "go.uber.org/zap" @@ -59,6 +60,8 @@ type WakuNode struct { swap *swap.WakuSwap wakuFlag utils.WakuEnrBitfield + localNode *enode.LocalNode + addrChan chan ma.Multiaddr discoveryV5 *discv5.DiscoveryV5 @@ -93,19 +96,24 @@ func defaultStoreFactory(w *WakuNode) store.Store { func New(ctx context.Context, opts ...WakuNodeOption) (*WakuNode, error) { params := new(WakuNodeParameters) - ctx, cancel := context.WithCancel(ctx) - params.libP2POpts = DefaultLibP2POptions opts = append(DefaultWakuNodeOptions, opts...) for _, opt := range opts { err := opt(params) if err != nil { - cancel() return nil, err } } + if params.privKey == nil { + prvKey, err := crypto.GenerateKey() + if err != nil { + return nil, err + } + params.privKey = prvKey + } + if params.enableWSS { params.libP2POpts = append(params.libP2POpts, libp2p.Transport(ws.New, ws.WithTLSConfig(params.tlsConfig))) } else if params.enableWS { @@ -116,7 +124,6 @@ func New(ctx context.Context, opts ...WakuNodeOption) (*WakuNode, error) { if params.hostAddr == nil { err := WithHostAddress(&net.TCPAddr{IP: net.ParseIP("0.0.0.0"), Port: 0})(params) if err != nil { - cancel() return nil, err } } @@ -124,9 +131,7 @@ func New(ctx context.Context, opts ...WakuNodeOption) (*WakuNode, error) { params.libP2POpts = append(params.libP2POpts, libp2p.ListenAddrs(params.multiAddr...)) } - if params.privKey != nil { - params.libP2POpts = append(params.libP2POpts, params.Identity()) - } + params.libP2POpts = append(params.libP2POpts, params.Identity()) if params.addressFactory != nil { params.libP2POpts = append(params.libP2POpts, libp2p.AddrsFactory(params.addressFactory)) @@ -134,10 +139,11 @@ func New(ctx context.Context, opts ...WakuNodeOption) (*WakuNode, error) { host, err := libp2p.New(params.libP2POpts...) if err != nil { - cancel() return nil, err } + ctx, cancel := context.WithCancel(ctx) + w := new(WakuNode) w.bcaster = v2.NewBroadcaster(1024) w.host = host @@ -191,52 +197,8 @@ func New(ctx context.Context, opts ...WakuNodeOption) (*WakuNode, error) { func (w *WakuNode) onAddrChange() { for m := range w.addrChan { - ipStr, err := m.ValueForProtocol(ma.P_IP4) - if err != nil { - w.log.Error("extracting ip from ma", logging.MultiAddrs("ma", m), zap.Error(err)) - continue - } - - portStr, err := m.ValueForProtocol(ma.P_TCP) - if err != nil { - w.log.Error("extracting port from ma", logging.MultiAddrs("ma", m), zap.Error(err)) - continue - } - - port, err := strconv.Atoi(portStr) - if err != nil { - w.log.Error("converting port to int", zap.Error(err)) - continue - } - - addr := &net.TCPAddr{ - IP: net.ParseIP(ipStr), - Port: port, - } - - if !addr.IP.IsLoopback() && !addr.IP.IsUnspecified() { - if w.opts.enableDiscV5 { - err := w.discoveryV5.UpdateAddr(addr) - if err != nil { - w.log.Error("updating DiscV5 address with IP", zap.Stringer("address", addr), zap.Error(err)) - continue - } - } - } - } -} - -func (w *WakuNode) logAddress(addr ma.Multiaddr) { - logger := w.log.With(logging.MultiAddrs("multiaddr", addr)) - - // TODO: make this optional depending on DNS Disc being enabled - if w.opts.privKey != nil { - enr, ip, err := utils.GetENRandIP(addr, w.wakuFlag, w.opts.privKey) - if err != nil { - logger.Error("obtaining ENR record from multiaddress", zap.Error(err)) - } else { - logger.Info("listening", logging.ENode("enr", enr), zap.Stringer("ip", ip)) - } + _ = m + // TODO: determine if still needed. Otherwise remove } } @@ -251,29 +213,27 @@ func (w *WakuNode) checkForAddressChanges() { case <-w.quit: return case <-first: - for _, addr := range addrs { - w.logAddress(addr) - } + w.log.Info("listening", logging.MultiAddrs("multiaddr", addrs...)) case <-w.addressChangesSub.Out(): newAddrs := w.ListenAddresses() - print := false + diff := false if len(addrs) != len(newAddrs) { - print = true + diff = true } else { for i := range newAddrs { if addrs[i].String() != newAddrs[i].String() { - print = true + diff = true break } } } - if print { + if diff { addrs = newAddrs - w.log.Warn("Change in host multiaddresses") - for _, addr := range newAddrs { + w.log.Info("listening addresses update received", logging.MultiAddrs("multiaddr", addrs...)) + for _, addr := range addrs { w.addrChan <- addr - w.logAddress(addr) } + _ = w.setupENR(addrs) } } } @@ -306,6 +266,11 @@ func (w *WakuNode) Start() error { w.opts.wOpts = append(w.opts.wOpts, pubsub.WithDiscovery(rendezvous, w.opts.rendezvousOpts...)) } + err := w.setupENR(w.ListenAddresses()) + if err != nil { + return err + } + if w.opts.enableDiscV5 { err := w.mountDiscV5() if err != nil { @@ -317,7 +282,7 @@ func (w *WakuNode) Start() error { w.opts.wOpts = append(w.opts.wOpts, pubsub.WithDiscovery(w.discoveryV5, w.opts.discV5Opts...)) } - err := w.mountRelay(w.opts.minRelayPeersToPublish, w.opts.wOpts...) + err = w.mountRelay(w.opts.minRelayPeersToPublish, w.opts.wOpts...) if err != nil { return err } @@ -401,6 +366,11 @@ func (w *WakuNode) ListenAddresses() []ma.Multiaddr { return result } +// ENR returns the ENR address of the node +func (w *WakuNode) ENR() *enode.Node { + return w.localNode.Node() +} + // Relay is used to access any operation related to Waku Relay protocol func (w *WakuNode) Relay() *relay.WakuRelay { return w.relay @@ -492,7 +462,7 @@ func (w *WakuNode) mountDiscV5() error { } var err error - w.discoveryV5, err = discv5.NewDiscoveryV5(w.Host(), w.ListenAddresses(), w.opts.privKey, w.wakuFlag, w.log, discV5Options...) + w.discoveryV5, err = discv5.NewDiscoveryV5(w.Host(), w.opts.privKey, w.localNode, w.log, discV5Options...) return err } diff --git a/waku/v2/protocol/lightpush/waku_lightpush.go b/waku/v2/protocol/lightpush/waku_lightpush.go index f836f105..dabe428c 100644 --- a/waku/v2/protocol/lightpush/waku_lightpush.go +++ b/waku/v2/protocol/lightpush/waku_lightpush.go @@ -94,6 +94,7 @@ func (wakuLP *WakuLightPush) onRequest(s network.Stream) { _, err := wakuLP.relay.PublishToTopic(wakuLP.ctx, message, pubSubTopic) if err != nil { + logger.Error("publishing message", zap.Error(err)) response.IsSuccess = false response.Info = "Could not publish message" } else { diff --git a/waku/v2/rpc/codec.go b/waku/v2/rpc/codec.go index ca979541..2e9e9477 100644 --- a/waku/v2/rpc/codec.go +++ b/waku/v2/rpc/codec.go @@ -1,59 +1,105 @@ package rpc import ( + "encoding/json" + "errors" + "fmt" "net/http" + "reflect" "strings" "github.com/gorilla/rpc/v2" - "github.com/gorilla/rpc/v2/json" + "golang.org/x/text/cases" + "golang.org/x/text/language" ) +// Based on github.com/gorilla/rpc/v2/json which is governed by a BSD-style license + +var null = json.RawMessage([]byte("null")) + +// An Error is a wrapper for a JSON interface value. It can be used by either +// a service's handler func to write more complex JSON data to an error field +// of a server's response, or by a client to read it. +type Error struct { + Data interface{} +} + +func (e *Error) Error() string { + return fmt.Sprintf("%v", e.Data) +} + +// ---------------------------------------------------------------------------- +// Request and Response +// ---------------------------------------------------------------------------- + +// serverRequest represents a JSON-RPC request received by the server. +type serverRequest struct { + // A String containing the name of the method to be invoked. + Method string `json:"method"` + // An Array of objects to pass as arguments to the method. + Params *json.RawMessage `json:"params"` + // The request id. This can be of any type. It is used to match the + // response with the request that it is replying to. + Id *json.RawMessage `json:"id"` +} + +// serverResponse represents a JSON-RPC response returned by the server. +type serverResponse struct { + // The Object that was returned by the invoked method. This must be null + // in case there was an error invoking the method. + Result interface{} `json:"result"` + // An Error object if there was an error invoking the method. It must be + // null if there was no error. + Error interface{} `json:"error"` + // This must be the same id as the request it is responding to. + Id *json.RawMessage `json:"id"` +} + +// ---------------------------------------------------------------------------- +// Codec +// ---------------------------------------------------------------------------- + +// NewCodec returns a new SnakeCaseCodec Codec. +func NewSnakeCaseCodec() *SnakeCaseCodec { + return &SnakeCaseCodec{} +} + // SnakeCaseCodec creates a CodecRequest to process each request. type SnakeCaseCodec struct { } -// NewSnakeCaseCodec returns a new SnakeCaseCodec. -func NewSnakeCaseCodec() *SnakeCaseCodec { - return &SnakeCaseCodec{} -} - -// NewRequest returns a new CodecRequest of type SnakeCaseCodecRequest. +// NewRequest returns a CodecRequest. func (c *SnakeCaseCodec) NewRequest(r *http.Request) rpc.CodecRequest { - outerCR := &SnakeCaseCodecRequest{} // Our custom CR - jsonC := json.NewCodec() // json Codec to create json CR - innerCR := jsonC.NewRequest(r) // create the json CR, sort of. - - // NOTE - innerCR is of the interface type rpc.CodecRequest. - // Because innerCR is of the rpc.CR interface type, we need a - // type assertion in order to assign it to our struct field's type. - // We defined the source of the interface implementation here, so - // we can be confident that innerCR will be of the correct underlying type - outerCR.CodecRequest = innerCR.(*json.CodecRequest) - return outerCR + return newCodecRequest(r) } -// SnakeCaseCodecRequest decodes and encodes a single request. SnakeCaseCodecRequest -// implements gorilla/rpc.CodecRequest interface primarily by embedding -// the CodecRequest from gorilla/rpc/json. By selectively adding -// CodecRequest methods to SnakeCaseCodecRequest, we can modify that behaviour -// while maintaining all the other remaining CodecRequest methods from -// gorilla's rpc/json implementation -type SnakeCaseCodecRequest struct { - *json.CodecRequest +// ---------------------------------------------------------------------------- +// CodecRequest +// ---------------------------------------------------------------------------- + +// newCodecRequest returns a new CodecRequest. +func newCodecRequest(r *http.Request) rpc.CodecRequest { + // Decode the request body and check if RPC method is valid. + req := new(serverRequest) + err := json.NewDecoder(r.Body).Decode(req) + r.Body.Close() + return &CodecRequest{request: req, err: err} } -// Method returns the decoded method as a string of the form "Service.Method" -// after checking for, and correcting a lowercase method name -// By being of lower depth in the struct , Method will replace the implementation -// of Method() on the embedded CodecRequest. Because the request data is part -// of the embedded json.CodecRequest, and unexported, we have to get the -// requested method name via the embedded CR's own method Method(). -// Essentially, this just intercepts the return value from the embedded -// gorilla/rpc/json.CodecRequest.Method(), checks/modifies it, and passes it -// on to the calling rpc server. -func (c *SnakeCaseCodecRequest) Method() (string, error) { - m, err := c.CodecRequest.Method() - return toWakuMethod(m), err +// CodecRequest decodes and encodes a single request. +type CodecRequest struct { + request *serverRequest + err error +} + +// Method returns the RPC method for the current request. +// +// The method uses a dotted notation as in "Service.Method". +func (c *CodecRequest) Method() (string, error) { + if c.err == nil { + return toWakuMethod(c.request.Method), nil + } + return "", c.err } // toWakuMethod transform get_waku_v2_debug_v1_info to Debug.GetV1Info @@ -69,10 +115,81 @@ func toWakuMethod(input string) string { cleanedInput := strings.Replace(input, base, "", 1) splited := strings.Split(cleanedInput, "_") - method := strings.Title(typ) + c := cases.Title(language.AmericanEnglish) + + method := c.String(typ) for _, val := range splited[1:] { - method = method + strings.Title(val) + method = method + c.String(val) } - return strings.Title(splited[0]) + "." + method + return c.String(splited[0]) + "." + method +} + +// ReadRequest fills the request object for the RPC method. +func (c *CodecRequest) ReadRequest(args interface{}) error { + if c.err == nil { + if c.request.Params != nil { + // JSON params is array value. RPC params is struct. + // Attempt to unmarshal into array containing the request struct. + params := [1]interface{}{args} + err := json.Unmarshal(*c.request.Params, ¶ms) + if err != nil { + // This failed so we might have received an array of parameters + // instead of a object + argsValueOf := reflect.Indirect(reflect.ValueOf(args)) + if argsValueOf.Kind() == reflect.Struct { + var params []interface{} + for i := 0; i < argsValueOf.NumField(); i++ { + params = append(params, argsValueOf.Field(i).Addr().Interface()) + } + c.err = json.Unmarshal(*c.request.Params, ¶ms) + } else { + // Unknown field type... + c.err = err + } + } + + } else { + c.err = errors.New("rpc: method request ill-formed: missing params field") + } + } + return c.err +} + +// WriteResponse encodes the response and writes it to the ResponseWriter. +func (c *CodecRequest) WriteResponse(w http.ResponseWriter, reply interface{}) { + if c.request.Id != nil { + // Id is null for notifications and they don't have a response. + res := &serverResponse{ + Result: reply, + Error: &null, + Id: c.request.Id, + } + c.writeServerResponse(w, 200, res) + } +} + +func (c *CodecRequest) WriteError(w http.ResponseWriter, _ int, err error) { + res := &serverResponse{ + Result: &null, + Id: c.request.Id, + } + if jsonErr, ok := err.(*Error); ok { + res.Error = jsonErr.Data + } else { + res.Error = err.Error() + } + c.writeServerResponse(w, 400, res) +} + +func (c *CodecRequest) writeServerResponse(w http.ResponseWriter, status int, res *serverResponse) { + b, err := json.Marshal(res) + if err == nil { + w.Header().Set("Content-Type", "application/json; charset=utf-8") + w.WriteHeader(status) + w.Write(b) + } else { + // Not sure in which case will this happen. But seems harmless. + rpc.WriteError(w, 400, err.Error()) + } } diff --git a/waku/v2/rpc/debug.go b/waku/v2/rpc/debug.go index 7edeb2d7..c7a16501 100644 --- a/waku/v2/rpc/debug.go +++ b/waku/v2/rpc/debug.go @@ -14,11 +14,15 @@ type InfoArgs struct { } type InfoReply struct { - Version string `json:"version,omitempty"` + ENRUri string `json:"enrUri,omitempty"` + ListenAddresses []string `json:"listenAddresses,omitempty"` } func (d *DebugService) GetV1Info(r *http.Request, args *InfoArgs, reply *InfoReply) error { - reply.Version = "2.0" + reply.ENRUri = d.node.ENR().String() + for _, addr := range d.node.ListenAddresses() { + reply.ListenAddresses = append(reply.ListenAddresses, addr.String()) + } return nil } diff --git a/waku/v2/rpc/debug_test.go b/waku/v2/rpc/debug_test.go index e84696d0..de03a096 100644 --- a/waku/v2/rpc/debug_test.go +++ b/waku/v2/rpc/debug_test.go @@ -2,9 +2,11 @@ package rpc import ( "bytes" + "context" "net/http" "testing" + "github.com/status-im/go-waku/waku/v2/node" "github.com/stretchr/testify/require" ) @@ -14,9 +16,16 @@ func TestGetV1Info(t *testing.T) { request, err := http.NewRequest(http.MethodPost, "url", bytes.NewReader([]byte(""))) require.NoError(t, err) - d := &DebugService{nil} + wakuNode1, err := node.New(context.Background()) + require.NoError(t, err) + defer wakuNode1.Stop() + err = wakuNode1.Start() + require.NoError(t, err) + + d := &DebugService{ + node: wakuNode1, + } err = d.GetV1Info(request, &InfoArgs{}, &reply) require.NoError(t, err) - require.Equal(t, "2.0", reply.Version) } diff --git a/waku/v2/rpc/filter.go b/waku/v2/rpc/filter.go index 4a9ab262..a06e2e98 100644 --- a/waku/v2/rpc/filter.go +++ b/waku/v2/rpc/filter.go @@ -119,7 +119,10 @@ func (f *FilterService) GetV1Messages(req *http.Request, args *ContentTopicArgs, return fmt.Errorf("topic %s not subscribed", args.ContentTopic) } - reply.Messages = f.messages[args.ContentTopic] + for i := range f.messages[args.ContentTopic] { + *reply = append(*reply, ProtoWakuMessageToRPCWakuMessage(f.messages[args.ContentTopic][i])) + } + f.messages[args.ContentTopic] = make([]*pb.WakuMessage, 0) return nil } diff --git a/waku/v2/rpc/filter_test.go b/waku/v2/rpc/filter_test.go index 5edaba5b..0955a369 100644 --- a/waku/v2/rpc/filter_test.go +++ b/waku/v2/rpc/filter_test.go @@ -127,20 +127,21 @@ func TestFilterGetV1Messages(t *testing.T) { // Wait for the message to be received time.Sleep(1 * time.Second) - var messagesReply MessagesReply + var messagesReply1 MessagesReply err = serviceB.GetV1Messages( makeRequest(t), &ContentTopicArgs{"ct"}, - &messagesReply, + &messagesReply1, ) require.NoError(t, err) - require.Len(t, messagesReply.Messages, 1) + require.Len(t, messagesReply1, 1) + var messagesReply2 MessagesReply err = serviceB.GetV1Messages( makeRequest(t), &ContentTopicArgs{"ct"}, - &messagesReply, + &messagesReply2, ) require.NoError(t, err) - require.Len(t, messagesReply.Messages, 0) + require.Len(t, messagesReply2, 0) } diff --git a/waku/v2/rpc/private.go b/waku/v2/rpc/private.go index 1f7b678e..c2fdb7c5 100644 --- a/waku/v2/rpc/private.go +++ b/waku/v2/rpc/private.go @@ -3,14 +3,17 @@ package rpc import ( "crypto/ecdsa" "crypto/rand" - "encoding/hex" "fmt" "net/http" + "strings" "sync" + "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/crypto" "github.com/status-im/go-waku/waku/v2/node" + "github.com/status-im/go-waku/waku/v2/protocol" "github.com/status-im/go-waku/waku/v2/protocol/pb" + "github.com/status-im/go-waku/waku/v2/protocol/relay" "go.uber.org/zap" ) @@ -18,31 +21,28 @@ type PrivateService struct { node *node.WakuNode log *zap.Logger - symmetricMessages map[string][]*pb.WakuMessage - symmetricMessagesMutex sync.RWMutex + messages map[string][]*pb.WakuMessage + messagesMutex sync.RWMutex - asymmetricMessages map[string][]*pb.WakuMessage - asymmetricMessagesMutex sync.RWMutex + runner *runnerService } -type SymmetricKeyReply struct { - Key string `json:"key"` -} +type SymmetricKeyReply string type KeyPairReply struct { PrivateKey string `json:"privateKey"` - PulicKey string `json:"publicKey"` + PublicKey string `json:"publicKey"` } type SymmetricMessageArgs struct { Topic string `json:"topic"` - Message pb.WakuMessage `json:"message"` + Message RPCWakuMessage `json:"message"` SymKey string `json:"symkey"` } type AsymmetricMessageArgs struct { Topic string `json:"topic"` - Message pb.WakuMessage `json:"message"` + Message RPCWakuMessage `json:"message"` PublicKey string `json:"publicKey"` } @@ -57,12 +57,24 @@ type AsymmetricMessagesArgs struct { } func NewPrivateService(node *node.WakuNode, log *zap.Logger) *PrivateService { - return &PrivateService{ - node: node, - symmetricMessages: make(map[string][]*pb.WakuMessage), - asymmetricMessages: make(map[string][]*pb.WakuMessage), - log: log.Named("private"), + p := &PrivateService{ + node: node, + messages: make(map[string][]*pb.WakuMessage), + log: log.Named("private"), } + p.runner = newRunnerService(node.Broadcaster(), p.addEnvelope) + + return p +} + +func (p *PrivateService) addEnvelope(envelope *protocol.Envelope) { + p.messagesMutex.Lock() + defer p.messagesMutex.Unlock() + if _, ok := p.messages[envelope.PubsubTopic()]; !ok { + p.messages[envelope.PubsubTopic()] = make([]*pb.WakuMessage, 0) + } + + p.messages[envelope.PubsubTopic()] = append(p.messages[envelope.PubsubTopic()], envelope.Message()) } func (p *PrivateService) GetV1SymmetricKey(req *http.Request, args *Empty, reply *SymmetricKeyReply) error { @@ -71,7 +83,7 @@ func (p *PrivateService) GetV1SymmetricKey(req *http.Request, args *Empty, reply if err != nil { return err } - reply.Key = hex.EncodeToString(key[:]) + *reply = SymmetricKeyReply(hexutil.Encode(key[:])) return nil } @@ -89,44 +101,52 @@ func (p *PrivateService) GetV1AsymmetricKeypair(req *http.Request, args *Empty, } publicKeyBytes := crypto.FromECDSAPub(publicKeyECDSA) - reply.PrivateKey = hex.EncodeToString(privateKeyBytes[:]) - reply.PulicKey = hex.EncodeToString(publicKeyBytes[:]) + reply.PrivateKey = hexutil.Encode(privateKeyBytes[:]) + reply.PublicKey = hexutil.Encode(publicKeyBytes[:]) return nil } func (p *PrivateService) PostV1SymmetricMessage(req *http.Request, args *SymmetricMessageArgs, reply *SuccessReply) error { + symKeyBytes, err := hexutil.Decode(args.SymKey) + if err != nil { + return fmt.Errorf("invalid symmetric key: %w", err) + } + keyInfo := new(node.KeyInfo) keyInfo.Kind = node.Symmetric - keyInfo.SymKey = []byte(args.SymKey) + keyInfo.SymKey = symKeyBytes - err := node.EncodeWakuMessage(&args.Message, keyInfo) - if err != nil { - reply.Error = err.Error() - reply.Success = false - return nil - } - err = p.node.Publish(req.Context(), &args.Message) + msg := args.Message.toProto() + msg.Version = 1 + + err = node.EncodeWakuMessage(msg, keyInfo) if err != nil { reply.Error = err.Error() reply.Success = false return nil } - p.symmetricMessagesMutex.Lock() - defer p.symmetricMessagesMutex.Unlock() - if _, ok := p.symmetricMessages[args.Topic]; !ok { - p.symmetricMessages[args.Topic] = make([]*pb.WakuMessage, 0) + topic := args.Topic + if topic == "" { + topic = relay.DefaultWakuTopic + } + + _, err = p.node.Relay().PublishToTopic(req.Context(), msg, topic) + if err != nil { + reply.Error = err.Error() + reply.Success = false + return nil } - p.symmetricMessages[args.Topic] = append(p.symmetricMessages[args.Topic], &args.Message) reply.Success = true return nil } -func (p *PrivateService) PostV1AsymmetricMessage(req *http.Request, args *AsymmetricMessageArgs, reply *SuccessReply) error { +func (p *PrivateService) PostV1AsymmetricMessage(req *http.Request, args *AsymmetricMessageArgs, reply *bool) error { keyInfo := new(node.KeyInfo) keyInfo.Kind = node.Asymmetric - pubKeyBytes, err := hex.DecodeString(args.PublicKey) + + pubKeyBytes, err := hexutil.Decode(args.PublicKey) if err != nil { return fmt.Errorf("public key cannot be decoded: %v", err) } @@ -135,54 +155,108 @@ func (p *PrivateService) PostV1AsymmetricMessage(req *http.Request, args *Asymme if err != nil { return fmt.Errorf("public key cannot be unmarshalled: %v", err) } + keyInfo.PubKey = *pubKey - err = node.EncodeWakuMessage(&args.Message, keyInfo) + msg := args.Message.toProto() + msg.Version = 1 + + err = node.EncodeWakuMessage(msg, keyInfo) if err != nil { - reply.Error = err.Error() - reply.Success = false - return nil - } - err = p.node.Publish(req.Context(), &args.Message) - if err != nil { - reply.Error = err.Error() - reply.Success = false - return nil + return err } - p.asymmetricMessagesMutex.Lock() - defer p.asymmetricMessagesMutex.Unlock() - if _, ok := p.asymmetricMessages[args.Topic]; !ok { - p.asymmetricMessages[args.Topic] = make([]*pb.WakuMessage, 0) + topic := args.Topic + if topic == "" { + topic = relay.DefaultWakuTopic } - p.asymmetricMessages[args.Topic] = append(p.asymmetricMessages[args.Topic], &args.Message) - reply.Success = true + _, err = p.node.Relay().PublishToTopic(req.Context(), msg, topic) + if err != nil { + return err + } + + *reply = true + return nil } func (p *PrivateService) GetV1SymmetricMessages(req *http.Request, args *SymmetricMessagesArgs, reply *MessagesReply) error { - p.symmetricMessagesMutex.Lock() - defer p.symmetricMessagesMutex.Unlock() + p.messagesMutex.Lock() + defer p.messagesMutex.Unlock() - if _, ok := p.symmetricMessages[args.Topic]; !ok { - return fmt.Errorf("topic %s not subscribed", args.Topic) + if _, ok := p.messages[args.Topic]; !ok { + p.messages[args.Topic] = make([]*pb.WakuMessage, 0) + } + + symKeyBytes, err := hexutil.Decode(args.SymKey) + if err != nil { + return fmt.Errorf("invalid symmetric key: %w", err) + } + + messages := make([]*pb.WakuMessage, len(p.messages[args.Topic])) + copy(messages, p.messages[args.Topic]) + p.messages[args.Topic] = make([]*pb.WakuMessage, 0) + + var decodedMessages []*pb.WakuMessage + for _, msg := range messages { + err := node.DecodeWakuMessage(msg, &node.KeyInfo{ + Kind: node.Symmetric, + SymKey: symKeyBytes, + }) + if err != nil { + continue + } + decodedMessages = append(decodedMessages, msg) + } + + for i := range decodedMessages { + *reply = append(*reply, ProtoWakuMessageToRPCWakuMessage(decodedMessages[i])) } - reply.Messages = p.symmetricMessages[args.Topic] - p.symmetricMessages[args.Topic] = make([]*pb.WakuMessage, 0) return nil } func (p *PrivateService) GetV1AsymmetricMessages(req *http.Request, args *AsymmetricMessagesArgs, reply *MessagesReply) error { - p.asymmetricMessagesMutex.Lock() - defer p.asymmetricMessagesMutex.Unlock() + p.messagesMutex.Lock() + defer p.messagesMutex.Unlock() - if _, ok := p.asymmetricMessages[args.Topic]; !ok { - return fmt.Errorf("topic %s not subscribed", args.Topic) + if _, ok := p.messages[args.Topic]; !ok { + p.messages[args.Topic] = make([]*pb.WakuMessage, 0) + } + + messages := make([]*pb.WakuMessage, len(p.messages[args.Topic])) + copy(messages, p.messages[args.Topic]) + p.messages[args.Topic] = make([]*pb.WakuMessage, 0) + + privKey, err := crypto.HexToECDSA(strings.TrimPrefix(args.PrivateKey, "0x")) + if err != nil { + return fmt.Errorf("invalid asymmetric key: %w", err) + } + + var decodedMessages []*pb.WakuMessage + for _, msg := range messages { + err := node.DecodeWakuMessage(msg, &node.KeyInfo{ + Kind: node.Asymmetric, + PrivKey: privKey, + }) + if err != nil { + continue + } + decodedMessages = append(decodedMessages, msg) + } + + for i := range decodedMessages { + *reply = append(*reply, ProtoWakuMessageToRPCWakuMessage(decodedMessages[i])) } - reply.Messages = p.asymmetricMessages[args.Topic] - p.asymmetricMessages[args.Topic] = make([]*pb.WakuMessage, 0) return nil } + +func (p *PrivateService) Start() { + p.runner.Start() +} + +func (p *PrivateService) Stop() { + p.runner.Stop() +} diff --git a/waku/v2/rpc/private_test.go b/waku/v2/rpc/private_test.go index 41c7a86e..8c7fd0ba 100644 --- a/waku/v2/rpc/private_test.go +++ b/waku/v2/rpc/private_test.go @@ -3,9 +3,11 @@ package rpc import ( "context" "testing" + "time" + "github.com/ethereum/go-ethereum/common/hexutil" + "github.com/ethereum/go-ethereum/crypto" "github.com/status-im/go-waku/waku/v2/node" - "github.com/status-im/go-waku/waku/v2/protocol/pb" "github.com/status-im/go-waku/waku/v2/utils" "github.com/stretchr/testify/require" ) @@ -30,7 +32,7 @@ func TestGetV1SymmetricKey(t *testing.T) { &reply, ) require.NoError(t, err) - require.NotEmpty(t, reply.Key) + require.NotEmpty(t, reply) } func TestGetV1AsymmetricKey(t *testing.T) { @@ -44,7 +46,7 @@ func TestGetV1AsymmetricKey(t *testing.T) { &reply, ) require.NoError(t, err) - require.NotEmpty(t, reply.PulicKey) + require.NotEmpty(t, reply.PublicKey) require.NotEmpty(t, reply.PrivateKey) } @@ -57,8 +59,8 @@ func TestPostV1SymmetricMessage(t *testing.T) { makeRequest(t), &SymmetricMessageArgs{ Topic: "test", - Message: pb.WakuMessage{Payload: []byte("test")}, - SymKey: "abc", + Message: RPCWakuMessage{Payload: []byte("test")}, + SymKey: "0x1122334455667788991011223344556677889910112233445566778899101122", }, &reply, ) @@ -70,73 +72,94 @@ func TestPostV1AsymmetricMessage(t *testing.T) { d := makePrivateService(t) defer d.node.Stop() - var reply SuccessReply + var reply bool err := d.PostV1AsymmetricMessage( makeRequest(t), &AsymmetricMessageArgs{ Topic: "test", - Message: pb.WakuMessage{Payload: []byte("test")}, - PublicKey: "045ded6a56c88173e87a88c55b96956964b1bd3351b5fcb70950a4902fbc1bc0ceabb0ac846c3a4b8f2f6024c0e19f0a7f6a4865035187de5463f34012304fc7c5", + Message: RPCWakuMessage{Payload: []byte("test")}, + PublicKey: "0x045ded6a56c88173e87a88c55b96956964b1bd3351b5fcb70950a4902fbc1bc0ceabb0ac846c3a4b8f2f6024c0e19f0a7f6a4865035187de5463f34012304fc7c5", }, &reply, ) require.NoError(t, err) - require.True(t, reply.Success) + require.True(t, reply) } func TestGetV1SymmetricMessages(t *testing.T) { d := makePrivateService(t) + go d.Start() defer d.node.Stop() + // Subscribing topic to test getter + _, err := d.node.Relay().SubscribeToTopic(context.TODO(), "test") + require.NoError(t, err) + var reply SuccessReply - err := d.PostV1SymmetricMessage( + err = d.PostV1SymmetricMessage( makeRequest(t), &SymmetricMessageArgs{ Topic: "test", - Message: pb.WakuMessage{Payload: []byte("test")}, - SymKey: "abc", + Message: RPCWakuMessage{Payload: []byte("test")}, + SymKey: "0x1122334455667788991011223344556677889910112233445566778899101122", }, &reply, ) require.NoError(t, err) require.True(t, reply.Success) + time.Sleep(500 * time.Millisecond) + var getReply MessagesReply err = d.GetV1SymmetricMessages( makeRequest(t), - &SymmetricMessagesArgs{Topic: "test", SymKey: "abc"}, + &SymmetricMessagesArgs{ + Topic: "test", + SymKey: "0x1122334455667788991011223344556677889910112233445566778899101122", + }, &getReply, ) require.NoError(t, err) - require.Len(t, getReply.Messages, 1) + require.Len(t, getReply, 1) } func TestGetV1AsymmetricMessages(t *testing.T) { d := makePrivateService(t) + go d.Start() defer d.node.Stop() - var reply SuccessReply - err := d.PostV1AsymmetricMessage( + // Subscribing topic to test getter + _, err := d.node.Relay().SubscribeToTopic(context.TODO(), "test") + require.NoError(t, err) + + prvKey, err := crypto.GenerateKey() + require.NoError(t, err) + + var reply bool + err = d.PostV1AsymmetricMessage( makeRequest(t), &AsymmetricMessageArgs{ Topic: "test", - Message: pb.WakuMessage{Payload: []byte("test")}, - PublicKey: "045ded6a56c88173e87a88c55b96956964b1bd3351b5fcb70950a4902fbc1bc0ceabb0ac846c3a4b8f2f6024c0e19f0a7f6a4865035187de5463f34012304fc7c5", + Message: RPCWakuMessage{Payload: []byte("test")}, + PublicKey: hexutil.Encode(crypto.FromECDSAPub(&prvKey.PublicKey)), }, &reply, ) require.NoError(t, err) - require.True(t, reply.Success) + require.True(t, reply) + + time.Sleep(500 * time.Millisecond) var getReply MessagesReply err = d.GetV1AsymmetricMessages( makeRequest(t), &AsymmetricMessagesArgs{ Topic: "test", - PrivateKey: "045ded6a56c88173e87a88c55b96956964b1bd3351b5fcb70950a4902fbc1bc0ceabb0ac846c3a4b8f2f6024c0e19f0a7f6a4865035187de5463f34012304fc7c5", + PrivateKey: hexutil.Encode(crypto.FromECDSA(prvKey)), }, &getReply, ) + require.NoError(t, err) - require.Len(t, getReply.Messages, 1) + require.Len(t, getReply, 1) } diff --git a/waku/v2/rpc/relay.go b/waku/v2/rpc/relay.go index 610b1dc1..35f5a653 100644 --- a/waku/v2/rpc/relay.go +++ b/waku/v2/rpc/relay.go @@ -8,6 +8,7 @@ import ( "github.com/status-im/go-waku/waku/v2/node" "github.com/status-im/go-waku/waku/v2/protocol" "github.com/status-im/go-waku/waku/v2/protocol/pb" + "github.com/status-im/go-waku/waku/v2/protocol/relay" "go.uber.org/zap" ) @@ -23,8 +24,8 @@ type RelayService struct { } type RelayMessageArgs struct { - Topic string `json:"topic,omitempty"` - Message pb.WakuMessage `json:"message,omitempty"` + Topic string `json:"topic,omitempty"` + Message RPCWakuRelayMessage `json:"message,omitempty"` } type TopicsArgs struct { @@ -43,6 +44,7 @@ func NewRelayService(node *node.WakuNode, log *zap.Logger) *RelayService { } s.runner = newRunnerService(node.Broadcaster(), s.addEnvelope) + return s } @@ -58,6 +60,12 @@ func (r *RelayService) addEnvelope(envelope *protocol.Envelope) { } func (r *RelayService) Start() { + // Node may already be subscribed to some topics when Relay API handlers are installed. Let's add these + for _, topic := range r.node.Relay().Topics() { + r.log.Info("adding topic handler for existing subscription", zap.String("topic", topic)) + r.messages[topic] = make([]*pb.WakuMessage, 0) + } + r.runner.Start() } @@ -67,10 +75,13 @@ func (r *RelayService) Stop() { func (r *RelayService) PostV1Message(req *http.Request, args *RelayMessageArgs, reply *SuccessReply) error { var err error + + msg := args.Message.toProto() + if args.Topic == "" { - _, err = r.node.Relay().Publish(req.Context(), &args.Message) + _, err = r.node.Relay().Publish(req.Context(), msg) } else { - _, err = r.node.Relay().PublishToTopic(req.Context(), &args.Message, args.Topic) + _, err = r.node.Relay().PublishToTopic(req.Context(), msg, args.Topic) } if err != nil { r.log.Error("publishing message", zap.Error(err)) @@ -87,10 +98,13 @@ func (r *RelayService) PostV1Subscription(req *http.Request, args *TopicsArgs, r for _, topic := range args.Topics { var err error if topic == "" { - _, err = r.node.Relay().Subscribe(ctx) - + var sub *relay.Subscription + sub, err = r.node.Relay().Subscribe(ctx) + r.node.Broadcaster().Unregister(&relay.DefaultWakuTopic, sub.C) } else { - _, err = r.node.Relay().SubscribeToTopic(ctx, topic) + var sub *relay.Subscription + sub, err = r.node.Relay().SubscribeToTopic(ctx, topic) + r.node.Broadcaster().Unregister(&topic, sub.C) } if err != nil { r.log.Error("subscribing to topic", zap.String("topic", topic), zap.Error(err)) @@ -121,7 +135,7 @@ func (r *RelayService) DeleteV1Subscription(req *http.Request, args *TopicsArgs, return nil } -func (r *RelayService) GetV1Messages(req *http.Request, args *TopicArgs, reply *MessagesReply) error { +func (r *RelayService) GetV1Messages(req *http.Request, args *TopicArgs, reply *RelayMessagesReply) error { r.messagesMutex.Lock() defer r.messagesMutex.Unlock() @@ -129,7 +143,11 @@ func (r *RelayService) GetV1Messages(req *http.Request, args *TopicArgs, reply * return fmt.Errorf("topic %s not subscribed", args.Topic) } - reply.Messages = r.messages[args.Topic] + for i := range r.messages[args.Topic] { + *reply = append(*reply, ProtoWakuMessageToRPCWakuRelayMessage(r.messages[args.Topic][i])) + } + r.messages[args.Topic] = make([]*pb.WakuMessage, 0) + return nil } diff --git a/waku/v2/rpc/relay_test.go b/waku/v2/rpc/relay_test.go index b6a7c6ff..cbcae1cc 100644 --- a/waku/v2/rpc/relay_test.go +++ b/waku/v2/rpc/relay_test.go @@ -8,7 +8,6 @@ import ( "github.com/multiformats/go-multiaddr" "github.com/status-im/go-waku/waku/v2/node" - "github.com/status-im/go-waku/waku/v2/protocol/pb" "github.com/status-im/go-waku/waku/v2/utils" "github.com/stretchr/testify/require" ) @@ -97,7 +96,7 @@ func TestRelayGetV1Messages(t *testing.T) { makeRequest(t), &RelayMessageArgs{ Topic: "test", - Message: pb.WakuMessage{ + Message: RPCWakuRelayMessage{ Payload: []byte("test"), }, }, @@ -109,20 +108,21 @@ func TestRelayGetV1Messages(t *testing.T) { // Wait for the message to be received time.Sleep(1 * time.Second) - var messagesReply MessagesReply + var messagesReply1 RelayMessagesReply err = serviceB.GetV1Messages( makeRequest(t), &TopicArgs{"test"}, - &messagesReply, + &messagesReply1, ) require.NoError(t, err) - require.Len(t, messagesReply.Messages, 1) + require.Len(t, messagesReply1, 1) + var messagesReply2 RelayMessagesReply err = serviceB.GetV1Messages( makeRequest(t), &TopicArgs{"test"}, - &messagesReply, + &messagesReply2, ) require.NoError(t, err) - require.Len(t, messagesReply.Messages, 0) + require.Len(t, messagesReply2, 0) } diff --git a/waku/v2/rpc/rpc_type.go b/waku/v2/rpc/rpc_type.go index 391bee20..322cef3f 100644 --- a/waku/v2/rpc/rpc_type.go +++ b/waku/v2/rpc/rpc_type.go @@ -1,7 +1,5 @@ package rpc -import "github.com/status-im/go-waku/waku/v2/protocol/pb" - type SuccessReply struct { Success bool `json:"success,omitempty"` Error string `json:"error,omitempty"` @@ -10,6 +8,6 @@ type SuccessReply struct { type Empty struct { } -type MessagesReply struct { - Messages []*pb.WakuMessage `json:"messages,omitempty"` -} +type MessagesReply []*RPCWakuMessage + +type RelayMessagesReply []*RPCWakuRelayMessage diff --git a/waku/v2/rpc/store.go b/waku/v2/rpc/store.go index 690dcc20..f5d011de 100644 --- a/waku/v2/rpc/store.go +++ b/waku/v2/rpc/store.go @@ -33,7 +33,7 @@ type StoreMessagesArgs struct { } type StoreMessagesReply struct { - Messages []*pb.WakuMessage `json:"messages,omitempty"` + Messages []RPCWakuMessage `json:"messages,omitempty"` PagingInfo StorePagingOptions `json:"pagingInfo,omitempty"` Error string `json:"error,omitempty"` } @@ -60,7 +60,12 @@ func (s *StoreService) GetV1Messages(req *http.Request, args *StoreMessagesArgs, reply.Error = err.Error() return nil } - reply.Messages = res.Messages + + reply.Messages = make([]RPCWakuMessage, len(res.Messages)) + for i := range res.Messages { + reply.Messages[i] = *ProtoWakuMessageToRPCWakuMessage(res.Messages[i]) + } + reply.PagingInfo = StorePagingOptions{ PageSize: args.PagingOptions.PageSize, Cursor: res.Cursor(), diff --git a/waku/v2/rpc/utils.go b/waku/v2/rpc/utils.go new file mode 100644 index 00000000..96fa1446 --- /dev/null +++ b/waku/v2/rpc/utils.go @@ -0,0 +1,135 @@ +package rpc + +import ( + "encoding/hex" + "fmt" + "strings" + + "github.com/status-im/go-waku/waku/v2/protocol/pb" +) + +// HexBytes is marshalled to a hex string +type HexBytes []byte + +// ByteArray is marshalled to a uint8 array +type ByteArray []byte + +type RPCWakuMessage struct { + Payload ByteArray `json:"payload,omitempty"` + ContentTopic string `json:"contentTopic,omitempty"` + Version uint32 `json:"version"` + Timestamp int64 `json:"timestamp,omitempty"` + Proof HexBytes `json:"proof,omitempty"` +} + +type RPCWakuRelayMessage struct { + Payload HexBytes `json:"payload,omitempty"` + ContentTopic string `json:"contentTopic,omitempty"` + Timestamp int64 `json:"timestamp,omitempty"` + Proof HexBytes `json:"proof,omitempty"` + Version uint32 `json:"version"` +} + +func ProtoWakuMessageToRPCWakuMessage(input *pb.WakuMessage) *RPCWakuMessage { + if input == nil { + return nil + } + + return &RPCWakuMessage{ + Payload: input.Payload, + ContentTopic: input.ContentTopic, + Version: input.Version, + Timestamp: input.Timestamp, + Proof: input.Proof, + } +} + +func (r *RPCWakuMessage) toProto() *pb.WakuMessage { + if r == nil { + return nil + } + + return &pb.WakuMessage{ + Payload: r.Payload, + ContentTopic: r.ContentTopic, + Version: r.Version, + Timestamp: r.Timestamp, + Proof: r.Proof, + } +} + +func (u HexBytes) MarshalJSON() ([]byte, error) { + var result string + if u == nil { + result = "null" + } else { + result = strings.Join(strings.Fields(fmt.Sprintf("%d", u)), ",") + } + return []byte(result), nil +} + +func (h *HexBytes) UnmarshalText(b []byte) error { + hexString := "" + if b != nil { + hexString = string(b) + } + + decoded, err := hex.DecodeString(hexString) + if err != nil { + return err + } + + *h = decoded + + return nil +} + +func ProtoWakuMessageToRPCWakuRelayMessage(input *pb.WakuMessage) *RPCWakuRelayMessage { + if input == nil { + return nil + } + + return &RPCWakuRelayMessage{ + Payload: input.Payload, + ContentTopic: input.ContentTopic, + Timestamp: input.Timestamp, + Proof: input.Proof, + } +} + +func (r *RPCWakuRelayMessage) toProto() *pb.WakuMessage { + if r == nil { + return nil + } + + return &pb.WakuMessage{ + Payload: r.Payload, + ContentTopic: r.ContentTopic, + Timestamp: r.Timestamp, + Proof: r.Proof, + } +} + +func (h ByteArray) MarshalText() ([]byte, error) { + if h == nil { + return []byte{}, nil + } + + return []byte(hex.EncodeToString(h)), nil +} + +func (h *ByteArray) UnmarshalText(b []byte) error { + hexString := "" + if b != nil { + hexString = string(b) + } + + decoded, err := hex.DecodeString(hexString) + if err != nil { + return err + } + + *h = decoded + + return nil +} diff --git a/waku/v2/rpc/waku_rpc.go b/waku/v2/rpc/waku_rpc.go index affde951..b1e0845e 100644 --- a/waku/v2/rpc/waku_rpc.go +++ b/waku/v2/rpc/waku_rpc.go @@ -88,6 +88,9 @@ func NewWakuRpc(node *node.WakuNode, address string, port int, enableAdmin bool, server.RegisterOnShutdown(func() { filterService.Stop() relayService.Stop() + if wrpc.privateService != nil { + wrpc.privateService.Stop() + } }) wrpc.node = node @@ -101,6 +104,9 @@ func NewWakuRpc(node *node.WakuNode, address string, port int, enableAdmin bool, func (r *WakuRpc) Start() { go r.relayService.Start() go r.filterService.Start() + if r.privateService != nil { + go r.privateService.Start() + } go func() { _ = r.server.ListenAndServe() }() diff --git a/waku/v2/utils/enr.go b/waku/v2/utils/enr.go index 7895a82b..490a9825 100644 --- a/waku/v2/utils/enr.go +++ b/waku/v2/utils/enr.go @@ -111,7 +111,7 @@ func GetENRandIP(addr ma.Multiaddr, wakuFlags WakuEnrBitfield, privK *ecdsa.Priv p2pAddr, err := ma.NewMultiaddr("/p2p/" + p2p) if err != nil { - return nil, nil, fmt.Errorf("Could not create p2p addr: %w", err) + return nil, nil, fmt.Errorf("could not create p2p addr: %w", err) } var fieldRaw []byte