This commit is contained in:
Richard Ramos 2021-03-22 12:45:13 -04:00
parent 1345809aba
commit c1bcba756c
No known key found for this signature in database
GPG Key ID: 80D4B01265FDFE8F
9 changed files with 163 additions and 79 deletions

View File

@ -15,6 +15,7 @@ import (
"github.com/spf13/viper" "github.com/spf13/viper"
"github.com/status-im/go-waku/waku/v2/node" "github.com/status-im/go-waku/waku/v2/node"
"github.com/status-im/go-waku/waku/v2/protocol" "github.com/status-im/go-waku/waku/v2/protocol"
store "github.com/status-im/go-waku/waku/v2/protocol/waku_store"
ethNodeCrypto "github.com/status-im/status-go/eth-node/crypto" ethNodeCrypto "github.com/status-im/status-go/eth-node/crypto"
) )
@ -27,7 +28,8 @@ func randomHex(n int) (string, error) {
} }
func write(wakuNode *node.WakuNode, msgContent string) { func write(wakuNode *node.WakuNode, msgContent string) {
var contentTopic uint32 = 1
var contentTopic uint32 = 1735289188
var version uint32 = 0 var version uint32 = 0
payload, err := node.Encode([]byte(wakuNode.ID()+" says "+msgContent), &node.KeyInfo{Kind: node.None}, 0) payload, err := node.Encode([]byte(wakuNode.ID()+" says "+msgContent), &node.KeyInfo{Kind: node.None}, 0)
@ -43,7 +45,7 @@ func write(wakuNode *node.WakuNode, msgContent string) {
func writeLoop(wakuNode *node.WakuNode) { func writeLoop(wakuNode *node.WakuNode) {
for { for {
time.Sleep(2 * time.Second) time.Sleep(2 * time.Second)
write(wakuNode, "Hey!") write(wakuNode, fmt.Sprint("Hey - ", time.Now().Unix()))
} }
} }
@ -64,6 +66,27 @@ func readLoop(wakuNode *node.WakuNode) {
} }
} }
type DBStore struct {
store.MessageProvider
}
func (dbStore *DBStore) Put(message *protocol.WakuMessage) error {
fmt.Println("TODO: Implement MessageProvider.Put")
return nil
}
func (dbStore *DBStore) GetAll() ([]*protocol.WakuMessage, error) {
fmt.Println("TODO: Implement MessageProvider.GetAll. Returning a sample message")
exampleMessage := new(protocol.WakuMessage)
var contentTopic uint32 = 1
var version uint32 = 0
exampleMessage.ContentTopic = &contentTopic
exampleMessage.Payload = []byte("Hello!")
exampleMessage.Version = &version
return []*protocol.WakuMessage{exampleMessage}, nil
}
var rootCmd = &cobra.Command{ var rootCmd = &cobra.Command{
Use: "waku", Use: "waku",
Short: "Start a waku node", Short: "Start a waku node",
@ -110,7 +133,7 @@ var rootCmd = &cobra.Command{
} }
if store { if store {
wakuNode.MountStore() wakuNode.MountStore(new(DBStore))
} }
if startStore { if startStore {
@ -177,9 +200,7 @@ var rootCmd = &cobra.Command{
fmt.Println("\n\n\nReceived signal, shutting down...") fmt.Println("\n\n\nReceived signal, shutting down...")
// shut the node down // shut the node down
if err := wakuNode.Stop(); err != nil { wakuNode.Stop()
panic(err)
}
}, },
} }

2
go.mod
View File

