Source code for processors.paths

#!/usr/bin/env python
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
from processors.utils import LabelManager
from collections import Counter
import networkx as nx
import collections
import re


class DependencyUtils(object):
    """
    A set of utilities for analyzing syntactic dependency graphs.

    Methods
    -------
    build_networkx_graph(roots, edges, name, reverse=False)
        Constructs a `networkx.DiGraph` from a collection of edges.
    shortest_paths(g, start, end)
        Finds all shortest paths in a networkx graph between any element in a list of start nodes and any element in a list of end nodes.
    shortest_path(g, start, end, scoring_func)
        Selects the best-scoring path among those returned by `shortest_paths`.
    directed_relation(source_idx, destination_idx, relation, deps)
        Converts a relation to its directed form (`>rel` or `<rel`).
    retrieve_edges(dep_graph, path)
        Converts output of `shortest_path` into a list of triples that include the grammatical relation (and direction) for each node-node "hop" in the syntactic dependency graph.
    simplify_tag(tag)
        Maps part of speech (PoS) tag to a subset of PoS tags to better consolidate categorical labels.
    lexicalize_path(sentence, path, words=False, lemmas=False, tags=False, simple_tags=False, entities=False, limit_to=None)
        Lexicalizes path in syntactic dependency graph using Odin-style token constraints.
    pagerank(networkx_graph, alpha=0.85, personalization=None, max_iter=1000, tol=1e-06, nstart=None, weight='weight', dangling=None)
        Measures node activity in a networkx graph using a thin wrapper around the `networkx` implementation of the pagerank algorithm (see `networkx.algorithms.link_analysis.pagerank`).
    """

    UNKNOWN = LabelManager.UNKNOWN

    @staticmethod
    def build_networkx_graph(roots, edges, name, reverse=False):
        """
        Converts a `processors` dependency graph into a `networkx.DiGraph`.

        Parameters
        ----------
        roots : object
            The root(s) of the graph.  Stored unchanged under `G.graph["roots"]`.
        edges : iterable
            Edge objects exposing `source`, `destination` and `relation` attributes.
        name : str
            Accepted for interface compatibility.  It is not stored on the returned graph.
        reverse : bool
            If True, every edge is added as (destination -> source) instead of (source -> destination).

        Returns
        -------
        networkx.DiGraph
            A directed graph whose edges carry a "relation" attribute.
        """
        G = nx.DiGraph()
        # store roots
        G.graph["roots"] = roots
        # reversing the graph is useful if you want to run pagerank
        # to highlight predicate and argument nodes
        if reverse:
            nx_edges = [(edge.destination, edge.source, {"relation": edge.relation}) for edge in edges]
        else:
            nx_edges = [(edge.source, edge.destination, {"relation": edge.relation}) for edge in edges]
        G.add_edges_from(nx_edges)
        return G

    @staticmethod
    def shortest_paths(g, start, end):
        """
        Find the shortest paths between two nodes (or two sets of nodes).
        Note that pathfinding is sensitive to direction: if `g` is a directed graph,
        only paths that follow the direction of the edges are found.
        Convert to an undirected `networkx.Graph` to ignore direction.

        Parameters
        ----------
        g : a networkx graph
            The networkx graph to explore.
        start : int or [int]
            A single token index or list of token indices serving as the start of the graph traversal.
        end : int or [int]
            A single token index or list of token indices serving as the end of the graph traversal.

        Returns
        -------
        None or [[(int, int)]]
            None if no paths are found.
            Otherwise, a list of lists of (source index, target index) tuples representing path segments.
        """
        # `collections.Iterable` was removed in Python 3.10;
        # fall back to the old location only where `collections.abc` is unavailable.
        try:
            from collections.abc import Iterable
        except ImportError:
            from collections import Iterable

        # converts single int to [int]; materialize so one-shot iterables survive the nested loop
        starts = list(start) if isinstance(start, Iterable) else [start]
        ends = list(end) if isinstance(end, Iterable) else [end]

        # node list -> edges (i.e., (source, dest) pairs)
        def path_to_edges(path):
            return list(zip(path, path[1:]))

        found = []
        # pathfinding b/w pairs of nodes
        for s in starts:
            for e in ends:
                try:
                    for path in nx.algorithms.all_shortest_paths(g, s, e):
                        found.append(path_to_edges(path))
                # no path found (or a node is missing from the graph): try the next pair
                except (nx.NetworkXException, KeyError):
                    continue
        return None if not found else found

    @staticmethod
    def shortest_path(g, start, end, scoring_func=lambda path: -len(path)):
        """
        Find the shortest path between two nodes.
        Note that pathfinding is sensitive to direction.
        If you want to ignore direction, convert your networkx.Digraph to a networkx.Graph.

        Parameters
        ----------
        g : a networkx graph
            The networkx graph to explore.
        start : int or [int]
            A single token index or list of token indices serving as the start of the graph traversal.
        end : int or [int]
            A single token index or list of token indices serving as the end of the graph traversal.
        scoring_func : function
            A function that scores each path in a list of paths.
            Each path has the form [(source index, destination index)].
            The path with the maximum score will be returned.
            The default favors the path with the fewest segments.

        Returns
        -------
        None or [(int, int)]
            None if no paths are found.
            Otherwise, a list of (source index, target index) tuples representing path segments.
        """
        paths = DependencyUtils.shortest_paths(g, start, end)
        # `shortest_paths` signals "nothing found" with None
        if not paths:
            return None
        return max(paths, key=scoring_func)

    @staticmethod
    def directed_relation(source_idx, destination_idx, relation, deps):
        """
        Converts relation to a directed relation (incoming v. outgoing)
        if such a relation links `source_idx` and `destination_idx` in `deps`.

        Parameters
        ----------
        source_idx : int
            The token index for the source node
        destination_idx : int
            The token index for the destination node
        relation : str
            The undirected relation (i.e., the grammatical/semantic relation that connects the two nodes)
        deps : processors.ds.DirectedGraph
            The directed graph to be referenced

        Returns
        -------
        str or None
            The directed relation that connects the `source_idx` to the `destination_idx` in `deps`
            (`>relation` for an outgoing edge, `<relation` for an incoming edge).
            None if no such edge exists.  Outgoing matches take precedence over incoming ones.
        """
        outgoing_matches = [
            ">{}".format(rel) for d, rel in deps.outgoing[source_idx]
            if d == destination_idx and rel == relation
        ]
        incoming_matches = [
            "<{}".format(rel) for d, rel in deps.incoming[source_idx]
            if d == destination_idx and rel == relation
        ]
        matches = outgoing_matches + incoming_matches
        return matches[0] if matches else None

    @staticmethod
    def retrieve_edges(dep_graph, path):
        """
        Converts output of `DependencyUtils.shortest_path`
        into a list of triples that include the grammatical relation (and direction)
        for each node-node "hop" in the syntactic dependency graph.

        Parameters
        ----------
        dep_graph : processors.ds.DirectedGraph
            The `DirectedGraph` used to retrieve the grammatical relations for each edge in the `path`.
        path : [(int, int)]
            A list of tuples representing the shortest path from A to B in `dep_graph`.

        Returns
        -------
        [(int, str, int)]
            the shortest path (`path`) enhanced with the directed grammatical relations
            (ex. `>nsubj` for `predicate` to `subject` vs. `<nsubj` for `subject` to `predicate`).

        Raises
        ------
        KeyError
            If a segment of `path` corresponds to neither an outgoing nor an incoming edge of its source node.
        """
        shortest_path = []
        for (s, d) in path:
            # build dictionaries from incoming/outgoing
            outgoing = {dest_idx: ">{}".format(rel) for (dest_idx, rel) in dep_graph.outgoing[s]}
            incoming = {source_idx: "<{}".format(rel) for (source_idx, rel) in dep_graph.incoming[s]}
            # prefer the outgoing reading when both directions link the two nodes
            relation = outgoing[d] if d in outgoing else incoming[d]
            shortest_path.append((s, relation, d))
        return shortest_path

    @staticmethod
    def simplify_tag(tag):
        """
        Maps part of speech (PoS) tag to a subset of PoS tags to better consolidate categorical labels.

        Parameters
        ----------
        tag : str
            The Penn-style PoS tag to be mapped to a simplified form.

        Returns
        -------
        str
            A simplified form of `tag`: either a `/regex/` pattern covering a family of tags,
            or the original `tag` wrapped in double quotes when no simplification applies.
        """
        # default: exact match on the original tag
        simple_tag = "\"{}\"".format(tag)
        # collapse singular/plural proper nouns (must be tested before the more general "NN")
        if tag.startswith("NNP"):
            simple_tag = "/^NNP/"
        # collapse singular/plural common nouns
        elif tag.startswith("NN"):
            simple_tag = "/^N/"
        # collapse verb forms
        elif tag.startswith("VB"):
            simple_tag = "/^V/"
        # collapse comparative, superlatives, etc.
        elif tag.startswith("JJ"):
            simple_tag = "/^J/"
        # collapse comparative, superlatives, etc.
        elif tag.startswith("RB"):
            simple_tag = "/^RB/"
        # collapse possessive/non-possessive pronouns
        elif tag.startswith("PRP"):
            simple_tag = "/^PRP/"
        # treat WH determiners and plain determiners alike
        elif tag in ("WDT", "DT"):
            simple_tag = "/DT$/"
        return simple_tag

    @staticmethod
    def lexicalize_path(sentence,
                        path,
                        words=False,
                        lemmas=False,
                        tags=False,
                        simple_tags=False,
                        entities=False,
                        limit_to=None,
                        ):
        """
        Lexicalizes path in syntactic dependency graph using Odin-style token constraints.
        Operates on output of `DependencyUtils.retrieve_edges`

        Parameters
        ----------
        sentence : processors.ds.Sentence
            The `Sentence` from which the `path` was found.  Used to lexicalize the `path`.
        path : list
            A list of (source index, relation, target index) triples.
        words : bool
            Whether or not to encode nodes in the `path` with a token constraint constructed from `Sentence.words`
        lemmas : bool
            Whether or not to encode nodes in the `path` with a token constraint constructed from `Sentence.lemmas`
        tags : bool
            Whether or not to encode nodes in the `path` with a token constraint constructed from `Sentence.tags`
        simple_tags : bool
            Whether or not to encode nodes in the `path` with a token constraint constructed from `DependencyUtils.simplify_tag` applied to `Sentence.tags`
        entities : bool
            Whether or not to encode nodes in the `path` with a token constraint constructed from `Sentence._entities`
        limit_to : [int] or None
            Selectively apply lexicalization only to this list of token indices.
            None (or an empty list) means apply the specified lexicalization to all token indices in the path.

        Returns
        -------
        [str]
            The lexicalized form of `path`, encoded according to the specified parameters.
            An empty list if `path` is empty.
        """
        # nothing to lexicalize (also avoids indexing into an empty path below)
        if not path:
            return []

        UNKNOWN = LabelManager.UNKNOWN
        lexicalized_path = []
        # gather edges and nodes: the source of every triple, plus the final destination
        relations = [edge[1] for edge in path]
        nodes = [edge[0] for edge in path]
        nodes.append(path[-1][-1])

        for (i, node) in enumerate(nodes):
            if not limit_to or node in limit_to:
                # build token constraints
                token_constraints = []
                # words
                if words:
                    token_constraints.append("word=\"{}\"".format(sentence.words[node]))
                # PoS tags
                if tags and sentence.tags[node] != UNKNOWN:
                    token_constraints.append("tag=\"{}\"".format(sentence.tags[node]))
                # lemmas
                if lemmas and sentence.lemmas[node] != UNKNOWN:
                    token_constraints.append("lemma=\"{}\"".format(sentence.lemmas[node]))
                # NE labels (read from the same attribute that is tested)
                if entities and sentence._entities[node] != UNKNOWN:
                    token_constraints.append("entity=\"{}\"".format(sentence._entities[node]))
                # simple tags
                if simple_tags and sentence.tags[node] != UNKNOWN:
                    token_constraints.append("tag={}".format(DependencyUtils.simplify_tag(sentence.tags[node])))
                # build node pattern; a node without constraints contributes nothing
                if token_constraints:
                    node_pattern = "[{}]".format(" & ".join(token_constraints))
                    # store lexicalized representation of node
                    lexicalized_path.append(node_pattern)
            # append next edge (there is one fewer relation than nodes)
            if i < len(relations):
                lexicalized_path.append(relations[i])
        return lexicalized_path

    @staticmethod
    def pagerank(networkx_graph,
                 alpha=0.85,
                 personalization=None,
                 max_iter=1000,
                 tol=1e-06,
                 nstart=None,
                 weight='weight',
                 dangling=None):
        """
        Measures node activity in a networkx graph using a thin wrapper around the `networkx` implementation of the pagerank algorithm (see `networkx.algorithms.link_analysis.pagerank`).  Use with `processors.ds.DirectedGraph.graph`.

        Parameters
        ----------
        networkx_graph : networkx.Graph
            Corresponds to `G` parameter of `networkx.algorithms.link_analysis.pagerank`.

        See Also
        --------
        Method parameters correspond to those of [`networkx.algorithms.link_analysis.pagerank`](https://networkx.github.io/documentation/development/reference/generated/networkx.algorithms.link_analysis.pagerank_alg.pagerank.html#networkx.algorithms.link_analysis.pagerank_alg.pagerank)

        Returns
        -------
        collections.Counter
            A collections.Counter of node -> pagerank weights
        """
        pg_res = nx.algorithms.link_analysis.pagerank(
            G=networkx_graph,
            alpha=alpha,
            personalization=personalization,
            max_iter=max_iter,
            tol=tol,
            nstart=nstart,
            weight=weight,
            dangling=dangling,
        )
        return Counter(pg_res)
