# -*- coding: utf-8 -*-
"""Shared helpers: config/logging setup, string normalisation and
saving/loading of textacy corpora together with their spaCy parser."""

# NOTE: the import order is kept as in the original file on purpose:
# ``from scipy import *`` may rebind names (e.g. ``random``) depending on the
# installed scipy version, so reordering could silently change behaviour.
import configparser as ConfigParser
import csv
import functools
import logging
import random
import re
import sys
from pathlib import Path
import pickle

import spacy
import textacy
from scipy import *
import os
import glob, os
from textacy.fileio import open_sesame
import json
from spacy.tokens.doc import Doc as SpacyDoc
import warnings  # required by ``deprecated`` (was missing -> NameError)

csv.field_size_limit(sys.maxsize)

# Directory containing this file, with a trailing slash.
FILEPATH = os.path.dirname(os.path.realpath(__file__)) + "/"

# load config
config_ini = FILEPATH + "config.ini"

config = ConfigParser.ConfigParser()
with open(config_ini) as f:
    config.read_file(f)

# config logging
filename = FILEPATH + config.get("logging", "filename")
level = config.get("logging", "level")
# Map the configured level name to the logging constant; any other value is
# passed on to ``basicConfig`` unchanged.
if level == "INFO":
    level = logging.INFO
elif level == "DEBUG":
    level = logging.DEBUG
elif level == "WARNING":
    level = logging.WARNING

logging.basicConfig(filename=filename, level=level)


def logprint(string, level="INFO"):
    """Print ``string`` and also write it to the log.

    :param string: message to print and log
    :param level: one of "INFO", "DEBUG", "WARNING"; any other value
        results in the message being printed only
    """
    print(string)
    if level == "INFO":
        logging.info(string)
    elif level == "DEBUG":
        logging.debug(string)
    elif level == "WARNING":
        logging.warning(string)


def compose(*functions):
    """Compose ``functions`` right-to-left.

    ``compose(f, g)(x)`` evaluates to ``f(g(x))``; with no arguments the
    identity function is returned.
    """
    def compose2(f, g):
        return lambda x: f(g(x))

    return functools.reduce(compose2, functions, lambda x: x)


def get_calling_function():
    """Find the function object of the caller in many common cases.

    https://stackoverflow.com/questions/39078467/python-how-to-get-the-calling-function-not-just-its-name

    :return: the function whose code object matches the calling frame
    :raises AttributeError: if no candidate matches
    """
    fr = sys._getframe(1)  # inspect.stack()[1][0]
    co = fr.f_code
    # Candidate lookups, tried in order; each may raise KeyError or
    # AttributeError when the respective name is not present.
    for get in (
        lambda: fr.f_globals[co.co_name],
        lambda: getattr(fr.f_locals['self'], co.co_name),
        lambda: getattr(fr.f_locals['cls'], co.co_name),
        lambda: fr.f_back.f_locals[co.co_name],  # nested
        lambda: fr.f_back.f_locals['func'],  # decorators
        lambda: fr.f_back.f_locals['meth'],
        lambda: fr.f_back.f_locals['f'],
    ):
        try:
            func = get()
        except (KeyError, AttributeError):
            pass
        else:
            if func.__code__ == co:
                return func
    raise AttributeError("func not found")


def save_obj(obj, path):
    """Pickle ``obj`` to the file at ``path`` using the highest protocol."""
    with open(path, 'wb') as f:
        pickle.dump(obj, f, pickle.HIGHEST_PROTOCOL)


def load_obj(path):
    """Unpickle and return the object stored at ``path``.

    NOTE(security): ``pickle.load`` can execute arbitrary code; only use
    this on files from a trusted source.
    """
    with open(path, 'rb') as f:
        return pickle.load(f)


def replaceRockDots():
    """Return a function that lower-cases a string and transliterates
    German umlauts and sharp s (ä->ae, ü->ue, ö->oe, ß->ss)."""
    def _replace(string):
        string = re.sub(r'[ä]', "ae", string.lower())
        string = re.sub(r'[ü]', "ue", string)
        string = re.sub(r'[ö]', "oe", string)
        return re.sub(r'[ß]', "ss", string)

    return lambda string: _replace(string)


def list_from_files(*paths):
    """
    Create a list of strings from files that contain one entry per line, like
    n1
    n2
    n3

    :param paths: list(str) or str if single path
    :return: list(str)
    """
    # Read every file and flatten the list of lists into a single list.
    liste = [
        item
        for path in paths
        for item in list(textacy.fileio.read_file_lines(path))
    ]

    return list(map(textacy.preprocess.normalize_whitespace, liste))


def breakpoint():
    """No-op placeholder, handy as a target for debugger breakpoints.

    NOTE: this shadows the ``breakpoint`` builtin within this module; the
    name is kept because callers may rely on it.
    """
    pass


def normalize(string):
    """Lower-case ``string``, transliterate ß/ö/ü/ä and normalise whitespace.

    :param string: str
    :return: str
    """
    # replaceRockDots
    string = re.sub(r'[ß]', "ss", string.lower())
    string = re.sub(r'[ö]', "oe", string)
    string = re.sub(r'[ü]', "ue", string)
    string = re.sub(r'[ä]', "ae", string)
    string = textacy.preprocess.normalize_whitespace(string)
    return string


def deprecated(func):
    """This is a decorator which can be used to mark functions as deprecated.

    It will result in a warning being emitted when the function is used.
    """
    @functools.wraps(func)
    def new_func(*args, **kwargs):
        warnings.simplefilter('always', DeprecationWarning)  # turn off filter
        warnings.warn("Call to deprecated function {}.".format(func.__name__),
                      category=DeprecationWarning,
                      stacklevel=2)
        warnings.simplefilter('default', DeprecationWarning)  # reset filter
        return func(*args, **kwargs)

    return new_func


def flatten(liste):
    """Flatten a list of lists by one level."""
    return [item for sublist in liste for item in sublist]


def printRandomDoc(textacyCorpus):
    """
    Print and log a random doc out of a textacy-Corpus.

    :param textacyCorpus: textacy corpus whose docs carry a
        'categoryName' metadata entry
    """
    print()
    n_docs = len(textacyCorpus)
    if n_docs == 0:
        logprint("NO DOCS IN CORPUS")
    else:
        # random() is in [0, 1), so scaling by n_docs covers every index
        # 0..n_docs-1. (Scaling by n_docs - 1, as before, could never pick
        # the last document.) min() guards against float rounding.
        randIndex = min(int(n_docs * random.random()), n_docs - 1)
        logprint("Index: {0} \n Text: {1} \n categoryName: {2}\n".format(
            randIndex,
            textacyCorpus[randIndex].text,
            textacyCorpus[randIndex].metadata['categoryName']))

    print()


def get_list_from_config(section, option):
    """Return the comma-separated config value ``section``/``option`` as a
    list of whitespace-normalised strings."""
    return list(map(textacy.preprocess.normalize_whitespace,
                    config.get(section, option).split(",")))


def corpus2Text(corpus):
    """Yield the text of every doc in ``corpus``."""
    for doc in corpus:
        yield doc.text


def corpus2Meta(corpus):
    """Yield the metadata of every doc in ``corpus``."""
    for doc in corpus:
        yield doc.metadata


def savelabledCorpiLines(corpus, filepath):
    """Write one labelled line per doc of ``corpus`` to ``filepath``."""
    textacy.fileio.write_file_lines(gen_labledLines(corpus), filepath=filepath)


def gen_labledLines(corpus):
    """Yield ``"[<categoryName>] <text>"`` for every doc in ``corpus``."""
    for doc in corpus:
        # generate [topic1, topic2....] tok1 tok2 tok3 out of corpi
        yield "[" + doc.metadata["categoryName"] + "] " + doc.text


def save_corpus(corpus, corpus_path, corpus_name):
    """
    Save a textacy-corpus including its spacy-parser.

    :param corpus: textacy-Corpus
    :param corpus_path: str
    :param corpus_name: str (should contain the language like "_de_")
    """
    # save parser
    parser = corpus.spacy_lang
    parserpath = corpus_path + str(parser.lang) + '_parser'
    parser.save_to_directory(parserpath)

    # save plain content + meta
    plainpath = corpus_path + corpus_name + "_content.json"
    textacy.fileio.write_json_lines(gen_dicts(corpus), plainpath)


def gen_dicts(corpus):
    """Yield one dict per doc holding its index, its text ("content") and
    all of its metadata entries."""
    for doc in corpus:
        record = {"index": doc.corpus_index, "content": doc.text}
        record.update(doc.metadata)
        yield record


def load_corpus(corpus_path, corpus_name, lang="de"):
    """
    Load a textacy-Corpus including its spacy-parser from file.

    :param corpus_path: str
    :param corpus_name: str (should contain the language like "_de_")
    :param lang: str (language code); overridden when ``corpus_name``
        contains "de_" or "en_"
    :return: textacy.Corpus, spacy.language
    """
    # check for language
    if "de_" in corpus_name:
        lang = "de"
    elif "en_" in corpus_name:
        lang = "en"

    # load parser
    parser = spacy.load(lang)

    stringstorepath = corpus_path + str(lang) + '_parser' + '/vocab/strings.json'
    with open(stringstorepath) as file:
        parser.vocab.strings.load(file)

    vocabpath = Path(corpus_path + str(lang) + '_parser' + '/vocab/lexemes.bin')
    parser.vocab.load_lexemes(vocabpath)

    # load corpus
    corpus = textacy.Corpus(parser)

    plainpath = corpus_path + corpus_name + "_content.json"
    plain_stream = textacy.fileio.read_json_lines(plainpath)  # yields {int : str}

    for plain in plain_stream:
        # Everything except the text and the stored index is doc metadata.
        meta = {
            key: value
            for key, value in plain.items()
            if key != "content" and key != "index"
        }
        corpus.add_doc(textacy.Doc(plain["content"], lang=corpus.spacy_lang, metadata=meta))

    return corpus, corpus.spacy_lang