# -*- coding: utf-8 -*-
"""Clean a raw ticket corpus and store the result as a new textacy corpus.

The module normalizes the document texts (unicode repair, special-character
splitting, whitespace normalization, umlaut transliteration, optional
spell-correction) and filters tokens of selected metadata fields.
"""

# NOTE: the import order below is kept exactly as in the original file.  The
# star imports may rebind names, so the repeated ``datetime`` import after
# ``from miscellaneous import *`` is intentionally left in place.
from datetime import datetime

import csv
import sys
from miscellaneous import *
from datetime import datetime
import time
import textacy
from scipy import *

import os

from preprocessing import removePOS
from preprocessing import filterTokens

import re

csv.field_size_limit(sys.maxsize)
FILEPATH = os.path.dirname(os.path.realpath(__file__)) + "/"

# load config
config_ini = FILEPATH + "config.ini"

# NOTE(review): ``ConfigParser`` is not imported in this file; presumably it
# is provided by ``from miscellaneous import *`` -- confirm.
config = ConfigParser.ConfigParser()
with open(config_ini) as f:
    config.read_file(f)

REGEX_SPECIALCHAR = r'[`\=~%^&*()_+\[\]{};\'"|]'  #+r',.-\\:' #+r',.?!'

# Compiled once here instead of once per cleaned string.
_SPECIALCHAR_PATTERN = re.compile(REGEX_SPECIALCHAR)

# Single-pass replacement table: line breaks become spaces, German umlauts
# and sharp s are transliterated to ASCII.
_ROCKDOTS_TABLE = str.maketrans({
    "\n": " ",
    "ß": "ss",
    "ö": "oe",
    "ü": "ue",
    "ä": "ae",
})

# Word -> frequency mapping used by the spellchecker; filled in by main().
WORDS = {}

# Cache for the total of all counts in WORDS: (id(WORDS), len(WORDS), total).
_words_total_cache = None


########################## Spellchecking ##########################################
# http://norvig.com/spell-correct.html
# http://wortschatz.uni-leipzig.de/en/download


def words(text):
    """Return all lower-cased ``\\w+`` runs found in *text*."""
    return re.findall(r'\w+', text.lower())


def _total_word_count():
    """Return the sum of all counts in WORDS, cached per dictionary state."""
    global _words_total_cache
    key = (id(WORDS), len(WORDS))
    if _words_total_cache is None or _words_total_cache[:2] != key:
        # Explicit loop rather than ``sum``: the star imports at the top of
        # the file may rebind ``sum`` -- TODO confirm.
        total = 0
        for count in WORDS.values():
            total += count
        _words_total_cache = key + (total,)
    return _words_total_cache[2]


def P(word, N=None):
    """Probability of `word`.

    :param word: word to look up in WORDS
    :param N: total count to normalize by; defaults to the sum of all counts
        currently in WORDS.  (It used to be computed once at definition time,
        when WORDS was still empty, which made every call divide by zero.)
    :return: relative frequency of *word*; 0.0 for unknown words or when the
        total is zero
    """
    if N is None:
        N = _total_word_count()
    if not N:
        return 0.0
    return WORDS.get(word, 0) / N


def correction(word):
    """Most probable spelling correction for word."""
    return max(candidates(word), key=P)


def candidates(word):
    """Generate possible spelling corrections for word.

    Preference order: the word itself if known, known words one edit away,
    known words two edits away, and finally the unchanged word.
    """
    return (known([word])
            or known(edits1(word))
            or known(edits2(word))
            or [word])


def known(words):
    """The subset of `words` that appear in the dictionary of WORDS."""
    return set(w for w in words if w in WORDS)


def edits1(word):
    """All edits that are one edit away from `word`."""
    letters = 'abcdefghijklmnopqrstuvwxyz'
    splits = [(word[:i], word[i:]) for i in range(len(word) + 1)]
    deletes = [L + R[1:] for L, R in splits if R]
    transposes = [L + R[1] + R[0] + R[2:] for L, R in splits if len(R) > 1]
    replaces = [L + c + R[1:] for L, R in splits if R for c in letters]
    inserts = [L + c + R for L, R in splits for c in letters]
    return set(deletes + transposes + replaces + inserts)


def edits2(word):
    """All edits that are two edits away from `word` (lazy generator)."""
    return (e2 for e1 in edits1(word) for e2 in edits1(e1))


def autocorrectWord(word):
    """Return the spell-corrected *word*, or *word* itself on any failure.

    Correction is best-effort: an empty dictionary or an error during
    correction leaves the word unchanged.
    """
    if not WORDS:
        # No dictionary loaded: nothing can be corrected, skip the expensive
        # candidate generation.
        return word
    try:
        return correction(word)
    except Exception:
        # Deliberate best-effort; unlike a bare ``except`` this does not
        # swallow KeyboardInterrupt / SystemExit.
        return word


