# Copyright (c) 2014-2021, Dr Alex Meakins, Raysect Project
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
#
# 3. Neither the name of the Raysect Project nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
from multiprocessing import get_context, cpu_count
from raysect.core.math import random
import time


class RenderEngine:
    """
    Provides a common rendering workflow interface.

    This is a base class; its functionality must be implemented fully by the deriving class.

    This class provides a rendering workflow that abstracts away the underlying
    system performing the work. It is intended that render engines may be built
    that provide rendering on single cores, multi-cores (SMP) and clusters.

    The basic workflow is as follows. The render task is split into small,
    self-contained chunks of work - 'tasks'. These tasks are passed to the
    render engine, which distributes the work to the available computing
    resources. These discrete computing resources are known as "workers".
    Workers process one task at a time and return their result to the render
    engine. When results are received, the render engine assembles them into
    the final result.

    This workflow is implemented by supplying a set of tasks and two methods to
    the render engine's run() method, which processes those tasks. The functions
    supplied to the run() method may be given additional args and kwargs.

    A worker calls render() for each task object received. render() has the
    following signature: ::

        def render(task, *render_args, **render_kwargs)

    where args and kwargs are additional arguments supplied by the user.

    Similarly, the worker calls update() for the results generated by a call to
    render(). update() has the following signature: ::

        def update(results, *update_args, **update_kwargs)

    where args and kwargs are additional arguments supplied by the user.

    The render() function must return an object representing the results; this
    must be a picklable Python object.

    Tasks are not guaranteed to be executed in order. If the order is critical,
    an identifier should be passed as part of the task definition and returned
    in the result. This permits the order to be reconstructed.
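
    As an illustration only (the task structure and the render_tile/update_frame
    names below are examples, not part of the API), a render/update pair might
    look like the following, with a task index included so the results can be
    reassembled in order:

    .. code-block:: python

        from raysect.core import SerialEngine

        def render_tile(task, scale):
            index, value = task
            return index, value * scale      # result must be picklable

        def update_frame(result, frame):
            index, value = result
            frame[index] = value             # the index restores the task order

        engine = SerialEngine()
        frame = [0] * 100
        tasks = [(i, i) for i in range(100)]
        engine.run(tasks, render_tile, update_frame,
                   render_args=(2,), update_args=(frame,))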
"""

    def run(self, tasks, render, update, render_args=(), render_kwargs={}, update_args=(), update_kwargs={}):
        """
        Starts the render engine executing the requested tasks.

        :param list tasks: List of user defined tuples that describe the tasks to execute.
        :param object render: Callable Python object that executes the tasks.
        :param object update: Callable Python object that is called following a render task and must be
          used to update the internal state of the object requesting work.
        :param tuple render_args: Additional arguments to pass to the user defined render function.
        :param dict render_kwargs: Additional keyword arguments to pass to the user defined render function.
        :param tuple update_args: Additional arguments to pass to the user defined update function.
        :param dict update_kwargs: Additional keyword arguments to pass to the user defined update function.
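
        For example, reusing the illustrative render_tile/update_frame functions
        sketched in the class docstring (hypothetical names, not part of the API): ::

            engine.run(tasks, render_tile, update_frame,
                       render_args=(2,), update_args=(frame,))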
"""
raise NotImplementedError("Virtual method must be implemented in sub-class.")

    def worker_count(self):
        """
        Returns the number of workers in use by this engine.
        """
        raise NotImplementedError("Virtual method must be implemented in sub-class.")


class SerialEngine(RenderEngine):
    """
    Render engine for running on a single CPU processor.

    This engine is useful for debugging.

    >>> from raysect.core import SerialEngine
    >>> from raysect.optical.observer import PinholeCamera
    >>>
    >>> camera = PinholeCamera((512, 512))
    >>> camera.render_engine = SerialEngine()
    """

    def run(self, tasks, render, update, render_args=(), render_kwargs={}, update_args=(), update_kwargs={}):

        for task in tasks:
            result = render(task, *render_args, **render_kwargs)
            update(result, *update_args, **update_kwargs)

    def worker_count(self):
        return 1


class MulticoreEngine(RenderEngine):
    """
    A render engine for distributing work across multiple CPU cores.

    The number of processes spawned by this render engine is controlled via
    the processes attribute. This can also be set at object initialisation.

    If the processes attribute is set to None (the default), the render engine
    will automatically set the number of processes to be equal to the number
    of CPU cores detected on the machine.

    If a render is being performed where the time to compute an individual task
    is comparable to the latency of the inter-process communication (IPC), the
    render may run significantly slower than expected due to waiting for the
    IPC to complete. To reduce the impact of the IPC overhead, multiple tasks
    are grouped together into jobs, requiring only one IPC wait for multiple
    tasks.

    By default the number of tasks per job is adjusted automatically. The
    tasks_per_job attribute can be used to override this automatic adjustment.
    To re-enable the automatic adjustment, set the tasks_per_job attribute to
    None.

    :param processes: The number of worker processes, or None to use all available cores (default).
    :param tasks_per_job: The number of tasks to group into a single job, or None if this should be determined automatically (default).
    :param start_method: The method used to start child processes: 'fork' (default), 'spawn' or 'forkserver'.

    .. code-block:: pycon

        >>> from raysect.core import MulticoreEngine
        >>> from raysect.optical.observer import PinholeCamera
        >>>
        >>> camera = PinholeCamera((512, 512))
        >>>
        >>> # allowing the camera to use all available CPU cores.
        >>> camera.render_engine = MulticoreEngine()
        >>>
        >>> # or forcing the render engine to use a specific number of CPU processes
        >>> camera.render_engine = MulticoreEngine(processes=8)
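        >>>
        >>> # the job size and start method can also be chosen explicitly,
        >>> # e.g. grouping 20 tasks per job and using the 'spawn' start method
        >>> camera.render_engine = MulticoreEngine(tasks_per_job=20, start_method='spawn')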
"""

    def __init__(self, processes=None, tasks_per_job=None, start_method='fork'):
        super().__init__()
        self.processes = processes
        self.tasks_per_job = tasks_per_job
        self._context = get_context(start_method)

    @property
    def processes(self):
        return self._processes

    @processes.setter
    def processes(self, value):
        if value is None:
            self._processes = cpu_count()
        else:
            value = int(value)
            if value <= 0:
                raise ValueError('Number of concurrent worker processes must be greater than zero.')
            self._processes = value

    @property
    def tasks_per_job(self):
        return self._tasks_per_job

    @tasks_per_job.setter
    def tasks_per_job(self, value):
        if value is None:
            self._tasks_per_job = 1
            self._auto_tasks_per_job = True
        else:
            if value < 1:
                raise ValueError("The number of tasks per job must be greater than zero or None.")
            self._tasks_per_job = value
            self._auto_tasks_per_job = False

    def run(self, tasks, render, update, render_args=(), render_kwargs={}, update_args=(), update_kwargs={}):

        # establish ipc queues
        job_queue = self._context.SimpleQueue()
        result_queue = self._context.SimpleQueue()
        tasks_per_job = self._context.Value('i')

        # start process to generate jobs
        tasks_per_job.value = self._tasks_per_job
        producer = self._context.Process(target=self._producer, args=(tasks, job_queue, tasks_per_job))
        producer.start()

        # start worker processes
        workers = []
        for pid in range(self._processes):
            p = self._context.Process(target=self._worker, args=(render, render_args, render_kwargs, job_queue, result_queue))
            p.start()
            workers.append(p)

        # consume results
        remaining = len(tasks)
        while remaining:

            results = result_queue.get()

            # has a worker failed?
            if isinstance(results, Exception):

                # clean up
                for worker in workers:
                    if worker.is_alive():
                        worker.terminate()
                producer.terminate()

                # wait for processes to terminate
                for worker in workers:
                    worker.join()
                producer.join()

                # raise the exception to inform the user
                raise results

            # update state with new results
            for result in results:
                update(result, *update_args, **update_kwargs)
                remaining -= 1

        # shutdown workers
        for _ in workers:
            job_queue.put(None)

        # store tasks per job value for next run
        self._tasks_per_job = tasks_per_job.value

    def worker_count(self):
        return self._processes

    def _producer(self, tasks, job_queue, stored_tasks_per_job):

        # initialise request rate controller constants
        target_rate = 50  # requests per second
        min_time = 1      # seconds
        min_requests = min(2 * target_rate, 5 * self._processes)
        tasks_per_job = stored_tasks_per_job.value

        # split tasks into jobs and dispatch to workers
        requests = -self.processes  # ignore the initial jobs, the requests are instantaneous
        start_time = time.time()
        while tasks:

            # assemble job
            job = []
            for _ in range(tasks_per_job):
                if tasks:
                    job.append(tasks.pop())
                    continue
                break

            # add job to queue
            job_queue.put(job)
            requests += 1

            # if enabled, auto adjust tasks per job to keep target requests per second
            if self._auto_tasks_per_job:
                elapsed_time = time.time() - start_time
                if elapsed_time > min_time and requests > min_requests:

                    # re-normalise the tasks per job based on previous work to propose a new value
                    requests_rate = requests / elapsed_time
                    proposed = tasks_per_job * requests_rate / target_rate

                    # gradually adjust tasks per job to reduce risk of oscillation
                    tasks_per_job = 0.1 * proposed + 0.9 * tasks_per_job
                    tasks_per_job = max(1, round(tasks_per_job))

                    # reset counters
                    requests = 0
                    start_time = time.time()

        # pass back new value
        stored_tasks_per_job.value = tasks_per_job

    def _worker(self, render, args, kwargs, job_queue, result_queue):

        # re-seed the random number generator to prevent all workers inheriting the same sequence
        random.seed()

        # process jobs
        while True:

            job = job_queue.get()

            # have we been commanded to shutdown?
            if job is None:
                break

            results = []
            for task in job:
                try:
                    results.append(render(task, *args, **kwargs))
                except Exception as e:
                    # pass the exception back to the main process and quit
                    result_queue.put(e)
                    return

            # hand back results
            result_queue.put(results)


if __name__ == '__main__':

    # simple benchmark: each task sums 1/count a fixed number of times, and the
    # same workload is timed on the serial and multicore engines for comparison
    class Job:

        def __init__(self, engine=None):
            self.total = 0
            self.engine = engine if engine else MulticoreEngine()

        def run(self, v):
            self.total = 0
            self.engine.run(list(range(v)), self.render, self.update, render_args=(10000,))
            return self.total

        def render(self, task, count):
            # sums 1/count, count times, so each task returns ~1.0
            total = 0
            for i in range(count):
                total += 1 / count
            return total

        def update(self, result):
            self.total += result

    n = 20000

    t = time.time()
    j = Job(SerialEngine())
    print(j.run(n), time.time() - t)

    t = time.time()
    j = Job(MulticoreEngine())
    print(j.run(n), time.time() - t)