# Source code for processors.ds

#!/usr/bin/env python
# -*- coding: utf-8 -*-

# Gus Hahn-Powell 2015
# data structures for storing processors-server output
# based on conventions from the CLU lab's processors library
from __future__ import unicode_literals
from itertools import chain
from collections import defaultdict, Counter
from processors.paths import DependencyUtils, HeadFinder
from processors.utils import LabelManager
import networkx as nx
import hashlib
import json
import re

class NLPDatum(object):
    """
    Base class for the JSON-serializable data structures in this module.

    Subclasses override `to_JSON_dict`; `to_JSON` serializes whatever that method returns.

    Methods
    -------
    to_JSON_dict()
        Returns a `dict` representation of the object (empty for the base class).
    to_JSON(pretty=False)
        Returns the `dict` from `to_JSON_dict` serialized as a JSON string.
    """

    def to_JSON_dict(self):
        """
        Returns the `dict` that `to_JSON` serializes. The base class has nothing to store.
        """
        return dict()

    def to_JSON(self, pretty=False):
        """
        Returns JSON as String.

        Parameters
        ----------
        pretty : bool
            If True, indent the output by 4 spaces per level; otherwise emit a single line.

        Returns
        -------
        str
            The JSON serialization of `to_JSON_dict()`.
        """
        num_spaces = 4 if pretty else None
        # keys are sorted so that equal objects always serialize to the same string
        # (subclasses in this module compare and hash on this output)
        return json.dumps(self.to_JSON_dict(), sort_keys=True, indent=num_spaces)
class Document(NLPDatum):
    """
    Storage class for annotated text. Based on `org.clulab.processors.Document`.

    Parameters
    ----------
    sentences : [processors.ds.Sentence]
        The sentences comprising the `Document`.

    Attributes
    ----------
    id : str or None
        A unique ID for the `Document`.
    size : int
        The number of `sentences`.
    sentences : [processors.ds.Sentence]
        The sentences comprising the `Document`.
    words : [str]
        A list of the `Document`'s tokens.
    tags : [str]
        A list of the `Document`'s tokens represented using part of speech (PoS) tags.
    lemmas : [str]
        A list of the `Document`'s tokens represented using lemmas.
    _entities : [str]
        A list of the `Document`'s tokens represented using IOB-style named entity (NE) labels.
    nes : dict
        A dictionary of NE labels represented in the `Document` -> a list of corresponding text spans.
    bag_of_labeled_deps : [str]
        The labeled dependencies from all sentences in the `Document`.
    bag_of_unlabeled_deps : [str]
        The unlabeled dependencies from all sentences in the `Document`.
    text : str or None
        The original text of the `Document`.

    Methods
    -------
    bag_of_labeled_dependencies_using(form)
        Produces a list of syntactic dependencies where each edge is labeled with its grammatical relation.
    bag_of_unlabeled_dependencies_using(form)
        Produces a list of syntactic dependencies where each edge is left unlabeled without its grammatical relation.
    """

    def __init__(self, sentences):
        NLPDatum.__init__(self)
        self.id = None
        self.size = len(sentences)
        self.sentences = sentences
        # easily access token attributes from all sentences
        self.words = list(chain.from_iterable(s.words for s in self.sentences))
        self.tags = list(chain.from_iterable(s.tags for s in self.sentences))
        self.lemmas = list(chain.from_iterable(s.lemmas for s in self.sentences))
        self._entities = list(chain.from_iterable(s._entities for s in self.sentences))
        self.nes = self._merge_ne_dicts()
        # a sentence without any dependency graph contributes nothing to the bags
        parsed = [s for s in self.sentences if s.dependencies is not None]
        self.bag_of_labeled_deps = list(chain.from_iterable(s.dependencies.labeled for s in parsed))
        self.bag_of_unlabeled_deps = list(chain.from_iterable(s.dependencies.unlabeled for s in parsed))
        self.text = None

    def __hash__(self):
        return hash(self.to_JSON())

    def __unicode__(self):
        return self.text

    def __str__(self):
        return "Document w/ {} Sentence{}".format(self.size, "" if self.size == 1 else "s")

    def __eq__(self, other):
        # two Documents are equal when their JSON serializations match
        if isinstance(other, self.__class__):
            return self.to_JSON() == other.to_JSON()
        else:
            return False

    def __ne__(self, other):
        return not self.__eq__(other)

    def bag_of_labeled_dependencies_using(self, form):
        """
        Produces a list of (head, relation, dependent) tuples from every sentence,
        where tokens are rendered using `form` (see `Sentence._get_tokens`).
        """
        return list(chain.from_iterable(
            s.labeled_dependencies_from_tokens(s._get_tokens(form)) for s in self.sentences
        ))

    def bag_of_unlabeled_dependencies_using(self, form):
        """
        Produces a list of (head, dependent) tuples from every sentence,
        where tokens are rendered using `form` (see `Sentence._get_tokens`).
        """
        return list(chain.from_iterable(
            s.unlabeled_dependencies_from_tokens(s._get_tokens(form)) for s in self.sentences
        ))

    def _merge_ne_dicts(self):
        """
        Consolidates the NE dictionaries of all sentences under their labels.

        Returns
        -------
        dict
            {label -> [text span]} with spans in sentence order.  Empty when no sentence has NEs.
        """
        nes_dict = dict()
        for s in self.sentences:
            # iterate the items instead of indexing by label: `Sentence.nes` is a
            # defaultdict, so indexing a missing label would insert an empty entry into it
            for (label, spans) in s.nes.items():
                nes_dict.setdefault(label, []).extend(spans)
        return nes_dict

    def to_JSON_dict(self):
        """
        Returns a `dict` with the Document's sentences and text, plus its id when one is set.
        """
        doc_dict = dict()
        doc_dict["sentences"] = [s.to_JSON_dict() for s in self.sentences]
        doc_dict["text"] = self.text
        # can the ID be set?
        if self.id is not None:
            doc_dict["id"] = self.id
        return doc_dict

    @staticmethod
    def load_from_JSON(json_dict):
        """
        Builds a `Document` from a `dict` shaped like the output of `to_JSON_dict`.

        "words", "startOffsets" and "endOffsets" are required for each sentence;
        the remaining sentence fields, and the document's "text" and "id", are optional.
        """
        sentences = []
        for s in json_dict["sentences"]:
            kwargs = {
                "words": s["words"],
                "startOffsets": s["startOffsets"],
                "endOffsets": s["endOffsets"],
                "tags": s.get("tags", None),
                "lemmas": s.get("lemmas", None),
                "chunks": s.get("chunks", None),
                "entities": s.get("entities", None),
                "graphs": s.get("graphs", None)
            }
            sentences.append(Sentence(**kwargs))
        doc = Document(sentences)
        # set id and text
        doc.text = json_dict.get("text", None)
        doc.id = json_dict.get("id", None)
        return doc
