Introduction to Data Parallelism#
Learning Objectives
Understand how parallelization can improve code runtime
Implement basic parallelization in Python
import time
import numpy as np
import matplotlib.pyplot as plt
import concurrent.futures
Note
This notebook is intended for students who are familiar with the basic principles of scientific programming and are looking to branch into intermediate topics that could be useful for research applications.
Motivation & Goals#
Often in astronomy, we are working with many pieces of data that are similar in format (i.e. photometric images, spectra, simulation snapshots, etc.) and we want to perform operations across all of these data. Those operations could be as simple as loading the data into memory from storage, or as complicated as running a full-scale photometric reduction pipeline. In this notebook we start addressing various techniques to speed up working with large amounts of data through parallelism schemes.
The goal of this resource is to show how to parallelize a simple for loop in the context of working with data. This resource uses concurrent.futures.
We will be using images of galaxies from Galaxy Zoo for this exercise.
Problem Description#
We need to load 1000 2D photometric images from storage into memory using a simple for loop. We can imagine that loading each image takes ~0.01 seconds. The images are stored in the ./images/ directory in .jpg format, and each loads as an array of shape (424, 424, 3) (one channel per color filter). We time how long this takes for reference.
images = []
start = time.time()
for i in range(1000):
    image_i = plt.imread(f'./images/{i}.jpg')
    time.sleep(0.01)  # artificial sleep to mimic slow file I/O
    images.append(image_i)
stop = time.time()
print(f"Loading in the data took ~{stop - start:0.2f} seconds")
Loading in the data took ~20.38 seconds
This is very slow because it is done sequentially in a single process. Since the 2010s, it has become commonplace for laptops used for school, research, and everyday use to have several cores (4+) at their disposal. A simple use case of parallelism is to distribute the above for loop across multiple cores to speed up the total time. For a ~20 second task this is not hugely useful, but for exceedingly large files or for systems with slow file I/O speeds, this can save minutes to hours.
For this resource, we will show one of the simplest methods for parallelizing a for loop: concurrent.futures.
To use simple parallelization with concurrent.futures, it is important to first understand the map() function. The map() function returns a map object of the results of applying a given function to each item of a given iterable (i.e. a list, tuple, array, etc.). To use the map() function, we must first rewrite the operations done in the above for loop as a mappable function.
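As a quick illustration (using a toy square function that is not part of the notebook's data), map() applies a function to every element of an iterable:

```python
def square(x):
    """Return the square of x."""
    return x * x

# map() returns a lazy map object; wrapping it in list() collects the results.
squares = list(map(square, range(5)))
print(squares)  # [0, 1, 4, 9, 16]
```

The same pattern applies to our image-loading task: we just need a function that takes one index and returns one image.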
def load_data(i):
    """
    A mappable function to load in the ith image.

    Args:
        i - int - index
    Returns:
        image_i - np.ndarray - a (424, 424, 3) image array corresponding to index i.
    """
    time.sleep(0.01)  # artificial sleep to mimic the scenario
    image_i = plt.imread(f'./images/{i}.jpg')
    return image_i
From here we can perform the same task as the initial for loop by calling the map() function, passing it the mappable function and the iterable we wish to map the function onto. In this case our mappable function is load_data() and our iterable is the same range we were going to use in our for loop.
start = time.time()
images = list(map(load_data, range(1000)))
stop = time.time()
print(f"Loading in the data took ~{stop - start:0.2f} seconds")
Loading in the data took ~20.77 seconds
Question
We note that if the outermost list() call is not present, the code runs nearly instantly. Why does it not take ~20 seconds like before?
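A hint toward the answer: map() is lazy, so the work only happens when the results are consumed. A small sketch (using a hypothetical slow_identity function, not part of the notebook's pipeline) makes this visible:

```python
import time

def slow_identity(x):
    """Mimic a slow per-item operation with a short sleep."""
    time.sleep(0.01)
    return x

start = time.time()
lazy = map(slow_identity, range(1000))  # builds an iterator; no work happens yet
lazy_elapsed = time.time() - start

start = time.time()
first = next(lazy)  # work happens only when an item is pulled from the iterator
consume_elapsed = time.time() - start

print(f"Creating the map object took {lazy_elapsed:.4f} s")
print(f"Pulling one item took       {consume_elapsed:.4f} s")
```

Creating the map object is essentially free; each 0.01 second sleep is only paid as items are consumed, which is exactly what list() forces for all 1000 items.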
We don’t yet see any speed-up, but that is just because everything is still running on a single core. Any for loop can be recast as a mapping, and in some cases a mapping can even be a little faster than a for loop. We will now use a pool of processes to perform the mapping in parallel.
Using concurrent.futures is quite simple and only requires a few lines of new code. Oftentimes jupyter environments can clash with the multiprocessing mechanics going on under the hood. To avoid these issues, we use a jupyter magic command to write the contents of the cell below to a Python script, which we then run in the following cell using another magic command (this is equivalent to running a .py file instead of using your notebook).
%%writefile map_parallel.py
import time
import numpy as np
import matplotlib.pyplot as plt
import concurrent.futures

def load_data(i):
    """
    A mappable function to load in the ith image.

    Args:
        i - int - index
    Returns:
        image_i - np.ndarray - a (424, 424, 3) image array corresponding to index i.
    """
    time.sleep(0.01)  # artificial sleep to mimic the scenario
    image_i = plt.imread(f'./images/{i}.jpg')
    return image_i

def main():
    max_workers = 4  # this should be however many cores you are able to use
    start = time.time()
    # the with block shuts the pool down cleanly once the mapping is done
    with concurrent.futures.ProcessPoolExecutor(max_workers=max_workers) as executor:
        images = list(executor.map(load_data, range(1000)))
    stop = time.time()
    print(f"Loading in the data took ~{stop - start:0.2f} seconds")
    return images

if __name__ == '__main__':
    images = main()
    """
    From here the list of images can be used as one
    pleases. It can be fed into other functions for
    analysis, for example. For this example, we
    will just stack them into one large numpy array
    and save that array to local storage.
    """
    images = np.array(images)
    np.save('images.npy', images)
Overwriting map_parallel.py
!python map_parallel.py
Loading in the data took ~5.03 seconds
"""
we load the images back in, in practice one would perform the analysis
in the python script and store the results of the analysis in storage.
"""
images = np.load('images.npy')
images.shape
(1000, 424, 424, 3)
We can see that when using 4 cores, loading the data is ~3-4x faster! The reason it is not exactly 4x faster is the overhead of parallelism: spawning the worker processes and pickling the arguments and returned arrays between processes all take time. One can use as many workers in the process pool as they have processors available in their hardware.
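A practical way to choose max_workers is to ask the machine how many logical CPUs it has; a minimal sketch:

```python
import os

# os.cpu_count() reports the number of logical CPUs the OS exposes;
# this is a reasonable upper bound when choosing max_workers.
n_cpus = os.cpu_count()
print(f"This machine reports {n_cpus} logical CPUs")
```

Note that ProcessPoolExecutor already defaults to this value when max_workers is not given, so passing max_workers explicitly is mainly useful for leaving some cores free for other work.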
The ProcessPoolExecutor is a class in the concurrent.futures module that uses a pool of processes to execute function calls asynchronously. ProcessPoolExecutor uses the multiprocessing module, which allows it to side-step the Global Interpreter Lock that makes parallelization in Python frustratingly complicated, but it also means that only picklable objects can be executed and returned. It is also possible to use concurrent.futures for multithreading with a ThreadPoolExecutor; for a review of the difference between these two approaches, we refer to this article on multiprocessing vs. multithreading.
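Swapping ProcessPoolExecutor for ThreadPoolExecutor is a one-line change and can pay off for I/O-bound work, since threads waiting on I/O (or here, on a sleep) release the GIL. A minimal sketch, using a hypothetical fake_io_task in place of real file reads:

```python
import time
import concurrent.futures

def fake_io_task(i):
    """Mimic an I/O-bound task (e.g. reading a file) with a short sleep."""
    time.sleep(0.01)
    return i

start = time.time()
# same map() pattern as before, but with threads instead of processes
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    results = list(executor.map(fake_io_task, range(100)))
elapsed = time.time() - start

print(f"100 fake I/O tasks on 4 threads took ~{elapsed:0.2f} s")
```

Run sequentially these 100 sleeps would take ~1 second; with 4 threads they overlap and finish in roughly a quarter of that. Threads also avoid the pickling restriction, but for CPU-bound work they remain limited by the GIL, which is why this notebook uses processes.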
Exercises#
Exercise 1
Write your own Python script which uses concurrent.futures to run a mock “analysis” pipeline on all 1000 images using the pipeline() function written below.
def pipeline(image):
    """
    A mock pipeline analysis function which we wish
    to apply to each image in our stack.
    """
    nx, ny = image.shape[:2]  # image.size is a single int, so we unpack shape instead
    weighted_sum = 0
    for i in range(nx):
        for j in range(ny):
            # image[i, j] is a length-3 RGB vector, so sum over channels
            weighted_sum += np.sin(image[i, j]).sum()
    return weighted_sum
Exercise 2
Use your own concurrent.futures analysis pipeline framework from the first exercise to build a new analysis pipeline that computes the median brightness in each color filter. Make a histogram of the median brightness for each filter (R, G, B) across all 1000 images. What do you see in the distributions?