11 changes: 7 additions & 4 deletions pyndl/__init__.py
@@ -35,7 +35,7 @@
'Topic :: Scientific/Engineering',
'Topic :: Scientific/Engineering :: Artificial Intelligence',
'Topic :: Scientific/Engineering :: Information Analysis',
]
]


def sysinfo():
@@ -62,9 +62,12 @@ def sysinfo():

if uname.sysname == "Linux":
_, *lines = os.popen("free -m").readlines()
for identifier in ["Mem:", "Swap:"]:
memory = [line for line in lines if identifier in line][0]
_, total, used, *_ = memory.split()
for identifier in ("Mem:", "Swap:"):
memory = [line for line in lines if identifier in line]
if len(memory) > 0:
_, total, used, *_ = memory[0].split()
else:
total, used = '?', '?'
osinfo += "{} {}MiB/{}MiB\n".format(identifier, used, total)
Member: This part of the memory-printing change should go into master as well (independent of the typing), IMHO.


osinfo += "\n"
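For reference, a minimal standalone sketch of the new guarded parsing; `FREE_OUTPUT` below is a fabricated sample of `free -m` output, not taken from the PR:

```python
# Fabricated sample of `free -m` output (values are made up).
FREE_OUTPUT = """\
              total        used        free
Mem:          15897        4242       11655
Swap:          2047           0        2047
"""

osinfo = ""
_, *lines = FREE_OUTPUT.splitlines()
for identifier in ("Mem:", "Swap:"):
    memory = [line for line in lines if identifier in line]
    if len(memory) > 0:
        # Identifier found: parse the total/used columns.
        _, total, used, *_ = memory[0].split()
    else:
        # Identifier missing (e.g. no swap configured): degrade gracefully.
        total, used = '?', '?'
    osinfo += "{} {}MiB/{}MiB\n".format(identifier, used, total)
print(osinfo, end="")
```

Against this sample it prints `Mem: 4242MiB/15897MiB` and `Swap: 0MiB/2047MiB`; with the old indexing, a missing row would have raised an `IndexError`.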
47 changes: 29 additions & 18 deletions pyndl/activation.py
@@ -9,15 +9,21 @@
import multiprocessing as mp
import ctypes
from collections import defaultdict, OrderedDict
from typing import Iterable, List, Dict, Optional, Tuple, Union

import numpy as np
import xarray as xr

from . import io
from .types import AnyWeights, CollectionEvent, AnyEvent, Path, CueCollection, Collection


# pylint: disable=W0621
def activation(events, weights, number_of_threads=1, remove_duplicates=None, ignore_missing_cues=False):
def activation(events: Union[Path, Iterable[AnyEvent]],
weights: AnyWeights,
number_of_threads: int = 1,
remove_duplicates: Optional[bool] = None,
ignore_missing_cues: bool = False) -> Union[xr.DataArray, Dict[str, np.ndarray]]:
"""
Estimate activations for given events in event file and outcome-cue weights.

@@ -58,20 +64,23 @@ def activation(events, weights, number_of_threads=1, remove_duplicates=None, ign
returned if weights is instance of dict

"""
if isinstance(events, str):
events = io.events_from_file(events)
event_list = [] # type: Iterable[CollectionEvent]
if isinstance(events, Path):
event_list = io.events_from_file(events)
else:
event_list = events

events = (cues for cues, outcomes in events)
cues_gen = (cues for cues, outcomes in event_list) # type: Iterable[CueCollection]
if remove_duplicates is None:
def check_no_duplicates(cues):
if len(cues) != len(set(cues)):
raise ValueError('cues needs to be unique: "{}"; use '
'remove_duplicates=True'.format(' '.join(cues)))
else:
return set(cues)
events = (check_no_duplicates(cues) for cues in events)
cues_gen = (check_no_duplicates(cues) for cues in cues_gen)
elif remove_duplicates is True:
events = (set(cues) for cues in events)
cues_gen = (set(cues) for cues in cues_gen)

if isinstance(weights, xr.DataArray):
cues = weights.coords["cues"].values.tolist()
@@ -81,10 +90,10 @@ def check_no_duplicates(cues):
cue_map = OrderedDict(((cue, ii) for ii, cue in enumerate(cues)))
if ignore_missing_cues:
event_cue_indices_list = (tuple(cue_map[cue] for cue in event_cues if cue in cues)
for event_cues in events)
for event_cues in cues_gen)
else:
event_cue_indices_list = (tuple(cue_map[cue] for cue in event_cues)
for event_cues in events)
for event_cues in cues_gen)
# pylint: disable=W0621
activations = _activation_matrix(list(event_cue_indices_list),
weights.values, number_of_threads)
@@ -95,14 +104,14 @@ def check_no_duplicates(cues):
dims=('outcomes', 'events'))
elif isinstance(weights, dict):
assert number_of_threads == 1, "Estimating activations with multiprocessing is not implemented for dicts."
activations = defaultdict(lambda: np.zeros(len(events)))
events = list(events)
cues_list = list(cues_gen)
activation_dict = defaultdict(lambda: np.zeros(len(cues_list))) # type: Dict[str, np.ndarray]
for outcome, cue_dict in weights.items():
_activations = activations[outcome]
for row, cues in enumerate(events):
_activations = activation_dict[outcome]
for row, cues in enumerate(cues_list):
for cue in cues:
_activations[row] += cue_dict[cue]
return activations
_activations[row] += cue_dict[cue] # type: ignore
return activation_dict
else:
raise ValueError("Weights other than xarray.DataArray or dicts are not supported.")

@@ -130,7 +139,8 @@ def _run_mp_activation_matrix(event_index, cue_indices):
activations[:, event_index] = weights[:, cue_indices].sum(axis=1)


def _activation_matrix(indices_list, weights, number_of_threads):
def _activation_matrix(indices_list: List[Tuple[int, ...]],
weights: np.ndarray, number_of_threads: int) -> np.ndarray:
"""
Estimate activation for indices in weights

@@ -160,12 +170,13 @@ def _activation_matrix(indices_list, weights, number_of_threads):
activations[:, row] = weights[:, event_cues].sum(axis=1)
return activations
else:
shared_activations = mp.RawArray(ctypes.c_double, int(np.prod(activations_dim)))
# type stubs seem to be incorrect for multiprocessing lib. 2018-05-16
shared_activations = mp.RawArray(ctypes.c_double, int(np.prod(activations_dim))) # type: ignore
weights = np.ascontiguousarray(weights)
shared_weights = mp.sharedctypes.copy(np.ctypeslib.as_ctypes(np.float64(weights)))
shared_weights = mp.sharedctypes.copy(np.ctypeslib.as_ctypes(np.float64(weights))) # type: ignore
initargs = (shared_weights, weights.shape, shared_activations, activations_dim)
with mp.Pool(number_of_threads, initializer=_init_mp_activation_matrix, initargs=initargs) as pool:
pool.starmap(_run_mp_activation_matrix, enumerate(indices_list))
activations = np.ctypeslib.as_array(shared_activations)
activations = np.ctypeslib.as_array(shared_activations) # type: ignore
activations.shape = activations_dim
return activations
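For orientation, a usage sketch of the typed `activation` API; the cue/outcome names and weight values are invented for illustration:

```python
import numpy as np
import xarray as xr

from pyndl.activation import activation

# Invented 2x2 weight matrix: outcomes x cues.
weights = xr.DataArray(np.array([[0.5, 0.1],
                                 [0.2, 0.4]]),
                       coords={"outcomes": ["plural", "singular"],
                               "cues": ["c1", "c2"]},
                       dims=("outcomes", "cues"))

# Events as (cues, outcomes) pairs; outcomes are not used when
# computing activations.
events = [(("c1", "c2"), ("plural",)),
          (("c2",), ("singular",))]

acts = activation(events, weights)  # xr.DataArray: outcomes x events
```

Per the new return annotation, passing `weights` as a nested dict (`{outcome: {cue: weight}}`) instead returns a plain `Dict[str, np.ndarray]`, and only `number_of_threads=1` is supported in that branch.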
13 changes: 8 additions & 5 deletions pyndl/corpus.py
@@ -12,14 +12,15 @@
import gzip
import multiprocessing
import xml.etree.ElementTree
from typing import Iterator

__version__ = '0.2.0'

FRAMES_PER_SECOND = 30
PUNCTUATION = tuple(".,:;?!()[]'")


def _parse_time_string(time_string):
def _parse_time_string(time_string: str) -> float:
"""
parses string and returns time in seconds.

@@ -32,7 +33,7 @@ def _parse_time_string(time_string):
float(frames) / FRAMES_PER_SECOND)


def read_clean_gzfile(gz_file_path, *, break_duration=2.0):
def read_clean_gzfile(gz_file_path: str, *, break_duration=2.0) -> Iterator[str]:
"""
Generator that opens and reads a gunzipped xml subtitle file, while all
xml tags and timestamps are removed.
@@ -68,8 +69,10 @@ def read_clean_gzfile(gz_file_path, *, break_duration=2.0):
text = word_tag.text
if text in PUNCTUATION:
words.append(text)
else:
elif text is not None:
Member: Do we have enough test coverage here? Is this the right thing to do? If yes, this should be merged into master as soon as possible as well.

Member: We agreed that it might be good to raise an exception here when text is None.

Member (author): Now we raise an exception for null text.

words.extend((' ', text))
else:
raise ValueError("Text content of word tag is None.")
result = ''.join(words)
result = result.strip()

@@ -112,7 +115,7 @@ class JobParseGz():

"""

def __init__(self, break_duration):
def __init__(self, break_duration: float) -> None:
self.break_duration = break_duration

def run(self, filename):
@@ -126,7 +129,7 @@ def run(self, filename):
return (lines, not_found)


def create_corpus_from_gz(directory, outfile, *, n_threads=1, verbose=False):
def create_corpus_from_gz(directory: str, outfile: str, *, n_threads=1, verbose=False):
"""
Create a corpus file from a set of gunziped (.gz) files in a directory.

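The body of `_parse_time_string` is largely elided by the diff; as a rough sketch of what it computes, assuming an `HH:MM:SS:FF` field layout (only the frames term is visible above, so the other fields are an assumption):

```python
FRAMES_PER_SECOND = 30  # from pyndl/corpus.py

def parse_time_string(time_string: str) -> float:
    """Sketch: convert an assumed 'HH:MM:SS:FF' timestamp to seconds."""
    hours, minutes, seconds, frames = time_string.split(":")
    return (float(hours) * 3600.0
            + float(minutes) * 60.0
            + float(seconds)
            + float(frames) / FRAMES_PER_SECOND)

assert parse_time_string("00:01:02:15") == 62.5  # 62 s plus 15 of 30 frames
```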
24 changes: 13 additions & 11 deletions pyndl/count.py
@@ -15,6 +15,7 @@
import itertools
import multiprocessing
import sys
from typing import Tuple


def _job_cues_outcomes(event_file_name, start, step, verbose=False):
@@ -45,8 +46,8 @@ def _job_cues_outcomes(event_file_name, start, step, verbose=False):
return (nn + 1, cues, outcomes)


def cues_outcomes(event_file_name,
*, number_of_processes=2, verbose=False):
def cues_outcomes(event_file_name: str,
*, number_of_processes=2, verbose=False) -> Tuple[int, Counter, Counter]:
"""
Counts cues and outcomes in event_file_name using number_of_processes
processes.
@@ -65,8 +66,8 @@ def cues_outcomes(event_file_name,
verbose)
for start in range(number_of_processes)))
n_events = 0
cues = Counter()
outcomes = Counter()
cues = Counter() # type: Counter
outcomes = Counter() # type: Counter
for nn, cues_process, outcomes_process in results:
n_events += nn
cues += cues_process
@@ -116,8 +117,9 @@ def _job_words_symbols(corpus_file_name, start, step, lower_case=False,
return (words, symbols)


def words_symbols(corpus_file_name,
*, number_of_processes=2, lower_case=False, verbose=False):
def words_symbols(corpus_file_name: str, *,
number_of_processes=2, lower_case=False,
verbose=False) -> Tuple[Counter, Counter]:
"""
Counts words and symbols in corpus_file_name using number_of_processes
processes.
@@ -136,8 +138,8 @@ def words_symbols(corpus_file_name,
verbose)
for start in
range(number_of_processes)))
words = Counter()
symbols = Counter()
words = Counter() # type: Counter
symbols = Counter() # type: Counter
for words_process, symbols_process in results:
words += words_process
symbols += symbols_process
@@ -148,7 +150,7 @@ def words_symbols(corpus_file_name,
return words, symbols


def save_counter(counter, filename, *, header='key\tfreq\n'):
def save_counter(counter: Counter, filename: str, *, header='key\tfreq\n') -> None:
"""
Saves a counter object into a tab delimitered text file.

@@ -159,15 +161,15 @@ def save_counter(counter, filename, *, header='key\tfreq\n'):
dfile.write('{key}\t{count}\n'.format(key=key, count=count))


def load_counter(filename):
def load_counter(filename: str) -> Counter:
"""
Loads a counter out of a tab delimitered text file.

"""
with open(filename, 'rt') as dfile:
# skip header
dfile.readline()
counter = Counter()
counter = Counter() # type: Counter
for line in dfile:
key, count = line.strip().split('\t')
if key in counter.keys():
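A small round-trip sketch for the newly annotated `save_counter`/`load_counter` pair; the file name is invented, and keys are assumed to contain no tabs:

```python
from collections import Counter

from pyndl.count import save_counter, load_counter

counts = Counter({"apple": 3, "pear": 1})
save_counter(counts, "counts.tsv")   # writes the 'key\tfreq' header, then one row per key

loaded = load_counter("counts.tsv")  # returns a Counter, per the new annotation
assert loaded == counts
```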
29 changes: 19 additions & 10 deletions pyndl/io.py
@@ -9,12 +9,15 @@
"""

import gzip
from collections import Iterator, Iterable
from collections import Iterable
from typing import Iterator, List, Optional, Tuple, Union, cast

import pandas as pd

from .types import CollectionEvent, StringEvent

def events_from_file(event_path, compression="gzip"):

def events_from_file(event_path: str, compression: Optional[str] = "gzip") -> Iterator[Tuple[List[str], List[str]]]:
"""
Yields events for all events in a gzipped event file.

@@ -30,8 +33,8 @@ def events_from_file(event_path, compression="gzip"):
------
cues, outcomes : list, list
a tuple of two lists containing cues and outcomes

"""

Member: We usually do not have an empty line after the docstring.

if compression == "gzip":
event_file = gzip.open(event_path, 'rt')
elif compression is None:
Expand All @@ -51,8 +54,11 @@ def events_from_file(event_path, compression="gzip"):
event_file.close()


def events_to_file(events, file_path, delimiter="\t", compression="gzip",
columns=("cues", "outcomes")):
def events_to_file(events: Union[Iterator[StringEvent], Iterator[CollectionEvent], pd.DataFrame],
file_path: str,
delimiter: str = "\t",
compression: Optional[str] = "gzip",
columns: Tuple[str, str] = ("cues", "outcomes")) -> None:
"""
Writes events to a file

@@ -75,9 +81,11 @@ def events_to_file(events, file_path, delimiter="\t", compression="gzip",

"""
if isinstance(events, pd.DataFrame):
events = events_from_dataframe(events)
collection_events = events_from_dataframe(events)
elif isinstance(events, (Iterator, Iterable)):
events = events_from_list(events)
collection_events = events_from_list(cast(Union[Iterator[StringEvent],
Iterator[CollectionEvent]],
events))
else:
raise ValueError("events should either be a pd.DataFrame or an Iterator or an Iterable.")

@@ -91,7 +99,7 @@ def events_to_file(events, file_path, delimiter="\t", compression="gzip",
try:
out_file.write("{}\n".format(delimiter.join(columns)))

for cues, outcomes in events:
for cues, outcomes in collection_events:
if isinstance(cues, list) and isinstance(outcomes, list):
line = "{}{}{}\n".format("_".join(cues),
delimiter,
@@ -105,7 +113,8 @@ def events_to_file(events, file_path, delimiter="\t", compression="gzip",
out_file.close()


def events_from_dataframe(df, columns=("cues", "outcomes")):
def events_from_dataframe(df: pd.DataFrame,
columns: Tuple[str, str] = ("cues", "outcomes")) -> Iterator[CollectionEvent]:
"""
Yields events for all events in a pandas dataframe.

@@ -130,7 +139,7 @@ def events_from_dataframe(df, columns=("cues", "outcomes")):
yield (cues, outcomes)


def events_from_list(lst):
def events_from_list(lst: Union[Iterator[StringEvent], Iterator[CollectionEvent]]) -> Iterator[CollectionEvent]:
"""
Yields events for all events in a list.

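Finally, a round-trip sketch for the retyped `io` helpers, assuming pyndl's usual tab-separated event format with underscore-joined cues and outcomes (the file name is invented):

```python
from pyndl import io

events = [(["c1", "c2"], ["o1"]),
          (["c3"], ["o2"])]

io.events_to_file(events, "events.tab.gz")  # gzip compression by default

# events_from_file yields (cues, outcomes) back as lists of strings.
round_tripped = list(io.events_from_file("events.tab.gz"))
assert round_tripped == events
```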