# -*- coding: utf-8 -*-
"""Shared helpers for the corpus-processing scripts.

Importing this module has side effects: it raises the csv field size limit,
reads ``config.ini`` from the module's directory and configures the root
logger from the ``[logging]`` section of that file.
"""
# NOTE: the import order below is deliberately kept as in the original file.
# ``from scipy import *`` may rebind names imported above it (depending on the
# installed scipy version), so regrouping the imports could change which
# objects names such as ``random`` refer to.
from datetime import datetime
import configparser as ConfigParser
import csv
import functools
import logging
import random
import re
import sys
from pathlib import Path
import pickle
import spacy
import textacy
from scipy import *  # noqa: F401,F403
import os
import glob
from textacy.fileio import open_sesame
import json
from spacy.tokens.doc import Doc as SpacyDoc
import operator
import warnings  # required by deprecated(); was missing in the original


def _set_max_csv_field_size():
    """Raise the csv field size limit as far as the platform allows."""
    limit = sys.maxsize
    while True:
        try:
            csv.field_size_limit(limit)
            return
        except OverflowError:
            # sys.maxsize does not fit into a C long on some platforms
            # (e.g. 64-bit Windows); back off until the value is accepted.
            limit //= 10


_set_max_csv_field_size()

FILEPATH = os.path.dirname(os.path.realpath(__file__)) + "/"

# load config
config_ini = FILEPATH + "config.ini"
config = ConfigParser.ConfigParser()
with open(config_ini) as f:
    config.read_file(f)

# Level names understood by logprint() and by the logging configuration.
_LOG_LEVELS = {
    "DEBUG": logging.DEBUG,
    "INFO": logging.INFO,
    "WARNING": logging.WARNING,
    "ERROR": logging.ERROR,
    "CRITICAL": logging.CRITICAL,
}

# config logging
filename = FILEPATH + config.get("logging", "filename")
level = config.get("logging", "level")
# Unknown names are handed to logging.basicConfig unchanged, as before.
level = _LOG_LEVELS.get(level, level)
logging.basicConfig(filename=filename, level=level)


def logprint(string, level="INFO"):
    """Print a timestamped message and write it to the log.

    :param string: object to report; converted with ``str()``
    :param level: str, one of "DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL";
        any other value is printed but not logged
    """
    string = "{}\t".format(datetime.now()) + str(string)
    print(string)

    numeric_level = _LOG_LEVELS.get(level)
    if numeric_level is not None:
        logging.log(numeric_level, string)


def compose(*functions):
    """Compose single-argument functions right to left.

    ``compose(f, g, h)(x)`` evaluates ``f(g(h(x)))``; ``compose()`` returns
    the identity function.

    :param functions: callables taking one argument
    :return: callable
    """
    def compose2(f, g):
        return lambda x: f(g(x))

    return functools.reduce(compose2, functions, lambda x: x)


def get_calling_function():
    """Find the function object of the caller in many decent cases.

    https://stackoverflow.com/questions/39078467/python-how-to-get-the-calling-function-not-just-its-name

    :return: the function whose code object belongs to the calling frame
    :raises AttributeError: if none of the lookup strategies finds it
    """
    fr = sys._getframe(1)   # inspect.stack()[1][0]
    co = fr.f_code
    for get in (
        lambda: fr.f_globals[co.co_name],
        lambda: getattr(fr.f_locals['self'], co.co_name),
        lambda: getattr(fr.f_locals['cls'], co.co_name),
        lambda: fr.f_back.f_locals[co.co_name],  # nested
        lambda: fr.f_back.f_locals['func'],  # decorators
        lambda: fr.f_back.f_locals['meth'],
        lambda: fr.f_back.f_locals['f'],
    ):
        try:
            func = get()
        except (KeyError, AttributeError):
            pass
        else:
            # only accept a candidate that really owns the caller's code
            if func.__code__ == co:
                return func
    raise AttributeError("func not found")


def save_obj(obj, path):
    """Pickle ``obj`` to ``path`` using the highest pickle protocol.

    :param obj: any picklable object
    :param path: str, target file
    """
    with open(path, 'wb') as f:
        pickle.dump(obj, f, pickle.HIGHEST_PROTOCOL)


def load_obj(path):
    """Unpickle and return the object stored at ``path``.

    SECURITY: unpickling can execute arbitrary code; only load files that
    come from a trusted source.

    :param path: str, pickle file
    :return: the unpickled object
    """
    with open(path, 'rb') as f:
        return pickle.load(f)


# One-pass translation table for German umlauts and sharp s.
_ROCKDOT_TABLE = str.maketrans({
    "Ä": "Ae",
    "ä": "ae",
    "Ü": "Ue",
    "ü": "ue",
    "Ö": "Oe",
    "ö": "oe",
    "ß": "ss",
})


def replaceRockDots_lambda():
    """Return a callable that behaves like :func:`replaceRockDots`.

    :return: callable(str) -> str
    """
    return replaceRockDots


def replaceRockDots(string):
    """Transliterate German umlauts and sharp s to ASCII.

    ä/ö/ü become ae/oe/ue (capitalised variants become Ae/Oe/Ue) and ß
    becomes ss.

    :param string: str
    :return: str
    """
    return string.translate(_ROCKDOT_TABLE)


def list_from_files(*paths):
    """
    create string-list from file like
    n1
    n2
    n3

    The lines of all given files are concatenated in argument order and each
    line is whitespace-normalized.

    :param paths: list(str) or str if single path
    :return: list(str)
    """
    # flatten the per-file line lists into a single list
    lines = [
        line
        for path in paths
        for line in textacy.fileio.read_file_lines(path)
    ]
    return list(map(textacy.preprocess.normalize_whitespace, lines))


def breakpoint():
    """Do nothing; a placeholder to hang a debugger breakpoint on.

    NOTE: inside this module (and for star-importers) this name shadows the
    builtin ``breakpoint()`` available since Python 3.7.
    """
    pass


def sort_dictionary(dict):
    """Return the items of a mapping sorted ascending by value.

    :param dict: mapping (the parameter name shadows the builtin; it is kept
        so that keyword callers keep working)
    :return: list of (key, value) tuples
    """
    return sorted(dict.items(), key=operator.itemgetter(1))


def normalize_str(string):
    """
    replaceRockDots
    textacy.preprocess.normalize_whitespace
    :param string: str
    :return: str
    """
    return textacy.preprocess.normalize_whitespace(replaceRockDots(string))


def deprecated(func):
    """This is a decorator which can be used to mark
    functions as deprecated. It will result in a warning being emitted
    when the function is used."""
    @functools.wraps(func)
    def new_func(*args, **kwargs):
        warnings.simplefilter('always', DeprecationWarning)  # turn off filter
        warnings.warn("Call to deprecated function {}.".format(func.__name__),
                      category=DeprecationWarning,
                      stacklevel=2)
        warnings.simplefilter('default', DeprecationWarning)  # reset filter
        return func(*args, **kwargs)
    return new_func


def flatten(liste):
    """Flatten one nesting level of an iterable of iterables.

    :param liste: iterable of iterables
    :return: list
    """
    return [item for sublist in liste for item in sublist]


def printRandomDoc(textacyCorpus):
    """
    Print and log a randomly chosen doc out of a textacy-Corpus.

    Reports the index, the text and ``metadata['categoryName']`` of the doc,
    or "NO DOCS IN CORPUS" if the corpus is empty.

    :param textacyCorpus: corpus supporting ``len()`` and integer indexing
    """
    print()
    n_docs = len(textacyCorpus)
    if n_docs == 0:
        logprint("NO DOCS IN CORPUS")
    else:
        # random.random() is in [0, 1), so scaling by n_docs reaches every
        # index including the last one; min() guards against float rounding.
        randIndex = min(int(n_docs * random.random()), n_docs - 1)
        doc = textacyCorpus[randIndex]
        logprint("Index: {0} \n Text: {1} \n categoryName: {2}\n".format(
            randIndex, doc.text, doc.metadata['categoryName']))
    print()


def get_list_from_config(section, option):
    """Read a comma-separated config option as a list of strings.

    :param section: str, section in config.ini
    :param option: str, option inside that section
    :return: list(str), each entry whitespace-normalized
    """
    raw_entries = config.get(section, option).split(",")
    return list(map(textacy.preprocess.normalize_whitespace, raw_entries))


def corpus2Text(corpus):
    """Yield the text of every doc in ``corpus``."""
    for doc in corpus:
        yield doc.text


def corpus2Meta(corpus):
    """Yield the metadata of every doc in ``corpus``."""
    for doc in corpus:
        yield doc.metadata


def savelabledCorpiLines_cat(corpus, filepath):
    """Write one labelled line per doc (see gen_labledLines) to ``filepath``.

    :param corpus: iterable of docs
    :param filepath: str, target file
    """
    textacy.fileio.write_file_lines(gen_labledLines(corpus), filepath=filepath)


def gen_labledLines(corpus, label="categoryName"):
    """Yield ``"[<label value>] <text>"`` for every doc in ``corpus``.

    :param corpus: iterable of docs
    :param label: str, metadata key whose (string) value becomes the label
    """
    for doc in corpus:
        # generate [topic1, topic2....] tok1 tok2 tok3 out of corpi
        yield "[" + doc.metadata[label] + "] " + doc.text


def save_corpus(corpus, corpus_path, corpus_name):
    """
    saves a textacy-corpus including spacy-parser

    The parser goes to ``<corpus_path><lang>_parser`` and the docs (index,
    content and metadata) to ``<corpus_path><corpus_name>_content.json``.
    Paths are built by plain concatenation, so ``corpus_path`` has to carry
    its own trailing separator.

    :param corpus: textacy-Corpus
    :param corpus_path: str
    :param corpus_name: str (should content the language like "_de_")
    """
    # save parser
    parser = corpus.spacy_lang
    parserpath = corpus_path + str(parser.lang) + '_parser'
    parser.save_to_directory(parserpath)

    # save plain content + meta
    plainpath = corpus_path + corpus_name + "_content.json"
    textacy.fileio.write_json_lines(gen_dicts(corpus), plainpath)


def gen_dicts(corpus):
    """Yield one dict per doc: ``index``, ``content`` and all metadata.

    Metadata keys named "index" or "content" override the generated ones.

    :param corpus: iterable of docs
    """
    for doc in corpus:
        record = {"index": doc.corpus_index, "content": doc.text}
        record.update(doc.metadata)
        yield record


def multisub(subs, subject):
    """Simultaneously perform all substitutions on the subject string.

    https://stackoverflow.com/questions/764360/a-list-of-string-replacements-in-python

    :param subs: iterable of (pattern, replacement) string pairs; patterns
        are matched literally
    :param subject: str
    :return: str; ``subject`` unchanged if ``subs`` is empty
    """
    # materialise once: subs is walked twice and may be a one-shot iterator
    subs = list(subs)
    if not subs:
        # an empty alternation would match everywhere without a group
        return subject

    pattern = '|'.join('(%s)' % re.escape(p) for p, _ in subs)
    substs = [s for _, s in subs]
    # lastindex is the 1-based number of the alternative that matched
    return re.sub(pattern, lambda m: substs[m.lastindex - 1], subject)


def load_corpus(corpus_path, corpus_name, lang="de"):
    """
    Load textacy-Corpus including spacy-parser out from file

    Counterpart of save_corpus. If ``corpus_name`` contains "de_" or "en_"
    the language is taken from the name and overrides ``lang``.

    :param corpus_path: str
    :param corpus_name: str (should content the language like "_de_")
    :param lang: str (language code) or spacy.Language
    :return: textacy.Corpus, spacy.language
    """
    # check for language
    if "de_" in corpus_name:
        lang = "de"
    elif "en_" in corpus_name:
        lang = "en"

    # load parser
    parser = spacy.load(lang)

    parser_dir = corpus_path + str(lang) + '_parser'
    stringstorepath = parser_dir + '/vocab/strings.json'
    with open(stringstorepath) as strings_file:
        parser.vocab.strings.load(strings_file)

    vocabpath = Path(parser_dir + '/vocab/lexemes.bin')
    parser.vocab.load_lexemes(vocabpath)

    # load corpus
    corpus = textacy.Corpus(parser)

    plainpath = corpus_path + corpus_name + "_content.json"
    plain_stream = textacy.fileio.read_json_lines(plainpath)  # yields {int : str}

    for plain in plain_stream:
        # everything except the text and the stored index is metadata
        meta = {key: value
                for key, value in plain.items()
                if key != "content" and key != "index"}
        corpus.add_doc(textacy.Doc(plain["content"], lang=corpus.spacy_lang, metadata=meta))

    return corpus, corpus.spacy_lang