"""
Thin MapReduce-like layer that wraps the Python multiprocessing
library.
"""
from __future__ import annotations
from typing import Any, Optional, Callable, Sequence, Iterable
import doctest
import collections.abc
import multiprocessing
from operator import concat
from functools import reduce, partial
import parts
def _parts(xs: Iterable, quantity: int) -> Sequence:
    """
    Wrapper for the partitioning function :obj:`~parts.parts.parts` that
    converts the result into a :obj:`~collections.abc.Sized` list of parts
    whenever the partitioning function returns a sized object (*i.e.*, when
    the supplied iterable is itself sized).
    """
    partitioned = parts.parts(xs, quantity)
    if isinstance(partitioned, collections.abc.Sized):
        return list(partitioned)
    return partitioned
class pool:
    """
    Class for a MapReduce-for-multiprocessing resource pool that can be used to
    run MapReduce-like workflows across multiple processes.

    :param processes: Number of processes to allocate and to employ in executing workflows.
    :param stages: Number of stages (progress updates are provided once per stage).
    :param progress: Function that wraps an iterable (can be used to also report progress).
    :param close: Flag indicating whether this instance should be closed after one workflow.

    >>> from operator import inv, add
    >>> with pool() as pool_:
    ...     results = pool_.mapreduce(m=inv, r=add, xs=range(3))
    ...     results
    -6
    """
    def __init__(
            self: pool,
            processes: Optional[int] = None,
            stages: Optional[int] = None,
            progress: Optional[Callable[[Iterable], Iterable]] = None,
            close: Optional[bool] = False
        ):
        """
        Initialize a :obj:`pool` instance given the target number of processes.
        """
        # Use the maximum number of available processes as the default.
        # If a negative number of processes is designated, wrap around
        # and subtract from the maximum.
        if isinstance(processes, int) and processes <= 0:
            processes = multiprocessing.cpu_count() + processes
        elif processes is None:
            processes = multiprocessing.cpu_count()

        # Only create a multiprocessing pool if necessary; with exactly one
        # process, all work is performed directly in the invoking process and
        # ``self._pool`` is never created (every method that touches
        # ``self._pool`` first checks ``self._processes != 1``).
        if processes != 1:
            # pylint: disable=consider-using-with
            self._pool = multiprocessing.Pool(processes=processes)

        self._processes = processes
        self._stages = stages
        self._progress = progress
        self._close = close # Indicates whether to close pool after first ``mapreduce`` call.
        self._closed = False
        self._terminated = False

    def __enter__(self: pool):
        """
        Placeholder to enable use of ``with`` construct.
        """
        return self

    def __exit__(self: pool, *exc_details):
        """
        Close this instance; exceptions are not suppressed.
        """
        self.close()

    def _map(self: pool, op: Callable, xs: Iterable):
        """
        Split data (one part per process) and map the operation
        onto each part.
        """
        # Single-process case: evaluate directly (as a single part) so that
        # no multiprocessing pool is required.
        if self._processes == 1:
            return [[op(x) for x in xs]]
        # Each worker receives one part of the input and maps ``op`` over it.
        return self._pool.map(partial(map, op), parts.parts(xs, len(self)))

    def _reduce(self: pool, op: Callable, xs_per_part: Iterable):
        """
        Apply the specified binary operator to the results
        obtained from multiple processes.
        """
        # Single-process case (``_map`` always yields exactly one part):
        # reduce within this process. Otherwise, reduce each part in a
        # worker process and then combine the per-part results locally.
        if self._processes == 1 and len(xs_per_part) == 1:
            return reduce(op, map(partial(reduce, op), xs_per_part))
        return reduce(op, self._pool.map(partial(reduce, op), xs_per_part))

    def mapreduce(
            self: pool,
            m: Callable[..., Any],
            r: Callable[..., Any],
            xs: Iterable,
            stages: Optional[int] = None,
            progress: Optional[Callable[[Iterable], Iterable]] = None,
            close: Optional[bool] = None
        ):
        """
        Perform the map operation ``m`` and the reduce operation ``r`` over the
        supplied inputs ``xs`` (optionally in stages on subsequences of the data)
        and then release resources if directed to do so.

        :param m: Operation to be applied to each element in the input iterable.
        :param r: Operation that can combine two outputs from itself, the map operation, or a mix.
        :param xs: Input to process using the map and reduce operations.
        :param stages: Number of stages (progress updates are provided once per stage).
        :param progress: Function that wraps an iterable (can be used to also report progress).
        :param close: Flag indicating whether this instance should be closed after one workflow.

        The ``stages``, ``progress``, and ``close`` parameter values each revert by
        default to those of this :obj:`pool` instance if they are not explicitly
        supplied. Supplying a value for any one of these parameters when invoking
        this method overrides this instance's value for that parameter *only during
        that invocation of the method* (this instance's value does not change).

        >>> from operator import inv, add
        >>> with pool() as pool_:
        ...     pool_.mapreduce(m=inv, r=add, xs=range(3))
        -6
        """
        # A :obj:`ValueError` is returned to maintain consistency with the
        # behavior of the underlying :obj:`multiprocessing` ``Pool`` object.
        if self.closed():
            raise ValueError('Pool not running')

        # Update state to enforce semantics of closing. Note that an
        # explicitly supplied ``close`` argument updates the closed flag
        # immediately (*before* the workflow below runs and before the
        # argument itself is defaulted on the next lines).
        self._closed = close if close is not None else self._closed

        # Revert to this instance's values for any parameter that was not
        # explicitly supplied in this invocation.
        stages = self._stages if stages is None else stages
        progress = self._progress if progress is None else progress
        close = self._close if close is None else close

        if stages is None:
            result = self._reduce(r, self._map(m, xs))
        else:
            # Separate input into specified number of stages.
            xss = _parts(xs, stages)

            # Perform each stage sequentially, folding each stage's result
            # into the running result using the reduce operation ``r``.
            result = None
            for xs_ in (progress(xss) if progress is not None else xss):
                result_stage = self._reduce(r, self._map(m, xs_))
                result = result_stage if result is None else r(result, result_stage)

        # Release resources if directed to do so.
        if close:
            self.close()

        return result

    def mapconcat(
            self: pool,
            m: Callable[..., Sequence],
            xs: Iterable,
            stages: Optional[int] = None,
            progress: Optional[Callable[[Iterable], Iterable]] = None,
            close: Optional[bool] = None
        ):
        """
        Perform the map operation ``m`` over the elements in the iterable
        ``xs`` (optionally in stages on subsequences of the data) and then
        release resources if directed to do so.

        :param m: Operation to be applied to each element in the input iterable.
        :param xs: Input to process using the map operation.
        :param stages: Number of stages (progress updates are provided once per stage).
        :param progress: Function that wraps an iterable (can be used to also report progress).
        :param close: Flag indicating whether this instance should be closed after one workflow.

        In contrast to the :obj:`pool.mapreduce` method, the map operation ``m``
        *must return a* :obj:`~collections.abc.Sequence`, as the results of this operation are
        combined using :obj:`operator.concat`.

        The ``stages``, ``progress``, and ``close`` parameter values each revert by
        default to those of this :obj:`pool` instance if they are not explicitly
        supplied. Supplying a value for any one of these parameters when invoking
        this method overrides this instance's value for that parameter *only during
        that invocation of the method* (this instance's value does not change).

        >>> with pool() as pool_:
        ...     pool_.mapconcat(m=tuple, xs=[[1], [2], [3]])
        (1, 2, 3)
        """
        return self.mapreduce(m, concat, xs, stages, progress, close)

    def close(self: pool):
        """
        Prevent any additional work from being added to this instance and
        release resources associated with this instance.

        >>> from operator import inv
        >>> pool_ = pool()
        >>> pool_.close()
        >>> pool_.mapconcat(m=inv, xs=range(3))
        Traceback (most recent call last):
          ...
        ValueError: Pool not running
        """
        self._closed = True
        # A single-process instance has no underlying pool to close.
        if self._processes != 1:
            self._pool.close()

    def closed(self: pool) -> bool:
        """
        Return a boolean indicating whether this instance has been closed.

        >>> pool_ = pool()
        >>> pool_.close()
        >>> pool_.closed()
        True
        """
        if self._processes == 1:
            return self._closed
        # The underlying pool may also have been closed/terminated directly,
        # so consult its (protected) state in addition to this instance's flag.
        return (
            self._closed or
            self._pool._state in ('CLOSE', 'TERMINATE') # pylint: disable=protected-access
        )

    def terminate(self: pool):
        """
        .. |Pool| replace:: ``Pool``
        .. _Pool: https://docs.python.org/3/library/multiprocessing.html#using-a-pool-of-workers

        Terminate the underlying :obj:`multiprocessing` |Pool|_ instance
        (associated resources will eventually be released, or they will be
        released when the instance is closed).
        """
        self._closed = True
        self._terminated = True
        # A single-process instance has no underlying pool to terminate.
        if self._processes != 1:
            self._pool.terminate()

    def cpu_count(self: pool) -> int:
        """
        Return number of available CPUs.

        >>> with pool() as pool_:
        ...     isinstance(pool_.cpu_count(), int)
        True
        """
        return multiprocessing.cpu_count()

    def __len__(self: pool) -> int:
        """
        Return number of processes supplied as a configuration parameter
        when this instance was created.

        >>> with pool(1) as pool_:
        ...     len(pool_)
        1
        """
        return self._processes
