diff --git a/waku/nwaku.go b/waku/nwaku.go index 99978a1..1176c65 100644 --- a/waku/nwaku.go +++ b/waku/nwaku.go @@ -328,7 +328,6 @@ const requestTimeout = 30 * time.Second type WakuConfig struct { Host string `json:"host,omitempty"` - Port int `json:"port,omitempty"` Nodekey string `json:"nodekey,omitempty"` Relay bool `json:"relay,omitempty"` Store bool `json:"store,omitempty"` @@ -352,12 +351,12 @@ type WakuConfig struct { Staticnodes []string `json:"staticnodes,omitempty"` Discv5BootstrapNodes []string `json:"discv5BootstrapNodes,omitempty"` Discv5Discovery bool `json:"discv5Discovery,omitempty"` - Discv5UdpPort uint16 `json:"discv5UdpPort,omitempty"` + Discv5UdpPort int `json:"discv5UdpPort,omitempty"` ClusterID uint16 `json:"clusterId,omitempty"` Shards []uint16 `json:"shards,omitempty"` PeerExchange bool `json:"peerExchange,omitempty"` PeerExchangeNode string `json:"peerExchangeNode,omitempty"` - TcpPort uint16 `json:"tcpPort,omitempty"` + TcpPort int `json:"tcpPort,omitempty"` } // Waku represents a dark communication interface through the Ethereum @@ -757,8 +756,7 @@ func (n *WakuNode) Version() (string, error) { } func (n *WakuNode) StoreQuery(ctx context.Context, storeRequest *storepb.StoreQueryRequest, peerInfo peer.AddrInfo) (*storepb.StoreQueryResponse, error) { - // TODO: extract timeout from context - timeoutMs := time.Minute.Milliseconds() + timeoutMs := getContextTimeoutMilliseconds(ctx) b, err := json.Marshal(storeRequest) if err != nil { @@ -799,8 +797,7 @@ func (n *WakuNode) StoreQuery(ctx context.Context, storeRequest *storepb.StoreQu } func (n *WakuNode) RelayPublish(ctx context.Context, message *pb.WakuMessage, pubsubTopic string) (pb.MessageHash, error) { - // TODO: extract timeout from context - timeoutMs := time.Minute.Milliseconds() + timeoutMs := getContextTimeoutMilliseconds(ctx) jsonMsg, err := json.Marshal(message) if err != nil { @@ -841,9 +838,9 @@ func (n *WakuNode) DnsDiscovery(ctx context.Context, enrTreeUrl string, nameDnsS defer C.free(unsafe.Pointer(cEnrTree)) defer C.free(unsafe.Pointer(cDnsServer)) - // TODO: extract timeout from context + timeoutMs := getContextTimeoutMilliseconds(ctx) wg.Add(1) - C.cGoWakuDnsDiscovery(n.wakuCtx, cEnrTree, cDnsServer, C.int(time.Minute.Milliseconds()), resp) + C.cGoWakuDnsDiscovery(n.wakuCtx, cEnrTree, cDnsServer, C.int(timeoutMs), resp) wg.Wait() if C.getRet(resp) == C.RET_OK { var addrsRet []multiaddr.Multiaddr @@ -876,9 +873,9 @@ func (n *WakuNode) PingPeer(ctx context.Context, peerInfo peer.AddrInfo) (time.D var cPeerId = C.CString(strings.Join(addrs, ",")) defer C.free(unsafe.Pointer(cPeerId)) - // TODO: extract timeout from ctx + timeoutMs := getContextTimeoutMilliseconds(ctx) wg.Add(1) - C.cGoWakuPingPeer(n.wakuCtx, cPeerId, C.int(time.Minute.Milliseconds()), resp) + C.cGoWakuPingPeer(n.wakuCtx, cPeerId, C.int(timeoutMs), resp) wg.Wait() if C.getRet(resp) == C.RET_OK { rttStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) @@ -974,9 +971,9 @@ func (n *WakuNode) Connect(ctx context.Context, addr multiaddr.Multiaddr) error defer C.freeResp(resp) defer C.free(unsafe.Pointer(cPeerMultiAddr)) - // TODO: extract timeout from ctx + timeoutMs := getContextTimeoutMilliseconds(ctx) wg.Add(1) - C.cGoWakuConnect(n.wakuCtx, cPeerMultiAddr, C.int(time.Minute.Milliseconds()), resp) + C.cGoWakuConnect(n.wakuCtx, cPeerMultiAddr, C.int(timeoutMs), resp) wg.Wait() if C.getRet(resp) == C.RET_OK { return nil @@ -996,9 +993,9 @@ func (n *WakuNode) DialPeerByID(ctx context.Context, peerID peer.ID, protocol li defer C.free(unsafe.Pointer(cPeerId)) defer C.free(unsafe.Pointer(cProtocol)) - // TODO: extract timeout from ctx + timeoutMs := getContextTimeoutMilliseconds(ctx) wg.Add(1) - C.cGoWakuDialPeerById(n.wakuCtx, cPeerId, cProtocol, C.int(time.Minute.Milliseconds()), resp) + C.cGoWakuDialPeerById(n.wakuCtx, cPeerId, cProtocol, C.int(timeoutMs), resp) wg.Wait() if C.getRet(resp) == C.RET_OK { return nil @@ -1155,9 +1152,10 @@ func (n *WakuNode) DialPeer(ctx context.Context, peerAddr multiaddr.Multiaddr, p defer C.freeResp(resp) defer C.free(unsafe.Pointer(cPeerMultiAddr)) defer C.free(unsafe.Pointer(cProtocol)) - // TODO: extract timeout from context + + timeoutMs := getContextTimeoutMilliseconds(ctx) wg.Add(1) - C.cGoWakuDialPeer(n.wakuCtx, cPeerMultiAddr, cProtocol, C.int(requestTimeout.Milliseconds()), resp) + C.cGoWakuDialPeer(n.wakuCtx, cPeerMultiAddr, cProtocol, C.int(timeoutMs), resp) wg.Wait() if C.getRet(resp) == C.RET_OK { return nil @@ -1173,3 +1171,11 @@ func (n *WakuNode) GetNumConnectedPeers() (int, error) { } return len(peers), nil } + +func getContextTimeoutMilliseconds(ctx context.Context) int { + deadline, ok := ctx.Deadline() + if ok { + return int(time.Until(deadline).Milliseconds()) + } + return 0 +} diff --git a/waku/nwaku_test.go b/waku/nwaku_test.go index 8bd86a9..d558511 100644 --- a/waku/nwaku_test.go +++ b/waku/nwaku_test.go @@ -34,7 +34,6 @@ func TestBasicWaku(t *testing.T) { // ctx := context.Background() nwakuConfig := WakuConfig{ - Port: 30303, Nodekey: "11d0dcea28e86f81937a3bd1163473c7fbc0a0db54fd72914849bc47bdf78710", Relay: true, LogLevel: "DEBUG",