# topicModelingTickets/miscellaneous.py
# -*- coding: utf-8 -*-

import configparser as ConfigParser
import csv
import functools
import glob
import json
import logging
import os
import pickle
import random
import re
import sys
import warnings  # used by the deprecated() decorator below
from pathlib import Path

import spacy
import textacy
from scipy import *
from spacy.tokens.doc import Doc as SpacyDoc
from textacy.fileio import open_sesame

csv.field_size_limit(sys.maxsize)

FILEPATH = os.path.dirname(os.path.realpath(__file__)) + "/"

# load config
config_ini = FILEPATH + "config.ini"
config = ConfigParser.ConfigParser()
with open(config_ini) as f:
    config.read_file(f)

# config logging
filename = FILEPATH + config.get("logging", "filename")
level = config.get("logging", "level")
if level == "INFO":
    level = logging.INFO
elif level == "DEBUG":
    level = logging.DEBUG
elif level == "WARNING":
    level = logging.WARNING
logging.basicConfig(filename=filename, level=level)
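
# Hedged sketch of the config.ini section this module expects (the key names
# "filename" and "level" come from the code above; the values are hypothetical):
#
#   [logging]
#   filename = topicModelingTickets.log
#   level = INFO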

def logprint(string, level="INFO"):
    """logs and prints the given string"""
    print(string)
    if level == "INFO":
        logging.info(string)
    elif level == "DEBUG":
        logging.debug(string)
    elif level == "WARNING":
        logging.warning(string)


def compose(*functions):
    def compose2(f, g):
        return lambda x: f(g(x))
    return functools.reduce(compose2, functions, lambda x: x)
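

# Hedged usage sketch (illustrative, not part of the original module):
# compose() chains callables right-to-left, so compose(f, g)(x) == f(g(x)).
def _example_compose():
    add_one = lambda x: x + 1
    double = lambda x: x * 2
    pipeline = compose(add_one, double)  # applies double first, then add_one
    assert pipeline(3) == 7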


def get_calling_function():
    """finds the calling function in many decent cases.
    https://stackoverflow.com/questions/39078467/python-how-to-get-the-calling-function-not-just-its-name
    """
    fr = sys._getframe(1)  # inspect.stack()[1][0]
    co = fr.f_code
    for get in (
            lambda: fr.f_globals[co.co_name],
            lambda: getattr(fr.f_locals['self'], co.co_name),
            lambda: getattr(fr.f_locals['cls'], co.co_name),
            lambda: fr.f_back.f_locals[co.co_name],  # nested
            lambda: fr.f_back.f_locals['func'],  # decorators
            lambda: fr.f_back.f_locals['meth'],
            lambda: fr.f_back.f_locals['f'],
    ):
        try:
            func = get()
        except (KeyError, AttributeError):
            pass
        else:
            if func.__code__ == co:
                return func
    raise AttributeError("func not found")
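

# Hedged usage sketch (illustrative): a nested function can retrieve its own
# function object through the caller's frame (the "nested" case above).
def _example_get_calling_function():
    def who_am_i():
        return get_calling_function()
    assert who_am_i() is who_am_i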


def save_obj(obj, path):
    """pickles obj to the given path"""
    with open(path, 'wb') as f:
        pickle.dump(obj, f, pickle.HIGHEST_PROTOCOL)


def load_obj(path):
    """unpickles and returns the object stored at path"""
    with open(path, 'rb') as f:
        return pickle.load(f)
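

# Hedged usage sketch (illustrative; the file path is hypothetical):
def _example_pickle_roundtrip():
    data = {"ticket_id": 42, "category": "network"}
    save_obj(data, "/tmp/example.pkl")
    assert load_obj("/tmp/example.pkl") == data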


def replaceRockDots():
    """returns a function that lowercases a string and transliterates German umlauts"""
    return lambda string: re.sub(r'[ß]', "ss",
                                 re.sub(r'[ö]', "oe",
                                        re.sub(r'[ü]', "ue",
                                               re.sub(r'[ä]', "ae", string.lower()))))
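

# Hedged usage sketch (illustrative): normalizing German umlauts and ß.
def _example_replaceRockDots():
    normalize = replaceRockDots()
    assert normalize("Müller heißt Bär") == "mueller heisst baer"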


def list_from_files(*paths):
    """
    create string-list from file(s) like
        n1
        n2
        n3
    :param paths: list(str) or str if single path
    :return: list(str)
    """
    listlist = []
    for path in paths:
        listlist.append(list(textacy.fileio.read_file_lines(path)))
    # flatten the list of lists into a single list
    liste = [item for sublist in listlist for item in sublist]
    return list(map(textacy.preprocess.normalize_whitespace, liste))
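

# Hedged usage sketch (illustrative; the file names are hypothetical and
# must exist on disk for this to run):
def _example_list_from_files():
    stopwords = list_from_files("stopwords_de.txt", "stopwords_en.txt")
    return stopwords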


def deprecated(func):
    """This is a decorator which can be used to mark functions
    as deprecated. It will result in a warning being emitted
    when the function is used."""
    @functools.wraps(func)
    def new_func(*args, **kwargs):
        warnings.simplefilter('always', DeprecationWarning)  # turn off filter
        warnings.warn("Call to deprecated function {}.".format(func.__name__),
                      category=DeprecationWarning, stacklevel=2)
        warnings.simplefilter('default', DeprecationWarning)  # reset filter
        return func(*args, **kwargs)

    return new_func
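

# Hedged usage sketch (illustrative): calling _old_helper() emits a
# DeprecationWarning but still runs the wrapped function.
@deprecated
def _old_helper():
    return "use something newer instead"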


def printRandomDoc(textacyCorpus):
    """
    logs and prints a random doc out of a textacy-Corpus
    :param textacyCorpus:
    """
    print()
    if len(textacyCorpus) == 0:
        logprint("NO DOCS IN CORPUS")
    else:
        # logprint("len(textacyCorpus) = %i" % len(textacyCorpus))
        randIndex = int((len(textacyCorpus) - 1) * random.random())
        logprint("Index: {0} \n Text: {1} \n categoryName: {2}\n".format(
            randIndex, textacyCorpus[randIndex].text,
            textacyCorpus[randIndex].metadata['categoryName']))
    print()


def corpus2Text(corpus):
    for doc in corpus:
        yield doc.text


def corpus2Meta(corpus):
    for doc in corpus:
        yield doc.metadata


def saveplaincorpustext(corpus, path):
    textacy.fileio.write_file_lines(corpus2Text(corpus), filepath=path)
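

# Hedged usage sketch (illustrative; `corpus` and the paths are hypothetical).
# Because corpus2Text/corpus2Meta are generators, documents are streamed to
# disk one at a time instead of being materialized as full lists.
def _example_corpus_streams(corpus):
    saveplaincorpustext(corpus, "/tmp/corpus_plain.txt")
    textacy.fileio.write_json_lines(corpus2Meta(corpus), "/tmp/corpus_meta.json")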


def save_corpus(corpus, corpus_path, corpus_name):
    """
    saves a textacy-corpus including spacy-parser
    :param corpus: textacy-Corpus
    :param corpus_path: str
    :param corpus_name: str (should contain the language like "_de_")
    """
    """
    # save stringstore
    stringstore_path = corpus_path + corpus_name + '_strings.json'
    with open(stringstore_path, "w") as file:
        parser.vocab.strings.dump(file)
    # todo save vocab?
    """
    # save parser
    parser = corpus.spacy_lang
    parserpath = corpus_path + str(parser.lang) + '_parser'
    parser.save_to_directory(parserpath)

    # save content
    contentpath = corpus_path + corpus_name + "_content.bin"
    textacy.fileio.write_spacy_docs((doc.spacy_doc for doc in corpus), contentpath)

    # save plain content
    plainpath = corpus_path + corpus_name + "_content.json"
    textacy.fileio.write_json_lines(
        ({"index": doc.corpus_index, "content": doc.text} for doc in corpus), plainpath)

    # save meta
    metapath = corpus_path + corpus_name + "_meta.json"
    textacy.fileio.write_json_lines((doc.metadata for doc in corpus), metapath)


def load_corpus(corpus_path, corpus_name, lang="de"):
    """
    Load textacy-Corpus including spacy-parser from file
    :param corpus_path: str
    :param corpus_name: str (should contain the language like "_de_")
    :param lang: str (language code) or spacy.Language
    :return: textacy.Corpus, spacy.Language
    """
    # check for language
    if "de_" in corpus_name:
        lang = "de"
    elif "en_" in corpus_name:
        lang = "en"

    # load parser
    parser = spacy.load(lang)
    stringstorepath = corpus_path + str(lang) + '_parser' + '/vocab/strings.json'
    with open(stringstorepath) as file:
        parser.vocab.strings.load(file)
    vocabpath = Path(corpus_path + str(lang) + '_parser' + '/vocab/lexemes.bin')
    parser.vocab.load_lexemes(vocabpath)

    # load corpus
    corpus = textacy.Corpus(parser)
    contentpath = corpus_path + corpus_name + "_content.bin"
    plainpath = corpus_path + corpus_name + "_content.json"
    metapath = corpus_path + corpus_name + "_meta.json"

    try:
        spacy_docs = textacy.fileio.read_spacy_docs(corpus.spacy_vocab, contentpath)
        metadata_stream = textacy.fileio.read_json_lines(metapath)
        for spacy_doc, metadata in zip(spacy_docs, metadata_stream):
            corpus.add_doc(
                textacy.Doc(spacy_doc, lang=corpus.spacy_lang, metadata=metadata))
    except Exception:
        # re-initialize and fall back to re-parsing the plain text
        corpus = textacy.Corpus(parser)
        plain_stream = textacy.fileio.read_json_lines(plainpath)  # yields {int: str}
        metadata_stream = textacy.fileio.read_json_lines(metapath)
        for plain, metadata in zip(plain_stream, metadata_stream):
            corpus.add_doc(
                textacy.Doc(plain["content"], lang=corpus.spacy_lang, metadata=metadata))

    return corpus, corpus.spacy_lang
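

# Hedged round-trip sketch (illustrative; the directory and corpus name are
# hypothetical and must match between saving and loading):
def _example_corpus_roundtrip(corpus):
    save_corpus(corpus, "corpi/", "de_ticket_corpus")
    loaded, parser = load_corpus("corpi/", "de_ticket_corpus")
    printRandomDoc(loaded)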


def save_corpusV2(corpus, corpus_path, corpus_name):
    """
    saves a textacy-corpus including spacy-parser, one file per document
    :param corpus: textacy-Corpus
    :param corpus_path: str
    :param corpus_name: str (should contain the language like "_de_")
    """
    # save parser
    parser = corpus.spacy_lang
    parserpath = corpus_path + str(parser.lang) + '_parser'
    parser.save_to_directory(parserpath)

    contentpath = corpus_path + corpus_name + "_docs/"
    if not os.path.exists(contentpath):
        os.makedirs(contentpath)

    for doc in corpus:
        # to_bytes() returns bytes, so the doc file must be opened in binary mode
        with open(contentpath + str(doc.corpus_index) + "_doc.bin", 'wb') as f:
            f.write(doc.spacy_doc.to_bytes())
        with open(contentpath + str(doc.corpus_index) + "_meta.json", 'w') as file:
            file.write(json.dumps(doc.metadata))


def load_corpusV2(corpus_path, corpus_name, lang="de"):
    """
    Load textacy-Corpus including spacy-parser from file
    :param corpus_path: str
    :param corpus_name: str (should contain the language like "_de_")
    :param lang: str (language code) or spacy.Language
    :return: textacy.Corpus, spacy.Language
    """
    # check for language
    if "de_" in corpus_name:
        lang = "de"
    elif "en_" in corpus_name:
        lang = "en"

    # load parser
    parser = spacy.load(lang)
    stringstorepath = corpus_path + str(lang) + '_parser' + '/vocab/strings.json'
    with open(stringstorepath) as file:
        parser.vocab.strings.load(file)
    vocabpath = Path(corpus_path + str(lang) + '_parser' + '/vocab/lexemes.bin')
    parser.vocab.load_lexemes(vocabpath)

    # load corpus
    corpus = textacy.Corpus(parser)
    contentpath = corpus_path + corpus_name + "_docs/"
    docs = yield_fromdir(contentpath, spacy_vocab=corpus.spacy_vocab, type="doc")
    metas = yield_fromdir(contentpath, type="meta")
    for doc, meta in zip(docs, metas):
        corpus.add_doc(
            textacy.Doc(doc, lang=corpus.spacy_lang, metadata=meta))
    return corpus, corpus.spacy_lang


def yield_fromdir(path, spacy_vocab=None, type=".pkl"):
    os.chdir(path)
    filelist = [name for name in os.listdir('.') if os.path.isfile(name)]
    filelist = [filename for filename in filelist if type in filename]
    # sort numerically by corpus index so docs and metadata line up pairwise
    filelist.sort(key=lambda elem: int(elem.split("_")[0]))

    if type == 'doc':
        for filename in filelist:
            # doc blobs were written in binary mode, so read them back the same way
            with open(path + filename, 'rb') as f:
                for bytes_string in SpacyDoc.read_bytes(f):
                    yield SpacyDoc(spacy_vocab).from_bytes(bytes_string)
    elif type == 'meta':
        for filename in filelist:
            with open(path + filename, 'r') as f:
                yield json.load(f)
    else:
        for filename in filelist:
            yield load_obj(path + filename)
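

# Hedged round-trip sketch for the per-document V2 format (illustrative;
# the directory and corpus name are hypothetical):
def _example_corpusV2_roundtrip(corpus):
    save_corpusV2(corpus, "corpi/", "de_ticket_corpus")
    loaded, parser = load_corpusV2("corpi/", "de_ticket_corpus")
    return loaded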