@ -9,8 +9,10 @@ require (
github.com/ethereum/go-ethereum v1.10.1 github.com/ethereum/go-ethereum v1.10.1
github.com/gogo/protobuf v1.3.1 github.com/gogo/protobuf v1.3.1
github.com/golang/protobuf v1.4.3 github.com/golang/protobuf v1.4.3
github.com/ipfs/go-log v1.0.4
github.com/ipfs/go-log/v2 v2.1.1 github.com/ipfs/go-log/v2 v2.1.1
github.com/libp2p/go-libp2p v0.13.0 github.com/libp2p/go-libp2p v0.13.0
github.com/libp2p/go-libp2p-connmgr v0.2.4
github.com/libp2p/go-libp2p-core v0.8.5 github.com/libp2p/go-libp2p-core v0.8.5
github.com/libp2p/go-libp2p-peerstore v0.2.6 github.com/libp2p/go-libp2p-peerstore v0.2.6
github.com/libp2p/go-libp2p-pubsub v0.4.1 github.com/libp2p/go-libp2p-pubsub v0.4.1

2
go.sum
View File

@ -67,6 +67,7 @@ github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmV
github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8= github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8=
github.com/aws/aws-sdk-go v1.25.48/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo= github.com/aws/aws-sdk-go v1.25.48/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo=
github.com/benbjohnson/clock v1.0.2/go.mod h1:bGMdMPoPVvcYyt1gHDf4J2KE153Yf9BuiUKYMaxlTDM= github.com/benbjohnson/clock v1.0.2/go.mod h1:bGMdMPoPVvcYyt1gHDf4J2KE153Yf9BuiUKYMaxlTDM=
github.com/benbjohnson/clock v1.0.3 h1:vkLuvpK4fmtSCuo60+yC63p7y0BmQ8gm5ZXGuBCJyXg=
github.com/benbjohnson/clock v1.0.3/go.mod h1:bGMdMPoPVvcYyt1gHDf4J2KE153Yf9BuiUKYMaxlTDM= github.com/benbjohnson/clock v1.0.3/go.mod h1:bGMdMPoPVvcYyt1gHDf4J2KE153Yf9BuiUKYMaxlTDM=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
@ -434,6 +435,7 @@ github.com/libp2p/go-libp2p-circuit v0.1.4/go.mod h1:CY67BrEjKNDhdTk8UgBX1Y/H5c3
github.com/libp2p/go-libp2p-circuit v0.2.1/go.mod h1:BXPwYDN5A8z4OEY9sOfr2DUQMLQvKt/6oku45YUmjIo= github.com/libp2p/go-libp2p-circuit v0.2.1/go.mod h1:BXPwYDN5A8z4OEY9sOfr2DUQMLQvKt/6oku45YUmjIo=
github.com/libp2p/go-libp2p-circuit v0.4.0 h1:eqQ3sEYkGTtybWgr6JLqJY6QLtPWRErvFjFDfAOO1wc= github.com/libp2p/go-libp2p-circuit v0.4.0 h1:eqQ3sEYkGTtybWgr6JLqJY6QLtPWRErvFjFDfAOO1wc=
github.com/libp2p/go-libp2p-circuit v0.4.0/go.mod h1:t/ktoFIUzM6uLQ+o1G6NuBl2ANhBKN9Bc8jRIk31MoA= github.com/libp2p/go-libp2p-circuit v0.4.0/go.mod h1:t/ktoFIUzM6uLQ+o1G6NuBl2ANhBKN9Bc8jRIk31MoA=
github.com/libp2p/go-libp2p-connmgr v0.2.4 h1:TMS0vc0TCBomtQJyWr7fYxcVYYhx+q/2gF++G5Jkl/w=
github.com/libp2p/go-libp2p-connmgr v0.2.4/go.mod h1:YV0b/RIm8NGPnnNWM7hG9Q38OeQiQfKhHCCs1++ufn0= github.com/libp2p/go-libp2p-connmgr v0.2.4/go.mod h1:YV0b/RIm8NGPnnNWM7hG9Q38OeQiQfKhHCCs1++ufn0=
github.com/libp2p/go-libp2p-core v0.0.1/go.mod h1:g/VxnTZ/1ygHxH3dKok7Vno1VfpvGcGip57wjTU4fco= github.com/libp2p/go-libp2p-core v0.0.1/go.mod h1:g/VxnTZ/1ygHxH3dKok7Vno1VfpvGcGip57wjTU4fco=
github.com/libp2p/go-libp2p-core v0.0.4/go.mod h1:jyuCQP356gzfCFtRKyvAbNkyeuxb7OlyhWZ3nls5d2I= github.com/libp2p/go-libp2p-core v0.0.4/go.mod h1:jyuCQP356gzfCFtRKyvAbNkyeuxb7OlyhWZ3nls5d2I=

11
waku.go
View File

@ -1,7 +1,16 @@
package main package main
import "github.com/status-im/go-waku/cmd" import (
logging "github.com/ipfs/go-log"
"github.com/status-im/go-waku/cmd"
)
func main() { func main() {
lvl, err := logging.LevelFromString("info")
if err != nil {
panic(err)
}
logging.SetAllLoggers(lvl)
cmd.Execute() cmd.Execute()
} }

View File

@ -5,24 +5,26 @@ import (
"crypto/ecdsa" "crypto/ecdsa"
"errors" "errors"
"fmt" "fmt"
"log"
"net" "net"
"sync" "sync"
"time" "time"
proto "github.com/golang/protobuf/proto" proto "github.com/golang/protobuf/proto"
logging "github.com/ipfs/go-log"
"github.com/libp2p/go-libp2p" "github.com/libp2p/go-libp2p"
connmgr "github.com/libp2p/go-libp2p-connmgr"
"github.com/libp2p/go-libp2p-core/crypto" "github.com/libp2p/go-libp2p-core/crypto"
"github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/peer"
pubsub "github.com/libp2p/go-libp2p-pubsub" pubsub "github.com/libp2p/go-libp2p-pubsub"
ma "github.com/multiformats/go-multiaddr" ma "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr-net" manet "github.com/multiformats/go-multiaddr-net"
"github.com/status-im/go-waku/waku/v2/protocol" "github.com/status-im/go-waku/waku/v2/protocol"
store "github.com/status-im/go-waku/waku/v2/protocol/waku_store" store "github.com/status-im/go-waku/waku/v2/protocol/waku_store"
) )
var log = logging.Logger("wakunode")
// Default clientId // Default clientId
const clientId string = "Go Waku v2 node" const clientId string = "Go Waku v2 node"
@ -52,28 +54,25 @@ type Subscription struct {
} }
type WakuNode struct { type WakuNode struct {
// peerManager *PeerManager
host host.Host host host.Host
pubsub *pubsub.PubSub pubsub *pubsub.PubSub
store *store.WakuStore store *store.WakuStore
topics map[Topic]*pubsub.Topic topics map[Topic]*pubsub.Topic
topicsLock sync.Mutex topicsMutex sync.Mutex
// peerInfo peer.AddrInfo subscriptions []*Subscription
// TODO Revisit messages field indexing as well as if this should be Message or WakuMessage subscriptionsMutex sync.Mutex
// messages []MessagePair
subscriptions protocol.MessageNotificationSubscriptions ctx context.Context
ctx context.Context cancel context.CancelFunc
cancel context.CancelFunc privKey crypto.PrivKey
privKey crypto.PrivKey
} }
func New(ctx context.Context, privKey *ecdsa.PrivateKey, hostAddr net.Addr, extAddr net.Addr) (*WakuNode, error) { func New(ctx context.Context, privKey *ecdsa.PrivateKey, hostAddr net.Addr, extAddr net.Addr, opts ...libp2p.Option) (*WakuNode, error) {
// Creates a Waku Node. // Creates a Waku Node.
if hostAddr == nil { if hostAddr == nil {
return nil, errors.New("Host address cannot be null") return nil, errors.New("host address cannot be null")
} }
var multiAddresses []ma.Multiaddr var multiAddresses []ma.Multiaddr
@ -93,14 +92,14 @@ func New(ctx context.Context, privKey *ecdsa.PrivateKey, hostAddr net.Addr, extA
nodeKey := crypto.PrivKey((*crypto.Secp256k1PrivateKey)(privKey)) nodeKey := crypto.PrivKey((*crypto.Secp256k1PrivateKey)(privKey))
opts := []libp2p.Option{ opts = append(opts,
libp2p.ListenAddrs(multiAddresses...), libp2p.ListenAddrs(multiAddresses...),
libp2p.Identity(nodeKey), libp2p.Identity(nodeKey),
libp2p.DefaultTransports, // libp2p.DefaultTransports,
libp2p.NATPortMap(), // Attempt to open ports using uPNP for NATed hosts. libp2p.NATPortMap(), // Attempt to open ports using uPNP for NATed hosts.
//libp2p.DisableRelay(), // TODO: is this needed? libp2p.EnableNATService(), // TODO: is this needed?)
//libp2p.EnableNATService(), // TODO: is this needed? libp2p.ConnectionManager(connmgr.NewConnManager(200, 300, 0)), // ?
} )
ctx, cancel := context.WithCancel(ctx) ctx, cancel := context.WithCancel(ctx)
_ = cancel _ = cancel
@ -111,7 +110,6 @@ func New(ctx context.Context, privKey *ecdsa.PrivateKey, hostAddr net.Addr, extA
} }
w := new(WakuNode) w := new(WakuNode)
//w.peerManager = NewPeerManager(host)
w.pubsub = nil w.pubsub = nil
w.host = host w.host = host
w.cancel = cancel w.cancel = cancel
@ -122,16 +120,22 @@ func New(ctx context.Context, privKey *ecdsa.PrivateKey, hostAddr net.Addr, extA
hostInfo, _ := ma.NewMultiaddr(fmt.Sprintf("/p2p/%s", host.ID().Pretty())) hostInfo, _ := ma.NewMultiaddr(fmt.Sprintf("/p2p/%s", host.ID().Pretty()))
for _, addr := range host.Addrs() { for _, addr := range host.Addrs() {
fullAddr := addr.Encapsulate(hostInfo) fullAddr := addr.Encapsulate(hostInfo)
log.Printf("Listening on %s\n", fullAddr) log.Info("Listening on", fullAddr)
} }
return w, nil return w, nil
} }
func (w *WakuNode) Stop() error { func (w *WakuNode) Stop() {
w.subscriptionsMutex.Lock()
defer w.subscriptionsMutex.Unlock()
defer w.cancel() defer w.cancel()
// TODO: Is it necessary to stop WakuRelaySubRouter?
return nil for _, sub := range w.subscriptions {
sub.Unsubscribe()
}
w.subscriptions = nil
} }
func (w *WakuNode) Host() host.Host { func (w *WakuNode) Host() host.Host {
@ -163,14 +167,12 @@ func (w *WakuNode) MountRelay() error {
return nil return nil
} }
func (w *WakuNode) MountStore() error { func (w *WakuNode) MountStore(s store.MessageProvider) error {
sub, err := w.Subscribe(nil) sub, err := w.Subscribe(nil)
if err != nil { if err != nil {
return err return err
} }
w.store = store.NewWakuStore(w.ctx, w.host, sub.C, s)
// TODO: kill subscription on close
w.store = store.NewWakuStore(w.ctx, w.host, sub.C, new(store.MessageProvider)) // TODO: pass store
return nil return nil
} }
@ -263,14 +265,14 @@ func (node *WakuNode) Subscribe(topic *Topic) (*Subscription, error) {
case <-nextMsgTicker.C: case <-nextMsgTicker.C:
msg, err := sub.Next(ctx) msg, err := sub.Next(ctx)
if err != nil { if err != nil {
fmt.Println("Error receiving message", err) log.Error("error receiving message", err)
close(subscription.quit) close(subscription.quit)
return return
} }
wakuMessage := &protocol.WakuMessage{} wakuMessage := &protocol.WakuMessage{}
if err := proto.Unmarshal(msg.Data, wakuMessage); err != nil { if err := proto.Unmarshal(msg.Data, wakuMessage); err != nil {
fmt.Println("Error decoding WakuMessage: ", err) // TODO: use log lib log.Error("could not decode message", err) // TODO: use log lib
return return
} }
@ -279,6 +281,11 @@ func (node *WakuNode) Subscribe(topic *Topic) (*Subscription, error) {
} }
}(node.ctx, sub) }(node.ctx, sub)
node.subscriptionsMutex.Lock()
defer node.subscriptionsMutex.Unlock()
node.subscriptions = append(node.subscriptions, subscription)
return subscription, nil return subscription, nil
} }
@ -298,8 +305,8 @@ func (subs *Subscription) IsClosed() bool {
} }
func (node *WakuNode) upsertTopic(topic *Topic) (*pubsub.Topic, error) { func (node *WakuNode) upsertTopic(topic *Topic) (*pubsub.Topic, error) {
defer node.topicsLock.Unlock() defer node.topicsMutex.Unlock()
node.topicsLock.Lock() node.topicsMutex.Lock()
var t Topic = DefaultWakuTopic var t Topic = DefaultWakuTopic
if topic != nil { if topic != nil {
@ -328,10 +335,11 @@ func (node *WakuNode) Publish(message *protocol.WakuMessage, topic *Topic) error
} }
if message == nil { if message == nil {
return errors.New("Message can't be null") return errors.New("message can't be null")
} }
pubSubTopic, err := node.upsertTopic(topic) pubSubTopic, err := node.upsertTopic(topic)
if err != nil { if err != nil {
return err return err
} }
@ -342,6 +350,7 @@ func (node *WakuNode) Publish(message *protocol.WakuMessage, topic *Topic) error
} }
err = pubSubTopic.Publish(node.ctx, out) err = pubSubTopic.Publish(node.ctx, out)
if err != nil { if err != nil {
return err return err
} }
@ -350,7 +359,6 @@ func (node *WakuNode) Publish(message *protocol.WakuMessage, topic *Topic) error
} }
func (w *WakuNode) DialPeer(address string) error { func (w *WakuNode) DialPeer(address string) error {
p, err := ma.NewMultiaddr(address) p, err := ma.NewMultiaddr(address)
if err != nil { if err != nil {
return err return err

View File

@ -26,8 +26,9 @@ type WakuMessage struct {
unknownFields protoimpl.UnknownFields unknownFields protoimpl.UnknownFields
Payload []byte `protobuf:"bytes,1,opt,name=payload,proto3,oneof" json:"payload,omitempty"` Payload []byte `protobuf:"bytes,1,opt,name=payload,proto3,oneof" json:"payload,omitempty"`
ContentTopic *uint32 `protobuf:"fixed32,2,opt,name=contentTopic,proto3,oneof" json:"contentTopic,omitempty"` ContentTopic *uint32 `protobuf:"varint,2,opt,name=contentTopic,proto3,oneof" json:"contentTopic,omitempty"`
Version *uint32 `protobuf:"varint,3,opt,name=version,proto3,oneof" json:"version,omitempty"` Version *uint32 `protobuf:"varint,3,opt,name=version,proto3,oneof" json:"version,omitempty"`
Proof []byte `protobuf:"bytes,4,opt,name=proof,proto3,oneof" json:"proof,omitempty"`
} }
func (x *WakuMessage) Reset() { func (x *WakuMessage) Reset() {
@ -83,22 +84,31 @@ func (x *WakuMessage) GetVersion() uint32 {
return 0 return 0
} }
func (x *WakuMessage) GetProof() []byte {
if x != nil {
return x.Proof
}
return nil
}
var File_waku_message_proto protoreflect.FileDescriptor var File_waku_message_proto protoreflect.FileDescriptor
var file_waku_message_proto_rawDesc = []byte{ var file_waku_message_proto_rawDesc = []byte{
0x0a, 0x12, 0x77, 0x61, 0x6b, 0x75, 0x5f, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x2e, 0x70, 0x0a, 0x12, 0x77, 0x61, 0x6b, 0x75, 0x5f, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x2e, 0x70,
0x72, 0x6f, 0x74, 0x6f, 0x12, 0x08, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x22, 0x9d, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x08, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x22, 0xc2,
0x01, 0x0a, 0x0b, 0x57, 0x61, 0x6b, 0x75, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x1d, 0x01, 0x0a, 0x0b, 0x57, 0x61, 0x6b, 0x75, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x1d,
0x0a, 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x48, 0x0a, 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x48,
0x00, 0x52, 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x88, 0x01, 0x01, 0x12, 0x27, 0x0a, 0x00, 0x52, 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x88, 0x01, 0x01, 0x12, 0x27, 0x0a,
0x0c, 0x63, 0x6f, 0x6e, 0x74, 0x65, 0x6e, 0x74, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x18, 0x02, 0x20, 0x0c, 0x63, 0x6f, 0x6e, 0x74, 0x65, 0x6e, 0x74, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x18, 0x02, 0x20,
0x01, 0x28, 0x07, 0x48, 0x01, 0x52, 0x0c, 0x63, 0x6f, 0x6e, 0x74, 0x65, 0x6e, 0x74, 0x54, 0x6f, 0x01, 0x28, 0x0d, 0x48, 0x01, 0x52, 0x0c, 0x63, 0x6f, 0x6e, 0x74, 0x65, 0x6e, 0x74, 0x54, 0x6f,
0x70, 0x69, 0x63, 0x88, 0x01, 0x01, 0x12, 0x1d, 0x0a, 0x07, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x70, 0x69, 0x63, 0x88, 0x01, 0x01, 0x12, 0x1d, 0x0a, 0x07, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f,
0x6e, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0d, 0x48, 0x02, 0x52, 0x07, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6e, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0d, 0x48, 0x02, 0x52, 0x07, 0x76, 0x65, 0x72, 0x73, 0x69,
0x6f, 0x6e, 0x88, 0x01, 0x01, 0x42, 0x0a, 0x0a, 0x08, 0x5f, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x6f, 0x6e, 0x88, 0x01, 0x01, 0x12, 0x19, 0x0a, 0x05, 0x70, 0x72, 0x6f, 0x6f, 0x66, 0x18, 0x04,
0x64, 0x42, 0x0f, 0x0a, 0x0d, 0x5f, 0x63, 0x6f, 0x6e, 0x74, 0x65, 0x6e, 0x74, 0x54, 0x6f, 0x70, 0x20, 0x01, 0x28, 0x0c, 0x48, 0x03, 0x52, 0x05, 0x70, 0x72, 0x6f, 0x6f, 0x66, 0x88, 0x01, 0x01,
0x69, 0x63, 0x42, 0x0a, 0x0a, 0x08, 0x5f, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x62, 0x06, 0x42, 0x0a, 0x0a, 0x08, 0x5f, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x42, 0x0f, 0x0a, 0x0d,
0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, 0x5f, 0x63, 0x6f, 0x6e, 0x74, 0x65, 0x6e, 0x74, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x42, 0x0a, 0x0a,
0x08, 0x5f, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x42, 0x08, 0x0a, 0x06, 0x5f, 0x70, 0x72,
0x6f, 0x6f, 0x66, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
} }
var ( var (

View File

@ -4,6 +4,7 @@ package protocol;
message WakuMessage { message WakuMessage {
optional bytes payload = 1; optional bytes payload = 1;
optional fixed32 contentTopic = 2; optional uint32 contentTopic = 2;
optional uint32 version = 3; optional uint32 version = 3;
optional bytes proof = 4;
} }

View File

@ -6,10 +6,12 @@ package protocol
import ( import (
"context" "context"
"crypto/sha256"
"github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/host"
libp2pProtocol "github.com/libp2p/go-libp2p-core/protocol" libp2pProtocol "github.com/libp2p/go-libp2p-core/protocol"
pubsub "github.com/libp2p/go-libp2p-pubsub" pubsub "github.com/libp2p/go-libp2p-pubsub"
pubsub_pb "github.com/libp2p/go-libp2p-pubsub/pb"
) )
const WakuRelayProtocol = libp2pProtocol.ID("/vac/waku/relay/2.0.0-beta2") const WakuRelayProtocol = libp2pProtocol.ID("/vac/waku/relay/2.0.0-beta2")
@ -18,11 +20,16 @@ type WakuRelay struct {
p *pubsub.PubSub p *pubsub.PubSub
} }
// Once https://github.com/status-im/nim-waku/issues/420 is fixed, implement a custom messageIdFn
func msgIdFn(pmsg *pubsub_pb.Message) string {
hash := sha256.Sum256(pmsg.Data)
return string(hash[:])
}
func NewWakuRelay(ctx context.Context, h host.Host, opts ...pubsub.Option) (*pubsub.PubSub, error) { func NewWakuRelay(ctx context.Context, h host.Host, opts ...pubsub.Option) (*pubsub.PubSub, error) {
// Once https://github.com/status-im/nim-waku/issues/420 is fixed, implement a custom messageIdFn
//opts = append(opts, pubsub.WithNoAuthor())
//opts = append(opts, pubsub.WithMessageIdFn(messageIdFn))
opts = append(opts, pubsub.WithMessageSignaturePolicy(pubsub.StrictNoSign)) opts = append(opts, pubsub.WithMessageSignaturePolicy(pubsub.StrictNoSign))
opts = append(opts, pubsub.WithNoAuthor())
opts = append(opts, pubsub.WithMessageIdFn(msgIdFn))
gossipSub, err := pubsub.NewGossipSub(ctx, h, []libp2pProtocol.ID{WakuRelayProtocol}, opts...) gossipSub, err := pubsub.NewGossipSub(ctx, h, []libp2pProtocol.ID{WakuRelayProtocol}, opts...)

View File

@ -7,23 +7,27 @@ import (
"crypto/sha256" "crypto/sha256"
"encoding/hex" "encoding/hex"
"errors" "errors"
"fmt"
"io/ioutil" "io/ioutil"
"log"
"sort" "sort"
"sync" "sync"
"time" "time"
"github.com/cruxic/go-hmac-drbg/hmacdrbg" "github.com/cruxic/go-hmac-drbg/hmacdrbg"
logging "github.com/ipfs/go-log"
"github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/peerstore" "github.com/libp2p/go-libp2p-core/peerstore"
libp2pProtocol "github.com/libp2p/go-libp2p-core/protocol" libp2pProtocol "github.com/libp2p/go-libp2p-core/protocol"
ma "github.com/multiformats/go-multiaddr" ma "github.com/multiformats/go-multiaddr"
"github.com/status-im/go-waku/waku/v2/protocol" "github.com/status-im/go-waku/waku/v2/protocol"
"google.golang.org/protobuf/proto" "google.golang.org/protobuf/proto"
) )
var log = logging.Logger("wakustore")
const WakuStoreProtocolId = libp2pProtocol.ID("/vac/waku/store/2.0.0-beta2") const WakuStoreProtocolId = libp2pProtocol.ID("/vac/waku/store/2.0.0-beta2")
const MaxPageSize = 100 // Maximum number of waku messages in each page const MaxPageSize = 100 // Maximum number of waku messages in each page
const ConnectionTimeout = 10 * time.Second const ConnectionTimeout = 10 * time.Second
@ -148,6 +152,7 @@ func (w *WakuStore) FindMessages(query *protocol.HistoryQuery) *protocol.History
type MessageProvider interface { type MessageProvider interface {
GetAll() ([]*protocol.WakuMessage, error) GetAll() ([]*protocol.WakuMessage, error)
Put(message *protocol.WakuMessage) error
} }
type IndexedWakuMessage struct { type IndexedWakuMessage struct {
@ -156,14 +161,16 @@ type IndexedWakuMessage struct {
} }
type WakuStore struct { type WakuStore struct {
msg chan *protocol.WakuMessage msg chan *protocol.WakuMessage
messages []IndexedWakuMessage messages []IndexedWakuMessage
msgProvider *MessageProvider messagesMutex sync.Mutex
msgProvider MessageProvider
h host.Host h host.Host
ctx context.Context ctx context.Context
} }
func NewWakuStore(ctx context.Context, h host.Host, msg chan *protocol.WakuMessage, p *MessageProvider) *WakuStore { func NewWakuStore(ctx context.Context, h host.Host, msg chan *protocol.WakuMessage, p MessageProvider) *WakuStore {
wakuStore := new(WakuStore) wakuStore := new(WakuStore)
wakuStore.msg = msg wakuStore.msg = msg
wakuStore.msgProvider = p wakuStore.msgProvider = p
@ -175,30 +182,47 @@ func NewWakuStore(ctx context.Context, h host.Host, msg chan *protocol.WakuMessa
func (store *WakuStore) Start() { func (store *WakuStore) Start() {
store.h.SetStreamHandler(WakuStoreProtocolId, store.onRequest) store.h.SetStreamHandler(WakuStoreProtocolId, store.onRequest)
go store.processMessages()
// TODO: Load all messages messages, err := store.msgProvider.GetAll()
// proc onData(timestamp: uint64, msg: WakuMessage) = if err != nil {
// ws.messages.add(IndexedWakuMessage(msg: msg, index: msg.computeIndex())) log.Error("could not load DBProvider messages")
// let res = ws.store.getAll(onData) return
}
for _, msg := range messages {
idx, err := computeIndex(msg)
if err != nil {
log.Error("could not calculate message index", err)
continue
}
store.messages = append(store.messages, IndexedWakuMessage{msg: msg, index: idx})
}
go store.storeIncomingMessages()
} }
func (store *WakuStore) processMessages() { func (store *WakuStore) storeIncomingMessages() {
for message := range store.msg { for message := range store.msg {
index, err := computeIndex(message) index, err := computeIndex(message)
if err != nil { if err != nil {
log.Println(err) log.Error("could not calculate message index", err)
continue continue
} }
store.messagesMutex.Lock()
store.messages = append(store.messages, IndexedWakuMessage{msg: message, index: index}) store.messages = append(store.messages, IndexedWakuMessage{msg: message, index: index})
store.messagesMutex.Unlock()
if store.msgProvider == nil { if store.msgProvider == nil {
continue continue
} }
// let res = store.msgProvider.put(index, msg) // TODO: store in the DB err = store.msgProvider.Put(message) // Should the index be stored?
if err != nil {
log.Error("could not store message", err)
continue
}
} }
} }
@ -212,17 +236,17 @@ func (store *WakuStore) onRequest(s network.Stream) {
if err != nil { if err != nil {
s.Reset() s.Reset()
log.Println(err) log.Error("error reading request", err)
return return
} }
proto.Unmarshal(buf, historyRPCRequest) proto.Unmarshal(buf, historyRPCRequest)
if err != nil { if err != nil {
log.Println(err) log.Error("error decoding request", err)
return return
} }
log.Printf("%s: Received query from %s", s.Conn().LocalPeer(), s.Conn().RemotePeer()) log.Info(fmt.Sprintf("%s: Received query from %s", s.Conn().LocalPeer(), s.Conn().RemotePeer()))
historyResponseRPC := &protocol.HistoryRPC{} historyResponseRPC := &protocol.HistoryRPC{}
historyResponseRPC.RequestId = historyRPCRequest.RequestId historyResponseRPC.RequestId = historyRPCRequest.RequestId
@ -230,16 +254,16 @@ func (store *WakuStore) onRequest(s network.Stream) {
message, err := proto.Marshal(historyResponseRPC) message, err := proto.Marshal(historyResponseRPC)
if err != nil { if err != nil {
log.Println(err) log.Error("error encoding response", err)
return return
} }
_, err = s.Write(message) _, err = s.Write(message)
if err != nil { if err != nil {
log.Println(err) log.Error("error writing response", err)
s.Reset() s.Reset()
} else { } else {
log.Printf("%s: Response sent to %s", s.Conn().LocalPeer().String(), s.Conn().RemotePeer().String()) log.Info(fmt.Sprintf("%s: Response sent to %s", s.Conn().LocalPeer().String(), s.Conn().RemotePeer().String()))
} }
} }
@ -319,7 +343,7 @@ func (store *WakuStore) selectPeer() *peer.ID {
for _, peer := range store.h.Peerstore().Peers() { for _, peer := range store.h.Peerstore().Peers() {
protocols, err := store.h.Peerstore().SupportsProtocols(peer, string(WakuStoreProtocolId)) protocols, err := store.h.Peerstore().SupportsProtocols(peer, string(WakuStoreProtocolId))
if err != nil { if err != nil {
log.Println("error obtaining the protocols supported by peers", err) log.Error("error obtaining the protocols supported by peers", err)
return nil return nil
} }
@ -364,7 +388,7 @@ func GenerateRequestId() string {
} }
if !rng.Generate(randData) { if !rng.Generate(randData) {
log.Fatal("could not generate random request id") log.Error("could not generate random request id")
} }
} }
return hex.EncodeToString(randData) return hex.EncodeToString(randData)
@ -381,7 +405,7 @@ func (store *WakuStore) Query(q *protocol.HistoryQuery) (*protocol.HistoryRespon
connOpt, err := store.h.NewStream(ctx, *peer, WakuStoreProtocolId) connOpt, err := store.h.NewStream(ctx, *peer, WakuStoreProtocolId)
if err != nil { if err != nil {
log.Println("failed to connect to remote peer", err) log.Info("failed to connect to remote peer", err)
return nil, err return nil, err
} }
@ -389,7 +413,7 @@ func (store *WakuStore) Query(q *protocol.HistoryQuery) (*protocol.HistoryRespon
message, err := proto.Marshal(historyRequest) message, err := proto.Marshal(historyRequest)
if err != nil { if err != nil {
log.Println(err) log.Error("could not encode request", err)
return nil, err return nil, err
} }
@ -398,20 +422,20 @@ func (store *WakuStore) Query(q *protocol.HistoryQuery) (*protocol.HistoryRespon
_, err = connOpt.Write(message) _, err = connOpt.Write(message)
if err != nil { if err != nil {
log.Println(err) log.Error("could not write request", err)
return nil, err return nil, err
} }
buf, err := ioutil.ReadAll(connOpt) buf, err := ioutil.ReadAll(connOpt)
if err != nil { if err != nil {
log.Println("failed to read response", err) log.Error("could not read response", err)
return nil, err return nil, err
} }
historyResponseRPC := &protocol.HistoryRPC{} historyResponseRPC := &protocol.HistoryRPC{}
proto.Unmarshal(buf, historyResponseRPC) proto.Unmarshal(buf, historyResponseRPC)
if err != nil { if err != nil {
log.Println("failed to decode response", err) log.Error("could not decode response", err)
return nil, err return nil, err
} }