255 lines
7.1 KiB
Python
255 lines
7.1 KiB
Python
# -*- coding: utf-8 -*-
|
|
import configparser as ConfigParser
|
|
import csv
|
|
import functools
|
|
import logging
|
|
import random
|
|
import re
|
|
import sys
|
|
from pathlib import Path
|
|
import pickle
|
|
import spacy
|
|
import textacy
|
|
from scipy import *
|
|
import os
|
|
import glob, os
|
|
from textacy.fileio import open_sesame
|
|
import json
|
|
from spacy.tokens.doc import Doc as SpacyDoc
|
|
|
|
csv.field_size_limit(sys.maxsize)
|
|
FILEPATH = os.path.dirname(os.path.realpath(__file__)) + "/"
|
|
|
|
|
|
|
|
# load config
|
|
config_ini = FILEPATH + "config.ini"
|
|
|
|
config = ConfigParser.ConfigParser()
|
|
with open(config_ini) as f:
|
|
config.read_file(f)
|
|
|
|
|
|
|
|
# config logging
|
|
filename = FILEPATH + config.get("logging","filename")
|
|
level = config.get("logging","level")
|
|
if level == "INFO":
|
|
level = logging.INFO
|
|
elif level == "DEBUG":
|
|
level = logging.DEBUG
|
|
elif level == "WARNING":
|
|
level = logging.WARNING
|
|
logging.basicConfig(filename=filename, level=level)
|
|
|
|
|
|
|
|
def logprint(string, level="INFO"):
|
|
"""log and prints"""
|
|
print(string)
|
|
if level == "INFO":
|
|
logging.info(string)
|
|
elif level == "DEBUG":
|
|
logging.debug(string)
|
|
elif level == "WARNING":
|
|
logging.warning(string)
|
|
|
|
|
|
def compose(*functions):
|
|
def compose2(f, g):
|
|
return lambda x: f(g(x))
|
|
|
|
return functools.reduce(compose2, functions, lambda x: x)
|
|
|
|
|
|
def get_calling_function():
|
|
"""finds the calling function in many decent cases.
|
|
https://stackoverflow.com/questions/39078467/python-how-to-get-the-calling-function-not-just-its-name
|
|
"""
|
|
fr = sys._getframe(1) # inspect.stack()[1][0]
|
|
co = fr.f_code
|
|
for get in (
|
|
lambda: fr.f_globals[co.co_name],
|
|
lambda: getattr(fr.f_locals['self'], co.co_name),
|
|
lambda: getattr(fr.f_locals['cls'], co.co_name),
|
|
lambda: fr.f_back.f_locals[co.co_name], # nested
|
|
lambda: fr.f_back.f_locals['func'], # decorators
|
|
lambda: fr.f_back.f_locals['meth'],
|
|
lambda: fr.f_back.f_locals['f'],
|
|
):
|
|
try:
|
|
func = get()
|
|
except (KeyError, AttributeError):
|
|
pass
|
|
else:
|
|
if func.__code__ == co:
|
|
return func
|
|
raise AttributeError("func not found")
|
|
|
|
|
|
def save_obj(obj, path):
|
|
with open(path , 'wb') as f:
|
|
pickle.dump(obj, f, pickle.HIGHEST_PROTOCOL)
|
|
|
|
def load_obj(path):
|
|
with open(path, 'rb') as f:
|
|
return pickle.load(f)
|
|
|
|
|
|
def replaceRockDots():
|
|
return lambda string: re.sub(r'[ß]', "ss",
|
|
(re.sub(r'[ö]', "oe",
|
|
(re.sub(r'[ü]', "ue", (re.sub(r'[ä]', "ae", string.lower())))))))
|
|
|
|
def list_from_files(*paths):
|
|
"""
|
|
create string-list from file like
|
|
n1
|
|
n2
|
|
n3
|
|
|
|
:param paths: list(str) or str if single path
|
|
:return: list(str)
|
|
"""
|
|
|
|
listlist = []
|
|
for path in paths:
|
|
listlist.append(list(textacy.fileio.read_file_lines(path)))
|
|
|
|
#liste von listen zu einer liste
|
|
liste = [item for sublist in listlist for item in sublist]
|
|
|
|
return list(map(textacy.preprocess.normalize_whitespace, liste))
|
|
|
|
def debug():
|
|
pass
|
|
|
|
def normalize(string):
|
|
# replaceRockDots
|
|
string = re.sub(r'[ß]', "ss", string.lower())
|
|
string = re.sub(r'[ö]', "oe", string)
|
|
string = re.sub(r'[ü]', "ue", string)
|
|
string = re.sub(r'[ä]', "ae", string)
|
|
string = textacy.preprocess.normalize_whitespace(string)
|
|
return string
|
|
|
|
|
|
def deprecated(func):
|
|
"""This is a decorator which can be used to mark functions
|
|
as deprecated. It will result in a warning being emmitted
|
|
when the function is used."""
|
|
|
|
@functools.wraps(func)
|
|
def new_func(*args, **kwargs):
|
|
warnings.simplefilter('always', DeprecationWarning) #turn off filter
|
|
warnings.warn("Call to deprecated function {}.".format(func.__name__), category=DeprecationWarning, stacklevel=2)
|
|
warnings.simplefilter('default', DeprecationWarning) #reset filter
|
|
return func(*args, **kwargs)
|
|
|
|
return new_func
|
|
|
|
|
|
def printRandomDoc(textacyCorpus):
|
|
"""
|
|
printlogss random doc out of a textacy-Corpus
|
|
:param textacyCorpus:
|
|
"""
|
|
print()
|
|
if len(textacyCorpus) == 0:
|
|
logprint("NO DOCS IN CORPUS")
|
|
else:
|
|
#printlog("len(textacyCorpus) = %i" % len(textacyCorpus))
|
|
randIndex = int((len(textacyCorpus) - 1) * random.random())
|
|
logprint("Index: {0} \n Text: {1} \n categoryName: {2}\n".format(randIndex, textacyCorpus[randIndex].text,
|
|
textacyCorpus[randIndex].metadata['categoryName']))
|
|
|
|
print()
|
|
|
|
def get_list_from_config(section,option):
|
|
return list(map(textacy.preprocess.normalize_whitespace,config.get(section,option).split(",")))
|
|
|
|
def corpus2Text(corpus):
|
|
for doc in corpus:
|
|
yield doc.text
|
|
|
|
def corpus2Meta(corpus):
|
|
for doc in corpus:
|
|
yield doc.metadata
|
|
|
|
def savelabledCorpiLines(corpus,filepath):
|
|
|
|
textacy.fileio.write_file_lines(gen_labledLines(corpus), filepath=filepath)
|
|
|
|
def gen_labledLines(corpus):
|
|
for doc in corpus:
|
|
# generate [topic1, topic2....] tok1 tok2 tok3 out of corpi
|
|
yield "[" + doc.metadata["categoryName"] + "] " + doc.text
|
|
|
|
|
|
def save_corpus(corpus, corpus_path, corpus_name):
|
|
"""
|
|
saves a textacy-corpus including spacy-parser
|
|
:param corpus: textacy-Corpus
|
|
:param corpus_path: str
|
|
:param corpus_name: str (should content the language like "_de_")
|
|
"""
|
|
|
|
# save parser
|
|
parser = corpus.spacy_lang
|
|
parserpath = corpus_path + str(parser.lang) + '_parser'
|
|
parser.save_to_directory(parserpath)
|
|
|
|
# save plain content + meta
|
|
plainpath = corpus_path + corpus_name + "_content.json"
|
|
textacy.fileio.write_json_lines(gen_dicts(corpus), plainpath)
|
|
|
|
|
|
|
|
def gen_dicts(corpus):
|
|
for doc in corpus:
|
|
dict = {"index" : doc.corpus_index, "content" : doc.text}
|
|
dict.update(doc.metadata)
|
|
yield dict
|
|
|
|
|
|
|
|
def load_corpus(corpus_path, corpus_name, lang="de"):
|
|
"""
|
|
Load textacy-Corpus including spacy-parser out from file
|
|
:param corpus_path: str
|
|
:param corpus_name: str (should content the language like "_de_")
|
|
:param lang: str (language code) ir spacy.Language
|
|
:return: texracy.Corpus, spacy.language
|
|
"""
|
|
|
|
#ckeck for language
|
|
if "de_" in corpus_name:
|
|
lang="de"
|
|
elif "en_" in corpus_name:
|
|
lang ="en"
|
|
|
|
|
|
# load parser
|
|
parser = spacy.load(lang)
|
|
|
|
stringstorepath = corpus_path + str(lang) + '_parser'+'/vocab/strings.json'
|
|
with open(stringstorepath) as file:
|
|
parser.vocab.strings.load(file)
|
|
|
|
vocabpath = Path(corpus_path + str(lang) + '_parser'+'/vocab/lexemes.bin')
|
|
parser.vocab.load_lexemes(vocabpath)
|
|
|
|
#load corpus
|
|
corpus = textacy.Corpus(parser)
|
|
|
|
plainpath = corpus_path + corpus_name + "_content.json"
|
|
plain_stream = textacy.fileio.read_json_lines(plainpath) # yields {int : str}
|
|
|
|
for plain in plain_stream:
|
|
meta = {}
|
|
for key,value in plain.items():
|
|
if key != "content" and key != "index":
|
|
meta[key] = value
|
|
corpus.add_doc(textacy.Doc(plain["content"], lang=corpus.spacy_lang, metadata=meta))
|
|
|
|
return corpus, corpus.spacy_lang |