Batch iterators¶
Tools for creating batch iterators. See the Batch iterators tutorial for more details.
Pipeline¶
- class dpipe.batch_iter.pipeline.Infinite(source: ~typing.Iterable, *transformers: ~typing.Union[~typing.Callable, ~dpipe.batch_iter.pipeline.Transform], batch_size: ~typing.Union[int, ~typing.Callable], batches_per_epoch: int, buffer_size: int = 1, combiner: ~typing.Callable = <function combine_to_arrays>, **kwargs)[source]¶
Bases: object
Combine source and transformers into a batch iterator that yields batches of size batch_size.
- Parameters
source (Iterable) – an infinite iterable.
transformers (Callable) – callables, each of which transforms the objects generated by the previous element of the pipeline.
batch_size (int, Callable) – the size of each batch; can also be a callable that determines the size dynamically.
batches_per_epoch (int) – the number of batches to yield each epoch.
buffer_size (int) – the number of objects to keep buffered in each pipeline element. Default is 1.
combiner (Callable) – combines a chunk of single objects into one batch, e.g. combiner([(x, y), (x, y)]) -> ([x, x], [y, y]). Default is combine_to_arrays.
kwargs – additional keyword arguments passed to the combiner.
References
See the Batch iterators tutorial for more details.
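To make the data flow concrete, here is a minimal pure-Python sketch of the source → transformers → combiner flow. This is an illustration of the semantics only, not dpipe's actual implementation, which additionally buffers objects between pipeline elements and supports a callable batch_size:

```python
from itertools import count, islice

def infinite_sketch(source, *transformers, batch_size, batches_per_epoch,
                    combiner=lambda chunk: tuple(zip(*chunk))):
    # chain the transformers: each one maps a single object to a new object
    def items():
        for obj in source:
            for f in transformers:
                obj = f(obj)
            yield obj

    stream = items()
    for _ in range(batches_per_epoch):
        # take batch_size objects and combine them into one batch
        chunk = list(islice(stream, batch_size))
        yield combiner(chunk)

# usage: an infinite source of (x, y) pairs, x squared, grouped into batches of 2
pairs = ((i, i + 1) for i in count())
batches = list(infinite_sketch(pairs, lambda xy: (xy[0] ** 2, xy[1]),
                               batch_size=2, batches_per_epoch=3))
# batches[0] == ((0, 1), (1, 2))
```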
- class dpipe.batch_iter.pipeline.Threads(func: Callable, *args, n_workers: int = 1, buffer_size: int = 1, **kwargs)[source]¶
Bases: Iterator
Apply func concurrently to each object in the batch iterator by moving it to n_workers threads.
- Parameters
func (Callable) – the function to apply to each object.
n_workers (int) – the number of threads to which func will be moved.
buffer_size (int) – the number of objects to keep buffered.
args – additional positional arguments passed to func.
kwargs – additional keyword arguments passed to func.
References
See the Batch iterators tutorial for more details.
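The behaviour can be approximated with a thread pool from the standard library. A sketch of what moving func to worker threads looks like (dpipe's Threads additionally buffers results between pipeline stages):

```python
from concurrent.futures import ThreadPoolExecutor

def threaded_map_sketch(func, iterable, n_workers=2):
    # executor.map applies func to each object concurrently while
    # preserving the original order of the stream
    with ThreadPoolExecutor(max_workers=n_workers) as pool:
        yield from pool.map(func, iterable)

results = list(threaded_map_sketch(lambda x: x * x, range(5), n_workers=2))
# results == [0, 1, 4, 9, 16]
```

Threads are most useful when func releases the GIL, e.g. during disk or network I/O; for CPU-bound work, Loky (below) is usually the better fit.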
- class dpipe.batch_iter.pipeline.Loky(func: Callable, *args, n_workers: int = 1, buffer_size: int = 1, **kwargs)[source]¶
Bases: Transform
Apply func concurrently to each object in the batch iterator by moving it to n_workers processes.
- Parameters
func (Callable) – the function to apply to each object.
n_workers (int) – the number of processes to which func will be moved.
buffer_size (int) – the number of objects to keep buffered.
args – additional positional arguments passed to func.
kwargs – additional keyword arguments passed to func.
Notes
Process-based parallelism is implemented with the loky backend.
References
See the Batch iterators tutorial for more details.
- class dpipe.batch_iter.pipeline.Iterator(transform: Callable, *args, n_workers: int = 1, buffer_size: int = 1, **kwargs)[source]¶
Bases: Transform
Apply transform to the iterator of values that flow through the batch iterator.
- Parameters
transform (Callable(Iterable) -> Iterable) – a function that takes an iterable and yields transformed values.
n_workers (int) – the number of threads to which transform will be moved.
buffer_size (int) – the number of objects to keep buffered.
args – additional positional arguments passed to transform.
kwargs – additional keyword arguments passed to transform.
References
See the Batch iterators tutorial for more details.
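Unlike Threads and Loky, which apply func to each object separately, here transform receives the whole stream and may keep state across values. A sketch of such a stream-level transform, in plain Python:

```python
def running_mean(iterable):
    # a transform in the Iterator sense: it consumes the stream of values
    # and yields transformed values, keeping state between them
    total = 0
    for i, x in enumerate(iterable, 1):
        total += x
        yield total / i

means = list(running_mean([2, 4, 6]))
# means == [2.0, 3.0, 4.0]
```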
- dpipe.batch_iter.pipeline.combine_batches(inputs)[source]¶
Combines tuples from inputs into batches: [(x, y), (x, y)] -> [(x, x), (y, y)]
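The combination above is a transposition, which a sketch can express with zip (dpipe's version may differ in the exact container types it returns):

```python
def combine_batches_sketch(inputs):
    # transpose a list of tuples: [(x1, y1), (x2, y2)] -> ((x1, x2), (y1, y2))
    return tuple(zip(*inputs))

batched = combine_batches_sketch([(1, 'a'), (2, 'b')])
# batched == ((1, 2), ('a', 'b'))
```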
- dpipe.batch_iter.pipeline.combine_to_arrays(inputs)[source]¶
Combines tuples from inputs into batches of numpy arrays.
- dpipe.batch_iter.pipeline.combine_pad(inputs, padding_values: Union[float, Sequence[float]] = 0, ratio: Union[float, Sequence[float]] = 0.5)[source]¶
Combines tuples from inputs into batches and pads each batch in order to obtain a correctly shaped numpy array.
- Parameters
inputs – the tuples to combine and pad.
padding_values – values to pad with. If Callable (e.g. numpy.min), padding_values(x) will be used.
ratio – the fraction of the padding that will be applied to the left; 1 - ratio will be applied to the right. By default 0.5, so the padding is split evenly between the left and right.
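A minimal numpy sketch of the pad-then-stack semantics, assuming a scalar padding value and arrays of equal ndim (dpipe's implementation also supports callable padding values and per-axis ratios):

```python
import numpy as np

def combine_pad_sketch(inputs, padding_value=0, ratio=0.5):
    # transpose, then pad each group of arrays to a common target shape
    batches = []
    for group in zip(*inputs):
        arrays = [np.asarray(a) for a in group]
        target = np.max([a.shape for a in arrays], axis=0)
        padded = []
        for a in arrays:
            total = target - np.array(a.shape)
            left = (total * ratio).astype(int)  # `ratio` of the padding goes left
            right = total - left
            padded.append(np.pad(a, list(zip(left, right)),
                                 constant_values=padding_value))
        batches.append(np.stack(padded))
    return tuple(batches)

# two 1-tuples with arrays of different lengths
xs = [(np.array([1, 2, 3]),), (np.array([4]),)]
batch, = combine_pad_sketch(xs)
# batch.shape == (2, 3); the short array is padded with zeros on both sides
```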
Sources¶
- dpipe.batch_iter.sources.sample(sequence: Sequence, weights: Optional[Sequence[float]] = None, random_state: Optional[Union[RandomState, int]] = None)[source]¶
Infinitely yield samples from sequence according to weights.
- Parameters
sequence (Sequence) – the sequence of elements to sample from.
weights (Sequence[float], None, optional) – the weights associated with each element. If None, the weights are assumed to be equal. Should be the same size as sequence.
random_state (int, np.random.RandomState, None, optional) – if not None, used to set the random seed for reproducibility reasons.
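The semantics can be sketched with the standard library's random.choices; this is an illustration only, since dpipe uses numpy's RandomState:

```python
import random
from itertools import islice

def sample_sketch(sequence, weights=None, seed=None):
    rng = random.Random(seed)
    while True:  # an infinite stream of (optionally weighted) samples
        yield rng.choices(sequence, weights=weights, k=1)[0]

# with all the weight on 'a', every draw is 'a'
draws = list(islice(sample_sketch(['a', 'b'], weights=[1, 0], seed=0), 4))
# draws == ['a', 'a', 'a', 'a']
```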
- dpipe.batch_iter.sources.load_by_random_id(*loaders: Callable, ids: Sequence, weights: Optional[Sequence[float]] = None, random_state: Optional[Union[RandomState, int]] = None)[source]¶
Infinitely yield objects loaded by loaders according to identifiers sampled from ids. The identifiers are randomly sampled from ids according to the weights.
- Parameters
loaders (Callable) – functions, each of which loads an object by its id.
ids (Sequence) – the sequence of identifiers to sample from.
weights (Sequence[float], None, optional) – the weights associated with each id. If None, the weights are assumed to be equal. Should be the same size as ids.
random_state (int, np.random.RandomState, None, optional) – if not None, used to set the random seed for reproducibility reasons.
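A sketch of the sampling loop: each drawn identifier is passed to every loader, so one yielded tuple holds all views of the same object. The dict-backed loaders below are hypothetical stand-ins for e.g. loading an image and its label from disk:

```python
import random

def load_by_random_id_sketch(*loaders, ids, weights=None, seed=None):
    rng = random.Random(seed)
    while True:
        i = rng.choices(ids, weights=weights, k=1)[0]
        # each loader fetches its own view of the same sampled identifier
        yield tuple(loader(i) for loader in loaders)

# hypothetical loaders: image features and a label, keyed by patient id
images = {'p1': [0.1], 'p2': [0.2]}
labels = {'p1': 0, 'p2': 1}
gen = load_by_random_id_sketch(images.get, labels.get, ids=['p1'], seed=0)
pair = next(gen)
# pair == ([0.1], 0)
```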
Blocks¶
- class dpipe.batch_iter.expiration_pool.ExpirationPool(pool_size: int, repetitions: int, iterations: int = 1)[source]¶
Bases: Iterator
A simple expiration pool for time-consuming operations that don't fit into RAM. See expiration_pool for details.
Examples
>>> batch_iter = Infinite(
>>>     # ... some expensive operations, e.g. loading from disk, or preprocessing
>>>     ExpirationPool(pool_size, repetitions),
>>>     # ... here are the values from the pool
>>>     # ... other lightweight operations
>>>     # ...
>>> )
- dpipe.batch_iter.expiration_pool.expiration_pool(iterable: Iterable, pool_size: int, repetitions: int, iterations: int = 1)[source]¶
Caches pool_size items from iterable. An item is removed from the cache after it has been generated repetitions times. After an item is removed, a new one is extracted from iterable. Finally, iterations controls how many values are generated each time a new value is added, thus speeding up the pipeline at early stages.
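The fill-draw-evict cycle can be sketched as follows (a simplified illustration that omits the iterations parameter and draws uniformly from the pool):

```python
import random

def expiration_pool_sketch(iterable, pool_size, repetitions, seed=None):
    # cache up to pool_size items; each item is yielded `repetitions` times
    # before being evicted and replaced by the next item from `iterable`
    rng = random.Random(seed)
    it = iter(iterable)
    pool = {}     # slot -> [item, remaining repetitions]
    counter = 0
    while True:
        # refill the pool from the source while there is room
        while len(pool) < pool_size:
            try:
                pool[counter] = [next(it), repetitions]
                counter += 1
            except StopIteration:
                break
        if not pool:
            return
        slot = rng.choice(list(pool))
        item, left = pool[slot]
        yield item
        if left == 1:
            del pool[slot]       # expired: free the slot for a new item
        else:
            pool[slot][1] = left - 1

# every item is yielded exactly `repetitions` times overall
values = sorted(expiration_pool_sketch(range(3), pool_size=2,
                                       repetitions=2, seed=0))
# values == [0, 0, 1, 1, 2, 2]
```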
Utils¶
- dpipe.batch_iter.utils.pad_batch_equal(batch, padding_values: Union[float, Sequence[float]] = 0, ratio: Union[float, Sequence[float]] = 0.5)[source]¶
Pad each element of batch to obtain a correctly shaped array.
- dpipe.batch_iter.utils.unpack_args(func: Callable, *args, **kwargs)[source]¶
Returns a function that takes an iterable and unpacks it while calling func. args and kwargs are passed to func as additional arguments.
Examples
>>> def add(x, y):
>>>     return x + y
>>>
>>> add_ = unpack_args(add)
>>> add(1, 2) == add_([1, 2])
True
- dpipe.batch_iter.utils.multiply(func: Callable, *args, **kwargs)[source]¶
Returns a function that takes an iterable and maps func over it. Useful when multiple batches require the same function. args and kwargs are passed to func as additional arguments.
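A sketch of the mapping behaviour, assuming the result is gathered into a tuple:

```python
def multiply_sketch(func, *args, **kwargs):
    # map func over an iterable of values, e.g. over every array in a step
    def wrapper(values):
        return tuple(func(v, *args, **kwargs) for v in values)
    return wrapper

double_all = multiply_sketch(lambda v: v * 2)
# double_all([1, 2, 3]) == (2, 4, 6)
```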
- dpipe.batch_iter.utils.apply_at(index: Union[int, Sequence[int]], func: Callable, *args, **kwargs)[source]¶
Returns a function that takes an iterable and applies func to the values at the corresponding index. args and kwargs are passed to func as additional arguments.
Examples
>>> first_sqr = apply_at(0, np.square)
>>> first_sqr([3, 2, 1])
(9, 2, 1)
- dpipe.batch_iter.utils.zip_apply(*functions: Callable, **kwargs)[source]¶
Returns a function that takes an iterable and zips functions over it. kwargs are passed to each function as additional arguments.
Examples
>>> zipper = zip_apply(np.square, np.sqrt)
>>> zipper([4, 9])
(16, 3)
- dpipe.batch_iter.utils.random_apply(p: float, func: Callable, *args, **kwargs)[source]¶
Returns a function that applies func with a given probability p. args and kwargs are passed to func as additional arguments.
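The probabilistic gating can be sketched as a coin flip before the call; this pattern is commonly used for stochastic augmentations in a pipeline:

```python
import random

def random_apply_sketch(p, func, *args, **kwargs):
    # apply func with probability p, otherwise return the input unchanged
    def wrapper(x):
        if random.random() < p:
            return func(x, *args, **kwargs)
        return x
    return wrapper

# the endpoints are deterministic: p=0 never applies, p=1 always applies
never = random_apply_sketch(0.0, lambda x: -x)
always = random_apply_sketch(1.0, lambda x: -x)
# never(5) == 5, always(5) == -5
```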
- dpipe.batch_iter.utils.sample_args(func: Callable, *args: Callable, **kwargs: Callable)[source]¶
Returns a function that samples arguments for func from args and kwargs. Each argument in args and kwargs must be a callable that samples a random value.
Examples
>>> from scipy.ndimage import rotate
>>>
>>> random_rotate = sample_args(rotate, angle=np.random.normal)
>>> random_rotate(x)
>>> # same as
>>> rotate(x, angle=np.random.normal())