Issues with parallel processing of NumPy arrays

It sounds like you’re trying to parallelize this operation with either Charm or Ray (it’s not clear how you would use both together).

If you choose to use Ray, and your data is a numpy array, you can take advantage of zero-copy reads to avoid any deserialization overhead.

You may want to optimize your sliding window function a bit, but it will likely look like this:

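import numpy as np
import ray

@ray.remote
def apply_rolling(f, arr, start, end, window_size):
    # Apply f to every window whose first index lies in [start, end).
    # Cap at the last full window so slices never run past the array.
    last_start = min(end, len(arr) - window_size + 1)
    results_arr = []
    for i in range(start, last_start):
        results_arr.append(f(arr[i : i + window_size]))
    return np.array(results_arr)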

Note that this structure lets us call f many times within a single task (aka batching).
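On optimizing the sliding window itself: if f happens to be a reduction that NumPy can vectorize, the Python loop inside each batch can be dropped. The following is only a sketch under assumptions that are not in the original answer: it assumes f is a mean, it assumes NumPy 1.20 or newer (for sliding_window_view), and rolling_mean_batch is a hypothetical helper name. It handles the same [start, end) range of window start indices as a single batch would.

```python
import numpy as np
from numpy.lib.stride_tricks import sliding_window_view


def rolling_mean_batch(arr, start, end, window_size):
    """Mean of every full window whose first index lies in [start, end).

    Assumption: the per-window function is a mean. Swap in another
    axis-wise reduction (max, sum, ...) as needed.
    """
    # Never start a window that would run past the end of the array.
    last_start = min(end, len(arr) - window_size + 1)
    if last_start <= start:
        return np.empty(0)
    # Slice only the elements this batch touches, then view them as
    # overlapping windows without copying the data.
    chunk = arr[start : last_start + window_size - 1]
    windows = sliding_window_view(chunk, window_size)
    return windows.mean(axis=1)
```

Because each batch reads a few elements past its own end, concatenating the per-batch outputs gives the same result as one pass over the whole array.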

To use our function:

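# Some small setup
ray.init()  # start Ray on this machine
f = np.mean  # stand-in; replace with the function you want applied to each window
big_arr = np.arange(10000000)
big_arr_ref = ray.put(big_arr)

# available_resources() reports CPU counts as floats, so cast before dividing
num_cpus = int(ray.available_resources()["CPU"])
batch_size = len(big_arr) // num_cpus
window_size = 100

# Kick off our tasks
result_refs = []
for i in range(0, len(big_arr), batch_size):
    end_point = min(i + batch_size, len(big_arr))
    ref = apply_rolling.remote(f, big_arr_ref, i, end_point, window_size)
    result_refs.append(ref)

# Handle the results
flattened = []
for section in ray.get(result_refs):
    flattened.extend(section)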

I’m sure you’ll want to customize this code, but here are two important properties that you’ll likely want to maintain.

Batching: We’re utilizing batching to avoid starting too many tasks. In any system, parallelizing incurs overhead, so we want to be careful not to start more tasks than we need. To that end, we calculate batch_size = len(big_arr) // ray.available_resources()["CPU"] so that the number of batches matches the number of CPUs (one extra, smaller batch appears when the array length is not an exact multiple of the CPU count).

Shared memory: Since Ray’s object store supports zero-copy reads of numpy arrays, calling ray.get or reading from a numpy array is pretty much free (on a single machine, where there are no network costs). There is some overhead in serializing the array when calling ray.put, though, so this approach calls ray.put (the expensive operation) only once and ray.get (which is called implicitly when a task receives the reference) many times.
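To make the batching arithmetic concrete, here is a small pure-Python sketch of how batch boundaries could be computed. It is an illustration rather than part of the code above: batch_bounds is a hypothetical helper, and it rounds the batch size up (instead of using floor division) so that the number of batches never exceeds the requested count.

```python
import math


def batch_bounds(n_items, num_batches):
    """Return (start, end) pairs covering range(n_items).

    Hypothetical helper: the batch size is rounded up, so at most
    num_batches pairs are produced.
    """
    batch_size = max(1, math.ceil(n_items / num_batches))
    return [
        (start, min(start + batch_size, n_items))
        for start in range(0, n_items, batch_size)
    ]
```

For example, 10 items on 3 CPUs with floor division gives a batch size of 3 and therefore 4 batches, while rounding up gives a batch size of 4 and 3 batches.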

Tip: Be careful when passing arrays as parameters directly into remote functions. Ray will implicitly call ray.put on the array each time, even if you pass the same object, so put it once yourself and pass the resulting reference instead.
