emloop.datasets

Module with the emloop dataset concept (AbstractDataset) and the Python BaseDataset.

AbstractDataset
    This concept prescribes the API that is required from every emloop dataset.
BaseDataset
    Base class for datasets written in Python.
DownloadableDataset
    Dataset base class implementing routines for downloading and extracting data via the
    emloop dataset download command.
StreamWrapper
    Dataset stream wrapper which manages buffering, epoch cutting etc.
class emloop.datasets.AbstractDataset(config_str)

    Bases: object

    This concept prescribes the API that is required from every emloop dataset.

    Every emloop dataset has to have a constructor which takes a YAML config string.
    Additionally, one may implement any <stream_name>_stream method in order to make the
    <stream_name> stream available in the emloop.MainLoop.

    All the defined stream methods should return a Stream.
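The concept above can be sketched in a few lines. The class below is an illustrative, minimal dataset, not part of emloop: the constructor takes a config string and a train_stream method yields batches (mappings of source name to a sequence of values). JSON is decoded with the stdlib here purely so the sketch is self-contained; emloop passes a YAML string.

```python
import json

class MyDataset:
    """A hypothetical dataset following the emloop dataset concept."""

    def __init__(self, config_str: str):
        # emloop passes a YAML config string; JSON is used here so that
        # this sketch needs only the standard library
        self._config = json.loads(config_str)

    def train_stream(self):
        # yield batches: mappings of source name -> sequence of values
        batch_size = self._config['batch_size']
        for start in range(0, 10, batch_size):
            yield {'x': list(range(start, start + batch_size))}

dataset = MyDataset('{"batch_size": 5}')
batches = list(dataset.train_stream())
```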
class emloop.datasets.BaseDataset(config_str)

    Bases: emloop.datasets.AbstractDataset

    Base class for datasets written in Python.

    Subclasses should implement:
    - the _configure_dataset method
    - the train_stream method, if the dataset is intended to be used with emloop train ...
    - any additional <stream_name>_stream method, in order to make the <stream_name>
      stream available

    __init__(config_str)

        Create a new dataset.

        Decode the given YAML config string and pass the obtained **kwargs to
        _configure_dataset().

        Parameters: config_str (str) – dataset configuration as a YAML string

    _configure_dataset(output_dir, **kwargs)

        Configure the dataset with the **kwargs decoded from the YAML configuration.

        Raises: NotImplementedError – if not overridden
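The documented flow can be sketched as follows. This is a stand-in for the pattern, not the actual emloop implementation: __init__ decodes the config string and forwards the resulting kwargs to _configure_dataset, which raises NotImplementedError unless overridden. The names SketchBaseDataset and CsvDataset are hypothetical, and JSON stands in for YAML so only the stdlib is needed.

```python
import json

class SketchBaseDataset:
    """Stand-in for emloop's BaseDataset, illustrating the config flow."""

    def __init__(self, config_str: str):
        # decode the config string and forward the kwargs
        kwargs = json.loads(config_str)
        self._configure_dataset(**kwargs)

    def _configure_dataset(self, output_dir=None, **kwargs):
        raise NotImplementedError('_configure_dataset must be overridden')

class CsvDataset(SketchBaseDataset):
    """A hypothetical subclass storing its configuration."""

    def _configure_dataset(self, output_dir=None, path=None, batch_size=1, **kwargs):
        self._path = path
        self._batch_size = batch_size

dataset = CsvDataset('{"path": "data.csv", "batch_size": 32}')
```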
class emloop.datasets.DownloadableDataset(config_str)

    Bases: emloop.datasets.BaseDataset

    Dataset base class implementing routines for downloading and extracting data via the
    emloop dataset download command.

    The typical use-case is that the data_root, url_root and download_filenames variables
    are passed to the dataset constructor. Alternatively, these properties may be
    implemented directly in their corresponding methods.

    _configure_dataset(data_root=None, download_urls=None, **kwargs)

        Save the passed values and use them as the default property implementation.

        Return type: None

    download()

        Maybe download and extract the extra files required.

        If not already downloaded, download all the files specified by download_urls().
        Then extract the downloaded files to data_root().

        Invoked with: emloop dataset download <path-to-config>

        Return type: None
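The "maybe download" idea can be sketched as a standalone function: fetch only the files that are not present in data_root yet. The fetch_fn parameter is an assumption of this sketch so it can run without network access; the real method downloads over HTTP and also extracts archives.

```python
import os

def maybe_download(data_root, download_urls, fetch_fn):
    """Download each URL into data_root unless the file already exists.

    fetch_fn(url) -> bytes is a hypothetical hook standing in for the
    actual HTTP download; returns the list of newly written files.
    """
    os.makedirs(data_root, exist_ok=True)
    downloaded = []
    for url in download_urls:
        filename = os.path.join(data_root, os.path.basename(url))
        if not os.path.exists(filename):  # skip files that are already there
            with open(filename, 'wb') as file:
                file.write(fetch_fn(url))
            downloaded.append(filename)
    return downloaded
```

A second invocation with the same data_root is a no-op, which is what makes the command safe to re-run.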
class emloop.datasets.StreamWrapper(stream_fn, buffer_size=0, epoch_size=-1, name=None, profile=None)

    Bases: object

    Dataset stream wrapper which manages buffering, epoch cutting etc.

    Caution: a buffered StreamWrapper must be used as a with-resource (context manager)
    so that the enqueueing thread can be properly managed.

        stream = StreamWrapper(dataset.train_stream, name='train')
        for batch in stream:  # 1st epoch
            # do stuff
        for batch in stream:  # 2nd epoch
            # do stuff

        stream = StreamWrapper(dataset.train_stream, buffer_size=16, epoch_size=1000, name='train')
        with stream:  # without the with-resource block we would get an error
            for batch in stream:  # 1st epoch
                # do stuff

    __enter__()

        If buffered, start the enqueueing thread.

        Return type: Iterator[Mapping[str, Sequence[Any]]]

    __init__(stream_fn, buffer_size=0, epoch_size=-1, name=None, profile=None)

        Create a new StreamWrapper.
    __next__()

        Return the next batch, or end the epoch with StopIteration.

        Return type: Mapping[str, Sequence[Any]]
        Returns: the next batch
        Raises: StopIteration – at the end of the epoch

    _dequeue_batch()

        Return a single batch from the queue, or None signaling the epoch end.

        Raises: ChildProcessError – if the enqueueing thread ended unexpectedly
        Return type: Optional[Mapping[str, Sequence[Any]]]
    _enqueue_batches(stop_event)

        Enqueue all the stream batches. If specified, stop after epoch_size batches.

        Note: the epoch end is signaled with None.

        Stop when:
        - stop_event is set
        - the stream ends and no epoch size is set
        - the specified number of batches has been enqueued

        Note: this is used only with buffer_size > 0.

        Parameters: stop_event (Event) – event signaling the stop instruction
        Return type: None
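The buffering protocol can be sketched with a queue.Queue and a background thread: the producer enqueues batches, stops early when the stop event is set or epoch_size batches were produced, and signals the epoch end by enqueueing None. This is a simplification of the real StreamWrapper internals, with illustrative names.

```python
import queue
import threading

def enqueue_batches(stream, batch_queue, stop_event, epoch_size=-1):
    """Enqueue batches from `stream`, ending the epoch with a None sentinel."""
    produced = 0
    for batch in stream:
        if stop_event.is_set():
            return  # stopped from outside; do not signal an epoch end
        batch_queue.put(batch)
        produced += 1
        if epoch_size != -1 and produced == epoch_size:
            break  # epoch limit reached
    batch_queue.put(None)  # signal the epoch end

stream = ({'x': [i]} for i in range(5))
batch_queue = queue.Queue(maxsize=16)
stop_event = threading.Event()
thread = threading.Thread(target=enqueue_batches,
                          args=(stream, batch_queue, stop_event, 3))
thread.start()

batches = []
while True:
    batch = batch_queue.get()
    if batch is None:  # epoch end reached
        break
    batches.append(batch)
thread.join()
```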
    _epoch_limit_reached()

        Return True if the number of produced batches has reached the specified
        epoch_size; always return False if no limit was specified.

        Return type: bool

    _get_stream()

        Possibly create, and return, the raw dataset stream iterator.

        Return type: Iterator[+T_co]
    _next_batch()

        Return a single batch, or None signaling the epoch end.

        Note: the epoch end is signaled with None.

        Stop when:
        - the stream ends and no epoch size is set
        - the specified number of batches has been returned

        Return type: Optional[Mapping[str, Sequence[Any]]]
        Returns: a single batch or None signaling the epoch end
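The unbuffered epoch-cutting path can be sketched as a small helper: take the next batch from the underlying iterator and translate both "stream exhausted" and "epoch_size reached" into the None epoch-end signal. The function name and signature are illustrative, simplified from the real StreamWrapper.

```python
def next_batch(stream_iter, produced, epoch_size=-1):
    """Return the next batch, or None when the epoch ends."""
    if epoch_size != -1 and produced >= epoch_size:
        return None  # epoch limit reached
    try:
        return next(stream_iter)
    except StopIteration:
        return None  # the underlying stream ended

stream_iter = iter([{'x': [0]}, {'x': [1]}])
batches = []
while True:
    batch = next_batch(stream_iter, len(batches), epoch_size=1)
    if batch is None:
        break
    batches.append(batch)
```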
    allow_buffering

        A resource that allows the stream object to prepare batches in advance.

        After the construction of the stream wrapper, buffering is disabled. This
        resource makes it possible to allow buffering only when there is some spare CPU
        time. A good place to allow buffering is e.g. during the training procedure in
        the emloop.models.AbstractModel.run() method, whenever the GIL is released.

            # the training method of a model
            def run(self, batch, train, stream):
                preprocess_batch_in_python(batch)  # holds the GIL and fully utilizes the CPU
                with stream.allow_buffering:
                    # the native call blocks but releases the GIL, so the stream may use
                    # the spare CPU time to prepare the next batch
                    call_native_backend(batch)

        Return type: ReleasedSemaphore
        Returns: a resource object that allows buffering while in use.
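The "released semaphore" idea can be sketched with threading.Semaphore: the semaphore is held by default (buffering blocked) and the with-block temporarily releases it so a background thread may proceed. The class and method names here are assumptions for illustration, not the emloop ReleasedSemaphore API.

```python
import threading

class ReleasedResource:
    """Hypothetical resource: permits background work only while entered."""

    def __init__(self):
        self._semaphore = threading.Semaphore(0)  # held: background work blocked

    def __enter__(self):
        self._semaphore.release()  # allow the background thread to proceed
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self._semaphore.acquire()  # block the background work again
        return False

    def wait_allowed(self, timeout=None):
        # a background thread would call this before preparing a batch
        allowed = self._semaphore.acquire(timeout=timeout)
        if allowed:
            self._semaphore.release()  # peek only; leave the permit in place
        return allowed
```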