How to get the Corresponding Negation Terms used for a Set of Detected Negated Lexicons in NegSpacy?

123 Views Asked by At

I am working on a project with a clinical dataset. So far, I was able to detect all the diagnoses and whether they are negated or not. But, what I really like to get as well, is the negation term used to detect each negated lexicon. For example:

import spacy
from negspacy.negation import Negex

nlp = spacy.load("en_core_sci_lg")
nlp.add_pipe("negex")

doc = nlp("She has neither fever nor cough.")

for e in doc.ents:
print(e.text, e._.negex)

Output: fever True cough True

What more do I expect to get: The negation term for each negated lexicon detected:

Output:  = [[negated lexicon = "fever", corresponding negated term = "neither"], 
            [negated lexicon = 'cough', corresponding negated term = 'nor']]     

Here is the modified code I am using right now, but I know it is not giving me all the negated terms, and also it is not the optimum way for massive data.

from spacy.tokens import Token
import spacy
from negspacy.negation import Negex
import re

Token.set_extension("negex", default=False, force=True)

preceding_negation = ["not associated with", 'without any reactions or signs of', 'no', 'teaching the patient', 'test for',
                       'if you get', 'negative', 'h/o', "isn't", 'negative for', 'never developed', 'did not exhibit', 
                       'rules out','ruled the patient out', 'deny', 'cant', "didn't", 'didnt', 'reject', 'cannot', 
                       'without indication of', 'monitor for', "don't", "can't", 'never', 'denied', 'ruled out', 
                       'no evidence of', 'ruled patient out', 'denying', 'denies', 'instead of', 'wasnt', 'no signs of',
                       'without signs of', 'no cause of', 'neither', 'versus', 'symptoms atypical', 'monitored for', 'doesnt',
                       'refuses', 'evaluate for', 'fails to reveal', 'rule patient out', 'no sign of', "aren't",
                       'taught the patient', 'without sign of', 'declined', 'ruled him out', 'free of', 'nor', 'concern for',
                       'refuse', 'werent', 'if you experience', 'rule her out', 'educating the patient', 'which causes',
                       'doubt', 'educated the patient', 'leads to', "weren't", 'couldnt', 'not', "couldn't", 'ruled her out',
                       'rule him out', 'tested for', 'arent', 'patient was not', 'dont', 'history of', 'supposed', 'r/o',
                       'absence of', 'no complaints of', 'not demonstrate', "wasn't", 'isnt', 'without', 'educate the patient',
                       'unlikely', 'rule out','teach the patient', "doesn't", 'rule the patient out', 'never had']   
    
following_negation = ['nor', 'absent', 'denied', "weren't", 'refused', 'was not', 'negative', "wasn't", 'were ruled out', 
                       'decline', "wasn't", "weren't", 'deny', 'were not', 'not', 'unlikely', 'was ruled out', 'rejected',
                       'declined', 'free'] 

nlp = spacy.load("en_core_sci_lg")

nlp.add_pipe("negex", config={"chunk_prefix": ["no"]})

Clinical_note = [
    "neither chest pain nor backache, perceived to be tightening feeling in the central retrosternal area",
    "also in the back. She denied any sign of ASCVD.", "It is not associated with nauseation, dyspnea, or diaphoresis.", 
    "Review of systems is negative for orthopnea, PND, pleuritic, or known history of asthma.",
    "Atherosclerotic disease and Atherosclerotic Risk Factors: Hypertension. Any sign of fever was negative.", 
    "There is family history of cancer."]

