mirror of
https://github.com/status-im/go-waku.git
synced 2025-02-09 20:24:35 +00:00
refactor: lightpush options and be more explicit about
the relay node being present or not
This commit is contained in:
parent
a46881fc27
commit
a1cb371d5a
@ -9,14 +9,12 @@ import (
|
|||||||
logging "github.com/ipfs/go-log"
|
logging "github.com/ipfs/go-log"
|
||||||
"github.com/libp2p/go-libp2p-core/host"
|
"github.com/libp2p/go-libp2p-core/host"
|
||||||
"github.com/libp2p/go-libp2p-core/network"
|
"github.com/libp2p/go-libp2p-core/network"
|
||||||
"github.com/libp2p/go-libp2p-core/peer"
|
|
||||||
libp2pProtocol "github.com/libp2p/go-libp2p-core/protocol"
|
libp2pProtocol "github.com/libp2p/go-libp2p-core/protocol"
|
||||||
"github.com/libp2p/go-msgio/protoio"
|
"github.com/libp2p/go-msgio/protoio"
|
||||||
"github.com/status-im/go-waku/waku/v2/metrics"
|
"github.com/status-im/go-waku/waku/v2/metrics"
|
||||||
"github.com/status-im/go-waku/waku/v2/protocol"
|
"github.com/status-im/go-waku/waku/v2/protocol"
|
||||||
"github.com/status-im/go-waku/waku/v2/protocol/pb"
|
"github.com/status-im/go-waku/waku/v2/protocol/pb"
|
||||||
"github.com/status-im/go-waku/waku/v2/protocol/relay"
|
"github.com/status-im/go-waku/waku/v2/protocol/relay"
|
||||||
utils "github.com/status-im/go-waku/waku/v2/utils"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
var log = logging.Logger("waku_lightpush")
|
var log = logging.Logger("waku_lightpush")
|
||||||
@ -44,8 +42,8 @@ func NewWakuLightPush(ctx context.Context, h host.Host, relay *relay.WakuRelay)
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (wakuLP *WakuLightPush) Start() error {
|
func (wakuLP *WakuLightPush) Start() error {
|
||||||
if wakuLP.relay == nil {
|
if wakuLP.IsClientOnly() {
|
||||||
return errors.New("relay is required")
|
return errors.New("relay is required, without it, it is only a client and cannot be started")
|
||||||
}
|
}
|
||||||
|
|
||||||
wakuLP.h.SetStreamHandlerMatch(LightPushID_v20beta1, protocol.PrefixTextMatch(string(LightPushID_v20beta1)), wakuLP.onRequest)
|
wakuLP.h.SetStreamHandlerMatch(LightPushID_v20beta1, protocol.PrefixTextMatch(string(LightPushID_v20beta1)), wakuLP.onRequest)
|
||||||
@ -54,6 +52,10 @@ func (wakuLP *WakuLightPush) Start() error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (wakuLp *WakuLightPush) IsClientOnly() bool {
|
||||||
|
return wakuLp.relay == nil
|
||||||
|
}
|
||||||
|
|
||||||
func (wakuLP *WakuLightPush) onRequest(s network.Stream) {
|
func (wakuLP *WakuLightPush) onRequest(s network.Stream) {
|
||||||
defer s.Close()
|
defer s.Close()
|
||||||
|
|
||||||
@ -73,11 +75,11 @@ func (wakuLP *WakuLightPush) onRequest(s network.Stream) {
|
|||||||
|
|
||||||
if requestPushRPC.Query != nil {
|
if requestPushRPC.Query != nil {
|
||||||
log.Info("lightpush push request")
|
log.Info("lightpush push request")
|
||||||
pubSubTopic := relay.Topic(requestPushRPC.Query.PubsubTopic)
|
|
||||||
message := requestPushRPC.Query.Message
|
|
||||||
|
|
||||||
response := new(pb.PushResponse)
|
response := new(pb.PushResponse)
|
||||||
if wakuLP.relay != nil {
|
if !wakuLP.IsClientOnly() {
|
||||||
|
pubSubTopic := relay.Topic(requestPushRPC.Query.PubsubTopic)
|
||||||
|
message := requestPushRPC.Query.Message
|
||||||
|
|
||||||
// TODO: Assumes success, should probably be extended to check for network, peers, etc
|
// TODO: Assumes success, should probably be extended to check for network, peers, etc
|
||||||
// It might make sense to use WithReadiness option here?
|
// It might make sense to use WithReadiness option here?
|
||||||
|
|
||||||
@ -118,56 +120,10 @@ func (wakuLP *WakuLightPush) onRequest(s network.Stream) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
type LightPushParameters struct {
|
|
||||||
selectedPeer peer.ID
|
|
||||||
requestId []byte
|
|
||||||
|
|
||||||
lp *WakuLightPush
|
|
||||||
}
|
|
||||||
|
|
||||||
type LightPushOption func(*LightPushParameters)
|
|
||||||
|
|
||||||
func WithPeer(p peer.ID) LightPushOption {
|
|
||||||
return func(params *LightPushParameters) {
|
|
||||||
params.selectedPeer = p
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func WithAutomaticPeerSelection() LightPushOption {
|
|
||||||
return func(params *LightPushParameters) {
|
|
||||||
p, err := utils.SelectPeer(params.lp.h, string(LightPushID_v20beta1))
|
|
||||||
if err == nil {
|
|
||||||
params.selectedPeer = *p
|
|
||||||
} else {
|
|
||||||
log.Info("Error selecting peer: ", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func WithRequestId(requestId []byte) LightPushOption {
|
|
||||||
return func(params *LightPushParameters) {
|
|
||||||
params.requestId = requestId
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func WithAutomaticRequestId() LightPushOption {
|
|
||||||
return func(params *LightPushParameters) {
|
|
||||||
params.requestId = protocol.GenerateRequestId()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func DefaultOptions() []LightPushOption {
|
|
||||||
return []LightPushOption{
|
|
||||||
WithAutomaticRequestId(),
|
|
||||||
WithAutomaticPeerSelection(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (wakuLP *WakuLightPush) request(ctx context.Context, req *pb.PushRequest, opts ...LightPushOption) (*pb.PushResponse, error) {
|
func (wakuLP *WakuLightPush) request(ctx context.Context, req *pb.PushRequest, opts ...LightPushOption) (*pb.PushResponse, error) {
|
||||||
params := new(LightPushParameters)
|
params := new(LightPushParameters)
|
||||||
params.lp = wakuLP
|
|
||||||
|
|
||||||
optList := DefaultOptions()
|
optList := DefaultOptions(wakuLP.h)
|
||||||
optList = append(optList, opts...)
|
optList = append(optList, opts...)
|
||||||
for _, opt := range optList {
|
for _, opt := range optList {
|
||||||
opt(params)
|
opt(params)
|
||||||
|
51
waku/v2/protocol/lightpush/waku_lightpush_option.go
Normal file
51
waku/v2/protocol/lightpush/waku_lightpush_option.go
Normal file
@ -0,0 +1,51 @@
|
|||||||
|
package lightpush
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/libp2p/go-libp2p-core/host"
|
||||||
|
"github.com/libp2p/go-libp2p-core/peer"
|
||||||
|
"github.com/status-im/go-waku/waku/v2/protocol"
|
||||||
|
"github.com/status-im/go-waku/waku/v2/utils"
|
||||||
|
)
|
||||||
|
|
||||||
|
type LightPushParameters struct {
|
||||||
|
selectedPeer peer.ID
|
||||||
|
requestId []byte
|
||||||
|
}
|
||||||
|
|
||||||
|
type LightPushOption func(*LightPushParameters)
|
||||||
|
|
||||||
|
func WithPeer(p peer.ID) LightPushOption {
|
||||||
|
return func(params *LightPushParameters) {
|
||||||
|
params.selectedPeer = p
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func WithAutomaticPeerSelection(host host.Host) LightPushOption {
|
||||||
|
return func(params *LightPushParameters) {
|
||||||
|
p, err := utils.SelectPeer(host, string(LightPushID_v20beta1))
|
||||||
|
if err == nil {
|
||||||
|
params.selectedPeer = *p
|
||||||
|
} else {
|
||||||
|
log.Info("Error selecting peer: ", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func WithRequestId(requestId []byte) LightPushOption {
|
||||||
|
return func(params *LightPushParameters) {
|
||||||
|
params.requestId = requestId
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func WithAutomaticRequestId() LightPushOption {
|
||||||
|
return func(params *LightPushParameters) {
|
||||||
|
params.requestId = protocol.GenerateRequestId()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func DefaultOptions(host host.Host) []LightPushOption {
|
||||||
|
return []LightPushOption{
|
||||||
|
WithAutomaticRequestId(),
|
||||||
|
WithAutomaticPeerSelection(host),
|
||||||
|
}
|
||||||
|
}
|
Loading…
x
Reference in New Issue
Block a user