emloop.datasets
Module with the emloop dataset concept (AbstractDataset) and the Python BaseDataset.
Classes
- AbstractDataset: This concept prescribes the API that is required from every emloop dataset.
- BaseDataset: Base class for datasets written in Python.
- DownloadableDataset: Dataset base class implementing routines for downloading and extracting data via the emloop dataset download command.
- StreamWrapper: Dataset stream wrapper which manages buffering, epoch cutting etc.
-
class emloop.datasets.AbstractDataset(config_str)
Bases: object
This concept prescribes the API that is required from every emloop dataset.
Every emloop dataset must have a constructor that takes a YAML config string. Additionally, one may implement any <stream_name>_stream method in order to make the <stream_name> stream available in emloop.MainLoop.
All the defined stream methods should return a Stream.
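The concept above can be illustrated with a minimal sketch. ToyDataset and its config keys (size, batch_size) are hypothetical names introduced only for this example. The batch shape (a mapping from source name to a sequence of examples) follows the stream_fn type documented for StreamWrapper. To stay dependency-free, the sketch decodes the config with json instead of a YAML parser, which works here only because the config string is written in the JSON subset of YAML.

```python
import json
from typing import Any, Iterable, Mapping, Sequence

Batch = Mapping[str, Sequence[Any]]


class ToyDataset:
    """Hypothetical dataset following the concept: config string in, streams out."""

    def __init__(self, config_str: str):
        # Assumption: json.loads stands in for YAML decoding in this sketch.
        config = json.loads(config_str)
        self._numbers = list(range(config["size"]))
        self._batch_size = config["batch_size"]

    def train_stream(self) -> Iterable[Batch]:
        # Each batch maps a source name to a sequence of examples.
        for i in range(0, len(self._numbers), self._batch_size):
            yield {"number": self._numbers[i:i + self._batch_size]}


dataset = ToyDataset('{"size": 5, "batch_size": 2}')
batches = list(dataset.train_stream())
```

Any further stream, e.g. a hypothetical test_stream, would be exposed the same way by adding a method with the _stream suffix.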
-
class emloop.datasets.BaseDataset(config_str)
Bases: datasets.AbstractDataset
Base class for datasets written in Python.
In the inherited class, one should:
- override the _configure_dataset method
- (optional) implement the train_stream method if the dataset is intended to be used with emloop train ...
- (optional) implement a <stream_name>_stream method in order to make the <stream_name> stream available
-
__init__(config_str)
Create new dataset.
Decode the given YAML config string and pass the obtained **kwargs to _configure_dataset().
- Parameters
config_str (str) – dataset configuration as YAML string
-
_configure_dataset(output_dir, **kwargs)
Configure the dataset with **kwargs decoded from YAML configuration.
- Raises
NotImplementedError – if not overridden
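The interplay between __init__ and _configure_dataset described above can be sketched as follows. This is not emloop's code: SketchBaseDataset and RangeDataset are hypothetical stand-ins, the config keys size and batch_size are invented for the example, and json replaces the YAML decoding so that the sketch needs only the standard library.

```python
import json


class SketchBaseDataset:
    """Hypothetical stand-in for the base-class pattern (not emloop's code)."""

    def __init__(self, config_str: str):
        # Assumption: JSON decoding stands in for YAML decoding.
        config = json.loads(config_str)
        # The decoded mapping is forwarded as keyword arguments.
        self._configure_dataset(**config)

    def _configure_dataset(self, output_dir, **kwargs):
        raise NotImplementedError("_configure_dataset must be overridden")


class RangeDataset(SketchBaseDataset):
    """Hypothetical subclass that overrides the configuration hook."""

    def _configure_dataset(self, output_dir=None, size=4, batch_size=2, **kwargs):
        self._output_dir = output_dir
        self._size = size
        self._batch_size = batch_size

    def train_stream(self):
        # Yield index batches of at most batch_size items.
        for start in range(0, self._size, self._batch_size):
            stop = min(start + self._batch_size, self._size)
            yield {"index": list(range(start, stop))}


dataset = RangeDataset('{"output_dir": "/tmp/out", "size": 5}')
first_batch = next(iter(dataset.train_stream()))
```

Keeping the constructor in the base class means that a subclass only declares the configuration keys it understands as keyword arguments, with defaults for the optional ones.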
-
class emloop.datasets.DownloadableDataset(config_str)
Bases: datasets.BaseDataset
DownloadableDataset is a dataset base class implementing routines for downloading and extracting data via the emloop dataset download command.
The typical use case is that the data_root, url_root and download_filenames variables are passed to the dataset constructor. Alternatively, these properties might be directly implemented in their corresponding methods.
-
_configure_dataset(data_root=None, download_urls=None, **kwargs)
Save the passed values and use them as a default property implementation.
-
download()
Maybe download and extract the extra files required.
If not already downloaded, download all files specified by download_urls(). Then, extract the downloaded files to data_root().
emloop CLI example
    emloop dataset download <path-to-config>
- Return type
None
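The download-then-extract behaviour described above could look roughly like the following sketch. It is an illustration under assumptions, not emloop's implementation: the function name maybe_download_and_extract and the injectable fetch parameter are hypothetical, the archive name is taken from the last URL segment, and an archive that is already present is assumed to have been extracted before.

```python
import os
import shutil
import urllib.request
from typing import Callable, Iterable


def maybe_download_and_extract(
    data_root: str,
    download_urls: Iterable[str],
    fetch: Callable[[str, str], object] = urllib.request.urlretrieve,
) -> None:
    """Download every missing archive into data_root and unpack it there."""
    os.makedirs(data_root, exist_ok=True)
    for url in download_urls:
        # Assumption: the last URL segment is a usable file name.
        filepath = os.path.join(data_root, os.path.basename(url))
        if os.path.exists(filepath):
            continue  # already downloaded, nothing to do
        fetch(url, filepath)
        # unpack_archive infers the format (zip, tar, ...) from the extension.
        shutil.unpack_archive(filepath, data_root)
```

Passing fetch as a parameter keeps the network access replaceable, so the routine can be exercised offline with a function that writes a local archive.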
-
-
class emloop.datasets.StreamWrapper(stream_fn, buffer_size=0, epoch_size=-1, name=None, profile=None)
Bases: object
Dataset stream wrapper which manages buffering, epoch cutting etc.
The main features are:
- resets the underlying dataset stream after the iteration reaches its end
- if specified, uses a consumer-producer buffer for batches, allowing simultaneous batch producing and training
- if specified, produces epochs of fixed size
- logs the timings to the given profile
Caution
A buffered StreamWrapper must be used in a with-resource environment so that the enqueueing thread can be properly managed.
non-buffered StreamWrapper
    stream = StreamWrapper(dataset.train_stream, name='train')
    for batch in stream:  # 1st epoch
        pass  # do stuff
    for batch in stream:  # 2nd epoch
        pass  # do stuff
buffered StreamWrapper with fixed size epochs
    stream = StreamWrapper(dataset.train_stream, buffer_size=16, epoch_size=1000, name='train')
    with stream:  # we would get an error without the with-resource directive
        for batch in stream:  # 1st epoch
            pass  # do stuff
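Stream resetting and epoch cutting can be illustrated without any buffering. The EpochCutter class below is a hypothetical, simplified sketch and not the StreamWrapper implementation. It assumes that a fixed-size epoch continues across the end of the underlying stream, which matches the stop conditions listed for _enqueue_batches and _next_batch.

```python
from typing import Any, Callable, Iterable, Iterator, Mapping, Optional, Sequence

Batch = Mapping[str, Sequence[Any]]


class EpochCutter:
    """Hypothetical, non-buffered sketch of stream resetting and epoch cutting."""

    def __init__(self, stream_fn: Callable[[], Iterable[Batch]], epoch_size: int = -1):
        self._stream_fn = stream_fn
        self._epoch_size = epoch_size
        self._stream: Optional[Iterator[Batch]] = None
        self._produced = 0  # batches produced in the current epoch

    def __iter__(self) -> "EpochCutter":
        return self

    def __next__(self) -> Batch:
        if 0 < self._epoch_size <= self._produced:
            self._produced = 0
            raise StopIteration  # the fixed-size epoch is complete
        if self._stream is None:
            self._stream = iter(self._stream_fn())
        try:
            batch = next(self._stream)
        except StopIteration:
            self._stream = None  # reset so that the next pass starts afresh
            if self._epoch_size <= 0:
                self._produced = 0
                raise  # without a fixed size, the stream end is the epoch end
            # With a fixed size, the epoch continues on a fresh stream.
            self._stream = iter(self._stream_fn())
            batch = next(self._stream)
        self._produced += 1
        return batch


def stream_fn() -> Iterator[Batch]:
    return iter([{"x": [1]}, {"x": [2]}, {"x": [3]}])


fixed = EpochCutter(stream_fn, epoch_size=2)
first_epoch = [batch["x"][0] for batch in fixed]
second_epoch = [batch["x"][0] for batch in fixed]
```

With a three-batch stream and epoch_size=2, the second epoch starts where the first one stopped and wraps around to the beginning of a fresh stream.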
-
__init__(stream_fn, buffer_size=0, epoch_size=-1, name=None, profile=None)
Create new StreamWrapper.
- Parameters
stream_fn (Callable[[], Iterable[Mapping[str, Sequence[Any]]]]) – callable which returns raw dataset stream
buffer_size (int) – buffer size, < 1 means no buffer
epoch_size (int) – if > 0, stop iteration after the specified number of batches
profile (Optional[Mapping[str, List[float]]]) – profile to record times
-
__next__()
Return next batch or end epoch with StopIteration.
- Returns
next batch
- Raises
StopIteration – at the end of the epoch
-
_enqueue_batches(stop_event)
Enqueue all the stream batches. If specified, stop after epoch_size batches.
Note
Signal the epoch end with None.
Stop when:
- stop_event is set
- stream ends and epoch size is not set
- specified number of batches is enqueued
Note
This is used only with buffer_size > 0.
- Parameters
stop_event (Event) – event signaling stop instruction
- Return type
None
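The three stop conditions can be sketched with a producer thread and a queue from the standard library. The function enqueue_batches below is a hypothetical, simplified illustration rather than the actual method: it takes the stream factory, the buffer and the epoch size as explicit arguments instead of reading them from a StreamWrapper instance, and it does not model the semaphore used by allow_buffering.

```python
import queue
import threading
from typing import Any, Callable, Iterable


def enqueue_batches(stream_fn: Callable[[], Iterable[Any]],
                    buffer: "queue.Queue",
                    stop_event: threading.Event,
                    epoch_size: int = -1) -> None:
    """Producer loop: put batches into the buffer, then None as the epoch end."""
    produced = 0
    while not stop_event.is_set():
        for batch in stream_fn():
            if stop_event.is_set():
                return  # stop instruction: leave without an epoch end marker
            buffer.put(batch)
            produced += 1
            if 0 < epoch_size <= produced:
                buffer.put(None)  # the specified number of batches is enqueued
                return
        if epoch_size <= 0:
            buffer.put(None)  # the stream ended and no epoch size is set
            return
        # Otherwise the epoch continues on a fresh stream.


def stream_fn():
    return ({"x": [i]} for i in range(3))


buffer = queue.Queue(maxsize=16)
stop_event = threading.Event()
producer = threading.Thread(target=enqueue_batches,
                            args=(stream_fn, buffer, stop_event, 5))
producer.start()

epoch = []
while True:
    batch = buffer.get()
    if batch is None:  # epoch end signalled by the producer
        break
    epoch.append(batch["x"][0])
producer.join()
```

The consumer never inspects the epoch size itself. It only reacts to the None marker, which keeps the cutting logic in one place.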
-
_epoch_limit_reached()
Return True if the number of produced batches reached the specified epoch_size. Always return False if no limit was specified.
- Return type
bool
-
_get_stream()
Possibly create and return raw dataset stream iterator.
- Return type
Iterator[+T_co]
-
_next_batch()
Return a single batch or None signaling epoch end.
Note
Signal the epoch end with None.
Stop when:
- stream ends and epoch size is not set
- specified number of batches is returned
-
allow_buffering
A resource that allows the stream object to prepare batches in advance.
After the construction of the stream wrapper, the buffering is disabled. This function makes it possible to allow buffering only when there is some spare CPU time. A good place to allow buffering is, e.g., during the training procedure in the emloop.models.AbstractModel.run() method, whenever the GIL is released.
Usage
    # the training method of a model
    def run(self, batch, train, stream):
        preprocess_batch_in_python(batch)  # this function holds the GIL and fully utilizes the CPU
        with stream.allow_buffering:
            call_native_backend(batch)  # this function is blocking, but releases the GIL
            # we can use the GIL and the spare CPU to prepare the next batch
- Return type
ReleasedSemaphore
- Returns
A resource object that allows buffering when in use.
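One way such a resource could work is a semaphore that is held by default and released only inside the with block. The sketch below is an assumption based on the name of the return type. ReleasedSemaphoreSketch is a hypothetical class and is not claimed to match emloop's ReleasedSemaphore.

```python
import threading


class ReleasedSemaphoreSketch:
    """Hypothetical resource: the semaphore is free only inside the with block."""

    def __init__(self, semaphore: threading.Semaphore):
        self._semaphore = semaphore

    def __enter__(self) -> "ReleasedSemaphoreSketch":
        self._semaphore.release()  # a buffering thread may now proceed
        return self

    def __exit__(self, *exc_info) -> None:
        self._semaphore.acquire()  # block the buffering thread again


semaphore = threading.Semaphore(0)  # starts unavailable: buffering disabled
allow_buffering = ReleasedSemaphoreSketch(semaphore)

free_before = semaphore.acquire(blocking=False)
with allow_buffering:
    free_inside = semaphore.acquire(blocking=False)
    if free_inside:
        semaphore.release()  # hand the permit back, as a producer would
free_after = semaphore.acquire(blocking=False)
```

A producer thread that acquires and immediately releases the semaphore before each batch would therefore make progress only while some caller is inside the with block.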