264 lines
8.1 KiB
Python
264 lines
8.1 KiB
Python
# -*- coding: utf-8 -*-
|
||
import re
|
||
import spacy
|
||
import functools
|
||
|
||
import textacy
|
||
|
||
|
||
class TextCleaner:
|
||
|
||
def __init__(self, parser, thesaurus=None, customClass_symbols=None, customClass_words=None, keep4All=None):
|
||
"""
|
||
:param parser: spacy-parser
|
||
:param thesaurus: [[syn1, syn2, ...],[syn1, syn2, ...], ...]
|
||
:param customClass_symbols:[str]
|
||
:param customClass_words:[str]
|
||
:param customClassPOS:[str]
|
||
:param keep4All: [str]
|
||
"""
|
||
if thesaurus is None:
|
||
DATAPATH_thesaurus = "openthesaurus.csv"
|
||
|
||
## !!!!!! list wichtig, da sonst nicht die gleichen Synonyme zurückgegeben werden, weil ein generator während der laufzeit pickt
|
||
self.thesaurus = list(textacy.fileio.read_csv(DATAPATH_thesaurus, delimiter=";"))
|
||
else:
|
||
self.thesaurus = thesaurus
|
||
|
||
self.parser = parser
|
||
|
||
|
||
|
||
self.whitespaceFinder = re.compile(r'(\r\n|\r|\n|(\s)+)', re.IGNORECASE)
|
||
self.mentionFinder = re.compile(r"@[a-z0-9_]{1,15}", re.IGNORECASE)
|
||
self.emailFinder = re.compile(r"\b[A-Z0-9._%+-]+@[A-Z0-9.-]+\.[A-Z]{2,}\b", re.IGNORECASE)
|
||
self.urlFinder = re.compile(r"^(?:https?:\/\/)?(?:www\.)?[a-zA-Z0-9./]+$", re.IGNORECASE)
|
||
|
||
|
||
|
||
# to remove
|
||
self.symbols = ["-----", "---", "...", "“", "”", ".", "-", "<", ">", ",", "?", "!", "..", "n’t", "n't", "|", "||",
|
||
";", ":",
|
||
"…", "’s", "'s", ".", "(", ")", "[", "]", "#"] + (customClass_symbols if customClass_symbols is not None else [])
|
||
self.stop_words = list(__import__("spacy." + self.parser.lang, globals(), locals(), ['object']).STOP_WORDS)+ (customClass_words if customClass_words is not None else [])
|
||
|
||
|
||
|
||
# to keep
|
||
self.entities2keep = ["WORK_OF_ART", "ORG", "PRODUCT", "LOC"] # ,"PERSON"]
|
||
self.pos2keep = ["NOUN"] # , "NUM" ]#,"VERB","ADJ"] #fürs TopicModeling nur Nomen http://aclweb.org/anthology/U15-1013
|
||
|
||
self.entities2keep = self.entities2keep + (keep4All if keep4All is not None else [])
|
||
self.pos2keep = self.pos2keep + (keep4All if keep4All is not None else [])
|
||
|
||
|
||
keep = (keep4All if hasattr(keep4All, '__iter__') else []) + self.pos2keep + self.entities2keep
|
||
|
||
|
||
# modify those to remove with those to keep
|
||
for sym in keep:
|
||
try:
|
||
self.symbols.remove(sym)
|
||
except ValueError:
|
||
pass
|
||
for sym in keep:
|
||
try:
|
||
self.stop_words.remove(sym)
|
||
except ValueError:
|
||
pass
|
||
|
||
|
||
# idee self.currentDoc = spacy.Doc für jeden String aber nicht füpr jede methode
|
||
def loadString(self,string):
|
||
self.currentDoc = self.parser(string)
|
||
|
||
"""
|
||
def removeWhitespace(self, string):
|
||
string = self.whitespaceFinder.sub(" ", string)
|
||
return string
|
||
"""
|
||
def removeWhitespace(self, string):
|
||
return string
|
||
|
||
#self.whitespaceFinder = re.compile(r'(\r\n|\r|\n|(\s)+)', re.IGNORECASE)
|
||
|
||
def removePunctuation(self, string, custom_symbols=None, keep=None):
|
||
|
||
|
||
symbols = self.symbols + (custom_symbols if custom_symbols is not None else [])
|
||
|
||
if hasattr(keep, '__iter__'):
|
||
for k in keep:
|
||
try:
|
||
symbols.remove(k)
|
||
except ValueError:
|
||
pass
|
||
|
||
|
||
# parse with spaCy
|
||
doc = self.parser(string)
|
||
tokens = []
|
||
|
||
# append Tokens to a list
|
||
for tok in doc:
|
||
if not tok.is_punct and not tok.is_space and tok.text not in symbols:
|
||
tokens.append(tok.text)
|
||
|
||
return " ".join(tokens)
|
||
|
||
def keepPOSandENT(self, string, customPOS=None, customEnt=None, remove=None):
|
||
|
||
pos2keep = self.pos2keep + (customPOS if customPOS is not None else [])
|
||
ent = self.entities2keep + (customEnt if customEnt is not None else [])
|
||
|
||
if hasattr(remove, '__iter__'):
|
||
for k in remove:
|
||
try:
|
||
ent.remove(k)
|
||
except ValueError:
|
||
try:
|
||
pos2keep.remove(k)
|
||
except ValueError:
|
||
pass
|
||
|
||
# parse with spaCy
|
||
spacy_doc = self.parser(string)
|
||
tokens = []
|
||
|
||
# append Tokens to a list
|
||
for tok in spacy_doc:
|
||
|
||
if tok.pos_ in pos2keep:
|
||
tokens.append(tok.text)
|
||
|
||
if tok.ent_type_ in ent:
|
||
tokens.append(tok.text)
|
||
|
||
return " ".join(set(tokens))
|
||
|
||
|
||
|
||
|
||
|
||
def resolveAbbreviations(self,string):
|
||
return string #todo
|
||
def removeWords(self,string, custom_words=None, keep=None, lemmatize=False):
|
||
|
||
wordlist = self.stop_words + (custom_words if custom_words is not None else [])
|
||
if hasattr(keep, '__iter__'):
|
||
for k in keep:
|
||
try:
|
||
wordlist.remove(k)
|
||
except ValueError:
|
||
pass
|
||
|
||
|
||
|
||
string = self.urlFinder.sub("URL", string)
|
||
string = self.emailFinder.sub("EMAIL", string)
|
||
string = self.mentionFinder.sub("MENTION", string)
|
||
string = string.replace("&", "and").replace(">", ">").replace("<", "<")
|
||
|
||
|
||
# parse with spaCy
|
||
spacy_doc = self.parser(string)
|
||
tokens = []
|
||
|
||
# append Tokens to a list
|
||
for tok in spacy_doc:
|
||
|
||
#do not include stopwords/customwords and single chars
|
||
if tok.text not in wordlist and len(tok)>1:
|
||
if lemmatize:
|
||
tokens.append(tok.lemma_)
|
||
else:
|
||
tokens.append(tok.lower_)
|
||
return " ".join(set(tokens))
|
||
|
||
|
||
def normalizeSynonyms(self, string, default_return_first_Syn=False):
|
||
# parse with spaCy
|
||
spacy_doc = self.parser(string)
|
||
tokens = []
|
||
|
||
tokens = [str(self.getFirstSynonym(tok, self.thesaurus, default_return_first_Syn=default_return_first_Syn)) for tok in spacy_doc]
|
||
|
||
return " ".join(set(tokens))
|
||
|
||
def getFirstSynonym(self,word, thesaurus, default_return_first_Syn=False):
|
||
if not isinstance(word, str):
|
||
return word
|
||
|
||
|
||
word = word.lower()
|
||
|
||
|
||
# durch den thesaurrus iterieren
|
||
for syn_block in thesaurus: # syn_block ist eine liste mit Synonymen
|
||
|
||
for syn in syn_block:
|
||
syn = syn.lower()
|
||
if re.match(r'\A[\w-]+\Z', syn): # falls syn einzelwort ist
|
||
if word == syn:
|
||
return self.getHauptform(syn_block, word, default_return_first_Syn=default_return_first_Syn)
|
||
else: # falls es ein satz ist
|
||
if word in syn:
|
||
return self.getHauptform(syn_block, word, default_return_first_Syn=default_return_first_Syn)
|
||
return word # zur Not, das ursrpüngliche Wort zurückgeben
|
||
|
||
def getHauptform(self,syn_block, word, default_return_first_Syn=False):
|
||
|
||
for syn in syn_block:
|
||
syn = syn.lower()
|
||
|
||
if "hauptform" in syn and len(syn.split(" ")) <= 2:
|
||
# nicht ausgeben, falls es in Klammern steht
|
||
for w in syn.split(" "):
|
||
if not re.match(r'\([^)]+\)', w):
|
||
return w
|
||
|
||
if default_return_first_Syn:
|
||
# falls keine hauptform enthalten ist, das erste Synonym zurückgeben, was kein satz ist und nicht in klammern steht
|
||
for w in syn_block:
|
||
if not re.match(r'\([^)]+\)', w):
|
||
return w
|
||
return word # zur Not, das ursrpüngliche Wort zurückgeben
|
||
|
||
|
||
|
||
|
||
"""
|
||
#################################################################################################################
|
||
|
||
#todo funzt irgendwie nich wie's soll: https://mathieularose.com/function-composition-in-python/
|
||
def compose(self,*functions):
|
||
return functools.reduce(lambda f, g: lambda x: f(g(x)), functions, lambda x: x)
|
||
|
||
pipeline = compose(functools.partial(cleaner.keepPOSandENT,lemmatize=True))#, cleaner.normalizeSynonyms)
|
||
|
||
#################################################################################################################
|
||
"""
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|