Expose edge-level score flows in timelinePagerank (#1802)

This commit makes a big step towards realizing the v3 output format
(see #1773). Specifically, it modifies the timelinePagerank interval
result format so that in addition to the distribution (the score for
each ndoe), we also track:
- How much score flowed to each node from the seed
- How much score flowed across each edge in a forwards (src->dst)
direction
- How much score flowed across each edge in a backwards (dst->src)
direction
- How much score flowed to each node from its synthetic self loop

The result is that we can now precisely decompose where a node's score
came from (and where it went to). Specficially, for every node we have
the invariant that the node's score is equal to the sum of its seed
score, its synthetic loop score, and the forward flow on each edge for
which the node was dst, and the backward flow for each edge on which the
node was src.

Test plan:
I've added unit tests which verify that the invariant above holds for
real PageRank results on a small example graph.
This commit is contained in:
Dandelion Mané 2020-05-30 14:37:23 -07:00 committed by GitHub
parent 27aeacc906
commit 5279c34c6a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 169 additions and 4 deletions

View File

@ -12,11 +12,19 @@ describe("src/core/algorithm/distributionToCred", () => {
interval: {startTimeMs: 0, endTimeMs: 10},
intervalWeight: 2,
distribution: new Float64Array([0.5, 0.5]),
backwardFlow: new Float64Array([]),
forwardFlow: new Float64Array([]),
seedFlow: new Float64Array([0.5, 0.5]),
syntheticLoopFlow: new Float64Array([0, 0]),
},
{
interval: {startTimeMs: 10, endTimeMs: 20},
intervalWeight: 10,
distribution: new Float64Array([0.9, 0.1]),
backwardFlow: new Float64Array([]),
forwardFlow: new Float64Array([]),
seedFlow: new Float64Array([0.5, 0.5]),
syntheticLoopFlow: new Float64Array([0, 0]),
},
];
const nodeOrder = [na("foo"), na("bar")];
@ -39,11 +47,19 @@ describe("src/core/algorithm/distributionToCred", () => {
interval: {startTimeMs: 0, endTimeMs: 10},
intervalWeight: 2,
distribution: new Float64Array([0.5, 0.5]),
backwardFlow: new Float64Array([]),
forwardFlow: new Float64Array([]),
seedFlow: new Float64Array([0.5, 0.5]),
syntheticLoopFlow: new Float64Array([0, 0]),
},
{
interval: {startTimeMs: 10, endTimeMs: 20},
intervalWeight: 10,
distribution: new Float64Array([0.9, 0.1]),
backwardFlow: new Float64Array([]),
forwardFlow: new Float64Array([]),
seedFlow: new Float64Array([0.5, 0.5]),
syntheticLoopFlow: new Float64Array([0, 0]),
},
];
const nodeOrder = [na("foo"), na("bar")];
@ -66,11 +82,19 @@ describe("src/core/algorithm/distributionToCred", () => {
interval: {startTimeMs: 0, endTimeMs: 10},
intervalWeight: 2,
distribution: new Float64Array([0.5, 0.5]),
backwardFlow: new Float64Array([]),
forwardFlow: new Float64Array([]),
seedFlow: new Float64Array([0.5, 0.5]),
syntheticLoopFlow: new Float64Array([0, 0]),
},
{
interval: {startTimeMs: 10, endTimeMs: 20},
intervalWeight: 10,
distribution: new Float64Array([0.9, 0.1]),
backwardFlow: new Float64Array([]),
forwardFlow: new Float64Array([]),
seedFlow: new Float64Array([0.9, 0.1]),
syntheticLoopFlow: new Float64Array([0, 0]),
},
];
const nodeOrder = [na("foo"), na("bar")];
@ -93,6 +117,10 @@ describe("src/core/algorithm/distributionToCred", () => {
interval: {startTimeMs: 0, endTimeMs: 10},
intervalWeight: 2,
distribution: new Float64Array([0.5, 0.5]),
backwardFlow: new Float64Array([]),
forwardFlow: new Float64Array([]),
seedFlow: new Float64Array([0.5, 0.5]),
syntheticLoopFlow: new Float64Array([0, 0]),
},
];
const nodeOrder = [na("foo"), na("bar")];
@ -112,6 +140,10 @@ describe("src/core/algorithm/distributionToCred", () => {
interval: {startTimeMs: 0, endTimeMs: 10},
intervalWeight: 2,
distribution: new Float64Array([1, 0]),
backwardFlow: new Float64Array([]),
forwardFlow: new Float64Array([]),
seedFlow: new Float64Array([1, 0]),
syntheticLoopFlow: new Float64Array([0, 0]),
},
];
const nodeOrder = [na("foo"), na("bar")];
@ -136,11 +168,19 @@ describe("src/core/algorithm/distributionToCred", () => {
interval: {startTimeMs: 0, endTimeMs: 10},
intervalWeight: 2,
distribution: new Float64Array([0.5, 0.5]),
backwardFlow: new Float64Array([]),
forwardFlow: new Float64Array([]),
seedFlow: new Float64Array([0.5, 0.5]),
syntheticLoopFlow: new Float64Array([0, 0]),
},
{
interval: {startTimeMs: 10, endTimeMs: 20},
intervalWeight: 10,
distribution: new Float64Array([0.9, 0.1]),
backwardFlow: new Float64Array([]),
forwardFlow: new Float64Array([]),
seedFlow: new Float64Array([0.9, 0.1]),
syntheticLoopFlow: new Float64Array([0, 0]),
},
];
const nodeOrder = [na("foo"), na("bar")];

View File

@ -6,7 +6,13 @@
import deepFreeze from "deep-freeze";
import {sum} from "d3-array";
import * as NullUtil from "../../util/null";
import {Graph, type NodeAddressT, type Edge, type Node} from "../graph";
import {
Graph,
type NodeAddressT,
type EdgeAddressT,
type Edge,
type Node,
} from "../graph";
import {type WeightedGraph} from "../weightedGraph";
import {type Interval, partitionGraph} from "../interval";
import {
@ -20,6 +26,7 @@ import {type Distribution} from "./distribution";
import {
createOrderedSparseMarkovChain,
createConnections,
adjacencySource,
type NodeToConnections,
} from "./graphToMarkovChain";
import {findStationaryDistribution, type PagerankParams} from "./markovChain";
@ -34,6 +41,22 @@ export type IntervalResult = {|
// The raw score distribution over nodes for this interval (i.e. sums to 1).
// Uses the canonical graph node order.
+distribution: Distribution,
// For each edge, how much forward score flow there was (in terms of raw
// probability mass). Uses the canonical graph edge order.
+forwardFlow: Distribution,
// For each edge, how much backward score flow there was (in terms of raw
// probability mass). Uses the canonical graph edge order.
+backwardFlow: Distribution,
// For each node, how much score flowed along its synthetic self loop edge.
+syntheticLoopFlow: Distribution,
// For each node, how much score flowed to it from the seed vector.
+seedFlow: Distribution,
// Invariant: A node's score in the distribution is equal (modulo floating point error)
// to the sum of:
// - its seedFlow
// - its syntheticLoopFlow
// - the forwardFlow of each edge for which the node is the destination
// - the backwardFlow of each edge for which the node is the source
|};
/**
* Represents raw PageRank distributions on a graph over time.
@ -122,6 +145,9 @@ export async function timelinePagerank(
const nodeOrder = Array.from(weightedGraph.graph.nodes()).map(
(x) => x.address
);
const edgeOrder = Array.from(
weightedGraph.graph.edges({showDangling: false})
).map((x) => x.address);
const nodeWeightIterator = _timelineNodeWeights(
nodeCreationHistory,
nodeEvaluator,
@ -135,6 +161,7 @@ export async function timelinePagerank(
);
return _computeTimelineDistribution(
nodeOrder,
edgeOrder,
intervals,
nodeWeightIterator,
nodeToConnectionsIterator,
@ -195,6 +222,7 @@ export function* _timelineNodeToConnections(
// Modify with care.
export async function _computeTimelineDistribution(
nodeOrder: $ReadOnlyArray<NodeAddressT>,
edgeOrder: $ReadOnlyArray<EdgeAddressT>,
intervals: $ReadOnlyArray<Interval>,
nodeWeightIterator: Iterator<Map<NodeAddressT, number>>,
nodeToConnectionsIterator: Iterator<NodeToConnections>,
@ -202,6 +230,7 @@ export async function _computeTimelineDistribution(
): Promise<TimelineDistributions> {
const results = [];
let pi0: Distribution | null = null;
for (const interval of intervals) {
const nodeWeights = NullUtil.get(nodeWeightIterator.next().value);
const nodeToConnections = NullUtil.get(
@ -211,6 +240,7 @@ export async function _computeTimelineDistribution(
nodeWeights,
nodeToConnections,
nodeOrder,
edgeOrder,
interval,
pi0,
alpha
@ -227,11 +257,14 @@ export async function _intervalResult(
nodeWeights: Map<NodeAddressT, number>,
nodeToConnections: NodeToConnections,
nodeOrder: $ReadOnlyArray<NodeAddressT>,
edgeOrder: $ReadOnlyArray<EdgeAddressT>,
interval: Interval,
pi0: Distribution | null,
alpha: number
): Promise<IntervalResult> {
const {chain} = createOrderedSparseMarkovChain(nodeToConnections);
const nodeToIndex = new Map(nodeOrder.map((x, i) => [x, i]));
const edgeToIndex = new Map(edgeOrder.map((x, i) => [x, i]));
const seed = weightedDistribution(nodeOrder, nodeWeights);
if (pi0 == null) {
@ -245,9 +278,63 @@ export async function _intervalResult(
yieldAfterMs: 30,
});
const intervalWeight = sum(nodeWeights.values());
const forwardFlow = new Float64Array(edgeOrder.length);
const backwardFlow = new Float64Array(edgeOrder.length);
const syntheticLoopFlow = new Float64Array(nodeOrder.length);
const seedFlow = seed.map((x) => x * alpha);
for (const [target, connections] of nodeToConnections.entries()) {
for (const {adjacency, weight} of connections) {
// We now iterate over every "adjacency" in the markov chain. Every edge corresponds to
// two adjacencies (one forward, one backward), and every synthetic loop also corresponds
// to one adjacency.
// The adjacencies are used to construct the underling Markov Chain, and crucially their
// "weights" are normalized probabilities rather than raw pre-normalized weights. This means
// we can now calculate the exact amount of score that flowed along each adjacency.
// The score flow on an adjacency is equal to (1-alpha) * (sourceScore * adjacencyWeight),
// where sourceScore is the score of the node that is the "source" of that adjacency (the
// src for an IN_EDGE adjacency, the dst for an OUT_EDGE adjacency, or the node itself
// for a synthetic loop adjacency). Since the sum of the outbound weights for any source
// is `1`, we have an invariant that the total outbound score flow for any node is that node's
// score times (1-alpha), which satisfies the PageRank property.
const source = adjacencySource(target, adjacency);
const sourceIndex = NullUtil.get(nodeToIndex.get(source));
const contribution =
weight * distributionResult.pi[sourceIndex] * (1 - alpha);
switch (adjacency.type) {
case "SYNTHETIC_LOOP": {
syntheticLoopFlow[sourceIndex] = contribution;
break;
}
case "IN_EDGE": {
// IN_EDGE from the perspective of the target, i.e. it's forward flow
const edgeIndex = NullUtil.get(
edgeToIndex.get(adjacency.edge.address)
);
forwardFlow[edgeIndex] = contribution;
break;
}
case "OUT_EDGE": {
// OUT_EDGE from the perspective of the target, i.e. it's backwards flow
const edgeIndex = NullUtil.get(
edgeToIndex.get(adjacency.edge.address)
);
backwardFlow[edgeIndex] = contribution;
break;
}
default: {
throw new Error((adjacency.type: empty));
}
}
}
}
return {
interval,
intervalWeight,
distribution: distributionResult.pi,
forwardFlow,
backwardFlow,
syntheticLoopFlow,
seedFlow,
};
}

View File

@ -3,7 +3,7 @@
import {sum} from "d3-array";
import * as NullUtil from "../../util/null";
import {node, edge, advancedGraph} from "../graphTestUtil";
import {Graph, type EdgeAddressT, type Edge} from "../graph";
import {Graph, type NodeAddressT, type EdgeAddressT, type Edge} from "../graph";
import {
_timelineNodeWeights,
_timelineNodeToConnections,
@ -119,6 +119,9 @@ describe("src/core/algorithm/timelinePagerank", () => {
const edgeFn = (_unused_edge) => ({forwards: 1, backwards: 0.5});
const nodeToConnections = createConnections(g, edgeFn, 1e-3);
const nodeOrder = Array.from(g.nodes()).map((x) => x.address);
const edgeOrder = Array.from(g.edges({showDangling: false})).map(
(x) => x.address
);
const interval = {endTimeMs: 1000, startTimeMs: 0};
const pi0 = null;
const alpha = 0.05;
@ -126,6 +129,7 @@ describe("src/core/algorithm/timelinePagerank", () => {
nodeWeights,
nodeToConnections,
nodeOrder,
edgeOrder,
interval,
pi0,
alpha
@ -134,6 +138,7 @@ describe("src/core/algorithm/timelinePagerank", () => {
graph: g,
nodes,
nodeOrder,
edgeOrder,
nodeWeights,
edgeFn,
nodeToConnections,
@ -164,11 +169,44 @@ describe("src/core/algorithm/timelinePagerank", () => {
const isoScore = getScore(nodes.isolated);
const srcScore = getScore(nodes.src);
const dstScore = getScore(nodes.dst);
expect(isoScore + srcScore + dstScore).toBeCloseTo(1);
expect(isoScore + srcScore + dstScore).toBeCloseTo(1, 4);
// It has 2/3rd weight, and is isolated, so it's simple
expect(isoScore).toBeCloseTo(2 / 3);
expect(isoScore).toBeCloseTo(2 / 3, 4);
// src has the weight, and dst doesnt, so it should have a higher score
expect(srcScore).toBeGreaterThan(dstScore);
});
it("satisfies the flow conservation invariants", async () => {
const {nodeOrder, edgeOrder, result, graph} = await example();
const {
distribution,
forwardFlow,
backwardFlow,
syntheticLoopFlow,
seedFlow,
} = result;
// For any node: Its score is equal to the sum of:
// - The score it received from the seed vector
// - The score it received from every in edge
// - The score it received from every out edge
// - The score it received from its self loop
const nodeToExpectedScore = new Map();
function addScore(a: NodeAddressT, score: number) {
const existing = nodeToExpectedScore.get(a) || 0;
nodeToExpectedScore.set(a, existing + score);
}
nodeOrder.forEach((na, i) => {
addScore(na, seedFlow[i] + syntheticLoopFlow[i]);
});
edgeOrder.forEach((ea, i) => {
const {src, dst} = NullUtil.get(graph.edge(ea));
addScore(src, backwardFlow[i]);
addScore(dst, forwardFlow[i]);
});
nodeOrder.forEach((na, i) => {
const expected = nodeToExpectedScore.get(na);
const actual = distribution[i];
expect(expected).toBeCloseTo(actual, 4);
});
});
});
});