mirror of https://github.com/waku-org/js-waku.git
feat: Logger with log levels (#1672)
* setup a custom Logger with log level support * refactor codebase for to use new Logger with log levels * disallow usage of `debug` directly / only allow usage in/through custom Logger * remove `debug` from logger
This commit is contained in:
parent
1150ddcd02
commit
0f7d63ef93
|
@ -16,6 +16,15 @@
|
|||
],
|
||||
"globals": { "BigInt": true, "console": true, "WebAssembly": true },
|
||||
"rules": {
|
||||
"no-restricted-imports": [
|
||||
"error",
|
||||
{
|
||||
"paths": [{
|
||||
"name": "debug",
|
||||
"message": "The usage of 'debug' package directly is disallowed. Please use the custom logger from @waku/utils instead."
|
||||
}]
|
||||
}
|
||||
],
|
||||
"prettier/prettier": [
|
||||
"error",
|
||||
{
|
||||
|
|
|
@ -17,11 +17,11 @@ import {
|
|||
} from "@waku/interfaces";
|
||||
import { Libp2p, Tags } from "@waku/interfaces";
|
||||
import { shardInfoToPubSubTopics } from "@waku/utils";
|
||||
import debug from "debug";
|
||||
import { Logger } from "@waku/utils";
|
||||
|
||||
import { KeepAliveManager } from "./keep_alive_manager.js";
|
||||
|
||||
const log = debug("waku:connection-manager");
|
||||
const log = new Logger("connection-manager");
|
||||
|
||||
export const DEFAULT_MAX_BOOTSTRAP_PEERS_ALLOWED = 1;
|
||||
export const DEFAULT_MAX_DIAL_ATTEMPTS_FOR_PEER = 3;
|
||||
|
@ -128,14 +128,16 @@ export class ConnectionManager
|
|||
this.keepAliveManager = new KeepAliveManager(keepAliveOptions, relay);
|
||||
|
||||
this.run()
|
||||
.then(() => log(`Connection Manager is now running`))
|
||||
.catch((error) => log(`Unexpected error while running service`, error));
|
||||
.then(() => log.info(`Connection Manager is now running`))
|
||||
.catch((error) =>
|
||||
log.error(`Unexpected error while running service`, error)
|
||||
);
|
||||
|
||||
// libp2p emits `peer:discovery` events during its initialization
|
||||
// which means that before the ConnectionManager is initialized, some peers may have been discovered
|
||||
// we will dial the peers in peerStore ONCE before we start to listen to the `peer:discovery` events within the ConnectionManager
|
||||
this.dialPeerStorePeers().catch((error) =>
|
||||
log(`Unexpected error while dialing peer store peers`, error)
|
||||
log.error(`Unexpected error while dialing peer store peers`, error)
|
||||
);
|
||||
}
|
||||
|
||||
|
@ -153,7 +155,7 @@ export class ConnectionManager
|
|||
try {
|
||||
await Promise.all(dialPromises);
|
||||
} catch (error) {
|
||||
log(`Unexpected error while dialing peer store peers`, error);
|
||||
log.error(`Unexpected error while dialing peer store peers`, error);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -185,7 +187,9 @@ export class ConnectionManager
|
|||
let dialAttempt = 0;
|
||||
while (dialAttempt < this.options.maxDialAttemptsForPeer) {
|
||||
try {
|
||||
log(`Dialing peer ${peerId.toString()} on attempt ${dialAttempt + 1}`);
|
||||
log.info(
|
||||
`Dialing peer ${peerId.toString()} on attempt ${dialAttempt + 1}`
|
||||
);
|
||||
await this.libp2p.dial(peerId);
|
||||
|
||||
const tags = await this.getTagNamesForPeer(peerId);
|
||||
|
@ -201,10 +205,12 @@ export class ConnectionManager
|
|||
} catch (error) {
|
||||
if (error instanceof AggregateError) {
|
||||
// Handle AggregateError
|
||||
log(`Error dialing peer ${peerId.toString()} - ${error.errors}`);
|
||||
log.error(
|
||||
`Error dialing peer ${peerId.toString()} - ${error.errors}`
|
||||
);
|
||||
} else {
|
||||
// Handle generic error
|
||||
log(
|
||||
log.error(
|
||||
`Error dialing peer ${peerId.toString()} - ${
|
||||
(error as any).message
|
||||
}`
|
||||
|
@ -230,9 +236,9 @@ export class ConnectionManager
|
|||
let errorMessage;
|
||||
if (error instanceof AggregateError) {
|
||||
if (!error.errors) {
|
||||
log(`No errors array found for AggregateError`);
|
||||
log.warn(`No errors array found for AggregateError`);
|
||||
} else if (error.errors.length === 0) {
|
||||
log(`Errors array is empty for AggregateError`);
|
||||
log.warn(`Errors array is empty for AggregateError`);
|
||||
} else {
|
||||
errorMessage = JSON.stringify(error.errors[0]);
|
||||
}
|
||||
|
@ -240,8 +246,8 @@ export class ConnectionManager
|
|||
errorMessage = error.message;
|
||||
}
|
||||
|
||||
log(
|
||||
`Deleting undialable peer ${peerId.toString()} from peer store. Error: ${errorMessage}`
|
||||
log.info(
|
||||
`Deleting undialable peer ${peerId.toString()} from peer store. Reason: ${errorMessage}`
|
||||
);
|
||||
}
|
||||
|
||||
|
@ -259,9 +265,9 @@ export class ConnectionManager
|
|||
try {
|
||||
this.keepAliveManager.stop(peerId);
|
||||
await this.libp2p.hangUp(peerId);
|
||||
log(`Dropped connection with peer ${peerId.toString()}`);
|
||||
log.info(`Dropped connection with peer ${peerId.toString()}`);
|
||||
} catch (error) {
|
||||
log(
|
||||
log.error(
|
||||
`Error dropping connection with peer ${peerId.toString()} - ${error}`
|
||||
);
|
||||
}
|
||||
|
@ -275,7 +281,7 @@ export class ConnectionManager
|
|||
const peerId = this.pendingPeerDialQueue.shift();
|
||||
if (!peerId) return;
|
||||
this.attemptDial(peerId).catch((error) => {
|
||||
log(error);
|
||||
log.error(error);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
@ -322,7 +328,7 @@ export class ConnectionManager
|
|||
}
|
||||
|
||||
this.dialPeer(peerId).catch((err) => {
|
||||
log(`Error dialing peer ${peerId.toString()} : ${err}`);
|
||||
log.error(`Error dialing peer ${peerId.toString()} : ${err}`);
|
||||
});
|
||||
}
|
||||
|
||||
|
@ -336,7 +342,7 @@ export class ConnectionManager
|
|||
try {
|
||||
await this.attemptDial(peerId);
|
||||
} catch (error) {
|
||||
log(`Error dialing peer ${peerId.toString()} : ${error}`);
|
||||
log.error(`Error dialing peer ${peerId.toString()} : ${error}`);
|
||||
}
|
||||
})();
|
||||
},
|
||||
|
@ -404,7 +410,7 @@ export class ConnectionManager
|
|||
// if we're already connected to the peer, don't dial
|
||||
const isConnected = this.libp2p.getConnections(peerId).length > 0;
|
||||
if (isConnected) {
|
||||
log(`Already connected to peer ${peerId.toString()}. Not dialing.`);
|
||||
log.warn(`Already connected to peer ${peerId.toString()}. Not dialing.`);
|
||||
return false;
|
||||
}
|
||||
|
||||
|
@ -414,7 +420,7 @@ export class ConnectionManager
|
|||
peerId,
|
||||
this.libp2p.peerStore
|
||||
);
|
||||
log(
|
||||
log.warn(
|
||||
`Discovered peer ${peerId.toString()} with ShardInfo ${shardInfo} is not part of any of the configured pubsub topics (${
|
||||
this.configuredPubSubTopics
|
||||
}).
|
||||
|
@ -425,7 +431,7 @@ export class ConnectionManager
|
|||
|
||||
// if the peer is not dialable based on bootstrap status, don't dial
|
||||
if (!(await this.isPeerDialableBasedOnBootstrapStatus(peerId))) {
|
||||
log(
|
||||
log.warn(
|
||||
`Peer ${peerId.toString()} is not dialable based on bootstrap status. Not dialing.`
|
||||
);
|
||||
return false;
|
||||
|
@ -486,7 +492,7 @@ export class ConnectionManager
|
|||
const peer = await this.libp2p.peerStore.get(peerId);
|
||||
return Array.from(peer.tags.keys());
|
||||
} catch (error) {
|
||||
log(`Failed to get peer ${peerId}, error: ${error}`);
|
||||
log.error(`Failed to get peer ${peerId}, error: ${error}`);
|
||||
return [];
|
||||
}
|
||||
}
|
||||
|
|
|
@ -22,7 +22,7 @@ import {
|
|||
groupByContentTopic,
|
||||
toAsyncIterator
|
||||
} from "@waku/utils";
|
||||
import debug from "debug";
|
||||
import { Logger } from "@waku/utils";
|
||||
import all from "it-all";
|
||||
import * as lp from "it-length-prefixed";
|
||||
import { pipe } from "it-pipe";
|
||||
|
@ -36,7 +36,7 @@ import {
|
|||
FilterSubscribeRpc
|
||||
} from "./filter_rpc.js";
|
||||
|
||||
const log = debug("waku:filter:v2");
|
||||
const log = new Logger("filter:v2");
|
||||
|
||||
type SubscriptionCallback<T extends IDecodedMessage> = {
|
||||
decoders: IDecoder<T>[];
|
||||
|
@ -108,7 +108,7 @@ class Subscription {
|
|||
);
|
||||
}
|
||||
|
||||
log(
|
||||
log.info(
|
||||
"Subscribed to peer ",
|
||||
this.peer.id.toString(),
|
||||
"for content topics",
|
||||
|
@ -183,9 +183,9 @@ class Subscription {
|
|||
);
|
||||
}
|
||||
|
||||
log("Ping successful");
|
||||
log.info("Ping successful");
|
||||
} catch (error) {
|
||||
log("Error pinging: ", error);
|
||||
log.error("Error pinging: ", error);
|
||||
throw new Error("Error pinging: " + error);
|
||||
}
|
||||
}
|
||||
|
@ -216,7 +216,7 @@ class Subscription {
|
|||
}
|
||||
|
||||
this.subscriptionCallbacks.clear();
|
||||
log("Unsubscribed from all content topics");
|
||||
log.info("Unsubscribed from all content topics");
|
||||
} catch (error) {
|
||||
throw new Error("Error unsubscribing from all content topics: " + error);
|
||||
}
|
||||
|
@ -226,7 +226,7 @@ class Subscription {
|
|||
const contentTopic = message.contentTopic;
|
||||
const subscriptionCallback = this.subscriptionCallbacks.get(contentTopic);
|
||||
if (!subscriptionCallback) {
|
||||
log("No subscription callback available for ", contentTopic);
|
||||
log.error("No subscription callback available for ", contentTopic);
|
||||
return;
|
||||
}
|
||||
await pushMessage(subscriptionCallback, this.pubsubTopic, message);
|
||||
|
@ -260,7 +260,7 @@ class Filter extends BaseProtocol implements IReceiver {
|
|||
this.pubsubTopics = options?.pubsubTopics || [DefaultPubSubTopic];
|
||||
|
||||
libp2p.handle(FilterCodecs.PUSH, this.onRequest.bind(this)).catch((e) => {
|
||||
log("Failed to register ", FilterCodecs.PUSH, e);
|
||||
log.error("Failed to register ", FilterCodecs.PUSH, e);
|
||||
});
|
||||
|
||||
this.activeSubscriptions = new Map();
|
||||
|
@ -332,7 +332,6 @@ class Filter extends BaseProtocol implements IReceiver {
|
|||
}
|
||||
|
||||
private onRequest(streamData: IncomingStreamData): void {
|
||||
log("Receiving message push");
|
||||
try {
|
||||
pipe(streamData.stream, lp.decode, async (source) => {
|
||||
for await (const bytes of source) {
|
||||
|
@ -341,12 +340,12 @@ class Filter extends BaseProtocol implements IReceiver {
|
|||
const { pubsubTopic, wakuMessage } = response;
|
||||
|
||||
if (!wakuMessage) {
|
||||
log("Received empty message");
|
||||
log.error("Received empty message");
|
||||
return;
|
||||
}
|
||||
|
||||
if (!pubsubTopic) {
|
||||
log("PubSub topic missing from push message");
|
||||
log.error("PubSub topic missing from push message");
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -357,7 +356,9 @@ class Filter extends BaseProtocol implements IReceiver {
|
|||
);
|
||||
|
||||
if (!subscription) {
|
||||
log(`No subscription locally registered for topic ${pubsubTopic}`);
|
||||
log.error(
|
||||
`No subscription locally registered for topic ${pubsubTopic}`
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -365,14 +366,14 @@ class Filter extends BaseProtocol implements IReceiver {
|
|||
}
|
||||
}).then(
|
||||
() => {
|
||||
log("Receiving pipe closed.");
|
||||
log.info("Receiving pipe closed.");
|
||||
},
|
||||
(e) => {
|
||||
log("Error with receiving pipe", e);
|
||||
log.error("Error with receiving pipe", e);
|
||||
}
|
||||
);
|
||||
} catch (e) {
|
||||
log("Error decoding message", e);
|
||||
log.error("Error decoding message", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -392,7 +393,7 @@ async function pushMessage<T extends IDecodedMessage>(
|
|||
|
||||
const { contentTopic } = message;
|
||||
if (!contentTopic) {
|
||||
log("Message has no content topic, skipping");
|
||||
log.warn("Message has no content topic, skipping");
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -407,6 +408,6 @@ async function pushMessage<T extends IDecodedMessage>(
|
|||
|
||||
await callback(decodedMessage);
|
||||
} catch (e) {
|
||||
log("Error decoding message", e);
|
||||
log.error("Error decoding message", e);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -2,14 +2,14 @@ import type { PeerId } from "@libp2p/interface/peer-id";
|
|||
import type { PeerStore } from "@libp2p/interface/peer-store";
|
||||
import type { IRelay, PeerIdStr } from "@waku/interfaces";
|
||||
import type { KeepAliveOptions } from "@waku/interfaces";
|
||||
import { Logger } from "@waku/utils";
|
||||
import { utf8ToBytes } from "@waku/utils/bytes";
|
||||
import debug from "debug";
|
||||
import type { PingService } from "libp2p/ping";
|
||||
|
||||
import { createEncoder } from "./message/version_0.js";
|
||||
|
||||
export const RelayPingContentTopic = "/relay-ping/1/ping/null";
|
||||
const log = debug("waku:keep-alive");
|
||||
const log = new Logger("keep-alive");
|
||||
|
||||
export class KeepAliveManager {
|
||||
private pingKeepAliveTimers: Map<string, ReturnType<typeof setInterval>>;
|
||||
|
@ -48,9 +48,9 @@ export class KeepAliveManager {
|
|||
// also update the peer store with the latency
|
||||
try {
|
||||
ping = await libp2pPing.ping(peerId);
|
||||
log(`Ping succeeded (${peerIdStr})`, ping);
|
||||
log.info(`Ping succeeded (${peerIdStr})`, ping);
|
||||
} catch (error) {
|
||||
log(`Ping failed for peer (${peerIdStr}).
|
||||
log.error(`Ping failed for peer (${peerIdStr}).
|
||||
Next ping will be attempted in ${pingPeriodSecs} seconds.
|
||||
`);
|
||||
return;
|
||||
|
@ -63,10 +63,10 @@ export class KeepAliveManager {
|
|||
}
|
||||
});
|
||||
} catch (e) {
|
||||
log("Failed to update ping", e);
|
||||
log.error("Failed to update ping", e);
|
||||
}
|
||||
} catch (e) {
|
||||
log(`Ping failed (${peerIdStr})`, e);
|
||||
log.error(`Ping failed (${peerIdStr})`, e);
|
||||
}
|
||||
})();
|
||||
}, pingPeriodSecs * 1000);
|
||||
|
@ -128,10 +128,10 @@ export class KeepAliveManager {
|
|||
ephemeral: true
|
||||
});
|
||||
const interval = setInterval(() => {
|
||||
log("Sending Waku Relay ping message");
|
||||
log.info("Sending Waku Relay ping message");
|
||||
relay
|
||||
.send(encoder, { payload: new Uint8Array([1]) })
|
||||
.catch((e) => log("Failed to send relay ping", e));
|
||||
.catch((e) => log.error("Failed to send relay ping", e));
|
||||
}, relayPeriodSecs * 1000);
|
||||
intervals.push(interval);
|
||||
}
|
||||
|
|
|
@ -12,7 +12,7 @@ import {
|
|||
} from "@waku/interfaces";
|
||||
import { PushResponse } from "@waku/proto";
|
||||
import { ensurePubsubTopicIsConfigured, isSizeUnderCap } from "@waku/utils";
|
||||
import debug from "debug";
|
||||
import { Logger } from "@waku/utils";
|
||||
import all from "it-all";
|
||||
import * as lp from "it-length-prefixed";
|
||||
import { pipe } from "it-pipe";
|
||||
|
@ -23,7 +23,7 @@ import { DefaultPubSubTopic } from "../constants.js";
|
|||
|
||||
import { PushRpc } from "./push_rpc.js";
|
||||
|
||||
const log = debug("waku:light-push");
|
||||
const log = new Logger("light-push");
|
||||
|
||||
export const LightPushCodec = "/vac/waku/lightpush/2.0.0-beta1";
|
||||
export { PushResponse };
|
||||
|
@ -57,18 +57,18 @@ class LightPush extends BaseProtocol implements ILightPush {
|
|||
): Promise<PreparePushMessageResult> {
|
||||
try {
|
||||
if (!message.payload || message.payload.length === 0) {
|
||||
log("Failed to send waku light push: payload is empty");
|
||||
log.error("Failed to send waku light push: payload is empty");
|
||||
return { query: null, error: SendError.EMPTY_PAYLOAD };
|
||||
}
|
||||
|
||||
if (!isSizeUnderCap(message.payload)) {
|
||||
log("Failed to send waku light push: message is bigger than 1MB");
|
||||
log.error("Failed to send waku light push: message is bigger than 1MB");
|
||||
return { query: null, error: SendError.SIZE_TOO_BIG };
|
||||
}
|
||||
|
||||
const protoMessage = await encoder.toProtoObj(message);
|
||||
if (!protoMessage) {
|
||||
log("Failed to encode to protoMessage, aborting push");
|
||||
log.error("Failed to encode to protoMessage, aborting push");
|
||||
return {
|
||||
query: null,
|
||||
error: SendError.ENCODE_FAILED
|
||||
|
@ -78,7 +78,7 @@ class LightPush extends BaseProtocol implements ILightPush {
|
|||
const query = PushRpc.createRequest(protoMessage, pubsubTopic);
|
||||
return { query, error: null };
|
||||
} catch (error) {
|
||||
log("Failed to prepare push message", error);
|
||||
log.error("Failed to prepare push message", error);
|
||||
|
||||
return {
|
||||
query: null,
|
||||
|
@ -124,7 +124,10 @@ class LightPush extends BaseProtocol implements ILightPush {
|
|||
try {
|
||||
stream = await this.getStream(peer);
|
||||
} catch (err) {
|
||||
log(`Failed to get a stream for remote peer${peer.id.toString()}`, err);
|
||||
log.error(
|
||||
`Failed to get a stream for remote peer${peer.id.toString()}`,
|
||||
err
|
||||
);
|
||||
return { recipients, error: SendError.REMOTE_PEER_FAULT };
|
||||
}
|
||||
|
||||
|
@ -138,7 +141,7 @@ class LightPush extends BaseProtocol implements ILightPush {
|
|||
async (source) => await all(source)
|
||||
);
|
||||
} catch (err) {
|
||||
log("Failed to send waku light push request", err);
|
||||
log.error("Failed to send waku light push request", err);
|
||||
return { recipients, error: SendError.GENERIC_FAIL };
|
||||
}
|
||||
|
||||
|
@ -151,17 +154,17 @@ class LightPush extends BaseProtocol implements ILightPush {
|
|||
try {
|
||||
response = PushRpc.decode(bytes).response;
|
||||
} catch (err) {
|
||||
log("Failed to decode push reply", err);
|
||||
log.error("Failed to decode push reply", err);
|
||||
return { recipients, error: SendError.DECODE_FAILED };
|
||||
}
|
||||
|
||||
if (!response) {
|
||||
log("Remote peer fault: No response in PushRPC");
|
||||
log.error("Remote peer fault: No response in PushRPC");
|
||||
return { recipients, error: SendError.REMOTE_PEER_FAULT };
|
||||
}
|
||||
|
||||
if (!response.isSuccess) {
|
||||
log("Remote peer rejected the message: ", response.info);
|
||||
log.error("Remote peer rejected the message: ", response.info);
|
||||
return { recipients, error: SendError.REMOTE_PEER_REJECTED };
|
||||
}
|
||||
|
||||
|
|
|
@ -10,11 +10,11 @@ import type {
|
|||
PubSubTopic
|
||||
} from "@waku/interfaces";
|
||||
import { proto_message as proto } from "@waku/proto";
|
||||
import debug from "debug";
|
||||
import { Logger } from "@waku/utils";
|
||||
|
||||
import { DefaultPubSubTopic } from "../constants.js";
|
||||
|
||||
const log = debug("waku:message:version-0");
|
||||
const log = new Logger("message:version-0");
|
||||
const OneMillion = BigInt(1_000_000);
|
||||
|
||||
export const Version = 0;
|
||||
|
@ -139,7 +139,6 @@ export class Decoder implements IDecoder<DecodedMessage> {
|
|||
|
||||
fromWireToProtoObj(bytes: Uint8Array): Promise<IProtoMessage | undefined> {
|
||||
const protoMessage = proto.WakuMessage.decode(bytes);
|
||||
log("Message decoded", protoMessage);
|
||||
return Promise.resolve({
|
||||
payload: protoMessage.payload,
|
||||
contentTopic: protoMessage.contentTopic,
|
||||
|
@ -158,7 +157,7 @@ export class Decoder implements IDecoder<DecodedMessage> {
|
|||
// https://rfc.vac.dev/spec/14/
|
||||
// > If omitted, the value SHOULD be interpreted as version 0.
|
||||
if (proto.version ?? 0 !== Version) {
|
||||
log(
|
||||
log.error(
|
||||
"Failed to decode due to incorrect version, expected:",
|
||||
Version,
|
||||
", actual:",
|
||||
|
|
|
@ -11,8 +11,8 @@ import {
|
|||
} from "@waku/interfaces";
|
||||
import { proto_store as proto } from "@waku/proto";
|
||||
import { ensurePubsubTopicIsConfigured, isDefined } from "@waku/utils";
|
||||
import { Logger } from "@waku/utils";
|
||||
import { concat, utf8ToBytes } from "@waku/utils/bytes";
|
||||
import debug from "debug";
|
||||
import all from "it-all";
|
||||
import * as lp from "it-length-prefixed";
|
||||
import { pipe } from "it-pipe";
|
||||
|
@ -26,7 +26,7 @@ import { HistoryRpc, PageDirection, Params } from "./history_rpc.js";
|
|||
|
||||
import HistoryError = proto.HistoryResponse.HistoryError;
|
||||
|
||||
const log = debug("waku:store");
|
||||
const log = new Logger("store");
|
||||
|
||||
export const StoreCodec = "/vac/waku/store/2.0.0-beta4";
|
||||
|
||||
|
@ -284,8 +284,6 @@ class Store extends BaseProtocol implements IStore {
|
|||
{ contentTopics, startTime, endTime }
|
||||
);
|
||||
|
||||
log("Querying history with the following options", options);
|
||||
|
||||
const peer = (
|
||||
await this.getPeers({
|
||||
numPeers: this.NUM_PEERS_PROTOCOL,
|
||||
|
@ -325,7 +323,7 @@ async function* paginate<T extends IDecodedMessage>(
|
|||
|
||||
const historyRpcQuery = HistoryRpc.createQuery(queryOpts);
|
||||
|
||||
log(
|
||||
log.info(
|
||||
"Querying store peer",
|
||||
`for (${queryOpts.pubsubTopic})`,
|
||||
queryOpts.contentTopics
|
||||
|
@ -349,7 +347,7 @@ async function* paginate<T extends IDecodedMessage>(
|
|||
const reply = historyRpcQuery.decode(bytes);
|
||||
|
||||
if (!reply.response) {
|
||||
log("Stopping pagination due to store `response` field missing");
|
||||
log.warn("Stopping pagination due to store `response` field missing");
|
||||
break;
|
||||
}
|
||||
|
||||
|
@ -360,13 +358,13 @@ async function* paginate<T extends IDecodedMessage>(
|
|||
}
|
||||
|
||||
if (!response.messages || !response.messages.length) {
|
||||
log(
|
||||
log.warn(
|
||||
"Stopping pagination due to store `response.messages` field missing or empty"
|
||||
);
|
||||
break;
|
||||
}
|
||||
|
||||
log(`${response.messages.length} messages retrieved from store`);
|
||||
log.error(`${response.messages.length} messages retrieved from store`);
|
||||
|
||||
yield response.messages.map((protoMsg) => {
|
||||
const contentTopic = protoMsg.contentTopic;
|
||||
|
@ -386,7 +384,7 @@ async function* paginate<T extends IDecodedMessage>(
|
|||
if (typeof nextCursor === "undefined") {
|
||||
// If the server does not return cursor then there is an issue,
|
||||
// Need to abort, or we end up in an infinite loop
|
||||
log(
|
||||
log.warn(
|
||||
"Stopping pagination due to `response.pagingInfo.cursor` missing from store response"
|
||||
);
|
||||
break;
|
||||
|
|
|
@ -2,19 +2,19 @@ import type { PeerUpdate } from "@libp2p/interface";
|
|||
import type { Stream } from "@libp2p/interface/connection";
|
||||
import { Peer } from "@libp2p/interface/peer-store";
|
||||
import { Libp2p } from "@waku/interfaces";
|
||||
import { Logger } from "@waku/utils";
|
||||
import { selectConnection } from "@waku/utils/libp2p";
|
||||
import debug from "debug";
|
||||
|
||||
export class StreamManager {
|
||||
private streamPool: Map<string, Promise<Stream | void>>;
|
||||
private readonly log: debug.Debugger;
|
||||
private readonly log: Logger;
|
||||
|
||||
constructor(
|
||||
public multicodec: string,
|
||||
public getConnections: Libp2p["getConnections"],
|
||||
public addEventListener: Libp2p["addEventListener"]
|
||||
) {
|
||||
this.log = debug(`waku:stream-manager:${multicodec}`);
|
||||
this.log = new Logger(`stream-manager:${multicodec}`);
|
||||
this.addEventListener(
|
||||
"peer:update",
|
||||
this.handlePeerUpdateStreamPool.bind(this)
|
||||
|
@ -57,7 +57,9 @@ export class StreamManager {
|
|||
private prepareNewStream(peer: Peer): void {
|
||||
const streamPromise = this.newStream(peer).catch(() => {
|
||||
// No error thrown as this call is not triggered by the user
|
||||
this.log(`Failed to prepare a new stream for ${peer.id.toString()}`);
|
||||
this.log.error(
|
||||
`Failed to prepare a new stream for ${peer.id.toString()}`
|
||||
);
|
||||
});
|
||||
this.streamPool.set(peer.id.toString(), streamPromise);
|
||||
}
|
||||
|
@ -65,7 +67,7 @@ export class StreamManager {
|
|||
private handlePeerUpdateStreamPool = (evt: CustomEvent<PeerUpdate>): void => {
|
||||
const peer = evt.detail.peer;
|
||||
if (peer.protocols.includes(this.multicodec)) {
|
||||
this.log(`Preemptively opening a stream to ${peer.id.toString()}`);
|
||||
this.log.error(`Preemptively opening a stream to ${peer.id.toString()}`);
|
||||
this.prepareNewStream(peer);
|
||||
}
|
||||
};
|
||||
|
|
|
@ -1,10 +1,10 @@
|
|||
import type { IdentifyResult } from "@libp2p/interface";
|
||||
import type { IBaseProtocol, IRelay, Waku } from "@waku/interfaces";
|
||||
import { Protocols } from "@waku/interfaces";
|
||||
import debug from "debug";
|
||||
import { Logger } from "@waku/utils";
|
||||
import { pEvent } from "p-event";
|
||||
|
||||
const log = debug("waku:wait-for-remote-peer");
|
||||
const log = new Logger("wait-for-remote-peer");
|
||||
|
||||
/**
|
||||
* Wait for a remote peer to be ready given the passed protocols.
|
||||
|
@ -79,14 +79,13 @@ async function waitForConnectedPeer(protocol: IBaseProtocol): Promise<void> {
|
|||
const peers = await protocol.peers();
|
||||
|
||||
if (peers.length) {
|
||||
log(`${codec} peer found: `, peers[0].id.toString());
|
||||
log.info(`${codec} peer found: `, peers[0].id.toString());
|
||||
return;
|
||||
}
|
||||
|
||||
await new Promise<void>((resolve) => {
|
||||
const cb = (evt: CustomEvent<IdentifyResult>): void => {
|
||||
if (evt.detail?.protocols?.includes(codec)) {
|
||||
log("Resolving for", codec, evt.detail.protocols);
|
||||
protocol.removeLibp2pEventListener("peer:identify", cb);
|
||||
resolve();
|
||||
}
|
||||
|
|
|
@ -11,7 +11,7 @@ import type {
|
|||
Waku
|
||||
} from "@waku/interfaces";
|
||||
import { Protocols } from "@waku/interfaces";
|
||||
import debug from "debug";
|
||||
import { Logger } from "@waku/utils";
|
||||
|
||||
import { ConnectionManager } from "./connection_manager.js";
|
||||
|
||||
|
@ -19,7 +19,7 @@ export const DefaultPingKeepAliveValueSecs = 5 * 60;
|
|||
export const DefaultRelayKeepAliveValueSecs = 5 * 60;
|
||||
export const DefaultUserAgent = "js-waku";
|
||||
|
||||
const log = debug("waku:waku");
|
||||
const log = new Logger("waku");
|
||||
|
||||
export interface WakuOptions {
|
||||
/**
|
||||
|
@ -92,7 +92,7 @@ export class WakuNode implements Waku {
|
|||
this.relay
|
||||
);
|
||||
|
||||
log(
|
||||
log.info(
|
||||
"Waku node created",
|
||||
peerId,
|
||||
`relay: ${!!this.relay}, store: ${!!this.store}, light push: ${!!this
|
||||
|
@ -127,7 +127,7 @@ export class WakuNode implements Waku {
|
|||
codecs.push(codec)
|
||||
);
|
||||
} else {
|
||||
log(
|
||||
log.error(
|
||||
"Relay codec not included in dial codec: protocol not mounted locally"
|
||||
);
|
||||
}
|
||||
|
@ -136,7 +136,7 @@ export class WakuNode implements Waku {
|
|||
if (this.store) {
|
||||
codecs.push(this.store.multicodec);
|
||||
} else {
|
||||
log(
|
||||
log.error(
|
||||
"Store codec not included in dial codec: protocol not mounted locally"
|
||||
);
|
||||
}
|
||||
|
@ -145,7 +145,7 @@ export class WakuNode implements Waku {
|
|||
if (this.lightPush) {
|
||||
codecs.push(this.lightPush.multicodec);
|
||||
} else {
|
||||
log(
|
||||
log.error(
|
||||
"Light Push codec not included in dial codec: protocol not mounted locally"
|
||||
);
|
||||
}
|
||||
|
@ -154,13 +154,13 @@ export class WakuNode implements Waku {
|
|||
if (this.filter) {
|
||||
codecs.push(this.filter.multicodec);
|
||||
} else {
|
||||
log(
|
||||
log.error(
|
||||
"Filter codec not included in dial codec: protocol not mounted locally"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
log(`Dialing to ${peerId.toString()} with protocols ${_protocols}`);
|
||||
log.info(`Dialing to ${peerId.toString()} with protocols ${_protocols}`);
|
||||
|
||||
return this.libp2p.dialProtocol(peerId, codecs);
|
||||
}
|
||||
|
|
|
@ -5,7 +5,7 @@ import type {
|
|||
NodeCapabilityCount,
|
||||
SearchContext
|
||||
} from "@waku/interfaces";
|
||||
import debug from "debug";
|
||||
import { Logger } from "@waku/utils";
|
||||
|
||||
import { DnsOverHttps } from "./dns_over_https.js";
|
||||
import { ENRTree } from "./enrtree.js";
|
||||
|
@ -14,7 +14,7 @@ import {
|
|||
yieldNodesUntilCapabilitiesFulfilled
|
||||
} from "./fetch_nodes.js";
|
||||
|
||||
const log = debug("waku:discovery:dns");
|
||||
const log = new Logger("discovery:dns");
|
||||
|
||||
export class DnsNodeDiscovery {
|
||||
private readonly dns: DnsClient;
|
||||
|
@ -54,7 +54,7 @@ export class DnsNodeDiscovery {
|
|||
this._errorTolerance,
|
||||
() => this._search(domain, context)
|
||||
);
|
||||
log(
|
||||
log.info(
|
||||
"retrieved peers: ",
|
||||
peers.map((peer) => {
|
||||
return {
|
||||
|
@ -126,13 +126,15 @@ export class DnsNodeDiscovery {
|
|||
return null;
|
||||
}
|
||||
} catch (error) {
|
||||
log(
|
||||
log.error(
|
||||
`Failed to search DNS tree ${entryType} at subdomain ${subdomain}: ${error}`
|
||||
);
|
||||
return null;
|
||||
}
|
||||
} catch (error) {
|
||||
log(`Failed to retrieve TXT record at subdomain ${subdomain}: ${error}`);
|
||||
log.error(
|
||||
`Failed to retrieve TXT record at subdomain ${subdomain}: ${error}`
|
||||
);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -12,7 +12,7 @@ import type {
|
|||
IEnr,
|
||||
NodeCapabilityCount
|
||||
} from "@waku/interfaces";
|
||||
import debug from "debug";
|
||||
import { Logger } from "@waku/utils";
|
||||
|
||||
import {
|
||||
DEFAULT_BOOTSTRAP_TAG_NAME,
|
||||
|
@ -22,7 +22,7 @@ import {
|
|||
} from "./constants.js";
|
||||
import { DnsNodeDiscovery } from "./dns.js";
|
||||
|
||||
const log = debug("waku:peer-discovery-dns");
|
||||
const log = new Logger("peer-discovery-dns");
|
||||
|
||||
/**
|
||||
* Parse options and expose function to return bootstrap peer addresses.
|
||||
|
@ -43,14 +43,14 @@ export class PeerDiscoveryDns
|
|||
this._options = options;
|
||||
|
||||
const { enrUrls } = options;
|
||||
log("Use following EIP-1459 ENR Tree URLs: ", enrUrls);
|
||||
log.info("Use following EIP-1459 ENR Tree URLs: ", enrUrls);
|
||||
}
|
||||
|
||||
/**
|
||||
* Start discovery process
|
||||
*/
|
||||
async start(): Promise<void> {
|
||||
log("Starting peer discovery via dns");
|
||||
log.info("Starting peer discovery via dns");
|
||||
|
||||
this._started = true;
|
||||
|
||||
|
|
|
@ -1,9 +1,9 @@
|
|||
import type { DnsClient } from "@waku/interfaces";
|
||||
import { Logger } from "@waku/utils";
|
||||
import { bytesToUtf8 } from "@waku/utils/bytes";
|
||||
import debug from "debug";
|
||||
import { Endpoint, query, wellknown } from "dns-query";
|
||||
|
||||
const log = debug("waku:dns-over-https");
|
||||
const log = new Logger("dns-over-https");
|
||||
|
||||
export class DnsOverHttps implements DnsClient {
|
||||
/**
|
||||
|
@ -50,7 +50,7 @@ export class DnsOverHttps implements DnsClient {
|
|||
);
|
||||
answers = res.answers;
|
||||
} catch (error) {
|
||||
log("query failed: ", error);
|
||||
log.error("query failed: ", error);
|
||||
throw new Error("DNS query failed");
|
||||
}
|
||||
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
import type { IEnr, NodeCapabilityCount, Waku2 } from "@waku/interfaces";
|
||||
import debug from "debug";
|
||||
import { Logger } from "@waku/utils";
|
||||
|
||||
const log = debug("waku:discovery:fetch_nodes");
|
||||
const log = new Logger("discovery:fetch_nodes");
|
||||
|
||||
/**
|
||||
* Fetch nodes using passed [[getNode]] until all wanted capabilities are
|
||||
|
@ -46,7 +46,9 @@ export async function fetchNodesUntilCapabilitiesFulfilled(
|
|||
peers.push(peer);
|
||||
}
|
||||
}
|
||||
log(`got new peer candidate from DNS address=${peer.nodeId}@${peer.ip}`);
|
||||
log.info(
|
||||
`got new peer candidate from DNS address=${peer.nodeId}@${peer.ip}`
|
||||
);
|
||||
}
|
||||
|
||||
totalSearches++;
|
||||
|
@ -98,7 +100,9 @@ export async function* yieldNodesUntilCapabilitiesFulfilled(
|
|||
yield peer;
|
||||
}
|
||||
}
|
||||
log(`got new peer candidate from DNS address=${peer.nodeId}@${peer.ip}`);
|
||||
log.info(
|
||||
`got new peer candidate from DNS address=${peer.nodeId}@${peer.ip}`
|
||||
);
|
||||
}
|
||||
totalSearches++;
|
||||
}
|
||||
|
|
|
@ -1,11 +1,13 @@
|
|||
import * as RLP from "@ethersproject/rlp";
|
||||
import type { ENRKey, ENRValue } from "@waku/interfaces";
|
||||
import { Logger } from "@waku/utils";
|
||||
import { bytesToHex, bytesToUtf8, hexToBytes } from "@waku/utils/bytes";
|
||||
import { log } from "debug";
|
||||
import { fromString } from "uint8arrays/from-string";
|
||||
|
||||
import { ENR } from "./enr.js";
|
||||
|
||||
const log = new Logger("enr:decoder");
|
||||
|
||||
export class EnrDecoder {
|
||||
static fromString(encoded: string): Promise<ENR> {
|
||||
if (!encoded.startsWith(ENR.RECORD_PREFIX)) {
|
||||
|
@ -30,7 +32,7 @@ async function fromValues(values: Uint8Array[]): Promise<ENR> {
|
|||
try {
|
||||
obj[bytesToUtf8(kvs[i])] = kvs[i + 1];
|
||||
} catch (e) {
|
||||
log("Failed to decode ENR key to UTF-8, skipping it", kvs[i], e);
|
||||
log.error("Failed to decode ENR key to UTF-8, skipping it", kvs[i], e);
|
||||
}
|
||||
}
|
||||
const _seq = decodeSeq(seq);
|
||||
|
|
|
@ -9,7 +9,7 @@ import type {
|
|||
SequenceNumber,
|
||||
ShardInfo
|
||||
} from "@waku/interfaces";
|
||||
import debug from "debug";
|
||||
import { Logger } from "@waku/utils";
|
||||
|
||||
import { ERR_INVALID_ID } from "./constants.js";
|
||||
import { keccak256, verifySignature } from "./crypto.js";
|
||||
|
@ -18,7 +18,7 @@ import { createPeerIdFromPublicKey } from "./peer_id.js";
|
|||
import { RawEnr } from "./raw_enr.js";
|
||||
import * as v4 from "./v4.js";
|
||||
|
||||
const log = debug("waku:enr");
|
||||
const log = new Logger("enr");
|
||||
|
||||
export enum TransportProtocol {
|
||||
TCP = "tcp",
|
||||
|
@ -47,7 +47,7 @@ export class ENR extends RawEnr implements IEnr {
|
|||
enr.peerId = await createPeerIdFromPublicKey(publicKey);
|
||||
}
|
||||
} catch (e) {
|
||||
log("Could not calculate peer id for ENR", e);
|
||||
log.error("Could not calculate peer id for ENR", e);
|
||||
}
|
||||
|
||||
return enr;
|
||||
|
@ -67,7 +67,7 @@ export class ENR extends RawEnr implements IEnr {
|
|||
|
||||
get shardInfo(): ShardInfo | undefined {
|
||||
if (this.rs && this.rsv) {
|
||||
log("Warning: ENR contains both `rs` and `rsv` fields.");
|
||||
log.warn("ENR contains both `rs` and `rsv` fields.");
|
||||
}
|
||||
return this.rs || this.rsv;
|
||||
}
|
||||
|
|
|
@ -9,7 +9,7 @@ import type {
|
|||
IProtoMessage
|
||||
} from "@waku/interfaces";
|
||||
import { WakuMessage } from "@waku/proto";
|
||||
import debug from "debug";
|
||||
import { Logger } from "@waku/utils";
|
||||
|
||||
import { DecodedMessage } from "./decoded_message.js";
|
||||
import {
|
||||
|
@ -29,7 +29,7 @@ import {
|
|||
export { generatePrivateKey, getPublicKey };
|
||||
export type { Encoder, Decoder, DecodedMessage };
|
||||
|
||||
const log = debug("waku:message-encryption:ecies");
|
||||
const log = new Logger("message-encryption:ecies");
|
||||
|
||||
class Encoder implements IEncoder {
|
||||
constructor(
|
||||
|
@ -130,7 +130,7 @@ class Decoder extends DecoderV0 implements IDecoder<DecodedMessage> {
|
|||
const cipherPayload = protoMessage.payload;
|
||||
|
||||
if (protoMessage.version !== Version) {
|
||||
log(
|
||||
log.error(
|
||||
"Failed to decrypt due to incorrect version, expected:",
|
||||
Version,
|
||||
", actual:",
|
||||
|
@ -144,7 +144,7 @@ class Decoder extends DecoderV0 implements IDecoder<DecodedMessage> {
|
|||
try {
|
||||
payload = await decryptAsymmetric(cipherPayload, this.privateKey);
|
||||
} catch (e) {
|
||||
log(
|
||||
log.error(
|
||||
`Failed to decrypt message using asymmetric decryption for contentTopic: ${this.contentTopic}`,
|
||||
e
|
||||
);
|
||||
|
@ -152,18 +152,22 @@ class Decoder extends DecoderV0 implements IDecoder<DecodedMessage> {
|
|||
}
|
||||
|
||||
if (!payload) {
|
||||
log(`Failed to decrypt payload for contentTopic ${this.contentTopic}`);
|
||||
log.error(
|
||||
`Failed to decrypt payload for contentTopic ${this.contentTopic}`
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
const res = postCipher(payload);
|
||||
|
||||
if (!res) {
|
||||
log(`Failed to decode payload for contentTopic ${this.contentTopic}`);
|
||||
log.error(
|
||||
`Failed to decode payload for contentTopic ${this.contentTopic}`
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
log("Message decrypted", protoMessage);
|
||||
log.info("Message decrypted", protoMessage);
|
||||
return new DecodedMessage(
|
||||
pubsubTopic,
|
||||
protoMessage,
|
||||
|
|
|
@ -10,7 +10,7 @@ import type {
|
|||
PubSubTopic
|
||||
} from "@waku/interfaces";
|
||||
import { WakuMessage } from "@waku/proto";
|
||||
import debug from "debug";
|
||||
import { Logger } from "@waku/utils";
|
||||
|
||||
import { DecodedMessage } from "./decoded_message.js";
|
||||
import {
|
||||
|
@ -25,7 +25,7 @@ import { generateSymmetricKey, OneMillion, Version } from "./index.js";
|
|||
export { generateSymmetricKey };
|
||||
export type { DecodedMessage, Encoder, Decoder };
|
||||
|
||||
const log = debug("waku:message-encryption:symmetric");
|
||||
const log = new Logger("message-encryption:symmetric");
|
||||
|
||||
class Encoder implements IEncoder {
|
||||
constructor(
|
||||
|
@ -126,7 +126,7 @@ class Decoder extends DecoderV0 implements IDecoder<DecodedMessage> {
|
|||
const cipherPayload = protoMessage.payload;
|
||||
|
||||
if (protoMessage.version !== Version) {
|
||||
log(
|
||||
log.error(
|
||||
"Failed to decrypt due to incorrect version, expected:",
|
||||
Version,
|
||||
", actual:",
|
||||
|
@ -140,7 +140,7 @@ class Decoder extends DecoderV0 implements IDecoder<DecodedMessage> {
|
|||
try {
|
||||
payload = await decryptSymmetric(cipherPayload, this.symKey);
|
||||
} catch (e) {
|
||||
log(
|
||||
log.error(
|
||||
`Failed to decrypt message using asymmetric decryption for contentTopic: ${this.contentTopic}`,
|
||||
e
|
||||
);
|
||||
|
@ -148,18 +148,22 @@ class Decoder extends DecoderV0 implements IDecoder<DecodedMessage> {
|
|||
}
|
||||
|
||||
if (!payload) {
|
||||
log(`Failed to decrypt payload for contentTopic ${this.contentTopic}`);
|
||||
log.error(
|
||||
`Failed to decrypt payload for contentTopic ${this.contentTopic}`
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
const res = postCipher(payload);
|
||||
|
||||
if (!res) {
|
||||
log(`Failed to decode payload for contentTopic ${this.contentTopic}`);
|
||||
log.error(
|
||||
`Failed to decode payload for contentTopic ${this.contentTopic}`
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
log("Message decrypted", protoMessage);
|
||||
log.info("Message decrypted", protoMessage);
|
||||
return new DecodedMessage(
|
||||
pubsubTopic,
|
||||
protoMessage,
|
||||
|
|
|
@ -7,7 +7,7 @@ import type {
|
|||
PeerInfo
|
||||
} from "@waku/interfaces";
|
||||
import { isDefined } from "@waku/utils";
|
||||
import debug from "debug";
|
||||
import { Logger } from "@waku/utils";
|
||||
import all from "it-all";
|
||||
import * as lp from "it-length-prefixed";
|
||||
import { pipe } from "it-pipe";
|
||||
|
@ -17,7 +17,7 @@ import { PeerExchangeRPC } from "./rpc.js";
|
|||
|
||||
export const PeerExchangeCodec = "/vac/waku/peer-exchange/2.0.0-alpha1";
|
||||
|
||||
const log = debug("waku:peer-exchange");
|
||||
const log = new Logger("peer-exchange");
|
||||
|
||||
/**
|
||||
* Implementation of the Peer Exchange protocol (https://rfc.vac.dev/spec/34/)
|
||||
|
@ -63,7 +63,9 @@ export class WakuPeerExchange extends BaseProtocol implements IPeerExchange {
|
|||
const { response } = PeerExchangeRPC.decode(bytes);
|
||||
|
||||
if (!response) {
|
||||
log("PeerExchangeRPC message did not contains a `response` field");
|
||||
log.error(
|
||||
"PeerExchangeRPC message did not contains a `response` field"
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -76,7 +78,7 @@ export class WakuPeerExchange extends BaseProtocol implements IPeerExchange {
|
|||
})
|
||||
);
|
||||
} catch (err) {
|
||||
log("Failed to decode push reply", err);
|
||||
log.error("Failed to decode push reply", err);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -9,11 +9,11 @@ import type { PeerId } from "@libp2p/interface/peer-id";
|
|||
import type { PeerInfo } from "@libp2p/interface/peer-info";
|
||||
import { encodeRelayShard } from "@waku/enr";
|
||||
import { Libp2pComponents, Tags } from "@waku/interfaces";
|
||||
import debug from "debug";
|
||||
import { Logger } from "@waku/utils";
|
||||
|
||||
import { PeerExchangeCodec, WakuPeerExchange } from "./waku_peer_exchange.js";
|
||||
|
||||
const log = debug("waku:peer-exchange-discovery");
|
||||
const log = new Logger("peer-exchange-discovery");
|
||||
|
||||
const DEFAULT_PEER_EXCHANGE_REQUEST_NODES = 10;
|
||||
const DEFAULT_PEER_EXCHANGE_QUERY_INTERVAL_MS = 10 * 1000;
|
||||
|
@ -74,7 +74,7 @@ export class PeerExchangeDiscovery
|
|||
|
||||
this.queryingPeers.add(peerId.toString());
|
||||
this.startRecurringQueries(peerId).catch((error) =>
|
||||
log(`Error querying peer ${error}`)
|
||||
log.error(`Error querying peer ${error}`)
|
||||
);
|
||||
};
|
||||
|
||||
|
@ -94,7 +94,7 @@ export class PeerExchangeDiscovery
|
|||
return;
|
||||
}
|
||||
|
||||
log("Starting peer exchange node discovery, discovering peers");
|
||||
log.info("Starting peer exchange node discovery, discovering peers");
|
||||
|
||||
// might be better to use "peer:identify" or "peer:update"
|
||||
this.components.events.addEventListener(
|
||||
|
@ -108,7 +108,7 @@ export class PeerExchangeDiscovery
|
|||
*/
|
||||
stop(): void {
|
||||
if (!this.isStarted) return;
|
||||
log("Stopping peer exchange node discovery");
|
||||
log.info("Stopping peer exchange node discovery");
|
||||
this.isStarted = false;
|
||||
this.queryingPeers.clear();
|
||||
this.components.events.removeEventListener(
|
||||
|
@ -134,7 +134,7 @@ export class PeerExchangeDiscovery
|
|||
maxRetries = DEFAULT_MAX_RETRIES
|
||||
} = this.options;
|
||||
|
||||
log(
|
||||
log.info(
|
||||
`Querying peer: ${peerIdStr} (attempt ${
|
||||
this.queryAttempts.get(peerIdStr) ?? 1
|
||||
})`
|
||||
|
@ -152,7 +152,7 @@ export class PeerExchangeDiscovery
|
|||
setTimeout(() => {
|
||||
this.queryAttempts.set(peerIdStr, currentAttempt + 1);
|
||||
this.startRecurringQueries(peerId).catch((error) => {
|
||||
log(`Error in startRecurringQueries: ${error}`);
|
||||
log.error(`Error in startRecurringQueries: ${error}`);
|
||||
});
|
||||
}, queryInterval * currentAttempt);
|
||||
};
|
||||
|
@ -164,14 +164,14 @@ export class PeerExchangeDiscovery
|
|||
});
|
||||
|
||||
if (!peerInfos) {
|
||||
log("Peer exchange query failed, no peer info returned");
|
||||
log.error("Peer exchange query failed, no peer info returned");
|
||||
return;
|
||||
}
|
||||
|
||||
for (const _peerInfo of peerInfos) {
|
||||
const { ENR } = _peerInfo;
|
||||
if (!ENR) {
|
||||
log("No ENR in peerInfo object, skipping");
|
||||
log.warn("No ENR in peerInfo object, skipping");
|
||||
continue;
|
||||
}
|
||||
|
||||
|
@ -200,7 +200,7 @@ export class PeerExchangeDiscovery
|
|||
})
|
||||
});
|
||||
|
||||
log(`Discovered peer: ${peerId.toString()}`);
|
||||
log.info(`Discovered peer: ${peerId.toString()}`);
|
||||
|
||||
this.dispatchEvent(
|
||||
new CustomEvent<PeerInfo>("peer", {
|
||||
|
@ -215,7 +215,7 @@ export class PeerExchangeDiscovery
|
|||
}
|
||||
|
||||
private abortQueriesForPeer(peerIdStr: string): void {
|
||||
log(`Aborting queries for peer: ${peerIdStr}`);
|
||||
log.info(`Aborting queries for peer: ${peerIdStr}`);
|
||||
this.queryingPeers.delete(peerIdStr);
|
||||
this.queryAttempts.delete(peerIdStr);
|
||||
}
|
||||
|
|
|
@ -27,13 +27,13 @@ import {
|
|||
} from "@waku/interfaces";
|
||||
import { isSizeUnderCap, toAsyncIterator } from "@waku/utils";
|
||||
import { pushOrInitMapSet } from "@waku/utils";
|
||||
import debug from "debug";
|
||||
import { Logger } from "@waku/utils";
|
||||
|
||||
import { RelayCodecs } from "./constants.js";
|
||||
import { messageValidator } from "./message_validator.js";
|
||||
import { TopicOnlyDecoder } from "./topic_only_message.js";
|
||||
|
||||
const log = debug("waku:relay");
|
||||
const log = new Logger("relay");
|
||||
|
||||
export type Observer<T extends IDecodedMessage> = {
|
||||
decoder: IDecoder<T>;
|
||||
|
@ -105,7 +105,7 @@ class Relay implements IRelay {
|
|||
|
||||
const { pubsubTopic } = encoder;
|
||||
if (!this.pubsubTopics.has(pubsubTopic)) {
|
||||
log("Failed to send waku relay: topic not configured");
|
||||
log.error("Failed to send waku relay: topic not configured");
|
||||
return {
|
||||
recipients,
|
||||
errors: [SendError.TOPIC_NOT_CONFIGURED]
|
||||
|
@ -113,7 +113,7 @@ class Relay implements IRelay {
|
|||
}
|
||||
|
||||
if (!isSizeUnderCap(message.payload)) {
|
||||
log("Failed to send waku relay: message is bigger that 1MB");
|
||||
log.error("Failed to send waku relay: message is bigger that 1MB");
|
||||
return {
|
||||
recipients,
|
||||
errors: [SendError.SIZE_TOO_BIG]
|
||||
|
@ -122,7 +122,7 @@ class Relay implements IRelay {
|
|||
|
||||
const msg = await encoder.toWire(message);
|
||||
if (!msg) {
|
||||
log("Failed to encode message, aborting publish");
|
||||
log.error("Failed to encode message, aborting publish");
|
||||
return {
|
||||
recipients,
|
||||
errors: [SendError.ENCODE_FAILED]
|
||||
|
@ -202,7 +202,7 @@ class Relay implements IRelay {
|
|||
): Promise<void> {
|
||||
const topicOnlyMsg = await this.defaultDecoder.fromWireToProtoObj(bytes);
|
||||
if (!topicOnlyMsg || !topicOnlyMsg.contentTopic) {
|
||||
log("Message does not have a content topic, skipping");
|
||||
log.warn("Message does not have a content topic, skipping");
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -226,7 +226,7 @@ class Relay implements IRelay {
|
|||
try {
|
||||
const protoMsg = await decoder.fromWireToProtoObj(bytes);
|
||||
if (!protoMsg) {
|
||||
log(
|
||||
log.error(
|
||||
"Internal error: message previously decoded failed on 2nd pass."
|
||||
);
|
||||
return;
|
||||
|
@ -235,10 +235,13 @@ class Relay implements IRelay {
|
|||
if (msg) {
|
||||
await callback(msg);
|
||||
} else {
|
||||
log("Failed to decode messages on", topicOnlyMsg.contentTopic);
|
||||
log.error(
|
||||
"Failed to decode messages on",
|
||||
topicOnlyMsg.contentTopic
|
||||
);
|
||||
}
|
||||
} catch (error) {
|
||||
log("Error while decoding message:", error);
|
||||
log.error("Error while decoding message:", error);
|
||||
}
|
||||
})();
|
||||
})
|
||||
|
@ -255,12 +258,11 @@ class Relay implements IRelay {
|
|||
"gossipsub:message",
|
||||
(event: CustomEvent<GossipsubMessage>) => {
|
||||
if (event.detail.msg.topic !== pubsubTopic) return;
|
||||
log(`Message received on ${pubsubTopic}`);
|
||||
|
||||
this.processIncomingMessage(
|
||||
event.detail.msg.topic,
|
||||
event.detail.msg.data
|
||||
).catch((e) => log("Failed to process incoming message", e));
|
||||
).catch((e) => log.error("Failed to process incoming message", e));
|
||||
}
|
||||
);
|
||||
|
||||
|
|
|
@ -2,16 +2,16 @@ import type { PeerId } from "@libp2p/interface/peer-id";
|
|||
import type { Message } from "@libp2p/interface/pubsub";
|
||||
import { TopicValidatorResult } from "@libp2p/interface/pubsub";
|
||||
import { proto_message as proto } from "@waku/proto";
|
||||
import debug from "debug";
|
||||
import { Logger } from "@waku/utils";
|
||||
|
||||
const log = debug("waku:relay");
|
||||
const log = new Logger("relay");
|
||||
|
||||
export function messageValidator(
|
||||
peer: PeerId,
|
||||
message: Message
|
||||
): TopicValidatorResult {
|
||||
const startTime = performance.now();
|
||||
log(`validating message from ${peer} received on ${message.topic}`);
|
||||
log.info(`validating message from ${peer} received on ${message.topic}`);
|
||||
let result = TopicValidatorResult.Accept;
|
||||
|
||||
try {
|
||||
|
@ -30,6 +30,18 @@ export function messageValidator(
|
|||
}
|
||||
|
||||
const endTime = performance.now();
|
||||
log(`Validation time (must be <100ms): ${endTime - startTime}ms`);
|
||||
|
||||
const timeTakenMs = endTime - startTime;
|
||||
|
||||
if (timeTakenMs > 100) {
|
||||
log.warn(
|
||||
`message validation took ${timeTakenMs}ms for peer ${peer} on topic ${message.topic}. This should be less than 100ms.`
|
||||
);
|
||||
} else {
|
||||
log.info(
|
||||
`message validation took ${timeTakenMs}ms for peer ${peer} on topic ${message.topic}`
|
||||
);
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
|
|
|
@ -5,9 +5,6 @@ import type {
|
|||
IProtoMessage
|
||||
} from "@waku/interfaces";
|
||||
import { TopicOnlyMessage as ProtoTopicOnlyMessage } from "@waku/proto";
|
||||
import debug from "debug";
|
||||
|
||||
const log = debug("waku:message:topic-only");
|
||||
|
||||
export class TopicOnlyMessage implements IDecodedMessage {
|
||||
public payload: Uint8Array = new Uint8Array();
|
||||
|
@ -32,7 +29,6 @@ export class TopicOnlyDecoder implements IDecoder<TopicOnlyMessage> {
|
|||
|
||||
fromWireToProtoObj(bytes: Uint8Array): Promise<IProtoMessage | undefined> {
|
||||
const protoMessage = ProtoTopicOnlyMessage.decode(bytes);
|
||||
log("Message decoded", protoMessage);
|
||||
return Promise.resolve({
|
||||
contentTopic: protoMessage.contentTopic,
|
||||
payload: new Uint8Array(),
|
||||
|
|
|
@ -1,14 +1,14 @@
|
|||
import { DecodedMessage, DefaultPubSubTopic } from "@waku/core";
|
||||
import { Logger } from "@waku/utils";
|
||||
import { bytesToUtf8, utf8ToBytes } from "@waku/utils/bytes";
|
||||
import { AssertionError, expect } from "chai";
|
||||
import debug from "debug";
|
||||
import isEqual from "lodash/isEqual";
|
||||
|
||||
import { MessageRpcResponse } from "./node/interfaces.js";
|
||||
|
||||
import { base64ToUtf8, delay, NimGoNode } from "./index.js";
|
||||
|
||||
const log = debug("waku:test");
|
||||
const log = new Logger("test:message-collector");
|
||||
|
||||
/**
|
||||
* Class responsible for collecting messages.
|
||||
|
@ -22,7 +22,7 @@ export class MessageCollector {
|
|||
constructor(private nwaku?: NimGoNode) {
|
||||
if (!this.nwaku) {
|
||||
this.callback = (msg: DecodedMessage): void => {
|
||||
log("Got a message");
|
||||
log.info("Got a message");
|
||||
this.list.push(msg);
|
||||
};
|
||||
}
|
||||
|
@ -44,7 +44,7 @@ export class MessageCollector {
|
|||
if (typeof message.payload === "string") {
|
||||
return message.payload === text;
|
||||
} else if (message.payload instanceof Uint8Array) {
|
||||
log(`Checking payload: ${bytesToUtf8(message.payload)}`);
|
||||
log.info(`Checking payload: ${bytesToUtf8(message.payload)}`);
|
||||
return isEqual(message.payload, utf8ToBytes(text));
|
||||
}
|
||||
return false;
|
||||
|
@ -79,7 +79,7 @@ export class MessageCollector {
|
|||
try {
|
||||
this.list = await this.nwaku.messages(pubsubTopic);
|
||||
} catch (error) {
|
||||
log(`Can't retrieve messages because of ${error}`);
|
||||
log.error(`Can't retrieve messages because of ${error}`);
|
||||
await delay(10);
|
||||
}
|
||||
}
|
||||
|
@ -95,7 +95,7 @@ export class MessageCollector {
|
|||
if (this.count == numMessages) {
|
||||
return true;
|
||||
} else {
|
||||
log(`Was expecting exactly ${numMessages} messages`);
|
||||
log.warn(`Was expecting exactly ${numMessages} messages`);
|
||||
return false;
|
||||
}
|
||||
} else {
|
||||
|
|
|
@ -1,11 +1,11 @@
|
|||
import fs from "fs";
|
||||
|
||||
import debug from "debug";
|
||||
import { Logger } from "@waku/utils";
|
||||
import Docker from "dockerode";
|
||||
|
||||
import { Args } from "./interfaces.js";
|
||||
|
||||
const log = debug("waku:docker");
|
||||
const log = new Logger("test:docker");
|
||||
|
||||
const NETWORK_NAME = "waku";
|
||||
const SUBNET = "172.18.0.0/16";
|
||||
|
@ -102,7 +102,7 @@ export default class Dockerode {
|
|||
}
|
||||
|
||||
const argsArrayWithIP = [...argsArray, `--nat=extip:${this.containerIp}`];
|
||||
log(`Args: ${argsArray.join(" ")}`);
|
||||
log.info(`Running node with args: ${argsArray.join(" ")}`);
|
||||
|
||||
const container = await this.docker.createContainer({
|
||||
Image: this.IMAGE_NAME,
|
||||
|
@ -152,15 +152,17 @@ export default class Dockerode {
|
|||
);
|
||||
|
||||
this.containerId = container.id;
|
||||
log(`${this.containerId} started at ${new Date().toLocaleTimeString()}`);
|
||||
log.info(
|
||||
`${this.containerId} started at ${new Date().toLocaleTimeString()}`
|
||||
);
|
||||
return container;
|
||||
}
|
||||
|
||||
async stop(): Promise<void> {
|
||||
if (!this.container) {
|
||||
log("ContainerId not set");
|
||||
log.error("ContainerId not set");
|
||||
} else {
|
||||
log(
|
||||
log.info(
|
||||
`Shutting down container ID ${
|
||||
this.containerId
|
||||
} at ${new Date().toLocaleTimeString()}`
|
||||
|
@ -173,7 +175,7 @@ export default class Dockerode {
|
|||
}
|
||||
|
||||
private async confirmImageExistsOrPull(): Promise<void> {
|
||||
log(`Confirming that image ${this.IMAGE_NAME} exists`);
|
||||
log.info(`Confirming that image ${this.IMAGE_NAME} exists`);
|
||||
|
||||
const doesImageExist = this.docker.getImage(this.IMAGE_NAME);
|
||||
if (!doesImageExist) {
|
||||
|
@ -193,7 +195,7 @@ export default class Dockerode {
|
|||
});
|
||||
});
|
||||
}
|
||||
log(`Image ${this.IMAGE_NAME} successfully found`);
|
||||
log.info(`Image ${this.IMAGE_NAME} successfully found`);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -3,8 +3,8 @@ import { peerIdFromString } from "@libp2p/peer-id";
|
|||
import { Multiaddr, multiaddr } from "@multiformats/multiaddr";
|
||||
import { DefaultPubSubTopic } from "@waku/core";
|
||||
import { isDefined } from "@waku/utils";
|
||||
import { Logger } from "@waku/utils";
|
||||
import { bytesToHex, hexToBytes } from "@waku/utils/bytes";
|
||||
import debug from "debug";
|
||||
import pRetry from "p-retry";
|
||||
import portfinder from "portfinder";
|
||||
|
||||
|
@ -21,7 +21,7 @@ import {
|
|||
MessageRpcResponse
|
||||
} from "./interfaces.js";
|
||||
|
||||
const log = debug("waku:node");
|
||||
const log = new Logger("test:node");
|
||||
|
||||
const WAKU_SERVICE_NODE_PARAMS =
|
||||
process.env.WAKU_SERVICE_NODE_PARAMS ?? undefined;
|
||||
|
@ -160,17 +160,19 @@ export class NimGoNode {
|
|||
WAKU_SERVICE_NODE_PARAMS
|
||||
);
|
||||
} catch (error) {
|
||||
log("Nwaku node failed to start:", error);
|
||||
log.error("Nwaku node failed to start:", error);
|
||||
await this.stop();
|
||||
throw error;
|
||||
}
|
||||
try {
|
||||
log(`Waiting to see '${NODE_READY_LOG_LINE}' in ${this.type} logs`);
|
||||
log.info(
|
||||
`Waiting to see '${NODE_READY_LOG_LINE}' in ${this.type} logs`
|
||||
);
|
||||
await this.waitForLog(NODE_READY_LOG_LINE, 15000);
|
||||
if (process.env.CI) await delay(100);
|
||||
log(`${this.type} node has been started`);
|
||||
log.info(`${this.type} node has been started`);
|
||||
} catch (error) {
|
||||
log(`Error starting ${this.type}: ${error}`);
|
||||
log.error(`Error starting ${this.type}: ${error}`);
|
||||
if (this.docker.container) await this.docker.stop();
|
||||
throw error;
|
||||
}
|
||||
|
@ -380,7 +382,7 @@ export class NimGoNode {
|
|||
return await pRetry(
|
||||
async () => {
|
||||
try {
|
||||
log("RPC Query: ", method, params);
|
||||
log.info("Making an RPC Query: ", method, params);
|
||||
const res = await fetch(this.rpcUrl, {
|
||||
method: "POST",
|
||||
body: JSON.stringify({
|
||||
|
@ -392,10 +394,10 @@ export class NimGoNode {
|
|||
headers: new Headers({ "Content-Type": "application/json" })
|
||||
});
|
||||
const json = await res.json();
|
||||
log(`RPC Response: `, JSON.stringify(json));
|
||||
log.info(`Received RPC Response: `, JSON.stringify(json));
|
||||
return json.result;
|
||||
} catch (error) {
|
||||
log(`${this.rpcUrl} failed with error:`, error);
|
||||
log.error(`${this.rpcUrl} failed with error:`, error);
|
||||
await delay(10);
|
||||
throw error;
|
||||
}
|
||||
|
|
|
@ -1,10 +1,10 @@
|
|||
import { Waku } from "@waku/interfaces";
|
||||
import debug from "debug";
|
||||
import { Logger } from "@waku/utils";
|
||||
import pRetry from "p-retry";
|
||||
|
||||
import { NimGoNode } from "./index.js";
|
||||
|
||||
const log = debug("waku:test");
|
||||
const log = new Logger("test:teardown");
|
||||
|
||||
export async function tearDownNodes(
|
||||
nwakuNodes: NimGoNode | NimGoNode[],
|
||||
|
@ -20,7 +20,7 @@ export async function tearDownNodes(
|
|||
try {
|
||||
await nwaku.stop();
|
||||
} catch (error) {
|
||||
log("Nwaku failed to stop:", error);
|
||||
log.error("Nwaku failed to stop:", error);
|
||||
throw error;
|
||||
}
|
||||
},
|
||||
|
@ -36,7 +36,7 @@ export async function tearDownNodes(
|
|||
try {
|
||||
await waku.stop();
|
||||
} catch (error) {
|
||||
log("Waku failed to stop:", error);
|
||||
log.error("Waku failed to stop:", error);
|
||||
throw error;
|
||||
}
|
||||
},
|
||||
|
|
|
@ -18,9 +18,9 @@ import {
|
|||
createEncoder as symEncoder
|
||||
} from "@waku/message-encryption/symmetric";
|
||||
import { createLightNode } from "@waku/sdk";
|
||||
import { Logger } from "@waku/utils";
|
||||
import { bytesToUtf8, utf8ToBytes } from "@waku/utils/bytes";
|
||||
import { expect } from "chai";
|
||||
import debug from "debug";
|
||||
|
||||
import {
|
||||
delay,
|
||||
|
@ -31,7 +31,7 @@ import {
|
|||
} from "../src/index.js";
|
||||
import { NimGoNode } from "../src/node/node.js";
|
||||
|
||||
const log = debug("waku:test:ephemeral");
|
||||
const log = new Logger("test:ephemeral");
|
||||
|
||||
const TestContentTopic = "/test/1/ephemeral/utf8";
|
||||
const TestEncoder = createEncoder({
|
||||
|
@ -127,18 +127,18 @@ describe("Waku Message Ephemeral field", () => {
|
|||
nwaku.getMultiaddrWithId()
|
||||
]);
|
||||
|
||||
log("Waku nodes created");
|
||||
log.info("Waku nodes created");
|
||||
|
||||
await Promise.all([
|
||||
waku1.dial(nimWakuMultiaddr),
|
||||
waku2.dial(nimWakuMultiaddr)
|
||||
]);
|
||||
|
||||
log("Waku nodes connected to nwaku");
|
||||
log.info("Waku nodes connected to nwaku");
|
||||
|
||||
await waitForRemotePeer(waku1, [Protocols.LightPush]);
|
||||
|
||||
log("Sending messages using light push");
|
||||
log.info("Sending messages using light push");
|
||||
await Promise.all([
|
||||
waku1.lightPush.send(asymEncoder, asymMsg),
|
||||
waku1.lightPush.send(symEncoder, symMsg),
|
||||
|
@ -148,8 +148,8 @@ describe("Waku Message Ephemeral field", () => {
|
|||
await waitForRemotePeer(waku2, [Protocols.Store]);
|
||||
|
||||
const messages: DecodedMessage[] = [];
|
||||
log("Retrieve messages from store");
|
||||
|
||||
log.info("Retrieving messages from store");
|
||||
for await (const msgPromises of waku2.store.queryGenerator([
|
||||
asymDecoder,
|
||||
symDecoder,
|
||||
|
|
|
@ -7,14 +7,14 @@ import {
|
|||
} from "@waku/core";
|
||||
import { IFilterSubscription, LightNode, Protocols } from "@waku/interfaces";
|
||||
import { createLightNode } from "@waku/sdk";
|
||||
import { Logger } from "@waku/utils";
|
||||
import { utf8ToBytes } from "@waku/utils/bytes";
|
||||
import debug from "debug";
|
||||
import { Context } from "mocha";
|
||||
|
||||
import { makeLogFileName, NimGoNode, NOISE_KEY_1 } from "../../src/index.js";
|
||||
|
||||
// Constants for test configuration.
|
||||
export const log = debug("waku:test:filter");
|
||||
export const log = new Logger("test:filter");
|
||||
export const TestContentTopic = "/test/1/waku-filter";
|
||||
export const TestEncoder = createEncoder({ contentTopic: TestContentTopic });
|
||||
export const TestDecoder = createDecoder(TestContentTopic);
|
||||
|
@ -88,7 +88,7 @@ export async function runNodes(
|
|||
});
|
||||
await waku.start();
|
||||
} catch (error) {
|
||||
log("jswaku node failed to start:", error);
|
||||
log.error("jswaku node failed to start:", error);
|
||||
}
|
||||
|
||||
if (waku) {
|
||||
|
|
|
@ -1,12 +1,12 @@
|
|||
import { createEncoder, waitForRemotePeer } from "@waku/core";
|
||||
import { LightNode, Protocols } from "@waku/interfaces";
|
||||
import { createLightNode, utf8ToBytes } from "@waku/sdk";
|
||||
import debug from "debug";
|
||||
import { Logger } from "@waku/utils";
|
||||
|
||||
import { makeLogFileName, NimGoNode, NOISE_KEY_1 } from "../../src/index.js";
|
||||
|
||||
// Constants for test configuration.
|
||||
export const log = debug("waku:test:lightpush");
|
||||
export const log = new Logger("test:lightpush");
|
||||
export const TestContentTopic = "/test/1/waku-light-push/utf8";
|
||||
export const TestEncoder = createEncoder({ contentTopic: TestContentTopic });
|
||||
export const messageText = "Light Push works!";
|
||||
|
@ -30,7 +30,7 @@ export async function runNodes(
|
|||
});
|
||||
await waku.start();
|
||||
} catch (error) {
|
||||
log("jswaku node failed to start:", error);
|
||||
log.error("jswaku node failed to start:", error);
|
||||
}
|
||||
|
||||
if (waku) {
|
||||
|
|
|
@ -20,9 +20,9 @@ import {
|
|||
generateSymmetricKey
|
||||
} from "@waku/message-encryption/symmetric";
|
||||
import { createRelayNode } from "@waku/sdk";
|
||||
import { Logger } from "@waku/utils";
|
||||
import { bytesToUtf8, utf8ToBytes } from "@waku/utils/bytes";
|
||||
import { expect } from "chai";
|
||||
import debug from "debug";
|
||||
|
||||
import {
|
||||
delay,
|
||||
|
@ -37,7 +37,7 @@ import { MessageRpcResponse } from "../src/node/interfaces.js";
|
|||
import { base64ToUtf8, NimGoNode } from "../src/node/node.js";
|
||||
import { generateRandomUint8Array } from "../src/random_array.js";
|
||||
|
||||
const log = debug("waku:test");
|
||||
const log = new Logger("test:relay");
|
||||
|
||||
const TestContentTopic = "/test/1/waku-relay/utf8";
|
||||
const TestEncoder = createEncoder({ contentTopic: TestContentTopic });
|
||||
|
@ -58,7 +58,7 @@ describe("Waku Relay [node only]", () => {
|
|||
beforeEach(async function () {
|
||||
this.timeout(10000);
|
||||
|
||||
log("Starting JS Waku instances");
|
||||
log.info("Starting JS Waku instances");
|
||||
[waku1, waku2] = await Promise.all([
|
||||
createRelayNode({ staticNoiseKey: NOISE_KEY_1 }).then((waku) =>
|
||||
waku.start().then(() => waku)
|
||||
|
@ -68,18 +68,18 @@ describe("Waku Relay [node only]", () => {
|
|||
libp2p: { addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] } }
|
||||
}).then((waku) => waku.start().then(() => waku))
|
||||
]);
|
||||
log("Instances started, adding waku2 to waku1's address book");
|
||||
log.info("Instances started, adding waku2 to waku1's address book");
|
||||
await waku1.libp2p.peerStore.merge(waku2.libp2p.peerId, {
|
||||
multiaddrs: waku2.libp2p.getMultiaddrs()
|
||||
});
|
||||
await waku1.dial(waku2.libp2p.peerId);
|
||||
|
||||
log("Wait for mutual pubsub subscription");
|
||||
log.info("Wait for mutual pubsub subscription");
|
||||
await Promise.all([
|
||||
waitForRemotePeer(waku1, [Protocols.Relay]),
|
||||
waitForRemotePeer(waku2, [Protocols.Relay])
|
||||
]);
|
||||
log("before each hook done");
|
||||
log.info("before each hook done");
|
||||
});
|
||||
|
||||
afterEach(async function () {
|
||||
|
@ -90,7 +90,7 @@ describe("Waku Relay [node only]", () => {
|
|||
});
|
||||
|
||||
it("Subscribe", async function () {
|
||||
log("Getting subscribers");
|
||||
log.info("Getting subscribers");
|
||||
const subscribers1 = waku1.libp2p.services
|
||||
.pubsub!.getSubscribers(DefaultPubSubTopic)
|
||||
.map((p) => p.toString());
|
||||
|
@ -98,7 +98,7 @@ describe("Waku Relay [node only]", () => {
|
|||
.pubsub!.getSubscribers(DefaultPubSubTopic)
|
||||
.map((p) => p.toString());
|
||||
|
||||
log("Asserting mutual subscription");
|
||||
log.info("Asserting mutual subscription");
|
||||
expect(subscribers1).to.contain(waku2.libp2p.peerId.toString());
|
||||
expect(subscribers2).to.contain(waku1.libp2p.peerId.toString());
|
||||
});
|
||||
|
|
|
@ -264,11 +264,11 @@ describe("Waku Store, general", function () {
|
|||
waku2.dial(nimWakuMultiaddr)
|
||||
]);
|
||||
|
||||
log("Waku nodes connected to nwaku");
|
||||
log.info("Waku nodes connected to nwaku");
|
||||
|
||||
await waitForRemotePeer(waku, [Protocols.LightPush]);
|
||||
|
||||
log("Sending messages using light push");
|
||||
log.info("Sending messages using light push");
|
||||
await Promise.all([
|
||||
waku.lightPush.send(eciesEncoder, asymMsg),
|
||||
waku.lightPush.send(symEncoder, symMsg),
|
||||
|
@ -279,7 +279,7 @@ describe("Waku Store, general", function () {
|
|||
await waitForRemotePeer(waku2, [Protocols.Store]);
|
||||
|
||||
const messages: DecodedMessage[] = [];
|
||||
log("Retrieve messages from store");
|
||||
log.info("Retrieve messages from store");
|
||||
|
||||
for await (const query of waku2.store.queryGenerator([
|
||||
eciesDecoder,
|
||||
|
|
|
@ -8,12 +8,12 @@ import {
|
|||
} from "@waku/core";
|
||||
import { LightNode, Protocols } from "@waku/interfaces";
|
||||
import { createLightNode } from "@waku/sdk";
|
||||
import { Logger } from "@waku/utils";
|
||||
import { expect } from "chai";
|
||||
import debug from "debug";
|
||||
|
||||
import { delay, NimGoNode, NOISE_KEY_1 } from "../../src";
|
||||
|
||||
export const log = debug("waku:test:store");
|
||||
export const log = new Logger("test:store");
|
||||
|
||||
export const TestContentTopic = "/test/1/waku-store/utf8";
|
||||
export const TestEncoder = createEncoder({ contentTopic: TestContentTopic });
|
||||
|
@ -75,7 +75,7 @@ export async function startAndConnectLightNode(
|
|||
await waku.start();
|
||||
await waku.dial(await instance.getMultiaddrWithId());
|
||||
await waitForRemotePeer(waku, [Protocols.Store]);
|
||||
log("Waku node created");
|
||||
log.info("Waku node created");
|
||||
return waku;
|
||||
}
|
||||
|
||||
|
|
|
@ -2,5 +2,14 @@ module.exports = {
|
|||
parserOptions: {
|
||||
tsconfigRootDir: __dirname,
|
||||
project: "./tsconfig.dev.json"
|
||||
}
|
||||
},
|
||||
overrides: [
|
||||
{
|
||||
files: ["src/logger/index.ts"],
|
||||
rules: {
|
||||
"no-console": "off",
|
||||
"no-restricted-imports": "off"
|
||||
}
|
||||
}
|
||||
]
|
||||
};
|
||||
|
|
|
@ -1 +1,2 @@
|
|||
export * from "./common/index.js";
|
||||
export { Logger } from "./logger/index.js";
|
||||
|
|
|
@ -1,12 +1,9 @@
|
|||
import type { Connection } from "@libp2p/interface/connection";
|
||||
import type { PeerId } from "@libp2p/interface/peer-id";
|
||||
import type { Peer, PeerStore } from "@libp2p/interface/peer-store";
|
||||
import debug from "debug";
|
||||
|
||||
import { bytesToUtf8 } from "../bytes/index.js";
|
||||
|
||||
const log = debug("waku:libp2p-utils");
|
||||
|
||||
/**
|
||||
* Returns a pseudo-random peer that supports the given protocol.
|
||||
* Useful for protocols such as store and light push
|
||||
|
@ -107,7 +104,6 @@ export async function selectPeerForProtocol(
|
|||
// Do not break as we want to keep the last value
|
||||
}
|
||||
}
|
||||
log(`Using codec ${protocol}`);
|
||||
if (!protocol) {
|
||||
throw new Error(
|
||||
`Peer does not register required protocols (${peer.id.toString()}): ${protocols}`
|
||||
|
|
|
@ -0,0 +1,35 @@
|
|||
import debug, { Debugger } from "debug";
|
||||
|
||||
const APP_NAME = "js-waku";
|
||||
|
||||
export class Logger {
|
||||
private _info: Debugger;
|
||||
private _warn: Debugger;
|
||||
private _error: Debugger;
|
||||
|
||||
static createDebugNamespace(level: string, prefix?: string): string {
|
||||
return prefix ? `${APP_NAME}:${level}:${prefix}` : `${APP_NAME}:${level}`;
|
||||
}
|
||||
|
||||
constructor(prefix?: string) {
|
||||
this._info = debug(Logger.createDebugNamespace("INFO", prefix));
|
||||
this._warn = debug(Logger.createDebugNamespace("WARN", prefix));
|
||||
this._error = debug(Logger.createDebugNamespace("ERROR", prefix));
|
||||
|
||||
this._info.log = console.info.bind(console);
|
||||
this._warn.log = console.warn.bind(console);
|
||||
this._error.log = console.error.bind(console);
|
||||
}
|
||||
|
||||
get info(): Debugger {
|
||||
return this._info;
|
||||
}
|
||||
|
||||
get warn(): Debugger {
|
||||
return this._warn;
|
||||
}
|
||||
|
||||
get error(): Debugger {
|
||||
return this._error;
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue