topicModelingTickets/miscellaneous.py

314 lines
8.7 KiB
Python

# -*- coding: utf-8 -*-
import configparser as ConfigParser
import csv
import functools
import logging
import random
import re
import sys
from pathlib import Path
import pickle
import spacy
import textacy
from scipy import *
import os
import glob, os
from textacy.fileio import open_sesame
import json
from spacy.tokens.doc import Doc as SpacyDoc
csv.field_size_limit(sys.maxsize)
FILEPATH = os.path.dirname(os.path.realpath(__file__)) + "/"
# load config
config_ini = FILEPATH + "config.ini"
config = ConfigParser.ConfigParser()
with open(config_ini) as f:
config.read_file(f)
# config logging
filename = FILEPATH + config.get("logging","filename")
level = config.get("logging","level")
if level == "INFO":
level = logging.INFO
elif level == "DEBUG":
level = logging.DEBUG
elif level == "WARNING":
level = logging.WARNING
logging.basicConfig(filename=filename, level=level)
def logprint(string, level="INFO"):
"""log and prints"""
print(string)
if level == "INFO":
logging.info(string)
elif level == "DEBUG":
logging.debug(string)
elif level == "WARNING":
logging.warning(string)
def compose(*functions):
def compose2(f, g):
return lambda x: f(g(x))
return functools.reduce(compose2, functions, lambda x: x)
def get_calling_function():
"""finds the calling function in many decent cases.
https://stackoverflow.com/questions/39078467/python-how-to-get-the-calling-function-not-just-its-name
"""
fr = sys._getframe(1) # inspect.stack()[1][0]
co = fr.f_code
for get in (
lambda: fr.f_globals[co.co_name],
lambda: getattr(fr.f_locals['self'], co.co_name),
lambda: getattr(fr.f_locals['cls'], co.co_name),
lambda: fr.f_back.f_locals[co.co_name], # nested
lambda: fr.f_back.f_locals['func'], # decorators
lambda: fr.f_back.f_locals['meth'],
lambda: fr.f_back.f_locals['f'],
):
try:
func = get()
except (KeyError, AttributeError):
pass
else:
if func.__code__ == co:
return func
raise AttributeError("func not found")
def save_obj(obj, path):
with open(path , 'wb') as f:
pickle.dump(obj, f, pickle.HIGHEST_PROTOCOL)
def load_obj(path):
with open(path, 'rb') as f:
return pickle.load(f)
def replaceRockDots():
return lambda string: re.sub(r'[ß]', "ss",
(re.sub(r'[ö]', "oe",
(re.sub(r'[ü]', "ue", (re.sub(r'[ä]', "ae", string.lower())))))))
def list_from_files(*paths):
"""
create string-list from file like
n1
n2
n3
:param paths: list(str) or str if single path
:return: list(str)
"""
listlist = []
for path in paths:
listlist.append(list(textacy.fileio.read_file_lines(path)))
#liste von listen zu einer liste
liste = [item for sublist in listlist for item in sublist]
return list(map(textacy.preprocess.normalize_whitespace, liste))
def deprecated(func):
"""This is a decorator which can be used to mark functions
as deprecated. It will result in a warning being emmitted
when the function is used."""
@functools.wraps(func)
def new_func(*args, **kwargs):
warnings.simplefilter('always', DeprecationWarning) #turn off filter
warnings.warn("Call to deprecated function {}.".format(func.__name__), category=DeprecationWarning, stacklevel=2)
warnings.simplefilter('default', DeprecationWarning) #reset filter
return func(*args, **kwargs)
return new_func
def printRandomDoc(textacyCorpus):
"""
printlogss random doc out of a textacy-Corpus
:param textacyCorpus:
"""
print()
if len(textacyCorpus) == 0:
logprint("NO DOCS IN CORPUS")
else:
#printlog("len(textacyCorpus) = %i" % len(textacyCorpus))
randIndex = int((len(textacyCorpus) - 1) * random.random())
logprint("Index: {0} \n Text: {1} \n categoryName: {2}\n".format(randIndex, textacyCorpus[randIndex].text,
textacyCorpus[randIndex].metadata['categoryName']))
print()
def save_corpus(corpus, corpus_path, corpus_name):
"""
saves a textacy-corpus including spacy-parser
:param corpus: textacy-Corpus
:param corpus_path: str
:param corpus_name: str (should content the language like "_de_")
"""
# save parser
parser = corpus.spacy_lang
parserpath = corpus_path + str(parser.lang) + '_parser'
parser.save_to_directory(parserpath)
# save plain content + meta
plainpath = corpus_path + corpus_name + "_content.json"
textacy.fileio.write_json_lines(gen_dicts(corpus), plainpath)
def gen_dicts(corpus):
for doc in corpus:
dict = {"index" : doc.corpus_index, "content" : doc.text}
dict.update(doc.metadata)
yield dict
def load_corpus(corpus_path, corpus_name, lang="de"):
"""
Load textacy-Corpus including spacy-parser out from file
:param corpus_path: str
:param corpus_name: str (should content the language like "_de_")
:param lang: str (language code) ir spacy.Language
:return: texracy.Corpus, spacy.language
"""
#ckeck for language
if "de_" in corpus_name:
lang="de"
elif "en_" in corpus_name:
lang ="en"
# load parser
parser = spacy.load(lang)
stringstorepath = corpus_path + str(lang) + '_parser'+'/vocab/strings.json'
with open(stringstorepath) as file:
parser.vocab.strings.load(file)
vocabpath = Path(corpus_path + str(lang) + '_parser'+'/vocab/lexemes.bin')
parser.vocab.load_lexemes(vocabpath)
#load corpus
corpus = textacy.Corpus(parser)
plainpath = corpus_path + corpus_name + "_content.json"
plain_stream = textacy.fileio.read_json_lines(plainpath) # yields {int : str}
for plain in plain_stream:
meta = {}
for key,value in plain.items():
if key != "content" and key != "index":
meta[key] = value
corpus.add_doc(textacy.Doc(plain["content"], lang=corpus.spacy_lang, metadata=meta))
return corpus, corpus.spacy_lang
"""
def corpus2Text(corpus):
for doc in corpus:
yield doc.text
def corpus2Meta(corpus):
for doc in corpus:
yield doc.metadata
def saveplaincorpustext(corpus,path):
textacy.fileio.write_file_lines(corpus2Text(corpus),filepath=path )
def save_corpusV2(corpus, corpus_path, corpus_name):
# save parser
parser = corpus.spacy_lang
parserpath = corpus_path + str(parser.lang) + '_parser'
parser.save_to_directory(parserpath)
contentpath = corpus_path +corpus_name + "_docs/"
if not os.path.exists(contentpath):
os.makedirs(contentpath)
for doc in corpus:
with open(contentpath + str(doc.corpus_index) + "_doc.bin", 'w') as f:
f.write(doc.spacy_doc.to_bytes())
with open(contentpath + str(doc.corpus_index) + "_meta.json", 'w') as file:
file.write(json.dumps(doc.metadata))
def load_corpusV2(corpus_path, corpus_name, lang="de"):
# ckeck for language
if "de_" in corpus_name:
lang = "de"
elif "en_" in corpus_name:
lang = "en"
# load parser
parser = spacy.load(lang)
stringstorepath = corpus_path + str(lang) + '_parser' + '/vocab/strings.json'
with open(stringstorepath) as file:
parser.vocab.strings.load(file)
vocabpath = Path(corpus_path + str(lang) + '_parser' + '/vocab/lexemes.bin')
parser.vocab.load_lexemes(vocabpath)
# load corpus
corpus = textacy.Corpus(parser)
contentpath = corpus_path + corpus_name + "_docs/"
docs = yield_fromdir(contentpath,spacy_vocab=corpus.spacy_vocab,type="doc")
metas = yield_fromdir(contentpath,type="meta")
for doc,meta in zip(docs,metas):
corpus.add_doc(
textacy.Doc(doc, lang=corpus.spacy_lang, metadata=meta))
return corpus, corpus.spacy_lang
def yield_fromdir(path,spacy_vocab=None,type=".pkl"):
os.chdir(path)
filelist = [name for name in os.listdir('.') if os.path.isfile(name)]
filelist = [filename for filename in filelist if type in filename]
filelist.sort(key = lambda elem : elem.split("_")[0])
if type =='doc':
for filename in filelist:
with open(path+filename,'r') as f:
for bytes_string in SpacyDoc.read_bytes(f):
yield SpacyDoc(spacy_vocab).from_bytes(bytes_string)
elif type == 'meta':
for filename in filelist:
with open(path+filename,'r') as f:
yield json.load(f)
else:
for filename in filelist:
yield load_obj(path+filename)
"""