topicModelingTickets/textCleaning.py

271 lines
8.1 KiB
Python
Raw Normal View History

2017-09-01 14:27:03 +02:00
# -*- coding: utf-8 -*-
import re
import spacy
import functools
import textacy
class TextCleaner:
    """Clean and normalize raw text with a spaCy parser for topic modeling.

    The cleaner removes punctuation/symbols and stop words, keeps only
    selected part-of-speech tags and entity types, and maps words onto a
    canonical synonym taken from a thesaurus (list of synonym lists).
    """

    # Compiled once for the whole class; still reachable as instance attributes
    # (self.whitespaceFinder, self.urlFinder, ...).
    whitespaceFinder = re.compile(r'(\r\n|\r|\n|(\s)+)', re.IGNORECASE)
    mentionFinder = re.compile(r"@[a-z0-9_]{1,15}", re.IGNORECASE)
    emailFinder = re.compile(r"\b[A-Z0-9._%+-]+@[A-Z0-9.-]+\.[A-Z]{2,}\b", re.IGNORECASE)
    # Unanchored and requiring a scheme or "www.": the former "^...$" pattern
    # with optional scheme only matched when the WHOLE string looked like a URL,
    # so it missed URLs inside sentences and replaced any single plain word.
    urlFinder = re.compile(r"\b(?:https?://|www\.)\S+", re.IGNORECASE)

    # Helpers for the thesaurus lookup.
    _singleWordPattern = re.compile(r'\A[\w-]+\Z')
    _parenthesizedPattern = re.compile(r'\([^)]+\)')

    def __init__(self, parser, thesaurus=None, customClass_symbols=None, customClass_words=None, keep4All=None):
        """
        :param parser: spacy-parser
        :param thesaurus: [[syn1, syn2, ...],[syn1, syn2, ...], ...]
        :param customClass_symbols: [str] additional symbols to remove
        :param customClass_words: [str] additional stop words to remove
        :param keep4All: [str] labels/words that are always kept: they are added to the
                         POS tags and entity types to keep and are taken out of the
                         symbol and stop-word lists
        """
        if thesaurus is None:
            DATAPATH_thesaurus = "openthesaurus.csv"
            # list() is essential: a generator would be consumed by the first
            # lookup, so later lookups would not see the same synonyms.
            self.thesaurus = list(textacy.fileio.read_csv(DATAPATH_thesaurus, delimiter=";"))
        else:
            self.thesaurus = thesaurus

        self.parser = parser
        # Set by loadString(); None until a string has been loaded.
        self.currentDoc = None

        keep4All = self._asList(keep4All)

        # to remove
        # NOTE(review): the empty strings below look like non-ASCII symbols that
        # were lost somewhere along the way -- confirm against the repository.
        self.symbols = ["-----", "---", "...", "", "", ".", "-", "<", ">", ",", "?", "!", "..", "nt", "n't", "|", "||",
                        ";", ":",
                        "", "s", "'s", ".", "(", ")", "[", "]", "#"] + self._asList(customClass_symbols)
        # Stop words come from the language module of the parser (spacy.<lang>.STOP_WORDS).
        langModule = __import__("spacy." + self.parser.lang, globals(), locals(), ['object'])
        self.stop_words = list(langModule.STOP_WORDS) + self._asList(customClass_words)

        # to keep
        self.entities2keep = ["WORK_OF_ART", "ORG", "PRODUCT", "LOC"] + keep4All  # ,"PERSON"]
        # nouns only for topic modeling, see http://aclweb.org/anthology/U15-1013
        self.pos2keep = ["NOUN"] + keep4All  # , "NUM" ]#,"VERB","ADJ"]

        # Everything that has to be kept must not be removed as symbol or stop
        # word. Every occurrence is dropped (the symbol list contains duplicates).
        keep = keep4All + self.pos2keep + self.entities2keep
        self.symbols = self._without(self.symbols, keep)
        self.stop_words = self._without(self.stop_words, keep)

    @staticmethod
    def _asList(values):
        """Return *values* as a new list; None/non-iterables give [], a plain str is one item."""
        if values is None:
            return []
        if isinstance(values, str):
            return [values]
        if not hasattr(values, '__iter__'):
            return []
        return list(values)

    @staticmethod
    def _without(items, unwanted):
        """Return a new list with the elements of *items* that are not in *unwanted*."""
        unwanted = set(unwanted)
        return [item for item in items if item not in unwanted]

    @staticmethod
    def _unique(tokens):
        """Return *tokens* without duplicates, preserving first-occurrence order."""
        seen = set()
        result = []
        for token in tokens:
            if token not in seen:
                seen.add(token)
                result.append(token)
        return result

    # idea: self.currentDoc = spacy.Doc once per string instead of once per method
    def loadString(self, string):
        """Parse *string* with the spaCy parser and store the result in ``self.currentDoc``."""
        self.currentDoc = self.parser(string)

    def removeWhitespace(self, string):
        """Return *string* unchanged.

        NOTE: the regex based normalization
        (``self.whitespaceFinder.sub(" ", string)``) is deliberately disabled.
        """
        return string

    def removePunctuation(self, string, custom_symbols=None, keep=None):
        """Remove punctuation, whitespace tokens and configured symbols.

        :param string: text to clean
        :param custom_symbols: [str] additional symbols to remove for this call
        :param keep: [str] symbols that must not be removed for this call
        :return: remaining token texts in original order, joined by single spaces
        """
        symbols = set(self._without(self.symbols + self._asList(custom_symbols), self._asList(keep)))

        # parse with spaCy
        doc = self.parser(string)
        return " ".join(tok.text for tok in doc
                        if not tok.is_punct and not tok.is_space and tok.text not in symbols)

    def resolveAbbreviations(self, string):
        """Return *string* unchanged (abbreviation handling is not implemented yet)."""
        return string  # todo

    def keepPOSandENT(self, string, customPOS=None, customEnt=None, remove=None):
        """Keep only tokens with a wanted part-of-speech tag or entity type.

        :param string: text to filter
        :param customPOS: [str] additional POS tags to keep for this call
        :param customEnt: [str] additional entity types to keep for this call
        :param remove: [str] POS tags / entity types that must not be kept for this call
        :return: the distinct kept token texts in order of first occurrence,
                 joined by single spaces
        """
        unwanted = self._asList(remove)
        # A label listed in both collections is dropped from both of them.
        pos2keep = set(self._without(self.pos2keep + self._asList(customPOS), unwanted))
        ent = set(self._without(self.entities2keep + self._asList(customEnt), unwanted))

        # parse with spaCy
        spacy_doc = self.parser(string)
        tokens = [tok.text for tok in spacy_doc if tok.pos_ in pos2keep or tok.ent_type_ in ent]
        # Ordered de-duplication keeps the output deterministic between runs.
        return " ".join(self._unique(tokens))

    def removeWords(self, string, custom_words=None, keep=None, lemmatize=False):
        """Remove stop words and single characters; mask URLs, e-mail addresses and mentions.

        URLs, e-mail addresses and @mentions are replaced by the placeholders
        ``URL``, ``EMAIL`` and ``MENTION`` before parsing; the HTML entities
        ``&amp;``, ``&gt;`` and ``&lt;`` are resolved.

        :param string: text to clean
        :param custom_words: [str] additional words to remove for this call
        :param keep: [str] words that must not be removed for this call
        :param lemmatize: emit ``tok.lemma_`` instead of ``tok.lower_``
        :return: the distinct remaining tokens in order of first occurrence,
                 joined by single spaces
        """
        wordlist = set(self._without(self.stop_words + self._asList(custom_words), self._asList(keep)))

        string = self.urlFinder.sub("URL", string)
        string = self.emailFinder.sub("EMAIL", string)
        string = self.mentionFinder.sub("MENTION", string)
        string = string.replace("&amp;", "and").replace("&gt;", ">").replace("&lt;", "<")

        # parse with spaCy
        spacy_doc = self.parser(string)
        tokens = []
        for tok in spacy_doc:
            # do not include stopwords/customwords and single chars; the lower-cased
            # form is checked too, so capitalized stop words ("Der") are removed as well
            if len(tok) <= 1 or tok.text in wordlist or tok.lower_ in wordlist:
                continue
            tokens.append(tok.lemma_ if lemmatize else tok.lower_)
        return " ".join(self._unique(tokens))

    def normalizeSynonyms(self, string, default_return_first_Syn=False):
        """Replace every token by its canonical synonym from ``self.thesaurus``.

        :param string: text to normalize
        :param default_return_first_Syn: passed through to :meth:`getFirstSynonym`
        :return: the distinct normalized (lower-cased) words in order of first
                 occurrence, joined by single spaces
        """
        # parse with spaCy
        spacy_doc = self.parser(string)
        # The token TEXT has to be passed: getFirstSynonym returns anything that
        # is not a str unchanged, so passing the token object disabled the lookup.
        tokens = [str(self.getFirstSynonym(tok.text, self.thesaurus,
                                           default_return_first_Syn=default_return_first_Syn))
                  for tok in spacy_doc]
        return " ".join(self._unique(tokens))

    def getFirstSynonym(self, word, thesaurus, default_return_first_Syn=False):
        """Look *word* up in *thesaurus* and return its main form.

        :param word: word to look up; anything that is not a str is returned unchanged
        :param thesaurus: [[syn1, syn2, ...], ...]
        :param default_return_first_Syn: passed through to :meth:`getHauptform`
        :return: result of :meth:`getHauptform` for the first synonym block that
                 contains the word, otherwise the lower-cased word itself
        """
        if not isinstance(word, str):
            return word

        word = word.lower()

        # iterate over the thesaurus
        for syn_block in thesaurus:  # syn_block is a list of synonyms
            for syn in syn_block:
                syn = syn.lower()
                if self._singleWordPattern.match(syn):  # syn is a single word
                    found = word == syn
                else:  # syn is a phrase or carries an annotation such as "(ugs.)"
                    # whole-word comparison: substring matching produced false
                    # hits, e.g. "ugs" matched every entry annotated with "(ugs.)"
                    found = word in syn.split()
                if found:
                    return self.getHauptform(syn_block, word, default_return_first_Syn=default_return_first_Syn)
        return word  # fall back to the original word

    def getHauptform(self, syn_block, word, default_return_first_Syn=False):
        """Return the main form ("Hauptform") of a synonym block.

        :param syn_block: [str] synonyms of one thesaurus entry
        :param word: fallback that is returned if no main form is found
        :param default_return_first_Syn: if no entry is marked as "hauptform", return
                                         the first entry that does not start with a
                                         parenthesized annotation instead of *word*
        :return: the main form, the first usable synonym, or *word*
        """
        for syn in syn_block:
            syn = syn.lower()

            if "hauptform" in syn and len(syn.split(" ")) <= 2:
                # skip the parenthesized part, e.g. "(hauptform)"
                for w in syn.split(" "):
                    if not self._parenthesizedPattern.match(w):
                        return w

        if default_return_first_Syn:
            # no main form available: return the first synonym that is not parenthesized
            for w in syn_block:
                if not self._parenthesizedPattern.match(w):
                    return w
        return word  # fall back to the original word
"""
2017-09-01 14:27:03 +02:00
#################################################################################################################
#todo funzt irgendwie nich wie's soll: https://mathieularose.com/function-composition-in-python/
def compose(self,*functions):
return functools.reduce(lambda f, g: lambda x: f(g(x)), functions, lambda x: x)
pipeline = compose(functools.partial(cleaner.keepPOSandENT,lemmatize=True))#, cleaner.normalizeSynonyms)
#################################################################################################################
"""
2017-09-01 14:27:03 +02:00