223 lines
5.7 KiB
Python
223 lines
5.7 KiB
Python
# -*- coding: utf-8 -*-
|
|
import configparser as ConfigParser
|
|
import csv
|
|
import functools
|
|
import logging
|
|
import random
|
|
import re
|
|
import sys
|
|
from pathlib import Path
|
|
import pickle
|
|
import spacy
|
|
import textacy
|
|
from scipy import *
|
|
import os
|
|
|
|
def _raise_csv_field_size_limit():
    """
    Raise the csv field size limit as far as the platform allows.

    csv.field_size_limit() converts its argument to a C long. Where a C long
    is narrower than sys.maxsize (e.g. 64-bit Windows) the direct call raises
    OverflowError, so the value is halved until it is accepted.

    :return: int, the limit that was finally set
    """
    limit = sys.maxsize
    while True:
        try:
            csv.field_size_limit(limit)
            return limit
        except OverflowError:
            limit //= 2


# allow very large fields when reading csv files
_raise_csv_field_size_limit()

# directory containing this module, always terminated with "/"
FILEPATH = os.path.dirname(os.path.realpath(__file__)) + "/"
|
|
|
|
|
|
|
|
# load config
# config.ini is read from the directory this module lives in (FILEPATH)
config_ini = FILEPATH + "config.ini"

# module-level parser object; the logging setup below reads its "logging" section
config = ConfigParser.ConfigParser()
with open(config_ini) as f:
    # open() raises if the file is missing, so a missing config.ini fails at import time
    config.read_file(f)
|
|
|
|
|
|
|
|
# config logging
# the log file is placed next to this module
filename = FILEPATH + config.get("logging", "filename")

# translate the configured level name into the logging constant;
# any other value is handed to basicConfig unchanged
level = config.get("logging", "level")
level = {
    "INFO": logging.INFO,
    "DEBUG": logging.DEBUG,
    "WARNING": logging.WARNING,
}.get(level, level)

logging.basicConfig(filename=filename, level=level)
|
|
|
|
|
|
|
|
def printlog(string, level="INFO"):
    """
    Print a message to stdout and also send it to the root logger.

    :param string: message to print and log
    :param level: str, one of "INFO", "DEBUG" or "WARNING"; for any other
                  value the message is printed but not logged
    """
    print(string)

    # first matching level name decides which logging call is used
    for name, emit in (
        ("INFO", logging.info),
        ("DEBUG", logging.debug),
        ("WARNING", logging.warning),
    ):
        if level == name:
            emit(string)
            break
|
|
|
|
|
|
def compose(*functions):
    """
    Combine one-argument callables into a single callable.

    The functions are applied from right to left, i.e.
    compose(f, g, h)(x) == f(g(h(x))). Without arguments the result is the
    identity function.

    :param functions: callables that each take one argument
    :return: callable taking one argument
    """
    def composed(x):
        # Apply the steps in a loop instead of nesting one closure per
        # function, so very long pipelines do not exhaust the recursion limit.
        for function in reversed(functions):
            x = function(x)
        return x

    return composed
|
|
|
|
|
|
def get_calling_function():
    """
    Find the function object of the caller in many decent cases.

    The caller's frame is inspected and several places are tried in turn
    where an object with the caller's name (or a conventional decorator
    argument name) may live. The first candidate whose code object equals
    the code object of the caller's frame is returned.

    https://stackoverflow.com/questions/39078467/python-how-to-get-the-calling-function-not-just-its-name

    :return: the function (or bound method) that called this function
    :raises AttributeError: if none of the lookups yields the caller
    """
    fr = sys._getframe(1)  # frame of the caller, like inspect.stack()[1][0]
    co = fr.f_code
    for get in (
        lambda: fr.f_globals[co.co_name],  # module-level function
        lambda: getattr(fr.f_locals['self'], co.co_name),  # looked up on 'self'
        lambda: getattr(fr.f_locals['cls'], co.co_name),  # looked up on 'cls'
        lambda: fr.f_back.f_locals[co.co_name],  # nested
        lambda: fr.f_back.f_locals['func'],  # decorators
        lambda: fr.f_back.f_locals['meth'],
        lambda: fr.f_back.f_locals['f'],
    ):
        try:
            func = get()
        except (KeyError, AttributeError):
            continue
        # A same-named object that is not a function has no __code__; treat it
        # as a non-match so the remaining lookups are still tried.
        if getattr(func, "__code__", None) == co:
            return func
    raise AttributeError("func not found")
|
|
|
|
|
|
def save_obj(obj, path):
    """
    Pickle an object to a file using the highest available protocol.

    :param obj: object to serialise
    :param path: target file; it is opened in binary write mode
    """
    with open(path, 'wb') as target:
        pickler = pickle.Pickler(target, pickle.HIGHEST_PROTOCOL)
        pickler.dump(obj)
|
|
|
|
def load_obj(path):
    """
    Read back one pickled object from a file.

    NOTE: unpickling can execute arbitrary code, so only files from a
    trusted source should be loaded.

    :param path: file to read; it is opened in binary read mode
    :return: the unpickled object
    """
    with open(path, 'rb') as source:
        unpickler = pickle.Unpickler(source)
        return unpickler.load()