def mapreduce(
        m: Callable[..., Any],
        r: Callable[..., Any],
        xs: Iterable,
        processes: Optional[int] = None,
        stages: Optional[int] = None,
        progress: Optional[Callable[[Iterable], Iterable]] = None
    ):
    """
    One-shot function for performing a workflow (no explicit object
    management or resource allocation is required on the user's part).

    :param m: Operation to be applied to each element in the input iterable.
    :param r: Operation that can combine two outputs from itself, the map operation, or a mix.
    :param xs: Input to process using the map and reduce operations.
    :param processes: Number of processes to allocate and to employ in executing the workflow.
    :param stages: Number of stages (progress updates are provided once per stage).
    :param progress: Function that wraps an iterable (can be used to also report progress).

    >>> from operator import inv, add
    >>> mapreduce(m=inv, r=add, xs=range(3))
    -6
    """
    # Single-process workflows are evaluated directly, with no pool.
    if processes == 1:
        if stages is None:
            return reduce(r, [m(x) for x in xs])

        # Create one part per stage and map over the parts sequentially,
        # optionally wrapping the sequence of parts for progress reporting.
        xss = _parts(xs, stages)
        staged = xss if progress is None else progress(xss)
        return reduce(r, [m(x) for xs_ in staged for x in xs_])

    # Multi-process workflows delegate to a temporary pool instance that is
    # closed as soon as the workflow completes.
    pool_ = pool(processes) if processes is not None else pool()
    return pool_.mapreduce(m, r, xs, stages=stages, progress=progress, close=True)
