Batch iterators

Batch iterators are built using the following constructor:

from dpipe.batch_iter import Infinite

Its only required argument is source - an infinite iterable that yields entries from your data.
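
Any infinite iterable will do. Here is a minimal sketch that cycles over a small in-memory list of pairs (the pairs are just toy data for illustration):

import itertools

from dpipe.batch_iter import Infinite

# a tiny in-memory dataset of (value, label) pairs
pairs = [(i, i % 2) for i in range(5)]

batch_iter = Infinite(
    itertools.cycle(pairs),  # an infinite source cycling over the pairs
    batch_size=2, batches_per_epoch=3,
)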

We’ll build an example batch iterator that yields batches from the MNIST dataset:

from torchvision.datasets import MNIST
from pathlib import Path
import numpy as np


# download to ~/tests/MNIST, if necessary
dataset = MNIST(Path('~/tests/MNIST').expanduser(), transform=np.array, download=True)

Sampling

from dpipe.batch_iter import sample

# yield 10 batches of size 30 each epoch:

batch_iter = Infinite(
    sample(dataset), # randomly sample from the dataset
    batch_size=30, batches_per_epoch=10,
)

sample infinitely yields data randomly sampled from the dataset:

for x, y in sample(dataset):
    print(x.shape, y)
    break
(28, 28) 7

We use infinite sources because our batch iterators are executed in a background thread, which allows us to use resources more efficiently. For example, a new batch can be prepared while the network’s forward and backward passes are performed in the main thread.

Now we can simply iterate over batch_iter:

# give 10 batches of size 30
for xs, ys in batch_iter():
    print(xs.shape, ys.shape)
(30, 28, 28) (30,)
(30, 28, 28) (30,)
(30, 28, 28) (30,)
(30, 28, 28) (30,)
(30, 28, 28) (30,)
(30, 28, 28) (30,)
(30, 28, 28) (30,)
(30, 28, 28) (30,)
(30, 28, 28) (30,)
(30, 28, 28) (30,)

… and reuse it:

# give another 10 batches of size 30
for xs, ys in batch_iter():
    print(xs.shape, ys.shape)
(30, 28, 28) (30,)
(30, 28, 28) (30,)
(30, 28, 28) (30,)
(30, 28, 28) (30,)
(30, 28, 28) (30,)
(30, 28, 28) (30,)
(30, 28, 28) (30,)
(30, 28, 28) (30,)
(30, 28, 28) (30,)
(30, 28, 28) (30,)
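
In a real training script this is typically wrapped in a loop over epochs. A minimal sketch, where n_epochs and train_step are hypothetical stand-ins for the number of epochs and the forward/backward pass:

# `n_epochs` and `train_step` are hypothetical stand-ins for the number
# of epochs and the forward/backward pass + optimizer update
for epoch in range(n_epochs):
    for xs, ys in batch_iter():
        # while this runs in the main thread, the next batch
        # is already being prepared in the background
        train_step(xs, ys)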

After the training is over, you must close the batch iterator in order to stop all the background processes:

batch_iter.close()

Or you can use it as a context manager:

batch_iter = Infinite(
    sample(dataset),
    batch_size=30, batches_per_epoch=10,
)

with batch_iter:
    for xs, ys in batch_iter():
        print(xs.shape, ys.shape)
(30, 28, 28) (30,)
(30, 28, 28) (30,)
(30, 28, 28) (30,)
(30, 28, 28) (30,)
(30, 28, 28) (30,)
(30, 28, 28) (30,)
(30, 28, 28) (30,)
(30, 28, 28) (30,)
(30, 28, 28) (30,)
(30, 28, 28) (30,)

Transformations

Let’s add more transformations to the data.

from dpipe.im import zoom

def zoom_image(pair):
    image, label = pair
    return zoom(image, scale_factor=[2, 2]), label


batch_iter = Infinite(
    sample(dataset), # yields pairs
    zoom_image, # zoom the images by a factor of 2

    batch_size=30, batches_per_epoch=3,
)

You can think of Infinite as a pipe through which the data flows.

Each function takes the data as input (an [image, label] pair in this case), applies a transformation, and the result is propagated further.

with batch_iter:
    for xs, ys in batch_iter():
        print(xs.shape, ys.shape)
(30, 56, 56) (30,)
(30, 56, 56) (30,)
(30, 56, 56) (30,)

Note that because sample yields pairs, pair is the input of zoom_image. This is not very user-friendly, which is why there are a number of wrappers for transformers:

from dpipe.batch_iter import unpack_args

# a better version of zoom
def zoom_image(image, label):
    return zoom(image, scale_factor=[2, 2]), label


batch_iter = Infinite(
    sample(dataset),
    unpack_args(zoom_image), # unpack the arguments before calling the function

    batch_size=30, batches_per_epoch=3)

# or use a lambda directly
batch_iter = Infinite(
    sample(dataset),
    unpack_args(lambda image, label: [zoom(image, scale_factor=[2, 2]), label]),

    batch_size=30, batches_per_epoch=3)

However, there is still redundancy: the label argument is simply passed through, and only the image is transformed. Let’s fix that:

from dpipe.batch_iter import apply_at

batch_iter = Infinite(
    sample(dataset),
    # apply zoom at index 0 of the pair with scale_factor=[2, 2] as an additional argument
    apply_at(0, zoom, scale_factor=[2, 2]),

    batch_size=30, batches_per_epoch=3)
with batch_iter:
    for xs, ys in batch_iter():
        print(xs.shape, ys.shape)
(30, 56, 56) (30,)
(30, 56, 56) (30,)
(30, 56, 56) (30,)

Now we don’t even have to create another function!

Check dpipe.batch_iter.utils for other helper functions.
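
Transformations can also be chained: each step receives the output of the previous one. For example, here is a sketch that zooms the image and then rescales its intensities (the rescaling step is only for illustration):

batch_iter = Infinite(
    sample(dataset),
    apply_at(0, zoom, scale_factor=[2, 2]),   # zoom the image
    apply_at(0, lambda image: image / 255),   # then rescale its intensities to [0, 1]

    batch_size=30, batches_per_epoch=3,
)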

Parallel execution

The batch iterator supports both thread-based and process-based execution.

Threads

Wrap the function in Threads in order to enable thread-based parallelism:

%%time

import time
from dpipe.batch_iter import Threads


def do_stuff(x):
    # simulate an expensive per-item transformation
    time.sleep(1)
    # the trailing comma makes the result a tuple with a single component
    return x ** 2,

batch_iter = Infinite(
    range(10),
    do_stuff, # sleeps 1 second per item: ~10 seconds in total
    batch_size=10, batches_per_epoch=1
)

for value in batch_iter():
    pass
CPU times: user 33.3 ms, sys: 9.17 ms, total: 42.5 ms
Wall time: 10 s

%%time

batch_iter = Infinite(
    range(10),
    Threads(do_stuff, n_workers=2), # 2 workers: ~5 seconds in total
    batch_size=10, batches_per_epoch=1
)

for value in batch_iter():
    pass
CPU times: user 21.4 ms, sys: 7.75 ms, total: 29.1 ms
Wall time: 5.01 s
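
The same wrapper can be applied to the transformations from the previous sections, reusing the dataset and helpers imported above. For example, a sketch that zooms the MNIST images in several threads (assuming the zoom step is the bottleneck):

batch_iter = Infinite(
    sample(dataset),
    Threads(apply_at(0, zoom, scale_factor=[2, 2]), n_workers=4),  # zoom the images in 4 threads
    batch_size=30, batches_per_epoch=10,
)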

Processes

Similarly, wrap the function in Loky in order to enable process-based parallelism, which is better suited for CPU-bound transformations:

from dpipe.batch_iter import Loky

%%time

batch_iter = Infinite(
    range(10),
    Loky(do_stuff, n_workers=2), # 2 workers: ~5 seconds in total
    batch_size=10, batches_per_epoch=1
)

for value in batch_iter():
    pass
CPU times: user 43.6 ms, sys: 27.6 ms, total: 71.2 ms
Wall time: 5.56 s

Combining objects into batches

If your dataset contains items of various shapes, you can’t just stack them into batches. For example, you may want to pad them to a common shape. To do this, pass a custom combiner to Infinite:

# random 3D images of random shapes:

images = [np.random.randn(10, 10, np.random.randint(2, 40)) for _ in range(100)]
labels = np.random.randint(0, 2, size=len(images))
images[0].shape, images[1].shape
((10, 10, 34), (10, 10, 34))
from dpipe.batch_iter import combine_pad

batch_iter = Infinite(
    sample(list(zip(images, labels))),
    batch_size=5, batches_per_epoch=3,
    # pad and combine into batches
    combiner=combine_pad
)

with batch_iter:
    for xs, ys in batch_iter():
        print(xs.shape, ys.shape)
(5, 10, 10, 39) (5,)
(5, 10, 10, 34) (5,)
(5, 10, 10, 39) (5,)

Adaptive batch size

If the samples in your pipeline have various sizes, a constant batch size can be wasteful.

You can pass a function to batch_size instead of an integer.
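
The function is called for each candidate item and decides whether it should be added to the current batch. For instance, a constant batch size of 30 could be expressed like this (a sketch only meant to illustrate the signature):

def fixed_batch_size(seq, item):
    # `seq` - the items already in the batch, `item` - the next candidate;
    # returning False finalizes the current batch
    return len(seq) < 30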

Let’s say we are classifying 3D images of different shapes along the last axis. We want a batch to contain at most 100 slices along the last axis.

def should_add(seq, item):
    # seq - sequence of already added objects to the batch
    # item - the next item

    count = 0
    for image, label in seq + [item]:
        count += image.shape[-1]

    return count <= 100


from dpipe.batch_iter import combine_pad

batch_iter = Infinite(
    sample(list(zip(images, labels))),

    batch_size=should_add, batches_per_epoch=3,
    combiner=combine_pad
)

with batch_iter:
    for xs, ys in batch_iter():
        print(xs.shape, ys.shape)
(5, 10, 10, 34) (5,)
(4, 10, 10, 25) (4,)
(4, 10, 10, 32) (4,)

Note that the batch sizes are different: 5, 4, 4