# topicModelingTickets/miscellaneous.py
# -*- coding: utf-8 -*-
import configparser as ConfigParser
import csv
import functools
import glob, os
import json
import logging
import operator
import os
import pickle
import random
import re
import sys
import warnings
from datetime import datetime
from pathlib import Path

import spacy
import textacy
from scipy import *
from spacy.tokens.doc import Doc as SpacyDoc
from textacy.fileio import open_sesame
# Allow arbitrarily large CSV fields (ticket texts can be huge).
csv.field_size_limit(sys.maxsize)

# Directory of this file, with trailing slash; all paths below are relative to it.
FILEPATH = os.path.dirname(os.path.realpath(__file__)) + "/"

# load config
config_ini = FILEPATH + "config.ini"
config = ConfigParser.ConfigParser()
with open(config_ini) as f:
    config.read_file(f)

# configure logging from the [logging] section of config.ini
filename = FILEPATH + config.get("logging", "filename")
# Map the configured level name to its numeric value.  getattr generalizes
# the former INFO/DEBUG/WARNING-only chain to every standard level name
# (ERROR, CRITICAL, ...); unknown names fall back to INFO instead of crashing.
level = getattr(logging, config.get("logging", "level"), logging.INFO)
if not isinstance(level, int):
    level = logging.INFO
logging.basicConfig(filename=filename, level=level)
def logprint(string, level="INFO"):
    """Print *string* to stdout and write it to the configured log.

    A timestamp is prepended to the message before printing/logging.

    :param string: message; converted to str
    :param level: log-level name ("INFO", "DEBUG", "WARNING", "ERROR", "CRITICAL")
    """
    string = "{}\t".format(datetime.now()) + str(string)
    print(string)
    # logging.log with the numeric level generalizes the original
    # INFO/DEBUG/WARNING-only chain to all standard level names.
    # Unknown level names are silently ignored, as before.
    numeric_level = getattr(logging, level, None)
    if isinstance(numeric_level, int):
        logging.log(numeric_level, string)
def compose(*functions):
    """Compose unary functions right-to-left: compose(f, g)(x) == f(g(x)).

    With no arguments the identity function is returned.
    """
    def composed(x):
        result = x
        # Apply the rightmost function first, exactly like mathematical
        # composition f1 o f2 o ... o fn.
        for fn in reversed(functions):
            result = fn(result)
        return result
    return composed
def get_calling_function():
    """finds the calling function in many decent cases.

    Walks one frame up the stack and tries several lookup strategies
    (module globals, bound ``self``/``cls`` attribute, enclosing frame's
    locals, common decorator variable names) until one yields an object
    whose ``__code__`` matches the caller's code object.

    :return: the function object that called the current function
    :raises AttributeError: if the caller cannot be resolved

    https://stackoverflow.com/questions/39078467/python-how-to-get-the-calling-function-not-just-its-name
    """
    fr = sys._getframe(1)   # inspect.stack()[1][0]
    co = fr.f_code
    # Candidates are lazy lambdas: each lookup may legitimately fail with
    # KeyError/AttributeError, so each is tried in turn.
    for get in (
            lambda: fr.f_globals[co.co_name],
            lambda: getattr(fr.f_locals['self'], co.co_name),
            lambda: getattr(fr.f_locals['cls'], co.co_name),
            lambda: fr.f_back.f_locals[co.co_name],  # nested
            lambda: fr.f_back.f_locals['func'],  # decorators
            lambda: fr.f_back.f_locals['meth'],
            lambda: fr.f_back.f_locals['f'],
    ):
        try:
            func = get()
        except (KeyError, AttributeError):
            pass
        else:
            # Only accept a candidate that really is the calling function,
            # not merely something with the same name.
            if func.__code__ == co:
                return func
    raise AttributeError("func not found")
def save_obj(obj, path):
    """Pickle *obj* to the file at *path* using the highest available protocol."""
    with open(path, 'wb') as handle:
        pickle.dump(obj, handle, pickle.HIGHEST_PROTOCOL)
def load_obj(path):
    """Unpickle and return the object stored in the file at *path*."""
    with open(path, 'rb') as handle:
        obj = pickle.load(handle)
    return obj