def mapconcat(
        m: Callable[..., Sequence],
        xs: Iterable,
        processes: Optional[int] = None,
        stages: Optional[int] = None,
        progress: Optional[Callable[[Iterable], Iterable]] = None
    ):
    """
    One-shot function for applying an operation across an iterable and
    assembling the results back into a :obj:`list` (no explicit object
    management or resource allocation is required on the user's part).

    :param m: Operation to be applied to each element in the input iterable.
    :param xs: Input to process using the map operation.
    :param processes: Number of processes to allocate and to employ in executing the workflow.
    :param stages: Number of stages (progress updates are provided once per stage).
    :param progress: Function that wraps an iterable (can be used to also report progress).

    In contrast to the :obj:`mapreduce` function, the map operation ``m``
    *must return a* :obj:`~collections.abc.Sequence`, as the results of this operation are
    combined using :obj:`operator.concat`.

    >>> mapconcat(m=list, xs=[[1], [2], [3]])
    [1, 2, 3]
    """
    # Delegate to the general-purpose workflow function, fixing the reduce
    # operation to sequence concatenation.
    return mapreduce(
        m, concat, xs,
        processes=processes, stages=stages, progress=progress
    )
# Run this module's doctests when it is executed directly as a script.
if __name__ == '__main__':
    doctest.testmod() # pragma: no cover