Lightpush protocol

- Partially implements #20. Requires some tests
- Extracts wakurelay code to separate file
- Extracts request id gen to separate file
- Initial implementation of lightpush protocol
- Adds utils functions to obtain a message hash
- Publish receives a context to send a message
This commit is contained in:
Richard Ramos 2021-04-28 16:10:44 -04:00
parent 78f8b7e609
commit 3d8aae5b81
No known key found for this signature in database
GPG Key ID: 80D4B01265FDFE8F
11 changed files with 1457 additions and 163 deletions

View File

@ -24,6 +24,7 @@ import (
"github.com/status-im/go-waku/waku/persistence/sqlite"
"github.com/status-im/go-waku/waku/v2/node"
"github.com/status-im/go-waku/waku/v2/protocol/relay"
)
var log = logging.Logger("wakunode")
@ -55,7 +56,7 @@ var rootCmd = &cobra.Command{
port, _ := cmd.Flags().GetInt("port")
enableWs, _ := cmd.Flags().GetBool("ws")
wsPort, _ := cmd.Flags().GetInt("ws-port")
relay, _ := cmd.Flags().GetBool("relay")
wakuRelay, _ := cmd.Flags().GetBool("relay")
key, _ := cmd.Flags().GetString("nodekey")
store, _ := cmd.Flags().GetBool("store")
useDB, _ := cmd.Flags().GetBool("use-db")
@ -115,7 +116,7 @@ var rootCmd = &cobra.Command{
nodeOpts = append(nodeOpts, node.WithLibP2POptions(libp2pOpts...))
if relay {
if wakuRelay {
nodeOpts = append(nodeOpts, node.WithWakuRelay())
}
@ -135,7 +136,7 @@ var rootCmd = &cobra.Command{
checkError(err, "Wakunode")
for _, t := range topics {
nodeTopic := node.Topic(t)
nodeTopic := relay.Topic(t)
_, err := wakuNode.Subscribe(&nodeTopic)
checkError(err, "Error subscring to topic")
}
@ -185,7 +186,7 @@ func init() {
rootCmd.Flags().Bool("ws", false, "Enable websockets support")
rootCmd.Flags().Int("ws-port", 9001, "Libp2p TCP listening port for websocket connection (0 for random)")
rootCmd.Flags().String("nodekey", "", "P2P node private key as hex (default random)")
rootCmd.Flags().StringSlice("topics", []string{string(node.DefaultWakuTopic)}, fmt.Sprintf("List of topics to listen (default %s)", node.DefaultWakuTopic))
rootCmd.Flags().StringSlice("topics", []string{string(relay.DefaultWakuTopic)}, fmt.Sprintf("List of topics to listen (default %s)", relay.DefaultWakuTopic))
rootCmd.Flags().StringSlice("staticnodes", []string{}, "Multiaddr of peer to directly connect with. Argument may be repeated")
rootCmd.Flags().Bool("relay", true, "Enable relay protocol")
rootCmd.Flags().Bool("store", false, "Enable store protocol")

View File

@ -4,7 +4,6 @@ import (
"database/sql"
"log"
gcrypto "github.com/ethereum/go-ethereum/crypto"
"github.com/status-im/go-waku/waku/v2/protocol"
"github.com/status-im/go-waku/waku/v2/protocol/pb"
"github.com/status-im/go-waku/waku/v2/protocol/store"
@ -120,8 +119,7 @@ func (d *DBStore) GetAll() ([]*protocol.Envelope, error) {
msg.Timestamp = float64(timestamp)
msg.Version = version
data, _ := msg.Marshal()
envelope := protocol.NewEnvelope(msg, pubsubTopic, len(data), gcrypto.Keccak256(data))
envelope := protocol.NewEnvelope(msg, pubsubTopic)
result = append(result, envelope)
}

View File

@ -7,7 +7,6 @@ import (
"sync"
"time"
gcrypto "github.com/ethereum/go-ethereum/crypto"
proto "github.com/golang/protobuf/proto"
logging "github.com/ipfs/go-log"
"github.com/libp2p/go-libp2p"
@ -16,7 +15,9 @@ import (
ma "github.com/multiformats/go-multiaddr"
"github.com/status-im/go-waku/waku/v2/protocol"
"github.com/status-im/go-waku/waku/v2/protocol/lightpush"
"github.com/status-im/go-waku/waku/v2/protocol/pb"
"github.com/status-im/go-waku/waku/v2/protocol/relay"
"github.com/status-im/go-waku/waku/v2/protocol/store"
wakurelay "github.com/status-im/go-wakurelay-pubsub"
)
@ -26,26 +27,19 @@ var log = logging.Logger("wakunode")
// Default clientId
const clientId string = "Go Waku v2 node"
type Topic string
const DefaultWakuTopic Topic = "/waku/2/default-waku/proto"
type Message []byte
type WakuNode struct {
host host.Host
opts *WakuNodeParameters
pubsub *wakurelay.PubSub
host host.Host
opts *WakuNodeParameters
topics map[Topic]bool
topicsMutex sync.Mutex
wakuRelayTopics map[Topic]*wakurelay.Topic
relay *relay.WakuRelay
lightPush *lightpush.WakuLightPush
subscriptions map[Topic][]*Subscription
subscriptions map[relay.Topic][]*Subscription
subscriptionsMutex sync.Mutex
bcaster Broadcaster
relaySubs map[Topic]*wakurelay.Subscription
bcaster Broadcaster
ctx context.Context
cancel context.CancelFunc
@ -81,14 +75,10 @@ func New(ctx context.Context, opts ...WakuNodeOption) (*WakuNode, error) {
w := new(WakuNode)
w.bcaster = NewBroadcaster(1024)
w.pubsub = nil
w.host = host
w.cancel = cancel
w.ctx = ctx
w.topics = make(map[Topic]bool)
w.wakuRelayTopics = make(map[Topic]*wakurelay.Topic)
w.relaySubs = make(map[Topic]*wakurelay.Subscription)
w.subscriptions = make(map[Topic][]*Subscription)
w.subscriptions = make(map[relay.Topic][]*Subscription)
w.opts = params
if params.enableRelay {
@ -117,7 +107,7 @@ func (w *WakuNode) Stop() {
defer w.subscriptionsMutex.Unlock()
defer w.cancel()
for topic, _ := range w.topics {
for _, topic := range w.relay.Topics() {
for _, sub := range w.subscriptions[topic] {
sub.Unsubscribe()
}
@ -143,27 +133,18 @@ func (w *WakuNode) ListenAddresses() []string {
return result
}
func (w *WakuNode) PubSub() *wakurelay.PubSub {
return w.pubsub
}
func (w *WakuNode) SetPubSub(pubSub *wakurelay.PubSub) {
w.pubsub = pubSub
func (w *WakuNode) Relay() *relay.WakuRelay {
return w.relay
}
func (w *WakuNode) mountRelay(opts ...wakurelay.Option) error {
ps, err := wakurelay.NewWakuRelaySub(w.ctx, w.host, opts...)
if err != nil {
return err
}
w.pubsub = ps
var err error
w.relay, err = relay.NewWakuRelay(w.ctx, w.host, opts...)
// TODO: filters
// TODO: rlnRelay
log.Info("Relay protocol started")
return nil
return err
}
func (w *WakuNode) startStore() error {
@ -216,24 +197,22 @@ func (w *WakuNode) Query(ctx context.Context, contentTopics []string, startTime
return result, nil
}
func getTopic(topic *Topic) Topic {
var t Topic = DefaultWakuTopic
if topic != nil {
t = *topic
}
return t
}
func (node *WakuNode) Subscribe(topic *Topic) (*Subscription, error) {
func (node *WakuNode) Subscribe(topic *relay.Topic) (*Subscription, error) {
// Subscribes to a PubSub topic.
// NOTE The data field SHOULD be decoded as a WakuMessage.
if node.pubsub == nil {
return nil, errors.New("PubSub hasn't been set. Execute mountWakuRelay() or setPubSub() first")
if node.relay == nil {
return nil, errors.New("WakuRelay hasn't been set.")
}
t := getTopic(topic)
t := relay.GetTopic(topic)
sub, isNew, err := node.relay.Subscribe(t)
// Subscribe store to topic
if isNew && node.opts.store != nil && node.opts.storeMsgs {
log.Info("Subscribing store to topic ", t)
node.bcaster.Register(node.opts.store.MsgC)
}
sub, err := node.upsertSubscription(t)
if err != nil {
return nil, err
}
@ -246,11 +225,12 @@ func (node *WakuNode) Subscribe(topic *Topic) (*Subscription, error) {
node.subscriptionsMutex.Lock()
defer node.subscriptionsMutex.Unlock()
node.subscriptions[t] = append(node.subscriptions[t], subscription)
node.bcaster.Register(subscription.C)
go func(t Topic) {
go func(t relay.Topic) {
nextMsgTicker := time.NewTicker(time.Millisecond * 10)
defer nextMsgTicker.Stop()
@ -265,11 +245,9 @@ func (node *WakuNode) Subscribe(topic *Topic) (*Subscription, error) {
msg, err := sub.Next(node.ctx)
if err != nil {
subscription.mutex.Lock()
node.topicsMutex.Lock()
for _, subscription := range node.subscriptions[t] {
subscription.Unsubscribe()
}
node.topicsMutex.Unlock()
subscription.mutex.Unlock()
return
}
@ -280,7 +258,7 @@ func (node *WakuNode) Subscribe(topic *Topic) (*Subscription, error) {
return
}
envelope := protocol.NewEnvelope(wakuMessage, string(t), len(msg.Data), gcrypto.Keccak256(msg.Data))
envelope := protocol.NewEnvelope(wakuMessage, string(t))
node.bcaster.Submit(envelope)
}
@ -290,83 +268,49 @@ func (node *WakuNode) Subscribe(topic *Topic) (*Subscription, error) {
return subscription, nil
}
func (node *WakuNode) upsertTopic(topic Topic) (*wakurelay.Topic, error) {
defer node.topicsMutex.Unlock()
node.topicsMutex.Lock()
node.topics[topic] = true
pubSubTopic, ok := node.wakuRelayTopics[topic]
if !ok { // Joins topic if node hasn't joined yet
newTopic, err := node.pubsub.Join(string(topic))
if err != nil {
return nil, err
}
node.wakuRelayTopics[topic] = newTopic
pubSubTopic = newTopic
}
return pubSubTopic, nil
}
func (node *WakuNode) upsertSubscription(topic Topic) (*wakurelay.Subscription, error) {
sub, ok := node.relaySubs[topic]
if !ok {
pubSubTopic, err := node.upsertTopic(topic)
if err != nil {
return nil, err
}
sub, err = pubSubTopic.Subscribe()
if err != nil {
return nil, err
}
node.relaySubs[topic] = sub
log.Info("Subscribing to topic ", topic)
if node.opts.store != nil && node.opts.storeMsgs {
log.Info("Subscribing store to topic ", topic)
node.bcaster.Register(node.opts.store.MsgC)
}
}
return sub, nil
}
func (node *WakuNode) Publish(message *pb.WakuMessage, topic *Topic) ([]byte, error) {
// Publish a `WakuMessage` to a PubSub topic. `WakuMessage` should contain a
// `contentTopic` field for light node functionality. This field may be also
// be omitted.
if node.pubsub == nil {
return nil, errors.New("PubSub hasn't been set. Execute mountWakuRelay() or setPubSub() first")
func (node *WakuNode) Publish(ctx context.Context, message *pb.WakuMessage, topic *relay.Topic) ([]byte, error) {
if node.relay == nil {
return nil, errors.New("WakuRelay hasn't been set.")
}
if message == nil {
return nil, errors.New("message can't be null")
}
pubSubTopic, err := node.upsertTopic(getTopic(topic))
hash, err := node.relay.Publish(ctx, message, topic)
if err != nil {
return nil, err
}
out, err := proto.Marshal(message)
if err != nil {
return nil, err
}
err = pubSubTopic.Publish(node.ctx, out)
if err != nil {
return nil, err
}
hash := gcrypto.Keccak256(out)
return hash, nil
}
func (node *WakuNode) LightPush(ctx context.Context, message *pb.WakuMessage, topic *relay.Topic, opts ...lightpush.LightPushOption) ([]byte, error) {
if node.lightPush == nil {
return nil, errors.New("WakuLightPush hasn't been set.")
}
if message == nil {
return nil, errors.New("message can't be null")
}
req := new(pb.PushRequest)
req.Message = message
req.PubsubTopic = string(relay.GetTopic(topic))
response, err := node.lightPush.Request(ctx, req, opts...)
if err != nil {
return nil, err
}
if response.IsSuccess {
hash, _ := message.Hash()
return hash, nil
} else {
return nil, errors.New(response.Info)
}
}
func (w *WakuNode) DialPeer(address string) error {
p, err := ma.NewMultiaddr(address)
if err != nil {

View File

@ -9,12 +9,13 @@ type Envelope struct {
hash []byte
}
func NewEnvelope(msg *pb.WakuMessage, pubSubTopic string, size int, hash []byte) *Envelope {
func NewEnvelope(msg *pb.WakuMessage, pubSubTopic string) *Envelope {
data, _ := msg.Marshal()
return &Envelope{
msg: msg,
pubsubTopic: pubSubTopic,
size: size,
hash: hash,
size: len(data),
hash: pb.Hash(data),
}
}

View File

@ -0,0 +1,234 @@
package lightpush
import (
"context"
"encoding/hex"
"errors"
"fmt"
logging "github.com/ipfs/go-log"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/peerstore"
libp2pProtocol "github.com/libp2p/go-libp2p-core/protocol"
"github.com/libp2p/go-msgio/protoio"
ma "github.com/multiformats/go-multiaddr"
"github.com/status-im/go-waku/waku/v2/protocol"
"github.com/status-im/go-waku/waku/v2/protocol/relay"
"github.com/status-im/go-waku/waku/v2/protocol/pb"
)
var log = logging.Logger("waku_lightpush")
const WakuLightPushProtocolId = libp2pProtocol.ID("/vac/waku/lightpush/2.0.0-alpha1")
var (
ErrNoPeersAvailable = errors.New("no suitable remote peers")
ErrInvalidId = errors.New("invalid request id")
)
type WakuLightPush struct {
h host.Host
relay *relay.WakuRelay
ctx context.Context
}
func NewWakuLightPush(ctx context.Context, relay *relay.WakuRelay) *WakuLightPush {
wakuLP := new(WakuLightPush)
wakuLP.relay = relay
wakuLP.ctx = ctx
return wakuLP
}
func (wakuLP *WakuLightPush) Start(h host.Host) {
wakuLP.h = h
wakuLP.h.SetStreamHandler(WakuLightPushProtocolId, wakuLP.onRequest)
log.Info("Light Push protocol started")
}
func (wakuLP *WakuLightPush) onRequest(s network.Stream) {
defer s.Close()
requestPushRPC := &pb.PushRPC{}
writer := protoio.NewDelimitedWriter(s)
reader := protoio.NewDelimitedReader(s, 64*1024)
err := reader.ReadMsg(requestPushRPC)
if err != nil {
log.Error("error reading request", err)
return
}
log.Info(fmt.Sprintf("%s: Received query from %s", s.Conn().LocalPeer(), s.Conn().RemotePeer()))
if requestPushRPC.Query != nil {
log.Info("lightpush push request")
pubSubTopic := relay.Topic(requestPushRPC.Query.PubsubTopic)
message := requestPushRPC.Query.Message
response := new(pb.PushResponse)
if wakuLP.relay != nil {
// XXX Assumes success, should probably be extended to check for network, peers, etc
_, err := wakuLP.relay.Publish(wakuLP.ctx, message, &pubSubTopic)
if err != nil {
response.IsSuccess = false
response.Info = "Could not publish message"
} else {
response.IsSuccess = true
response.Info = "Totally"
}
} else {
log.Debug("No relay protocol present, unsuccessful push")
response.IsSuccess = false
response.Info = "No relay protocol"
}
responsePushRPC := &pb.PushRPC{}
responsePushRPC.RequestId = requestPushRPC.RequestId
responsePushRPC.Response = response
err = writer.WriteMsg(responsePushRPC)
if err != nil {
log.Error("error writing response", err)
s.Reset()
} else {
log.Info(fmt.Sprintf("%s: Response sent to %s", s.Conn().LocalPeer().String(), s.Conn().RemotePeer().String()))
}
}
if requestPushRPC.Response != nil {
if requestPushRPC.Response.IsSuccess {
log.Info("lightpush message success")
} else {
log.Info(fmt.Sprintf("lightpush message failure. info=%s", requestPushRPC.Response.Info))
}
}
}
// TODO: AddPeer and selectPeer are duplicated in wakustore too. Refactor this code
func (wakuLP *WakuLightPush) AddPeer(p peer.ID, addrs []ma.Multiaddr) error {
for _, addr := range addrs {
wakuLP.h.Peerstore().AddAddr(p, addr, peerstore.PermanentAddrTTL)
}
err := wakuLP.h.Peerstore().AddProtocols(p, string(WakuLightPushProtocolId))
if err != nil {
return err
}
return nil
}
func (wakuLP *WakuLightPush) selectPeer() *peer.ID {
var peers peer.IDSlice
for _, peer := range wakuLP.h.Peerstore().Peers() {
protocols, err := wakuLP.h.Peerstore().SupportsProtocols(peer, string(WakuLightPushProtocolId))
if err != nil {
log.Error("error obtaining the protocols supported by peers", err)
return nil
}
if len(protocols) > 0 {
peers = append(peers, peer)
}
}
if len(peers) >= 1 {
// TODO: proper heuristic here that compares peer scores and selects "best" one. For now the first peer for the given protocol is returned
return &peers[0]
}
return nil
}
type LightPushParameters struct {
selectedPeer peer.ID
requestId []byte
lp *WakuLightPush
}
type LightPushOption func(*LightPushParameters)
func WithPeer(p peer.ID) LightPushOption {
return func(params *LightPushParameters) {
params.selectedPeer = p
}
}
func WithAutomaticPeerSelection() LightPushOption {
return func(params *LightPushParameters) {
p := params.lp.selectPeer()
params.selectedPeer = *p
}
}
func WithRequestId(requestId []byte) LightPushOption {
return func(params *LightPushParameters) {
params.requestId = requestId
}
}
func WithAutomaticRequestId() LightPushOption {
return func(params *LightPushParameters) {
params.requestId = protocol.GenerateRequestId()
}
}
func DefaultOptions() []LightPushOption {
return []LightPushOption{
WithAutomaticRequestId(),
WithAutomaticPeerSelection(),
}
}
func (wakuLP *WakuLightPush) Request(ctx context.Context, req *pb.PushRequest, opts ...LightPushOption) (*pb.PushResponse, error) {
params := new(LightPushParameters)
optList := DefaultOptions()
optList = append(optList, opts...)
for _, opt := range optList {
opt(params)
}
if params.selectedPeer == "" {
return nil, ErrNoPeersAvailable
}
if len(params.requestId) == 0 {
return nil, ErrInvalidId
}
connOpt, err := wakuLP.h.NewStream(ctx, params.selectedPeer, WakuLightPushProtocolId)
if err != nil {
log.Info("failed to connect to remote peer", err)
return nil, err
}
defer connOpt.Close()
defer connOpt.Reset()
pushRequestRPC := &pb.PushRPC{RequestId: hex.EncodeToString(params.requestId), Query: req}
writer := protoio.NewDelimitedWriter(connOpt)
reader := protoio.NewDelimitedReader(connOpt, 64*1024)
err = writer.WriteMsg(pushRequestRPC)
if err != nil {
log.Error("could not write request", err)
return nil, err
}
pushResponseRPC := &pb.PushRPC{}
err = reader.ReadMsg(pushResponseRPC)
if err != nil {
log.Error("could not read response", err)
return nil, err
}
return pushResponseRPC.Response, nil
}

View File

@ -0,0 +1,19 @@
package pb
import (
gcrypto "github.com/ethereum/go-ethereum/crypto"
proto "github.com/golang/protobuf/proto"
)
func (msg *WakuMessage) Hash() ([]byte, error) {
out, err := proto.Marshal(msg)
if err != nil {
return nil, err
}
return Hash(out), nil
}
func Hash(data []byte) []byte {
return gcrypto.Keccak256(data)
}

View File

@ -0,0 +1,916 @@
// Code generated by protoc-gen-gogo. DO NOT EDIT.
// source: waku_lightpush.proto
package pb
import (
fmt "fmt"
proto "github.com/golang/protobuf/proto"
io "io"
math "math"
math_bits "math/bits"
)
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf
// This is a compile-time assertion to ensure that this generated file
// is compatible with the proto package it is being compiled against.
// A compilation error at this line likely means your copy of the
// proto package needs to be updated.
const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package
type PushRequest struct {
PubsubTopic string `protobuf:"bytes,1,opt,name=pubsub_topic,json=pubsubTopic,proto3" json:"pubsub_topic,omitempty"`
Message *WakuMessage `protobuf:"bytes,2,opt,name=message,proto3" json:"message,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *PushRequest) Reset() { *m = PushRequest{} }
func (m *PushRequest) String() string { return proto.CompactTextString(m) }
func (*PushRequest) ProtoMessage() {}
func (*PushRequest) Descriptor() ([]byte, []int) {
return fileDescriptor_0edfa2f8ec212684, []int{0}
}
func (m *PushRequest) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
}
func (m *PushRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
if deterministic {
return xxx_messageInfo_PushRequest.Marshal(b, m, deterministic)
} else {
b = b[:cap(b)]
n, err := m.MarshalToSizedBuffer(b)
if err != nil {
return nil, err
}
return b[:n], nil
}
}
func (m *PushRequest) XXX_Merge(src proto.Message) {
xxx_messageInfo_PushRequest.Merge(m, src)
}
func (m *PushRequest) XXX_Size() int {
return m.Size()
}
func (m *PushRequest) XXX_DiscardUnknown() {
xxx_messageInfo_PushRequest.DiscardUnknown(m)
}
var xxx_messageInfo_PushRequest proto.InternalMessageInfo
func (m *PushRequest) GetPubsubTopic() string {
if m != nil {
return m.PubsubTopic
}
return ""
}
func (m *PushRequest) GetMessage() *WakuMessage {
if m != nil {
return m.Message
}
return nil
}
type PushResponse struct {
IsSuccess bool `protobuf:"varint,1,opt,name=is_success,json=isSuccess,proto3" json:"is_success,omitempty"`
// Error messages, etc
Info string `protobuf:"bytes,2,opt,name=info,proto3" json:"info,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *PushResponse) Reset() { *m = PushResponse{} }
func (m *PushResponse) String() string { return proto.CompactTextString(m) }
func (*PushResponse) ProtoMessage() {}
func (*PushResponse) Descriptor() ([]byte, []int) {
return fileDescriptor_0edfa2f8ec212684, []int{1}
}
func (m *PushResponse) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
}
func (m *PushResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
if deterministic {
return xxx_messageInfo_PushResponse.Marshal(b, m, deterministic)
} else {
b = b[:cap(b)]
n, err := m.MarshalToSizedBuffer(b)
if err != nil {
return nil, err
}
return b[:n], nil
}
}
func (m *PushResponse) XXX_Merge(src proto.Message) {
xxx_messageInfo_PushResponse.Merge(m, src)
}
func (m *PushResponse) XXX_Size() int {
return m.Size()
}
func (m *PushResponse) XXX_DiscardUnknown() {
xxx_messageInfo_PushResponse.DiscardUnknown(m)
}
var xxx_messageInfo_PushResponse proto.InternalMessageInfo
func (m *PushResponse) GetIsSuccess() bool {
if m != nil {
return m.IsSuccess
}
return false
}
func (m *PushResponse) GetInfo() string {
if m != nil {
return m.Info
}
return ""
}
type PushRPC struct {
RequestId string `protobuf:"bytes,1,opt,name=request_id,json=requestId,proto3" json:"request_id,omitempty"`
Query *PushRequest `protobuf:"bytes,2,opt,name=query,proto3" json:"query,omitempty"`
Response *PushResponse `protobuf:"bytes,3,opt,name=response,proto3" json:"response,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *PushRPC) Reset() { *m = PushRPC{} }
func (m *PushRPC) String() string { return proto.CompactTextString(m) }
func (*PushRPC) ProtoMessage() {}
func (*PushRPC) Descriptor() ([]byte, []int) {
return fileDescriptor_0edfa2f8ec212684, []int{2}
}
func (m *PushRPC) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
}
func (m *PushRPC) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
if deterministic {
return xxx_messageInfo_PushRPC.Marshal(b, m, deterministic)
} else {
b = b[:cap(b)]
n, err := m.MarshalToSizedBuffer(b)
if err != nil {
return nil, err
}
return b[:n], nil
}
}
func (m *PushRPC) XXX_Merge(src proto.Message) {
xxx_messageInfo_PushRPC.Merge(m, src)
}
func (m *PushRPC) XXX_Size() int {
return m.Size()
}
func (m *PushRPC) XXX_DiscardUnknown() {
xxx_messageInfo_PushRPC.DiscardUnknown(m)
}
var xxx_messageInfo_PushRPC proto.InternalMessageInfo
func (m *PushRPC) GetRequestId() string {
if m != nil {
return m.RequestId
}
return ""
}
func (m *PushRPC) GetQuery() *PushRequest {
if m != nil {
return m.Query
}
return nil
}
func (m *PushRPC) GetResponse() *PushResponse {
if m != nil {
return m.Response
}
return nil
}
func init() {
proto.RegisterType((*PushRequest)(nil), "pb.PushRequest")
proto.RegisterType((*PushResponse)(nil), "pb.PushResponse")
proto.RegisterType((*PushRPC)(nil), "pb.PushRPC")
}
func init() { proto.RegisterFile("waku_lightpush.proto", fileDescriptor_0edfa2f8ec212684) }
var fileDescriptor_0edfa2f8ec212684 = []byte{
// 268 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0x12, 0x29, 0x4f, 0xcc, 0x2e,
0x8d, 0xcf, 0xc9, 0x4c, 0xcf, 0x28, 0x29, 0x28, 0x2d, 0xce, 0xd0, 0x2b, 0x28, 0xca, 0x2f, 0xc9,
0x17, 0x62, 0x2a, 0x48, 0x92, 0x12, 0x02, 0xcb, 0xe4, 0xa6, 0x16, 0x17, 0x27, 0xa6, 0xa7, 0x42,
0xc4, 0x95, 0xa2, 0xb9, 0xb8, 0x03, 0x4a, 0x8b, 0x33, 0x82, 0x52, 0x0b, 0x4b, 0x53, 0x8b, 0x4b,
0x84, 0x14, 0xb9, 0x78, 0x0a, 0x4a, 0x93, 0x8a, 0x4b, 0x93, 0xe2, 0x4b, 0xf2, 0x0b, 0x32, 0x93,
0x25, 0x18, 0x15, 0x18, 0x35, 0x38, 0x83, 0xb8, 0x21, 0x62, 0x21, 0x20, 0x21, 0x21, 0x4d, 0x2e,
0x76, 0xa8, 0x11, 0x12, 0x4c, 0x0a, 0x8c, 0x1a, 0xdc, 0x46, 0xfc, 0x7a, 0x05, 0x49, 0x7a, 0xe1,
0x89, 0xd9, 0xa5, 0xbe, 0x10, 0xe1, 0x20, 0x98, 0xbc, 0x92, 0x23, 0x17, 0x0f, 0xc4, 0xf0, 0xe2,
0x82, 0xfc, 0xbc, 0xe2, 0x54, 0x21, 0x59, 0x2e, 0xae, 0xcc, 0xe2, 0xf8, 0xe2, 0xd2, 0xe4, 0xe4,
0xd4, 0xe2, 0x62, 0xb0, 0xd9, 0x1c, 0x41, 0x9c, 0x99, 0xc5, 0xc1, 0x10, 0x01, 0x21, 0x21, 0x2e,
0x96, 0xcc, 0xbc, 0xb4, 0x7c, 0xb0, 0xb1, 0x9c, 0x41, 0x60, 0xb6, 0x52, 0x2d, 0x17, 0x3b, 0xd8,
0x88, 0x00, 0x67, 0x90, 0xee, 0x22, 0x88, 0x33, 0xe3, 0x33, 0x53, 0xa0, 0x2e, 0xe3, 0x84, 0x8a,
0x78, 0xa6, 0x08, 0xa9, 0x72, 0xb1, 0x16, 0x96, 0xa6, 0x16, 0x55, 0x22, 0xbb, 0x0a, 0xc9, 0x6b,
0x41, 0x10, 0x59, 0x21, 0x1d, 0x2e, 0x8e, 0x22, 0xa8, 0x7b, 0x24, 0x98, 0xc1, 0x2a, 0x05, 0x10,
0x2a, 0x21, 0xe2, 0x41, 0x70, 0x15, 0x4e, 0x02, 0x27, 0x1e, 0xc9, 0x31, 0x5e, 0x78, 0x24, 0xc7,
0xf8, 0xe0, 0x91, 0x1c, 0xe3, 0x8c, 0xc7, 0x72, 0x0c, 0x49, 0x6c, 0xe0, 0x70, 0x33, 0x06, 0x04,
0x00, 0x00, 0xff, 0xff, 0x76, 0x20, 0x2e, 0xed, 0x67, 0x01, 0x00, 0x00,
}
func (m *PushRequest) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
n, err := m.MarshalToSizedBuffer(dAtA[:size])
if err != nil {
return nil, err
}
return dAtA[:n], nil
}
func (m *PushRequest) MarshalTo(dAtA []byte) (int, error) {
size := m.Size()
return m.MarshalToSizedBuffer(dAtA[:size])
}
func (m *PushRequest) MarshalToSizedBuffer(dAtA []byte) (int, error) {
i := len(dAtA)
_ = i
var l int
_ = l
if m.XXX_unrecognized != nil {
i -= len(m.XXX_unrecognized)
copy(dAtA[i:], m.XXX_unrecognized)
}
if m.Message != nil {
{
size, err := m.Message.MarshalToSizedBuffer(dAtA[:i])
if err != nil {
return 0, err
}
i -= size
i = encodeVarintWakuLightpush(dAtA, i, uint64(size))
}
i--
dAtA[i] = 0x12
}
if len(m.PubsubTopic) > 0 {
i -= len(m.PubsubTopic)
copy(dAtA[i:], m.PubsubTopic)
i = encodeVarintWakuLightpush(dAtA, i, uint64(len(m.PubsubTopic)))
i--
dAtA[i] = 0xa
}
return len(dAtA) - i, nil
}
func (m *PushResponse) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
n, err := m.MarshalToSizedBuffer(dAtA[:size])
if err != nil {
return nil, err
}
return dAtA[:n], nil
}
func (m *PushResponse) MarshalTo(dAtA []byte) (int, error) {
size := m.Size()
return m.MarshalToSizedBuffer(dAtA[:size])
}
func (m *PushResponse) MarshalToSizedBuffer(dAtA []byte) (int, error) {
i := len(dAtA)
_ = i
var l int
_ = l
if m.XXX_unrecognized != nil {
i -= len(m.XXX_unrecognized)
copy(dAtA[i:], m.XXX_unrecognized)
}
if len(m.Info) > 0 {
i -= len(m.Info)
copy(dAtA[i:], m.Info)
i = encodeVarintWakuLightpush(dAtA, i, uint64(len(m.Info)))
i--
dAtA[i] = 0x12
}
if m.IsSuccess {
i--
if m.IsSuccess {
dAtA[i] = 1
} else {
dAtA[i] = 0
}
i--
dAtA[i] = 0x8
}
return len(dAtA) - i, nil
}
func (m *PushRPC) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
n, err := m.MarshalToSizedBuffer(dAtA[:size])
if err != nil {
return nil, err
}
return dAtA[:n], nil
}
func (m *PushRPC) MarshalTo(dAtA []byte) (int, error) {
size := m.Size()
return m.MarshalToSizedBuffer(dAtA[:size])
}
func (m *PushRPC) MarshalToSizedBuffer(dAtA []byte) (int, error) {
i := len(dAtA)
_ = i
var l int
_ = l
if m.XXX_unrecognized != nil {
i -= len(m.XXX_unrecognized)
copy(dAtA[i:], m.XXX_unrecognized)
}
if m.Response != nil {
{
size, err := m.Response.MarshalToSizedBuffer(dAtA[:i])
if err != nil {
return 0, err
}
i -= size
i = encodeVarintWakuLightpush(dAtA, i, uint64(size))
}
i--
dAtA[i] = 0x1a
}
if m.Query != nil {
{
size, err := m.Query.MarshalToSizedBuffer(dAtA[:i])
if err != nil {
return 0, err
}
i -= size
i = encodeVarintWakuLightpush(dAtA, i, uint64(size))
}
i--
dAtA[i] = 0x12
}
if len(m.RequestId) > 0 {
i -= len(m.RequestId)
copy(dAtA[i:], m.RequestId)
i = encodeVarintWakuLightpush(dAtA, i, uint64(len(m.RequestId)))
i--
dAtA[i] = 0xa
}
return len(dAtA) - i, nil
}
func encodeVarintWakuLightpush(dAtA []byte, offset int, v uint64) int {
offset -= sovWakuLightpush(v)
base := offset
for v >= 1<<7 {
dAtA[offset] = uint8(v&0x7f | 0x80)
v >>= 7
offset++
}
dAtA[offset] = uint8(v)
return base
}
func (m *PushRequest) Size() (n int) {
if m == nil {
return 0
}
var l int
_ = l
l = len(m.PubsubTopic)
if l > 0 {
n += 1 + l + sovWakuLightpush(uint64(l))
}
if m.Message != nil {
l = m.Message.Size()
n += 1 + l + sovWakuLightpush(uint64(l))
}
if m.XXX_unrecognized != nil {
n += len(m.XXX_unrecognized)
}
return n
}
func (m *PushResponse) Size() (n int) {
if m == nil {
return 0
}
var l int
_ = l
if m.IsSuccess {
n += 2
}
l = len(m.Info)
if l > 0 {
n += 1 + l + sovWakuLightpush(uint64(l))
}
if m.XXX_unrecognized != nil {
n += len(m.XXX_unrecognized)
}
return n
}
func (m *PushRPC) Size() (n int) {
if m == nil {
return 0
}
var l int
_ = l
l = len(m.RequestId)
if l > 0 {
n += 1 + l + sovWakuLightpush(uint64(l))
}
if m.Query != nil {
l = m.Query.Size()
n += 1 + l + sovWakuLightpush(uint64(l))
}
if m.Response != nil {
l = m.Response.Size()
n += 1 + l + sovWakuLightpush(uint64(l))
}
if m.XXX_unrecognized != nil {
n += len(m.XXX_unrecognized)
}
return n
}
func sovWakuLightpush(x uint64) (n int) {
return (math_bits.Len64(x|1) + 6) / 7
}
func sozWakuLightpush(x uint64) (n int) {
return sovWakuLightpush(uint64((x << 1) ^ uint64((int64(x) >> 63))))
}
func (m *PushRequest) Unmarshal(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0
for iNdEx < l {
preIndex := iNdEx
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowWakuLightpush
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
if wireType == 4 {
return fmt.Errorf("proto: PushRequest: wiretype end group for non-group")
}
if fieldNum <= 0 {
return fmt.Errorf("proto: PushRequest: illegal tag %d (wire type %d)", fieldNum, wire)
}
switch fieldNum {
case 1:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field PubsubTopic", wireType)
}
var stringLen uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowWakuLightpush
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
stringLen |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
intStringLen := int(stringLen)
if intStringLen < 0 {
return ErrInvalidLengthWakuLightpush
}
postIndex := iNdEx + intStringLen
if postIndex < 0 {
return ErrInvalidLengthWakuLightpush
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.PubsubTopic = string(dAtA[iNdEx:postIndex])
iNdEx = postIndex
case 2:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Message", wireType)
}
var msglen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowWakuLightpush
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
msglen |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
if msglen < 0 {
return ErrInvalidLengthWakuLightpush
}
postIndex := iNdEx + msglen
if postIndex < 0 {
return ErrInvalidLengthWakuLightpush
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
if m.Message == nil {
m.Message = &WakuMessage{}
}
if err := m.Message.Unmarshal(dAtA[iNdEx:postIndex]); err != nil {
return err
}
iNdEx = postIndex
default:
iNdEx = preIndex
skippy, err := skipWakuLightpush(dAtA[iNdEx:])
if err != nil {
return err
}
if (skippy < 0) || (iNdEx+skippy) < 0 {
return ErrInvalidLengthWakuLightpush
}
if (iNdEx + skippy) > l {
return io.ErrUnexpectedEOF
}
m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...)
iNdEx += skippy
}
}
if iNdEx > l {
return io.ErrUnexpectedEOF
}
return nil
}
func (m *PushResponse) Unmarshal(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0
for iNdEx < l {
preIndex := iNdEx
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowWakuLightpush
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
if wireType == 4 {
return fmt.Errorf("proto: PushResponse: wiretype end group for non-group")
}
if fieldNum <= 0 {
return fmt.Errorf("proto: PushResponse: illegal tag %d (wire type %d)", fieldNum, wire)
}
switch fieldNum {
case 1:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field IsSuccess", wireType)
}
var v int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowWakuLightpush
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
v |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
m.IsSuccess = bool(v != 0)
case 2:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Info", wireType)
}
var stringLen uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowWakuLightpush
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
stringLen |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
intStringLen := int(stringLen)
if intStringLen < 0 {
return ErrInvalidLengthWakuLightpush
}
postIndex := iNdEx + intStringLen
if postIndex < 0 {
return ErrInvalidLengthWakuLightpush
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Info = string(dAtA[iNdEx:postIndex])
iNdEx = postIndex
default:
iNdEx = preIndex
skippy, err := skipWakuLightpush(dAtA[iNdEx:])
if err != nil {
return err
}
if (skippy < 0) || (iNdEx+skippy) < 0 {
return ErrInvalidLengthWakuLightpush
}
if (iNdEx + skippy) > l {
return io.ErrUnexpectedEOF
}
m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...)
iNdEx += skippy
}
}
if iNdEx > l {
return io.ErrUnexpectedEOF
}
return nil
}
func (m *PushRPC) Unmarshal(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0
for iNdEx < l {
preIndex := iNdEx
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowWakuLightpush
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
if wireType == 4 {
return fmt.Errorf("proto: PushRPC: wiretype end group for non-group")
}
if fieldNum <= 0 {
return fmt.Errorf("proto: PushRPC: illegal tag %d (wire type %d)", fieldNum, wire)
}
switch fieldNum {
case 1:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field RequestId", wireType)
}
var stringLen uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowWakuLightpush
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
stringLen |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
intStringLen := int(stringLen)
if intStringLen < 0 {
return ErrInvalidLengthWakuLightpush
}
postIndex := iNdEx + intStringLen
if postIndex < 0 {
return ErrInvalidLengthWakuLightpush
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.RequestId = string(dAtA[iNdEx:postIndex])
iNdEx = postIndex
case 2:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Query", wireType)
}
var msglen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowWakuLightpush
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
msglen |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
if msglen < 0 {
return ErrInvalidLengthWakuLightpush
}
postIndex := iNdEx + msglen
if postIndex < 0 {
return ErrInvalidLengthWakuLightpush
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
if m.Query == nil {
m.Query = &PushRequest{}
}
if err := m.Query.Unmarshal(dAtA[iNdEx:postIndex]); err != nil {
return err
}
iNdEx = postIndex
case 3:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Response", wireType)
}
var msglen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowWakuLightpush
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
msglen |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
if msglen < 0 {
return ErrInvalidLengthWakuLightpush
}
postIndex := iNdEx + msglen
if postIndex < 0 {
return ErrInvalidLengthWakuLightpush
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
if m.Response == nil {
m.Response = &PushResponse{}
}
if err := m.Response.Unmarshal(dAtA[iNdEx:postIndex]); err != nil {
return err
}
iNdEx = postIndex
default:
iNdEx = preIndex
skippy, err := skipWakuLightpush(dAtA[iNdEx:])
if err != nil {
return err
}
if (skippy < 0) || (iNdEx+skippy) < 0 {
return ErrInvalidLengthWakuLightpush
}
if (iNdEx + skippy) > l {
return io.ErrUnexpectedEOF
}
m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...)
iNdEx += skippy
}
}
if iNdEx > l {
return io.ErrUnexpectedEOF
}
return nil
}
func skipWakuLightpush(dAtA []byte) (n int, err error) {
l := len(dAtA)
iNdEx := 0
depth := 0
for iNdEx < l {
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return 0, ErrIntOverflowWakuLightpush
}
if iNdEx >= l {
return 0, io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= (uint64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
wireType := int(wire & 0x7)
switch wireType {
case 0:
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return 0, ErrIntOverflowWakuLightpush
}
if iNdEx >= l {
return 0, io.ErrUnexpectedEOF
}
iNdEx++
if dAtA[iNdEx-1] < 0x80 {
break
}
}
case 1:
iNdEx += 8
case 2:
var length int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return 0, ErrIntOverflowWakuLightpush
}
if iNdEx >= l {
return 0, io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
length |= (int(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
if length < 0 {
return 0, ErrInvalidLengthWakuLightpush
}
iNdEx += length
case 3:
depth++
case 4:
if depth == 0 {
return 0, ErrUnexpectedEndOfGroupWakuLightpush
}
depth--
case 5:
iNdEx += 4
default:
return 0, fmt.Errorf("proto: illegal wireType %d", wireType)
}
if iNdEx < 0 {
return 0, ErrInvalidLengthWakuLightpush
}
if depth == 0 {
return iNdEx, nil
}
}
return 0, io.ErrUnexpectedEOF
}
var (
ErrInvalidLengthWakuLightpush = fmt.Errorf("proto: negative length found during unmarshaling")
ErrIntOverflowWakuLightpush = fmt.Errorf("proto: integer overflow")
ErrUnexpectedEndOfGroupWakuLightpush = fmt.Errorf("proto: unexpected end of group")
)

View File

@ -0,0 +1,22 @@
syntax = "proto3";
package pb;
import "waku_message.proto";
message PushRequest {
string pubsub_topic = 1;
WakuMessage message = 2;
}
message PushResponse {
bool is_success = 1;
// Error messages, etc
string info = 2;
}
message PushRPC {
string request_id = 1;
PushRequest query = 2;
PushResponse response = 3;
}

View File

@ -0,0 +1,147 @@
package relay
import (
"context"
"errors"
"sync"
proto "github.com/golang/protobuf/proto"
logging "github.com/ipfs/go-log"
"github.com/libp2p/go-libp2p-core/host"
"github.com/status-im/go-waku/waku/v2/protocol/pb"
wakurelay "github.com/status-im/go-wakurelay-pubsub"
)
var log = logging.Logger("wakurelay")
type Topic string
const DefaultWakuTopic Topic = "/waku/2/default-waku/proto"
type WakuRelay struct {
host host.Host
pubsub *wakurelay.PubSub
topics map[Topic]bool
topicsMutex sync.Mutex
wakuRelayTopics map[Topic]*wakurelay.Topic
relaySubs map[Topic]*wakurelay.Subscription
}
func NewWakuRelay(ctx context.Context, h host.Host, opts ...wakurelay.Option) (*WakuRelay, error) {
w := new(WakuRelay)
w.host = h
w.topics = make(map[Topic]bool)
w.wakuRelayTopics = make(map[Topic]*wakurelay.Topic)
w.relaySubs = make(map[Topic]*wakurelay.Subscription)
ps, err := wakurelay.NewWakuRelaySub(ctx, h, opts...)
if err != nil {
return nil, err
}
w.pubsub = ps
log.Info("Relay protocol started")
return w, nil
}
func (w *WakuRelay) PubSub() *wakurelay.PubSub {
return w.pubsub
}
func (w *WakuRelay) Topics() []Topic {
defer w.topicsMutex.Unlock()
w.topicsMutex.Lock()
var result []Topic
for topic, _ := range w.topics {
result = append(result, topic)
}
return result
}
func (w *WakuRelay) SetPubSub(pubSub *wakurelay.PubSub) {
w.pubsub = pubSub
}
func (w *WakuRelay) upsertTopic(topic Topic) (*wakurelay.Topic, error) {
defer w.topicsMutex.Unlock()
w.topicsMutex.Lock()
w.topics[topic] = true
pubSubTopic, ok := w.wakuRelayTopics[topic]
if !ok { // Joins topic if node hasn't joined yet
newTopic, err := w.pubsub.Join(string(topic))
if err != nil {
return nil, err
}
w.wakuRelayTopics[topic] = newTopic
pubSubTopic = newTopic
}
return pubSubTopic, nil
}
func (w *WakuRelay) Subscribe(topic Topic) (subs *wakurelay.Subscription, isNew bool, err error) {
sub, ok := w.relaySubs[topic]
if !ok {
pubSubTopic, err := w.upsertTopic(topic)
if err != nil {
return nil, false, err
}
sub, err = pubSubTopic.Subscribe()
if err != nil {
return nil, false, err
}
w.relaySubs[topic] = sub
log.Info("Subscribing to topic ", topic)
}
isNew = !ok // ok will be true if subscription already exists
return sub, isNew, nil
}
func (w *WakuRelay) Publish(ctx context.Context, message *pb.WakuMessage, topic *Topic) ([]byte, error) {
// Publish a `WakuMessage` to a PubSub topic.
if w.pubsub == nil {
return nil, errors.New("PubSub hasn't been set.")
}
if message == nil {
return nil, errors.New("message can't be null")
}
pubSubTopic, err := w.upsertTopic(GetTopic(topic))
if err != nil {
return nil, err
}
out, err := proto.Marshal(message)
if err != nil {
return nil, err
}
err = pubSubTopic.Publish(ctx, out)
if err != nil {
return nil, err
}
hash := pb.Hash(out)
return hash, nil
}
func GetTopic(topic *Topic) Topic {
var t Topic = DefaultWakuTopic
if topic != nil {
t = *topic
}
return t
}

View File

@ -0,0 +1,45 @@
package protocol
import (
"crypto/rand"
"sync"
"github.com/cruxic/go-hmac-drbg/hmacdrbg"
logging "github.com/ipfs/go-log"
)
var log = logging.Logger("request-gen")
var brHmacDrbgPool = sync.Pool{New: func() interface{} {
seed := make([]byte, 48)
_, err := rand.Read(seed)
if err != nil {
log.Fatal(err)
}
return hmacdrbg.NewHmacDrbg(256, seed, nil)
}}
func GenerateRequestId() []byte {
rng := brHmacDrbgPool.Get().(*hmacdrbg.HmacDrbg)
defer brHmacDrbgPool.Put(rng)
randData := make([]byte, 32)
if !rng.Generate(randData) {
//Reseed is required every 10,000 calls
seed := make([]byte, 48)
_, err := rand.Read(seed)
if err != nil {
log.Fatal(err)
}
err = rng.Reseed(seed)
if err != nil {
//only happens if seed < security-level
log.Fatal(err)
}
if !rng.Generate(randData) {
log.Error("could not generate random request id")
}
}
return randData
}

View File

@ -3,7 +3,6 @@ package store
import (
"bytes"
"context"
"crypto/rand"
"crypto/sha256"
"encoding/hex"
"errors"
@ -12,7 +11,6 @@ import (
"sync"
"time"
"github.com/cruxic/go-hmac-drbg/hmacdrbg"
logging "github.com/ipfs/go-log"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/network"
@ -395,40 +393,6 @@ func (store *WakuStore) selectPeer() *peer.ID {
return nil
}
var brHmacDrbgPool = sync.Pool{New: func() interface{} {
seed := make([]byte, 48)
_, err := rand.Read(seed)
if err != nil {
log.Fatal(err)
}
return hmacdrbg.NewHmacDrbg(256, seed, nil)
}}
func GenerateRequestId() []byte {
rng := brHmacDrbgPool.Get().(*hmacdrbg.HmacDrbg)
defer brHmacDrbgPool.Put(rng)
randData := make([]byte, 32)
if !rng.Generate(randData) {
//Reseed is required every 10,000 calls
seed := make([]byte, 48)
_, err := rand.Read(seed)
if err != nil {
log.Fatal(err)
}
err = rng.Reseed(seed)
if err != nil {
//only happens if seed < security-level
log.Fatal(err)
}
if !rng.Generate(randData) {
log.Error("could not generate random request id")
}
}
return randData
}
type HistoryRequestParameters struct {
selectedPeer peer.ID
requestId []byte
@ -463,7 +427,7 @@ func WithRequestId(requestId []byte) HistoryRequestOption {
func WithAutomaticRequestId() HistoryRequestOption {
return func(params *HistoryRequestParameters) {
params.requestId = GenerateRequestId()
params.requestId = protocol.GenerateRequestId()
}
}
@ -491,7 +455,10 @@ func DefaultOptions() []HistoryRequestOption {
func (store *WakuStore) Query(ctx context.Context, q *pb.HistoryQuery, opts ...HistoryRequestOption) (*pb.HistoryResponse, error) {
params := new(HistoryRequestParameters)
params.s = store
for _, opt := range opts {
optList := DefaultOptions()
optList = append(optList, opts...)
for _, opt := range optList {
opt(params)
}