|
|
|
|
def replaceRockDots():
    """
    Build a function that lower-cases a string and transliterates German
    umlauts and sharp s to ASCII (ae, ue, oe, ss).

    :return: callable str -> str
    """
    # applied in this order to the lower-cased input
    substitutions = (
        (r'[ä]', "ae"),
        (r'[ü]', "ue"),
        (r'[ö]', "oe"),
        (r'[ß]', "ss"),
    )

    def normalize(string):
        result = string.lower()
        for pattern, replacement in substitutions:
            result = re.sub(pattern, replacement, result)
        return result

    return normalize
|
|
|
|
def list_from_files(*paths):
    """
    Read the lines of one or more files into a single flat list of strings.

    Each file is expected to hold one entry per line, like
    n1
    n2
    n3

    :param paths: str, one or more file paths given as separate arguments
    :return: list(str), the lines of all files in argument order, each passed
             through textacy's normalize_whitespace
    """
    # all files are read completely before any line is normalised
    raw_lines = [
        line
        for path in paths
        for line in textacy.fileio.read_file_lines(path)
    ]

    return [textacy.preprocess.normalize_whitespace(line) for line in raw_lines]
|
|
|
|
|
|
|
|
|
|
|
|
def printRandomDoc(textacyCorpus):
    """
    Print and log one randomly chosen document of a textacy corpus.

    For an empty corpus a notice is printed instead. Output is framed by an
    empty line before and after.

    :param textacyCorpus: corpus supporting len() and integer indexing; its
                          documents need a .text attribute and a
                          .metadata mapping with the key 'categoryName'
    """
    print()
    if len(textacyCorpus) == 0:
        printlog("NO DOCS IN CORPUS")
    else:
        printlog("len(textacyCorpus) = %i" % len(textacyCorpus))

        # randrange covers every valid index; the former
        # int((n - 1) * random.random()) could never select the last document
        randIndex = random.randrange(len(textacyCorpus))
        doc = textacyCorpus[randIndex]
        printlog("Index: {0} \n Text: {1} \n Metadata: {2}\n".format(randIndex, doc.text,
                                                                     doc.metadata['categoryName']))

    print()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def save_corpus(corpus, corpus_path, corpus_name):
    """
    Write a textacy corpus to disk together with its spaCy parser.

    Three outputs are created, all prefixed with corpus_path:
    "<lang>_parser" (the parser), "<corpus_name>_content.bin" (the spaCy
    documents) and "<corpus_name>_meta.json" (one metadata record per doc).

    :param corpus: textacy-Corpus
    :param corpus_path: str, prefix the outputs are written under; it is
                        concatenated as-is, so it should end with a separator
    :param corpus_name: str (should contain the language like "_de_")
    """
    # NOTE: separately dumping the string store ("<corpus_name>_strings.json")
    # is disabled; todo: save vocab?

    # parser
    spacy_parser = corpus.spacy_lang
    parser_dir = corpus_path + str(spacy_parser.lang) + '_parser'
    spacy_parser.save_to_directory(parser_dir)

    # document content
    docs_file = corpus_path + corpus_name + "_content.bin"
    textacy.fileio.write_spacy_docs(
        (document.spacy_doc for document in corpus), docs_file)

    # document metadata
    meta_file = corpus_path + corpus_name + "_meta.json"
    textacy.fileio.write_json_lines(
        (document.metadata for document in corpus), meta_file)
|
|
|
|
|
|
|
|
|
|
|
|
def load_corpus(corpus_path, corpus_name, lang="de"):
    """
    Load a textacy corpus including its spaCy parser from disk.

    Reads "<lang>_parser/vocab/strings.json" and
    "<lang>_parser/vocab/lexemes.bin" for the parser vocabulary, then
    "<corpus_name>_content.bin" and "<corpus_name>_meta.json" for the
    documents, all prefixed with corpus_path.

    :param corpus_path: str, prefix the files are read from
    :param corpus_name: str (should contain the language like "_de_")
    :param lang: str (language code) or spacy.Language; replaced by "de" or
                 "en" when corpus_name contains "de_" or "en_"
    :return: textacy.Corpus, spacy.Language
    """
    # check for language: a marker in the corpus name wins over the argument
    for marker, code in (("de_", "de"), ("en_", "en")):
        if marker in corpus_name:
            lang = code
            break

    # load parser and restore its vocabulary from the saved parser directory
    parser = spacy.load(lang)

    parser_dir = corpus_path + str(lang) + '_parser'
    with open(parser_dir + '/vocab/strings.json') as strings_file:
        parser.vocab.strings.load(strings_file)
    parser.vocab.load_lexemes(Path(parser_dir + '/vocab/lexemes.bin'))

    # load corpus
    corpus = textacy.Corpus(parser)

    prefix = corpus_path + corpus_name
    docs_file = prefix + "_content.bin"
    meta_file = prefix + "_meta.json"

    metadata_stream = textacy.fileio.read_json_lines(meta_file)
    spacy_docs = textacy.fileio.read_spacy_docs(corpus.spacy_vocab, docs_file)

    # documents and metadata records are paired by position
    for spacy_doc, metadata in zip(spacy_docs, metadata_stream):
        document = textacy.Doc(spacy_doc, lang=corpus.spacy_lang, metadata=metadata)
        corpus.add_doc(document)

    return corpus, corpus.spacy_lang
|
|
|
|
|
|
|
|
|