2023-11-09 16:37:18 -08:00
import { sha256 } from "@noble/hashes/sha256" ;
2023-11-16 15:17:17 +03:00
import {
2024-01-18 00:37:25 -08:00
DEFAULT_CLUSTER_ID ,
2023-11-16 15:17:17 +03:00
DefaultPubsubTopic ,
PubsubTopic ,
2024-01-19 20:42:52 +05:30
ShardInfo ,
2023-11-16 15:17:17 +03:00
ShardingParams ,
SingleShardInfo
} from "@waku/interfaces" ;
2023-10-10 20:18:02 +05:30
2023-11-09 16:37:18 -08:00
import { concat , utf8ToBytes } from "../bytes/index.js" ;
2023-11-28 15:57:18 +05:30
export const singleShardInfoToPubsubTopic = (
shardInfo : SingleShardInfo
) : PubsubTopic = > {
2023-11-29 17:37:59 +05:30
if ( shardInfo . clusterId === undefined || shardInfo . shard === undefined )
2023-11-28 15:57:18 +05:30
throw new Error ( "Invalid shard" ) ;
2023-11-29 17:37:59 +05:30
return ` /waku/2/rs/ ${ shardInfo . clusterId } / ${ shardInfo . shard } ` ;
2023-11-28 15:57:18 +05:30
} ;
2024-01-19 20:42:52 +05:30
export const singleShardInfosToShardInfo = (
singleShardInfos : SingleShardInfo [ ]
) : ShardInfo = > {
if ( singleShardInfos . length === 0 ) throw new Error ( "Invalid shard" ) ;
const clusterIds = singleShardInfos . map ( ( shardInfo ) = > shardInfo . clusterId ) ;
if ( new Set ( clusterIds ) . size !== 1 ) {
throw new Error ( "Passed shard infos have different clusterIds" ) ;
}
const shards = singleShardInfos
. map ( ( shardInfo ) = > shardInfo . shard )
. filter ( ( shard ) : shard is number = > shard !== undefined ) ;
return {
clusterId : singleShardInfos [ 0 ] . clusterId ,
shards
} ;
} ;
2023-11-14 21:22:52 +05:30
export const shardInfoToPubsubTopics = (
2024-01-18 00:37:25 -08:00
shardInfo : Partial < ShardingParams >
2023-11-14 21:22:52 +05:30
) : PubsubTopic [ ] = > {
2024-01-18 00:37:25 -08:00
if ( "contentTopics" in shardInfo && shardInfo . contentTopics ) {
2024-01-09 23:34:30 -08:00
// Autosharding: explicitly defined content topics
return Array . from (
new Set (
shardInfo . contentTopics . map ( ( contentTopic ) = >
contentTopicToPubsubTopic ( contentTopic , shardInfo . clusterId )
)
)
2023-11-16 15:17:17 +03:00
) ;
2024-01-09 23:34:30 -08:00
} else if ( "shards" in shardInfo ) {
// Static sharding
2023-11-16 15:17:17 +03:00
if ( shardInfo . shards === undefined ) throw new Error ( "Invalid shard" ) ;
2024-01-09 23:34:30 -08:00
return Array . from (
new Set (
shardInfo . shards . map (
2024-01-18 00:37:25 -08:00
( index ) = >
` /waku/2/rs/ ${ shardInfo . clusterId ? ? DEFAULT_CLUSTER_ID } / ${ index } `
2024-01-09 23:34:30 -08:00
)
)
2023-11-16 15:17:17 +03:00
) ;
2024-01-18 00:37:25 -08:00
} else if ( "application" in shardInfo && "version" in shardInfo ) {
2024-01-09 23:34:30 -08:00
// Autosharding: single shard from application and version
return [
contentTopicToPubsubTopic (
` / ${ shardInfo . application } / ${ shardInfo . version } /default/default `
)
] ;
2024-01-18 00:37:25 -08:00
} else {
throw new Error ( "Missing required configuration in shard parameters" ) ;
2023-11-16 15:17:17 +03:00
}
2023-10-10 20:18:02 +05:30
} ;
2023-09-27 15:28:07 +05:30
2023-11-28 15:57:18 +05:30
export const pubsubTopicToSingleShardInfo = (
pubsubTopics : PubsubTopic
) : SingleShardInfo = > {
const parts = pubsubTopics . split ( "/" ) ;
2023-11-29 17:37:59 +05:30
if (
parts . length != 6 ||
parts [ 1 ] !== "waku" ||
parts [ 2 ] !== "2" ||
parts [ 3 ] !== "rs"
)
throw new Error ( "Invalid pubsub topic" ) ;
2023-11-28 15:57:18 +05:30
2023-11-29 17:37:59 +05:30
const clusterId = parseInt ( parts [ 4 ] ) ;
const shard = parseInt ( parts [ 5 ] ) ;
if ( isNaN ( clusterId ) || isNaN ( shard ) )
throw new Error ( "Invalid clusterId or shard" ) ;
return {
clusterId ,
shard
} ;
2023-11-28 15:57:18 +05:30
} ;
2024-03-11 18:50:34 +05:30
//TODO: move part of BaseProtocol instead of utils
// return `SendError.TOPIC_NOT_CONFIGURED` instead of throwing
2023-09-27 15:28:07 +05:30
export function ensurePubsubTopicIsConfigured (
2023-11-14 21:22:52 +05:30
pubsubTopic : PubsubTopic ,
configuredTopics : PubsubTopic [ ]
2023-09-27 15:28:07 +05:30
) : void {
if ( ! configuredTopics . includes ( pubsubTopic ) ) {
throw new Error (
2023-11-14 21:22:52 +05:30
` Pubsub topic ${ pubsubTopic } has not been configured on this instance. Configured topics are: ${ configuredTopics } . Please update your configuration by passing in the topic during Waku node instantiation. `
2023-09-27 15:28:07 +05:30
) ;
}
}
2023-11-07 20:06:44 -08:00
2023-11-09 16:37:18 -08:00
interface ContentTopic {
generation : number ;
application : string ;
version : string ;
topicName : string ;
encoding : string ;
}
2023-11-07 20:06:44 -08:00
/ * *
* Given a string , will throw an error if it is not formatted as a valid content topic for autosharding based on https : //rfc.vac.dev/spec/51/
* @param contentTopic String to validate
2023-11-09 16:37:18 -08:00
* @returns Object with each content topic field as an attribute
2023-11-07 20:06:44 -08:00
* /
2023-11-09 16:37:18 -08:00
export function ensureValidContentTopic ( contentTopic : string ) : ContentTopic {
2023-11-07 20:06:44 -08:00
const parts = contentTopic . split ( "/" ) ;
if ( parts . length < 5 || parts . length > 6 ) {
throw Error ( "Content topic format is invalid" ) ;
}
// Validate generation field if present
2023-11-09 16:37:18 -08:00
let generation = 0 ;
2023-11-07 20:06:44 -08:00
if ( parts . length == 6 ) {
2023-11-09 16:37:18 -08:00
generation = parseInt ( parts [ 1 ] ) ;
2023-11-07 20:06:44 -08:00
if ( isNaN ( generation ) ) {
throw new Error ( "Invalid generation field in content topic" ) ;
}
if ( generation > 0 ) {
throw new Error ( "Generation greater than 0 is not supported" ) ;
}
}
// Validate remaining fields
const fields = parts . splice ( - 4 ) ;
// Validate application field
if ( fields [ 0 ] . length == 0 ) {
throw new Error ( "Application field cannot be empty" ) ;
}
// Validate version field
if ( fields [ 1 ] . length == 0 ) {
throw new Error ( "Version field cannot be empty" ) ;
}
// Validate topic name field
if ( fields [ 2 ] . length == 0 ) {
throw new Error ( "Topic name field cannot be empty" ) ;
}
// Validate encoding field
if ( fields [ 3 ] . length == 0 ) {
throw new Error ( "Encoding field cannot be empty" ) ;
}
2023-11-09 16:37:18 -08:00
return {
generation ,
application : fields [ 0 ] ,
version : fields [ 1 ] ,
topicName : fields [ 2 ] ,
encoding : fields [ 3 ]
} ;
}
/ * *
* Given a string , determines which autoshard index to use for its pubsub topic .
* Based on the algorithm described in the RFC : https : //rfc.vac.dev/spec/51//#algorithm
* /
export function contentTopicToShardIndex (
contentTopic : string ,
networkShards : number = 8
) : number {
const { application , version } = ensureValidContentTopic ( contentTopic ) ;
const digest = sha256 (
concat ( [ utf8ToBytes ( application ) , utf8ToBytes ( version ) ] )
) ;
const dataview = new DataView ( digest . buffer . slice ( - 8 ) ) ;
return Number ( dataview . getBigUint64 ( 0 , false ) % BigInt ( networkShards ) ) ;
}
export function contentTopicToPubsubTopic (
contentTopic : string ,
2024-01-18 00:37:25 -08:00
clusterId : number = DEFAULT_CLUSTER_ID ,
2023-11-09 16:37:18 -08:00
networkShards : number = 8
) : string {
const shardIndex = contentTopicToShardIndex ( contentTopic , networkShards ) ;
return ` /waku/2/rs/ ${ clusterId } / ${ shardIndex } ` ;
2023-11-07 20:06:44 -08:00
}
2023-11-16 15:17:17 +03:00
/ * *
* Given an array of content topics , groups them together by their Pubsub topic as derived using the algorithm for autosharding .
* If any of the content topics are not properly formatted , the function will throw an error .
* /
export function contentTopicsByPubsubTopic (
contentTopics : string [ ] ,
2024-01-18 00:37:25 -08:00
clusterId : number = DEFAULT_CLUSTER_ID ,
2023-11-16 15:17:17 +03:00
networkShards : number = 8
) : Map < string , Array < string > > {
const groupedContentTopics = new Map ( ) ;
for ( const contentTopic of contentTopics ) {
const pubsubTopic = contentTopicToPubsubTopic (
contentTopic ,
clusterId ,
networkShards
) ;
let topics = groupedContentTopics . get ( pubsubTopic ) ;
if ( ! topics ) {
groupedContentTopics . set ( pubsubTopic , [ ] ) ;
topics = groupedContentTopics . get ( pubsubTopic ) ;
}
topics . push ( contentTopic ) ;
}
return groupedContentTopics ;
}
/ * *
* Used when creating encoders / decoders to determine which pubsub topic to use
* /
export function determinePubsubTopic (
contentTopic : string ,
2024-01-09 23:34:30 -08:00
pubsubTopicShardInfo : SingleShardInfo | PubsubTopic = DefaultPubsubTopic
2023-11-16 15:17:17 +03:00
) : string {
2024-01-09 23:34:30 -08:00
if ( typeof pubsubTopicShardInfo == "string" ) {
return pubsubTopicShardInfo ;
} else {
return pubsubTopicShardInfo
? pubsubTopicShardInfo . shard
? singleShardInfoToPubsubTopic ( pubsubTopicShardInfo )
: contentTopicToPubsubTopic (
contentTopic ,
pubsubTopicShardInfo . clusterId
)
: DefaultPubsubTopic ;
}
2023-11-16 15:17:17 +03:00
}
2024-01-18 00:37:25 -08:00
/ * *
* Validates sharding configuration and sets defaults where possible .
* @returns Validated sharding parameters , with any missing values set to defaults
* /
export const ensureShardingConfigured = (
shardInfo : Partial < ShardingParams >
) : {
shardingParams : ShardingParams ;
shardInfo : ShardInfo ;
pubsubTopics : PubsubTopic [ ] ;
} = > {
const clusterId = shardInfo . clusterId ? ? DEFAULT_CLUSTER_ID ;
const shards = "shards" in shardInfo ? shardInfo . shards : [ ] ;
const contentTopics =
"contentTopics" in shardInfo ? shardInfo . contentTopics : [ ] ;
const [ application , version ] =
"application" in shardInfo && "version" in shardInfo
? [ shardInfo . application , shardInfo . version ]
: [ undefined , undefined ] ;
const isShardsConfigured = shards && shards . length > 0 ;
const isContentTopicsConfigured = contentTopics && contentTopics . length > 0 ;
const isApplicationVersionConfigured = application && version ;
if ( isShardsConfigured ) {
return {
shardingParams : { clusterId , shards } ,
shardInfo : { clusterId , shards } ,
pubsubTopics : shardInfoToPubsubTopics ( { clusterId , shards } )
} ;
}
if ( isContentTopicsConfigured ) {
const pubsubTopics = Array . from (
new Set (
contentTopics . map ( ( topic ) = >
contentTopicToPubsubTopic ( topic , clusterId )
)
)
) ;
const shards = Array . from (
2024-01-25 20:07:58 -08:00
new Set ( contentTopics . map ( ( topic ) = > contentTopicToShardIndex ( topic ) ) )
2024-01-18 00:37:25 -08:00
) ;
return {
shardingParams : { clusterId , contentTopics } ,
shardInfo : { clusterId , shards } ,
pubsubTopics
} ;
}
if ( isApplicationVersionConfigured ) {
const pubsubTopic = contentTopicToPubsubTopic (
` / ${ application } / ${ version } /default/default `
) ;
return {
shardingParams : { clusterId , application , version } ,
shardInfo : {
clusterId ,
shards : [ pubsubTopicToSingleShardInfo ( pubsubTopic ) . shard ]
} ,
pubsubTopics : [ pubsubTopic ]
} ;
}
throw new Error (
"Missing minimum required configuration options for static sharding or autosharding."
) ;
} ;