mr4mp module
Thin MapReduce-like layer that wraps the Python multiprocessing library.
- class mr4mp.mr4mp.pool(processes=None, stages=None, progress=None, close=False)
Bases: object
Class for a MapReduce-for-multiprocessing resource pool that can be used to run MapReduce-like workflows across multiple processes.
- Parameters
  - processes (Optional[int]) – Number of processes to allocate and use when executing workflows.
  - stages (Optional[int]) – Number of stages (progress updates are provided once per stage).
  - progress (Optional[Callable[[Iterable], Iterable]]) – Function that wraps an iterable (which can also be used to report progress).
  - close (Optional[bool]) – Flag indicating whether this instance should be closed after one workflow.
>>> from operator import inv, add
>>> with pool() as pool_:
...     results = pool_.mapreduce(m=inv, r=add, xs=range(3))
...     results
-6
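The stages and progress parameters suggest a workflow in which the input is split into subsequences, each stage is mapped across the workers and reduced, and the progress wrapper sees one item per stage. The sketch below illustrates that idea under this assumption; staged_mapreduce, chunks, and report are hypothetical names, and the chunking scheme is not taken from mr4mp. It uses the thread-backed multiprocessing.dummy.Pool, which exposes the same interface as multiprocessing.Pool, so the example runs without process-spawning concerns.

```python
from functools import reduce
from itertools import islice
from multiprocessing.dummy import Pool  # thread-backed; same API as multiprocessing.Pool
from operator import add, inv
from typing import Any, Callable, Iterable, Iterator, List, Optional


def chunks(xs: Iterable, stages: int) -> Iterator[List]:
    """Hypothetical helper: split xs into at most `stages` consecutive lists."""
    items = list(xs)
    size = max(1, -(-len(items) // stages))  # ceiling division
    it = iter(items)
    while True:
        chunk = list(islice(it, size))
        if not chunk:
            return
        yield chunk


def staged_mapreduce(
    m: Callable[..., Any],
    r: Callable[..., Any],
    xs: Iterable,
    processes: Optional[int] = None,
    stages: Optional[int] = None,
    progress: Optional[Callable[[Iterable], Iterable]] = None,
) -> Any:
    """Hypothetical sketch: map and reduce each stage in a pool, then combine stages."""
    wrap = progress if progress is not None else (lambda it: it)
    partials = []
    with Pool(processes) as workers:
        for chunk in wrap(chunks(xs, stages or 1)):  # one progress step per stage
            mapped = workers.map(m, chunk)  # map step runs across the workers
            partials.append(reduce(r, mapped))  # reduce within the stage
    return reduce(r, partials)  # combine the per-stage results


stage_sizes = []


def report(stage_iter: Iterable) -> Iterator:
    """Hypothetical progress wrapper that records the size of each stage."""
    for stage in stage_iter:
        stage_sizes.append(len(stage))
        yield stage


result = staged_mapreduce(inv, add, range(10), processes=2, stages=3, progress=report)
print(result, stage_sizes)  # -55 [4, 4, 2]
```

Passing progress as a generator-based wrapper keeps the reporting logic out of the workflow itself; a progress-bar wrapper with the same iterable-in, iterable-out shape would slot in the same way.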
- mapreduce(m, r, xs, stages=None, progress=None, close=None)
Perform the map operation m and the reduce operation r over the supplied inputs xs (optionally in stages on subsequences of the data), and then release resources if directed to do so.
- Parameters
  - m (Callable[..., Any]) – Operation to be applied to each element in the input iterable.
  - r (Callable[..., Any]) – Operation that combines two values, each of which may be an output of the map operation or of r itself.
  - xs (Iterable) – Input to process using the map and reduce operations.
  - stages (Optional[int]) – Number of stages (progress updates are provided once per stage).
  - progress (Optional[Callable[[Iterable], Iterable]]) – Function that wraps an iterable (which can also be used to report progress).
  - close (Optional[bool]) – Flag indicating whether this instance should be closed after one workflow.
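Because r may receive two mapped values, two previously reduced values, or one of each, the final result is presumably only well defined when r is associative (so that how the data is split into stages or across processes does not matter). This is an inference from the parameter description, not a documented rule. The plain-Python sketch below, which does not use mr4mp, checks it by comparing a sequential reduction with one that first reduces consecutive chunks; reduce_in_chunks is a hypothetical helper.

```python
from functools import reduce
from operator import add, inv, sub
from typing import Any, Callable, List, Sequence


def reduce_in_chunks(r: Callable[[Any, Any], Any], values: Sequence, size: int) -> Any:
    """Hypothetical helper: reduce consecutive chunks, then reduce the partial results."""
    partials: List[Any] = [
        reduce(r, values[i:i + size]) for i in range(0, len(values), size)
    ]
    return reduce(r, partials)


mapped = [inv(x) for x in range(7)]  # the map step: [-1, -2, ..., -7]

# add is associative, so chunking does not change the answer.
print(reduce(add, mapped), reduce_in_chunks(add, mapped, 3))  # -28 -28

# sub is not associative, so chunking changes the answer.
print(reduce(sub, mapped), reduce_in_chunks(sub, mapped, 3))  # 26 4
```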
The stages, progress, and close parameter values each default to those of this pool instance if they are not explicitly supplied. Supplying a value for any one of these parameters when invoking this method overrides this instance’s value for that parameter during that invocation only (the instance’s own value does not change).
>>> from operator import inv, add
>>> with pool() as pool_:
...     pool_.mapreduce(m=inv, r=add, xs=range(3))
-6
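The defaulting rule described above amounts to a per-call fallback: an argument left as None is replaced by the instance’s setting, and nothing is written back to the instance. The sketch below illustrates that rule with a hypothetical Settings class; it is an assumption about the behavior as described, not mr4mp’s source.

```python
from dataclasses import dataclass
from typing import Any, Callable, Dict, Iterable, Optional


@dataclass
class Settings:
    """Hypothetical stand-in for the per-instance defaults of a pool."""
    stages: Optional[int] = None
    progress: Optional[Callable[[Iterable], Iterable]] = None
    close: bool = False

    def resolve(
        self,
        stages: Optional[int] = None,
        progress: Optional[Callable[[Iterable], Iterable]] = None,
        close: Optional[bool] = None,
    ) -> Dict[str, Any]:
        """Return the effective values for one call without mutating self."""
        return {
            "stages": self.stages if stages is None else stages,
            "progress": self.progress if progress is None else progress,
            "close": self.close if close is None else close,
        }


defaults = Settings(stages=4)
print(defaults.resolve())                      # falls back to the instance's stages=4
print(defaults.resolve(stages=2, close=True))  # overrides apply to this call only
print(defaults.stages, defaults.close)         # 4 False: the instance is unchanged
```

Testing with "is None" rather than truthiness matters for close: an explicit close=False still overrides an instance created with close=True.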
- mapconcat(m, xs, stages=None, progress=None, close=None)
Perform the map operation m over the elements of the iterable xs (optionally in stages on subsequences of the data), and then release resources if directed to do so.
- Parameters
  - m (Callable[..., Sequence]) – Operation to be applied to each element in the input iterable.
  - xs (Iterable) – Input to process using the map operation.
  - stages (Optional[int]) – Number of stages (progress updates are provided once per stage).
  - progress (Optional[Callable[[Iterable], Iterable]]) – Function that wraps an iterable (which can also be used to report progress).
  - close (Optional[bool]) – Flag indicating whether this instance should be closed after one workflow.
In contrast to the pool.mapreduce method, the map operation m must return a Sequence, because the results of this operation are combined using operator.concat.
The stages, progress, and close parameter values each default to those of this pool instance if they are not explicitly supplied. Supplying a value for any one of these parameters when invoking this method overrides this instance’s value for that parameter during that invocation only (the instance’s own value does not change).
>>> with pool() as pool_:
...     pool_.mapconcat(m=tuple, xs=[[1], [2], [3]])
(1, 2, 3)
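Combining mapped results with operator.concat is what makes the Sequence requirement necessary: concat joins two sequences of compatible types and raises TypeError for other operands such as integers. The single-process sketch below (no pool involved; sequential_mapconcat is a hypothetical name) reproduces only the combining step and shows that the result type follows the type that m returns.

```python
from functools import reduce
from operator import concat
from typing import Callable, Iterable, Sequence


def sequential_mapconcat(m: Callable[..., Sequence], xs: Iterable) -> Sequence:
    """Hypothetical single-process equivalent: map, then fold with operator.concat."""
    return reduce(concat, map(m, xs))


print(sequential_mapconcat(tuple, [[1], [2], [3]]))  # (1, 2, 3)
print(sequential_mapconcat(list, [[1], [2], [3]]))   # [1, 2, 3]

try:
    sequential_mapconcat(abs, [1, -2, 3])  # abs returns ints, not sequences
except TypeError as error:
    print("TypeError:", error)
```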
- close()
Prevent any additional work from being added to this instance and release resources associated with this instance.
>>> from operator import inv
>>> pool_ = pool()
>>> pool_.close()
>>> pool_.mapconcat(m=inv, xs=range(3))
Traceback (most recent call last):
  ...
ValueError: Pool not running
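The ValueError in this example matches what the standard library’s pool raises once it has been closed. Assuming mr4mp surfaces that error from its underlying multiprocessing pool (an assumption; the example above shows only the message), the behavior can be reproduced directly with the thread-backed multiprocessing.dummy.Pool, which shares multiprocessing.Pool’s interface:

```python
from multiprocessing.dummy import Pool  # thread-backed pool with the multiprocessing.Pool API
from operator import inv

workers = Pool(2)
print(workers.map(inv, range(3)))  # [-1, -2, -3]
workers.close()  # no further tasks may be submitted
workers.join()   # wait for the worker threads to exit

try:
    workers.map(inv, range(3))
except ValueError as error:
    message = str(error)
    print("ValueError:", message)  # ValueError: Pool not running
```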
- closed()
Return a boolean indicating whether this instance has been closed.
>>> pool_ = pool()
>>> pool_.close()
>>> pool_.closed()
True
- Return type
  bool
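One way to provide closed(), context-manager support, and the close-after-one-workflow flag is to keep a boolean alongside the underlying pool. The class below is a hypothetical sketch of that bookkeeping (ClosingPool and its attribute names are invented for illustration, and multiprocessing.dummy.Pool stands in for multiprocessing.Pool); it is not mr4mp’s implementation.

```python
from functools import reduce
from multiprocessing.dummy import Pool  # thread-backed stand-in for multiprocessing.Pool
from operator import add, inv
from typing import Any, Callable, Iterable, Optional


class ClosingPool:
    """Hypothetical wrapper that tracks whether its pool has been closed."""

    def __init__(self, processes: Optional[int] = None, close: bool = False):
        self._pool = Pool(processes)
        self._close_after_workflow = close  # instance-level default for close
        self._closed = False

    def __enter__(self) -> "ClosingPool":
        return self

    def __exit__(self, *exc_info: Any) -> None:
        self.close()  # leaving a with block always releases the pool

    def mapreduce(self, m: Callable, r: Callable, xs: Iterable,
                  close: Optional[bool] = None) -> Any:
        result = reduce(r, self._pool.map(m, xs))
        should_close = self._close_after_workflow if close is None else close
        if should_close:
            self.close()
        return result

    def close(self) -> None:
        if not self._closed:  # closing twice is harmless
            self._pool.close()
            self._pool.join()
            self._closed = True

    def closed(self) -> bool:
        return self._closed


once = ClosingPool(processes=2, close=True)
print(once.mapreduce(inv, add, range(3)), once.closed())  # -6 True

with ClosingPool(processes=2) as reusable:
    print(reusable.mapreduce(inv, add, range(3)), reusable.closed())  # -6 False
print(reusable.closed())  # True
```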
- terminate()
Terminate the underlying multiprocessing Pool instance (its associated resources are released eventually, or when this instance is closed).
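For the standard library pool that this method terminates, close() lets queued work finish before the workers exit, whereas terminate() stops the workers without waiting for outstanding work; either way the pool no longer accepts new tasks. The sketch below contrasts the two calls on the thread-backed multiprocessing.dummy.Pool (same interface as multiprocessing.Pool); state_after is a hypothetical helper, and the sketch says nothing about how mr4mp’s own close() and terminate() interact beyond the description above.

```python
from multiprocessing.dummy import Pool  # thread-backed pool with the multiprocessing.Pool API
from operator import inv


def state_after(shutdown: str) -> str:
    """Run one job, shut the pool down with close() or terminate(), then try again."""
    workers = Pool(2)
    first = workers.map(inv, range(3))  # completes normally
    if shutdown == "close":
        workers.close()      # graceful: finish queued work, then stop the workers
    else:
        workers.terminate()  # immediate: stop the workers without waiting
    workers.join()           # join() is valid after either call
    try:
        workers.map(inv, range(3))
    except ValueError as error:
        return f"{first} then {error}"
    return "still running"


print(state_after("close"))      # [-1, -2, -3] then Pool not running
print(state_after("terminate"))  # [-1, -2, -3] then Pool not running
```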
- mr4mp.mr4mp.mapreduce(m, r, xs, processes=None, stages=None, progress=None)
One-shot function for performing a workflow (no explicit object management or resource allocation is required on the user’s part).
- Parameters
  - m (Callable[..., Any]) – Operation to be applied to each element in the input iterable.
  - r (Callable[..., Any]) – Operation that combines two values, each of which may be an output of the map operation or of r itself.
  - xs (Iterable) – Input to process using the map and reduce operations.
  - processes (Optional[int]) – Number of processes to allocate and use when executing the workflow.
  - stages (Optional[int]) – Number of stages (progress updates are provided once per stage).
  - progress (Optional[Callable[[Iterable], Iterable]]) – Function that wraps an iterable (which can also be used to report progress).
>>> from operator import inv, add
>>> mapreduce(m=inv, r=add, xs=range(3))
-6
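A one-shot function of this kind presumably allocates a pool, runs a single workflow, and releases the pool before returning. The sketch below shows that lifecycle with a with block, so that the pool is cleaned up even if m or r raises; one_shot_mapreduce is a hypothetical name, and multiprocessing.dummy.Pool again stands in for multiprocessing.Pool.

```python
from functools import reduce
from multiprocessing.dummy import Pool  # thread-backed stand-in for multiprocessing.Pool
from operator import add, inv
from typing import Any, Callable, Iterable, Optional


def one_shot_mapreduce(
    m: Callable[..., Any],
    r: Callable[..., Any],
    xs: Iterable,
    processes: Optional[int] = None,
) -> Any:
    """Hypothetical one-shot helper: allocate a pool, run one workflow, release it."""
    with Pool(processes) as workers:  # the pool's __exit__ calls terminate()
        mapped = workers.map(m, xs)   # map step runs across the workers
    return reduce(r, mapped)          # reduce step runs in the caller


print(one_shot_mapreduce(inv, add, range(3)))  # -6
```

Reducing in the caller keeps the sketch short; an implementation could equally reduce partial results inside the workers before returning them.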
- mr4mp.mr4mp.mapconcat(m, xs, processes=None, stages=None, progress=None)
One-shot function for applying an operation across an iterable and concatenating the results into a single sequence, such as a list when m returns lists (no explicit object management or resource allocation is required on the user’s part).
- Parameters
  - m (Callable[..., Sequence]) – Operation to be applied to each element in the input iterable.
  - xs (Iterable) – Input to process using the map operation.
  - processes (Optional[int]) – Number of processes to allocate and use when executing the workflow.
  - stages (Optional[int]) – Number of stages (progress updates are provided once per stage).
  - progress (Optional[Callable[[Iterable], Iterable]]) – Function that wraps an iterable (which can also be used to report progress).
In contrast to the mapreduce function, the map operation m must return a Sequence, because the results of this operation are combined using operator.concat.
>>> mapconcat(m=list, xs=[[1], [2], [3]])
[1, 2, 3]
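A typical use of this kind of map-then-concatenate workflow is flattening: m turns each input into a variable-length list, and concatenation joins those lists in input order. The sketch below mimics that with the standard library rather than mr4mp; divisors and flatten_map are hypothetical names, and multiprocessing.dummy.Pool stands in for multiprocessing.Pool.

```python
from functools import reduce
from multiprocessing.dummy import Pool  # thread-backed stand-in for multiprocessing.Pool
from operator import concat
from typing import Callable, Iterable, List, Sequence


def divisors(n: int) -> List[int]:
    """Hypothetical map operation returning a variable-length list per input."""
    return [d for d in range(1, n + 1) if n % d == 0]


def flatten_map(m: Callable[..., Sequence], xs: Iterable, processes: int = 2) -> Sequence:
    """Map m across xs in a pool, then join the per-item sequences with concat."""
    with Pool(processes) as workers:
        pieces = workers.map(m, xs)  # Pool.map preserves input order
    return reduce(concat, pieces)


print(flatten_map(divisors, [4, 5, 6]))  # [1, 2, 4, 1, 5, 1, 2, 3, 6]
```

Folding with concat copies the accumulated sequence at every step, so for a very large number of pieces itertools.chain.from_iterable is a linear-time alternative.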