WakuRelay

This commit is contained in:
Richard Ramos 2021-03-12 15:06:20 -04:00
parent ccff3dc8f8
commit 241733e6a4
No known key found for this signature in database
GPG Key ID: 80D4B01265FDFE8F
5 changed files with 149 additions and 16 deletions

1
go.mod
View File

@ -7,6 +7,7 @@ require (
github.com/ipfs/go-log/v2 v2.1.1
github.com/libp2p/go-libp2p v0.13.0
github.com/libp2p/go-libp2p-core v0.8.5
github.com/libp2p/go-libp2p-pubsub v0.4.1
github.com/multiformats/go-multiaddr v0.3.1
github.com/multiformats/go-multiaddr-net v0.2.0
google.golang.org/protobuf v1.25.0

8
go.sum
View File

@ -54,6 +54,8 @@ github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5
github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmVTwzkszR9V5SSuryQ31EELlFMUz1kKyl939pY=
github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8=
github.com/aws/aws-sdk-go v1.25.48/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo=
github.com/benbjohnson/clock v1.0.2/go.mod h1:bGMdMPoPVvcYyt1gHDf4J2KE153Yf9BuiUKYMaxlTDM=
github.com/benbjohnson/clock v1.0.3/go.mod h1:bGMdMPoPVvcYyt1gHDf4J2KE153Yf9BuiUKYMaxlTDM=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs=
@ -383,6 +385,7 @@ github.com/libp2p/go-libp2p-circuit v0.1.4/go.mod h1:CY67BrEjKNDhdTk8UgBX1Y/H5c3
github.com/libp2p/go-libp2p-circuit v0.2.1/go.mod h1:BXPwYDN5A8z4OEY9sOfr2DUQMLQvKt/6oku45YUmjIo=
github.com/libp2p/go-libp2p-circuit v0.4.0 h1:eqQ3sEYkGTtybWgr6JLqJY6QLtPWRErvFjFDfAOO1wc=
github.com/libp2p/go-libp2p-circuit v0.4.0/go.mod h1:t/ktoFIUzM6uLQ+o1G6NuBl2ANhBKN9Bc8jRIk31MoA=
github.com/libp2p/go-libp2p-connmgr v0.2.4/go.mod h1:YV0b/RIm8NGPnnNWM7hG9Q38OeQiQfKhHCCs1++ufn0=
github.com/libp2p/go-libp2p-core v0.0.1/go.mod h1:g/VxnTZ/1ygHxH3dKok7Vno1VfpvGcGip57wjTU4fco=
github.com/libp2p/go-libp2p-core v0.0.4/go.mod h1:jyuCQP356gzfCFtRKyvAbNkyeuxb7OlyhWZ3nls5d2I=
github.com/libp2p/go-libp2p-core v0.2.0/go.mod h1:X0eyB0Gy93v0DZtSYbEM7RnMChm9Uv3j7yRXjO77xSI=
@ -432,6 +435,8 @@ github.com/libp2p/go-libp2p-peerstore v0.2.6 h1:2ACefBX23iMdJU9Ke+dcXt3w86MIryes
github.com/libp2p/go-libp2p-peerstore v0.2.6/go.mod h1:ss/TWTgHZTMpsU/oKVVPQCGuDHItOpf2W8RxAi50P2s=
github.com/libp2p/go-libp2p-pnet v0.2.0 h1:J6htxttBipJujEjz1y0a5+eYoiPcFHhSYHH6na5f0/k=
github.com/libp2p/go-libp2p-pnet v0.2.0/go.mod h1:Qqvq6JH/oMZGwqs3N1Fqhv8NVhrdYcO0BW4wssv21LA=
github.com/libp2p/go-libp2p-pubsub v0.4.1 h1:j4umIg5nyus+sqNfU+FWvb9aeYFQH/A+nDFhWj+8yy8=
github.com/libp2p/go-libp2p-pubsub v0.4.1/go.mod h1:izkeMLvz6Ht8yAISXjx60XUQZMq9ZMe5h2ih4dLIBIQ=
github.com/libp2p/go-libp2p-secio v0.1.0/go.mod h1:tMJo2w7h3+wN4pgU2LSYeiKPrfqBgkOsdiKK77hE7c8=
github.com/libp2p/go-libp2p-secio v0.2.0/go.mod h1:2JdZepB8J5V9mBp79BmwsaPQhRPNN2NrnB2lKQcdy6g=
github.com/libp2p/go-libp2p-secio v0.2.1/go.mod h1:cWtZpILJqkqrSkiYcDBh5lA3wbT2Q+hz3rJQq3iftD8=
@ -441,6 +446,7 @@ github.com/libp2p/go-libp2p-swarm v0.2.2/go.mod h1:fvmtQ0T1nErXym1/aa1uJEyN7JzaT
github.com/libp2p/go-libp2p-swarm v0.2.3/go.mod h1:P2VO/EpxRyDxtChXz/VPVXyTnszHvokHKRhfkEgFKNM=
github.com/libp2p/go-libp2p-swarm v0.2.8/go.mod h1:JQKMGSth4SMqonruY0a8yjlPVIkb0mdNSwckW7OYziM=
github.com/libp2p/go-libp2p-swarm v0.3.0/go.mod h1:hdv95GWCTmzkgeJpP+GK/9D9puJegb7H57B5hWQR5Kk=
github.com/libp2p/go-libp2p-swarm v0.3.1/go.mod h1:hdv95GWCTmzkgeJpP+GK/9D9puJegb7H57B5hWQR5Kk=
github.com/libp2p/go-libp2p-swarm v0.4.0 h1:hahq/ijRoeH6dgROOM8x7SeaKK5VgjjIr96vdrT+NUA=
github.com/libp2p/go-libp2p-swarm v0.4.0/go.mod h1:XVFcO52VoLoo0eitSxNQWYq4D6sydGOweTOAjJNraCw=
github.com/libp2p/go-libp2p-testing v0.0.2/go.mod h1:gvchhf3FQOtBdr+eFUABet5a4MBLK8jM3V4Zghvmi+E=
@ -738,6 +744,8 @@ github.com/whyrusleeping/mafmt v1.2.8/go.mod h1:faQJFPbLSxzD9xpA02ttW/tS9vZykNvX
github.com/whyrusleeping/mdns v0.0.0-20190826153040-b9b60ed33aa9/go.mod h1:j4l84WPFclQPj320J9gp0XwNKBb3U0zt5CBqjPp22G4=
github.com/whyrusleeping/multiaddr-filter v0.0.0-20160516205228-e903e4adabd7 h1:E9S12nwJwEOXe2d6gT6qxdvqMnNq+VnSsKPgm2ZZNds=
github.com/whyrusleeping/multiaddr-filter v0.0.0-20160516205228-e903e4adabd7/go.mod h1:X2c0RVCI1eSUFI8eLcY3c0423ykwiUdxLJtkDvruhjI=
github.com/whyrusleeping/timecache v0.0.0-20160911033111-cfcb2f1abfee h1:lYbXeSvJi5zk5GLKVuid9TVjS9a0OmLIDKTfoZBL6Ow=
github.com/whyrusleeping/timecache v0.0.0-20160911033111-cfcb2f1abfee/go.mod h1:m2aV4LZI4Aez7dP5PMyVKEHhUyEJ/RjmPEDOpDvudHg=
github.com/willf/bitset v1.1.3/go.mod h1:RjeCKbqT1RxIR/KWY6phxZiaY1IyutSBfGjNPySAYV4=
github.com/wsddn/go-ecdh v0.0.0-20161211032359-48726bab9208/go.mod h1:IotVbo4F+mw0EzQ08zFqg7pK3FebNXpaMsRy2RT+Ees=
github.com/x-cray/logrus-prefixed-formatter v0.5.2/go.mod h1:2duySbKsL6M18s5GU7VPsoEPHyzalCE06qoARUCeBBE=

97
main.go
View File

@ -1,17 +1,90 @@
package main
import (
"context"
"crypto/rand"
"encoding/json"
"fmt"
"io"
"net"
"os"
"os/signal"
"syscall"
"time"
golog "github.com/ipfs/go-log/v2"
"github.com/libp2p/go-libp2p-core/crypto"
"github.com/libp2p/go-libp2p-core/peer"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/status-im/go-waku/waku/v2/node"
"github.com/status-im/go-waku/waku/v2/protocol"
//node "waku/v2/node"
)
func topicName(name string) string {
return "topic:" + name
}
func JoinSomeTopic(ctx context.Context, ps *pubsub.PubSub, selfID peer.ID, t string) error {
// join the pubsub topic
topic, err := ps.Join(topicName(t))
if err != nil {
return err
}
// and subscribe to it
sub, err := topic.Subscribe()
if err != nil {
return err
}
// start reading messages from the subscription in a loop
go readLoop(sub, ctx)
go writeLoop(topic, ctx)
return nil
}
type Test struct {
Message string
}
func writeLoop(topic *pubsub.Topic, ctx context.Context) {
m := Test{
Message: "Hello",
}
msgBytes, err := json.Marshal(m)
if err != nil {
fmt.Println(err)
return
}
for {
time.Sleep(2 * time.Second)
fmt.Println("Send 'Hello'...")
topic.Publish(ctx, msgBytes)
}
}
func readLoop(sub *pubsub.Subscription, ctx context.Context) {
for {
msg, err := sub.Next(ctx)
if err != nil {
fmt.Println(err)
return
}
cm := new(Test)
err = json.Unmarshal(msg.Data, cm)
if err != nil {
return
}
fmt.Println("Received: " + cm.Message)
}
}
func main() {
golog.SetAllLoggers(golog.LevelInfo) // Change to INFO for extra info
@ -25,14 +98,32 @@ func main() {
panic(err)
}
wakuNode, err := node.New(prvKey, hostAddr, extAddr)
ctx := context.Background()
wakuNode, err := node.New(ctx, prvKey, hostAddr, extAddr)
if err != nil {
fmt.Print(err)
}
_ = wakuNode // TODO: Just to shut up the compiler. Do a proper test case and remove this
ps, err := protocol.NewWakuRelaySub(ctx, wakuNode.Host)
if err != nil {
panic(err)
}
select {} // Run forever
err = JoinSomeTopic(ctx, ps, wakuNode.Host.ID(), "test")
if err != nil {
panic(err)
}
// wait for a SIGINT or SIGTERM signal
ch := make(chan os.Signal, 1)
signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM)
<-ch
fmt.Println("Received signal, shutting down...")
// shut the node down
if err := wakuNode.Stop(); err != nil {
panic(err)
}
}