def replaceRockDots_lambda():
    """Return a callable that rewrites German umlauts and eszett to ASCII digraphs.

    The returned function maps ä→ae, ö→oe, ü→ue (and capitalized variants)
    plus ß→ss, leaving every other character untouched.
    """
    # One translate() pass is equivalent to the former chain of seven
    # single-character re.sub calls.
    table = {ord('ß'): 'ss',
             ord('ö'): 'oe', ord('Ö'): 'Oe',
             ord('ü'): 'ue', ord('Ü'): 'Ue',
             ord('ä'): 'ae', ord('Ä'): 'Ae'}
    return lambda string: string.translate(table)
def replaceRockDots(string):
    """Rewrite German umlauts and eszett in *string* to their ASCII digraphs.

    ä→ae, ö→oe, ü→ue (and capitalized variants), ß→ss; other characters
    are returned unchanged.
    """
    # Single translate() pass instead of seven chained re.sub calls.
    return string.translate({ord('ß'): 'ss',
                             ord('ö'): 'oe', ord('Ö'): 'Oe',
                             ord('ü'): 'ue', ord('Ü'): 'Ue',
                             ord('ä'): 'ae', ord('Ä'): 'Ae'})
def list_from_files(*paths):
    """
    create string-list from file like
    n1
    n2
    n3
    :param paths: list(str) or str if single path
    :return: list(str)
    """
    # Collect the lines of every file into one flat list, then normalize
    # each line's whitespace.
    collected = []
    for path in paths:
        for line in textacy.fileio.read_file_lines(path):
            collected.append(line)
    return [textacy.preprocess.normalize_whitespace(line) for line in collected]
def breakpoint():
    """No-op hook to attach a debugger breakpoint to.

    NOTE(review): shadows the ``breakpoint`` builtin (Python 3.7+).
    """
    return None
def sort_dictionary(dict):
    """Return the dict's (key, value) pairs as a list sorted ascending by value.

    NOTE(review): the parameter name shadows the ``dict`` builtin; kept for
    compatibility with keyword callers.
    """
    return sorted(dict.items(), key=lambda pair: pair[1])
def normalize_str(string):
    """
    replaceRockDots followed by textacy.preprocess.normalize_whitespace
    :param string: str
    :return: str
    """
    without_umlauts = replaceRockDots(string)
    return textacy.preprocess.normalize_whitespace(without_umlauts)
def deprecated(func):
    """This is a decorator which can be used to mark functions
    as deprecated. It will result in a warning being emitted
    when the function is used.

    Fix: relies on ``warnings``, which was never imported at module
    level (calling any decorated function raised NameError).
    """
    @functools.wraps(func)
    def new_func(*args, **kwargs):
        warnings.simplefilter('always', DeprecationWarning)  # turn off filter
        warnings.warn("Call to deprecated function {}.".format(func.__name__),
                      category=DeprecationWarning, stacklevel=2)
        warnings.simplefilter('default', DeprecationWarning)  # reset filter
        return func(*args, **kwargs)
    return new_func
def flatten(liste):
    """Flatten one nesting level: a list of lists becomes a single list."""
    flat = []
    for sublist in liste:
        flat.extend(sublist)
    return flat
def printRandomDoc(textacyCorpus):
    """
    Logs and prints a randomly chosen doc out of a textacy-Corpus
    (its index, text and 'categoryName' metadata entry).
    :param textacyCorpus: textacy.Corpus
    """
    print()
    if len(textacyCorpus) == 0:
        logprint("NO DOCS IN CORPUS")
    else:
        randIndex = int((len(textacyCorpus) - 1) * random.random())
        chosen = textacyCorpus[randIndex]
        logprint("Index: {0} \n Text: {1} \n categoryName: {2}\n".format(
            randIndex, chosen.text, chosen.metadata['categoryName']))
    print()
def get_list_from_config(section, option):
    """Read a comma-separated config option and return its whitespace-normalized items."""
    raw_items = config.get(section, option).split(",")
    return [textacy.preprocess.normalize_whitespace(item) for item in raw_items]
def corpus2Text(corpus):
    """Yield the plain text of every doc in *corpus*."""
    return (doc.text for doc in corpus)