for x in Clinical_note:
    
    x = x.lower()
    doc = nlp(x)
    
    a = re.findall(r"\bascvd|pnd|pleuritic|dyspnea|atherosclerotic disease|fever|chest pain|diaphoresis\b", str(x))
    
    for e in doc.ents: 
        Prec_neg = []   
        Fol_neg = []

        for b in a:
            if e.text == b and e._.negex == True:

                prec = x.split(e.text)[0].rstrip(" ").split(' ')
                Prec_sent = ' '.join(prec[-8:])
                fol = x.split(e.text)[1].lstrip(" ").split(' ')
                Fol_sent = ' '.join(fol[:3])

                for i in preceding_negation:
                    if i in Prec_sent:
                        Prec_neg.append(i)                
                    continue

                else:    
                    for k in following_negation:
                        if k in Fol_sent:
                            Fol_neg.add(k)
                if Prec_neg:                                           
                    print("Preceding Output is: ('{}', '{}')".format(max(Prec_neg, key=len), e.text))

                elif Fol_neg:
                    print("Following Output is: ('{}', '{}')".format(e.text, max(Fol_neg, key=len)))

I also tried to clone all the repositories of negspacy from github and modify the negation.py by defining a new term "._.negate_term" and set the extension for spacy by:

Token.set_extension("negate_term", default= "", force=True )

and also:

nlp.add_pipe("negex", config={"chunk_prefix": ["no"], "term_extension_name": "negate_term"})

But I got the following error and I'm nor sure how should I fix it:

AttributeError: [E047] Can't assign a value to unregistered extension attribute 'negate_term'. Did you forget to call the `set_extension` method?

Here is what my negation.py looks like (I have some modification at the very beginning of the script, the def___init__ and also def negex(self, doc)):

from spacy.language import Language
from spacy.tokens import Token, Doc, Span
from spacy.matcher import PhraseMatcher
import logging

from negspacy.negspacy.termsets import termset

default_ts = termset("en_clinical_sensitive").get_patterns()


@Language.factory(
    "negex",
    default_config={
        "neg_termset": default_ts,
        "ent_types": list(),
        "extension_name": "negex",
        "chunk_prefix": list(),
        "term_extension_name": "negate_term",
    },
)
class Negex:

    def __init__(
        self,
        nlp: Language,
        name: str,
        neg_termset: dict,
        ent_types: list,
        extension_name: str,
        chunk_prefix: list,
        term_extension_name: str,
    ):
        

        if not Span.has_extension(extension_name):
            Span.set_extension(extension_name, default=False, force=True)