class Sentence(NLPDatum):
    """
    Storage class for an annotated sentence. Based on `org.clulab.processors.Sentence`.

    Parameters
    ----------
    text : str or None
        The text of the `Sentence`.
    words : [str]
        A list of the `Sentence`'s tokens.
    startOffsets : [int]
        The character offsets starting each token (inclusive).
    endOffsets : [int]
        The character offsets marking the end of each token (exclusive).
    tags : [str]
        A list of the `Sentence`'s tokens represented using part of speech (PoS) tags.
    lemmas : [str]
        A list of the `Sentence`'s tokens represented using lemmas.
    chunks : [str]
        A list of the `Sentence`'s tokens represented using IOB-style phrase labels (ex. `B-NP`, `I-NP`, `B-VP`, etc.).
    entities : [str]
        A list of the `Sentence`'s tokens represented using IOB-style named entity (NE) labels.
    graphs : dict
        A dictionary of {graph-name -> {edges: [{source, destination, relation}], roots: [int]}}

    Attributes
    ----------
    text : str
        The text of the `Sentence`.  Falls back to the space-joined `words` when no text is given.
    startOffsets : [int]
        The character offsets starting each token (inclusive).
    endOffsets : [int]
        The character offsets marking the end of each token (exclusive).
    length : int
        The number of tokens in the `Sentence`
    graphs : dict
        A dictionary (str -> `processors.ds.DirectedGraph`) mapping the graph type/name to a `processors.ds.DirectedGraph`.  Empty when no graphs were provided.
    basic_dependencies : processors.ds.DirectedGraph or None
        A `processors.ds.DirectedGraph` using basic Stanford dependencies.
    collapsed_dependencies : processors.ds.DirectedGraph or None
        A `processors.ds.DirectedGraph` using collapsed Stanford dependencies.
    dependencies : processors.ds.DirectedGraph or None
        A pointer to the preferred syntactic dependency graph type for this `Sentence` (collapsed if available, otherwise basic).
    _entities : [str]
        The IOB-style Named Entity (NE) labels corresponding to each token.
    _chunks : [str]
        The IOB-style chunk labels corresponding to each token.
    nes : dict
        A dictionary of NE labels represented in the `Sentence` -> a list of corresponding text spans (ex. {"PERSON": [phrase 1, ..., phrase n]}).  Built from `Sentence._entities`
    phrases : dict
        A dictionary of chunk labels represented in the `Sentence` -> a list of corresponding text spans (ex. {"NP": [phrase 1, ..., phrase n]}).  Built from `Sentence._chunks`

    Methods
    -------
    bag_of_labeled_dependencies_using(form)
        Produces a list of syntactic dependencies where each edge is labeled with its grammatical relation.
    bag_of_unlabeled_dependencies_using(form)
        Produces a list of syntactic dependencies where each edge is left unlabeled without its grammatical relation.
    """

    UNKNOWN = LabelManager.UNKNOWN
    # the O in IOB notation
    O = LabelManager.O

    def __init__(self, **kwargs):
        NLPDatum.__init__(self)
        self.words = kwargs["words"]
        self.startOffsets = kwargs["startOffsets"]
        self.endOffsets = kwargs["endOffsets"]
        self.length = len(self.words)
        self.tags = self._set_toks(kwargs.get("tags", None))
        self.lemmas = self._set_toks(kwargs.get("lemmas", None))
        self._chunks = self._set_toks(kwargs.get("chunks", None))
        self._entities = self._set_toks(kwargs.get("entities", None))
        self.text = kwargs.get("text", None) or " ".join(self.words)
        self.graphs = self._build_directed_graph_from_dict(kwargs.get("graphs", None))
        self.basic_dependencies = self.graphs.get(DirectedGraph.STANFORD_BASIC_DEPENDENCIES, None)
        self.collapsed_dependencies = self.graphs.get(DirectedGraph.STANFORD_COLLAPSED_DEPENDENCIES, None)
        # prefer the collapsed graph; fall back to the basic one (which may also be None)
        if self.collapsed_dependencies is not None:
            self.dependencies = self.collapsed_dependencies
        else:
            self.dependencies = self.basic_dependencies
        # IOB tokens -> {label: [phrase 1, ..., phrase n]}
        self.nes = self._handle_iob(self._entities)
        self.phrases = self._handle_iob(self._chunks)

    def __eq__(self, other):
        # two Sentences are equal when their JSON serializations match
        if isinstance(other, self.__class__):
            return self.to_JSON() == other.to_JSON()
        else:
            return False

    def __ne__(self, other):
        return not self.__eq__(other)

    def __hash__(self):
        return hash(self.to_JSON(pretty=False))

    def deduplication_hash(self):
        """
        Generates a deduplication hash for the sentence
        (the SHA-256 hex digest of its compact JSON serialization).
        """
        return hashlib.sha256(self.to_JSON(pretty=False).encode()).hexdigest()

    def _get_tokens(self, form):
        """
        Returns the per-token representation of the sentence named by `form`.

        Parameters
        ----------
        form : str
            One of 'words', 'tags', 'lemmas', 'entities', or 'index' (case-insensitive).

        Raises
        ------
        ValueError
            If `form` is not recognized.
        """
        f = form.lower()
        if f == "words":
            tokens = self.words
        elif f == "tags":
            tokens = self.tags
        elif f == "lemmas":
            tokens = self.lemmas
        elif f == "entities":
            # callers index the result by token position, so this must be the
            # per-token label list rather than the {label -> spans} dict in `nes`
            tokens = self._entities
        elif f == "index":
            tokens = list(range(self.length))
        # unrecognized form
        else:
            raise ValueError("""form must be 'words', 'tags', 'lemmas', 'entities', or 'index'""")
        return tokens

    def _set_toks(self, toks):
        # missing (or empty) annotations are replaced by one UNKNOWN placeholder per token
        return toks if toks else [Sentence.UNKNOWN] * self.length

    def _handle_iob(self, iob):
        """
        Consolidates consecutive tokens in IOB notation under the appropriate label.
        Regexs control for bionlp annotator, which uses IOB notation.

        `B-`/`I-` markers are stripped before labels are compared, so a run of tokens
        sharing a label forms one span.  `Sentence.O` and the `Sentence.UNKNOWN`
        placeholder are both treated as "outside" any span.

        Returns
        -------
        defaultdict(list)
            {label -> [space-joined words of each span]}.  This might be empty.
        """
        entity_dict = defaultdict(list)
        labels = list(iob)
        # initialize to empty label
        current = Sentence.O
        start = None
        for i, tok in enumerate(labels):
            if tok == Sentence.O or tok == Sentence.UNKNOWN:
                label = Sentence.O
            else:
                label = re.sub('(B-|I-)', '', str(tok))
            # our old sequence continues
            if label == current:
                continue
            # the last sequence has ended; store it if it was an entity
            if current != Sentence.O:
                entity_dict[current].append(' '.join(self.words[start:i]))
            # update our book-keeping vars
            current = label
            start = i
        # a span that runs up to the final token is closed here
        if current != Sentence.O:
            entity_dict[current].append(' '.join(self.words[start:len(labels)]))
        return entity_dict

    def _build_directed_graph_from_dict(self, graphs):
        """
        Converts {graph-name -> graph dict} into {graph-name -> `DirectedGraph`}.

        Always returns a dict (empty when `graphs` is None or empty) so that
        `__init__` and `to_JSON_dict` can query it unconditionally.
        """
        deps_dict = dict()
        if graphs:
            # process each stored graph
            for (kind, deps) in graphs.items():
                deps_dict[kind] = DirectedGraph(kind, deps, self.words)
        return deps_dict

    def __unicode__(self):
        return self.text

    def to_string(self):
        """
        Returns the sentence as space-separated `word__tag` pairs.
        """
        return ' '.join("{w}__{p}".format(w=self.words[i], p=self.tags[i]) for i in range(self.length))

    def bag_of_labeled_dependencies_using(self, form):
        """
        Produces a list of syntactic dependencies where each edge is labeled with its grammatical relation.
        Returns None when the sentence has no tokens for `form`.
        """
        tokens = self._get_tokens(form)
        return self.labeled_dependencies_from_tokens(tokens) if tokens else None

    def bag_of_unlabeled_dependencies_using(self, form):
        """
        Produces a list of syntactic dependencies where each edge is left unlabeled without its grammatical relation.
        Returns None when the sentence has no tokens for `form`.
        """
        tokens = self._get_tokens(form)
        return self.unlabeled_dependencies_from_tokens(tokens) if tokens else None

    def labeled_dependencies_from_tokens(self, tokens):
        """
        Generates a list of labeled dependencies for a sentence using the provided tokens.

        Returns
        -------
        [(head, relation, dependent)]
            One tuple per edge of `self.dependencies`; empty when the sentence has no dependency graph.
        """
        deps = self.dependencies
        if deps is None:
            return []
        return [(tokens[out], rel, tokens[dest])
                for out in deps.outgoing
                for (dest, rel) in deps.outgoing[out]]

    def unlabeled_dependencies_from_tokens(self, tokens):
        """
        Generate a list of unlabeled dependencies for a sentence using the provided tokens
        """
        return [(head, dep) for (head, rel, dep) in self.labeled_dependencies_from_tokens(tokens)]

    # NOTE(review): the default `valid_tags` is a shared mutable set; this is only safe
    # as long as `HeadFinder.semantic_head` does not mutate it -- confirm.
    def semantic_head(self, graph_name="stanford-collapsed", valid_tags={r"^N", "VBG"}, valid_indices=None):
        """
        Delegates to `processors.paths.HeadFinder.semantic_head` for this sentence.
        """
        return HeadFinder.semantic_head(self, graph_name, valid_tags, valid_indices)

    def to_JSON_dict(self):
        """
        Returns a `dict` with the sentence's tokens, offsets, annotations and graphs.
        Note that `text` is not part of the serialization.
        """
        sentence_dict = dict()
        sentence_dict["words"] = self.words
        sentence_dict["startOffsets"] = self.startOffsets
        sentence_dict["endOffsets"] = self.endOffsets
        sentence_dict["tags"] = self.tags
        sentence_dict["lemmas"] = self.lemmas
        sentence_dict["entities"] = self._entities
        sentence_dict["chunks"] = self._chunks
        # add graphs
        sentence_dict["graphs"] = dict()
        for (kind, graph) in self.graphs.items():
            sentence_dict["graphs"][kind] = graph._graph_to_JSON_dict()
        return sentence_dict

    @staticmethod
    def load_from_JSON(json_dict):
        """
        Builds a `Sentence` from a `dict`.  "words", "startOffsets" and "endOffsets"
        are required; every other field is optional.
        """
        sent = Sentence(
            words=json_dict["words"],
            startOffsets=json_dict["startOffsets"],
            endOffsets=json_dict["endOffsets"],
            lemmas=json_dict.get("lemmas", None),
            tags=json_dict.get("tags", None),
            entities=json_dict.get("entities", None),
            text=json_dict.get("text", None),
            graphs=json_dict.get("graphs", None),
            chunks=json_dict.get("chunks", None)
        )
        return sent
class Edge(NLPDatum):
    """
    Storage class for a single labeled edge of a `DirectedGraph`.

    Parameters
    ----------
    source : int
        The token index of the edge's source node.
    destination : int
        The token index of the edge's destination node.
    relation : str
        The label of the edge.
    """

    def __init__(self, source, destination, relation):
        NLPDatum.__init__(self)
        self.source = source
        self.destination = destination
        self.relation = relation

    def __unicode__(self):
        return self.to_string()

    def to_string(self):
        return "Edge(source: {}, destination: {}, relation: {})".format(self.source, self.destination, self.relation)

    def __eq__(self, other):
        # two Edges are equal when their JSON serializations match
        if isinstance(other, self.__class__):
            return self.to_JSON() == other.to_JSON()
        else:
            return False

    def __ne__(self, other):
        return not self.__eq__(other)

    def __hash__(self):
        # defined alongside __eq__ (as in the other classes of this module);
        # without it, Python 3 makes the class unhashable
        return hash(self.to_JSON())

    def to_JSON_dict(self):
        """
        Returns a `dict` with the edge's "source", "destination" and "relation".
        """
        edge_dict = dict()
        edge_dict["source"] = self.source
        edge_dict["destination"] = self.destination
        edge_dict["relation"] = self.relation
        return edge_dict
class DirectedGraph(NLPDatum):
    """
    Storage class for directed graphs.

    Parameters
    ----------
    kind : str
        The name of the directed graph.
    deps : dict
        A dictionary of {edges: [{source, destination, relation}], roots: [int]}.  "roots" is optional.
    words : [str]
        A list of the word form of the tokens from the originating `Sentence`.

    Attributes
    ----------
    _words : [str]
        A lowercased list of the word form of the tokens from the originating `Sentence`.
    kind : str
        The name of the directed graph.
    roots : [int]
        A list of indices for the syntactic dependency graph's roots.  Generally this is a single token index.
    edges : [processors.ds.Edge]
        A list of `processors.ds.Edge`
    incoming : dict
        A dictionary of {int -> [(int, str)]} mapping each node to the (source, relation) pairs of its incoming edges.
    outgoing : dict
        A dictionary of {int -> [(int, str)]} mapping each node to the (destination, relation) pairs of its outgoing edges.
    labeled : [str]
        A list of strings where each element in the list represents an edge encoded as source word, uppercased relation, and destination word ("source_RELATION_destination").
    unlabeled : [str]
        A list of strings where each element in the list represents an edge encoded as source word and destination word ("source_destination").
    directed_graph : networkx graph
        The graph built by `processors.paths.DependencyUtils.build_networkx_graph`.  Used by the centrality methods and `pagerank`.
    undirected_graph : networkx graph
        The undirected view of `directed_graph`.  Used by `shortest_paths`.

    Methods
    -------
    shortest_paths(start, end)
        Finds the shortest paths between the start and end nodes.
    shortest_path(start, end, scoring_func)
        Finds the best-scoring shortest path between the start and end nodes.
    degree_centrality(), in_degree_centrality(), out_degree_centrality()
        Compute centrality scores for the nodes.
    pagerank(...)
        Scores the nodes using pagerank.
    """

    STANFORD_BASIC_DEPENDENCIES = "stanford-basic"
    STANFORD_COLLAPSED_DEPENDENCIES = "stanford-collapsed"

    def __init__(self, kind, deps, words):
        NLPDatum.__init__(self)
        self._words = [w.lower() for w in words]
        self.kind = kind
        self.roots = deps.get("roots", [])
        self.edges = [Edge(e["source"], e["destination"], e["relation"]) for e in deps["edges"]]
        self.incoming = self._build_incoming(self.edges)
        self.outgoing = self._build_outgoing(self.edges)
        self.labeled = self._build_labeled()
        self.unlabeled = self._build_unlabeled()
        self.directed_graph = DependencyUtils.build_networkx_graph(roots=self.roots, edges=self.edges, name=self.kind, reverse=False)
        self.undirected_graph = self.directed_graph.to_undirected()

    def __unicode__(self):
        # must be a string: one line per edge
        return "\n".join(e.to_string() for e in self.edges)

    def __eq__(self, other):
        # two graphs are equal when their JSON serializations match
        if isinstance(other, self.__class__):
            return self.to_JSON() == other.to_JSON()
        else:
            return False

    def __ne__(self, other):
        return not self.__eq__(other)

    def __hash__(self):
        return hash(self.to_JSON())

    def shortest_paths(self, start, end):
        """
        Find the shortest paths in the syntactic dependency graph
        between the provided start and end nodes.

        Parameters
        ----------
        start : int or [int]
            A single token index or list of token indices serving as the start of the graph traversal.
        end : int or [int]
            A single token index or list of token indices serving as the end of the graph traversal.

        Returns
        -------
        list or None
            The edges of each path (via `DependencyUtils.retrieve_edges`), or None when no path was found.

        See Also
        --------
        `processors.paths.DependencyUtils.shortest_paths`
        """
        paths = DependencyUtils.shortest_paths(self.undirected_graph, start, end)
        return None if not paths else [DependencyUtils.retrieve_edges(self, path) for path in paths]

    def shortest_path(self, start, end, scoring_func=lambda path: -len(path)):
        """
        Find the shortest path in the syntactic dependency graph
        between the provided start and end nodes.

        Parameters
        ----------
        start : int or [int]
            A single token index or list of token indices serving as the start of the graph traversal.
        end : int or [int]
            A single token index or list of token indices serving as the end of the graph traversal.
        scoring_func : function
            A function that scores each path in a list of [(source index, directed relation, destination index)] paths.  Each path has the form [(source index, relation, destination index)].
            The path with the maximum score will be returned.  The default prefers the path with the fewest elements.

        Returns
        -------
        The best-scoring path, or None when no path was found.

        See Also
        --------
        `processors.ds.DirectedGraph.shortest_paths`
        """
        paths = self.shortest_paths(start, end)
        return None if not paths else max(paths, key=scoring_func)

    def degree_centrality(self):
        """
        Compute the degree centrality for nodes.

        Returns
        -------
        collections.Counter
            {node -> score}

        See Also
        --------
        `networkx.degree_centrality`
        """
        return Counter(nx.degree_centrality(self.directed_graph))

    def in_degree_centrality(self):
        """
        Compute the in-degree centrality for nodes.

        Returns
        -------
        collections.Counter
            {node -> score}

        See Also
        --------
        `networkx.in_degree_centrality`
        """
        return Counter(nx.in_degree_centrality(self.directed_graph))

    def out_degree_centrality(self):
        """
        Compute the out-degree centrality for nodes.

        Returns
        -------
        collections.Counter
            {node -> score}

        See Also
        --------
        `networkx.out_degree_centrality`
        """
        return Counter(nx.out_degree_centrality(self.directed_graph))

    def pagerank(self,
                 alpha=0.85,
                 personalization=None,
                 max_iter=1000,
                 tol=1e-06,
                 nstart=None,
                 weight='weight',
                 dangling=None,
                 use_directed=True,
                 reverse=True):
        """
        Measures node activity using a thin wrapper around the `networkx` implementation of the pagerank algorithm (see `networkx.algorithms.link_analysis.pagerank`).

        Note that by default, the directed graph is reversed in order to highlight predicate-argument nodes (refer to pagerank algorithm to understand why).

        Parameters
        ----------
        use_directed : bool
            If True, score the directed graph; otherwise score `undirected_graph`.
        reverse : bool
            If True (and `use_directed` is True), score a copy of the directed graph built with reversed edges.

        The remaining parameters correspond to those of `networkx.algorithms.link_analysis.pagerank`.

        See Also
        --------
        `processors.paths.DependencyUtils.pagerank`
        """
        # check whether or not to reverse directed graph
        dg = self.directed_graph if not reverse else DependencyUtils.build_networkx_graph(roots=self.roots, edges=self.edges, name=self.kind, reverse=True)
        # determine graph to use
        graph = dg if use_directed else self.undirected_graph
        return DependencyUtils.pagerank(graph, alpha=alpha, personalization=personalization, max_iter=max_iter, tol=tol, nstart=nstart, weight=weight, dangling=dangling)

    def _build_incoming(self, edges):
        # destination -> [(source, relation)]
        dep_dict = defaultdict(list)
        for edge in edges:
            dep_dict[edge.destination].append((edge.source, edge.relation))
        return dep_dict

    def _build_outgoing(self, edges):
        # source -> [(destination, relation)]
        dep_dict = defaultdict(list)
        for edge in edges:
            dep_dict[edge.source].append((edge.destination, edge.relation))
        return dep_dict

    def _build_labeled(self):
        # "sourceword_RELATION_destinationword" for every edge
        return ["{}_{}_{}".format(self._words[out], rel.upper(), self._words[dest])
                for out in self.outgoing
                for (dest, rel) in self.outgoing[out]]

    def _build_unlabeled(self):
        # "sourceword_destinationword" for every edge
        return ["{}_{}".format(self._words[out], self._words[dest])
                for out in self.outgoing
                for (dest, _) in self.outgoing[out]]

    def _graph_to_JSON_dict(self):
        """
        Returns {"edges": [...], "roots": [...]} (the shape accepted as `deps` by `__init__`).
        """
        dg_dict = dict()
        dg_dict["edges"] = [e.to_JSON_dict() for e in self.edges]
        dg_dict["roots"] = self.roots
        return dg_dict

    def to_JSON_dict(self):
        """
        Returns the graph's JSON dict keyed by the graph's name (`kind`).
        """
        return {self.kind: self._graph_to_JSON_dict()}
class Interval(NLPDatum):
    """
    Defines a token or character span

    Parameters
    ----------
    start : int
        The token or character index where the interval begins.
    end : int
        The 1 + the index of the last token/character in the span.

    Methods
    -------
    contains(that)
        Test whether this Interval contains another Interval.
    overlaps(that)
        Test whether `that` (int or Interval) overlaps with span of this Interval.  Equivalent Intervals will overlap.
    """

    def __init__(self, start, end):
        NLPDatum.__init__(self)
        assert (start < end), "Interval start must precede end."
        self.start = start
        self.end = end

    def to_JSON_dict(self):
        return {"start": self.start, "end": self.end}

    def size(self):
        """
        Returns the number of tokens/characters covered by the span.
        """
        return self.end - self.start

    def contains(self, that):
        """
        Checks if this interval contains another (that).
        Anything that is not an Interval is never contained.
        """
        if isinstance(that, self.__class__):
            return self.start <= that.start and self.end >= that.end
        else:
            return False

    def overlaps(self, that):
        """
        Checks for overlap.

        Parameters
        ----------
        that : int or Interval
            An index (overlaps when start <= that < end) or another Interval
            (overlaps when either interval's start falls inside the other).

        Returns
        -------
        bool
            False for any other type of `that`.
        """
        if isinstance(that, int):
            return self.start <= that < self.end
        elif isinstance(that, self.__class__):
            return ((that.start <= self.start < that.end) or
                    (self.start <= that.start < self.end))
        else:
            return False

    @staticmethod
    def load_from_JSON(json):
        """
        Builds an `Interval` from a dict with "start" and "end" keys.
        """
        return Interval(start=json["start"], end=json["end"])