def corpus2Meta(corpus):
    """Yield the metadata dict of every doc in *corpus*."""
    return (doc.metadata for doc in corpus)
def savelabledCorpiLines_cat(corpus, filepath):
    """Write the corpus to *filepath*, one line per doc: "[<categoryName>] <text>"."""
    textacy.fileio.write_file_lines(gen_labledLines(corpus), filepath=filepath)
def gen_labledLines(corpus, label="categoryName"):
    """Yield one labeled line per doc: "[<metadata[label]>] <doc text>".

    :param corpus: iterable of docs with .metadata (dict) and .text
    :param label: metadata key whose value becomes the bracketed label
    """
    for doc in corpus:
        # generate [topic1, topic2....] tok1 tok2 tok3 out of corpi
        line = "[" + doc.metadata[label] + "] " + doc.text
        yield line
def save_corpus(corpus, corpus_path, corpus_name):
    """
    saves a textacy-corpus including spacy-parser
    :param corpus: textacy-Corpus
    :param corpus_path: str (directory prefix the files are written under)
    :param corpus_name: str (should contain the language like "_de_")
    """
    # save parser: the spacy language pipeline goes into "<lang>_parser/"
    parser = corpus.spacy_lang
    parserpath = corpus_path + str(parser.lang) + '_parser'
    parser.save_to_directory(parserpath)
    # save plain content + meta as one JSON dict per line (see gen_dicts)
    plainpath = corpus_path + corpus_name + "_content.json"
    textacy.fileio.write_json_lines(gen_dicts(corpus), plainpath)
def gen_dicts(corpus):
    """Yield one JSON-serializable dict per doc: corpus index, text content, metadata."""
    for doc in corpus:
        record = {"index": doc.corpus_index, "content": doc.text}
        record.update(doc.metadata)
        yield record
def multisub(subs, subject):
    """Simultaneously perform all substitutions on the subject string.

    Patterns are matched literally (escaped); because all of them are
    combined into one alternation, earlier replacements are never
    re-scanned by later patterns.
    https://stackoverflow.com/questions/764360/a-list-of-string-replacements-in-python
    :param subs: list of (pattern, replacement) string pairs
    :param subject: str
    :return: str
    """
    replacements = [replacement for _, replacement in subs]
    alternation = '|'.join('(%s)' % re.escape(original) for original, _ in subs)

    def pick(match):
        # lastindex tells us which alternative matched (1-based).
        return replacements[match.lastindex - 1]

    return re.sub(alternation, pick, subject)
def load_corpus(corpus_path, corpus_name, lang="de"):
    """
    Load textacy-Corpus including spacy-parser out from file
    :param corpus_path: str
    :param corpus_name: str (should contain the language like "_de_")
    :param lang: str (language code) or spacy.Language
    :return: textacy.Corpus, spacy.Language
    """
    # check for language: the corpus name overrides the lang argument
    if "de_" in corpus_name:
        lang="de"
    elif "en_" in corpus_name:
        lang ="en"
    # load parser, then restore the vocab that save_corpus wrote
    # (old spacy 1.x API: strings.json + lexemes.bin under <lang>_parser/vocab/)
    parser = spacy.load(lang)
    stringstorepath = corpus_path + str(lang) + '_parser'+'/vocab/strings.json'
    with open(stringstorepath) as file:
        parser.vocab.strings.load(file)
    vocabpath = Path(corpus_path + str(lang) + '_parser'+'/vocab/lexemes.bin')
    parser.vocab.load_lexemes(vocabpath)
    # load corpus: rebuild one textacy.Doc per JSON line written by save_corpus
    corpus = textacy.Corpus(parser)
    plainpath = corpus_path + corpus_name + "_content.json"
    plain_stream = textacy.fileio.read_json_lines(plainpath) # yields one dict per line
    for plain in plain_stream:
        # everything except "content" and "index" is metadata
        meta = {}
        for key,value in plain.items():
            if key != "content" and key != "index":
                meta[key] = value
        corpus.add_doc(textacy.Doc(plain["content"], lang=corpus.spacy_lang, metadata=meta))
    return corpus, corpus.spacy_lang