In some cases, you may have a CPU-bound computation that could be parallelized (broken into smaller units). But because of Python's GIL, starting a new thread for each unit does not help. Using multiprocessing is not a solution either, because each process requires a new connection pool, and connections are a limited resource.
We propose to re-use the transactional queue and its workers.
We add a `result` field to `ir.queue` to store the value returned by the method.
We add a new `map_async` method on the `__queue__` method (`_Method`), similar to `Pool.map_async` (without the chunksize or callbacks). The method will be responsible for pushing to the queue (using a new transaction with autocommit) one task per argument tuple of the iterable, and for returning an object that can wait for the results.
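As a rough sketch of the intended flow (the names `_queue`, `_results` and `AsyncResult` are illustrative stand-ins, not the actual Tryton API; a plain list stands in for the `ir.queue` table):

```python
import itertools

_task_counter = itertools.count(1)
_queue = []      # stand-in for the ir.queue table
_results = {}    # stand-in for the result field, keyed by task id


class AsyncResult:
    "Handle returned by map_async: collects the results of a set of tasks."

    def __init__(self, task_ids):
        self.task_ids = task_ids

    def get(self):
        # The real implementation would wait on LISTEN/NOTIFY here;
        # this sketch assumes the worker has already run every task.
        return [_results[task_id] for task_id in self.task_ids]


def map_async(method, iterable):
    "Push one task per argument tuple and return a waitable handle."
    task_ids = []
    for args in iterable:
        task_id = next(_task_counter)
        _queue.append((task_id, method, args))
        task_ids.append(task_id)
    return AsyncResult(task_ids)


def run_worker():
    "Stand-in worker: execute each queued task and store its result."
    while _queue:
        task_id, method, args = _queue.pop(0)
        _results[task_id] = method(*args)
```

For example, `map_async(lambda a, b: a * b, [(2, 3), (4, 5)])` followed by `run_worker()` yields a handle whose `get()` returns `[6, 20]`.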
The wait for the results will be done using a LISTEN/NOTIFY channel, with the id of the finished task as payload; the result will then be read from the task's record. (The LISTEN will be done on a new connection with autocommit.)
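The waiting loop can be sketched independently of the database; here a `queue.Queue` stands in for the LISTEN connection (in production the payloads would be PostgreSQL notifications on an autocommit connection):

```python
import queue


def wait_for_results(notifications, task_ids, read_result):
    """Wait until every task id has been notified as finished, then
    read each result from its task record.  `notifications` stands in
    for the LISTEN connection: each payload is a finished task id."""
    pending = set(task_ids)
    while pending:
        payload = notifications.get()  # blocks, like the LISTEN would
        pending.discard(int(payload))
    return [read_result(task_id) for task_id in task_ids]
```

A usage sketch: fill the queue with payloads `'2'` then `'1'` (notifications may arrive in any order), and `wait_for_results(notifications, [1, 2], {1: 'a', 2: 'b'}.get)` returns `['a', 'b']`.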
In order not to wait indefinitely for the result of a failed task, an `exception` Boolean field will be added to `ir.queue` and filled in by the worker.
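When reading a finished task's record, the handle can then raise instead of returning; a minimal sketch (the `TaskFailed` exception and the record attributes are assumptions, not the actual fields' API):

```python
from collections import namedtuple

# Stand-in for an ir.queue record with the proposed fields.
Task = namedtuple('Task', ['id', 'result', 'exception'])


class TaskFailed(Exception):
    "Raised when a queued task finished with its exception flag set."


def read_result(task):
    "Return the stored result, or raise if the worker flagged a failure."
    if task.exception:
        raise TaskFailed('task %s failed' % task.id)
    return task.result
```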
We will add some syntactic sugar with:

- `apply_async` (like `map_async` with a single result)
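This sugar could be implemented on top of `map_async` itself; a sketch with an eager stand-in for the queue (all names are illustrative):

```python
class MapResult:
    "Stand-in for the handle returned by map_async."

    def __init__(self, values):
        self._values = list(values)

    def get(self):
        return self._values


def map_async(func, iterable):
    # Eager stand-in: the real method would push tasks to ir.queue.
    return MapResult(func(*args) for args in iterable)


class SingleResult:
    "Wraps a map_async handle and unwraps its single result on get()."

    def __init__(self, map_result):
        self._map_result = map_result

    def get(self):
        return self._map_result.get()[0]


def apply_async(func, *args):
    "Sugar: map_async with a single argument tuple and a single result."
    return SingleResult(map_async(func, [args]))
```

For example, `apply_async(lambda a, b: a + b, 2, 3).get()` returns `5`.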
The feature will also work on back-ends without LISTEN/NOTIFY channels, because at the end of the transaction used to push to the queue, the tasks will be run directly (as in the dispatcher or cron).
```python
@classmethod
def computation(cls, records):
    results = cls.__queue__.simple_computation.map_async(
        [(r,) for r in records])
    # Return a list with the result of simple_computation for each record
    return results.get()

@classmethod
def simple_computation(cls, record):
    return record.a * record.b
```
The round-trip to the database has a cost, so this tool will only be useful for computations that are slower than this round-trip. The size of the result should also be taken into consideration, as it will need to be dumped and loaded. So it will better suit computations that produce a small result.
- Add a timeout