mirror of https://github.com/status-im/go-waku.git
chore(lint): adds `make lint-full` target and fixes some linting errors
This commit is contained in:
parent
db25b307e2
commit
dcc87cf24f
|
@ -0,0 +1,12 @@
|
||||||
|
run:
|
||||||
|
build-tags:
|
||||||
|
- gowaku_rln
|
||||||
|
|
||||||
|
issues:
|
||||||
|
include:
|
||||||
|
- EXC0012
|
||||||
|
- EXC0014
|
||||||
|
|
||||||
|
linters:
|
||||||
|
enable:
|
||||||
|
- revive
|
|
@ -1,3 +1,3 @@
|
||||||
run:
|
run:
|
||||||
build-tags:
|
build-tags:
|
||||||
- gowaku_rln
|
- gowaku_rln
|
||||||
|
|
8
Makefile
8
Makefile
|
@ -8,7 +8,7 @@ SHELL := bash # the shell used internally by Make
|
||||||
|
|
||||||
GOBIN ?= $(shell which go)
|
GOBIN ?= $(shell which go)
|
||||||
|
|
||||||
.PHONY: all build lint test coverage build-example static-library dynamic-library test-c test-c-template mobile-android mobile-ios
|
.PHONY: all build lint lint-full test coverage build-example static-library dynamic-library test-c test-c-template mobile-android mobile-ios
|
||||||
|
|
||||||
ifeq ($(OS),Windows_NT) # is Windows_NT on XP, 2000, 7, Vista, 10...
|
ifeq ($(OS),Windows_NT) # is Windows_NT on XP, 2000, 7, Vista, 10...
|
||||||
detected_OS := Windows
|
detected_OS := Windows
|
||||||
|
@ -70,7 +70,11 @@ lint-install:
|
||||||
|
|
||||||
lint:
|
lint:
|
||||||
@echo "lint"
|
@echo "lint"
|
||||||
@golangci-lint --exclude=SA1019 run ./... --deadline=5m
|
@golangci-lint run ./... --deadline=5m
|
||||||
|
|
||||||
|
lint-full:
|
||||||
|
@echo "lint"
|
||||||
|
@golangci-lint run ./... --config=./.golangci.full.yaml --deadline=5m
|
||||||
|
|
||||||
test-with-race:
|
test-with-race:
|
||||||
${GOBIN} test -race -timeout 300s ./waku/...
|
${GOBIN} test -race -timeout 300s ./waku/...
|
||||||
|
|
|
@ -143,7 +143,7 @@ func Execute(options NodeOptions) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if options.DNS4DomainName != "" {
|
if options.DNS4DomainName != "" {
|
||||||
nodeOpts = append(nodeOpts, node.WithDns4Domain(options.DNS4DomainName))
|
nodeOpts = append(nodeOpts, node.WithDNS4Domain(options.DNS4DomainName))
|
||||||
}
|
}
|
||||||
|
|
||||||
libp2pOpts := node.DefaultLibP2POptions
|
libp2pOpts := node.DefaultLibP2POptions
|
||||||
|
@ -289,7 +289,7 @@ func Execute(options NodeOptions) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if options.Rendezvous.Enable {
|
if options.Rendezvous.Enable {
|
||||||
rdb := rendezvous.NewDB(ctx, db, logger)
|
rdb := rendezvous.NewDB(db, logger)
|
||||||
nodeOpts = append(nodeOpts, node.WithRendezvous(rdb))
|
nodeOpts = append(nodeOpts, node.WithRendezvous(rdb))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -313,7 +313,7 @@ func Execute(options NodeOptions) {
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, d := range discoveredNodes {
|
for _, d := range discoveredNodes {
|
||||||
wakuNode.AddDiscoveredPeer(d.PeerID, d.PeerInfo.Addrs, wakupeerstore.DnsDiscovery)
|
wakuNode.AddDiscoveredPeer(d.PeerID, d.PeerInfo.Addrs, wakupeerstore.DNSDiscovery)
|
||||||
}
|
}
|
||||||
|
|
||||||
addStaticPeers(wakuNode, options.Store.Nodes, store.StoreID_v20beta4)
|
addStaticPeers(wakuNode, options.Store.Nodes, store.StoreID_v20beta4)
|
||||||
|
@ -434,16 +434,16 @@ func Execute(options NodeOptions) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
var rpcServer *rpc.WakuRpc
|
var rpcServer *rpc.WakuRPC
|
||||||
if options.RPCServer.Enable {
|
if options.RPCServer.Enable {
|
||||||
rpcServer = rpc.NewWakuRpc(wakuNode, options.RPCServer.Address, options.RPCServer.Port, options.RPCServer.Admin, options.PProf, options.RPCServer.RelayCacheCapacity, logger)
|
rpcServer = rpc.NewWakuRPC(wakuNode, options.RPCServer.Address, options.RPCServer.Port, options.RPCServer.Admin, options.PProf, options.RPCServer.RelayCacheCapacity, logger)
|
||||||
rpcServer.Start()
|
rpcServer.Start()
|
||||||
}
|
}
|
||||||
|
|
||||||
var restServer *rest.WakuRest
|
var restServer *rest.WakuRest
|
||||||
if options.RESTServer.Enable {
|
if options.RESTServer.Enable {
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
restServer = rest.NewWakuRest(wakuNode, options.RESTServer.Address, options.RESTServer.Port, options.RESTServer.Admin, options.PProf, options.RESTServer.RelayCacheCapacity, logger)
|
restServer = rest.NewWakuRest(wakuNode, options.RESTServer.Address, options.RESTServer.Port, options.PProf, options.RESTServer.RelayCacheCapacity, logger)
|
||||||
restServer.Start(ctx, &wg)
|
restServer.Start(ctx, &wg)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -20,8 +20,8 @@ type InfoReply struct {
|
||||||
ListenAddresses []string `json:"listenAddresses,omitempty"`
|
ListenAddresses []string `json:"listenAddresses,omitempty"`
|
||||||
}
|
}
|
||||||
|
|
||||||
const ROUTE_DEBUG_INFOV1 = "/debug/v1/info"
|
const routeDebugInfoV1 = "/debug/v1/info"
|
||||||
const ROUTE_DEBUG_VERSIONV1 = "/debug/v1/info"
|
const routeDebugVersionV1 = "/debug/v1/info"
|
||||||
|
|
||||||
func NewDebugService(node *node.WakuNode, m *chi.Mux) *DebugService {
|
func NewDebugService(node *node.WakuNode, m *chi.Mux) *DebugService {
|
||||||
d := &DebugService{
|
d := &DebugService{
|
||||||
|
@ -29,15 +29,15 @@ func NewDebugService(node *node.WakuNode, m *chi.Mux) *DebugService {
|
||||||
mux: m,
|
mux: m,
|
||||||
}
|
}
|
||||||
|
|
||||||
m.Get(ROUTE_DEBUG_INFOV1, d.getV1Info)
|
m.Get(routeDebugInfoV1, d.getV1Info)
|
||||||
m.Get(ROUTE_DEBUG_VERSIONV1, d.getV1Version)
|
m.Get(routeDebugVersionV1, d.getV1Version)
|
||||||
|
|
||||||
return d
|
return d
|
||||||
}
|
}
|
||||||
|
|
||||||
type VersionResponse string
|
type VersionResponse string
|
||||||
|
|
||||||
func (d *DebugService) getV1Info(w http.ResponseWriter, r *http.Request) {
|
func (d *DebugService) getV1Info(w http.ResponseWriter, req *http.Request) {
|
||||||
response := new(InfoReply)
|
response := new(InfoReply)
|
||||||
response.ENRUri = d.node.ENR().String()
|
response.ENRUri = d.node.ENR().String()
|
||||||
for _, addr := range d.node.ListenAddresses() {
|
for _, addr := range d.node.ListenAddresses() {
|
||||||
|
@ -46,7 +46,7 @@ func (d *DebugService) getV1Info(w http.ResponseWriter, r *http.Request) {
|
||||||
writeErrOrResponse(w, nil, response)
|
writeErrOrResponse(w, nil, response)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *DebugService) getV1Version(w http.ResponseWriter, r *http.Request) {
|
func (d *DebugService) getV1Version(w http.ResponseWriter, req *http.Request) {
|
||||||
response := VersionResponse(node.GetVersionInfo().String())
|
response := VersionResponse(node.GetVersionInfo().String())
|
||||||
writeErrOrResponse(w, nil, response)
|
writeErrOrResponse(w, nil, response)
|
||||||
}
|
}
|
||||||
|
|
|
@ -22,7 +22,7 @@ func TestGetV1Info(t *testing.T) {
|
||||||
node: wakuNode1,
|
node: wakuNode1,
|
||||||
}
|
}
|
||||||
|
|
||||||
request, err := http.NewRequest(http.MethodPost, ROUTE_DEBUG_INFOV1, bytes.NewReader([]byte("")))
|
request, err := http.NewRequest(http.MethodPost, routeDebugInfoV1, bytes.NewReader([]byte("")))
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
rr := httptest.NewRecorder()
|
rr := httptest.NewRecorder()
|
||||||
|
|
|
@ -93,66 +93,66 @@ func (r *RelayService) Stop() {
|
||||||
r.cancel()
|
r.cancel()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *RelayService) deleteV1Subscriptions(w http.ResponseWriter, r *http.Request) {
|
func (r *RelayService) deleteV1Subscriptions(w http.ResponseWriter, req *http.Request) {
|
||||||
var topics []string
|
var topics []string
|
||||||
decoder := json.NewDecoder(r.Body)
|
decoder := json.NewDecoder(req.Body)
|
||||||
if err := decoder.Decode(&topics); err != nil {
|
if err := decoder.Decode(&topics); err != nil {
|
||||||
w.WriteHeader(http.StatusBadRequest)
|
w.WriteHeader(http.StatusBadRequest)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
defer r.Body.Close()
|
defer req.Body.Close()
|
||||||
|
|
||||||
d.messagesMutex.Lock()
|
r.messagesMutex.Lock()
|
||||||
defer d.messagesMutex.Unlock()
|
defer r.messagesMutex.Unlock()
|
||||||
|
|
||||||
var err error
|
var err error
|
||||||
for _, topic := range topics {
|
for _, topic := range topics {
|
||||||
err = d.node.Relay().Unsubscribe(r.Context(), topic)
|
err = r.node.Relay().Unsubscribe(req.Context(), topic)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
d.log.Error("unsubscribing from topic", zap.String("topic", strings.Replace(strings.Replace(topic, "\n", "", -1), "\r", "", -1)), zap.Error(err))
|
r.log.Error("unsubscribing from topic", zap.String("topic", strings.Replace(strings.Replace(topic, "\n", "", -1), "\r", "", -1)), zap.Error(err))
|
||||||
} else {
|
} else {
|
||||||
delete(d.messages, topic)
|
delete(r.messages, topic)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
writeErrOrResponse(w, err, true)
|
writeErrOrResponse(w, err, true)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *RelayService) postV1Subscriptions(w http.ResponseWriter, r *http.Request) {
|
func (r *RelayService) postV1Subscriptions(w http.ResponseWriter, req *http.Request) {
|
||||||
var topics []string
|
var topics []string
|
||||||
decoder := json.NewDecoder(r.Body)
|
decoder := json.NewDecoder(req.Body)
|
||||||
if err := decoder.Decode(&topics); err != nil {
|
if err := decoder.Decode(&topics); err != nil {
|
||||||
w.WriteHeader(http.StatusBadRequest)
|
w.WriteHeader(http.StatusBadRequest)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
defer r.Body.Close()
|
defer req.Body.Close()
|
||||||
|
|
||||||
var err error
|
var err error
|
||||||
var sub *relay.Subscription
|
var sub *relay.Subscription
|
||||||
var topicToSubscribe string
|
var topicToSubscribe string
|
||||||
for _, topic := range topics {
|
for _, topic := range topics {
|
||||||
if topic == "" {
|
if topic == "" {
|
||||||
sub, err = d.node.Relay().Subscribe(r.Context())
|
sub, err = r.node.Relay().Subscribe(req.Context())
|
||||||
topicToSubscribe = relay.DefaultWakuTopic
|
topicToSubscribe = relay.DefaultWakuTopic
|
||||||
} else {
|
} else {
|
||||||
sub, err = d.node.Relay().SubscribeToTopic(r.Context(), topic)
|
sub, err = r.node.Relay().SubscribeToTopic(req.Context(), topic)
|
||||||
topicToSubscribe = topic
|
topicToSubscribe = topic
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
d.log.Error("subscribing to topic", zap.String("topic", strings.Replace(topicToSubscribe, "\n", "", -1)), zap.Error(err))
|
r.log.Error("subscribing to topic", zap.String("topic", strings.Replace(topicToSubscribe, "\n", "", -1)), zap.Error(err))
|
||||||
} else {
|
} else {
|
||||||
sub.Unsubscribe()
|
sub.Unsubscribe()
|
||||||
d.messagesMutex.Lock()
|
r.messagesMutex.Lock()
|
||||||
d.messages[topic] = []*pb.WakuMessage{}
|
r.messages[topic] = []*pb.WakuMessage{}
|
||||||
d.messagesMutex.Unlock()
|
r.messagesMutex.Unlock()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
writeErrOrResponse(w, err, true)
|
writeErrOrResponse(w, err, true)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *RelayService) getV1Messages(w http.ResponseWriter, r *http.Request) {
|
func (r *RelayService) getV1Messages(w http.ResponseWriter, req *http.Request) {
|
||||||
topic := chi.URLParam(r, "topic")
|
topic := chi.URLParam(req, "topic")
|
||||||
if topic == "" {
|
if topic == "" {
|
||||||
w.WriteHeader(http.StatusBadRequest)
|
w.WriteHeader(http.StatusBadRequest)
|
||||||
return
|
return
|
||||||
|
@ -160,55 +160,55 @@ func (d *RelayService) getV1Messages(w http.ResponseWriter, r *http.Request) {
|
||||||
|
|
||||||
var err error
|
var err error
|
||||||
|
|
||||||
d.messagesMutex.Lock()
|
r.messagesMutex.Lock()
|
||||||
defer d.messagesMutex.Unlock()
|
defer r.messagesMutex.Unlock()
|
||||||
|
|
||||||
if _, ok := d.messages[topic]; !ok {
|
if _, ok := r.messages[topic]; !ok {
|
||||||
w.WriteHeader(http.StatusNotFound)
|
w.WriteHeader(http.StatusNotFound)
|
||||||
_, err = w.Write([]byte("not subscribed to topic"))
|
_, err = w.Write([]byte("not subscribed to topic"))
|
||||||
d.log.Error("writing response", zap.Error(err))
|
r.log.Error("writing response", zap.Error(err))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
response := d.messages[topic]
|
response := r.messages[topic]
|
||||||
|
|
||||||
d.messages[topic] = []*pb.WakuMessage{}
|
r.messages[topic] = []*pb.WakuMessage{}
|
||||||
writeErrOrResponse(w, nil, response)
|
writeErrOrResponse(w, nil, response)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *RelayService) postV1Message(w http.ResponseWriter, r *http.Request) {
|
func (r *RelayService) postV1Message(w http.ResponseWriter, req *http.Request) {
|
||||||
topic := chi.URLParam(r, "topic")
|
topic := chi.URLParam(req, "topic")
|
||||||
if topic == "" {
|
if topic == "" {
|
||||||
w.WriteHeader(http.StatusBadRequest)
|
w.WriteHeader(http.StatusBadRequest)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
var message *pb.WakuMessage
|
var message *pb.WakuMessage
|
||||||
decoder := json.NewDecoder(r.Body)
|
decoder := json.NewDecoder(req.Body)
|
||||||
if err := decoder.Decode(&message); err != nil {
|
if err := decoder.Decode(&message); err != nil {
|
||||||
w.WriteHeader(http.StatusBadRequest)
|
w.WriteHeader(http.StatusBadRequest)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
defer r.Body.Close()
|
defer req.Body.Close()
|
||||||
|
|
||||||
var err error
|
var err error
|
||||||
if topic == "" {
|
if topic == "" {
|
||||||
topic = relay.DefaultWakuTopic
|
topic = relay.DefaultWakuTopic
|
||||||
}
|
}
|
||||||
|
|
||||||
if !d.node.Relay().IsSubscribed(topic) {
|
if !r.node.Relay().IsSubscribed(topic) {
|
||||||
writeErrOrResponse(w, errors.New("not subscribed to pubsubTopic"), nil)
|
writeErrOrResponse(w, errors.New("not subscribed to pubsubTopic"), nil)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
if err = server.AppendRLNProof(d.node, message); err != nil {
|
if err = server.AppendRLNProof(r.node, message); err != nil {
|
||||||
writeErrOrResponse(w, err, nil)
|
writeErrOrResponse(w, err, nil)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err = d.node.Relay().PublishToTopic(r.Context(), message, strings.Replace(topic, "\n", "", -1))
|
_, err = r.node.Relay().PublishToTopic(req.Context(), message, strings.Replace(topic, "\n", "", -1))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
d.log.Error("publishing message", zap.Error(err))
|
r.log.Error("publishing message", zap.Error(err))
|
||||||
}
|
}
|
||||||
|
|
||||||
writeErrOrResponse(w, err, true)
|
writeErrOrResponse(w, err, true)
|
||||||
|
|
|
@ -41,11 +41,11 @@ func TestPostV1Message(t *testing.T) {
|
||||||
Version: 0,
|
Version: 0,
|
||||||
Timestamp: utils.GetUnixEpoch(),
|
Timestamp: utils.GetUnixEpoch(),
|
||||||
}
|
}
|
||||||
msgJsonBytes, err := json.Marshal(msg)
|
msgJSONBytes, err := json.Marshal(msg)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
rr := httptest.NewRecorder()
|
rr := httptest.NewRecorder()
|
||||||
req, _ := http.NewRequest(http.MethodPost, "/relay/v1/messages/test", bytes.NewReader(msgJsonBytes))
|
req, _ := http.NewRequest(http.MethodPost, "/relay/v1/messages/test", bytes.NewReader(msgJSONBytes))
|
||||||
router.ServeHTTP(rr, req)
|
router.ServeHTTP(rr, req)
|
||||||
require.Equal(t, http.StatusOK, rr.Code)
|
require.Equal(t, http.StatusOK, rr.Code)
|
||||||
require.Equal(t, "true", rr.Body.String())
|
require.Equal(t, "true", rr.Body.String())
|
||||||
|
|
|
@ -44,7 +44,7 @@ type StoreWakuMessage struct {
|
||||||
Meta []byte `json:"meta"`
|
Meta []byte `json:"meta"`
|
||||||
}
|
}
|
||||||
|
|
||||||
const ROUTE_STORE_MESSAGESV1 = "/store/v1/messages"
|
const routeStoreMessagesV1 = "/store/v1/messages"
|
||||||
|
|
||||||
func NewStoreService(node *node.WakuNode, m *chi.Mux) *StoreService {
|
func NewStoreService(node *node.WakuNode, m *chi.Mux) *StoreService {
|
||||||
s := &StoreService{
|
s := &StoreService{
|
||||||
|
@ -52,7 +52,7 @@ func NewStoreService(node *node.WakuNode, m *chi.Mux) *StoreService {
|
||||||
mux: m,
|
mux: m,
|
||||||
}
|
}
|
||||||
|
|
||||||
m.Get(ROUTE_STORE_MESSAGESV1, s.getV1Messages)
|
m.Get(routeStoreMessagesV1, s.getV1Messages)
|
||||||
|
|
||||||
return s
|
return s
|
||||||
}
|
}
|
||||||
|
|
|
@ -63,7 +63,7 @@ func TestGetMessages(t *testing.T) {
|
||||||
"pubsubTopic": {pubsubTopic1},
|
"pubsubTopic": {pubsubTopic1},
|
||||||
"pageSize": {"2"},
|
"pageSize": {"2"},
|
||||||
}
|
}
|
||||||
path := ROUTE_STORE_MESSAGESV1 + "?" + queryParams.Encode()
|
path := routeStoreMessagesV1 + "?" + queryParams.Encode()
|
||||||
req, _ := http.NewRequest(http.MethodGet, path, nil)
|
req, _ := http.NewRequest(http.MethodGet, path, nil)
|
||||||
router.ServeHTTP(rr, req)
|
router.ServeHTTP(rr, req)
|
||||||
require.Equal(t, http.StatusOK, rr.Code)
|
require.Equal(t, http.StatusOK, rr.Code)
|
||||||
|
@ -83,7 +83,7 @@ func TestGetMessages(t *testing.T) {
|
||||||
"digest": {base64.URLEncoding.EncodeToString(response.Cursor.Digest)},
|
"digest": {base64.URLEncoding.EncodeToString(response.Cursor.Digest)},
|
||||||
"pageSize": {"2"},
|
"pageSize": {"2"},
|
||||||
}
|
}
|
||||||
path = ROUTE_STORE_MESSAGESV1 + "?" + queryParams.Encode()
|
path = routeStoreMessagesV1 + "?" + queryParams.Encode()
|
||||||
req, _ = http.NewRequest(http.MethodGet, path, nil)
|
req, _ = http.NewRequest(http.MethodGet, path, nil)
|
||||||
router.ServeHTTP(rr, req)
|
router.ServeHTTP(rr, req)
|
||||||
require.Equal(t, http.StatusOK, rr.Code)
|
require.Equal(t, http.StatusOK, rr.Code)
|
||||||
|
|
|
@ -21,7 +21,7 @@ type WakuRest struct {
|
||||||
relayService *RelayService
|
relayService *RelayService
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewWakuRest(node *node.WakuNode, address string, port int, enableAdmin bool, enablePProf bool, relayCacheCapacity int, log *zap.Logger) *WakuRest {
|
func NewWakuRest(node *node.WakuNode, address string, port int, enablePProf bool, relayCacheCapacity int, log *zap.Logger) *WakuRest {
|
||||||
wrpc := new(WakuRest)
|
wrpc := new(WakuRest)
|
||||||
wrpc.log = log.Named("rest")
|
wrpc.log = log.Named("rest")
|
||||||
|
|
||||||
|
|
|
@ -13,7 +13,7 @@ func TestWakuRest(t *testing.T) {
|
||||||
n, err := node.New(options)
|
n, err := node.New(options)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
rpc := NewWakuRest(n, "127.0.0.1", 8080, true, false, 10, utils.Logger())
|
rpc := NewWakuRest(n, "127.0.0.1", 8080, false, 10, utils.Logger())
|
||||||
require.NotNil(t, rpc.server)
|
require.NotNil(t, rpc.server)
|
||||||
require.Equal(t, rpc.server.Addr, "127.0.0.1:8080")
|
require.Equal(t, rpc.server.Addr, "127.0.0.1:8080")
|
||||||
}
|
}
|
||||||
|
|
|
@ -40,7 +40,7 @@ type serverRequest struct {
|
||||||
Params *json.RawMessage `json:"params"`
|
Params *json.RawMessage `json:"params"`
|
||||||
// The request id. This can be of any type. It is used to match the
|
// The request id. This can be of any type. It is used to match the
|
||||||
// response with the request that it is replying to.
|
// response with the request that it is replying to.
|
||||||
Id *json.RawMessage `json:"id"`
|
ID *json.RawMessage `json:"id"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// serverResponse represents a JSON-RPC response returned by the server.
|
// serverResponse represents a JSON-RPC response returned by the server.
|
||||||
|
@ -52,7 +52,7 @@ type serverResponse struct {
|
||||||
// null if there was no error.
|
// null if there was no error.
|
||||||
Error interface{} `json:"error"`
|
Error interface{} `json:"error"`
|
||||||
// This must be the same id as the request it is responding to.
|
// This must be the same id as the request it is responding to.
|
||||||
Id *json.RawMessage `json:"id"`
|
ID *json.RawMessage `json:"id"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// ----------------------------------------------------------------------------
|
// ----------------------------------------------------------------------------
|
||||||
|
@ -158,12 +158,12 @@ func (c *CodecRequest) ReadRequest(args interface{}) error {
|
||||||
|
|
||||||
// WriteResponse encodes the response and writes it to the ResponseWriter.
|
// WriteResponse encodes the response and writes it to the ResponseWriter.
|
||||||
func (c *CodecRequest) WriteResponse(w http.ResponseWriter, reply interface{}) {
|
func (c *CodecRequest) WriteResponse(w http.ResponseWriter, reply interface{}) {
|
||||||
if c.request.Id != nil {
|
if c.request.ID != nil {
|
||||||
// Id is null for notifications and they don't have a response.
|
// Id is null for notifications and they don't have a response.
|
||||||
res := &serverResponse{
|
res := &serverResponse{
|
||||||
Result: reply,
|
Result: reply,
|
||||||
Error: &null,
|
Error: &null,
|
||||||
Id: c.request.Id,
|
ID: c.request.ID,
|
||||||
}
|
}
|
||||||
c.writeServerResponse(w, 200, res)
|
c.writeServerResponse(w, 200, res)
|
||||||
}
|
}
|
||||||
|
@ -172,7 +172,7 @@ func (c *CodecRequest) WriteResponse(w http.ResponseWriter, reply interface{}) {
|
||||||
func (c *CodecRequest) WriteError(w http.ResponseWriter, _ int, err error) {
|
func (c *CodecRequest) WriteError(w http.ResponseWriter, _ int, err error) {
|
||||||
res := &serverResponse{
|
res := &serverResponse{
|
||||||
Result: &null,
|
Result: &null,
|
||||||
Id: c.request.Id,
|
ID: c.request.ID,
|
||||||
}
|
}
|
||||||
if jsonErr, ok := err.(*Error); ok {
|
if jsonErr, ok := err.(*Error); ok {
|
||||||
res.Error = jsonErr.Data
|
res.Error = jsonErr.Data
|
||||||
|
|
|
@ -32,8 +32,8 @@ func TestBase64Encoding(t *testing.T) {
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Equal(t, base64.StdEncoding.EncodeToString([]byte(input)), m["payload"])
|
require.Equal(t, base64.StdEncoding.EncodeToString([]byte(input)), m["payload"])
|
||||||
|
|
||||||
decodedRpcMsg := new(RPCWakuMessage)
|
decodedRPCMsg := new(RPCWakuMessage)
|
||||||
err = json.Unmarshal(jsonBytes, decodedRpcMsg)
|
err = json.Unmarshal(jsonBytes, decodedRPCMsg)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Equal(t, input, string(decodedRpcMsg.Payload))
|
require.Equal(t, input, string(decodedRPCMsg.Payload))
|
||||||
}
|
}
|
||||||
|
|
|
@ -13,7 +13,7 @@ import (
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
type WakuRpc struct {
|
type WakuRPC struct {
|
||||||
node *node.WakuNode
|
node *node.WakuNode
|
||||||
server *http.Server
|
server *http.Server
|
||||||
|
|
||||||
|
@ -24,8 +24,8 @@ type WakuRpc struct {
|
||||||
adminService *AdminService
|
adminService *AdminService
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewWakuRpc(node *node.WakuNode, address string, port int, enableAdmin bool, enablePProf bool, cacheCapacity int, log *zap.Logger) *WakuRpc {
|
func NewWakuRPC(node *node.WakuNode, address string, port int, enableAdmin bool, enablePProf bool, cacheCapacity int, log *zap.Logger) *WakuRPC {
|
||||||
wrpc := new(WakuRpc)
|
wrpc := new(WakuRPC)
|
||||||
wrpc.log = log.Named("rpc")
|
wrpc.log = log.Named("rpc")
|
||||||
|
|
||||||
s := rpc.NewServer()
|
s := rpc.NewServer()
|
||||||
|
@ -102,7 +102,7 @@ func NewWakuRpc(node *node.WakuNode, address string, port int, enableAdmin bool,
|
||||||
return wrpc
|
return wrpc
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *WakuRpc) Start() {
|
func (r *WakuRPC) Start() {
|
||||||
if r.relayService != nil {
|
if r.relayService != nil {
|
||||||
go r.relayService.Start()
|
go r.relayService.Start()
|
||||||
}
|
}
|
||||||
|
@ -115,7 +115,7 @@ func (r *WakuRpc) Start() {
|
||||||
r.log.Info("server started", zap.String("addr", r.server.Addr))
|
r.log.Info("server started", zap.String("addr", r.server.Addr))
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *WakuRpc) Stop(ctx context.Context) error {
|
func (r *WakuRPC) Stop(ctx context.Context) error {
|
||||||
r.log.Info("shutting down server")
|
r.log.Info("shutting down server")
|
||||||
return r.server.Shutdown(ctx)
|
return r.server.Shutdown(ctx)
|
||||||
}
|
}
|
||||||
|
|
|
@ -13,7 +13,7 @@ func TestWakuRpc(t *testing.T) {
|
||||||
n, err := node.New(options)
|
n, err := node.New(options)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
rpc := NewWakuRpc(n, "127.0.0.1", 8080, true, false, 30, utils.Logger())
|
rpc := NewWakuRPC(n, "127.0.0.1", 8080, true, false, 30, utils.Logger())
|
||||||
require.NotNil(t, rpc.server)
|
require.NotNil(t, rpc.server)
|
||||||
require.Equal(t, rpc.server.Addr, "127.0.0.1:8080")
|
require.Equal(t, rpc.server.Addr, "127.0.0.1:8080")
|
||||||
}
|
}
|
||||||
|
|
|
@ -14,8 +14,8 @@ func execOkCB(onOkCb C.WakuCallBack, value string) C.int {
|
||||||
}
|
}
|
||||||
|
|
||||||
val := C.CString(value)
|
val := C.CString(value)
|
||||||
len := C.size_t(len(value))
|
valLen := C.size_t(len(value))
|
||||||
C._waku_execCB(onOkCb, val, len)
|
C._waku_execCB(onOkCb, val, valLen)
|
||||||
|
|
||||||
C.free(unsafe.Pointer(val))
|
C.free(unsafe.Pointer(val))
|
||||||
|
|
||||||
|
|
|
@ -28,7 +28,7 @@ func DNSDiscovery(url string, nameserver string, ms int) (string, error) {
|
||||||
ctx = context.Background()
|
ctx = context.Background()
|
||||||
}
|
}
|
||||||
|
|
||||||
var dnsDiscOpt []dnsdisc.DnsDiscoveryOption
|
var dnsDiscOpt []dnsdisc.DNSDiscoveryOption
|
||||||
if nameserver != "" {
|
if nameserver != "" {
|
||||||
dnsDiscOpt = append(dnsDiscOpt, dnsdisc.WithNameserver(nameserver))
|
dnsDiscOpt = append(dnsDiscOpt, dnsdisc.WithNameserver(nameserver))
|
||||||
}
|
}
|
||||||
|
|
|
@ -104,7 +104,7 @@ func NewNode(configJSON string) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
if config.DNS4DomainName != "" {
|
if config.DNS4DomainName != "" {
|
||||||
opts = append(opts, node.WithDns4Domain(config.DNS4DomainName))
|
opts = append(opts, node.WithDNS4Domain(config.DNS4DomainName))
|
||||||
}
|
}
|
||||||
|
|
||||||
if config.Websockets.Enabled {
|
if config.Websockets.Enabled {
|
||||||
|
|
|
@ -68,7 +68,7 @@ func HostID(key string, id peer.ID) zapcore.Field {
|
||||||
return zap.Stringer(key, hostID(id))
|
return zap.Stringer(key, hostID(id))
|
||||||
}
|
}
|
||||||
|
|
||||||
func (id hostID) String() string { return peer.Encode(peer.ID(id)) }
|
func (id hostID) String() string { return peer.ID(id).String() }
|
||||||
|
|
||||||
// Time - Waku uses Nanosecond Unix Time
|
// Time - Waku uses Nanosecond Unix Time
|
||||||
type timestamp int64
|
type timestamp int64
|
||||||
|
|
|
@ -51,8 +51,8 @@ func TestBasicSendingReceiving(t *testing.T) {
|
||||||
|
|
||||||
func write(ctx context.Context, wakuNode *node.WakuNode, msgContent string) error {
|
func write(ctx context.Context, wakuNode *node.WakuNode, msgContent string) error {
|
||||||
var contentTopic string = "test"
|
var contentTopic string = "test"
|
||||||
var version uint32 = 0
|
version := uint32(0)
|
||||||
var timestamp int64 = utils.GetUnixEpoch()
|
timestamp := utils.GetUnixEpoch()
|
||||||
|
|
||||||
p := new(payload.Payload)
|
p := new(payload.Payload)
|
||||||
p.Data = []byte(wakuNode.ID() + ": " + msgContent)
|
p.Data = []byte(wakuNode.ID() + ": " + msgContent)
|
||||||
|
|
|
@ -22,13 +22,13 @@ func (k *MultiaddrSlice) Set(value string) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (v *MultiaddrSlice) String() string {
|
func (k *MultiaddrSlice) String() string {
|
||||||
if v.Values == nil {
|
if k.Values == nil {
|
||||||
return ""
|
return ""
|
||||||
}
|
}
|
||||||
|
|
||||||
var output []string
|
var output []string
|
||||||
for _, v := range *v.Values {
|
for _, v := range *k.Values {
|
||||||
output = append(output, v.String())
|
output = append(output, v.String())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -42,12 +42,12 @@ func (k *ProtectedTopicSlice) Set(value string) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (v *ProtectedTopicSlice) String() string {
|
func (k *ProtectedTopicSlice) String() string {
|
||||||
if v.Values == nil {
|
if k.Values == nil {
|
||||||
return ""
|
return ""
|
||||||
}
|
}
|
||||||
var output []string
|
var output []string
|
||||||
for _, v := range *v.Values {
|
for _, v := range *k.Values {
|
||||||
output = append(output, v.String())
|
output = append(output, v.String())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -76,6 +76,7 @@ func WithAutoUpdate(autoUpdate bool) DiscoveryV5Option {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// WithBootnodes is an option used to specify the bootstrap nodes to use with DiscV5
|
||||||
func WithBootnodes(bootnodes []*enode.Node) DiscoveryV5Option {
|
func WithBootnodes(bootnodes []*enode.Node) DiscoveryV5Option {
|
||||||
return func(params *discV5Parameters) {
|
return func(params *discV5Parameters) {
|
||||||
params.bootnodes = bootnodes
|
params.bootnodes = bootnodes
|
||||||
|
@ -106,6 +107,7 @@ func WithAutoFindPeers(find bool) DiscoveryV5Option {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// DefaultOptions contains the default list of options used when setting up DiscoveryV5
|
||||||
func DefaultOptions() []DiscoveryV5Option {
|
func DefaultOptions() []DiscoveryV5Option {
|
||||||
return []DiscoveryV5Option{
|
return []DiscoveryV5Option{
|
||||||
WithUDPPort(9000),
|
WithUDPPort(9000),
|
||||||
|
@ -124,7 +126,7 @@ func NewDiscoveryV5(priv *ecdsa.PrivateKey, localnode *enode.LocalNode, peerConn
|
||||||
|
|
||||||
logger := log.Named("discv5")
|
logger := log.Named("discv5")
|
||||||
|
|
||||||
var NAT nat.Interface = nil
|
var NAT nat.Interface
|
||||||
if params.advertiseAddr == nil {
|
if params.advertiseAddr == nil {
|
||||||
NAT = nat.Any()
|
NAT = nat.Any()
|
||||||
}
|
}
|
||||||
|
@ -269,6 +271,7 @@ func (d *DiscoveryV5) Start(ctx context.Context) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SetBootnodes is used to setup the bootstrap nodes to use for discovering new peers
|
||||||
func (d *DiscoveryV5) SetBootnodes(nodes []*enode.Node) error {
|
func (d *DiscoveryV5) SetBootnodes(nodes []*enode.Node) error {
|
||||||
if d.listener == nil {
|
if d.listener == nil {
|
||||||
return ErrNoDiscV5Listener
|
return ErrNoDiscV5Listener
|
||||||
|
@ -277,6 +280,7 @@ func (d *DiscoveryV5) SetBootnodes(nodes []*enode.Node) error {
|
||||||
return d.listener.SetFallbackNodes(nodes)
|
return d.listener.SetFallbackNodes(nodes)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Stop is a function that stops the execution of DiscV5.
|
||||||
// only works if the discovery v5 is in running state
|
// only works if the discovery v5 is in running state
|
||||||
// so we can assume that cancel method is set
|
// so we can assume that cancel method is set
|
||||||
func (d *DiscoveryV5) Stop() {
|
func (d *DiscoveryV5) Stop() {
|
||||||
|
@ -524,6 +528,7 @@ restartLoop:
|
||||||
d.log.Warn("Discv5 loop stopped")
|
d.log.Warn("Discv5 loop stopped")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// IsStarted determines whether discoveryV5 started or not
|
||||||
func (d *DiscoveryV5) IsStarted() bool {
|
func (d *DiscoveryV5) IsStarted() bool {
|
||||||
return d.started.Load()
|
return d.started.Load()
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,7 +19,7 @@ func FilterPredicate(predicate func(*enode.Node) bool) Predicate {
|
||||||
}
|
}
|
||||||
|
|
||||||
// FilterShard creates a Predicate that filters nodes that belong to a specific shard
|
// FilterShard creates a Predicate that filters nodes that belong to a specific shard
|
||||||
func FilterShard(iterator enode.Iterator, cluster, index uint16) Predicate {
|
func FilterShard(cluster, index uint16) Predicate {
|
||||||
return func(iterator enode.Iterator) enode.Iterator {
|
return func(iterator enode.Iterator) enode.Iterator {
|
||||||
predicate := func(node *enode.Node) bool {
|
predicate := func(node *enode.Node) bool {
|
||||||
rs, err := wenr.RelaySharding(node.Record())
|
rs, err := wenr.RelaySharding(node.Record())
|
||||||
|
@ -33,7 +33,7 @@ func FilterShard(iterator enode.Iterator, cluster, index uint16) Predicate {
|
||||||
}
|
}
|
||||||
|
|
||||||
// FilterCapabilities creates a Predicate to filter nodes that support specific protocols
|
// FilterCapabilities creates a Predicate to filter nodes that support specific protocols
|
||||||
func FilterCapabilities(iterator enode.Iterator, flags wenr.WakuEnrBitfield) Predicate {
|
func FilterCapabilities(flags wenr.WakuEnrBitfield) Predicate {
|
||||||
return func(iterator enode.Iterator) enode.Iterator {
|
return func(iterator enode.Iterator) enode.Iterator {
|
||||||
predicate := func(node *enode.Node) bool {
|
predicate := func(node *enode.Node) bool {
|
||||||
enrField := new(wenr.WakuEnrBitfield)
|
enrField := new(wenr.WakuEnrBitfield)
|
||||||
|
|
|
@ -17,10 +17,10 @@ type dnsDiscoveryParameters struct {
|
||||||
nameserver string
|
nameserver string
|
||||||
}
|
}
|
||||||
|
|
||||||
type DnsDiscoveryOption func(*dnsDiscoveryParameters)
|
type DNSDiscoveryOption func(*dnsDiscoveryParameters)
|
||||||
|
|
||||||
// WithNameserver is a DnsDiscoveryOption that configures the nameserver to use
|
// WithNameserver is a DnsDiscoveryOption that configures the nameserver to use
|
||||||
func WithNameserver(nameserver string) DnsDiscoveryOption {
|
func WithNameserver(nameserver string) DNSDiscoveryOption {
|
||||||
return func(params *dnsDiscoveryParameters) {
|
return func(params *dnsDiscoveryParameters) {
|
||||||
params.nameserver = nameserver
|
params.nameserver = nameserver
|
||||||
}
|
}
|
||||||
|
@ -32,7 +32,7 @@ type DiscoveredNode struct {
|
||||||
ENR *enode.Node
|
ENR *enode.Node
|
||||||
}
|
}
|
||||||
|
|
||||||
var metrics Metrics = nil
|
var metrics Metrics
|
||||||
|
|
||||||
// SetPrometheusRegisterer is used to setup a custom prometheus registerer for metrics
|
// SetPrometheusRegisterer is used to setup a custom prometheus registerer for metrics
|
||||||
func SetPrometheusRegisterer(reg prometheus.Registerer, logger *zap.Logger) {
|
func SetPrometheusRegisterer(reg prometheus.Registerer, logger *zap.Logger) {
|
||||||
|
@ -44,7 +44,7 @@ func init() {
|
||||||
}
|
}
|
||||||
|
|
||||||
// RetrieveNodes returns a list of multiaddress given a url to a DNS discoverable ENR tree
|
// RetrieveNodes returns a list of multiaddress given a url to a DNS discoverable ENR tree
|
||||||
func RetrieveNodes(ctx context.Context, url string, opts ...DnsDiscoveryOption) ([]DiscoveredNode, error) {
|
func RetrieveNodes(ctx context.Context, url string, opts ...DNSDiscoveryOption) ([]DiscoveredNode, error) {
|
||||||
var discoveredNodes []DiscoveredNode
|
var discoveredNodes []DiscoveredNode
|
||||||
|
|
||||||
params := new(dnsDiscoveryParameters)
|
params := new(dnsDiscoveryParameters)
|
||||||
|
|
|
@ -10,6 +10,7 @@ var sha256Pool = sync.Pool{New: func() interface{} {
|
||||||
return sha256.New()
|
return sha256.New()
|
||||||
}}
|
}}
|
||||||
|
|
||||||
|
// SHA256 generates the SHA256 hash from the input data
|
||||||
func SHA256(data ...[]byte) []byte {
|
func SHA256(data ...[]byte) []byte {
|
||||||
h, ok := sha256Pool.Get().(hash.Hash)
|
h, ok := sha256Pool.Get().(hash.Hash)
|
||||||
if !ok {
|
if !ok {
|
||||||
|
|
|
@ -16,7 +16,7 @@ import (
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
func (w *WakuNode) updateLocalNode(localnode *enode.LocalNode, multiaddrs []ma.Multiaddr, ipAddr *net.TCPAddr, udpPort uint, wakuFlags wenr.WakuEnrBitfield, advertiseAddr []ma.Multiaddr, shouldAutoUpdate bool, log *zap.Logger) error {
|
func (w *WakuNode) updateLocalNode(localnode *enode.LocalNode, multiaddrs []ma.Multiaddr, ipAddr *net.TCPAddr, udpPort uint, wakuFlags wenr.WakuEnrBitfield, advertiseAddr []ma.Multiaddr, shouldAutoUpdate bool) error {
|
||||||
var options []wenr.ENROption
|
var options []wenr.ENROption
|
||||||
options = append(options, wenr.WithUDPPort(udpPort))
|
options = append(options, wenr.WithUDPPort(udpPort))
|
||||||
options = append(options, wenr.WithWakuBitfield(wakuFlags))
|
options = append(options, wenr.WithWakuBitfield(wakuFlags))
|
||||||
|
@ -268,7 +268,7 @@ func (w *WakuNode) setupENR(ctx context.Context, addrs []ma.Multiaddr) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
err = w.updateLocalNode(w.localNode, multiaddresses, ipAddr, w.opts.udpPort, w.wakuFlag, w.opts.advertiseAddrs, w.opts.discV5autoUpdate, w.log)
|
err = w.updateLocalNode(w.localNode, multiaddresses, ipAddr, w.opts.udpPort, w.wakuFlag, w.opts.advertiseAddrs, w.opts.discV5autoUpdate)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
w.log.Error("updating localnode ENR record", zap.Error(err))
|
w.log.Error("updating localnode ENR record", zap.Error(err))
|
||||||
return err
|
return err
|
||||||
|
|
|
@ -171,8 +171,8 @@ func WithPrometheusRegisterer(reg prometheus.Registerer) WakuNodeOption {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// WithDns4Domain is a WakuNodeOption that adds a custom domain name to listen
|
// WithDNS4Domain is a WakuNodeOption that adds a custom domain name to listen
|
||||||
func WithDns4Domain(dns4Domain string) WakuNodeOption {
|
func WithDNS4Domain(dns4Domain string) WakuNodeOption {
|
||||||
return func(params *WakuNodeParameters) error {
|
return func(params *WakuNodeParameters) error {
|
||||||
params.dns4Domain = dns4Domain
|
params.dns4Domain = dns4Domain
|
||||||
previousAddrFactory := params.addressFactory
|
previousAddrFactory := params.addressFactory
|
||||||
|
@ -203,9 +203,9 @@ func WithDns4Domain(dns4Domain string) WakuNodeOption {
|
||||||
|
|
||||||
if previousAddrFactory != nil {
|
if previousAddrFactory != nil {
|
||||||
return previousAddrFactory(addresses)
|
return previousAddrFactory(addresses)
|
||||||
} else {
|
|
||||||
return addresses
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return addresses
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
|
|
@ -72,16 +72,15 @@ func (payload Payload) Encode(version uint32) ([]byte, error) {
|
||||||
encoded, err := encryptSymmetric(data, payload.Key.SymKey)
|
encoded, err := encryptSymmetric(data, payload.Key.SymKey)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("couldn't encrypt using symmetric key: %w", err)
|
return nil, fmt.Errorf("couldn't encrypt using symmetric key: %w", err)
|
||||||
} else {
|
|
||||||
return encoded, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return encoded, nil
|
||||||
case Asymmetric:
|
case Asymmetric:
|
||||||
encoded, err := encryptAsymmetric(data, &payload.Key.PubKey)
|
encoded, err := encryptAsymmetric(data, &payload.Key.PubKey)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("couldn't encrypt using asymmetric key: %w", err)
|
return nil, fmt.Errorf("couldn't encrypt using asymmetric key: %w", err)
|
||||||
} else {
|
|
||||||
return encoded, nil
|
|
||||||
}
|
}
|
||||||
|
return encoded, nil
|
||||||
case None:
|
case None:
|
||||||
return nil, errors.New("non supported KeyKind")
|
return nil, errors.New("non supported KeyKind")
|
||||||
}
|
}
|
||||||
|
|
|
@ -25,10 +25,15 @@ func NewTestPeerDiscoverer() *TestPeerDiscoverer {
|
||||||
// Subscribe is for subscribing to peer discoverer
|
// Subscribe is for subscribing to peer discoverer
|
||||||
func (t *TestPeerDiscoverer) Subscribe(ctx context.Context, ch <-chan PeerData) {
|
func (t *TestPeerDiscoverer) Subscribe(ctx context.Context, ch <-chan PeerData) {
|
||||||
go func() {
|
go func() {
|
||||||
for p := range ch {
|
for {
|
||||||
t.Lock()
|
select {
|
||||||
t.peerMap[p.AddrInfo.ID] = struct{}{}
|
case <-ctx.Done():
|
||||||
t.Unlock()
|
return
|
||||||
|
case p := <-ch:
|
||||||
|
t.Lock()
|
||||||
|
t.peerMap[p.AddrInfo.ID] = struct{}{}
|
||||||
|
t.Unlock()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
|
|
|
@ -300,7 +300,7 @@ func (pm *PeerManager) addPeerToServiceSlot(proto protocol.ID, peerID peer.ID) {
|
||||||
// If a list of specific peers is passed, the peer will be chosen from that list assuming
|
// If a list of specific peers is passed, the peer will be chosen from that list assuming
|
||||||
// it supports the chosen protocol, otherwise it will chose a peer from the service slot.
|
// it supports the chosen protocol, otherwise it will chose a peer from the service slot.
|
||||||
// If a peer cannot be found in the service slot, a peer will be selected from node peerstore
|
// If a peer cannot be found in the service slot, a peer will be selected from node peerstore
|
||||||
func (pm *PeerManager) SelectPeer(proto protocol.ID, specificPeers []peer.ID, logger *zap.Logger) (peer.ID, error) {
|
func (pm *PeerManager) SelectPeer(proto protocol.ID, specificPeers []peer.ID) (peer.ID, error) {
|
||||||
// @TODO We need to be more strategic about which peers we dial. Right now we just set one on the service.
|
// @TODO We need to be more strategic about which peers we dial. Right now we just set one on the service.
|
||||||
// Ideally depending on the query and our set of peers we take a subset of ideal peers.
|
// Ideally depending on the query and our set of peers we take a subset of ideal peers.
|
||||||
// This will require us to check for various factors such as:
|
// This will require us to check for various factors such as:
|
||||||
|
@ -319,5 +319,6 @@ func (pm *PeerManager) SelectPeer(proto protocol.ID, specificPeers []peer.ID, lo
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
|
|
||||||
return utils.SelectRandomPeer(filteredPeers, pm.logger)
|
return utils.SelectRandomPeer(filteredPeers, pm.logger)
|
||||||
}
|
}
|
||||||
|
|
|
@ -64,7 +64,7 @@ func TestServiceSlots(t *testing.T) {
|
||||||
///////////////
|
///////////////
|
||||||
|
|
||||||
// select peer from pm, currently only h2 is set in pm
|
// select peer from pm, currently only h2 is set in pm
|
||||||
peerID, err := pm.SelectPeer(protocol, nil, utils.Logger())
|
peerID, err := pm.SelectPeer(protocol, nil)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Equal(t, peerID, h2.ID())
|
require.Equal(t, peerID, h2.ID())
|
||||||
|
|
||||||
|
@ -73,7 +73,7 @@ func TestServiceSlots(t *testing.T) {
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
// check that returned peer is h2 or h3 peer
|
// check that returned peer is h2 or h3 peer
|
||||||
peerID, err = pm.SelectPeer(protocol, nil, utils.Logger())
|
peerID, err = pm.SelectPeer(protocol, nil)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
if peerID == h2.ID() || peerID == h3.ID() {
|
if peerID == h2.ID() || peerID == h3.ID() {
|
||||||
//Test success
|
//Test success
|
||||||
|
@ -89,7 +89,7 @@ func TestServiceSlots(t *testing.T) {
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
defer h4.Close()
|
defer h4.Close()
|
||||||
|
|
||||||
_, err = pm.SelectPeer(protocol1, nil, utils.Logger())
|
_, err = pm.SelectPeer(protocol1, nil)
|
||||||
require.Error(t, err, utils.ErrNoPeersAvailable)
|
require.Error(t, err, utils.ErrNoPeersAvailable)
|
||||||
|
|
||||||
// add h4 peer for protocol1
|
// add h4 peer for protocol1
|
||||||
|
@ -97,7 +97,7 @@ func TestServiceSlots(t *testing.T) {
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
//Test peer selection for protocol1
|
//Test peer selection for protocol1
|
||||||
peerID, err = pm.SelectPeer(protocol1, nil, utils.Logger())
|
peerID, err = pm.SelectPeer(protocol1, nil)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Equal(t, peerID, h4.ID())
|
require.Equal(t, peerID, h4.ID())
|
||||||
|
|
||||||
|
@ -110,7 +110,7 @@ func TestDefaultProtocol(t *testing.T) {
|
||||||
// check peer for default protocol
|
// check peer for default protocol
|
||||||
///////////////
|
///////////////
|
||||||
//Test empty peer selection for relay protocol
|
//Test empty peer selection for relay protocol
|
||||||
_, err := pm.SelectPeer(WakuRelayIDv200, nil, utils.Logger())
|
_, err := pm.SelectPeer(WakuRelayIDv200, nil)
|
||||||
require.Error(t, err, utils.ErrNoPeersAvailable)
|
require.Error(t, err, utils.ErrNoPeersAvailable)
|
||||||
|
|
||||||
///////////////
|
///////////////
|
||||||
|
@ -125,7 +125,7 @@ func TestDefaultProtocol(t *testing.T) {
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
// since we are not passing peerList, selectPeer fn using filterByProto checks in PeerStore for peers with same protocol.
|
// since we are not passing peerList, selectPeer fn using filterByProto checks in PeerStore for peers with same protocol.
|
||||||
peerID, err := pm.SelectPeer(WakuRelayIDv200, nil, utils.Logger())
|
peerID, err := pm.SelectPeer(WakuRelayIDv200, nil)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Equal(t, peerID, h5.ID())
|
require.Equal(t, peerID, h5.ID())
|
||||||
}
|
}
|
||||||
|
@ -145,12 +145,12 @@ func TestAdditionAndRemovalOfPeer(t *testing.T) {
|
||||||
_, err = pm.AddPeer(getAddr(h6), wps.Static, protocol2)
|
_, err = pm.AddPeer(getAddr(h6), wps.Static, protocol2)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
peerID, err := pm.SelectPeer(protocol2, nil, utils.Logger())
|
peerID, err := pm.SelectPeer(protocol2, nil)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Equal(t, peerID, h6.ID())
|
require.Equal(t, peerID, h6.ID())
|
||||||
|
|
||||||
pm.RemovePeer(peerID)
|
pm.RemovePeer(peerID)
|
||||||
_, err = pm.SelectPeer(protocol2, nil, utils.Logger())
|
_, err = pm.SelectPeer(protocol2, nil)
|
||||||
require.Error(t, err, utils.ErrNoPeersAvailable)
|
require.Error(t, err, utils.ErrNoPeersAvailable)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -14,17 +14,17 @@ func TestServiceSlot(t *testing.T) {
|
||||||
|
|
||||||
protocol := libp2pProtocol.ID("test/protocol")
|
protocol := libp2pProtocol.ID("test/protocol")
|
||||||
|
|
||||||
peerId := peer.ID("peerId")
|
peerID := peer.ID("peerId")
|
||||||
|
|
||||||
//
|
//
|
||||||
slots.getPeers(protocol).add(peerId)
|
slots.getPeers(protocol).add(peerID)
|
||||||
//
|
//
|
||||||
fetchedPeer, err := slots.getPeers(protocol).getRandom()
|
fetchedPeer, err := slots.getPeers(protocol).getRandom()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Equal(t, peerId, fetchedPeer)
|
require.Equal(t, peerID, fetchedPeer)
|
||||||
|
|
||||||
//
|
//
|
||||||
slots.getPeers(protocol).remove(peerId)
|
slots.getPeers(protocol).remove(peerID)
|
||||||
//
|
//
|
||||||
_, err = slots.getPeers(protocol).getRandom()
|
_, err = slots.getPeers(protocol).getRandom()
|
||||||
require.Equal(t, err, utils.ErrNoPeersAvailable)
|
require.Equal(t, err, utils.ErrNoPeersAvailable)
|
||||||
|
@ -36,18 +36,18 @@ func TestServiceSlotRemovePeerFromAll(t *testing.T) {
|
||||||
protocol := libp2pProtocol.ID("test/protocol")
|
protocol := libp2pProtocol.ID("test/protocol")
|
||||||
protocol1 := libp2pProtocol.ID("test/protocol1")
|
protocol1 := libp2pProtocol.ID("test/protocol1")
|
||||||
|
|
||||||
peerId := peer.ID("peerId")
|
peerID := peer.ID("peerId")
|
||||||
|
|
||||||
//
|
//
|
||||||
slots.getPeers(protocol).add(peerId)
|
slots.getPeers(protocol).add(peerID)
|
||||||
slots.getPeers(protocol1).add(peerId)
|
slots.getPeers(protocol1).add(peerID)
|
||||||
//
|
//
|
||||||
fetchedPeer, err := slots.getPeers(protocol1).getRandom()
|
fetchedPeer, err := slots.getPeers(protocol1).getRandom()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Equal(t, peerId, fetchedPeer)
|
require.Equal(t, peerID, fetchedPeer)
|
||||||
|
|
||||||
//
|
//
|
||||||
slots.removePeer(peerId)
|
slots.removePeer(peerID)
|
||||||
//
|
//
|
||||||
_, err = slots.getPeers(protocol).getRandom()
|
_, err = slots.getPeers(protocol).getRandom()
|
||||||
require.Equal(t, err, utils.ErrNoPeersAvailable)
|
require.Equal(t, err, utils.ErrNoPeersAvailable)
|
||||||
|
|
|
@ -18,7 +18,7 @@ const (
|
||||||
Discv5
|
Discv5
|
||||||
Static
|
Static
|
||||||
PeerExchange
|
PeerExchange
|
||||||
DnsDiscovery
|
DNSDiscovery
|
||||||
Rendezvous
|
Rendezvous
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
|
@ -28,7 +28,7 @@ type WakuEnrBitfield = uint8
|
||||||
|
|
||||||
// NewWakuEnrBitfield creates a WakuEnrBitField whose value will depend on which protocols are enabled in the node
|
// NewWakuEnrBitfield creates a WakuEnrBitField whose value will depend on which protocols are enabled in the node
|
||||||
func NewWakuEnrBitfield(lightpush, filter, store, relay bool) WakuEnrBitfield {
|
func NewWakuEnrBitfield(lightpush, filter, store, relay bool) WakuEnrBitfield {
|
||||||
var v uint8 = 0
|
var v uint8
|
||||||
|
|
||||||
if lightpush {
|
if lightpush {
|
||||||
v |= (1 << 3)
|
v |= (1 << 3)
|
||||||
|
@ -91,10 +91,9 @@ func Multiaddress(node *enode.Node) (peer.ID, []multiaddr.Multiaddr, error) {
|
||||||
if err := node.Record().Load(enr.WithEntry(MultiaddrENRField, &multiaddrRaw)); err != nil {
|
if err := node.Record().Load(enr.WithEntry(MultiaddrENRField, &multiaddrRaw)); err != nil {
|
||||||
if !enr.IsNotFound(err) {
|
if !enr.IsNotFound(err) {
|
||||||
return "", nil, err
|
return "", nil, err
|
||||||
} else {
|
|
||||||
// No multiaddr entry on enr
|
|
||||||
return peerID, result, nil
|
|
||||||
}
|
}
|
||||||
|
// No multiaddr entry on enr
|
||||||
|
return peerID, result, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(multiaddrRaw) < 2 {
|
if len(multiaddrRaw) < 2 {
|
||||||
|
|
|
@ -9,8 +9,6 @@ import (
|
||||||
"github.com/ethereum/go-ethereum/p2p/enr"
|
"github.com/ethereum/go-ethereum/p2p/enr"
|
||||||
ma "github.com/multiformats/go-multiaddr"
|
ma "github.com/multiformats/go-multiaddr"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
"github.com/waku-org/go-waku/waku/v2/utils"
|
|
||||||
"go.uber.org/zap"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestEnodeToMultiAddr(t *testing.T) {
|
func TestEnodeToMultiAddr(t *testing.T) {
|
||||||
|
@ -25,7 +23,7 @@ func TestEnodeToMultiAddr(t *testing.T) {
|
||||||
|
|
||||||
// TODO: this function is duplicated in localnode.go. Remove duplication
|
// TODO: this function is duplicated in localnode.go. Remove duplication
|
||||||
|
|
||||||
func updateLocalNode(localnode *enode.LocalNode, multiaddrs []ma.Multiaddr, ipAddr *net.TCPAddr, udpPort uint, wakuFlags WakuEnrBitfield, advertiseAddr *net.IP, shouldAutoUpdate bool, log *zap.Logger) error {
|
func updateLocalNode(localnode *enode.LocalNode, multiaddrs []ma.Multiaddr, ipAddr *net.TCPAddr, udpPort uint, wakuFlags WakuEnrBitfield, advertiseAddr *net.IP, shouldAutoUpdate bool) error {
|
||||||
var options []ENROption
|
var options []ENROption
|
||||||
options = append(options, WithUDPPort(udpPort))
|
options = append(options, WithUDPPort(udpPort))
|
||||||
options = append(options, WithWakuBitfield(wakuFlags))
|
options = append(options, WithWakuBitfield(wakuFlags))
|
||||||
|
@ -87,7 +85,7 @@ func TestMultiaddr(t *testing.T) {
|
||||||
|
|
||||||
db, _ := enode.OpenDB("")
|
db, _ := enode.OpenDB("")
|
||||||
localNode := enode.NewLocalNode(db, key)
|
localNode := enode.NewLocalNode(db, key)
|
||||||
err := updateLocalNode(localNode, multiaddrValues, &net.TCPAddr{IP: net.IPv4(192, 168, 1, 241), Port: 60000}, 50000, wakuFlag, nil, false, utils.Logger())
|
err := updateLocalNode(localNode, multiaddrValues, &net.TCPAddr{IP: net.IPv4(192, 168, 1, 241), Port: 60000}, 50000, wakuFlag, nil, false)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
_ = localNode.Node() // Should not panic
|
_ = localNode.Node() // Should not panic
|
||||||
|
|
|
@ -41,9 +41,8 @@ func WithMultiaddress(multiaddrs ...multiaddr.Multiaddr) ENROption {
|
||||||
couldWriteENRatLeastOnce = true
|
couldWriteENRatLeastOnce = true
|
||||||
successIdx = i
|
successIdx = i
|
||||||
break
|
break
|
||||||
} else {
|
|
||||||
failedOnceWritingENR = true
|
|
||||||
}
|
}
|
||||||
|
failedOnceWritingENR = true
|
||||||
}
|
}
|
||||||
|
|
||||||
if failedOnceWritingENR && couldWriteENRatLeastOnce {
|
if failedOnceWritingENR && couldWriteENRatLeastOnce {
|
||||||
|
|
|
@ -37,9 +37,9 @@ func WithWakuRelaySharding(rs protocol.RelayShards) ENROption {
|
||||||
return func(localnode *enode.LocalNode) error {
|
return func(localnode *enode.LocalNode) error {
|
||||||
if len(rs.Indices) >= 64 {
|
if len(rs.Indices) >= 64 {
|
||||||
return WithWakuRelayShardingBitVector(rs)(localnode)
|
return WithWakuRelayShardingBitVector(rs)(localnode)
|
||||||
} else {
|
|
||||||
return WithWakuRelayShardingIndicesList(rs)(localnode)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return WithWakuRelayShardingIndicesList(rs)(localnode)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -48,6 +48,6 @@ func (e *Envelope) Hash() []byte {
|
||||||
return e.hash
|
return e.hash
|
||||||
}
|
}
|
||||||
|
|
||||||
func (env *Envelope) Index() *pb.Index {
|
func (e *Envelope) Index() *pb.Index {
|
||||||
return env.index
|
return e.index
|
||||||
}
|
}
|
||||||
|
|
|
@ -63,7 +63,7 @@ func WithAutomaticPeerSelection(fromThesePeers ...peer.ID) FilterSubscribeOption
|
||||||
if params.pm == nil {
|
if params.pm == nil {
|
||||||
p, err = utils.SelectPeer(params.host, FilterSubscribeID_v20beta1, fromThesePeers, params.log)
|
p, err = utils.SelectPeer(params.host, FilterSubscribeID_v20beta1, fromThesePeers, params.log)
|
||||||
} else {
|
} else {
|
||||||
p, err = params.pm.SelectPeer(FilterSubscribeID_v20beta1, fromThesePeers, params.log)
|
p, err = params.pm.SelectPeer(FilterSubscribeID_v20beta1, fromThesePeers)
|
||||||
}
|
}
|
||||||
if err == nil {
|
if err == nil {
|
||||||
params.selectedPeer = p
|
params.selectedPeer = p
|
||||||
|
@ -100,7 +100,7 @@ func WithRequestID(requestID []byte) FilterSubscribeOption {
|
||||||
// when creating a filter subscription
|
// when creating a filter subscription
|
||||||
func WithAutomaticRequestID() FilterSubscribeOption {
|
func WithAutomaticRequestID() FilterSubscribeOption {
|
||||||
return func(params *FilterSubscribeParameters) {
|
return func(params *FilterSubscribeParameters) {
|
||||||
params.requestID = protocol.GenerateRequestId()
|
params.requestID = protocol.GenerateRequestID()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -131,9 +131,9 @@ func RequestID(requestID []byte) FilterUnsubscribeOption {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func AutomaticRequestId() FilterUnsubscribeOption {
|
func AutomaticRequestID() FilterUnsubscribeOption {
|
||||||
return func(params *FilterUnsubscribeParameters) {
|
return func(params *FilterUnsubscribeParameters) {
|
||||||
params.requestID = protocol.GenerateRequestId()
|
params.requestID = protocol.GenerateRequestID()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -155,7 +155,7 @@ func DontWait() FilterUnsubscribeOption {
|
||||||
|
|
||||||
func DefaultUnsubscribeOptions() []FilterUnsubscribeOption {
|
func DefaultUnsubscribeOptions() []FilterUnsubscribeOption {
|
||||||
return []FilterUnsubscribeOption{
|
return []FilterUnsubscribeOption{
|
||||||
AutomaticRequestId(),
|
AutomaticRequestID(),
|
||||||
WithWaitGroup(&sync.WaitGroup{}),
|
WithWaitGroup(&sync.WaitGroup{}),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -35,7 +35,7 @@ func TestFilterOption(t *testing.T) {
|
||||||
require.NotNil(t, params.selectedPeer)
|
require.NotNil(t, params.selectedPeer)
|
||||||
|
|
||||||
options2 := []FilterUnsubscribeOption{
|
options2 := []FilterUnsubscribeOption{
|
||||||
AutomaticRequestId(),
|
AutomaticRequestID(),
|
||||||
UnsubscribeAll(),
|
UnsubscribeAll(),
|
||||||
Peer("QmWLxGxG65CZ7vRj5oNXCJvbY9WkF9d9FxuJg8cg8Y7q3"),
|
Peer("QmWLxGxG65CZ7vRj5oNXCJvbY9WkF9d9FxuJg8cg8Y7q3"),
|
||||||
}
|
}
|
||||||
|
|
|
@ -107,13 +107,13 @@ func (wf *WakuFilterFullNode) onRequest(ctx context.Context) func(s network.Stre
|
||||||
|
|
||||||
switch subscribeRequest.FilterSubscribeType {
|
switch subscribeRequest.FilterSubscribeType {
|
||||||
case pb.FilterSubscribeRequest_SUBSCRIBE:
|
case pb.FilterSubscribeRequest_SUBSCRIBE:
|
||||||
wf.subscribe(ctx, s, logger, subscribeRequest)
|
wf.subscribe(ctx, s, subscribeRequest)
|
||||||
case pb.FilterSubscribeRequest_SUBSCRIBER_PING:
|
case pb.FilterSubscribeRequest_SUBSCRIBER_PING:
|
||||||
wf.ping(ctx, s, logger, subscribeRequest)
|
wf.ping(ctx, s, subscribeRequest)
|
||||||
case pb.FilterSubscribeRequest_UNSUBSCRIBE:
|
case pb.FilterSubscribeRequest_UNSUBSCRIBE:
|
||||||
wf.unsubscribe(ctx, s, logger, subscribeRequest)
|
wf.unsubscribe(ctx, s, subscribeRequest)
|
||||||
case pb.FilterSubscribeRequest_UNSUBSCRIBE_ALL:
|
case pb.FilterSubscribeRequest_UNSUBSCRIBE_ALL:
|
||||||
wf.unsubscribeAll(ctx, s, logger, subscribeRequest)
|
wf.unsubscribeAll(ctx, s, subscribeRequest)
|
||||||
}
|
}
|
||||||
|
|
||||||
wf.metrics.RecordRequest(subscribeRequest.FilterSubscribeType.String(), time.Since(start))
|
wf.metrics.RecordRequest(subscribeRequest.FilterSubscribeType.String(), time.Since(start))
|
||||||
|
@ -142,7 +142,7 @@ func (wf *WakuFilterFullNode) reply(ctx context.Context, s network.Stream, reque
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (wf *WakuFilterFullNode) ping(ctx context.Context, s network.Stream, logger *zap.Logger, request *pb.FilterSubscribeRequest) {
|
func (wf *WakuFilterFullNode) ping(ctx context.Context, s network.Stream, request *pb.FilterSubscribeRequest) {
|
||||||
exists := wf.subscriptions.Has(s.Conn().RemotePeer())
|
exists := wf.subscriptions.Has(s.Conn().RemotePeer())
|
||||||
|
|
||||||
if exists {
|
if exists {
|
||||||
|
@ -152,7 +152,7 @@ func (wf *WakuFilterFullNode) ping(ctx context.Context, s network.Stream, logger
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (wf *WakuFilterFullNode) subscribe(ctx context.Context, s network.Stream, logger *zap.Logger, request *pb.FilterSubscribeRequest) {
|
func (wf *WakuFilterFullNode) subscribe(ctx context.Context, s network.Stream, request *pb.FilterSubscribeRequest) {
|
||||||
if request.PubsubTopic == nil {
|
if request.PubsubTopic == nil {
|
||||||
wf.reply(ctx, s, request, http.StatusBadRequest, "pubsubtopic can't be empty")
|
wf.reply(ctx, s, request, http.StatusBadRequest, "pubsubtopic can't be empty")
|
||||||
return
|
return
|
||||||
|
@ -192,7 +192,7 @@ func (wf *WakuFilterFullNode) subscribe(ctx context.Context, s network.Stream, l
|
||||||
wf.reply(ctx, s, request, http.StatusOK)
|
wf.reply(ctx, s, request, http.StatusOK)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (wf *WakuFilterFullNode) unsubscribe(ctx context.Context, s network.Stream, logger *zap.Logger, request *pb.FilterSubscribeRequest) {
|
func (wf *WakuFilterFullNode) unsubscribe(ctx context.Context, s network.Stream, request *pb.FilterSubscribeRequest) {
|
||||||
if request.PubsubTopic == nil {
|
if request.PubsubTopic == nil {
|
||||||
wf.reply(ctx, s, request, http.StatusBadRequest, "pubsubtopic can't be empty")
|
wf.reply(ctx, s, request, http.StatusBadRequest, "pubsubtopic can't be empty")
|
||||||
return
|
return
|
||||||
|
@ -216,7 +216,7 @@ func (wf *WakuFilterFullNode) unsubscribe(ctx context.Context, s network.Stream,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (wf *WakuFilterFullNode) unsubscribeAll(ctx context.Context, s network.Stream, logger *zap.Logger, request *pb.FilterSubscribeRequest) {
|
func (wf *WakuFilterFullNode) unsubscribeAll(ctx context.Context, s network.Stream, request *pb.FilterSubscribeRequest) {
|
||||||
err := wf.subscriptions.DeleteAll(s.Conn().RemotePeer())
|
err := wf.subscriptions.DeleteAll(s.Conn().RemotePeer())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
wf.reply(ctx, s, request, http.StatusNotFound, peerHasNoSubscription)
|
wf.reply(ctx, s, request, http.StatusNotFound, peerHasNoSubscription)
|
||||||
|
|
|
@ -12,7 +12,7 @@ import (
|
||||||
|
|
||||||
const TOPIC = "/test/topic"
|
const TOPIC = "/test/topic"
|
||||||
|
|
||||||
func createPeerId(t *testing.T) peer.ID {
|
func createPeerID(t *testing.T) peer.ID {
|
||||||
peerId, err := test.RandPeerID()
|
peerId, err := test.RandPeerID()
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
return peerId
|
return peerId
|
||||||
|
@ -27,7 +27,7 @@ func firstSubscriber(subs *SubscribersMap, pubsubTopic string, contentTopic stri
|
||||||
|
|
||||||
func TestAppend(t *testing.T) {
|
func TestAppend(t *testing.T) {
|
||||||
subs := NewSubscribersMap(5 * time.Second)
|
subs := NewSubscribersMap(5 * time.Second)
|
||||||
peerId := createPeerId(t)
|
peerId := createPeerID(t)
|
||||||
|
|
||||||
subs.Set(peerId, TOPIC, []string{"topic1"})
|
subs.Set(peerId, TOPIC, []string{"topic1"})
|
||||||
|
|
||||||
|
@ -51,7 +51,7 @@ func TestAppend(t *testing.T) {
|
||||||
|
|
||||||
func TestRemove(t *testing.T) {
|
func TestRemove(t *testing.T) {
|
||||||
subs := NewSubscribersMap(5 * time.Second)
|
subs := NewSubscribersMap(5 * time.Second)
|
||||||
peerId := createPeerId(t)
|
peerId := createPeerID(t)
|
||||||
|
|
||||||
subs.Set(peerId, TOPIC+"1", []string{"topic1", "topic2"})
|
subs.Set(peerId, TOPIC+"1", []string{"topic1", "topic2"})
|
||||||
subs.Set(peerId, TOPIC+"2", []string{"topic1"})
|
subs.Set(peerId, TOPIC+"2", []string{"topic1"})
|
||||||
|
@ -79,7 +79,7 @@ func TestRemove(t *testing.T) {
|
||||||
|
|
||||||
func TestRemovePartial(t *testing.T) {
|
func TestRemovePartial(t *testing.T) {
|
||||||
subs := NewSubscribersMap(5 * time.Second)
|
subs := NewSubscribersMap(5 * time.Second)
|
||||||
peerId := createPeerId(t)
|
peerId := createPeerID(t)
|
||||||
|
|
||||||
subs.Set(peerId, TOPIC, []string{"topic1", "topic2"})
|
subs.Set(peerId, TOPIC, []string{"topic1", "topic2"})
|
||||||
err := subs.Delete(peerId, TOPIC, []string{"topic1"})
|
err := subs.Delete(peerId, TOPIC, []string{"topic1"})
|
||||||
|
@ -91,7 +91,7 @@ func TestRemovePartial(t *testing.T) {
|
||||||
|
|
||||||
func TestRemoveBogus(t *testing.T) {
|
func TestRemoveBogus(t *testing.T) {
|
||||||
subs := NewSubscribersMap(5 * time.Second)
|
subs := NewSubscribersMap(5 * time.Second)
|
||||||
peerId := createPeerId(t)
|
peerId := createPeerID(t)
|
||||||
|
|
||||||
subs.Set(peerId, TOPIC, []string{"topic1", "topic2"})
|
subs.Set(peerId, TOPIC, []string{"topic1", "topic2"})
|
||||||
err := subs.Delete(peerId, TOPIC, []string{"does not exist", "topic1"})
|
err := subs.Delete(peerId, TOPIC, []string{"does not exist", "topic1"})
|
||||||
|
@ -108,7 +108,7 @@ func TestRemoveBogus(t *testing.T) {
|
||||||
|
|
||||||
func TestSuccessFailure(t *testing.T) {
|
func TestSuccessFailure(t *testing.T) {
|
||||||
subs := NewSubscribersMap(5 * time.Second)
|
subs := NewSubscribersMap(5 * time.Second)
|
||||||
peerId := createPeerId(t)
|
peerId := createPeerID(t)
|
||||||
|
|
||||||
subs.Set(peerId, TOPIC, []string{"topic1", "topic2"})
|
subs.Set(peerId, TOPIC, []string{"topic1", "topic2"})
|
||||||
|
|
||||||
|
|
|
@ -14,16 +14,16 @@ import (
|
||||||
|
|
||||||
func TestSubscriptionMapAppend(t *testing.T) {
|
func TestSubscriptionMapAppend(t *testing.T) {
|
||||||
fmap := NewSubscriptionMap(utils.Logger())
|
fmap := NewSubscriptionMap(utils.Logger())
|
||||||
peerId := createPeerId(t)
|
peerID := createPeerID(t)
|
||||||
contentTopics := []string{"ct1", "ct2"}
|
contentTopics := []string{"ct1", "ct2"}
|
||||||
|
|
||||||
sub := fmap.NewSubscription(peerId, TOPIC, contentTopics)
|
sub := fmap.NewSubscription(peerID, TOPIC, contentTopics)
|
||||||
_, found := sub.ContentTopics["ct1"]
|
_, found := sub.ContentTopics["ct1"]
|
||||||
require.True(t, found)
|
require.True(t, found)
|
||||||
_, found = sub.ContentTopics["ct2"]
|
_, found = sub.ContentTopics["ct2"]
|
||||||
require.True(t, found)
|
require.True(t, found)
|
||||||
require.False(t, sub.Closed)
|
require.False(t, sub.Closed)
|
||||||
require.Equal(t, sub.PeerID, peerId)
|
require.Equal(t, sub.PeerID, peerID)
|
||||||
require.Equal(t, sub.PubsubTopic, TOPIC)
|
require.Equal(t, sub.PubsubTopic, TOPIC)
|
||||||
|
|
||||||
sub.Add("ct3")
|
sub.Add("ct3")
|
||||||
|
@ -47,9 +47,9 @@ func TestSubscriptionClear(t *testing.T) {
|
||||||
contentTopics := []string{"ct1", "ct2"}
|
contentTopics := []string{"ct1", "ct2"}
|
||||||
|
|
||||||
var subscriptions = []*SubscriptionDetails{
|
var subscriptions = []*SubscriptionDetails{
|
||||||
fmap.NewSubscription(createPeerId(t), TOPIC+"1", contentTopics),
|
fmap.NewSubscription(createPeerID(t), TOPIC+"1", contentTopics),
|
||||||
fmap.NewSubscription(createPeerId(t), TOPIC+"2", contentTopics),
|
fmap.NewSubscription(createPeerID(t), TOPIC+"2", contentTopics),
|
||||||
fmap.NewSubscription(createPeerId(t), TOPIC+"3", contentTopics),
|
fmap.NewSubscription(createPeerID(t), TOPIC+"3", contentTopics),
|
||||||
}
|
}
|
||||||
|
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
|
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
|
||||||
|
@ -81,8 +81,8 @@ func TestSubscriptionClear(t *testing.T) {
|
||||||
|
|
||||||
func TestSubscriptionsNotify(t *testing.T) {
|
func TestSubscriptionsNotify(t *testing.T) {
|
||||||
fmap := NewSubscriptionMap(utils.Logger())
|
fmap := NewSubscriptionMap(utils.Logger())
|
||||||
p1 := createPeerId(t)
|
p1 := createPeerID(t)
|
||||||
p2 := createPeerId(t)
|
p2 := createPeerID(t)
|
||||||
var subscriptions = []*SubscriptionDetails{
|
var subscriptions = []*SubscriptionDetails{
|
||||||
fmap.NewSubscription(p1, TOPIC+"1", []string{"ct1", "ct2"}),
|
fmap.NewSubscription(p1, TOPIC+"1", []string{"ct1", "ct2"}),
|
||||||
fmap.NewSubscription(p2, TOPIC+"1", []string{"ct1"}),
|
fmap.NewSubscription(p2, TOPIC+"1", []string{"ct1"}),
|
||||||
|
|
|
@ -155,9 +155,9 @@ func (sub *Subscribers) RemoveContentFilters(peerID peer.ID, requestID string, c
|
||||||
|
|
||||||
// make sure we delete the subscriber
|
// make sure we delete the subscriber
|
||||||
// if no more content filters left
|
// if no more content filters left
|
||||||
for _, peerId := range peerIdsToRemove {
|
for _, peerID := range peerIdsToRemove {
|
||||||
for i, s := range sub.subscribers {
|
for i, s := range sub.subscribers {
|
||||||
if s.peer == peerId && s.requestID == requestID {
|
if s.peer == peerID && s.requestID == requestID {
|
||||||
l := len(sub.subscribers) - 1
|
l := len(sub.subscribers) - 1
|
||||||
sub.subscribers[i] = sub.subscribers[l]
|
sub.subscribers[i] = sub.subscribers[l]
|
||||||
sub.subscribers = sub.subscribers[:l]
|
sub.subscribers = sub.subscribers[:l]
|
||||||
|
|
|
@ -12,10 +12,10 @@ import (
|
||||||
|
|
||||||
const TOPIC = "/test/topic"
|
const TOPIC = "/test/topic"
|
||||||
|
|
||||||
func createPeerId(t *testing.T) peer.ID {
|
func createPeerID(t *testing.T) peer.ID {
|
||||||
peerId, err := test.RandPeerID()
|
peerID, err := test.RandPeerID()
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
return peerId
|
return peerID
|
||||||
}
|
}
|
||||||
|
|
||||||
func firstSubscriber(subs *Subscribers, contentTopic string) *Subscriber {
|
func firstSubscriber(subs *Subscribers, contentTopic string) *Subscriber {
|
||||||
|
@ -27,7 +27,7 @@ func firstSubscriber(subs *Subscribers, contentTopic string) *Subscriber {
|
||||||
|
|
||||||
func TestAppend(t *testing.T) {
|
func TestAppend(t *testing.T) {
|
||||||
subs := NewSubscribers(10 * time.Second)
|
subs := NewSubscribers(10 * time.Second)
|
||||||
peerId := createPeerId(t)
|
peerID := createPeerID(t)
|
||||||
requestID := "request_1"
|
requestID := "request_1"
|
||||||
contentTopic := "topic1"
|
contentTopic := "topic1"
|
||||||
request := &pb.FilterRequest{
|
request := &pb.FilterRequest{
|
||||||
|
@ -35,7 +35,7 @@ func TestAppend(t *testing.T) {
|
||||||
Topic: TOPIC,
|
Topic: TOPIC,
|
||||||
ContentFilters: []*pb.FilterRequest_ContentFilter{{ContentTopic: contentTopic}},
|
ContentFilters: []*pb.FilterRequest_ContentFilter{{ContentTopic: contentTopic}},
|
||||||
}
|
}
|
||||||
subs.Append(Subscriber{peerId, requestID, request})
|
subs.Append(Subscriber{peerID, requestID, request})
|
||||||
|
|
||||||
sub := firstSubscriber(subs, contentTopic)
|
sub := firstSubscriber(subs, contentTopic)
|
||||||
assert.NotNil(t, sub)
|
assert.NotNil(t, sub)
|
||||||
|
@ -43,7 +43,7 @@ func TestAppend(t *testing.T) {
|
||||||
|
|
||||||
func TestRemove(t *testing.T) {
|
func TestRemove(t *testing.T) {
|
||||||
subs := NewSubscribers(10 * time.Second)
|
subs := NewSubscribers(10 * time.Second)
|
||||||
peerId := createPeerId(t)
|
peerId := createPeerID(t)
|
||||||
requestID := "request_1"
|
requestID := "request_1"
|
||||||
contentTopic := "topic1"
|
contentTopic := "topic1"
|
||||||
request := &pb.FilterRequest{
|
request := &pb.FilterRequest{
|
||||||
|
@ -60,7 +60,7 @@ func TestRemove(t *testing.T) {
|
||||||
|
|
||||||
func TestRemovePartial(t *testing.T) {
|
func TestRemovePartial(t *testing.T) {
|
||||||
subs := NewSubscribers(10 * time.Second)
|
subs := NewSubscribers(10 * time.Second)
|
||||||
peerId := createPeerId(t)
|
peerId := createPeerID(t)
|
||||||
requestID := "request_1"
|
requestID := "request_1"
|
||||||
topic1 := "topic1"
|
topic1 := "topic1"
|
||||||
topic2 := "topic2"
|
topic2 := "topic2"
|
||||||
|
@ -79,7 +79,7 @@ func TestRemovePartial(t *testing.T) {
|
||||||
|
|
||||||
func TestRemoveDuplicateSubscriptions(t *testing.T) {
|
func TestRemoveDuplicateSubscriptions(t *testing.T) {
|
||||||
subs := NewSubscribers(10 * time.Second)
|
subs := NewSubscribers(10 * time.Second)
|
||||||
peerId := createPeerId(t)
|
peerId := createPeerID(t)
|
||||||
topic := "topic"
|
topic := "topic"
|
||||||
requestID1 := "request_1"
|
requestID1 := "request_1"
|
||||||
requestID2 := "request_2"
|
requestID2 := "request_2"
|
||||||
|
@ -104,7 +104,7 @@ func TestRemoveDuplicateSubscriptions(t *testing.T) {
|
||||||
|
|
||||||
func TestRemoveDuplicateSubscriptionsPartial(t *testing.T) {
|
func TestRemoveDuplicateSubscriptionsPartial(t *testing.T) {
|
||||||
subs := NewSubscribers(10 * time.Second)
|
subs := NewSubscribers(10 * time.Second)
|
||||||
peerId := createPeerId(t)
|
peerId := createPeerID(t)
|
||||||
topic := "topic"
|
topic := "topic"
|
||||||
requestID1 := "request_1"
|
requestID1 := "request_1"
|
||||||
requestID2 := "request_2"
|
requestID2 := "request_2"
|
||||||
|
@ -129,7 +129,7 @@ func TestRemoveDuplicateSubscriptionsPartial(t *testing.T) {
|
||||||
|
|
||||||
func TestRemoveBogus(t *testing.T) {
|
func TestRemoveBogus(t *testing.T) {
|
||||||
subs := NewSubscribers(10 * time.Second)
|
subs := NewSubscribers(10 * time.Second)
|
||||||
peerId := createPeerId(t)
|
peerId := createPeerID(t)
|
||||||
requestID := "request_1"
|
requestID := "request_1"
|
||||||
contentTopic := "topic1"
|
contentTopic := "topic1"
|
||||||
request := &pb.FilterRequest{
|
request := &pb.FilterRequest{
|
||||||
|
|
|
@ -143,13 +143,13 @@ func (wf *WakuFilter) onRequest(ctx context.Context) func(s network.Stream) {
|
||||||
subscriber.filter.Topic = relay.DefaultWakuTopic
|
subscriber.filter.Topic = relay.DefaultWakuTopic
|
||||||
}
|
}
|
||||||
|
|
||||||
len := wf.subscribers.Append(subscriber)
|
subscribersLen := wf.subscribers.Append(subscriber)
|
||||||
|
|
||||||
logger.Info("adding subscriber")
|
logger.Info("adding subscriber")
|
||||||
wf.metrics.RecordSubscribers(len)
|
wf.metrics.RecordSubscribers(subscribersLen)
|
||||||
} else {
|
} else {
|
||||||
peerId := s.Conn().RemotePeer()
|
peerID := s.Conn().RemotePeer()
|
||||||
wf.subscribers.RemoveContentFilters(peerId, filterRPCRequest.RequestId, filterRPCRequest.Request.ContentFilters)
|
wf.subscribers.RemoveContentFilters(peerID, filterRPCRequest.RequestId, filterRPCRequest.Request.ContentFilters)
|
||||||
|
|
||||||
logger.Info("removing subscriber")
|
logger.Info("removing subscriber")
|
||||||
wf.metrics.RecordSubscribers(wf.subscribers.Length())
|
wf.metrics.RecordSubscribers(wf.subscribers.Length())
|
||||||
|
@ -270,7 +270,7 @@ func (wf *WakuFilter) requestSubscription(ctx context.Context, filter ContentFil
|
||||||
defer conn.Close()
|
defer conn.Close()
|
||||||
|
|
||||||
// This is the only successful path to subscription
|
// This is the only successful path to subscription
|
||||||
requestID := hex.EncodeToString(protocol.GenerateRequestId())
|
requestID := hex.EncodeToString(protocol.GenerateRequestID())
|
||||||
|
|
||||||
writer := pbio.NewDelimitedWriter(conn)
|
writer := pbio.NewDelimitedWriter(conn)
|
||||||
filterRPC := &pb.FilterRPC{RequestId: requestID, Request: request}
|
filterRPC := &pb.FilterRPC{RequestId: requestID, Request: request}
|
||||||
|
@ -301,7 +301,7 @@ func (wf *WakuFilter) Unsubscribe(ctx context.Context, contentFilter ContentFilt
|
||||||
defer conn.Close()
|
defer conn.Close()
|
||||||
|
|
||||||
// This is the only successful path to subscription
|
// This is the only successful path to subscription
|
||||||
id := protocol.GenerateRequestId()
|
id := protocol.GenerateRequestID()
|
||||||
|
|
||||||
var contentFilters []*pb.FilterRequest_ContentFilter
|
var contentFilters []*pb.FilterRequest_ContentFilter
|
||||||
for _, ct := range contentFilter.ContentTopics {
|
for _, ct := range contentFilter.ContentTopics {
|
||||||
|
@ -444,8 +444,8 @@ func (wf *WakuFilter) UnsubscribeFilter(ctx context.Context, cf ContentFilter) e
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
for rId := range idsToRemove {
|
for rID := range idsToRemove {
|
||||||
wf.filters.Delete(rId)
|
wf.filters.Delete(rID)
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
|
|
@ -25,7 +25,7 @@ const LightPushID_v20beta1 = libp2pProtocol.ID("/vac/waku/lightpush/2.0.0-beta1"
|
||||||
|
|
||||||
var (
|
var (
|
||||||
ErrNoPeersAvailable = errors.New("no suitable remote peers")
|
ErrNoPeersAvailable = errors.New("no suitable remote peers")
|
||||||
ErrInvalidId = errors.New("invalid request id")
|
ErrInvalidID = errors.New("invalid request id")
|
||||||
)
|
)
|
||||||
|
|
||||||
// WakuLightPush is the implementation of the Waku LightPush protocol
|
// WakuLightPush is the implementation of the Waku LightPush protocol
|
||||||
|
@ -72,8 +72,8 @@ func (wakuLP *WakuLightPush) Start(ctx context.Context) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
// relayIsNotAvailable determines if this node supports relaying messages for other lightpush clients
|
// relayIsNotAvailable determines if this node supports relaying messages for other lightpush clients
|
||||||
func (wakuLp *WakuLightPush) relayIsNotAvailable() bool {
|
func (wakuLP *WakuLightPush) relayIsNotAvailable() bool {
|
||||||
return wakuLp.relay == nil
|
return wakuLP.relay == nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (wakuLP *WakuLightPush) onRequest(ctx context.Context) func(s network.Stream) {
|
func (wakuLP *WakuLightPush) onRequest(ctx context.Context) func(s network.Stream) {
|
||||||
|
@ -161,7 +161,7 @@ func (wakuLP *WakuLightPush) request(ctx context.Context, req *pb.PushRequest, o
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(params.requestID) == 0 {
|
if len(params.requestID) == 0 {
|
||||||
return nil, ErrInvalidId
|
return nil, ErrInvalidID
|
||||||
}
|
}
|
||||||
|
|
||||||
logger := wakuLP.log.With(logging.HostID("peer", params.selectedPeer))
|
logger := wakuLP.log.With(logging.HostID("peer", params.selectedPeer))
|
||||||
|
@ -234,9 +234,9 @@ func (wakuLP *WakuLightPush) PublishToTopic(ctx context.Context, message *wpb.Wa
|
||||||
hash := message.Hash(topic)
|
hash := message.Hash(topic)
|
||||||
wakuLP.log.Info("waku.lightpush published", logging.HexString("hash", hash))
|
wakuLP.log.Info("waku.lightpush published", logging.HexString("hash", hash))
|
||||||
return hash, nil
|
return hash, nil
|
||||||
} else {
|
|
||||||
return nil, errors.New(response.Info)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return nil, errors.New(response.Info)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Publish is used to broadcast a WakuMessage to the default waku pubsub topic via lightpush protocol
|
// Publish is used to broadcast a WakuMessage to the default waku pubsub topic via lightpush protocol
|
||||||
|
|
|
@ -40,7 +40,7 @@ func WithAutomaticPeerSelection(fromThesePeers ...peer.ID) Option {
|
||||||
if params.pm == nil {
|
if params.pm == nil {
|
||||||
p, err = utils.SelectPeer(params.host, LightPushID_v20beta1, fromThesePeers, params.log)
|
p, err = utils.SelectPeer(params.host, LightPushID_v20beta1, fromThesePeers, params.log)
|
||||||
} else {
|
} else {
|
||||||
p, err = params.pm.SelectPeer(LightPushID_v20beta1, fromThesePeers, params.log)
|
p, err = params.pm.SelectPeer(LightPushID_v20beta1, fromThesePeers)
|
||||||
}
|
}
|
||||||
if err == nil {
|
if err == nil {
|
||||||
params.selectedPeer = p
|
params.selectedPeer = p
|
||||||
|
@ -77,7 +77,7 @@ func WithRequestID(requestID []byte) Option {
|
||||||
// when publishing a message
|
// when publishing a message
|
||||||
func WithAutomaticRequestID() Option {
|
func WithAutomaticRequestID() Option {
|
||||||
return func(params *lightPushParameters) {
|
return func(params *lightPushParameters) {
|
||||||
params.requestID = protocol.GenerateRequestId()
|
params.requestID = protocol.GenerateRequestID()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -59,7 +59,7 @@ func WithInitiatorParameters(qrString string, qrMessageNametag n.MessageNametag)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func WithResponderParameters(applicationName, applicationVersion, shardId string, qrMessageNameTag *n.MessageNametag) PairingParameterOption {
|
func WithResponderParameters(applicationName, applicationVersion, shardID string, qrMessageNameTag *n.MessageNametag) PairingParameterOption {
|
||||||
return func(params *PairingParameters) error {
|
return func(params *PairingParameters) error {
|
||||||
params.initiator = false
|
params.initiator = false
|
||||||
if qrMessageNameTag == nil {
|
if qrMessageNameTag == nil {
|
||||||
|
@ -72,17 +72,17 @@ func WithResponderParameters(applicationName, applicationVersion, shardId string
|
||||||
} else {
|
} else {
|
||||||
params.qrMessageNametag = *qrMessageNameTag
|
params.qrMessageNametag = *qrMessageNameTag
|
||||||
}
|
}
|
||||||
params.qr = NewQR(applicationName, applicationVersion, shardId, params.ephemeralPublicKey, params.myCommitedStaticKey)
|
params.qr = NewQR(applicationName, applicationVersion, shardID, params.ephemeralPublicKey, params.myCommitedStaticKey)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
const DefaultApplicationName = "waku-noise-sessions"
|
const DefaultApplicationName = "waku-noise-sessions"
|
||||||
const DefaultApplicationVersion = "0.1"
|
const DefaultApplicationVersion = "0.1"
|
||||||
const DefaultShardId = "10"
|
const DefaultShardID = "10"
|
||||||
|
|
||||||
func WithDefaultResponderParameters() PairingParameterOption {
|
func WithDefaultResponderParameters() PairingParameterOption {
|
||||||
return WithResponderParameters(DefaultApplicationName, DefaultApplicationVersion, DefaultShardId, nil)
|
return WithResponderParameters(DefaultApplicationName, DefaultApplicationVersion, DefaultShardID, nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
type PairingParameters struct {
|
type PairingParameters struct {
|
||||||
|
@ -115,7 +115,7 @@ func NewPairing(myStaticKey n.Keypair, myEphemeralKey n.Keypair, opts PairingPar
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
contentTopic := "/" + params.qr.applicationName + "/" + params.qr.applicationVersion + "/wakunoise/1/sessions_shard-" + params.qr.shardId + "/proto"
|
contentTopic := "/" + params.qr.applicationName + "/" + params.qr.applicationVersion + "/wakunoise/1/sessions_shard-" + params.qr.shardID + "/proto"
|
||||||
|
|
||||||
// TODO: check if subscription is removed on stop
|
// TODO: check if subscription is removed on stop
|
||||||
msgCh := messenger.Subscribe(context.Background(), contentTopic)
|
msgCh := messenger.Subscribe(context.Background(), contentTopic)
|
||||||
|
@ -197,9 +197,9 @@ func (p *Pairing) executeReadStepWithNextMessage(ctx context.Context, nextMsgCha
|
||||||
if errors.Is(err, n.ErrNametagNotExpected) || errors.Is(err, n.ErrUnexpectedMessageNametag) {
|
if errors.Is(err, n.ErrNametagNotExpected) || errors.Is(err, n.ErrUnexpectedMessageNametag) {
|
||||||
p.logger.Debug(err.Error())
|
p.logger.Debug(err.Error())
|
||||||
continue
|
continue
|
||||||
} else {
|
|
||||||
return nil, err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return nil, err
|
||||||
}
|
}
|
||||||
return step, nil
|
return step, nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -7,7 +7,7 @@ import (
|
||||||
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
|
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Decodes a WakuMessage to a PayloadV2
|
// DecodePayloadV2 decodes a WakuMessage to a PayloadV2
|
||||||
// Currently, this is just a wrapper over deserializePayloadV2 and encryption/decryption is done on top (no KeyInfo)
|
// Currently, this is just a wrapper over deserializePayloadV2 and encryption/decryption is done on top (no KeyInfo)
|
||||||
func DecodePayloadV2(message *pb.WakuMessage) (*n.PayloadV2, error) {
|
func DecodePayloadV2(message *pb.WakuMessage) (*n.PayloadV2, error) {
|
||||||
if message.Version != 2 {
|
if message.Version != 2 {
|
||||||
|
@ -16,7 +16,7 @@ func DecodePayloadV2(message *pb.WakuMessage) (*n.PayloadV2, error) {
|
||||||
return n.DeserializePayloadV2(message.Payload)
|
return n.DeserializePayloadV2(message.Payload)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Encodes a PayloadV2 to a WakuMessage
|
// EncodePayloadV2 encodes a PayloadV2 to a WakuMessage
|
||||||
// Currently, this is just a wrapper over serializePayloadV2 and encryption/decryption is done on top (no KeyInfo)
|
// Currently, this is just a wrapper over serializePayloadV2 and encryption/decryption is done on top (no KeyInfo)
|
||||||
func EncodePayloadV2(payload2 *n.PayloadV2) (*pb.WakuMessage, error) {
|
func EncodePayloadV2(payload2 *n.PayloadV2) (*pb.WakuMessage, error) {
|
||||||
serializedPayload2, err := payload2.Serialize()
|
serializedPayload2, err := payload2.Serialize()
|
||||||
|
|
|
@ -10,16 +10,16 @@ import (
|
||||||
type QR struct {
|
type QR struct {
|
||||||
applicationName string
|
applicationName string
|
||||||
applicationVersion string
|
applicationVersion string
|
||||||
shardId string
|
shardID string
|
||||||
ephemeralPublicKey ed25519.PublicKey
|
ephemeralPublicKey ed25519.PublicKey
|
||||||
committedStaticKey []byte
|
committedStaticKey []byte
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewQR(applicationName, applicationVersion, shardId string, ephemeralKey ed25519.PublicKey, committedStaticKey []byte) QR {
|
func NewQR(applicationName, applicationVersion, shardID string, ephemeralKey ed25519.PublicKey, committedStaticKey []byte) QR {
|
||||||
return QR{
|
return QR{
|
||||||
applicationName: applicationName,
|
applicationName: applicationName,
|
||||||
applicationVersion: applicationVersion,
|
applicationVersion: applicationVersion,
|
||||||
shardId: shardId,
|
shardID: shardID,
|
||||||
ephemeralPublicKey: ephemeralKey,
|
ephemeralPublicKey: ephemeralKey,
|
||||||
committedStaticKey: committedStaticKey,
|
committedStaticKey: committedStaticKey,
|
||||||
}
|
}
|
||||||
|
@ -29,7 +29,7 @@ func NewQR(applicationName, applicationVersion, shardId string, ephemeralKey ed2
|
||||||
func (qr QR) String() string {
|
func (qr QR) String() string {
|
||||||
return base64.URLEncoding.EncodeToString([]byte(qr.applicationName)) + ":" +
|
return base64.URLEncoding.EncodeToString([]byte(qr.applicationName)) + ":" +
|
||||||
base64.URLEncoding.EncodeToString([]byte(qr.applicationVersion)) + ":" +
|
base64.URLEncoding.EncodeToString([]byte(qr.applicationVersion)) + ":" +
|
||||||
base64.URLEncoding.EncodeToString([]byte(qr.shardId)) + ":" +
|
base64.URLEncoding.EncodeToString([]byte(qr.shardID)) + ":" +
|
||||||
base64.URLEncoding.EncodeToString(qr.ephemeralPublicKey) + ":" +
|
base64.URLEncoding.EncodeToString(qr.ephemeralPublicKey) + ":" +
|
||||||
base64.URLEncoding.EncodeToString(qr.committedStaticKey[:])
|
base64.URLEncoding.EncodeToString(qr.committedStaticKey[:])
|
||||||
}
|
}
|
||||||
|
@ -50,7 +50,7 @@ func decodeBase64String(inputValue string) ([]byte, error) {
|
||||||
return enc.DecodeString(inputValue)
|
return enc.DecodeString(inputValue)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Deserializes input string in base64 to the corresponding (applicationName, applicationVersion, shardId, ephemeralKey, committedStaticKey)
|
// StringToQR deserializes input string in base64 to the corresponding (applicationName, applicationVersion, shardId, ephemeralKey, committedStaticKey)
|
||||||
func StringToQR(qrString string) (QR, error) {
|
func StringToQR(qrString string) (QR, error) {
|
||||||
values := strings.Split(qrString, ":")
|
values := strings.Split(qrString, ":")
|
||||||
if len(values) != 5 {
|
if len(values) != 5 {
|
||||||
|
@ -67,7 +67,7 @@ func StringToQR(qrString string) (QR, error) {
|
||||||
return QR{}, err
|
return QR{}, err
|
||||||
}
|
}
|
||||||
|
|
||||||
shardId, err := decodeBase64String(values[2])
|
shardID, err := decodeBase64String(values[2])
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return QR{}, err
|
return QR{}, err
|
||||||
}
|
}
|
||||||
|
@ -85,7 +85,7 @@ func StringToQR(qrString string) (QR, error) {
|
||||||
return QR{
|
return QR{
|
||||||
applicationName: string(applicationName),
|
applicationName: string(applicationName),
|
||||||
applicationVersion: string(applicationVersion),
|
applicationVersion: string(applicationVersion),
|
||||||
shardId: string(shardId),
|
shardID: string(shardID),
|
||||||
ephemeralPublicKey: ephemeralKey,
|
ephemeralPublicKey: ephemeralKey,
|
||||||
committedStaticKey: committedStaticKey,
|
committedStaticKey: committedStaticKey,
|
||||||
}, nil
|
}, nil
|
||||||
|
|
|
@ -26,16 +26,16 @@ func TestQR(t *testing.T) {
|
||||||
// Content topic information
|
// Content topic information
|
||||||
applicationName := "waku-noise-sessions"
|
applicationName := "waku-noise-sessions"
|
||||||
applicationVersion := "0.1"
|
applicationVersion := "0.1"
|
||||||
shardId := "10"
|
shardID := "10"
|
||||||
|
|
||||||
qr := NewQR(applicationName, applicationVersion, shardId, ephemeralKey.Public, committedStaticKey)
|
qr := NewQR(applicationName, applicationVersion, shardID, ephemeralKey.Public, committedStaticKey)
|
||||||
readQR, err := StringToQR(qr.String())
|
readQR, err := StringToQR(qr.String())
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
// We check if QR serialization/deserialization works
|
// We check if QR serialization/deserialization works
|
||||||
require.Equal(t, applicationName, readQR.applicationName)
|
require.Equal(t, applicationName, readQR.applicationName)
|
||||||
require.Equal(t, applicationVersion, readQR.applicationVersion)
|
require.Equal(t, applicationVersion, readQR.applicationVersion)
|
||||||
require.Equal(t, shardId, readQR.shardId)
|
require.Equal(t, shardID, readQR.shardID)
|
||||||
require.True(t, bytes.Equal(ephemeralKey.Public, readQR.ephemeralPublicKey))
|
require.True(t, bytes.Equal(ephemeralKey.Public, readQR.ephemeralPublicKey))
|
||||||
require.True(t, bytes.Equal(committedStaticKey[:], readQR.committedStaticKey[:]))
|
require.True(t, bytes.Equal(committedStaticKey[:], readQR.committedStaticKey[:]))
|
||||||
}
|
}
|
||||||
|
|
|
@ -28,7 +28,7 @@ const MaxCacheSize = 1000
|
||||||
|
|
||||||
var (
|
var (
|
||||||
ErrNoPeersAvailable = errors.New("no suitable remote peers")
|
ErrNoPeersAvailable = errors.New("no suitable remote peers")
|
||||||
ErrInvalidId = errors.New("invalid request id")
|
ErrInvalidID = errors.New("invalid request id")
|
||||||
)
|
)
|
||||||
|
|
||||||
// PeerConnector will subscribe to a channel containing the information for all peers found by this discovery protocol
|
// PeerConnector will subscribe to a channel containing the information for all peers found by this discovery protocol
|
||||||
|
@ -69,7 +69,7 @@ func NewWakuPeerExchange(disc *discv5.DiscoveryV5, peerConnector PeerConnector,
|
||||||
return wakuPX, nil
|
return wakuPX, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Sets the host to be able to mount or consume a protocol
|
// SetHost sets the host to be able to mount or consume a protocol
|
||||||
func (wakuPX *WakuPeerExchange) SetHost(h host.Host) {
|
func (wakuPX *WakuPeerExchange) SetHost(h host.Host) {
|
||||||
wakuPX.h = h
|
wakuPX.h = h
|
||||||
}
|
}
|
||||||
|
@ -85,7 +85,7 @@ func (wakuPX *WakuPeerExchange) Start(ctx context.Context) error {
|
||||||
ctx, cancel := context.WithCancel(ctx)
|
ctx, cancel := context.WithCancel(ctx)
|
||||||
wakuPX.cancel = cancel
|
wakuPX.cancel = cancel
|
||||||
|
|
||||||
wakuPX.h.SetStreamHandlerMatch(PeerExchangeID_v20alpha1, protocol.PrefixTextMatch(string(PeerExchangeID_v20alpha1)), wakuPX.onRequest(ctx))
|
wakuPX.h.SetStreamHandlerMatch(PeerExchangeID_v20alpha1, protocol.PrefixTextMatch(string(PeerExchangeID_v20alpha1)), wakuPX.onRequest())
|
||||||
wakuPX.log.Info("Peer exchange protocol started")
|
wakuPX.log.Info("Peer exchange protocol started")
|
||||||
|
|
||||||
wakuPX.wg.Add(1)
|
wakuPX.wg.Add(1)
|
||||||
|
@ -93,7 +93,7 @@ func (wakuPX *WakuPeerExchange) Start(ctx context.Context) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (wakuPX *WakuPeerExchange) onRequest(ctx context.Context) func(s network.Stream) {
|
func (wakuPX *WakuPeerExchange) onRequest() func(s network.Stream) {
|
||||||
return func(s network.Stream) {
|
return func(s network.Stream) {
|
||||||
defer s.Close()
|
defer s.Close()
|
||||||
logger := wakuPX.log.With(logging.HostID("peer", s.Conn().RemotePeer()))
|
logger := wakuPX.log.With(logging.HostID("peer", s.Conn().RemotePeer()))
|
||||||
|
|
|
@ -37,7 +37,7 @@ func WithAutomaticPeerSelection(fromThesePeers ...peer.ID) PeerExchangeOption {
|
||||||
if params.pm == nil {
|
if params.pm == nil {
|
||||||
p, err = utils.SelectPeer(params.host, PeerExchangeID_v20alpha1, fromThesePeers, params.log)
|
p, err = utils.SelectPeer(params.host, PeerExchangeID_v20alpha1, fromThesePeers, params.log)
|
||||||
} else {
|
} else {
|
||||||
p, err = params.pm.SelectPeer(PeerExchangeID_v20alpha1, fromThesePeers, params.log)
|
p, err = params.pm.SelectPeer(PeerExchangeID_v20alpha1, fromThesePeers)
|
||||||
}
|
}
|
||||||
if err == nil {
|
if err == nil {
|
||||||
params.selectedPeer = p
|
params.selectedPeer = p
|
||||||
|
|
|
@ -7,15 +7,23 @@ import (
|
||||||
"strings"
|
"strings"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// Waku2PubsubTopicPrefix is the expected prefix to be used for pubsub topics
|
||||||
const Waku2PubsubTopicPrefix = "/waku/2"
|
const Waku2PubsubTopicPrefix = "/waku/2"
|
||||||
|
|
||||||
|
// StaticShardingPubsubTopicPrefix is the expected prefix to be used for static sharding pubsub topics
|
||||||
const StaticShardingPubsubTopicPrefix = Waku2PubsubTopicPrefix + "/rs"
|
const StaticShardingPubsubTopicPrefix = Waku2PubsubTopicPrefix + "/rs"
|
||||||
|
|
||||||
|
// ErrInvalidStructure indicates that the pubsub topic is malformed
|
||||||
var ErrInvalidStructure = errors.New("invalid topic structure")
|
var ErrInvalidStructure = errors.New("invalid topic structure")
|
||||||
|
|
||||||
|
// ErrInvalidTopicPrefix indicates that the pubsub topic is missing the prefix /waku/2
|
||||||
var ErrInvalidTopicPrefix = errors.New("must start with " + Waku2PubsubTopicPrefix)
|
var ErrInvalidTopicPrefix = errors.New("must start with " + Waku2PubsubTopicPrefix)
|
||||||
var ErrMissingTopicName = errors.New("missing topic-name")
|
var ErrMissingTopicName = errors.New("missing topic-name")
|
||||||
var ErrInvalidShardedTopicPrefix = errors.New("must start with " + StaticShardingPubsubTopicPrefix)
|
var ErrInvalidShardedTopicPrefix = errors.New("must start with " + StaticShardingPubsubTopicPrefix)
|
||||||
var ErrMissingClusterIndex = errors.New("missing shard_cluster_index")
|
var ErrMissingClusterIndex = errors.New("missing shard_cluster_index")
|
||||||
var ErrMissingShardNumber = errors.New("missing shard_number")
|
var ErrMissingShardNumber = errors.New("missing shard_number")
|
||||||
|
|
||||||
|
// ErrInvalidNumberFormat indicates that a number exceeds the allowed range
|
||||||
var ErrInvalidNumberFormat = errors.New("only 2^16 numbers are allowed")
|
var ErrInvalidNumberFormat = errors.New("only 2^16 numbers are allowed")
|
||||||
|
|
||||||
// NamespacedPubsubTopicKind used to represent kind of NamespacedPubsubTopicKind
|
// NamespacedPubsubTopicKind used to represent kind of NamespacedPubsubTopicKind
|
||||||
|
@ -107,7 +115,7 @@ func (s StaticShardingPubsubTopic) Cluster() uint16 {
|
||||||
return s.cluster
|
return s.cluster
|
||||||
}
|
}
|
||||||
|
|
||||||
// Cluster returns the shard number
|
// Shard returns the shard number
|
||||||
func (s StaticShardingPubsubTopic) Shard() uint16 {
|
func (s StaticShardingPubsubTopic) Shard() uint16 {
|
||||||
return s.shard
|
return s.shard
|
||||||
}
|
}
|
||||||
|
@ -174,14 +182,14 @@ func ToShardedPubsubTopic(topic string) (NamespacedPubsubTopic, error) {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
return s, nil
|
return s, nil
|
||||||
} else {
|
|
||||||
s := NamedShardingPubsubTopic{}
|
|
||||||
err := s.Parse(topic)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
return s, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
s := NamedShardingPubsubTopic{}
|
||||||
|
err := s.Parse(topic)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return s, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// DefaultPubsubTopic is the default pubSub topic used in waku
|
// DefaultPubsubTopic is the default pubSub topic used in waku
|
||||||
|
|
|
@ -75,15 +75,15 @@ func (s *chStore) broadcast(ctx context.Context, m *protocol.Envelope) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *chStore) close() {
|
func (s *chStore) close() {
|
||||||
b.mu.Lock()
|
s.mu.Lock()
|
||||||
defer b.mu.Unlock()
|
defer s.mu.Unlock()
|
||||||
for _, chans := range b.topicToChans {
|
for _, chans := range s.topicToChans {
|
||||||
for _, ch := range chans {
|
for _, ch := range chans {
|
||||||
close(ch)
|
close(ch)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
b.topicToChans = nil
|
s.topicToChans = nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Broadcaster is used to create a fanout for an envelope that will be received by any subscriber interested in the topic of the message
|
// Broadcaster is used to create a fanout for an envelope that will be received by any subscriber interested in the topic of the message
|
||||||
|
|
|
@ -75,7 +75,7 @@ type EvtRelayUnsubscribed struct {
|
||||||
Topic string
|
Topic string
|
||||||
}
|
}
|
||||||
|
|
||||||
func msgIdFn(pmsg *pubsub_pb.Message) string {
|
func msgIDFn(pmsg *pubsub_pb.Message) string {
|
||||||
return string(hash.SHA256(pmsg.Data))
|
return string(hash.SHA256(pmsg.Data))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -160,7 +160,7 @@ func NewWakuRelay(bcaster Broadcaster, minPeersToPublish int, timesource timesou
|
||||||
w.opts = append([]pubsub.Option{
|
w.opts = append([]pubsub.Option{
|
||||||
pubsub.WithMessageSignaturePolicy(pubsub.StrictNoSign),
|
pubsub.WithMessageSignaturePolicy(pubsub.StrictNoSign),
|
||||||
pubsub.WithNoAuthor(),
|
pubsub.WithNoAuthor(),
|
||||||
pubsub.WithMessageIdFn(msgIdFn),
|
pubsub.WithMessageIdFn(msgIDFn),
|
||||||
pubsub.WithGossipSubProtocols(
|
pubsub.WithGossipSubProtocols(
|
||||||
[]protocol.ID{WakuRelayID_v200, pubsub.GossipSubID_v11, pubsub.GossipSubID_v10, pubsub.FloodSubID},
|
[]protocol.ID{WakuRelayID_v200, pubsub.GossipSubID_v11, pubsub.GossipSubID_v10, pubsub.FloodSubID},
|
||||||
func(feat pubsub.GossipSubFeature, proto protocol.ID) bool {
|
func(feat pubsub.GossipSubFeature, proto protocol.ID) bool {
|
||||||
|
@ -404,7 +404,7 @@ func (w *WakuRelay) SubscribeToTopic(ctx context.Context, topic string) (*Subscr
|
||||||
return &subscription, nil
|
return &subscription, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// SubscribeToTopic returns a Subscription to receive messages from the default waku pubsub topic
|
// Subscribe returns a Subscription to receive messages from the default waku pubsub topic
|
||||||
func (w *WakuRelay) Subscribe(ctx context.Context) (*Subscription, error) {
|
func (w *WakuRelay) Subscribe(ctx context.Context) (*Subscription, error) {
|
||||||
return w.SubscribeToTopic(ctx, DefaultWakuTopic)
|
return w.SubscribeToTopic(ctx, DefaultWakuTopic)
|
||||||
}
|
}
|
||||||
|
|
|
@ -139,7 +139,7 @@ func TestGossipsubScore(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestMsgID(t *testing.T) {
|
func TestMsgID(t *testing.T) {
|
||||||
expectedMsgIdBytes := []byte{208, 214, 63, 55, 144, 6, 206, 39, 40, 251, 138, 74, 66, 168, 43, 32, 91, 94, 149, 122, 237, 198, 149, 87, 232, 156, 197, 34, 53, 131, 78, 112}
|
expectedMsgIDBytes := []byte{208, 214, 63, 55, 144, 6, 206, 39, 40, 251, 138, 74, 66, 168, 43, 32, 91, 94, 149, 122, 237, 198, 149, 87, 232, 156, 197, 34, 53, 131, 78, 112}
|
||||||
|
|
||||||
topic := "abcde"
|
topic := "abcde"
|
||||||
msg := &pubsub_pb.Message{
|
msg := &pubsub_pb.Message{
|
||||||
|
@ -147,7 +147,7 @@ func TestMsgID(t *testing.T) {
|
||||||
Topic: &topic,
|
Topic: &topic,
|
||||||
}
|
}
|
||||||
|
|
||||||
msgId := msgIdFn(msg)
|
msgID := msgIDFn(msg)
|
||||||
|
|
||||||
require.Equal(t, expectedMsgIdBytes, []byte(msgId))
|
require.Equal(t, expectedMsgIDBytes, []byte(msgID))
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,9 +18,9 @@ var brHmacDrbgPool = sync.Pool{New: func() interface{} {
|
||||||
return hmacdrbg.NewHmacDrbg(256, seed, nil)
|
return hmacdrbg.NewHmacDrbg(256, seed, nil)
|
||||||
}}
|
}}
|
||||||
|
|
||||||
// GenerateRequestId generates a random 32 byte slice that can be used for
|
// GenerateRequestID generates a random 32 byte slice that can be used for
|
||||||
// creating requests inf the filter, store and lightpush protocols
|
// creating requests inf the filter, store and lightpush protocols
|
||||||
func GenerateRequestId() []byte {
|
func GenerateRequestID() []byte {
|
||||||
rng := brHmacDrbgPool.Get().(*hmacdrbg.HmacDrbg)
|
rng := brHmacDrbgPool.Get().(*hmacdrbg.HmacDrbg)
|
||||||
defer brHmacDrbgPool.Put(rng)
|
defer brHmacDrbgPool.Put(rng)
|
||||||
|
|
||||||
|
|
|
@ -9,7 +9,7 @@ import (
|
||||||
func TestGenerateRequestId(t *testing.T) {
|
func TestGenerateRequestId(t *testing.T) {
|
||||||
// Force 2 reseed to ensure this is working as expected
|
// Force 2 reseed to ensure this is working as expected
|
||||||
for i := 1; i < 20001; i++ {
|
for i := 1; i < 20001; i++ {
|
||||||
bytes := GenerateRequestId()
|
bytes := GenerateRequestID()
|
||||||
require.Equal(t, 32, len(bytes))
|
require.Equal(t, 32, len(bytes))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -50,17 +50,17 @@ func (gm *DynamicGroupManager) handler(events []*contracts.RLNMemberRegistered)
|
||||||
for _, event := range events {
|
for _, event := range events {
|
||||||
if event.Raw.Removed {
|
if event.Raw.Removed {
|
||||||
var indexes []uint
|
var indexes []uint
|
||||||
i_idx, ok := toRemoveTable.Get(event.Raw.BlockNumber)
|
iIdx, ok := toRemoveTable.Get(event.Raw.BlockNumber)
|
||||||
if ok {
|
if ok {
|
||||||
indexes = i_idx.([]uint)
|
indexes = iIdx.([]uint)
|
||||||
}
|
}
|
||||||
indexes = append(indexes, uint(event.Index.Uint64()))
|
indexes = append(indexes, uint(event.Index.Uint64()))
|
||||||
toRemoveTable.Set(event.Raw.BlockNumber, indexes)
|
toRemoveTable.Set(event.Raw.BlockNumber, indexes)
|
||||||
} else {
|
} else {
|
||||||
var eventsPerBlock []*contracts.RLNMemberRegistered
|
var eventsPerBlock []*contracts.RLNMemberRegistered
|
||||||
i_evt, ok := toInsertTable.Get(event.Raw.BlockNumber)
|
iEvt, ok := toInsertTable.Get(event.Raw.BlockNumber)
|
||||||
if ok {
|
if ok {
|
||||||
eventsPerBlock = i_evt.([]*contracts.RLNMemberRegistered)
|
eventsPerBlock = iEvt.([]*contracts.RLNMemberRegistered)
|
||||||
}
|
}
|
||||||
eventsPerBlock = append(eventsPerBlock, event)
|
eventsPerBlock = append(eventsPerBlock, event)
|
||||||
toInsertTable.Set(event.Raw.BlockNumber, eventsPerBlock)
|
toInsertTable.Set(event.Raw.BlockNumber, eventsPerBlock)
|
||||||
|
@ -159,11 +159,7 @@ func (gm *DynamicGroupManager) Start(ctx context.Context) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
if err = gm.MembershipFetcher.HandleGroupUpdates(ctx, gm.handler); err != nil {
|
return gm.MembershipFetcher.HandleGroupUpdates(ctx, gm.handler)
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (gm *DynamicGroupManager) loadCredential(ctx context.Context) error {
|
func (gm *DynamicGroupManager) loadCredential(ctx context.Context) error {
|
||||||
|
|
|
@ -18,10 +18,10 @@ import (
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
// the types of inputs to this handler matches the MemberRegistered event/proc defined in the MembershipContract interface
|
// RegistrationEventHandler represents the types of inputs to this handler matches the MemberRegistered event/proc defined in the MembershipContract interface
|
||||||
type RegistrationEventHandler = func([]*contracts.RLNMemberRegistered) error
|
type RegistrationEventHandler = func([]*contracts.RLNMemberRegistered) error
|
||||||
|
|
||||||
// for getting membershipRegsitered Events from the eth rpc
|
// MembershipFetcher is used for getting membershipRegsitered Events from the eth rpc
|
||||||
type MembershipFetcher struct {
|
type MembershipFetcher struct {
|
||||||
web3Config *web3.Config
|
web3Config *web3.Config
|
||||||
rln *rln.RLN
|
rln *rln.RLN
|
||||||
|
|
|
@ -3,8 +3,8 @@ package dynamic
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"io/ioutil"
|
|
||||||
"math/big"
|
"math/big"
|
||||||
|
"os"
|
||||||
"sort"
|
"sort"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
"testing"
|
"testing"
|
||||||
|
@ -38,7 +38,7 @@ func (c *MockClient) BlockByNumber(ctx context.Context, number *big.Int) (*types
|
||||||
}
|
}
|
||||||
func NewMockClient(t *testing.T, blockFile string) *MockClient {
|
func NewMockClient(t *testing.T, blockFile string) *MockClient {
|
||||||
blockChain := MockBlockChain{}
|
blockChain := MockBlockChain{}
|
||||||
data, err := ioutil.ReadFile(blockFile)
|
data, err := os.ReadFile(blockFile)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
@ -48,8 +48,8 @@ func NewMockClient(t *testing.T, blockFile string) *MockClient {
|
||||||
return &MockClient{blockChain: blockChain, errOnBlock: map[int64]*ErrCount{}}
|
return &MockClient{blockChain: blockChain, errOnBlock: map[int64]*ErrCount{}}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (client *MockClient) SetErrorOnBlock(blockNum int64, err error, count int) {
|
func (c *MockClient) SetErrorOnBlock(blockNum int64, err error, count int) {
|
||||||
client.errOnBlock[blockNum] = &ErrCount{err: err, count: count}
|
c.errOnBlock[blockNum] = &ErrCount{err: err, count: count}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *MockClient) getFromAndToRange(query ethereum.FilterQuery) (int64, int64) {
|
func (c *MockClient) getFromAndToRange(query ethereum.FilterQuery) (int64, int64) {
|
||||||
|
|
|
@ -18,9 +18,6 @@ import (
|
||||||
r "github.com/waku-org/go-zerokit-rln/rln"
|
r "github.com/waku-org/go-zerokit-rln/rln"
|
||||||
)
|
)
|
||||||
|
|
||||||
const RLNRELAY_PUBSUB_TOPIC = "waku/2/rlnrelay/proto"
|
|
||||||
const RLNRELAY_CONTENT_TOPIC = "waku/2/rlnrelay/proto"
|
|
||||||
|
|
||||||
func TestWakuRLNRelaySuite(t *testing.T) {
|
func TestWakuRLNRelaySuite(t *testing.T) {
|
||||||
suite.Run(t, new(WakuRLNRelaySuite))
|
suite.Run(t, new(WakuRLNRelaySuite))
|
||||||
}
|
}
|
||||||
|
|
|
@ -194,6 +194,7 @@ func (rs RelayShards) BitVector() []byte {
|
||||||
return append(result, vec...)
|
return append(result, vec...)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Generate a RelayShards from a byte slice
|
||||||
func FromBitVector(buf []byte) (RelayShards, error) {
|
func FromBitVector(buf []byte) (RelayShards, error) {
|
||||||
if len(buf) != 130 {
|
if len(buf) != 130 {
|
||||||
return RelayShards{}, errors.New("invalid data: expected 130 bytes")
|
return RelayShards{}, errors.New("invalid data: expected 130 bytes")
|
||||||
|
|
|
@ -31,7 +31,7 @@ type Result struct {
|
||||||
store Store
|
store Store
|
||||||
query *pb.HistoryQuery
|
query *pb.HistoryQuery
|
||||||
cursor *pb.Index
|
cursor *pb.Index
|
||||||
peerId peer.ID
|
peerID peer.ID
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *Result) Cursor() *pb.Index {
|
func (r *Result) Cursor() *pb.Index {
|
||||||
|
@ -43,7 +43,7 @@ func (r *Result) IsComplete() bool {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *Result) PeerID() peer.ID {
|
func (r *Result) PeerID() peer.ID {
|
||||||
return r.peerId
|
return r.peerID
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *Result) Query() *pb.HistoryQuery {
|
func (r *Result) Query() *pb.HistoryQuery {
|
||||||
|
@ -111,7 +111,7 @@ func WithAutomaticPeerSelection(fromThesePeers ...peer.ID) HistoryRequestOption
|
||||||
if params.s.pm == nil {
|
if params.s.pm == nil {
|
||||||
p, err = utils.SelectPeer(params.s.h, StoreID_v20beta4, fromThesePeers, params.s.log)
|
p, err = utils.SelectPeer(params.s.h, StoreID_v20beta4, fromThesePeers, params.s.log)
|
||||||
} else {
|
} else {
|
||||||
p, err = params.s.pm.SelectPeer(StoreID_v20beta4, fromThesePeers, params.s.log)
|
p, err = params.s.pm.SelectPeer(StoreID_v20beta4, fromThesePeers)
|
||||||
}
|
}
|
||||||
if err == nil {
|
if err == nil {
|
||||||
params.selectedPeer = p
|
params.selectedPeer = p
|
||||||
|
@ -148,7 +148,7 @@ func WithRequestID(requestID []byte) HistoryRequestOption {
|
||||||
// when creating a store request
|
// when creating a store request
|
||||||
func WithAutomaticRequestID() HistoryRequestOption {
|
func WithAutomaticRequestID() HistoryRequestOption {
|
||||||
return func(params *HistoryRequestParameters) {
|
return func(params *HistoryRequestParameters) {
|
||||||
params.requestID = protocol.GenerateRequestId()
|
params.requestID = protocol.GenerateRequestID()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -282,7 +282,7 @@ func (store *WakuStore) Query(ctx context.Context, query Query, opts ...HistoryR
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(params.requestID) == 0 {
|
if len(params.requestID) == 0 {
|
||||||
return nil, ErrInvalidId
|
return nil, ErrInvalidID
|
||||||
}
|
}
|
||||||
|
|
||||||
if params.cursor != nil {
|
if params.cursor != nil {
|
||||||
|
@ -321,7 +321,7 @@ func (store *WakuStore) Query(ctx context.Context, query Query, opts ...HistoryR
|
||||||
store: store,
|
store: store,
|
||||||
Messages: response.Messages,
|
Messages: response.Messages,
|
||||||
query: q,
|
query: q,
|
||||||
peerId: params.selectedPeer,
|
peerID: params.selectedPeer,
|
||||||
}
|
}
|
||||||
|
|
||||||
if response.PagingInfo != nil {
|
if response.PagingInfo != nil {
|
||||||
|
@ -379,7 +379,7 @@ func (store *WakuStore) Next(ctx context.Context, r *Result) (*Result, error) {
|
||||||
Messages: []*wpb.WakuMessage{},
|
Messages: []*wpb.WakuMessage{},
|
||||||
cursor: nil,
|
cursor: nil,
|
||||||
query: r.query,
|
query: r.query,
|
||||||
peerId: r.PeerID(),
|
peerID: r.PeerID(),
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -400,7 +400,7 @@ func (store *WakuStore) Next(ctx context.Context, r *Result) (*Result, error) {
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
response, err := store.queryFrom(ctx, q, r.PeerID(), protocol.GenerateRequestId())
|
response, err := store.queryFrom(ctx, q, r.PeerID(), protocol.GenerateRequestID())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -414,7 +414,7 @@ func (store *WakuStore) Next(ctx context.Context, r *Result) (*Result, error) {
|
||||||
store: store,
|
store: store,
|
||||||
Messages: response.Messages,
|
Messages: response.Messages,
|
||||||
query: q,
|
query: q,
|
||||||
peerId: r.PeerID(),
|
peerID: r.PeerID(),
|
||||||
}
|
}
|
||||||
|
|
||||||
if response.PagingInfo != nil {
|
if response.PagingInfo != nil {
|
||||||
|
|
|
@ -32,8 +32,8 @@ var (
|
||||||
// that could be used to retrieve message history
|
// that could be used to retrieve message history
|
||||||
ErrNoPeersAvailable = errors.New("no suitable remote peers")
|
ErrNoPeersAvailable = errors.New("no suitable remote peers")
|
||||||
|
|
||||||
// ErrInvalidId is returned when no RequestID is given
|
// ErrInvalidID is returned when no RequestID is given
|
||||||
ErrInvalidId = errors.New("invalid request id")
|
ErrInvalidID = errors.New("invalid request id")
|
||||||
|
|
||||||
// ErrFailedToResumeHistory is returned when the node attempted to retrieve historic
|
// ErrFailedToResumeHistory is returned when the node attempted to retrieve historic
|
||||||
// messages to fill its own message history but for some reason it failed
|
// messages to fill its own message history but for some reason it failed
|
||||||
|
|
|
@ -243,7 +243,7 @@ func (store *WakuStore) queryLoop(ctx context.Context, query *pb.HistoryQuery, c
|
||||||
for _, peer := range candidateList {
|
for _, peer := range candidateList {
|
||||||
func() {
|
func() {
|
||||||
defer queryWg.Done()
|
defer queryWg.Done()
|
||||||
result, err := store.queryFrom(ctx, query, peer, protocol.GenerateRequestId())
|
result, err := store.queryFrom(ctx, query, peer, protocol.GenerateRequestID())
|
||||||
if err == nil {
|
if err == nil {
|
||||||
resultChan <- result
|
resultChan <- result
|
||||||
return
|
return
|
||||||
|
@ -298,7 +298,7 @@ func (store *WakuStore) Resume(ctx context.Context, pubsubTopic string, peerList
|
||||||
return 0, err
|
return 0, err
|
||||||
}
|
}
|
||||||
|
|
||||||
var offset int64 = int64(20 * time.Nanosecond)
|
offset := int64(20 * time.Nanosecond)
|
||||||
currentTime := store.timesource.Now().UnixNano() + offset
|
currentTime := store.timesource.Now().UnixNano() + offset
|
||||||
lastSeenTime = max(lastSeenTime-offset, 0)
|
lastSeenTime = max(lastSeenTime-offset, 0)
|
||||||
|
|
||||||
|
|
|
@ -36,7 +36,7 @@ type DB struct {
|
||||||
cancel func()
|
cancel func()
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewDB(ctx context.Context, db *sql.DB, logger *zap.Logger) *DB {
|
func NewDB(db *sql.DB, logger *zap.Logger) *DB {
|
||||||
rdb := &DB{
|
rdb := &DB{
|
||||||
db: db,
|
db: db,
|
||||||
logger: logger.Named("rendezvous/db"),
|
logger: logger.Named("rendezvous/db"),
|
||||||
|
|
|
@ -51,7 +51,7 @@ func TestRendezvous(t *testing.T) {
|
||||||
err = sqlite.Migrations(db)
|
err = sqlite.Migrations(db)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
rdb := NewDB(ctx, db, utils.Logger())
|
rdb := NewDB(db, utils.Logger())
|
||||||
rendezvousPoint := NewRendezvous(rdb, nil, utils.Logger())
|
rendezvousPoint := NewRendezvous(rdb, nil, utils.Logger())
|
||||||
rendezvousPoint.SetHost(host1)
|
rendezvousPoint.SetHost(host1)
|
||||||
err = rendezvousPoint.Start(ctx)
|
err = rendezvousPoint.Start(ctx)
|
||||||
|
|
|
@ -113,6 +113,7 @@ func computeOffset(timeQuery ntpQuery, servers []string, allowedFailures int) (t
|
||||||
return offsets[mid], nil
|
return offsets[mid], nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// NewNTPTimesource creates a timesource that uses NTP
|
||||||
func NewNTPTimesource(ntpServers []string, log *zap.Logger) *NTPTimeSource {
|
func NewNTPTimesource(ntpServers []string, log *zap.Logger) *NTPTimeSource {
|
||||||
return &NTPTimeSource{
|
return &NTPTimeSource{
|
||||||
servers: ntpServers,
|
servers: ntpServers,
|
||||||
|
|
|
@ -16,7 +16,7 @@ func EcdsaPubKeyToSecp256k1PublicKey(pubKey *ecdsa.PublicKey) *crypto.Secp256k1P
|
||||||
return (*crypto.Secp256k1PublicKey)(btcec.NewPublicKey(xFieldVal, yFieldVal))
|
return (*crypto.Secp256k1PublicKey)(btcec.NewPublicKey(xFieldVal, yFieldVal))
|
||||||
}
|
}
|
||||||
|
|
||||||
// EcdsaPubKeyToSecp256k1PublicKey converts an `ecdsa.PrivateKey` into a libp2p `crypto.Secp256k1PrivateKey“
|
// EcdsaPrivKeyToSecp256k1PrivKey converts an `ecdsa.PrivateKey` into a libp2p `crypto.Secp256k1PrivateKey“
|
||||||
func EcdsaPrivKeyToSecp256k1PrivKey(privKey *ecdsa.PrivateKey) *crypto.Secp256k1PrivateKey {
|
func EcdsaPrivKeyToSecp256k1PrivKey(privKey *ecdsa.PrivateKey) *crypto.Secp256k1PrivateKey {
|
||||||
privK, _ := btcec.PrivKeyFromBytes(privKey.D.Bytes())
|
privK, _ := btcec.PrivKeyFromBytes(privKey.D.Bytes())
|
||||||
return (*crypto.Secp256k1PrivateKey)(privK)
|
return (*crypto.Secp256k1PrivateKey)(privK)
|
||||||
|
|
|
@ -8,7 +8,7 @@ import (
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
var log *zap.Logger = nil
|
var log *zap.Logger
|
||||||
|
|
||||||
// Logger creates a zap.Logger with some reasonable defaults
|
// Logger creates a zap.Logger with some reasonable defaults
|
||||||
func Logger() *zap.Logger {
|
func Logger() *zap.Logger {
|
||||||
|
|
|
@ -20,6 +20,7 @@ import (
|
||||||
// some protocol
|
// some protocol
|
||||||
var ErrNoPeersAvailable = errors.New("no suitable peers found")
|
var ErrNoPeersAvailable = errors.New("no suitable peers found")
|
||||||
|
|
||||||
|
// GetPeerID is used to extract the peerID from a multiaddress
|
||||||
func GetPeerID(m multiaddr.Multiaddr) (peer.ID, error) {
|
func GetPeerID(m multiaddr.Multiaddr) (peer.ID, error) {
|
||||||
peerIDStr, err := m.ValueForProtocol(multiaddr.P_P2P)
|
peerIDStr, err := m.ValueForProtocol(multiaddr.P_P2P)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -72,7 +73,7 @@ func SelectRandomPeer(peers peer.IDSlice, log *zap.Logger) (peer.ID, error) {
|
||||||
// Note: Use this method only if WakuNode is not being initialized, otherwise use peermanager.SelectPeer.
|
// Note: Use this method only if WakuNode is not being initialized, otherwise use peermanager.SelectPeer.
|
||||||
// If a list of specific peers is passed, the peer will be chosen from that list assuming
|
// If a list of specific peers is passed, the peer will be chosen from that list assuming
|
||||||
// it supports the chosen protocol, otherwise it will chose a peer from the node peerstore
|
// it supports the chosen protocol, otherwise it will chose a peer from the node peerstore
|
||||||
func SelectPeer(host host.Host, protocolId protocol.ID, specificPeers []peer.ID, log *zap.Logger) (peer.ID, error) {
|
func SelectPeer(host host.Host, protocolID protocol.ID, specificPeers []peer.ID, log *zap.Logger) (peer.ID, error) {
|
||||||
// @TODO We need to be more strategic about which peers we dial. Right now we just set one on the service.
|
// @TODO We need to be more strategic about which peers we dial. Right now we just set one on the service.
|
||||||
// Ideally depending on the query and our set of peers we take a subset of ideal peers.
|
// Ideally depending on the query and our set of peers we take a subset of ideal peers.
|
||||||
// This will require us to check for various factors such as:
|
// This will require us to check for various factors such as:
|
||||||
|
@ -80,7 +81,7 @@ func SelectPeer(host host.Host, protocolId protocol.ID, specificPeers []peer.ID,
|
||||||
// - latency?
|
// - latency?
|
||||||
// - default store peer?
|
// - default store peer?
|
||||||
|
|
||||||
peers, err := FilterPeersByProto(host, specificPeers, protocolId)
|
peers, err := FilterPeersByProto(host, specificPeers, protocolID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
|
@ -96,7 +97,7 @@ type pingResult struct {
|
||||||
// SelectPeerWithLowestRTT will select a peer that supports a specific protocol with the lowest reply time
|
// SelectPeerWithLowestRTT will select a peer that supports a specific protocol with the lowest reply time
|
||||||
// If a list of specific peers is passed, the peer will be chosen from that list assuming
|
// If a list of specific peers is passed, the peer will be chosen from that list assuming
|
||||||
// it supports the chosen protocol, otherwise it will chose a peer from the node peerstore
|
// it supports the chosen protocol, otherwise it will chose a peer from the node peerstore
|
||||||
func SelectPeerWithLowestRTT(ctx context.Context, host host.Host, protocolId protocol.ID, specificPeers []peer.ID, log *zap.Logger) (peer.ID, error) {
|
func SelectPeerWithLowestRTT(ctx context.Context, host host.Host, protocolID protocol.ID, specificPeers []peer.ID, _ *zap.Logger) (peer.ID, error) {
|
||||||
var peers peer.IDSlice
|
var peers peer.IDSlice
|
||||||
|
|
||||||
peerSet := specificPeers
|
peerSet := specificPeers
|
||||||
|
@ -105,7 +106,7 @@ func SelectPeerWithLowestRTT(ctx context.Context, host host.Host, protocolId pro
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, peer := range peerSet {
|
for _, peer := range peerSet {
|
||||||
protocols, err := host.Peerstore().SupportsProtocols(peer, protocolId)
|
protocols, err := host.Peerstore().SupportsProtocols(peer, protocolID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,7 +19,7 @@ type Timesource interface {
|
||||||
func GetUnixEpoch(timesource ...Timesource) int64 {
|
func GetUnixEpoch(timesource ...Timesource) int64 {
|
||||||
if len(timesource) != 0 {
|
if len(timesource) != 0 {
|
||||||
return GetUnixEpochFrom(timesource[0].Now())
|
return GetUnixEpochFrom(timesource[0].Now())
|
||||||
} else {
|
|
||||||
return GetUnixEpochFrom(time.Now())
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return GetUnixEpochFrom(time.Now())
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue