mr4mp module

Thin MapReduce-like layer that wraps the Python multiprocessing library.

class mr4mp.mr4mp.pool(processes=None, stages=None, progress=None, close=False)

Bases: object

Class for a MapReduce-for-multiprocessing resource pool that can be used to run MapReduce-like workflows across multiple processes.

Parameters
  • processes (Optional[int]) – Number of processes to allocate and to employ in executing workflows.

  • stages (Optional[int]) – Number of stages (progress updates are provided once per stage).

  • progress (Optional[Callable[[Iterable], Iterable]]) – Function that wraps an iterable (can be used to also report progress).

  • close (Optional[bool]) – Flag indicating whether this instance should be closed after one workflow.
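
The progress parameter accepts any callable that takes an iterable and returns an iterable, which is also the shape of wrappers such as tqdm.tqdm. The following is a minimal sketch of a hand-written wrapper; the name report_progress and its log format are hypothetical, and the block only exercises the wrapper on a plain range rather than inside a pool.

```python
import sys
from typing import Iterable, Iterator, TypeVar

T = TypeVar("T")


def report_progress(items: Iterable[T]) -> Iterator[T]:
    """Hypothetical progress wrapper: pass items through and log a running count."""
    for count, item in enumerate(items, start=1):
        yield item
        # Runs when the consumer asks for the next item, i.e. after this one was handled.
        print(f"step {count} done", file=sys.stderr)


# The wrapper leaves the data itself untouched.
wrapped = list(report_progress(range(3)))
```

Under these assumptions, such a function could be supplied as pool(stages=3, progress=report_progress).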

>>> from operator import inv, add
>>> with pool() as pool_:
...     results = pool_.mapreduce(m=inv, r=add, xs=range(3))
...     results
-6

__enter__()

Placeholder that enables use of this class in a with statement.

__exit__(*exc_details)

Close this instance; exceptions are not suppressed.

mapreduce(m, r, xs, stages=None, progress=None, close=None)

Perform the map operation m and the reduce operation r over the supplied inputs xs (optionally in stages on subsequences of the data) and then release resources if directed to do so.

Parameters
  • m (Callable[…, Any]) – Operation to be applied to each element in the input iterable.

  • r (Callable[…, Any]) – Operation that can combine two outputs from itself, the map operation, or a mix.

  • xs (Iterable) – Input to process using the map and reduce operations.

  • stages (Optional[int]) – Number of stages (progress updates are provided once per stage).

  • progress (Optional[Callable[[Iterable], Iterable]]) – Function that wraps an iterable (can be used to also report progress).

  • close (Optional[bool]) – Flag indicating whether this instance should be closed after one workflow.

The stages, progress, and close parameter values each revert by default to those of this pool instance if they are not explicitly supplied. Supplying a value for any one of these parameters when invoking this method overrides this instance’s value for that parameter only during that invocation of the method (this instance’s value does not change).
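
The description of r (it must combine outputs of itself, of the map operation, or a mix) suggests that the grouping of intermediate results should not affect the final value, so r should be associative. The sketch below is a sequential reference model only, not the library's implementation; the helper name staged_mapreduce and the contiguous, equal-sized chunking are assumptions made for illustration.

```python
from functools import reduce
from operator import add, inv, sub


def staged_mapreduce(m, r, xs, stages):
    """Sequential model: map-reduce each chunk, then reduce the partial results.

    Assumes xs is non-empty and stages is a positive integer.
    """
    items = list(xs)
    size = max(1, -(-len(items) // stages))  # ceiling division
    chunks = [items[i:i + size] for i in range(0, len(items), size)]
    partials = [reduce(r, map(m, chunk)) for chunk in chunks]
    return reduce(r, partials)


# An associative reduce operation gives the same answer with or without staging.
flat_sum = reduce(add, map(inv, range(3)))
staged_sum = staged_mapreduce(inv, add, range(3), stages=2)

# A non-associative one (subtraction) does not: grouping changes the result.
flat_diff = reduce(sub, map(inv, range(4)))                   # 8
staged_diff = staged_mapreduce(inv, sub, range(4), stages=2)  # 0
```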

>>> from operator import inv, add
>>> with pool() as pool_:
...     pool_.mapreduce(m=inv, r=add, xs=range(3))
-6

mapconcat(m, xs, stages=None, progress=None, close=None)

Perform the map operation m over the elements in the iterable xs (optionally in stages on subsequences of the data) and then release resources if directed to do so.

Parameters
  • m (Callable[…, Sequence]) – Operation to be applied to each element in the input iterable.

  • xs (Iterable) – Input to process using the map operation.

  • stages (Optional[int]) – Number of stages (progress updates are provided once per stage).

  • progress (Optional[Callable[[Iterable], Iterable]]) – Function that wraps an iterable (can be used to also report progress).

  • close (Optional[bool]) – Flag indicating whether this instance should be closed after one workflow.

In contrast to the pool.mapreduce method, the map operation m must return a Sequence, as the results of this operation are combined using operator.concat.
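
Because the outputs are combined with operator.concat, the effect is a map followed by a flattening step. The sketch below models this sequentially with functools.reduce; mapconcat_reference and tokens are hypothetical names, and the model says nothing about how work is distributed across processes.

```python
from functools import reduce
from operator import concat, inv


def mapconcat_reference(m, xs):
    """Sequential stand-in: concatenate the sequences returned by m."""
    return reduce(concat, map(m, xs))


def tokens(line):
    """Hypothetical map operation that returns a list (a Sequence)."""
    return line.split()


words = mapconcat_reference(tokens, ["a b", "c", "d e f"])

# A map operation that returns a non-sequence (here an int) cannot be concatenated.
try:
    mapconcat_reference(inv, range(3))
    scalar_outputs_rejected = False
except TypeError:
    scalar_outputs_rejected = True
```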

The stages, progress, and close parameter values each revert by default to those of this pool instance if they are not explicitly supplied. Supplying a value for any one of these parameters when invoking this method overrides this instance’s value for that parameter only during that invocation of the method (this instance’s value does not change).
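
The defaulting rule described above can be sketched as follows. PoolSettings and resolve are hypothetical stand-ins rather than part of the library, and treating None as the "not supplied" marker is an assumption based on the None defaults in the method signature.

```python
class PoolSettings:
    """Hypothetical holder for instance-level values; not the pool class itself."""

    def __init__(self, stages=None, progress=None, close=False):
        self.stages = stages
        self.progress = progress
        self.close = close

    def resolve(self, stages=None, progress=None, close=None):
        """Return the effective (stages, progress, close) for a single call."""
        return (
            self.stages if stages is None else stages,
            self.progress if progress is None else progress,
            self.close if close is None else close,
        )


settings = PoolSettings(stages=4)
effective = settings.resolve(stages=2)  # override applies to this call only
unchanged = settings.stages             # the instance value is still 4
```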

>>> with pool() as pool_:
...     pool_.mapconcat(m=tuple, xs=[[1], [2], [3]])
(1, 2, 3)

close()

Prevent any additional work from being added to this instance and release resources associated with this instance.

>>> from operator import inv
>>> pool_ = pool()
>>> pool_.close()
>>> pool_.mapconcat(m=inv, xs=range(3))
Traceback (most recent call last):
  ...
ValueError: Pool not running

closed()

Return a boolean indicating whether this instance has been closed.

>>> pool_ = pool()
>>> pool_.close()
>>> pool_.closed()
True

Return type

bool

terminate()

Terminate the underlying multiprocessing Pool instance (associated resources will eventually be released, or they will be released when the instance is closed).

cpu_count()

Return the number of available CPUs.

>>> with pool() as pool_:
...     isinstance(pool_.cpu_count(), int)
True

Return type

int

__len__()

Return the number of processes supplied as a configuration parameter when this instance was created.

>>> with pool(1) as pool_:
...     len(pool_)
1

Return type

int

mr4mp.mr4mp.mapreduce(m, r, xs, processes=None, stages=None, progress=None)

One-shot function for performing a workflow (no explicit object management or resource allocation is required on the user’s part).

Parameters
  • m (Callable[…, Any]) – Operation to be applied to each element in the input iterable.

  • r (Callable[…, Any]) – Operation that can combine two outputs from itself, the map operation, or a mix.

  • xs (Iterable) – Input to process using the map and reduce operations.

  • processes (Optional[int]) – Number of processes to allocate and to employ in executing the workflow.

  • stages (Optional[int]) – Number of stages (progress updates are provided once per stage).

  • progress (Optional[Callable[[Iterable], Iterable]]) – Function that wraps an iterable (can be used to also report progress).
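
A typical workflow pairs a map operation that produces a partial result with a reduce operation that merges two partial results. The sketch below shows a word count using collections.Counter, evaluated sequentially with functools.reduce as a stand-in for this function; count_words and the sample lines are hypothetical. Since the library wraps multiprocessing, m and r presumably need to be picklable, which is why a module-level function is used here instead of a lambda.

```python
from collections import Counter
from functools import reduce
from operator import add


def count_words(line: str) -> Counter:
    """Hypothetical map operation: word frequencies for one line."""
    return Counter(line.split())


lines = ["a b a", "b c", "a"]

# Adding two Counter objects sums their counts, and the result is again a
# Counter, so add can combine map outputs and earlier reduce outputs alike.
totals = reduce(add, map(count_words, lines))
```

With the library installed, the corresponding call would presumably be mapreduce(m=count_words, r=add, xs=lines).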

>>> from operator import inv, add
>>> mapreduce(m=inv, r=add, xs=range(3))
-6

mr4mp.mr4mp.mapconcat(m, xs, processes=None, stages=None, progress=None)

One-shot function for applying an operation across an iterable and concatenating the results into a single sequence (no explicit object management or resource allocation is required on the user’s part).

Parameters
  • m (Callable[…, Sequence]) – Operation to be applied to each element in the input iterable.

  • xs (Iterable) – Input to process using the map operation.

  • processes (Optional[int]) – Number of processes to allocate and to employ in executing the workflow.

  • stages (Optional[int]) – Number of stages (progress updates are provided once per stage).

  • progress (Optional[Callable[[Iterable], Iterable]]) – Function that wraps an iterable (can be used to also report progress).

In contrast to the mapreduce function, the map operation m must return a Sequence, as the results of this operation are combined using operator.concat.
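
One consequence of combining results with operator.concat is that the type of the overall result follows the type returned by m, and outputs of different sequence types cannot be mixed. The sketch below illustrates this with a sequential functools.reduce stand-in; it does not call the library.

```python
from functools import reduce
from operator import concat

data = [[1], [2], [3]]

as_list = reduce(concat, map(list, data))    # lists concatenate to a list
as_tuple = reduce(concat, map(tuple, data))  # tuples concatenate to a tuple
as_text = reduce(concat, map(str, [1, 2, 3]))  # strings are sequences too

# Mixing sequence types fails: a list and a tuple cannot be concatenated.
try:
    concat([1], (2,))
    mixed_types_rejected = False
except TypeError:
    mixed_types_rejected = True
```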

>>> mapconcat(m=list, xs=[[1], [2], [3]])
[1, 2, 3]