View File

@ -39,7 +39,7 @@ type MessagePair struct {
// NOTE based on Eth2Node in NBC eth2_network.nim
type WakuNode struct {
peerManager *PeerManager
sw host.Host
Host host.Host
// wakuRelay *WakuRelay
// wakuStore *WakuStore
// wakuFilter *WakuFilter
@ -52,14 +52,12 @@ type WakuNode struct {
//filters *Filters
subscriptions protocol.MessageNotificationSubscriptions
// rng *BrHmacDrbgContext // ???
cancel context.CancelFunc
}
// Public API
//
func New(nodeKey crypto.PrivKey, hostAddr net.Addr, extAddr net.Addr) (*WakuNode, error) {
func New(ctx context.Context, nodeKey crypto.PrivKey, hostAddr net.Addr, extAddr net.Addr) (*WakuNode, error) {
// Creates a Waku Node.
if hostAddr == nil {
return nil, errors.New("Host address cannot be null")
@ -89,20 +87,14 @@ func New(nodeKey crypto.PrivKey, hostAddr net.Addr, extAddr net.Addr) (*WakuNode
libp2p.EnableNATService(), // TODO: what is this?
}
// The context governs the lifetime of the libp2p node.
// Cancelling it will stop the the host.
ctx, cancel := context.WithCancel(context.Background())
host, err := libp2p.New(ctx, opts...)
if err != nil {
cancel()
return nil, err
}
w := new(WakuNode)
w.peerManager = NewPeerManager(host)
w.sw = host
w.cancel = cancel
w.Host = host
// w.filters = new(Filters)
hostInfo, _ := ma.NewMultiaddr(fmt.Sprintf("/p2p/%s", host.ID().Pretty()))
@ -114,10 +106,10 @@ func New(nodeKey crypto.PrivKey, hostAddr net.Addr, extAddr net.Addr) (*WakuNode
return w, nil
}
func (node *WakuNode) Stop() {
func (node *WakuNode) Stop() error {
// TODO:
//if not node.wakuRelay.isNil:
// await node.wakuRelay.stop()
node.cancel()
return node.Host.Close()
}

View File

@ -0,0 +1,41 @@
// Waku Relay module. Thin layer on top of GossipSub.
//
// See https://github.com/vacp2p/specs/blob/master/specs/waku/v2/waku-relay.md
// for spec.
package protocol
import (
"context"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/protocol"
libp2pProtocol "github.com/libp2p/go-libp2p-core/protocol"
pubsub "github.com/libp2p/go-libp2p-pubsub"
)
const WakuRelayCodec = libp2pProtocol.ID("/vac/waku/relay/2.0.0-beta2")
type WakuRelaySubRouter struct {
*pubsub.GossipSubRouter
p *pubsub.PubSub
}
func NewWakuRelaySub(ctx context.Context, h host.Host) (*pubsub.PubSub, error) {
opts := []pubsub.Option{
pubsub.WithMessageSignaturePolicy(pubsub.StrictNoSign),
}
gossipSub, err := pubsub.NewGossipSub(ctx, h, opts...)
if err != nil {
return nil, err
}
w := new(WakuRelaySubRouter)
w.p = gossipSub
return gossipSub, nil
}
func (ws *WakuRelaySubRouter) Protocols() []protocol.ID {
return []libp2pProtocol.ID{WakuRelayCodec, pubsub.GossipSubID_v11, pubsub.GossipSubID_v10, pubsub.FloodSubID}
}