#         if not Token.has_extension(term_extension_name):
#             Token.set_extension(term_extension_name, default="", force=True)
            
        if not Token.has_extension("negate_term"):
            Token.set_extension("negate_term", default="", force=True)

        ts = neg_termset
        expected_keys = [
            "pseudo_negations",
            "preceding_negations",
            "following_negations",
            "termination",
        ]
        if not set(ts.keys()) == set(expected_keys):
            raise KeyError(
                f"Unexpected or missing keys in 'neg_termset', expected: {expected_keys}, instead got: {list(ts.keys())}"
            )

        self.pseudo_negations = ts["pseudo_negations"]
        self.preceding_negations = ts["preceding_negations"]
        self.following_negations = ts["following_negations"]
        self.termination = ts["termination"]

        self.nlp = nlp
        self.ent_types = ent_types
        self.extension_name = extension_name
        self.term_extension_name = term_extension_name
        #self.term_extension_name = "negate_term"
        self.build_patterns()
        self.chunk_prefix = list(nlp.tokenizer.pipe(chunk_prefix))

    def build_patterns(self):
        # efficiently build spaCy matcher patterns
        self.matcher = PhraseMatcher(self.nlp.vocab, attr="LOWER")

        self.pseudo_patterns = list(self.nlp.tokenizer.pipe(self.pseudo_negations))
        self.matcher.add("pseudo", None, *self.pseudo_patterns)
        for pattern in self.pseudo_patterns:
            for t in pattern:
                t._.set("neg_term", t.text)

        self.preceding_patterns = list(
            self.nlp.tokenizer.pipe(self.preceding_negations)
        )
        self.matcher.add("Preceding", None, *self.preceding_patterns)
        for pattern in self.preceding_patterns:
            for t in pattern:
                t._.set("neg_term", t.text)

        self.following_patterns = list(
            self.nlp.tokenizer.pipe(self.following_negations)
        )
        self.matcher.add("Following", None, *self.following_patterns)
        for pattern in self.following_patterns:
            for t in pattern:
                t._.set("neg_term", t.text)

        self.termination_patterns = list(self.nlp.tokenizer.pipe(self.termination))
        self.matcher.add("Termination", None, *self.termination_patterns)
        for pattern in self.termination_patterns:
            for t in pattern:
                t._.set("neg_term", t.text)


    def process_negations(self, doc):

        preceding = list()
        following = list()
        terminating = list()

        matches = self.matcher(doc)
        pseudo = [
            (match_id, start, end)
            for match_id, start, end in matches
            if self.nlp.vocab.strings[match_id] == "pseudo"
        ]

        for match_id, start, end in matches:
            if self.nlp.vocab.strings[match_id] == "pseudo":
                continue
            pseudo_flag = False
            for p in pseudo:
                if start >= p[1] and start <= p[2]:
                    pseudo_flag = True
                    continue
            if not pseudo_flag:
                if self.nlp.vocab.strings[match_id] == "Preceding":
                    preceding.append((match_id, start, end))
                elif self.nlp.vocab.strings[match_id] == "Following":
                    following.append((match_id, start, end))
                elif self.nlp.vocab.strings[match_id] == "Termination":
                    terminating.append((match_id, start, end))
                else:
                    logging.warnings(
                        f"phrase {doc[start:end].text} not in one of the expected matcher types."
                    )
        return preceding, following, terminating

    def termination_boundaries(self, doc, terminating):

        sent_starts = [sent.start for sent in doc.sents]
        terminating_starts = [t[1] for t in terminating]
        starts = sent_starts + terminating_starts + [len(doc)]
        starts.sort()
        boundaries = list()
        index = 0
        for i, start in enumerate(starts):
            if not i == 0:
                boundaries.append((index, start))
            index = start
        return boundaries
    

    def negex(self, doc):
        preceding, following, terminating = self.process_negations(doc)
        boundaries = self.termination_boundaries(doc, terminating)
        for b in boundaries:
            sub_preceding = [i for i in preceding if b[0] <= i[1] < b[1]]
            sub_following = [i for i in following if b[0] <= i[1] < b[1]]

            for e in doc[b[0] : b[1]].ents:
                if self.ent_types:
                    if e.label_ not in self.ent_types:
                        continue
                        
                if any(pre < e.start for pre in [i[1] for i in sub_preceding]):
#                     negation_term = doc[pre:e.start].text
#                     print("Negation term:", negation_term)
                    e._.set(self.extension_name, True)
                    e._.set(self.term_extension_name, " ".join([t._.neg_term for t in doc[e.start - 1 : e.end]])) 
                    #e._.set(self.term_extension_name, " ".join([t.text for t in doc[e.start - 1 : e.end]]))
                    #e._.negate_term = " ".join([t.text for t in doc[e.start - 1 : e.end]])
                    continue
                    
                if any(fol > e.end for fol in [i[2] for i in sub_following]):                   
#                     negation_term = doc[e.end:fol].text
#                     print("Negation term:", negation_term)
                    e._.set(self.extension_name, True)
                    e._.set(self.term_extension_name, " ".join([t._.neg_term for t in doc[e.start + 1 : e.end]])) 
                    #e._.set(self.term_extension_name, " ".join([t.text for t in doc[e.start + 1 : e.end]]))
                    #e._.negate_term = " ".join([t.text for t in doc[e.start + 1 : e.end]])
                    continue
                    
                if self.chunk_prefix:
                    if any(
                        e.text.lower().startswith(c.text.lower())
                        for c in self.chunk_prefix
                    ):
                        e._.set(self.extension_name, True)
                        e._.set(self.term_extension_name, "")
        return doc

    def __call__(self, doc):
        return self.negex(doc)

I think modifying the negspacy.negation.py would be a better way in terms of computation time and also accuracy, but I'm not sure how should I suppose to fix the problems. It would be very appreciated if someone could help me.

0

There are 0 best solutions below