############# stringcleaning

def clean(stringstream, autocorrect=False):
    """Yield a cleaned version of every string in *stringstream*.

    :param stringstream: iterable of strings
    :param autocorrect: if True, run every whitespace-separated word through
        autocorrectWord
    :return: generator of cleaned strings
    """
    for string in stringstream:
        # fix unicode
        string = textacy.preprocess.fix_bad_unicode(string.lower(), normalization=u'NFC')

        # separate words on special characters
        string = " ".join(_SPECIALCHAR_PATTERN.split(string))

        # normalize whitespace
        string = textacy.preprocess.normalize_whitespace(string)

        # remove linebreaks and replace umlauts / sharp s ("rock dots")
        string = string.translate(_ROCKDOTS_TABLE)

        # question: autocorrect? idea:
        # http://lexitron.nectec.or.th/public/COLING-2010_Beijing_China/POSTERS/pdf/POSTERS022.pdf
        if autocorrect:
            string = " ".join([autocorrectWord(word) for word in string.split()])

        yield string


def processDictstream(dictstream, funcdict, parser):
    """Filter the tokens of selected fields in a stream of metadata dicts.

    :param dictstream: iterable of dicts
    :param funcdict: maps a field name to the filter list passed to
        filterTokens, e.g.
            clean_in_meta = {
                "Solution": funclist,
                ...
            }
    :param parser: spacy-parser; called on the value of every listed field
    :return: generator of dicts; listed fields hold the lower-cased,
        space-joined remaining tokens, all other fields are passed through
    """
    for dic in dictstream:
        result = {}
        for key, value in dic.items():
            if key in funcdict:
                tokens = list(parser(value))
                tokens = filterTokens(tokens, funcdict[key])
                result[key] = " ".join([tok.lower_ for tok in tokens])
            else:
                result[key] = value
        yield result


##################################################################################################

ressources_path = FILEPATH + "ressources/"

path2wordsdict = ressources_path + config.get("spellchecking", "pickle_file")

corpus_de_path = FILEPATH + config.get("de_corpus", "path")

corpus_en_path = FILEPATH + config.get("en_corpus", "path")

autocorrect = config.getboolean("preprocessing", "autocorrect")


def cleanCorpus(corpus_path, clean_in_meta, lang="de", printrandom=10, autocorrect=False):
    """Load the raw corpus, clean texts and metadata, and save the result.

    :param corpus_path: directory passed to load_corpus / save_corpus
    :param clean_in_meta: field name -> filter list, see processDictstream
    :param lang: language prefix of the corpus names
        ("<lang>_raw_ticket" is read, "<lang>_clean_ticket" is written)
    :param printrandom: number of printRandomDoc calls made on the result
    :param autocorrect: forwarded to clean
    :return: the cleaned textacy corpus
    """
    logprint("Clean {0}_corpus at {1}".format(lang, datetime.now()))

    rawCorpus_name = lang + "_raw_ticket"
    cleanCorpus_name = lang + "_clean_ticket"

    # load raw corpus and create new one
    raw_corpus, parser = load_corpus(corpus_name=rawCorpus_name, corpus_path=corpus_path)

    clean_corpus = textacy.Corpus(parser)

    # process texts and metadata and add them to the new corpus
    clean_corpus.add_texts(
        clean(corpus2Text(raw_corpus), autocorrect=autocorrect),
        processDictstream(corpus2Meta(raw_corpus), clean_in_meta, parser=parser)
    )

    # drop empty docs from the corpus
    clean_corpus.remove(lambda doc: len(doc) == 0)

    for i in range(printrandom):
        printRandomDoc(clean_corpus)

    # save corpus
    save_corpus(corpus=clean_corpus, corpus_path=corpus_path, corpus_name=cleanCorpus_name)

    return clean_corpus


def main():
    """Load the spellchecking dictionary and clean the German corpus."""
    # Rebind the module-level dictionary.  Without ``global`` the loaded
    # object only became a local and the spellchecker kept seeing an empty
    # WORDS.
    global WORDS

    start = time.time()

    WORDS = load_obj(path2wordsdict)

    clean_in_meta = {
        "Solution": [removePOS(["SPACE"])],
        "Subject": [removePOS(["SPACE", "PUNCT"])],
        "categoryName": [removePOS(["SPACE", "PUNCT"])]
    }

    cleanCorpus(corpus_de_path, clean_in_meta, "de", printrandom=5, autocorrect=autocorrect)

    end = time.time()
    logprint("Time Elapsed Cleaning:{0} min".format((end - start) / 60))


if __name__ == "__main__":
    main()