From 241733e6a46c7d5a4b6c38a202e9dc9a5b5b07cd Mon Sep 17 00:00:00 2001 From: Richard Ramos Date: Fri, 12 Mar 2021 15:06:20 -0400 Subject: [PATCH] WakuRelay --- go.mod | 1 + go.sum | 8 +++ main.go | 97 ++++++++++++++++++++++++++++++++-- waku/v2/node/wakunode2.go | 18 ++----- waku/v2/protocol/waku_relay.go | 41 ++++++++++++++ 5 files changed, 149 insertions(+), 16 deletions(-) create mode 100644 waku/v2/protocol/waku_relay.go diff --git a/go.mod b/go.mod index bf6f021..3dd1394 100644 --- a/go.mod +++ b/go.mod @@ -7,6 +7,7 @@ require ( github.com/ipfs/go-log/v2 v2.1.1 github.com/libp2p/go-libp2p v0.13.0 github.com/libp2p/go-libp2p-core v0.8.5 + github.com/libp2p/go-libp2p-pubsub v0.4.1 github.com/multiformats/go-multiaddr v0.3.1 github.com/multiformats/go-multiaddr-net v0.2.0 google.golang.org/protobuf v1.25.0 diff --git a/go.sum b/go.sum index 84164af..c17695d 100644 --- a/go.sum +++ b/go.sum @@ -54,6 +54,8 @@ github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5 github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmVTwzkszR9V5SSuryQ31EELlFMUz1kKyl939pY= github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8= github.com/aws/aws-sdk-go v1.25.48/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo= +github.com/benbjohnson/clock v1.0.2/go.mod h1:bGMdMPoPVvcYyt1gHDf4J2KE153Yf9BuiUKYMaxlTDM= +github.com/benbjohnson/clock v1.0.3/go.mod h1:bGMdMPoPVvcYyt1gHDf4J2KE153Yf9BuiUKYMaxlTDM= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs= @@ -383,6 +385,7 @@ github.com/libp2p/go-libp2p-circuit v0.1.4/go.mod h1:CY67BrEjKNDhdTk8UgBX1Y/H5c3 github.com/libp2p/go-libp2p-circuit v0.2.1/go.mod h1:BXPwYDN5A8z4OEY9sOfr2DUQMLQvKt/6oku45YUmjIo= github.com/libp2p/go-libp2p-circuit v0.4.0 h1:eqQ3sEYkGTtybWgr6JLqJY6QLtPWRErvFjFDfAOO1wc= github.com/libp2p/go-libp2p-circuit v0.4.0/go.mod h1:t/ktoFIUzM6uLQ+o1G6NuBl2ANhBKN9Bc8jRIk31MoA= +github.com/libp2p/go-libp2p-connmgr v0.2.4/go.mod h1:YV0b/RIm8NGPnnNWM7hG9Q38OeQiQfKhHCCs1++ufn0= github.com/libp2p/go-libp2p-core v0.0.1/go.mod h1:g/VxnTZ/1ygHxH3dKok7Vno1VfpvGcGip57wjTU4fco= github.com/libp2p/go-libp2p-core v0.0.4/go.mod h1:jyuCQP356gzfCFtRKyvAbNkyeuxb7OlyhWZ3nls5d2I= github.com/libp2p/go-libp2p-core v0.2.0/go.mod h1:X0eyB0Gy93v0DZtSYbEM7RnMChm9Uv3j7yRXjO77xSI= @@ -432,6 +435,8 @@ github.com/libp2p/go-libp2p-peerstore v0.2.6 h1:2ACefBX23iMdJU9Ke+dcXt3w86MIryes github.com/libp2p/go-libp2p-peerstore v0.2.6/go.mod h1:ss/TWTgHZTMpsU/oKVVPQCGuDHItOpf2W8RxAi50P2s= github.com/libp2p/go-libp2p-pnet v0.2.0 h1:J6htxttBipJujEjz1y0a5+eYoiPcFHhSYHH6na5f0/k= github.com/libp2p/go-libp2p-pnet v0.2.0/go.mod h1:Qqvq6JH/oMZGwqs3N1Fqhv8NVhrdYcO0BW4wssv21LA= +github.com/libp2p/go-libp2p-pubsub v0.4.1 h1:j4umIg5nyus+sqNfU+FWvb9aeYFQH/A+nDFhWj+8yy8= +github.com/libp2p/go-libp2p-pubsub v0.4.1/go.mod h1:izkeMLvz6Ht8yAISXjx60XUQZMq9ZMe5h2ih4dLIBIQ= github.com/libp2p/go-libp2p-secio v0.1.0/go.mod h1:tMJo2w7h3+wN4pgU2LSYeiKPrfqBgkOsdiKK77hE7c8= github.com/libp2p/go-libp2p-secio v0.2.0/go.mod h1:2JdZepB8J5V9mBp79BmwsaPQhRPNN2NrnB2lKQcdy6g= github.com/libp2p/go-libp2p-secio v0.2.1/go.mod h1:cWtZpILJqkqrSkiYcDBh5lA3wbT2Q+hz3rJQq3iftD8= @@ -441,6 +446,7 @@ github.com/libp2p/go-libp2p-swarm v0.2.2/go.mod h1:fvmtQ0T1nErXym1/aa1uJEyN7JzaT github.com/libp2p/go-libp2p-swarm v0.2.3/go.mod h1:P2VO/EpxRyDxtChXz/VPVXyTnszHvokHKRhfkEgFKNM= github.com/libp2p/go-libp2p-swarm v0.2.8/go.mod h1:JQKMGSth4SMqonruY0a8yjlPVIkb0mdNSwckW7OYziM= github.com/libp2p/go-libp2p-swarm v0.3.0/go.mod h1:hdv95GWCTmzkgeJpP+GK/9D9puJegb7H57B5hWQR5Kk= +github.com/libp2p/go-libp2p-swarm v0.3.1/go.mod h1:hdv95GWCTmzkgeJpP+GK/9D9puJegb7H57B5hWQR5Kk= github.com/libp2p/go-libp2p-swarm v0.4.0 h1:hahq/ijRoeH6dgROOM8x7SeaKK5VgjjIr96vdrT+NUA= github.com/libp2p/go-libp2p-swarm v0.4.0/go.mod h1:XVFcO52VoLoo0eitSxNQWYq4D6sydGOweTOAjJNraCw= github.com/libp2p/go-libp2p-testing v0.0.2/go.mod h1:gvchhf3FQOtBdr+eFUABet5a4MBLK8jM3V4Zghvmi+E= @@ -738,6 +744,8 @@ github.com/whyrusleeping/mafmt v1.2.8/go.mod h1:faQJFPbLSxzD9xpA02ttW/tS9vZykNvX github.com/whyrusleeping/mdns v0.0.0-20190826153040-b9b60ed33aa9/go.mod h1:j4l84WPFclQPj320J9gp0XwNKBb3U0zt5CBqjPp22G4= github.com/whyrusleeping/multiaddr-filter v0.0.0-20160516205228-e903e4adabd7 h1:E9S12nwJwEOXe2d6gT6qxdvqMnNq+VnSsKPgm2ZZNds= github.com/whyrusleeping/multiaddr-filter v0.0.0-20160516205228-e903e4adabd7/go.mod h1:X2c0RVCI1eSUFI8eLcY3c0423ykwiUdxLJtkDvruhjI= +github.com/whyrusleeping/timecache v0.0.0-20160911033111-cfcb2f1abfee h1:lYbXeSvJi5zk5GLKVuid9TVjS9a0OmLIDKTfoZBL6Ow= +github.com/whyrusleeping/timecache v0.0.0-20160911033111-cfcb2f1abfee/go.mod h1:m2aV4LZI4Aez7dP5PMyVKEHhUyEJ/RjmPEDOpDvudHg= github.com/willf/bitset v1.1.3/go.mod h1:RjeCKbqT1RxIR/KWY6phxZiaY1IyutSBfGjNPySAYV4= github.com/wsddn/go-ecdh v0.0.0-20161211032359-48726bab9208/go.mod h1:IotVbo4F+mw0EzQ08zFqg7pK3FebNXpaMsRy2RT+Ees= github.com/x-cray/logrus-prefixed-formatter v0.5.2/go.mod h1:2duySbKsL6M18s5GU7VPsoEPHyzalCE06qoARUCeBBE= diff --git a/main.go b/main.go index 8b28cea..7952677 100644 --- a/main.go +++ b/main.go @@ -1,17 +1,90 @@ package main import ( + "context" "crypto/rand" + "encoding/json" "fmt" "io" "net" + "os" + "os/signal" + "syscall" + "time" golog "github.com/ipfs/go-log/v2" "github.com/libp2p/go-libp2p-core/crypto" + "github.com/libp2p/go-libp2p-core/peer" + pubsub "github.com/libp2p/go-libp2p-pubsub" "github.com/status-im/go-waku/waku/v2/node" + "github.com/status-im/go-waku/waku/v2/protocol" //node "waku/v2/node" ) +func topicName(name string) string { + return "topic:" + name +} +func JoinSomeTopic(ctx context.Context, ps *pubsub.PubSub, selfID peer.ID, t string) error { + // join the pubsub topic + topic, err := ps.Join(topicName(t)) + if err != nil { + return err + } + + // and subscribe to it + sub, err := topic.Subscribe() + if err != nil { + return err + } + + // start reading messages from the subscription in a loop + go readLoop(sub, ctx) + + go writeLoop(topic, ctx) + + return nil +} + +type Test struct { + Message string +} + +func writeLoop(topic *pubsub.Topic, ctx context.Context) { + m := Test{ + Message: "Hello", + } + msgBytes, err := json.Marshal(m) + if err != nil { + fmt.Println(err) + return + } + + for { + time.Sleep(2 * time.Second) + fmt.Println("Send 'Hello'...") + topic.Publish(ctx, msgBytes) + } + +} + +func readLoop(sub *pubsub.Subscription, ctx context.Context) { + for { + msg, err := sub.Next(ctx) + if err != nil { + fmt.Println(err) + return + } + + cm := new(Test) + err = json.Unmarshal(msg.Data, cm) + if err != nil { + return + } + + fmt.Println("Received: " + cm.Message) + } +} + func main() { golog.SetAllLoggers(golog.LevelInfo) // Change to INFO for extra info @@ -25,14 +98,32 @@ func main() { panic(err) } - wakuNode, err := node.New(prvKey, hostAddr, extAddr) + ctx := context.Background() + + wakuNode, err := node.New(ctx, prvKey, hostAddr, extAddr) if err != nil { fmt.Print(err) } - _ = wakuNode // TODO: Just to shut up the compiler. Do a proper test case and remove this + ps, err := protocol.NewWakuRelaySub(ctx, wakuNode.Host) + if err != nil { + panic(err) + } - select {} // Run forever + err = JoinSomeTopic(ctx, ps, wakuNode.Host.ID(), "test") + if err != nil { + panic(err) + } + // wait for a SIGINT or SIGTERM signal + ch := make(chan os.Signal, 1) + signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM) + <-ch + fmt.Println("Received signal, shutting down...") + + // shut the node down + if err := wakuNode.Stop(); err != nil { + panic(err) + } } diff --git a/waku/v2/node/wakunode2.go b/waku/v2/node/wakunode2.go index fbfd5b5..4b513a6 100644 --- a/waku/v2/node/wakunode2.go +++ b/waku/v2/node/wakunode2.go @@ -39,7 +39,7 @@ type MessagePair struct { // NOTE based on Eth2Node in NBC eth2_network.nim type WakuNode struct { peerManager *PeerManager - sw host.Host + Host host.Host // wakuRelay *WakuRelay // wakuStore *WakuStore // wakuFilter *WakuFilter @@ -52,14 +52,12 @@ type WakuNode struct { //filters *Filters subscriptions protocol.MessageNotificationSubscriptions // rng *BrHmacDrbgContext // ??? - - cancel context.CancelFunc } // Public API // -func New(nodeKey crypto.PrivKey, hostAddr net.Addr, extAddr net.Addr) (*WakuNode, error) { +func New(ctx context.Context, nodeKey crypto.PrivKey, hostAddr net.Addr, extAddr net.Addr) (*WakuNode, error) { // Creates a Waku Node. if hostAddr == nil { return nil, errors.New("Host address cannot be null") @@ -89,20 +87,14 @@ func New(nodeKey crypto.PrivKey, hostAddr net.Addr, extAddr net.Addr) (*WakuNode libp2p.EnableNATService(), // TODO: what is this? } - // The context governs the lifetime of the libp2p node. - // Cancelling it will stop the the host. - ctx, cancel := context.WithCancel(context.Background()) - host, err := libp2p.New(ctx, opts...) if err != nil { - cancel() return nil, err } w := new(WakuNode) w.peerManager = NewPeerManager(host) - w.sw = host - w.cancel = cancel + w.Host = host // w.filters = new(Filters) hostInfo, _ := ma.NewMultiaddr(fmt.Sprintf("/p2p/%s", host.ID().Pretty())) @@ -114,10 +106,10 @@ func New(nodeKey crypto.PrivKey, hostAddr net.Addr, extAddr net.Addr) (*WakuNode return w, nil } -func (node *WakuNode) Stop() { +func (node *WakuNode) Stop() error { // TODO: //if not node.wakuRelay.isNil: // await node.wakuRelay.stop() - node.cancel() + return node.Host.Close() } diff --git a/waku/v2/protocol/waku_relay.go b/waku/v2/protocol/waku_relay.go new file mode 100644 index 0000000..b09b068 --- /dev/null +++ b/waku/v2/protocol/waku_relay.go @@ -0,0 +1,41 @@ +// Waku Relay module. Thin layer on top of GossipSub. +// +// See https://github.com/vacp2p/specs/blob/master/specs/waku/v2/waku-relay.md +// for spec. + +package protocol + +import ( + "context" + + "github.com/libp2p/go-libp2p-core/host" + "github.com/libp2p/go-libp2p-core/protocol" + libp2pProtocol "github.com/libp2p/go-libp2p-core/protocol" + pubsub "github.com/libp2p/go-libp2p-pubsub" +) + +const WakuRelayCodec = libp2pProtocol.ID("/vac/waku/relay/2.0.0-beta2") + +type WakuRelaySubRouter struct { + *pubsub.GossipSubRouter + p *pubsub.PubSub +} + +func NewWakuRelaySub(ctx context.Context, h host.Host) (*pubsub.PubSub, error) { + opts := []pubsub.Option{ + pubsub.WithMessageSignaturePolicy(pubsub.StrictNoSign), + } + gossipSub, err := pubsub.NewGossipSub(ctx, h, opts...) + + if err != nil { + return nil, err + } + + w := new(WakuRelaySubRouter) + w.p = gossipSub + return gossipSub, nil +} + +func (ws *WakuRelaySubRouter) Protocols() []protocol.ID { + return []libp2pProtocol.ID{WakuRelayCodec, pubsub.GossipSubID_v11, pubsub.GossipSubID_v10, pubsub.FloodSubID} +}