emloop.datasets

Module with the emloop dataset concept (AbstractDataset) and the Python BaseDataset.

Classes

  • AbstractDataset: This concept prescribes the API that is required from every emloop dataset.

  • BaseDataset: Base class for datasets written in Python.

  • DownloadableDataset: Dataset base class implementing routines for downloading and extracting data via the emloop dataset download command.

  • StreamWrapper: Dataset stream wrapper which manages buffering, epoch cutting etc.

class emloop.datasets.AbstractDataset(config_str)

Bases: object

This concept prescribes the API that is required from every emloop dataset.

Every emloop dataset must have a constructor that takes a YAML string config. Additionally, one may implement any <stream_name>_stream method in order to make the stream_name stream available in emloop.MainLoop.

All the defined stream methods should return a Stream.
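
As a minimal sketch of the concept, the class below accepts a config string in its constructor and exposes two <stream_name>_stream methods. ToyDataset, its sources x and y, and the get_stream helper are hypothetical. The getattr lookup is only one plausible way a caller could resolve a stream by name; it is not necessarily what emloop.MainLoop does.

```python
from typing import Any, Dict, Iterator, List

Batch = Dict[str, List[Any]]


class ToyDataset:
    """Hypothetical dataset that follows the concept without importing emloop."""

    def __init__(self, config_str: str):
        # the concept only requires that the constructor accepts the YAML string
        self._config_str = config_str

    def train_stream(self) -> Iterator[Batch]:
        yield {'x': [0, 1], 'y': [0, 2]}
        yield {'x': [2, 3], 'y': [4, 6]}

    def valid_stream(self) -> Iterator[Batch]:
        yield {'x': [4], 'y': [8]}


def get_stream(dataset: Any, stream_name: str) -> Iterator[Batch]:
    """Resolve `<stream_name>_stream` by name (assumed lookup convention)."""
    stream_fn = getattr(dataset, stream_name + '_stream', None)
    if stream_fn is None:
        raise AttributeError('dataset has no `{}` stream'.format(stream_name))
    return stream_fn()


dataset = ToyDataset('dataset:\n  name: toy\n')
train_batches = list(get_stream(dataset, 'train'))
valid_batches = list(get_stream(dataset, 'valid'))
```

Each batch here is a mapping from source names to sequences, which matches the Mapping[str, Sequence[Any]] batch type used by StreamWrapper.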

__init__(config_str)

Create a new dataset configured with the given YAML string (obligatory).

The configuration must contain a dataset entry and may contain an output_dir entry.

Parameters

config_str (str) – YAML string config

class emloop.datasets.BaseDataset(config_str)

Bases: datasets.AbstractDataset

Base class for datasets written in Python.

In the inherited class, one should:
  • override the _configure_dataset method

  • (optional) implement the train_stream method if the dataset is intended to be used with emloop train ...

  • (optional) implement a <stream_name>_stream method in order to make the <stream_name> stream available

__init__(config_str)

Create new dataset.

Decode the given YAML config string and pass the obtained **kwargs to _configure_dataset().

Parameters

config_str (str) – dataset configuration as YAML string

_configure_dataset(output_dir, **kwargs)

Configure the dataset with **kwargs decoded from YAML configuration.

Parameters
  • output_dir (Optional[str]) – output directory for logging and any additional outputs (None if no output dir is available)

  • kwargs – dataset configuration as **kwargs parsed from config['dataset']

Raises

NotImplementedError – if not overridden
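
The flow described above (decode the config, then hand the dataset entry to _configure_dataset as keyword arguments) can be sketched as follows. MiniBaseDataset is a hypothetical stand-in written for this example, not emloop's implementation, and RangeDataset with its size and batch_size options is made up. To stay within the standard library, the sketch decodes the config with json (the sample config is valid JSON and also valid YAML), whereas emloop decodes YAML.

```python
import json
from typing import Any, Dict, Iterator, List, Optional


class MiniBaseDataset:
    """Hypothetical stand-in mimicking the documented BaseDataset contract."""

    def __init__(self, config_str: str):
        config = json.loads(config_str)  # assumption: emloop uses a YAML decoder here
        self._configure_dataset(output_dir=config.get('output_dir'), **config['dataset'])

    def _configure_dataset(self, output_dir: Optional[str], **kwargs) -> None:
        raise NotImplementedError('_configure_dataset must be overridden')


class RangeDataset(MiniBaseDataset):
    """Hypothetical dataset yielding batches of consecutive indices."""

    def _configure_dataset(self, output_dir: Optional[str], size: int = 8,
                           batch_size: int = 4, **kwargs) -> None:
        self._output_dir = output_dir
        self._size = size
        self._batch_size = batch_size

    def train_stream(self) -> Iterator[Dict[str, List[Any]]]:
        for start in range(0, self._size, self._batch_size):
            stop = min(start + self._batch_size, self._size)
            yield {'index': list(range(start, stop))}


dataset = RangeDataset('{"dataset": {"size": 6, "batch_size": 4}, "output_dir": null}')
batches = list(dataset.train_stream())
```

Accepting **kwargs in the override keeps the dataset tolerant of configuration entries it does not use.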

stream_info()

Check and report source names, dtypes and shapes of all the streams available.

Return type

None
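
To illustrate what such a report could contain, the sketch below inspects the first batch of every <stream_name>_stream method and collects the element type and shape of each source. This is an assumption-laden approximation for plain nested lists: the helpers shape_of, dtype_of and collect_stream_info, as well as DemoDataset, are hypothetical, and the real method may inspect streams differently.

```python
from typing import Any, Dict, Tuple


def shape_of(value: Any) -> Tuple[int, ...]:
    """Shape of a regular nested list/tuple, e.g. [[1, 2], [3, 4]] -> (2, 2)."""
    shape = []
    while isinstance(value, (list, tuple)):
        shape.append(len(value))
        if not value:
            break
        value = value[0]
    return tuple(shape)


def dtype_of(value: Any) -> str:
    """Type name of the first innermost element."""
    while isinstance(value, (list, tuple)) and value:
        value = value[0]
    return type(value).__name__


def collect_stream_info(dataset: Any) -> Dict[str, Dict[str, Tuple[str, Tuple[int, ...]]]]:
    """Map stream name -> source name -> (dtype, shape) taken from the first batch."""
    info = {}
    for attr in sorted(dir(dataset)):
        if attr.endswith('_stream') and callable(getattr(dataset, attr)):
            first_batch = next(iter(getattr(dataset, attr)()))
            info[attr[:-len('_stream')]] = {
                source: (dtype_of(values), shape_of(values))
                for source, values in first_batch.items()
            }
    return info


class DemoDataset:
    def train_stream(self):
        yield {'x': [[0.0, 1.0], [2.0, 3.0]], 'y': [0, 1]}


info = collect_stream_info(DemoDataset())
```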

class emloop.datasets.DownloadableDataset(config_str)

Bases: datasets.BaseDataset

DownloadableDataset is a dataset base class implementing routines for downloading and extracting data via the emloop dataset download command.

The typical use-case is that data_root and download_urls are passed to the dataset constructor. Alternatively, the corresponding properties may be overridden directly.

_configure_dataset(data_root=None, download_urls=None, **kwargs)

Save the passed values and use them as a default property implementation.

Parameters
  • data_root (Optional[str]) – directory to which the files will be downloaded

  • download_urls (Optional[Iterable[str]]) – list of URLs to be downloaded

Return type

None

data_root

Path to the data root directory.

Return type

str

download()

Maybe download and extract the extra files required.

If not already downloaded, download all files specified by download_urls. Then, extract the downloaded files to data_root.

emloop CLI example
emloop dataset download <path-to-config>

Return type

None
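
The "maybe download, then extract" behavior can be sketched with the standard library alone. The function below is a hypothetical illustration rather than emloop's code: it assumes that each URL ends with an archive file name, that an already present file means "already downloaded", and that shutil.unpack_archive can handle the archive format. The demo uses a local file:// URL so that it runs without network access.

```python
import os
import pathlib
import shutil
import tempfile
import urllib.parse
import urllib.request
from typing import Iterable, List


def maybe_download_and_extract(data_root: str, download_urls: Iterable[str]) -> List[str]:
    """Download missing archives to data_root, extract them and return their names."""
    os.makedirs(data_root, exist_ok=True)
    downloaded = []
    for url in download_urls:
        filename = os.path.basename(urllib.parse.urlparse(url).path)
        target = os.path.join(data_root, filename)
        if os.path.exists(target):
            continue  # assumed to be downloaded and extracted already
        urllib.request.urlretrieve(url, target)
        shutil.unpack_archive(target, data_root)
        downloaded.append(filename)
    return downloaded


with tempfile.TemporaryDirectory() as tmp:
    # build a small zip archive that plays the role of the remote file
    source_dir = os.path.join(tmp, 'source')
    os.makedirs(source_dir)
    with open(os.path.join(source_dir, 'data.txt'), 'w') as file:
        file.write('hello')
    archive = shutil.make_archive(os.path.join(tmp, 'archive'), 'zip', root_dir=source_dir)
    url = pathlib.Path(archive).as_uri()

    data_root = os.path.join(tmp, 'data')
    first_run = maybe_download_and_extract(data_root, [url])
    second_run = maybe_download_and_extract(data_root, [url])  # nothing left to do
    extracted = sorted(os.listdir(data_root))
```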

download_urls

A list of URLs to be downloaded.

Return type

Iterable[str]

class emloop.datasets.StreamWrapper(stream_fn, buffer_size=0, epoch_size=-1, name=None, profile=None)

Bases: object

Dataset stream wrapper which manages buffering, epoch cutting etc.

The main features are:
  • resets underlying dataset stream after the iteration reaches its end

  • if specified, uses consumer-producer buffer for batches allowing simultaneous batch producing and training

  • if specified, produces epochs of fixed size

  • logs the timings to the given profile

Caution

A buffered StreamWrapper must be used within a with statement so that the enqueueing thread can be properly managed.
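
The non-buffered features listed above (resetting the stream and cutting fixed-size epochs) can be sketched without emloop as follows. MiniStreamWrapper is a hypothetical, simplified stand-in. In particular, restarting an exhausted stream in the middle of a fixed-size epoch is an assumption about the behavior, and buffering and profiling are left out.

```python
from typing import Any, Callable, Dict, Iterable, Iterator, List, Optional

Batch = Dict[str, List[Any]]


class MiniStreamWrapper:
    """Hypothetical non-buffered wrapper with stream reset and fixed-size epochs."""

    def __init__(self, stream_fn: Callable[[], Iterable[Batch]], epoch_size: int = -1):
        self._stream_fn = stream_fn
        self._epoch_size = epoch_size
        self._stream: Optional[Iterator[Batch]] = None
        self._produced = 0  # batches produced in the current epoch

    def _get_stream(self) -> Iterator[Batch]:
        if self._stream is None:
            self._stream = iter(self._stream_fn())
        return self._stream

    def _epoch_limit_reached(self) -> bool:
        return self._epoch_size > 0 and self._produced >= self._epoch_size

    def __iter__(self) -> 'MiniStreamWrapper':
        return self

    def __next__(self) -> Batch:
        if self._epoch_limit_reached():
            self._produced = 0
            raise StopIteration  # fixed-size epoch is complete
        try:
            batch = next(self._get_stream())
        except StopIteration:
            self._stream = None  # reset the underlying stream
            if self._epoch_size <= 0:
                self._produced = 0
                raise  # no epoch size: the stream end is the epoch end
            batch = next(self._get_stream())  # continue with a fresh stream
        self._produced += 1
        return batch


def stream_fn() -> Iterator[Batch]:
    return iter([{'x': [i]} for i in range(3)])


plain = MiniStreamWrapper(stream_fn)
plain_epochs = [[b['x'][0] for b in plain] for _ in range(2)]

fixed = MiniStreamWrapper(stream_fn, epoch_size=2)
fixed_epochs = [[b['x'][0] for b in fixed] for _ in range(3)]
```

With three batches in the underlying stream, the plain wrapper yields the whole stream in every epoch, while the fixed-size wrapper yields two batches per epoch and wraps around when the stream ends.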

non-buffered StreamWrapper
stream = StreamWrapper(dataset.train_stream, name='train')
for batch in stream:  # 1st epoch
    ...  # do stuff
for batch in stream:  # 2nd epoch
    ...  # do stuff

buffered StreamWrapper with fixed size epochs
stream = StreamWrapper(dataset.train_stream, buffer_size=16, epoch_size=1000, name='train')
with stream:  # we would get an error without the with statement
    for batch in stream:  # 1st epoch
        ...  # do stuff

__enter__()

If buffered, start the enqueueing thread.

Return type

Iterator[Mapping[str, Sequence[Any]]]

__exit__(*args)

If buffered, terminate the enqueueing thread.

Return type

None

__init__(stream_fn, buffer_size=0, epoch_size=-1, name=None, profile=None)

Create new StreamWrapper.

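Parameters
  • stream_fn – function creating the underlying dataset stream (e.g. dataset.train_stream)

  • buffer_size – size of the batch buffer; buffering is used only when greater than 0

  • epoch_size – fixed number of batches per epoch; no limit if not specified

  • name – stream name

  • profile – profile to which the timings are logged

__iter__()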

Get stream iterator.

Return type

Iterator[Mapping[str, Sequence[Any]]]

__next__()

Return next batch or end epoch with StopIteration.

Return type

Mapping[str, Sequence[Any]]

Returns

next batch

Raises

StopIteration – at the end of the epoch

_dequeue_batch()

Return a single batch from queue or None signaling epoch end.

Raises

ChildProcessError – if the enqueueing thread ended unexpectedly

Return type

Optional[Mapping[str, Sequence[Any]]]

_enqueue_batches(stop_event)

Enqueue all the stream batches. If specified, stop after epoch_size batches.

Note

Signal the epoch end with None.

Stop when:
  • stop_event is set

  • the stream ends and epoch size is not set

  • the specified number of batches is enqueued

Note

This is used only with buffer_size > 0.

Parameters

stop_event (Event) – event signaling stop instruction

Return type

None
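
The producer side of such a buffer can be sketched with queue.Queue and threading.Event. The code below is a simplified, hypothetical illustration rather than emloop's implementation: enqueue_batches and put_unless_stopped are made-up free functions, and restarting the stream within a fixed-size epoch is omitted for brevity. As in the description above, None signals the epoch end.

```python
import queue
import threading
from typing import Any, Callable, Dict, Iterable, List

Batch = Dict[str, List[Any]]


def put_unless_stopped(buffer: queue.Queue, item: Any, stop_event: threading.Event) -> bool:
    """Put the item to the buffer, giving up once stop_event is set."""
    while not stop_event.is_set():
        try:
            buffer.put(item, timeout=0.1)
            return True
        except queue.Full:
            continue  # the buffer is full, check stop_event and retry
    return False


def enqueue_batches(stream_fn: Callable[[], Iterable[Batch]], buffer: queue.Queue,
                    stop_event: threading.Event, epoch_size: int = -1) -> None:
    """Enqueue stream batches; stop after epoch_size batches if specified."""
    produced = 0
    for batch in stream_fn():
        if not put_unless_stopped(buffer, batch, stop_event):
            return  # stop requested, do not signal the epoch end
        produced += 1
        if 0 < epoch_size <= produced:
            break
    put_unless_stopped(buffer, None, stop_event)  # None signals the epoch end


def stream_fn() -> Iterable[Batch]:
    return ({'x': [i]} for i in range(5))


stop_event = threading.Event()
buffer = queue.Queue(maxsize=2)
thread = threading.Thread(target=enqueue_batches, args=(stream_fn, buffer, stop_event),
                          kwargs={'epoch_size': 3}, daemon=True)
thread.start()

epoch = []
while True:  # consumer side: dequeue until the epoch end is signaled
    batch = buffer.get(timeout=5)
    if batch is None:
        break
    epoch.append(batch)

stop_event.set()
thread.join(timeout=5)
```

Putting with a timeout lets the producer notice the stop event even when the buffer is full and nobody is consuming.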

_epoch_limit_reached()

Return True if the number of produced batches has reached the specified epoch_size.

Always return False if no limit was specified.

Return type

bool

_get_stream()

Possibly create and return raw dataset stream iterator.

Return type

Iterator

_next_batch()

Return a single batch or None signaling epoch end.

Note

Signal the epoch end with None.

Stop when:
  • the stream ends and epoch size is not set

  • the specified number of batches is returned

Return type

Optional[Mapping[str, Sequence[Any]]]

Returns

a single batch or None signaling epoch end

_start_thread()

Start an enqueueing thread.

_stop_thread()

Stop the enqueueing thread. Keep the queue content and stream state.

allow_buffering

A resource that allows the stream object to prepare batches in advance.

Buffering is disabled right after the stream wrapper is constructed. This property makes it possible to allow buffering only when there is some spare CPU time. A good place to allow buffering is, e.g., the training procedure in the emloop.models.AbstractModel.run() method, whenever the GIL is released.

Usage
# the training method of a model
def run(self, batch, train, stream):
    preprocess_batch_in_python(batch)  # this function holds the GIL and fully utilizes the CPU
    with stream.allow_buffering:
        call_native_backend(batch)  # this function is blocking, but releases the GIL
                                    # we can use the GIL and the spare CPU to prepare the next batch

Return type

ReleasedSemaphore

Returns

A resource object that allows buffering when in use.
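
One way to picture such a resource is a context manager that releases a semaphore on entry and acquires it back on exit, so that a producer that needs the semaphore can work only inside the with block. The sketch below is hypothetical: ReleasedSemaphoreSketch and may_buffer are made-up names, and the actual ReleasedSemaphore may be implemented differently.

```python
import threading


class ReleasedSemaphoreSketch:
    """Hypothetical resource: the semaphore is available only inside `with`."""

    def __init__(self, semaphore: threading.Semaphore):
        self._semaphore = semaphore

    def __enter__(self) -> 'ReleasedSemaphoreSketch':
        self._semaphore.release()  # allow buffering
        return self

    def __exit__(self, *args) -> None:
        self._semaphore.acquire()  # disallow buffering again


# a zero-valued semaphore models "buffering is disabled after construction"
semaphore = threading.Semaphore(0)
allow_buffering = ReleasedSemaphoreSketch(semaphore)


def may_buffer() -> bool:
    """Return True if a producer could take the semaphore right now."""
    if semaphore.acquire(blocking=False):
        semaphore.release()
        return True
    return False


before = may_buffer()
with allow_buffering:
    inside = may_buffer()
after = may_buffer()
```

In this model, an enqueueing thread would acquire the semaphore before preparing each batch, so it makes progress only while some caller holds the resource.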

empty()

Return whether the buffer is empty.

Return type

bool

name

Stream name.

Return type

Optional[str]