diff --git a/src/analysis/timeline/timelinePagerank.js b/src/analysis/timeline/timelinePagerank.js
new file mode 100644
index 0000000..f7a42ca
--- /dev/null
+++ b/src/analysis/timeline/timelinePagerank.js
@@ -0,0 +1,243 @@
+// @flow
+
+/**
+ * Core logic for computing timeline PageRank on a graph.
+ */
+import {sum} from "d3-array";
+import * as NullUtil from "../../util/null";
+import {Graph, type NodeAddressT, type Edge, type Node} from "../../core/graph";
+import {type NodeAndEdgeTypes} from "../types";
+import {type Weights} from "../weights";
+import {type Interval, partitionGraph} from "./interval";
+import {
+  nodeWeightEvaluator,
+  edgeWeightEvaluator,
+  type NodeWeightEvaluator,
+  type EdgeWeightEvaluator,
+} from "../weightEvaluator";
+import {weightedDistribution} from "../../core/attribution/nodeDistribution";
+import {type Distribution} from "../../core/attribution/distribution";
+import {
+  createOrderedSparseMarkovChain,
+  createConnections,
+} from "../../core/attribution/graphToMarkovChain";
+import {
+  findStationaryDistribution,
+  type PagerankParams,
+  type SparseMarkovChain,
+} from "../../core/attribution/markovChain";
+
+/**
+ * Represents raw PageRank distributions on a graph over time.
+ */
+export type TimelineDistributions = $ReadOnlyArray<{|
+  // The interval for this slice
+  +interval: Interval,
+  // The total node weight within this interval (normalized to account for the
+  // exponential decay). We can use this to normalize the amount of cred
+  // created in this interval.
+  +intervalWeight: number,
+  // The raw score distribution over nodes for this interval (i.e. sums to 1).
+  // Uses the canonical graph node order.
+  +distribution: Distribution,
+|}>;
+
+/**
+ * Runs timeline PageRank on a graph.
+ *
+ * The graph is partitioned into week-long intervals. For each interval, we run
+ * PageRank, assigning seed weight to nodes which have already been created,
+ * and considering edges which have already been created. Node weights and edge
+ * weights both decay by the `intervalDecay` every week.
+ *
+ * Detailed description:
+ *
+ * First, we partition the graph into week-long intervals using the `interval`
+ * module. For each such interval, we will produce a score distribution over
+ * all the nodes in the graph, and a total weight for the interval, based on
+ * the decayed weights of all the nodes that had been created as of that
+ * interval.
+ *
+ * To get the score distribution, we create a node weight map for each
+ * interval, and a markov chain for each interval.
+ *
+ * In the node weight map, every node is initially assigned full weight in the
+ * interval of its creation, and in every interval thereafter its weight decays
+ * by `intervalDecay`. However, note that the 'full weight' is normalized based
+ * on the interval decay. To make this concrete, suppose that we have an
+ * interval decay of 0.5, and a node with a base weight of 8. We want the total
+ * weight given to this node (across all intervals) to equal 8, so that
+ * weight-based cred normalization will be independent of the interval decay.
+ * So we give it a normalized weight of 4 in the first interval, 2 in the
+ * second, and so forth.
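+ *
+ * (Stated generally: a node with base weight w receives w * (1 - d) in its
+ * creation interval, where d = intervalDecay, and decays by d thereafter, so
+ * its total weight across all intervals is the geometric series
+ * w * (1 - d) * (1 + d + d^2 + ...) = w.)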
+ *
+ * For each interval, we also create a markov chain which describes the graph
+ * state as of that interval. That markov chain only contains weight for edges
+ * which were already created as of the interval; any edges that are not yet
+ * created effectively have a weight of 0. Like the node weights, edge weights
+ * are created with full weight, and then decay by `intervalDecay` each
+ * interval. Unlike the node weights, we don't need to normalize based on
+ * intervalDecay, since markov chains always normalize the edge weights.
+ *
+ * Given the markov chain and node weights for the interval, we compute the
+ * score distribution by finding the stationary distribution for that
+ * markov chain, using the node weights as the seed vector. As ever, the
+ * `alpha` parameter determines how much probability mass returns to the seed
+ * vector. (See the docs in markovChain.js for more details.) The score
+ * distribution uses the Graph's canonical node ordering.
+ *
+ * The output array contains an element for each interval in the graph
+ * partitioning. Each element contains the interval, the distribution, and
+ * the sum of all node weights for that interval (so that we can normalize
+ * cred).
+ *
+ * The method is factored apart into a wrapper function which does validation,
+ * and iterators that produce the node weights and the markov chains. This
+ * facilitates testing the timeline logic separately. Finally, we compose all
+ * the pieces and run PageRank for each interval.
+ */
+export async function timelinePagerank(
+  graph: Graph,
+  types: NodeAndEdgeTypes,
+  weights: Weights,
+  intervalDecay: number,
+  alpha: number
+): Promise<TimelineDistributions> {
+  if (intervalDecay < 0 || intervalDecay > 1 || !isFinite(intervalDecay)) {
+    throw new Error(`invalid intervalDecay: ${intervalDecay}`);
+  }
+  if (alpha < 0 || alpha > 1 || !isFinite(alpha)) {
+    throw new Error(`invalid alpha: ${alpha}`);
+  }
+  // Produce the evaluators we will use to get the baseline weight for each
+  // node and edge.
+  const nodeEvaluator = nodeWeightEvaluator(types.nodeTypes, weights);
+  const edgeEvaluator = edgeWeightEvaluator(types.edgeTypes, weights);
+
+  const graphPartitionSlices = partitionGraph(graph);
+  if (graphPartitionSlices.length === 0) {
+    return [];
+  }
+  const intervals = graphPartitionSlices.map((x) => x.interval);
+  const nodeCreationHistory = graphPartitionSlices.map((x) => x.nodes);
+  const edgeCreationHistory = graphPartitionSlices.map((x) => x.edges);
+  const nodeOrder = Array.from(graph.nodes()).map((x) => x.address);
+  const nodeWeightIterator = _timelineNodeWeights(
+    nodeCreationHistory,
+    nodeEvaluator,
+    intervalDecay
+  );
+  const markovChainIterator = _timelineMarkovChain(
+    graph,
+    edgeCreationHistory,
+    edgeEvaluator,
+    intervalDecay
+  );
+  return _computeTimelineDistribution(
+    nodeOrder,
+    intervals,
+    nodeWeightIterator,
+    markovChainIterator,
+    alpha
+  );
+}
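+
+// A minimal usage sketch (hypothetical: `graph`, `types`, and `weights` are
+// assumed to be in scope, and 0.5 / 0.05 are example values for
+// intervalDecay and alpha, not recommendations):
+//
+//   const distributions = await timelinePagerank(
+//     graph, types, weights, 0.5, 0.05
+//   );
+//   for (const {interval, intervalWeight, distribution} of distributions) {
+//     // `distribution` sums to 1, in the graph's canonical node order;
+//     // `intervalWeight` is the total decay-normalized node weight for
+//     // `interval`, usable for cred normalization.
+//   }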
+
+export function* _timelineNodeWeights(
+  nodeCreationHistory: $ReadOnlyArray<$ReadOnlyArray<Node>>,
+  nodeEvaluator: NodeWeightEvaluator,
+  intervalDecay: number
+): Iterator<Map<NodeAddressT, number>> {
+  let lastNodeWeights = new Map();
+  for (const nodes of nodeCreationHistory) {
+    const nodeWeights = new Map();
+    // Decay all the previous weights.
+    for (const [address, weight] of lastNodeWeights.entries()) {
+      nodeWeights.set(address, weight * intervalDecay);
+    }
+    // Add new nodes at full weight.
+    for (const {address} of nodes) {
+      // Normalize by (1 - intervalDecay) so that the total weight of a node
+      // across intervals converges to the full base weight.
+      const normalizedWeight = nodeEvaluator(address) * (1 - intervalDecay);
+      nodeWeights.set(address, normalizedWeight);
+    }
+    yield nodeWeights;
+    lastNodeWeights = nodeWeights;
+  }
+}
+
+export function* _timelineMarkovChain(
+  graph: Graph,
+  edgeCreationHistory: $ReadOnlyArray<$ReadOnlyArray<Edge>>,
+  edgeEvaluator: EdgeWeightEvaluator,
+  intervalDecay: number
+): Iterator<SparseMarkovChain> {
+  const edgeWeights = new Map();
+  for (const edges of edgeCreationHistory) {
+    // Decay the weights of edges created in previous intervals.
+    for (const [address, {forwards, backwards}] of edgeWeights.entries()) {
+      edgeWeights.set(address, {
+        forwards: forwards * intervalDecay,
+        backwards: backwards * intervalDecay,
+      });
+    }
+    // Add newly created edges at full weight.
+    for (const {address} of edges) {
+      edgeWeights.set(address, edgeEvaluator(address));
+    }
+    const defaultEdgeWeight = Object.freeze({forwards: 0, backwards: 0});
+    const currentEdgeWeight = (e: Edge) => {
+      return NullUtil.orElse(edgeWeights.get(e.address), defaultEdgeWeight);
+    };
+    // Construct a new Markov chain corresponding to the current weights
+    // of the edges.
+    // TODO: Rather than constructing a markov chain from scratch, we can
+    // update the markov chain in-place. This should result in a significant
+    // performance improvement. We will need to change the markov chain
+    // representation to do so (we should add a `totalOutWeight` array to the
+    // chain, so that we can efficiently update the total weight as we add new
+    // connections, rather than needing to re-normalize the whole chain for
+    // each interval).
+    const chain = createOrderedSparseMarkovChain(
+      createConnections(graph, currentEdgeWeight, 1e-3)
+    ).chain;
+    yield chain;
+  }
+}
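+
+// Worked example with hypothetical numbers: with intervalDecay = 0.5, an edge
+// whose evaluator returns {forwards: 1, backwards: 0} contributes
+// {forwards: 0.5, backwards: 0} one interval after its creation, and
+// {forwards: 0.25, backwards: 0} after two. Because each node's outbound
+// weights are normalized when the chain is built, this decay matters only
+// relative to newer edges and to the synthetic loop weight (the 1e-3 passed
+// to createConnections above); see the test file for fully worked chains.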
+
+// Warning: This function is untested.
+// Modify with care.
+export async function _computeTimelineDistribution(
+  nodeOrder: $ReadOnlyArray<NodeAddressT>,
+  intervals: $ReadOnlyArray<Interval>,
+  nodeWeightIterator: Iterator<Map<NodeAddressT, number>>,
+  markovChainIterator: Iterator<SparseMarkovChain>,
+  alpha: number
+): Promise<TimelineDistributions> {
+  const results = [];
+  let pi0: Distribution | null = null;
+  for (const interval of intervals) {
+    const nodeWeights = NullUtil.get(nodeWeightIterator.next().value);
+    const chain = NullUtil.get(markovChainIterator.next().value);
+
+    const seed = weightedDistribution(nodeOrder, nodeWeights);
+    if (pi0 == null) {
+      pi0 = seed;
+    }
+    const params: PagerankParams = {chain, alpha, seed, pi0};
+    const distributionResult = await findStationaryDistribution(params, {
+      verbose: false,
+      convergenceThreshold: 1e-7,
+      maxIterations: 255,
+      yieldAfterMs: 30,
+    });
+    const intervalWeight = sum(nodeWeights.values());
+    results.push({
+      interval,
+      intervalWeight,
+      distribution: distributionResult.pi,
+    });
+    // Use the latest convergence results as the starting point for the next
+    // run of PageRank.
+    pi0 = distributionResult.pi;
+  }
+  return results;
+}
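+
+// Note on the warm start above: for alpha > 0 each interval's stationary
+// distribution is unique, so reusing the previous interval's result as pi0
+// shouldn't change what we converge to; it only speeds up convergence, since
+// consecutive intervals' chains are usually similar.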
diff --git a/src/analysis/timeline/timelinePagerank.test.js b/src/analysis/timeline/timelinePagerank.test.js
new file mode 100644
index 0000000..3e1b5e4
--- /dev/null
+++ b/src/analysis/timeline/timelinePagerank.test.js
@@ -0,0 +1,101 @@
+// @flow
+
+import {sum} from "d3-array";
+import * as NullUtil from "../../util/null";
+import {node, edge} from "../../core/graphTestUtil";
+import {Graph, type EdgeAddressT, type Edge} from "../../core/graph";
+import {_timelineNodeWeights, _timelineMarkovChain} from "./timelinePagerank";
+import {
+  createConnections,
+  createOrderedSparseMarkovChain,
+  type EdgeWeight,
+} from "../../core/attribution/graphToMarkovChain";
+import {type SparseMarkovChain} from "../../core/attribution/markovChain";
+
+describe("src/analysis/timeline/timelinePagerank", () => {
+  describe("_timelineNodeWeights", () => {
+    it("works in a simple case", () => {
+      const foo = node("foo");
+      const bar = node("bar");
+      const nodeCreationHistory = [[foo], [], [bar]];
+      const evaluator = (n) => (n === foo.address ? 4 : 1);
+      const weights = Array.from(
+        _timelineNodeWeights(nodeCreationHistory, evaluator, 0.5)
+      );
+      const expected = [
+        new Map([[foo.address, 2]]),
+        new Map([[foo.address, 1]]),
+        new Map([[foo.address, 0.5], [bar.address, 0.5]]),
+      ];
+      expect(weights).toEqual(expected);
+    });
+    it("properly normalizes based on the interval decay", () => {
+      const foo = node("foo");
+      const history = new Array(100).fill([]);
+      history[0] = [foo];
+      const evaluator = (_) => 42;
+      const sumWeight = (decay: number) => {
+        const weightsIterator = _timelineNodeWeights(history, evaluator, decay);
+        return sum(weightsIterator, (x) => x.get(foo.address));
+      };
+      expect(sumWeight(0.5)).toBeCloseTo(42);
+      expect(sumWeight(0.9)).toBeCloseTo(42);
+      expect(sumWeight(0.1)).toBeCloseTo(42);
+    });
+    it("handles an empty history", () => {
+      const weights = _timelineNodeWeights([], (_) => 1, 0.5);
+      expect(Array.from(weights)).toHaveLength(0);
+    });
+  });
+
+  describe("_timelineMarkovChain", () => {
+    it("works for a simple case", () => {
+      const a = node("a");
+      const b = node("b");
+      const e1 = edge("e1", a, b);
+      const e2 = edge("e2", a, a);
+      const graph = new Graph()
+        .addNode(a)
+        .addNode(b)
+        .addEdge(e1)
+        .addEdge(e2);
+
+      function weightsToChain(
+        w: Map<EdgeAddressT, EdgeWeight>
+      ): SparseMarkovChain {
+        const edgeWeight = (e: Edge) =>
+          NullUtil.orElse(w.get(e.address), {forwards: 0, backwards: 0});
+        const connections = createConnections(graph, edgeWeight, 1e-3);
+        return createOrderedSparseMarkovChain(connections).chain;
+      }
+
+      const edgeCreationHistory = [[], [e1], [], [e2]];
+      const edgeEvaluator = (_) => ({forwards: 1, backwards: 0});
+      const chainIterator = _timelineMarkovChain(
+        graph,
+        edgeCreationHistory,
+        edgeEvaluator,
+        0.5
+      );
+      const chains = Array.from(chainIterator);
+
+      const w1 = new Map();
+      const chain1 = weightsToChain(w1);
+      expect(chains[0]).toEqual(chain1);
+
+      const w2 = new Map().set(e1.address, {forwards: 1, backwards: 0});
+      const chain2 = weightsToChain(w2);
+      expect(chains[1]).toEqual(chain2);
+
+      const w3 = new Map().set(e1.address, {forwards: 1 / 2, backwards: 0});
+      const chain3 = weightsToChain(w3);
+      expect(chains[2]).toEqual(chain3);
+
+      const w4 = new Map()
+        .set(e1.address, {forwards: 1 / 4, backwards: 0})
+        .set(e2.address, {forwards: 1, backwards: 0});
+      const chain4 = weightsToChain(w4);
+      expect(chains[3]).toEqual(chain4);
+    });
+  });
+});