Rationale
In some cases, you may have a CPU-bound computation that could be parallelized (broken into smaller units). But due to Python's GIL, starting a new thread for each unit does not help. Using multiprocessing is not a solution either, because each process requires its own connection pool, and database connections are a limited resource.
Proposal
We propose to re-use the transactional queue and its workers.

We will add to `ir.queue` a `result` field to store the value returned by the method.

We will add a new `map_async` method on the `__queue__` method (`_Method`), similar to `Pool.map_async` (but without the chunksize nor the callbacks). The method will be responsible for pushing to the queue (using a new transaction with autocommit) one task per argument tuple of the iterable, and for returning an object that can wait for the results.

The waiting for the results will be done using a LISTEN/NOTIFY channel, with the id of the finished task as payload; the result will be read from the task record. (The LISTEN will be done in a new connection with autocommit.)

In order not to wait indefinitely for the result of a failed task, an `exception` Boolean field will be added to `ir.queue` and filled by the worker.
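As a rough illustration of the wait mechanism described above, here is a pure-Python sketch: a `threading.Condition` stands in for the PostgreSQL LISTEN/NOTIFY channel, a dict stands in for the `ir.queue` table, and all names (`worker_finish`, `AsyncResult`) are hypothetical, not the actual implementation.

```python
import threading

# Stand-ins for the ir.queue table and the LISTEN/NOTIFY channel
tasks = {}                      # task id -> {'result': ..., 'exception': bool}
condition = threading.Condition()


def worker_finish(task_id, result, exception=False):
    """What a queue worker would do: store the result, then NOTIFY."""
    with condition:
        tasks[task_id] = {'result': result, 'exception': exception}
        condition.notify_all()  # NOTIFY with the task id as payload


class AsyncResult:
    """Object returned by map_async that can wait for the results."""

    def __init__(self, task_ids):
        self.task_ids = task_ids

    def get(self):
        results = []
        with condition:
            for task_id in self.task_ids:
                # LISTEN until this task id shows up as finished
                condition.wait_for(lambda: task_id in tasks)
                task = tasks[task_id]
                if task['exception']:
                    # The exception flag prevents waiting forever
                    raise RuntimeError('task %s failed' % task_id)
                results.append(task['result'])
        return results
```

In the real proposal the worker commits the result to the task record and the waiter reads it back over a dedicated autocommit connection; the condition variable above only mimics the notification ordering.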
We will add some syntactic sugar with:

- `.map` (which calls `map_async` + `.get`)
- `.apply_async` (which calls `map_async` with a single result)
- `.apply` (which calls `apply_async` + `.get`)
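The sugar methods can be understood as thin wrappers over `map_async`. A minimal sketch of that layering, with a dummy synchronous `map_async` standing in for the real queue-backed one (the `DummyResult` and `Method` names here are illustrative, not the actual `_Method` implementation):

```python
class DummyResult:
    """Minimal stand-in for the object returned by map_async."""

    def __init__(self, values):
        self._values = values

    def get(self):
        return self._values


class Method:
    """Sketch of the proposed sugar on the __queue__ method wrapper."""

    def __init__(self, func):
        self.func = func

    def map_async(self, iterable):
        # The real implementation pushes one task per argument tuple;
        # here we just call the function synchronously.
        return DummyResult([self.func(*args) for args in iterable])

    def map(self, iterable):
        # map = map_async + get
        return self.map_async(iterable).get()

    def apply_async(self, args=()):
        # apply_async = map_async with a single argument tuple
        return self.map_async([args])

    def apply(self, args=()):
        # apply = apply_async + get, unwrapping the single result
        return self.apply_async(args).get()[0]
```

For instance, `Method(lambda a, b: a * b).map([(2, 3), (4, 5)])` returns `[6, 20]`, while `.apply((2, 3))` returns the single value `6`.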
The feature will also work on back-ends without channel support, because at the end of the transaction used to push to the queue, the tasks will be run directly (as in the dispatcher or cron).
Example
@classmethod
def computation(cls, records):
    results = cls.__queue__.simple_computation.map_async(
        [(r,) for r in records])
    # Returns a list with the result of simple_computation for each record
    return results.get()

@classmethod
def simple_computation(cls, record):
    return record.a * record.b
Consideration
The round-trip with the database has a cost, so this tool will only be useful for computations that are slower than this round-trip. The size of the result should also be taken into consideration, as it will need to be dumped and loaded. So the tool suits best computations with a small result.
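To make the result-size concern concrete, here is a hypothetical way to check the payload size, assuming results are serialized with something pickle-like (the actual serialization format used by the queue may differ):

```python
import pickle


def serialized_size(result):
    """Size in bytes of the payload that would travel through the queue."""
    return len(pickle.dumps(result))


small = serialized_size(42)                    # a scalar: a handful of bytes
large = serialized_size(list(range(100000)))   # a big list: hundreds of KB
# A computation returning `large` pays its serialization cost twice
# (dump on the worker, load on the caller), on top of the DB round-trip.
```

If a task's result is large, it is often cheaper to store it elsewhere (e.g. in a dedicated record) and return only an identifier.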
Implementation
https://bugs.tryton.org/issue8318
Future
- Add a timeout