# -*- coding: utf-8 -*-
import csv
import functools
import os.path
import re
import subprocess
import sys
import time
import xml.etree.ElementTree as ET

import spacy
import textacy
from scipy import *
from textacy import Vectorizer

csv.field_size_limit(sys.maxsize)

path2xml = "ticket.xml"

import de_core_news_md  # German spaCy model

PARSER = de_core_news_md.load()
corpus = textacy.Corpus(PARSER)

thesauruspath = "openthesaurus.csv"
THESAURUS = list(textacy.fileio.read_csv(thesauruspath, delimiter=";"))


def printRandomDoc(textacyCorpus):
    """Print a randomly chosen document of the corpus together with its metadata."""
    import random
    print()
    print("len(textacyCorpus) = %i" % len(textacyCorpus))
    randIndex = int((len(textacyCorpus) - 1) * random.random())
    print("Index: {0} ; Text: {1} ; Metadata: {2}".format(
        randIndex, textacyCorpus[randIndex].text, textacyCorpus[randIndex].metadata))
    print()


def generateMainTextfromTicketXML(path2xml, main_textfield='Beschreibung'):
    """
    Generates strings from the ticket XML.
    :param path2xml: path to the ticket XML file
    :param main_textfield: tag of the field that holds the ticket text
    :yields strings
    """
    tree = ET.parse(path2xml, ET.XMLParser(encoding="utf-8"))
    root = tree.getroot()
    for ticket in root:
        for field in ticket:
            if field.tag == main_textfield:
                yield field.text


def generateMetadatafromTicketXML(path2xml, leave_out=['Beschreibung']):
    """Yields one metadata dict per ticket, skipping the fields listed in leave_out."""
    tree = ET.parse(path2xml, ET.XMLParser(encoding="utf-8"))
    root = tree.getroot()
    for ticket in root:
        metadata = {}
        for field in ticket:
            if field.tag not in leave_out:
                metadata[field.tag] = field.text
        yield metadata


def processTextstream(textstream, funclist, parser=PARSER):
    """
    Input: stream of strings; output: stream of cleaned strings.
    Each cleaning function advertises how it is applied via the return
    annotation its factory copies onto it:
      bool             -> used as a token filter
      str              -> mapped over the tokens, the text is then re-parsed
      spacy.tokens.Doc -> applied to the whole token list
    """
    pipe = parser.pipe(textstream)
    for doc in pipe:
        tokens = [tok for tok in doc]
        for f in funclist:
            annotation = f.__annotations__.get('return')
            if annotation is bool:
                tokens = list(filter(f, tokens))
            elif annotation is str:
                tokens = list(map(f, tokens))   # token -> string
                doc = parser(" ".join(tokens))  # re-parse the cleaned text
                tokens = [tok for tok in doc]   # back to spaCy tokens
            elif annotation is spacy.tokens.Doc:
                # the function returns plain strings (e.g. keepUniqueTokens),
                # so re-parse to hand spaCy tokens to the next cleaning step
                doc = parser(" ".join(f(tokens)))
                tokens = [tok for tok in doc]
        yield " ".join([tok.lower_ for tok in tokens])


def processDictstream(dictstream, funcdict, parser=PARSER):
    # TODO: same idea as for the text stream, e.g. a shared processDoc(doc, funcs)
    for dic in dictstream:
        result = {}
        for key, value in dic.items():
            if key in funcdict:
                result[key] = funcdict[key](parser(value))
            else:
                result[key] = value
        yield result


def keepPOS(pos_list) -> bool:
    ret = lambda tok: tok.pos_ in pos_list
    ret.__annotations__ = keepPOS.__annotations__
    return ret


def removePOS(pos_list) -> bool:
    ret = lambda tok: tok.pos_ not in pos_list
    ret.__annotations__ = removePOS.__annotations__
    return ret


def removeWords(words, keep=None) -> bool:
    # TODO: accept a single str as well as a list of str
    if hasattr(keep, '__iter__'):
        for k in keep:
            try:
                words.remove(k)
            except ValueError:
                pass
    ret = lambda tok: tok.lower_ not in words
    ret.__annotations__ = removeWords.__annotations__
    return ret


def keepENT(ent_list) -> bool:
    ret = lambda tok: tok.ent_type_ in ent_list
    ret.__annotations__ = keepENT.__annotations__
    return ret


def removeENT(ent_list) -> bool:
    ret = lambda tok: tok.ent_type_ not in ent_list
    ret.__annotations__ = removeENT.__annotations__
    return ret


def keepUniqueTokens() -> spacy.tokens.Doc:
    ret = lambda doc: set([tok.lower_ for tok in doc])
    ret.__annotations__ = keepUniqueTokens.__annotations__
    return ret


def lemmatize() -> str:
    ret = lambda tok: tok.lemma_
    ret.__annotations__ = lemmatize.__annotations__
    return ret


mentionFinder = re.compile(r"@[a-z0-9_]{1,15}", re.IGNORECASE)
emailFinder = re.compile(r"\b[A-Z0-9._%+-]+@[A-Z0-9.-]+\.[A-Z]{2,}\b", re.IGNORECASE)
urlFinder = re.compile(r"^(?:https?:\/\/)?(?:www\.)?[a-zA-Z0-9./]+$", re.IGNORECASE)


def replaceEmails(replace_with="EMAIL") -> str:
    ret = lambda tok: emailFinder.sub(replace_with, tok.lower_)
    ret.__annotations__ = replaceEmails.__annotations__
    return ret


def replaceURLs(replace_with="URL") -> str:
    ret = lambda tok: textacy.preprocess.replace_urls(tok.lower_, replace_with=replace_with)
    # ret = lambda tok: urlFinder.sub(replace_with, tok.lower_)
    ret.__annotations__ = replaceURLs.__annotations__
    return ret


def replaceTwitterMentions(replace_with="TWITTER_MENTION") -> str:
    ret = lambda tok: mentionFinder.sub(replace_with, tok.lower_)
    ret.__annotations__ = replaceTwitterMentions.__annotations__
    return ret


def replaceNumbers(replace_with="NUMBER") -> str:
    ret = lambda tok: textacy.preprocess.replace_numbers(tok.lower_, replace_with=replace_with)
    ret.__annotations__ = replaceNumbers.__annotations__
    return ret


def replacePhonenumbers(replace_with="PHONENUMBER", parser=PARSER) -> str:
    ret = lambda tok: textacy.preprocess.replace_phone_numbers(tok.lower_, replace_with=replace_with)
    ret.__annotations__ = replacePhonenumbers.__annotations__
    return ret


def resolveAbbreviations():
    pass  # TODO


def normalizeSynonyms(default_return_first_Syn=False) -> str:
    ret = lambda tok: getFirstSynonym(tok.lower_, default_return_first_Syn=default_return_first_Syn)
    ret.__annotations__ = normalizeSynonyms.__annotations__
    return ret


def getFirstSynonym(word, thesaurus=THESAURUS, default_return_first_Syn=False):
    if not isinstance(word, str):
        return str(word)
    word = word.lower()

    # iterate over the thesaurus; each syn_block is a list of synonyms
    for syn_block in thesaurus:
        for syn in syn_block:
            syn = syn.lower()
            if re.match(r'\A[\w-]+\Z', syn):  # syn is a single word
                if word == syn:
                    return str(getHauptform(syn_block, word, default_return_first_Syn=default_return_first_Syn))
            else:  # syn is a phrase
                if word in syn:
                    return str(getHauptform(syn_block, word, default_return_first_Syn=default_return_first_Syn))
    return str(word)  # as a fallback, return the original word


def getHauptform(syn_block, word, default_return_first_Syn=False):
    for syn in syn_block:
        syn = syn.lower()
        if "hauptform" in syn and len(syn.split(" ")) <= 2:
            # do not return it if it stands in parentheses
            # TODO: does that ever happen? strip the parentheses instead
            for w in syn.split(" "):
                if not re.match(r'\([^)]+\)', w):
                    return w
    if default_return_first_Syn:
        # if no Hauptform is contained, return the first synonym that is not
        # a phrase and does not stand in parentheses
        for w in syn_block:
            if not re.match(r'\([^)]+\)', w):
                return w
    return word  # as a fallback, return the original word


# language-specific stop words (e.g. spacy.de.STOP_WORDS)
stop_words = list(__import__("spacy." + PARSER.lang, globals(), locals(), ['object']).STOP_WORDS)

# cleaning pipeline applied to the main ticket text
clean_in_content = [
    removePOS(["SPACE"]),
    removeWords(["dezernat"]),
    removePOS(["PUNCT"]),
    replaceURLs(),
    removePOS(["NUM"]),
    lemmatize(),
    removeWords(stop_words),
    keepUniqueTokens(),
    normalizeSynonyms()
]

# add the cleaned ticket texts to the textacy corpus
print("add texts to textacy-corpus...")
corpus.add_texts(
    processTextstream(generateMainTextfromTicketXML(path2xml), clean_in_content),
)

printRandomDoc(corpus)
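
# --- Hypothetical follow-up (not part of the original pipeline) --------------
# generateMetadatafromTicketXML() and processDictstream() are defined above but
# never used. A minimal sketch of how they could feed ticket metadata into the
# corpus, assuming this textacy version's Corpus.add_texts() accepts a parallel
# metadatas stream; the field name in clean_in_meta is a placeholder.
#
# clean_in_meta = {
#     "Kategorie": lambda doc: " ".join(tok.lemma_ for tok in doc),  # placeholder field
# }
# corpus.add_texts(
#     processTextstream(generateMainTextfromTicketXML(path2xml), clean_in_content),
#     metadatas=processDictstream(generateMetadatafromTicketXML(path2xml), clean_in_meta),
# )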