Parallelism computation

(Cédric Krier) #1


In some cases, you could have a CPU-bound computation that could be parallelized (broken into smaller units). But due to the GIL of Python, it does not help to start a new thread for each unit. Using multiprocessing is not a solution either, because it requires a new connection pool, and connections are a limited resource.


We propose to reuse the transactional queue and its workers.

We add a result field to ir.queue to store the value returned by the method.
We add a new map_async method on the __queue__ method (_Method), similar to Pool.map_async (without chunksize or callbacks). The method is responsible for pushing to the queue (using a new transaction with autocommit) one task for each argument of the iterable, and for returning an object that can wait for the results.
Waiting for the results will be done using a LISTEN/NOTIFY channel with the id of the finished task as payload, and the result will be read from the task record. (The LISTEN will be done in a new connection with autocommit.)
In order not to wait indefinitely for the result of a failed task, an exception Boolean field will be added to ir.queue and filled in by the worker.
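
The waiting loop described above could look roughly like the following sketch. It assumes a psycopg2 connection in autocommit mode on which LISTEN was issued before the tasks were pushed; select() on the connection, poll(), notifies and payload are psycopg2's notification interface. The names wait_for_tasks and read_task, and the dictionary keys, are hypothetical and not part of trytond.

```python
import select
import time


def wait_for_tasks(conn, task_ids, read_task, timeout=None):
    """Block until every task in task_ids has finished and return the results.

    conn: psycopg2-style connection (autocommit, LISTEN already issued).
    read_task: hypothetical callable returning a dict with the keys
        'result' and 'exception' for a task id; it stands in for reading
        the ir.queue record.
    """
    pending = set(task_ids)
    results = {}
    deadline = None if timeout is None else time.monotonic() + timeout
    while pending:
        remaining = None
        if deadline is not None:
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                raise TimeoutError("tasks still pending: %s" % sorted(pending))
        # Sleep until the server sends something on the connection socket.
        readable, _, _ = select.select([conn], [], [], remaining)
        if not readable:
            continue
        conn.poll()
        while conn.notifies:
            notification = conn.notifies.pop(0)
            task_id = int(notification.payload)  # payload = finished task id
            if task_id not in pending:
                continue
            task = read_task(task_id)
            if task['exception']:
                # The worker set the flag: fail instead of waiting forever.
                raise RuntimeError("task %s failed" % task_id)
            results[task_id] = task['result']
            pending.discard(task_id)
    # Return the results in the order the tasks were pushed.
    return [results[task_id] for task_id in task_ids]
```

The timeout parameter is an assumption as well; it only illustrates where a timeout could be enforced.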

We will add some syntactic sugar with:

  • .map (which calls map_async + .get)
  • .apply_async (which calls map_async with a single result)
  • .apply (which calls apply_async + .get)
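
The relationship between these methods can be sketched with a stand-in class. QueueMethodSketch and SingleResult are hypothetical, and the tasks run on a standard-library thread pool rather than through ir.queue; only the way .map, .apply_async and .apply delegate to map_async mirrors the list above.

```python
from multiprocessing.pool import ThreadPool


class SingleResult:
    """Wrap a one-element async result so that .get() returns the element."""

    def __init__(self, async_result):
        self._async_result = async_result

    def get(self, timeout=None):
        return self._async_result.get(timeout)[0]


class QueueMethodSketch:
    """Hypothetical stand-in for the object behind cls.__queue__.method."""

    def __init__(self, func, pool):
        self._func = func
        self._pool = pool

    def map_async(self, iterable):
        # One task per argument tuple; the returned object has a .get().
        return self._pool.starmap_async(self._func, iterable)

    def map(self, iterable):
        return self.map_async(iterable).get()

    def apply_async(self, *args):
        return SingleResult(self.map_async([args]))

    def apply(self, *args):
        return self.apply_async(*args).get()


def multiply(a, b):
    return a * b


pool = ThreadPool(2)
method = QueueMethodSketch(multiply, pool)
print(method.map([(1, 2), (3, 4)]))  # [2, 12]
print(method.apply(5, 6))            # 30
pool.close()
pool.join()
```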

The feature will also work on back-ends without channels because, at the end of the transaction used to push to the queue, the tasks will be run (like in the dispatcher or cron).


def computation(cls, records):
    # Push one task per record to the queue, then wait for all the results.
    results = cls.__queue__.simple_computation.map_async([(r,) for r in records])
    return results.get()  # return a list with the result of simple_computation for each record

def simple_computation(cls, record):
    return record.a * record.b


The round-trip with the database has a cost, so this tool will only be useful for computations that are slower than this round-trip. The size of the result should also be taken into consideration, as it will need to be dumped and loaded. So it will better suit computations that have a small result.
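
The trade-off can be illustrated with a rough back-of-the-envelope model. The function below is only a sketch: its name, its parameters and the assumption that every task pays the same fixed overhead are made up for the example and are not measurements from trytond.

```python
import math


def estimated_times(n_tasks, task_seconds, overhead_seconds, workers):
    """Return (serial, parallel) wall-clock estimates in seconds.

    overhead_seconds is the assumed per-task cost of the database
    round-trip plus dumping and loading the result.
    """
    serial = n_tasks * task_seconds
    # Tasks are processed in batches of at most `workers` at a time.
    batches = math.ceil(n_tasks / workers)
    parallel = batches * (task_seconds + overhead_seconds)
    return serial, parallel


# Cheap tasks: the overhead dominates and going through the queue is slower.
print(estimated_times(100, 0.001, 0.01, 4))
# Expensive tasks: the overhead is negligible and the work is spread out.
print(estimated_times(100, 1.0, 0.01, 4))
```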



  • Add timeout

(Nicolas Évrard) #2

I don’t understand this.
If the connection pool is a limited resource why would using the transactional queue not exhaust this resource?

(Cédric Krier) #3

Because the number of workers is fixed by the configuration.

(Josías) #4

I’m not sure whether this is related, but when a query is executed it only uses one CPU on the database server, even when 4 or more cores are available, and sometimes it takes a long time. PostgreSQL supports parallel queries to speed up querying, so maybe it could be configured from Tryton too.

(Cédric Krier) #5

It is not related. This topic is about CPU-bound Python code, which is the most common case.

From my experience, it is usually because the query is not well written or some indexes are missing.

There is nothing to configure in Tryton to have PostgreSQL run parallel queries. PostgreSQL just needs to be configured:

(albert) #6

I like the idea of a fixed set of workers helping in parallelization.

However, I see a problem with database transaction management with this proposal. For me, keeping the main transaction open and waiting for tasks in the queue (that run in a separate transaction) to finish is not a good design.

For me there are two different solutions that could be better. The two solutions are not incompatible; on the contrary, they address two different use cases.

Same Transaction

The first use case would be the one I think this proposal tries to solve.

In this case I think methods should not be added to the queue but would look something like this:

def computation(cls, records):
    for record in records:
        record.a += 1
    results = cls.__parallel__.simple_computation.map_async([(r,) for r in records])
    return results.get()  #  return a list with the result of simple_computation for each record

def simple_computation(cls, record):
    return record.a * record.b

What __parallel__ would do is check how many workers are available right now and use as many as are available for the execution. Those parallel workers would run in the same transaction. The process in which “computation()” is executed is also used as a worker, so if there are no parallel workers available the computation is done anyway (this resembles how PostgreSQL parallel workers work).

This ensures the main transaction is not held for too long and ensures that the work done by __parallel__ sees the records as seen by the main worker. Note that if it is not done using snapshot synchronization, the records as seen by simple_computation() would be different from the ones seen by computation().
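
The idea that the calling process also acts as a worker can be sketched with threads and a shared task queue. Everything here is hypothetical (parallel_map, the helpers parameter), and the sketch ignores transactions and snapshot synchronization entirely. Because of the GIL it shows the scheduling only, not a speed-up: the work completes even when no helper is available.

```python
import queue
import threading


def parallel_map(func, args_list, helpers=0):
    """Run func(*args) for each args; the caller drains the queue as well."""
    tasks = queue.Queue()
    for index, args in enumerate(args_list):
        tasks.put((index, args))
    results = [None] * len(args_list)

    def drain():
        # Take tasks until none is left; each result goes to its own slot.
        while True:
            try:
                index, args = tasks.get_nowait()
            except queue.Empty:
                return
            results[index] = func(*args)

    threads = [threading.Thread(target=drain) for _ in range(helpers)]
    for thread in threads:
        thread.start()
    drain()  # the caller works too, so helpers=0 still finishes the job
    for thread in threads:
        thread.join()
    return results


print(parallel_map(lambda a, b: a * b, [(1, 2), (3, 4)], helpers=0))  # [2, 12]
print(parallel_map(lambda a, b: a * b, [(1, 2), (3, 4)], helpers=3))  # [2, 12]
```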

I think the parallel workers should probably not be the same as the queue workers because that allows for finer-grained tuning of the system.

Or, if the same set of workers is used, it could have some tuning parameters. For example we could have three parameters:

  • Total number of workers
  • Minimum number of workers reserved for parallel execution
  • Minimum number of workers reserved for queue execution
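
One possible reading of these three parameters, as a sketch: each kind of work may use any free worker except those still held back for the other kind. The function name, its arguments and the exact rule are assumptions made for illustration.

```python
def available_workers(kind, total, min_parallel, min_queue,
                      busy_parallel, busy_queue):
    """Return how many more workers of `kind` ('parallel' or 'queue') may start."""
    if min_parallel + min_queue > total:
        raise ValueError("reserved workers exceed the total")
    free = total - busy_parallel - busy_queue
    if kind == 'parallel':
        # Hold back what the queue has reserved but is not using yet.
        held_back = max(min_queue - busy_queue, 0)
    elif kind == 'queue':
        # Hold back what parallel execution has reserved but is not using yet.
        held_back = max(min_parallel - busy_parallel, 0)
    else:
        raise ValueError("unknown kind: %r" % kind)
    return max(free - held_back, 0)


# 8 workers, 2 reserved for each kind, 5 currently busy with queue tasks.
print(available_workers('parallel', 8, 2, 2, 0, 5))  # 3
print(available_workers('queue', 8, 2, 2, 0, 5))     # 1
```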

Different Transactions

The other use case would use the existing queue system but would help “joining” once a set of jobs has been executed. It could look like this:

def computation(cls, records):
    results = cls.__queue__.simple_computation.map_async([(r,) for r in records], 'continue_computation')
    return results.get()  #  return a list with the result of simple_computation for each record

def simple_computation(cls, record):
    return record.a * record.b

def continue_computation(cls, result):
    # Do something with result
    pass

In this case, once all simple_computation() have been executed, the queue system enqueues “continue_computation()” (and runs it when workers are available). In this case, each “simple_computation()” is run in a new transaction. The same happens with “continue_computation()”.
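
The “join” behaviour can be sketched with an in-memory queue that counts the remaining tasks of a group and enqueues the continuation when the last one finishes. JoiningQueue and its methods are hypothetical and only mimic the scheduling; there are no real transactions here.

```python
from collections import deque


class JoiningQueue:
    """Hypothetical in-memory queue that enqueues a continuation after a group."""

    def __init__(self):
        self._tasks = deque()

    def map_async(self, func, args_list, continuation):
        if not args_list:
            # Nothing to wait for: the continuation can be enqueued directly.
            self._tasks.append((continuation, ([],), None, None))
            return
        group = {'results': [None] * len(args_list),
                 'remaining': len(args_list),
                 'continuation': continuation}
        for index, args in enumerate(args_list):
            self._tasks.append((func, args, group, index))

    def run_worker(self):
        """Process tasks until the queue is empty (one task per 'transaction')."""
        while self._tasks:
            func, args, group, index = self._tasks.popleft()
            result = func(*args)
            if group is None:
                continue  # a continuation: nothing to join afterwards
            group['results'][index] = result
            group['remaining'] -= 1
            if group['remaining'] == 0:
                # Last task of the group: enqueue the continuation as a new task.
                self._tasks.append(
                    (group['continuation'], (group['results'],), None, None))


collected = []
tasks = JoiningQueue()
tasks.map_async(lambda a, b: a * b, [(1, 2), (3, 4)], collected.append)
tasks.run_worker()
print(collected)  # [[2, 12]]
```

In this sketch the caller of map_async returns immediately, which is the point of this use case: nothing is kept open while the tasks run.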

(Cédric Krier) #7

It is not a problem if the main transaction does not keep any lock.
So this mechanism should be used carefully on specific tasks.

I do not see how. The main transaction still needs to wait for the result of every worker.

This could be an addition for the queue but it should be optional as it is PostgreSQL specific.
Also this will not solve the problem of seeing the uncommitted data of the main transaction because, as the documentation explains, with REPEATABLE READ the snapshot is always the same.
So the only way to share modified data from the main transaction is to commit it (and so it should be avoided).

Well there are already multiple queues.

I do not see what this tries to solve. The “continue_computation” can be performed in “computation” after the call to results.get.

(albert) #8

Because by making the main transaction do some work, we ensure there is always somebody working on those tasks. By queueing them we don’t know whether some large tasks (or a long queue) are blocking the execution.

You’re right. I didn’t realize that. At least we’d prevent other transactions from affecting the workers, but it is not as ideal as I envisioned.

It tries to avoid keeping the current transaction open. It simply helps the developer enqueue a task when a set of tasks has finished.

(Cédric Krier) #9

For me, it is more complex to make the main transaction pull only its own tasks out of the queue than to make it wait for all of them.

That’s a configuration issue. If all the workers are busy, it means that your whole system is fully busy, so there is no resource available for the main transaction either.
But it is also possible to use differently named queues and dedicate the right number of workers to each queue.

Also, if the setup has many computationally expensive tasks, it will probably dedicate some hosts only to workers while keeping the front-end available for simple requests. If the front-end starts to work on expensive tasks, it may slow down the answers to simple requests.

But the main transaction must receive the result to put in the request answer.

(Cédric Krier) #10

Here is the implementation: Issue 8318: Parallelism tasks - Tryton issue tracker