class HeadFinder(object):
    """
    Utilities for locating the head token of a sentence.
    """

    import processors

    @staticmethod
    def semantic_head(sentence, graph_name="stanford-collapsed", valid_tags={r"^N", "VBG"}, valid_indices=None):
        """
        Picks the first token, in the order given by the dependency graph's
        pagerank ranking, that passes both the tag filter and the index filter.

        Parameters
        ----------
        sentence : processors.ds.Sentence
            The Sentence to be analyzed.  Any other type yields None.
        graph_name : str
            The key used to look up the dependency graph in `sentence.graphs`.
            Default is "stanford-collapsed".
        valid_tags : set or None
            An optional set of str or regexes; a token qualifies when `re.match`
            succeeds for at least one of them against its tag.
            A falsy value disables tag filtering.
        valid_indices : list or None
            An optional list of int representing the indices that should be considered.
            A falsy value means every index in the sentence is considered.

        Returns
        -------
        int or None
            The index of the highest scoring token meeting the criteria,
            or None when no token qualifies or the requested graph is unavailable.
        """
        from processors.ds import Sentence as Sent

        def tag_is_allowed(candidate):
            # no tag restrictions -> everything is allowed
            if not valid_tags:
                return True
            return any(re.match(pattern, candidate) for pattern in valid_tags)

        # only Sentence instances can be analyzed
        if not isinstance(sentence, Sent):
            return None

        # default to every token position in the sentence
        if not valid_indices:
            valid_indices = list(range(sentence.length))

        # a one-token sentence has no graph to rank: just check tag and index
        if sentence.length == 1:
            if tag_is_allowed(sentence.tags[0]) and 0 in valid_indices:
                return 0
            return None

        dependencies = sentence.graphs.get(graph_name, None)
        if not dependencies:
            return None

        # NOTE(review): assumes `pagerank()` returns a Counter-like object whose
        # `most_common()` yields (index, score) pairs, best first -- confirm.
        ranked = dependencies.pagerank().most_common()
        candidates = [
            idx for (idx, _score) in ranked
            if idx in valid_indices and tag_is_allowed(sentence.tags[idx])
        ]
        # the first surviving candidate is the best-ranked one
        if candidates:
            return candidates[0]
        return None