# -*- coding: utf-8 -*-
import configparser as ConfigParser
import csv
import functools
import logging
import random
import re
import sys
from pathlib import Path
import pickle
import spacy
import textacy
from scipy import *
import os
import glob
import warnings  # needed by the deprecated() decorator below
from textacy.fileio import open_sesame
import json
from spacy.tokens.doc import Doc as SpacyDoc
import operator
csv.field_size_limit(sys.maxsize)
FILEPATH = os.path.dirname(os.path.realpath(__file__)) + "/"
# load config
config_ini = FILEPATH + "config.ini"
config = ConfigParser.ConfigParser()
with open(config_ini) as f:
    config.read_file(f)

# configure logging
filename = FILEPATH + config.get("logging", "filename")
level = config.get("logging", "level")
if level == "INFO":
    level = logging.INFO
elif level == "DEBUG":
    level = logging.DEBUG
elif level == "WARNING":
    level = logging.WARNING

logging.basicConfig(filename=filename, level=level)


def logprint(string, level="INFO"):
    """Print a string and also log it at the given level."""
    print(string)
    if level == "INFO":
        logging.info(string)
    elif level == "DEBUG":
        logging.debug(string)
    elif level == "WARNING":
        logging.warning(string)


def compose(*functions):
    """Compose the given functions right-to-left."""
    def compose2(f, g):
        return lambda x: f(g(x))
    return functools.reduce(compose2, functions, lambda x: x)
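

# Usage sketch: composition applies right-to-left, so compose(f, g)(x) == f(g(x)).
#   inc = lambda x: x + 1
#   dbl = lambda x: x * 2
#   compose(inc, dbl)(3)  # -> 7, i.e. inc(dbl(3))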


def get_calling_function():
    """Find the calling function in many decent cases.
    https://stackoverflow.com/questions/39078467/python-how-to-get-the-calling-function-not-just-its-name
    """
    fr = sys._getframe(1)  # inspect.stack()[1][0]
    co = fr.f_code
    for get in (
            lambda: fr.f_globals[co.co_name],
            lambda: getattr(fr.f_locals['self'], co.co_name),
            lambda: getattr(fr.f_locals['cls'], co.co_name),
            lambda: fr.f_back.f_locals[co.co_name],  # nested
            lambda: fr.f_back.f_locals['func'],      # decorators
            lambda: fr.f_back.f_locals['meth'],
            lambda: fr.f_back.f_locals['f'],
    ):
        try:
            func = get()
        except (KeyError, AttributeError):
            pass
        else:
            if func.__code__ == co:
                return func
    raise AttributeError("func not found")


def save_obj(obj, path):
    """Pickle an object to the given path."""
    with open(path, 'wb') as f:
        pickle.dump(obj, f, pickle.HIGHEST_PROTOCOL)


def load_obj(path):
    """Load a pickled object from the given path."""
    with open(path, 'rb') as f:
        return pickle.load(f)
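

# Usage sketch (hypothetical path): round-trip an object through pickle.
#   save_obj({"a": 1}, "/tmp/obj.pkl")
#   assert load_obj("/tmp/obj.pkl") == {"a": 1}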


# single translation table for the umlaut/ß transliterations below
_ROCKDOTS = str.maketrans({
    'ß': 'ss',
    'ö': 'oe', 'Ö': 'Oe',
    'ü': 'ue', 'Ü': 'Ue',
    'ä': 'ae', 'Ä': 'Ae',
})


def replaceRockDots_lambda():
    return lambda string: string.translate(_ROCKDOTS)


def replaceRockDots(string):
    """Replace German umlauts and ß with their ASCII transliterations."""
    return string.translate(_ROCKDOTS)
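

# Example:
#   replaceRockDots("Überfällige Änderung")  # -> "Ueberfaellige Aenderung"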


def list_from_files(*paths):
    """
    Create a list of strings from files that contain one entry per line:
        n1
        n2
        n3
    :param paths: list(str), or str for a single path
    :return: list(str)
    """
    listlist = []
    for path in paths:
        listlist.append(list(textacy.fileio.read_file_lines(path)))
    # flatten the list of lists into a single list
    liste = [item for sublist in listlist for item in sublist]
    return list(map(textacy.preprocess.normalize_whitespace, liste))


def breakpoint():
    # no-op hook for setting a debugger breakpoint
    # (note: shadows the breakpoint() builtin introduced in Python 3.7)
    pass


def sort_dictionary(d):
    """Return the dict's items as a list of (key, value) pairs, sorted by value."""
    return sorted(d.items(), key=operator.itemgetter(1))
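

# Example: sorts ascending by value.
#   sort_dictionary({"a": 3, "b": 1})  # -> [('b', 1), ('a', 3)]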


def normalize(string):
    # lowercase, transliterate umlauts/ß (cf. replaceRockDots), normalize whitespace
    string = re.sub(r'[ß]', "ss", string.lower())
    string = re.sub(r'[ö]', "oe", string)
    string = re.sub(r'[ü]', "ue", string)
    string = re.sub(r'[ä]', "ae", string)
    string = textacy.preprocess.normalize_whitespace(string)
    return string


def deprecated(func):
    """Decorator that marks a function as deprecated. A DeprecationWarning
    is emitted whenever the decorated function is called."""
    @functools.wraps(func)
    def new_func(*args, **kwargs):
        warnings.simplefilter('always', DeprecationWarning)  # turn off filter
        warnings.warn("Call to deprecated function {}.".format(func.__name__),
                      category=DeprecationWarning, stacklevel=2)
        warnings.simplefilter('default', DeprecationWarning)  # reset filter
        return func(*args, **kwargs)
    return new_func
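

# Usage sketch: each call to the decorated function emits a DeprecationWarning.
#   @deprecated
#   def old_api():
#       pass
#   old_api()  # DeprecationWarning: Call to deprecated function old_api.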


def flatten(liste):
    return [item for sublist in liste for item in sublist]


def printRandomDoc(textacyCorpus):
    """
    Print and log a random doc from a textacy corpus.
    :param textacyCorpus: textacy.Corpus
    """
    print()
    if len(textacyCorpus) == 0:
        logprint("NO DOCS IN CORPUS")
    else:
        randIndex = int((len(textacyCorpus) - 1) * random.random())
        logprint("Index: {0} \n Text: {1} \n categoryName: {2}\n".format(
            randIndex, textacyCorpus[randIndex].text,
            textacyCorpus[randIndex].metadata['categoryName']))
    print()


def get_list_from_config(section, option):
    """Read a comma-separated config option as a list of whitespace-normalized strings."""
    return list(map(textacy.preprocess.normalize_whitespace, config.get(section, option).split(",")))
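

# Usage sketch, assuming a hypothetical config.ini entry like
#   [preprocessing]
#   stopwords = foo, bar, baz
# then get_list_from_config("preprocessing", "stopwords") returns ["foo", "bar", "baz"].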


def corpus2Text(corpus):
    for doc in corpus:
        yield doc.text


def corpus2Meta(corpus):
    for doc in corpus:
        yield doc.metadata


def savelabledCorpiLines(corpus, filepath):
    textacy.fileio.write_file_lines(gen_labledLines(corpus), filepath=filepath)


def gen_labledLines(corpus):
    for doc in corpus:
        # generate "[categoryName] tok1 tok2 tok3" lines out of the corpus
        yield "[" + doc.metadata["categoryName"] + "] " + doc.text


def save_corpus(corpus, corpus_path, corpus_name):
    """
    Save a textacy corpus, including its spacy parser.
    :param corpus: textacy.Corpus
    :param corpus_path: str
    :param corpus_name: str (should contain the language, e.g. "_de_")
    """
    # TODO: also save POS and NER tags

    # save the parser
    parser = corpus.spacy_lang
    parserpath = corpus_path + str(parser.lang) + '_parser'
    parser.save_to_directory(parserpath)

    # save plain content + metadata
    plainpath = corpus_path + corpus_name + "_content.json"
    textacy.fileio.write_json_lines(gen_dicts(corpus), plainpath)


def gen_dicts(corpus):
    for doc in corpus:
        d = {"index": doc.corpus_index, "content": doc.text}
        d.update(doc.metadata)
        yield d


def multisub(subs, subject):
    """Simultaneously perform all substitutions on the subject string.
    https://stackoverflow.com/questions/764360/a-list-of-string-replacements-in-python
    """
    pattern = '|'.join('(%s)' % re.escape(p) for p, s in subs)
    substs = [s for p, s in subs]
    replace = lambda m: substs[m.lastindex - 1]
    return re.sub(pattern, replace, subject)
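

# Example: all substitutions happen in a single pass, so replacements cannot
# cascade into one another.
#   multisub([("a", "b"), ("b", "a")], "ab")  # -> "ba" (not "aa" or "bb")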
def load_corpus(corpus_path, corpus_name, lang="de"):
"""
Load textacy-Corpus including spacy-parser out from file
:param corpus_path: str
:param corpus_name: str (should content the language like "_de_")
:param lang: str (language code) ir spacy.Language
:return: texracy.Corpus, spacy.language
"""
#ckeck for language
if "de_" in corpus_name:
lang="de"
elif "en_" in corpus_name:
lang ="en"
# load parser
parser = spacy.load(lang)
stringstorepath = corpus_path + str(lang) + '_parser'+'/vocab/strings.json'
with open(stringstorepath) as file:
parser.vocab.strings.load(file)
vocabpath = Path(corpus_path + str(lang) + '_parser'+'/vocab/lexemes.bin')
parser.vocab.load_lexemes(vocabpath)
#load corpus
corpus = textacy.Corpus(parser)
plainpath = corpus_path + corpus_name + "_content.json"
plain_stream = textacy.fileio.read_json_lines(plainpath) # yields {int : str}
for plain in plain_stream:
meta = {}
for key,value in plain.items():
if key != "content" and key != "index":
meta[key] = value
corpus.add_doc(textacy.Doc(plain["content"], lang=corpus.spacy_lang, metadata=meta))
return corpus, corpus.spacy_lang
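

# Usage sketch (hypothetical paths/names): round-trip a corpus through save/load.
#   save_corpus(corpus, corpus_path="corpus/", corpus_name="corpus_de_test")
#   corpus, parser = load_corpus(corpus_path="corpus/", corpus_name="corpus_de_test")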