From 3d8aae5b81b95835ed469048e6967c1110b04050 Mon Sep 17 00:00:00 2001 From: Richard Ramos Date: Wed, 28 Apr 2021 16:10:44 -0400 Subject: [PATCH] Lightpush protocol - Partially implements #20. Requires some tests - Extracts wakurelay code to separate file - Extracts request id gen to separate file - Initial implementation of lightpush protocol - Adds utils functions to obtain a message hash - Publish receives a context to send a message --- waku/node.go | 9 +- waku/persistence/store.go | 4 +- waku/v2/node/wakunode2.go | 174 ++-- waku/v2/protocol/envelope.go | 7 +- waku/v2/protocol/lightpush/waku_lightpush.go | 234 +++++ waku/v2/protocol/pb/utils.go | 19 + waku/v2/protocol/pb/waku_lightpush.pb.go | 916 +++++++++++++++++++ waku/v2/protocol/pb/waku_lightpush.proto | 22 + waku/v2/protocol/relay/waku_relay.go | 147 +++ waku/v2/protocol/requestId.go | 45 + waku/v2/protocol/store/waku_store.go | 43 +- 11 files changed, 1457 insertions(+), 163 deletions(-) create mode 100644 waku/v2/protocol/lightpush/waku_lightpush.go create mode 100644 waku/v2/protocol/pb/utils.go create mode 100644 waku/v2/protocol/pb/waku_lightpush.pb.go create mode 100644 waku/v2/protocol/pb/waku_lightpush.proto create mode 100644 waku/v2/protocol/relay/waku_relay.go create mode 100644 waku/v2/protocol/requestId.go diff --git a/waku/node.go b/waku/node.go index 5e162169..d307cb75 100644 --- a/waku/node.go +++ b/waku/node.go @@ -24,6 +24,7 @@ import ( "github.com/status-im/go-waku/waku/persistence/sqlite" "github.com/status-im/go-waku/waku/v2/node" + "github.com/status-im/go-waku/waku/v2/protocol/relay" ) var log = logging.Logger("wakunode") @@ -55,7 +56,7 @@ var rootCmd = &cobra.Command{ port, _ := cmd.Flags().GetInt("port") enableWs, _ := cmd.Flags().GetBool("ws") wsPort, _ := cmd.Flags().GetInt("ws-port") - relay, _ := cmd.Flags().GetBool("relay") + wakuRelay, _ := cmd.Flags().GetBool("relay") key, _ := cmd.Flags().GetString("nodekey") store, _ := cmd.Flags().GetBool("store") useDB, _ := cmd.Flags().GetBool("use-db") @@ -115,7 +116,7 @@ var rootCmd = &cobra.Command{ nodeOpts = append(nodeOpts, node.WithLibP2POptions(libp2pOpts...)) - if relay { + if wakuRelay { nodeOpts = append(nodeOpts, node.WithWakuRelay()) } @@ -135,7 +136,7 @@ var rootCmd = &cobra.Command{ checkError(err, "Wakunode") for _, t := range topics { - nodeTopic := node.Topic(t) + nodeTopic := relay.Topic(t) _, err := wakuNode.Subscribe(&nodeTopic) checkError(err, "Error subscring to topic") } @@ -185,7 +186,7 @@ func init() { rootCmd.Flags().Bool("ws", false, "Enable websockets support") rootCmd.Flags().Int("ws-port", 9001, "Libp2p TCP listening port for websocket connection (0 for random)") rootCmd.Flags().String("nodekey", "", "P2P node private key as hex (default random)") - rootCmd.Flags().StringSlice("topics", []string{string(node.DefaultWakuTopic)}, fmt.Sprintf("List of topics to listen (default %s)", node.DefaultWakuTopic)) + rootCmd.Flags().StringSlice("topics", []string{string(relay.DefaultWakuTopic)}, fmt.Sprintf("List of topics to listen (default %s)", relay.DefaultWakuTopic)) rootCmd.Flags().StringSlice("staticnodes", []string{}, "Multiaddr of peer to directly connect with. Argument may be repeated") rootCmd.Flags().Bool("relay", true, "Enable relay protocol") rootCmd.Flags().Bool("store", false, "Enable store protocol") diff --git a/waku/persistence/store.go b/waku/persistence/store.go index c0921f3a..f32e5a71 100644 --- a/waku/persistence/store.go +++ b/waku/persistence/store.go @@ -4,7 +4,6 @@ import ( "database/sql" "log" - gcrypto "github.com/ethereum/go-ethereum/crypto" "github.com/status-im/go-waku/waku/v2/protocol" "github.com/status-im/go-waku/waku/v2/protocol/pb" "github.com/status-im/go-waku/waku/v2/protocol/store" @@ -120,8 +119,7 @@ func (d *DBStore) GetAll() ([]*protocol.Envelope, error) { msg.Timestamp = float64(timestamp) msg.Version = version - data, _ := msg.Marshal() - envelope := protocol.NewEnvelope(msg, pubsubTopic, len(data), gcrypto.Keccak256(data)) + envelope := protocol.NewEnvelope(msg, pubsubTopic) result = append(result, envelope) } diff --git a/waku/v2/node/wakunode2.go b/waku/v2/node/wakunode2.go index 22ebc25b..76007477 100644 --- a/waku/v2/node/wakunode2.go +++ b/waku/v2/node/wakunode2.go @@ -7,7 +7,6 @@ import ( "sync" "time" - gcrypto "github.com/ethereum/go-ethereum/crypto" proto "github.com/golang/protobuf/proto" logging "github.com/ipfs/go-log" "github.com/libp2p/go-libp2p" @@ -16,7 +15,9 @@ import ( ma "github.com/multiformats/go-multiaddr" "github.com/status-im/go-waku/waku/v2/protocol" + "github.com/status-im/go-waku/waku/v2/protocol/lightpush" "github.com/status-im/go-waku/waku/v2/protocol/pb" + "github.com/status-im/go-waku/waku/v2/protocol/relay" "github.com/status-im/go-waku/waku/v2/protocol/store" wakurelay "github.com/status-im/go-wakurelay-pubsub" ) @@ -26,26 +27,19 @@ var log = logging.Logger("wakunode") // Default clientId const clientId string = "Go Waku v2 node" -type Topic string - -const DefaultWakuTopic Topic = "/waku/2/default-waku/proto" - type Message []byte type WakuNode struct { - host host.Host - opts *WakuNodeParameters - pubsub *wakurelay.PubSub + host host.Host + opts *WakuNodeParameters - topics map[Topic]bool - topicsMutex sync.Mutex - wakuRelayTopics map[Topic]*wakurelay.Topic + relay *relay.WakuRelay + lightPush *lightpush.WakuLightPush - subscriptions map[Topic][]*Subscription + subscriptions map[relay.Topic][]*Subscription subscriptionsMutex sync.Mutex - bcaster Broadcaster - relaySubs map[Topic]*wakurelay.Subscription + bcaster Broadcaster ctx context.Context cancel context.CancelFunc @@ -81,14 +75,10 @@ func New(ctx context.Context, opts ...WakuNodeOption) (*WakuNode, error) { w := new(WakuNode) w.bcaster = NewBroadcaster(1024) - w.pubsub = nil w.host = host w.cancel = cancel w.ctx = ctx - w.topics = make(map[Topic]bool) - w.wakuRelayTopics = make(map[Topic]*wakurelay.Topic) - w.relaySubs = make(map[Topic]*wakurelay.Subscription) - w.subscriptions = make(map[Topic][]*Subscription) + w.subscriptions = make(map[relay.Topic][]*Subscription) w.opts = params if params.enableRelay { @@ -117,7 +107,7 @@ func (w *WakuNode) Stop() { defer w.subscriptionsMutex.Unlock() defer w.cancel() - for topic, _ := range w.topics { + for _, topic := range w.relay.Topics() { for _, sub := range w.subscriptions[topic] { sub.Unsubscribe() } @@ -143,27 +133,18 @@ func (w *WakuNode) ListenAddresses() []string { return result } -func (w *WakuNode) PubSub() *wakurelay.PubSub { - return w.pubsub -} - -func (w *WakuNode) SetPubSub(pubSub *wakurelay.PubSub) { - w.pubsub = pubSub +func (w *WakuNode) Relay() *relay.WakuRelay { + return w.relay } func (w *WakuNode) mountRelay(opts ...wakurelay.Option) error { - ps, err := wakurelay.NewWakuRelaySub(w.ctx, w.host, opts...) - if err != nil { - return err - } - w.pubsub = ps + var err error + w.relay, err = relay.NewWakuRelay(w.ctx, w.host, opts...) // TODO: filters // TODO: rlnRelay - log.Info("Relay protocol started") - - return nil + return err } func (w *WakuNode) startStore() error { @@ -216,24 +197,22 @@ func (w *WakuNode) Query(ctx context.Context, contentTopics []string, startTime return result, nil } -func getTopic(topic *Topic) Topic { - var t Topic = DefaultWakuTopic - if topic != nil { - t = *topic - } - return t -} - -func (node *WakuNode) Subscribe(topic *Topic) (*Subscription, error) { +func (node *WakuNode) Subscribe(topic *relay.Topic) (*Subscription, error) { // Subscribes to a PubSub topic. // NOTE The data field SHOULD be decoded as a WakuMessage. - if node.pubsub == nil { - return nil, errors.New("PubSub hasn't been set. Execute mountWakuRelay() or setPubSub() first") + if node.relay == nil { + return nil, errors.New("WakuRelay hasn't been set.") } - t := getTopic(topic) + t := relay.GetTopic(topic) + sub, isNew, err := node.relay.Subscribe(t) + + // Subscribe store to topic + if isNew && node.opts.store != nil && node.opts.storeMsgs { + log.Info("Subscribing store to topic ", t) + node.bcaster.Register(node.opts.store.MsgC) + } - sub, err := node.upsertSubscription(t) if err != nil { return nil, err } @@ -246,11 +225,12 @@ func (node *WakuNode) Subscribe(topic *Topic) (*Subscription, error) { node.subscriptionsMutex.Lock() defer node.subscriptionsMutex.Unlock() + node.subscriptions[t] = append(node.subscriptions[t], subscription) node.bcaster.Register(subscription.C) - go func(t Topic) { + go func(t relay.Topic) { nextMsgTicker := time.NewTicker(time.Millisecond * 10) defer nextMsgTicker.Stop() @@ -265,11 +245,9 @@ func (node *WakuNode) Subscribe(topic *Topic) (*Subscription, error) { msg, err := sub.Next(node.ctx) if err != nil { subscription.mutex.Lock() - node.topicsMutex.Lock() for _, subscription := range node.subscriptions[t] { subscription.Unsubscribe() } - node.topicsMutex.Unlock() subscription.mutex.Unlock() return } @@ -280,7 +258,7 @@ func (node *WakuNode) Subscribe(topic *Topic) (*Subscription, error) { return } - envelope := protocol.NewEnvelope(wakuMessage, string(t), len(msg.Data), gcrypto.Keccak256(msg.Data)) + envelope := protocol.NewEnvelope(wakuMessage, string(t)) node.bcaster.Submit(envelope) } @@ -290,83 +268,49 @@ func (node *WakuNode) Subscribe(topic *Topic) (*Subscription, error) { return subscription, nil } -func (node *WakuNode) upsertTopic(topic Topic) (*wakurelay.Topic, error) { - defer node.topicsMutex.Unlock() - node.topicsMutex.Lock() - - node.topics[topic] = true - pubSubTopic, ok := node.wakuRelayTopics[topic] - if !ok { // Joins topic if node hasn't joined yet - newTopic, err := node.pubsub.Join(string(topic)) - if err != nil { - return nil, err - } - node.wakuRelayTopics[topic] = newTopic - pubSubTopic = newTopic - } - return pubSubTopic, nil -} - -func (node *WakuNode) upsertSubscription(topic Topic) (*wakurelay.Subscription, error) { - sub, ok := node.relaySubs[topic] - if !ok { - pubSubTopic, err := node.upsertTopic(topic) - if err != nil { - return nil, err - } - - sub, err = pubSubTopic.Subscribe() - if err != nil { - return nil, err - } - node.relaySubs[topic] = sub - - log.Info("Subscribing to topic ", topic) - - if node.opts.store != nil && node.opts.storeMsgs { - log.Info("Subscribing store to topic ", topic) - node.bcaster.Register(node.opts.store.MsgC) - } - } - - return sub, nil -} - -func (node *WakuNode) Publish(message *pb.WakuMessage, topic *Topic) ([]byte, error) { - // Publish a `WakuMessage` to a PubSub topic. `WakuMessage` should contain a - // `contentTopic` field for light node functionality. This field may be also - // be omitted. - - if node.pubsub == nil { - return nil, errors.New("PubSub hasn't been set. Execute mountWakuRelay() or setPubSub() first") +func (node *WakuNode) Publish(ctx context.Context, message *pb.WakuMessage, topic *relay.Topic) ([]byte, error) { + if node.relay == nil { + return nil, errors.New("WakuRelay hasn't been set.") } if message == nil { return nil, errors.New("message can't be null") } - pubSubTopic, err := node.upsertTopic(getTopic(topic)) - + hash, err := node.relay.Publish(ctx, message, topic) if err != nil { return nil, err } - out, err := proto.Marshal(message) - if err != nil { - return nil, err - } - - err = pubSubTopic.Publish(node.ctx, out) - - if err != nil { - return nil, err - } - - hash := gcrypto.Keccak256(out) - return hash, nil } +func (node *WakuNode) LightPush(ctx context.Context, message *pb.WakuMessage, topic *relay.Topic, opts ...lightpush.LightPushOption) ([]byte, error) { + if node.lightPush == nil { + return nil, errors.New("WakuLightPush hasn't been set.") + } + + if message == nil { + return nil, errors.New("message can't be null") + } + + req := new(pb.PushRequest) + req.Message = message + req.PubsubTopic = string(relay.GetTopic(topic)) + + response, err := node.lightPush.Request(ctx, req, opts...) + if err != nil { + return nil, err + } + + if response.IsSuccess { + hash, _ := message.Hash() + return hash, nil + } else { + return nil, errors.New(response.Info) + } +} + func (w *WakuNode) DialPeer(address string) error { p, err := ma.NewMultiaddr(address) if err != nil { diff --git a/waku/v2/protocol/envelope.go b/waku/v2/protocol/envelope.go index 01fdaa05..52fc51a1 100644 --- a/waku/v2/protocol/envelope.go +++ b/waku/v2/protocol/envelope.go @@ -9,12 +9,13 @@ type Envelope struct { hash []byte } -func NewEnvelope(msg *pb.WakuMessage, pubSubTopic string, size int, hash []byte) *Envelope { +func NewEnvelope(msg *pb.WakuMessage, pubSubTopic string) *Envelope { + data, _ := msg.Marshal() return &Envelope{ msg: msg, pubsubTopic: pubSubTopic, - size: size, - hash: hash, + size: len(data), + hash: pb.Hash(data), } } diff --git a/waku/v2/protocol/lightpush/waku_lightpush.go b/waku/v2/protocol/lightpush/waku_lightpush.go new file mode 100644 index 00000000..298eaf20 --- /dev/null +++ b/waku/v2/protocol/lightpush/waku_lightpush.go @@ -0,0 +1,234 @@ +package lightpush + +import ( + "context" + "encoding/hex" + "errors" + "fmt" + + logging "github.com/ipfs/go-log" + "github.com/libp2p/go-libp2p-core/host" + "github.com/libp2p/go-libp2p-core/network" + "github.com/libp2p/go-libp2p-core/peer" + "github.com/libp2p/go-libp2p-core/peerstore" + libp2pProtocol "github.com/libp2p/go-libp2p-core/protocol" + "github.com/libp2p/go-msgio/protoio" + + ma "github.com/multiformats/go-multiaddr" + "github.com/status-im/go-waku/waku/v2/protocol" + "github.com/status-im/go-waku/waku/v2/protocol/relay" + + "github.com/status-im/go-waku/waku/v2/protocol/pb" +) + +var log = logging.Logger("waku_lightpush") + +const WakuLightPushProtocolId = libp2pProtocol.ID("/vac/waku/lightpush/2.0.0-alpha1") + +var ( + ErrNoPeersAvailable = errors.New("no suitable remote peers") + ErrInvalidId = errors.New("invalid request id") +) + +type WakuLightPush struct { + h host.Host + relay *relay.WakuRelay + ctx context.Context +} + +func NewWakuLightPush(ctx context.Context, relay *relay.WakuRelay) *WakuLightPush { + wakuLP := new(WakuLightPush) + wakuLP.relay = relay + wakuLP.ctx = ctx + return wakuLP +} + +func (wakuLP *WakuLightPush) Start(h host.Host) { + wakuLP.h = h + wakuLP.h.SetStreamHandler(WakuLightPushProtocolId, wakuLP.onRequest) + log.Info("Light Push protocol started") +} + +func (wakuLP *WakuLightPush) onRequest(s network.Stream) { + defer s.Close() + + requestPushRPC := &pb.PushRPC{} + + writer := protoio.NewDelimitedWriter(s) + reader := protoio.NewDelimitedReader(s, 64*1024) + + err := reader.ReadMsg(requestPushRPC) + if err != nil { + log.Error("error reading request", err) + return + } + + log.Info(fmt.Sprintf("%s: Received query from %s", s.Conn().LocalPeer(), s.Conn().RemotePeer())) + + if requestPushRPC.Query != nil { + log.Info("lightpush push request") + pubSubTopic := relay.Topic(requestPushRPC.Query.PubsubTopic) + message := requestPushRPC.Query.Message + + response := new(pb.PushResponse) + if wakuLP.relay != nil { + // XXX Assumes success, should probably be extended to check for network, peers, etc + _, err := wakuLP.relay.Publish(wakuLP.ctx, message, &pubSubTopic) + + if err != nil { + response.IsSuccess = false + response.Info = "Could not publish message" + } else { + response.IsSuccess = true + response.Info = "Totally" + } + } else { + log.Debug("No relay protocol present, unsuccessful push") + response.IsSuccess = false + response.Info = "No relay protocol" + } + + responsePushRPC := &pb.PushRPC{} + responsePushRPC.RequestId = requestPushRPC.RequestId + responsePushRPC.Response = response + + err = writer.WriteMsg(responsePushRPC) + if err != nil { + log.Error("error writing response", err) + s.Reset() + } else { + log.Info(fmt.Sprintf("%s: Response sent to %s", s.Conn().LocalPeer().String(), s.Conn().RemotePeer().String())) + } + } + + if requestPushRPC.Response != nil { + if requestPushRPC.Response.IsSuccess { + log.Info("lightpush message success") + } else { + log.Info(fmt.Sprintf("lightpush message failure. info=%s", requestPushRPC.Response.Info)) + } + } +} + +// TODO: AddPeer and selectPeer are duplicated in wakustore too. Refactor this code + +func (wakuLP *WakuLightPush) AddPeer(p peer.ID, addrs []ma.Multiaddr) error { + for _, addr := range addrs { + wakuLP.h.Peerstore().AddAddr(p, addr, peerstore.PermanentAddrTTL) + } + err := wakuLP.h.Peerstore().AddProtocols(p, string(WakuLightPushProtocolId)) + if err != nil { + return err + } + return nil +} + +func (wakuLP *WakuLightPush) selectPeer() *peer.ID { + var peers peer.IDSlice + for _, peer := range wakuLP.h.Peerstore().Peers() { + protocols, err := wakuLP.h.Peerstore().SupportsProtocols(peer, string(WakuLightPushProtocolId)) + if err != nil { + log.Error("error obtaining the protocols supported by peers", err) + return nil + } + + if len(protocols) > 0 { + peers = append(peers, peer) + } + } + + if len(peers) >= 1 { + // TODO: proper heuristic here that compares peer scores and selects "best" one. For now the first peer for the given protocol is returned + return &peers[0] + } + + return nil +} + +type LightPushParameters struct { + selectedPeer peer.ID + requestId []byte + + lp *WakuLightPush +} + +type LightPushOption func(*LightPushParameters) + +func WithPeer(p peer.ID) LightPushOption { + return func(params *LightPushParameters) { + params.selectedPeer = p + } +} + +func WithAutomaticPeerSelection() LightPushOption { + return func(params *LightPushParameters) { + p := params.lp.selectPeer() + params.selectedPeer = *p + } +} + +func WithRequestId(requestId []byte) LightPushOption { + return func(params *LightPushParameters) { + params.requestId = requestId + } +} + +func WithAutomaticRequestId() LightPushOption { + return func(params *LightPushParameters) { + params.requestId = protocol.GenerateRequestId() + } +} + +func DefaultOptions() []LightPushOption { + return []LightPushOption{ + WithAutomaticRequestId(), + WithAutomaticPeerSelection(), + } +} + +func (wakuLP *WakuLightPush) Request(ctx context.Context, req *pb.PushRequest, opts ...LightPushOption) (*pb.PushResponse, error) { + params := new(LightPushParameters) + + optList := DefaultOptions() + optList = append(optList, opts...) + for _, opt := range optList { + opt(params) + } + + if params.selectedPeer == "" { + return nil, ErrNoPeersAvailable + } + + if len(params.requestId) == 0 { + return nil, ErrInvalidId + } + + connOpt, err := wakuLP.h.NewStream(ctx, params.selectedPeer, WakuLightPushProtocolId) + if err != nil { + log.Info("failed to connect to remote peer", err) + return nil, err + } + + defer connOpt.Close() + defer connOpt.Reset() + + pushRequestRPC := &pb.PushRPC{RequestId: hex.EncodeToString(params.requestId), Query: req} + + writer := protoio.NewDelimitedWriter(connOpt) + reader := protoio.NewDelimitedReader(connOpt, 64*1024) + + err = writer.WriteMsg(pushRequestRPC) + if err != nil { + log.Error("could not write request", err) + return nil, err + } + + pushResponseRPC := &pb.PushRPC{} + err = reader.ReadMsg(pushResponseRPC) + if err != nil { + log.Error("could not read response", err) + return nil, err + } + + return pushResponseRPC.Response, nil +} diff --git a/waku/v2/protocol/pb/utils.go b/waku/v2/protocol/pb/utils.go new file mode 100644 index 00000000..8652be32 --- /dev/null +++ b/waku/v2/protocol/pb/utils.go @@ -0,0 +1,19 @@ +package pb + +import ( + gcrypto "github.com/ethereum/go-ethereum/crypto" + proto "github.com/golang/protobuf/proto" +) + +func (msg *WakuMessage) Hash() ([]byte, error) { + out, err := proto.Marshal(msg) + if err != nil { + return nil, err + } + + return Hash(out), nil +} + +func Hash(data []byte) []byte { + return gcrypto.Keccak256(data) +} diff --git a/waku/v2/protocol/pb/waku_lightpush.pb.go b/waku/v2/protocol/pb/waku_lightpush.pb.go new file mode 100644 index 00000000..3068f1e5 --- /dev/null +++ b/waku/v2/protocol/pb/waku_lightpush.pb.go @@ -0,0 +1,916 @@ +// Code generated by protoc-gen-gogo. DO NOT EDIT. +// source: waku_lightpush.proto + +package pb + +import ( + fmt "fmt" + proto "github.com/golang/protobuf/proto" + io "io" + math "math" + math_bits "math/bits" +) + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package + +type PushRequest struct { + PubsubTopic string `protobuf:"bytes,1,opt,name=pubsub_topic,json=pubsubTopic,proto3" json:"pubsub_topic,omitempty"` + Message *WakuMessage `protobuf:"bytes,2,opt,name=message,proto3" json:"message,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *PushRequest) Reset() { *m = PushRequest{} } +func (m *PushRequest) String() string { return proto.CompactTextString(m) } +func (*PushRequest) ProtoMessage() {} +func (*PushRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_0edfa2f8ec212684, []int{0} +} +func (m *PushRequest) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *PushRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_PushRequest.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *PushRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_PushRequest.Merge(m, src) +} +func (m *PushRequest) XXX_Size() int { + return m.Size() +} +func (m *PushRequest) XXX_DiscardUnknown() { + xxx_messageInfo_PushRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_PushRequest proto.InternalMessageInfo + +func (m *PushRequest) GetPubsubTopic() string { + if m != nil { + return m.PubsubTopic + } + return "" +} + +func (m *PushRequest) GetMessage() *WakuMessage { + if m != nil { + return m.Message + } + return nil +} + +type PushResponse struct { + IsSuccess bool `protobuf:"varint,1,opt,name=is_success,json=isSuccess,proto3" json:"is_success,omitempty"` + // Error messages, etc + Info string `protobuf:"bytes,2,opt,name=info,proto3" json:"info,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *PushResponse) Reset() { *m = PushResponse{} } +func (m *PushResponse) String() string { return proto.CompactTextString(m) } +func (*PushResponse) ProtoMessage() {} +func (*PushResponse) Descriptor() ([]byte, []int) { + return fileDescriptor_0edfa2f8ec212684, []int{1} +} +func (m *PushResponse) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *PushResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_PushResponse.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *PushResponse) XXX_Merge(src proto.Message) { + xxx_messageInfo_PushResponse.Merge(m, src) +} +func (m *PushResponse) XXX_Size() int { + return m.Size() +} +func (m *PushResponse) XXX_DiscardUnknown() { + xxx_messageInfo_PushResponse.DiscardUnknown(m) +} + +var xxx_messageInfo_PushResponse proto.InternalMessageInfo + +func (m *PushResponse) GetIsSuccess() bool { + if m != nil { + return m.IsSuccess + } + return false +} + +func (m *PushResponse) GetInfo() string { + if m != nil { + return m.Info + } + return "" +} + +type PushRPC struct { + RequestId string `protobuf:"bytes,1,opt,name=request_id,json=requestId,proto3" json:"request_id,omitempty"` + Query *PushRequest `protobuf:"bytes,2,opt,name=query,proto3" json:"query,omitempty"` + Response *PushResponse `protobuf:"bytes,3,opt,name=response,proto3" json:"response,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *PushRPC) Reset() { *m = PushRPC{} } +func (m *PushRPC) String() string { return proto.CompactTextString(m) } +func (*PushRPC) ProtoMessage() {} +func (*PushRPC) Descriptor() ([]byte, []int) { + return fileDescriptor_0edfa2f8ec212684, []int{2} +} +func (m *PushRPC) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *PushRPC) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_PushRPC.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *PushRPC) XXX_Merge(src proto.Message) { + xxx_messageInfo_PushRPC.Merge(m, src) +} +func (m *PushRPC) XXX_Size() int { + return m.Size() +} +func (m *PushRPC) XXX_DiscardUnknown() { + xxx_messageInfo_PushRPC.DiscardUnknown(m) +} + +var xxx_messageInfo_PushRPC proto.InternalMessageInfo + +func (m *PushRPC) GetRequestId() string { + if m != nil { + return m.RequestId + } + return "" +} + +func (m *PushRPC) GetQuery() *PushRequest { + if m != nil { + return m.Query + } + return nil +} + +func (m *PushRPC) GetResponse() *PushResponse { + if m != nil { + return m.Response + } + return nil +} + +func init() { + proto.RegisterType((*PushRequest)(nil), "pb.PushRequest") + proto.RegisterType((*PushResponse)(nil), "pb.PushResponse") + proto.RegisterType((*PushRPC)(nil), "pb.PushRPC") +} + +func init() { proto.RegisterFile("waku_lightpush.proto", fileDescriptor_0edfa2f8ec212684) } + +var fileDescriptor_0edfa2f8ec212684 = []byte{ + // 268 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0x12, 0x29, 0x4f, 0xcc, 0x2e, + 0x8d, 0xcf, 0xc9, 0x4c, 0xcf, 0x28, 0x29, 0x28, 0x2d, 0xce, 0xd0, 0x2b, 0x28, 0xca, 0x2f, 0xc9, + 0x17, 0x62, 0x2a, 0x48, 0x92, 0x12, 0x02, 0xcb, 0xe4, 0xa6, 0x16, 0x17, 0x27, 0xa6, 0xa7, 0x42, + 0xc4, 0x95, 0xa2, 0xb9, 0xb8, 0x03, 0x4a, 0x8b, 0x33, 0x82, 0x52, 0x0b, 0x4b, 0x53, 0x8b, 0x4b, + 0x84, 0x14, 0xb9, 0x78, 0x0a, 0x4a, 0x93, 0x8a, 0x4b, 0x93, 0xe2, 0x4b, 0xf2, 0x0b, 0x32, 0x93, + 0x25, 0x18, 0x15, 0x18, 0x35, 0x38, 0x83, 0xb8, 0x21, 0x62, 0x21, 0x20, 0x21, 0x21, 0x4d, 0x2e, + 0x76, 0xa8, 0x11, 0x12, 0x4c, 0x0a, 0x8c, 0x1a, 0xdc, 0x46, 0xfc, 0x7a, 0x05, 0x49, 0x7a, 0xe1, + 0x89, 0xd9, 0xa5, 0xbe, 0x10, 0xe1, 0x20, 0x98, 0xbc, 0x92, 0x23, 0x17, 0x0f, 0xc4, 0xf0, 0xe2, + 0x82, 0xfc, 0xbc, 0xe2, 0x54, 0x21, 0x59, 0x2e, 0xae, 0xcc, 0xe2, 0xf8, 0xe2, 0xd2, 0xe4, 0xe4, + 0xd4, 0xe2, 0x62, 0xb0, 0xd9, 0x1c, 0x41, 0x9c, 0x99, 0xc5, 0xc1, 0x10, 0x01, 0x21, 0x21, 0x2e, + 0x96, 0xcc, 0xbc, 0xb4, 0x7c, 0xb0, 0xb1, 0x9c, 0x41, 0x60, 0xb6, 0x52, 0x2d, 0x17, 0x3b, 0xd8, + 0x88, 0x00, 0x67, 0x90, 0xee, 0x22, 0x88, 0x33, 0xe3, 0x33, 0x53, 0xa0, 0x2e, 0xe3, 0x84, 0x8a, + 0x78, 0xa6, 0x08, 0xa9, 0x72, 0xb1, 0x16, 0x96, 0xa6, 0x16, 0x55, 0x22, 0xbb, 0x0a, 0xc9, 0x6b, + 0x41, 0x10, 0x59, 0x21, 0x1d, 0x2e, 0x8e, 0x22, 0xa8, 0x7b, 0x24, 0x98, 0xc1, 0x2a, 0x05, 0x10, + 0x2a, 0x21, 0xe2, 0x41, 0x70, 0x15, 0x4e, 0x02, 0x27, 0x1e, 0xc9, 0x31, 0x5e, 0x78, 0x24, 0xc7, + 0xf8, 0xe0, 0x91, 0x1c, 0xe3, 0x8c, 0xc7, 0x72, 0x0c, 0x49, 0x6c, 0xe0, 0x70, 0x33, 0x06, 0x04, + 0x00, 0x00, 0xff, 0xff, 0x76, 0x20, 0x2e, 0xed, 0x67, 0x01, 0x00, 0x00, +} + +func (m *PushRequest) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *PushRequest) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *PushRequest) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if m.XXX_unrecognized != nil { + i -= len(m.XXX_unrecognized) + copy(dAtA[i:], m.XXX_unrecognized) + } + if m.Message != nil { + { + size, err := m.Message.MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintWakuLightpush(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0x12 + } + if len(m.PubsubTopic) > 0 { + i -= len(m.PubsubTopic) + copy(dAtA[i:], m.PubsubTopic) + i = encodeVarintWakuLightpush(dAtA, i, uint64(len(m.PubsubTopic))) + i-- + dAtA[i] = 0xa + } + return len(dAtA) - i, nil +} + +func (m *PushResponse) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *PushResponse) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *PushResponse) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if m.XXX_unrecognized != nil { + i -= len(m.XXX_unrecognized) + copy(dAtA[i:], m.XXX_unrecognized) + } + if len(m.Info) > 0 { + i -= len(m.Info) + copy(dAtA[i:], m.Info) + i = encodeVarintWakuLightpush(dAtA, i, uint64(len(m.Info))) + i-- + dAtA[i] = 0x12 + } + if m.IsSuccess { + i-- + if m.IsSuccess { + dAtA[i] = 1 + } else { + dAtA[i] = 0 + } + i-- + dAtA[i] = 0x8 + } + return len(dAtA) - i, nil +} + +func (m *PushRPC) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *PushRPC) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *PushRPC) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if m.XXX_unrecognized != nil { + i -= len(m.XXX_unrecognized) + copy(dAtA[i:], m.XXX_unrecognized) + } + if m.Response != nil { + { + size, err := m.Response.MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintWakuLightpush(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0x1a + } + if m.Query != nil { + { + size, err := m.Query.MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintWakuLightpush(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0x12 + } + if len(m.RequestId) > 0 { + i -= len(m.RequestId) + copy(dAtA[i:], m.RequestId) + i = encodeVarintWakuLightpush(dAtA, i, uint64(len(m.RequestId))) + i-- + dAtA[i] = 0xa + } + return len(dAtA) - i, nil +} + +func encodeVarintWakuLightpush(dAtA []byte, offset int, v uint64) int { + offset -= sovWakuLightpush(v) + base := offset + for v >= 1<<7 { + dAtA[offset] = uint8(v&0x7f | 0x80) + v >>= 7 + offset++ + } + dAtA[offset] = uint8(v) + return base +} +func (m *PushRequest) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + l = len(m.PubsubTopic) + if l > 0 { + n += 1 + l + sovWakuLightpush(uint64(l)) + } + if m.Message != nil { + l = m.Message.Size() + n += 1 + l + sovWakuLightpush(uint64(l)) + } + if m.XXX_unrecognized != nil { + n += len(m.XXX_unrecognized) + } + return n +} + +func (m *PushResponse) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.IsSuccess { + n += 2 + } + l = len(m.Info) + if l > 0 { + n += 1 + l + sovWakuLightpush(uint64(l)) + } + if m.XXX_unrecognized != nil { + n += len(m.XXX_unrecognized) + } + return n +} + +func (m *PushRPC) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + l = len(m.RequestId) + if l > 0 { + n += 1 + l + sovWakuLightpush(uint64(l)) + } + if m.Query != nil { + l = m.Query.Size() + n += 1 + l + sovWakuLightpush(uint64(l)) + } + if m.Response != nil { + l = m.Response.Size() + n += 1 + l + sovWakuLightpush(uint64(l)) + } + if m.XXX_unrecognized != nil { + n += len(m.XXX_unrecognized) + } + return n +} + +func sovWakuLightpush(x uint64) (n int) { + return (math_bits.Len64(x|1) + 6) / 7 +} +func sozWakuLightpush(x uint64) (n int) { + return sovWakuLightpush(uint64((x << 1) ^ uint64((int64(x) >> 63)))) +} +func (m *PushRequest) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowWakuLightpush + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: PushRequest: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: PushRequest: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field PubsubTopic", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowWakuLightpush + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthWakuLightpush + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthWakuLightpush + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.PubsubTopic = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Message", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowWakuLightpush + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthWakuLightpush + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthWakuLightpush + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.Message == nil { + m.Message = &WakuMessage{} + } + if err := m.Message.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipWakuLightpush(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return ErrInvalidLengthWakuLightpush + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...) + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *PushResponse) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowWakuLightpush + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: PushResponse: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: PushResponse: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field IsSuccess", wireType) + } + var v int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowWakuLightpush + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + v |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + m.IsSuccess = bool(v != 0) + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Info", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowWakuLightpush + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthWakuLightpush + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthWakuLightpush + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Info = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipWakuLightpush(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return ErrInvalidLengthWakuLightpush + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...) + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *PushRPC) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowWakuLightpush + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: PushRPC: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: PushRPC: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field RequestId", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowWakuLightpush + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthWakuLightpush + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthWakuLightpush + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.RequestId = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Query", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowWakuLightpush + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthWakuLightpush + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthWakuLightpush + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.Query == nil { + m.Query = &PushRequest{} + } + if err := m.Query.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 3: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Response", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowWakuLightpush + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthWakuLightpush + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthWakuLightpush + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.Response == nil { + m.Response = &PushResponse{} + } + if err := m.Response.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipWakuLightpush(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return ErrInvalidLengthWakuLightpush + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...) + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func skipWakuLightpush(dAtA []byte) (n int, err error) { + l := len(dAtA) + iNdEx := 0 + depth := 0 + for iNdEx < l { + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowWakuLightpush + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + wireType := int(wire & 0x7) + switch wireType { + case 0: + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowWakuLightpush + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + iNdEx++ + if dAtA[iNdEx-1] < 0x80 { + break + } + } + case 1: + iNdEx += 8 + case 2: + var length int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowWakuLightpush + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + length |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if length < 0 { + return 0, ErrInvalidLengthWakuLightpush + } + iNdEx += length + case 3: + depth++ + case 4: + if depth == 0 { + return 0, ErrUnexpectedEndOfGroupWakuLightpush + } + depth-- + case 5: + iNdEx += 4 + default: + return 0, fmt.Errorf("proto: illegal wireType %d", wireType) + } + if iNdEx < 0 { + return 0, ErrInvalidLengthWakuLightpush + } + if depth == 0 { + return iNdEx, nil + } + } + return 0, io.ErrUnexpectedEOF +} + +var ( + ErrInvalidLengthWakuLightpush = fmt.Errorf("proto: negative length found during unmarshaling") + ErrIntOverflowWakuLightpush = fmt.Errorf("proto: integer overflow") + ErrUnexpectedEndOfGroupWakuLightpush = fmt.Errorf("proto: unexpected end of group") +) diff --git a/waku/v2/protocol/pb/waku_lightpush.proto b/waku/v2/protocol/pb/waku_lightpush.proto new file mode 100644 index 00000000..cd6f3d63 --- /dev/null +++ b/waku/v2/protocol/pb/waku_lightpush.proto @@ -0,0 +1,22 @@ +syntax = "proto3"; + +package pb; + +import "waku_message.proto"; + +message PushRequest { + string pubsub_topic = 1; + WakuMessage message = 2; +} + +message PushResponse { + bool is_success = 1; + // Error messages, etc + string info = 2; +} + +message PushRPC { + string request_id = 1; + PushRequest query = 2; + PushResponse response = 3; +} \ No newline at end of file diff --git a/waku/v2/protocol/relay/waku_relay.go b/waku/v2/protocol/relay/waku_relay.go new file mode 100644 index 00000000..5a8fa05a --- /dev/null +++ b/waku/v2/protocol/relay/waku_relay.go @@ -0,0 +1,147 @@ +package relay + +import ( + "context" + "errors" + "sync" + + proto "github.com/golang/protobuf/proto" + logging "github.com/ipfs/go-log" + "github.com/libp2p/go-libp2p-core/host" + + "github.com/status-im/go-waku/waku/v2/protocol/pb" + wakurelay "github.com/status-im/go-wakurelay-pubsub" +) + +var log = logging.Logger("wakurelay") + +type Topic string + +const DefaultWakuTopic Topic = "/waku/2/default-waku/proto" + +type WakuRelay struct { + host host.Host + pubsub *wakurelay.PubSub + + topics map[Topic]bool + topicsMutex sync.Mutex + wakuRelayTopics map[Topic]*wakurelay.Topic + relaySubs map[Topic]*wakurelay.Subscription +} + +func NewWakuRelay(ctx context.Context, h host.Host, opts ...wakurelay.Option) (*WakuRelay, error) { + w := new(WakuRelay) + w.host = h + w.topics = make(map[Topic]bool) + w.wakuRelayTopics = make(map[Topic]*wakurelay.Topic) + w.relaySubs = make(map[Topic]*wakurelay.Subscription) + + ps, err := wakurelay.NewWakuRelaySub(ctx, h, opts...) + if err != nil { + return nil, err + } + w.pubsub = ps + + log.Info("Relay protocol started") + + return w, nil +} + +func (w *WakuRelay) PubSub() *wakurelay.PubSub { + return w.pubsub +} + +func (w *WakuRelay) Topics() []Topic { + defer w.topicsMutex.Unlock() + w.topicsMutex.Lock() + + var result []Topic + for topic, _ := range w.topics { + result = append(result, topic) + } + return result +} + +func (w *WakuRelay) SetPubSub(pubSub *wakurelay.PubSub) { + w.pubsub = pubSub +} + +func (w *WakuRelay) upsertTopic(topic Topic) (*wakurelay.Topic, error) { + defer w.topicsMutex.Unlock() + w.topicsMutex.Lock() + + w.topics[topic] = true + pubSubTopic, ok := w.wakuRelayTopics[topic] + if !ok { // Joins topic if node hasn't joined yet + newTopic, err := w.pubsub.Join(string(topic)) + if err != nil { + return nil, err + } + w.wakuRelayTopics[topic] = newTopic + pubSubTopic = newTopic + } + return pubSubTopic, nil +} + +func (w *WakuRelay) Subscribe(topic Topic) (subs *wakurelay.Subscription, isNew bool, err error) { + + sub, ok := w.relaySubs[topic] + if !ok { + pubSubTopic, err := w.upsertTopic(topic) + if err != nil { + return nil, false, err + } + + sub, err = pubSubTopic.Subscribe() + if err != nil { + return nil, false, err + } + w.relaySubs[topic] = sub + + log.Info("Subscribing to topic ", topic) + } + + isNew = !ok // ok will be true if subscription already exists + return sub, isNew, nil +} + +func (w *WakuRelay) Publish(ctx context.Context, message *pb.WakuMessage, topic *Topic) ([]byte, error) { + // Publish a `WakuMessage` to a PubSub topic. + + if w.pubsub == nil { + return nil, errors.New("PubSub hasn't been set.") + } + + if message == nil { + return nil, errors.New("message can't be null") + } + + pubSubTopic, err := w.upsertTopic(GetTopic(topic)) + + if err != nil { + return nil, err + } + + out, err := proto.Marshal(message) + if err != nil { + return nil, err + } + + err = pubSubTopic.Publish(ctx, out) + + if err != nil { + return nil, err + } + + hash := pb.Hash(out) + + return hash, nil +} + +func GetTopic(topic *Topic) Topic { + var t Topic = DefaultWakuTopic + if topic != nil { + t = *topic + } + return t +} diff --git a/waku/v2/protocol/requestId.go b/waku/v2/protocol/requestId.go new file mode 100644 index 00000000..102cb1c9 --- /dev/null +++ b/waku/v2/protocol/requestId.go @@ -0,0 +1,45 @@ +package protocol + +import ( + "crypto/rand" + "sync" + + "github.com/cruxic/go-hmac-drbg/hmacdrbg" + logging "github.com/ipfs/go-log" +) + +var log = logging.Logger("request-gen") + +var brHmacDrbgPool = sync.Pool{New: func() interface{} { + seed := make([]byte, 48) + _, err := rand.Read(seed) + if err != nil { + log.Fatal(err) + } + return hmacdrbg.NewHmacDrbg(256, seed, nil) +}} + +func GenerateRequestId() []byte { + rng := brHmacDrbgPool.Get().(*hmacdrbg.HmacDrbg) + defer brHmacDrbgPool.Put(rng) + + randData := make([]byte, 32) + if !rng.Generate(randData) { + //Reseed is required every 10,000 calls + seed := make([]byte, 48) + _, err := rand.Read(seed) + if err != nil { + log.Fatal(err) + } + err = rng.Reseed(seed) + if err != nil { + //only happens if seed < security-level + log.Fatal(err) + } + + if !rng.Generate(randData) { + log.Error("could not generate random request id") + } + } + return randData +} diff --git a/waku/v2/protocol/store/waku_store.go b/waku/v2/protocol/store/waku_store.go index c22cd233..faa2d0d9 100644 --- a/waku/v2/protocol/store/waku_store.go +++ b/waku/v2/protocol/store/waku_store.go @@ -3,7 +3,6 @@ package store import ( "bytes" "context" - "crypto/rand" "crypto/sha256" "encoding/hex" "errors" @@ -12,7 +11,6 @@ import ( "sync" "time" - "github.com/cruxic/go-hmac-drbg/hmacdrbg" logging "github.com/ipfs/go-log" "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/network" @@ -395,40 +393,6 @@ func (store *WakuStore) selectPeer() *peer.ID { return nil } -var brHmacDrbgPool = sync.Pool{New: func() interface{} { - seed := make([]byte, 48) - _, err := rand.Read(seed) - if err != nil { - log.Fatal(err) - } - return hmacdrbg.NewHmacDrbg(256, seed, nil) -}} - -func GenerateRequestId() []byte { - rng := brHmacDrbgPool.Get().(*hmacdrbg.HmacDrbg) - defer brHmacDrbgPool.Put(rng) - - randData := make([]byte, 32) - if !rng.Generate(randData) { - //Reseed is required every 10,000 calls - seed := make([]byte, 48) - _, err := rand.Read(seed) - if err != nil { - log.Fatal(err) - } - err = rng.Reseed(seed) - if err != nil { - //only happens if seed < security-level - log.Fatal(err) - } - - if !rng.Generate(randData) { - log.Error("could not generate random request id") - } - } - return randData -} - type HistoryRequestParameters struct { selectedPeer peer.ID requestId []byte @@ -463,7 +427,7 @@ func WithRequestId(requestId []byte) HistoryRequestOption { func WithAutomaticRequestId() HistoryRequestOption { return func(params *HistoryRequestParameters) { - params.requestId = GenerateRequestId() + params.requestId = protocol.GenerateRequestId() } } @@ -491,7 +455,10 @@ func DefaultOptions() []HistoryRequestOption { func (store *WakuStore) Query(ctx context.Context, q *pb.HistoryQuery, opts ...HistoryRequestOption) (*pb.HistoryResponse, error) { params := new(HistoryRequestParameters) params.s = store - for _, opt := range opts { + + optList := DefaultOptions() + optList = append(optList, opts...) + for _, opt := range optList { opt(params) }