Async support for Postgres operations with trytond

I am trying to rebuild my API that works with Tryton DB directly with a “tuned” connection module.
The framework I am using is FastAPI. I am rebuilding the tryton_flask module to work with FastAPI.
The BI-I-I-IG difference is sync vs async…

The problem:
When FastAPI calls this decorator for a new request while the previous one has not completed, I get a weird error: the user property of Transaction() is not None. It’s as if Transaction() were a singleton.
Why can’t I have many transactions simultaneously?? Is that because psycopg2 is not really asynchronous?

      def decorator(func):
           @retry_transaction(self.database_retry)
           @wraps(func)
           async def wrapper(request, *args, **kwargs):
               start_time = time.time()
               tryton = options.extensions['Tryton']
               tryton.request_method = request.method 
               database = options.config['TRYTON_DATABASE']
               if (5, 1) > trytond_version:
                   with Transaction().start(database, 0):
                       Cache.clean(database)
               if user is None:
                   transaction_user = get_value(
                       int(options.config['TRYTON_USER']))
               else:
                   transaction_user = get_value(user)

               transaction_context = {}
               with Transaction().start(database, transaction_user, readonly=True):   # !! HERE THE EXCEPTION HAPPENS
                       if tryton.context_callback:
                               transaction_context = tryton.context_callback()
                       else:
                               transaction_context.update(get_value(context) or {})
               transaction_context.setdefault('_request', {}).update({
                       'remote_addr': request.headers.get("X-Real-IP", ""),
                       'http_host': request.client.host,
                       'scheme': dict(request.items())['scheme'],
                       'method': request.method,  # GET, PUT or something else
                       })

               if readonly is None:
                   is_readonly = get_value(tryton._readonly)
               else:
                   is_readonly = get_value(readonly)

               with Transaction().start(database, transaction_user, readonly=is_readonly,
                                        context=transaction_context) as transaction:
                       result = await func(request, *map(instanciate, args),
                                           **dict((n, instanciate(v)) for n, v in kwargs.items()))  # Calls FastAPI endpoint handler
                       if (hasattr(transaction, 'cursor')
                               and not is_readonly):
                           await transaction.cursor.commit()
                       print(f"🔥🔥 finished {time.time() - start_time:0.4f}s")

What should I change to make it work? Should I update the Transaction class or modify the database.py module? Should I switch database.py to aiopg or psycopg3?

Indeed Transaction is somehow a singleton (take a look at its __new__ method).

Your code mixes async and sync code. I would isolate all the sync code in a separate blocking method; that will work, but it will kill your performance.
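A minimal sketch of that isolation, assuming Python 3.9+ for `asyncio.to_thread`. The function `blocking_tryton_work` is a hypothetical placeholder for code that would open the Transaction and do all the ORM work synchronously; here it only sleeps and reports which thread ran it.

```python
import asyncio
import threading
import time


def blocking_tryton_work(record_id):
    """Hypothetical stand-in for sync code that would open a Transaction
    and use the ORM. Here it only sleeps and reports its thread."""
    time.sleep(0.05)  # simulates a blocking database round trip
    return record_id, threading.get_ident()


async def handler(record_id):
    # Run the whole blocking unit in a worker thread so the event loop
    # stays free to serve other requests in the meantime.
    return await asyncio.to_thread(blocking_tryton_work, record_id)


async def main():
    return await asyncio.gather(handler(1), handler(2), handler(3))


results = asyncio.run(main())
```

Each call runs in a worker thread, so a per-thread object such as Transaction would not be shared with the event-loop thread. The cost is one busy thread per in-flight call, which is where the performance penalty comes from.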

The other solution would be to rewrite Tryton to be asynchronous but it’s a big task.

So I am afraid that the best solution (in terms of performance and development time) is not to use an async-based framework.

To be precise, it is a thread-based singleton, so there is only one instance per thread.
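The effect can be reproduced with a simplified stand-in. `FakeTransaction` below is hypothetical and is not trytond's actual class; it only assumes the "one instance per thread" behaviour described above, implemented with `threading.local`. Two coroutines running on the same event-loop thread get the same instance, so the second one sees the user set by the first.

```python
import asyncio
import threading


class FakeTransaction:
    """Simplified stand-in, NOT trytond's actual Transaction class."""
    _local = threading.local()

    def __new__(cls):
        # Return the instance stored for the current thread, if any.
        instance = getattr(cls._local, 'instance', None)
        if instance is None:
            instance = super().__new__(cls)
            instance.user = None
            cls._local.instance = instance
        return instance


async def request(user, seen):
    transaction = FakeTransaction()
    seen.append(transaction.user)  # what an earlier request left behind
    transaction.user = user
    await asyncio.sleep(0)         # yield to the other coroutine


async def main():
    seen = []
    await asyncio.gather(request(1, seen), request(2, seen))
    return seen


seen = asyncio.run(main())

# A different thread gets its own, separate instance.
other = []
worker = threading.Thread(target=lambda: other.append(FakeTransaction()))
worker.start()
worker.join()
```

This is why the second request in the original post finds a user that is not None: both requests share the event-loop thread, while real threads would each get their own instance.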

Also, the ORM design does not suit Python's async syntax well, because the main goal of an ORM is to hide the database communication: when accessing an attribute, we do not know whether it will make a database request.
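To illustrate the point with a toy example: `LazyRecord` and `fake_fetch` below are hypothetical and only mimic lazy field loading in general, not trytond's Model. A plain attribute access may or may not trigger a blocking "query", and there is no place in `record.name` to put an `await`.

```python
class LazyRecord:
    """Hypothetical record with lazy field loading (not trytond's Model)."""

    def __init__(self, record_id, fetch):
        self.id = record_id
        self._fetch = fetch  # blocking callable standing in for the database
        self._cache = {}

    def __getattr__(self, name):
        # Called only when normal lookup fails, i.e. for unloaded fields.
        if name.startswith('_'):
            raise AttributeError(name)
        if name not in self._cache:
            self._cache[name] = self._fetch(self.id, name)
        return self._cache[name]


queries = []


def fake_fetch(record_id, name):
    queries.append((record_id, name))  # stands in for a SQL round trip
    return f"{name}-{record_id}"


record = LazyRecord(7, fake_fetch)
first = record.name   # triggers a "query"
second = record.name  # served from the cache, no query
```

The two accesses look identical to the caller, yet only the first one touches the database.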

You make me cry! :) Joking. Yes, I checked the __new__ method and now it is clear.
Thank you for the clarification!

As per your suggestion, I am considering two ways:

  1. Communicate with the DB directly through SQLAlchemy, since I am well aware of the database structure as the developer of the corresponding Tryton module. The caveat: that makes any major module update painful.
  2. Isolate Tryton ORM operations in sync mode (build a transaction queue)… The performance will be good enough, but it requires extra work and looks like a really tough task for a couple of weeks.
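One possible shape for the second option, as a rough sketch only: a single-worker `ThreadPoolExecutor` acts as the queue, so every ORM job runs on the same dedicated thread, one at a time. `orm_job` and `run_in_tryton_thread` are hypothetical names used for illustration.

```python
import asyncio
import threading
from concurrent.futures import ThreadPoolExecutor

# One worker thread: submitted jobs queue up and run strictly one at a time.
tryton_executor = ThreadPoolExecutor(max_workers=1)


def orm_job(n):
    """Hypothetical stand-in for a sync function that uses the Tryton ORM."""
    return n * 2, threading.get_ident()


async def run_in_tryton_thread(func, *args):
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(tryton_executor, func, *args)


async def main():
    jobs = (run_in_tryton_thread(orm_job, n) for n in range(4))
    return await asyncio.gather(*jobs)


queued = asyncio.run(main())
tryton_executor.shutdown()
```

The handlers stay async while the ORM work is serialized, so throughput is bounded by that single thread. Raising `max_workers` would trade that for concurrent transactions and the conflicts that come with them.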

I cannot keep using a slow, 100% sync framework: with more than 5 connections (I developed a mobile app that uses Tryton ERP as a backend) it is a devastating experience for customers. Not sure what to choose…

Any comments? Do you have any experience with a similar task?

Use a good WSGI server with a good configuration, such as a properly sized worker pool.

I tried workers (threading). Still have errors (I have the same problem with uwsgi + flask):

ERROR: could not serialize access due to concurrent update

Postgres doesn’t ‘like’ concurrent writes to the same row. It looks like I need to update this part:

class LoggingCursor(cursor):
    def execute(self, sql, args=None):
        if logger.isEnabledFor(logging.DEBUG):
            logger.debug(self.mogrify(sql, args))
        cursor.execute(self, sql, args)  # <—— HERE 

What would you recommend?

This is perfectly normal with concurrent transactions: you will sometimes have conflicts. The application must retry in such cases.
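A generic sketch of such a retry, in the same spirit as the `retry_transaction` decorator used in the snippet at the top of the thread. `SerializationConflict` is a hypothetical exception standing in for whatever error the database layer raises on a serialization failure, and `update_stock` is a made-up operation that fails twice before succeeding.

```python
import functools
import time


class SerializationConflict(Exception):
    """Hypothetical stand-in for the driver's serialization-failure error."""


def retry_on_conflict(retries=3, delay=0.0):
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(retries + 1):
                try:
                    return func(*args, **kwargs)
                except SerializationConflict:
                    if attempt == retries:
                        raise  # give up after the last allowed attempt
                    time.sleep(delay * (attempt + 1))  # simple linear backoff
        return wrapper
    return decorator


attempts = []


@retry_on_conflict(retries=3)
def update_stock():
    attempts.append(1)
    if len(attempts) < 3:
        raise SerializationConflict("could not serialize access")
    return "committed"


outcome = update_stock()
```

The whole transaction has to be re-run from the start on each attempt, not just the failing statement, because the data it read may have changed.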

For those who are interested and find this thread with a similar question:
My API has a complex request handler structure, interacting with various models and doing a slow job of converting incoming images into a proper format.
Because of that, I moved all Tryton interactions into separate async functions and decorated those with the Tryton transaction, rather than the initial request handler.

from typing import Optional

from fastapi import File, Request, UploadFile
from fastapi.responses import JSONResponse

import my_config as config

# `app` (the FastAPI instance) and `tryton` (the fastapi_tryton extension)
# are assumed to be created elsewhere.


# Check token from request
@tryton.transaction(readonly=False)  # I initiated Tryton transaction here
async def check_token(request, tokens):
    request_token = request.headers.get('token', None)
    login = Login()  # Login is my custom Tryton model class
    ...  # login to tryton, use token, etc...
    return result


@app.post(f"/{config.API_VER}/")
async def my_post_handler(request: Request, file: Optional[UploadFile] = File(None)):
    # note: tryton transaction is not started yet! and free to use with other requests
    tokens = await read_all_tokens(...)  # read tokens from somewhere (non-blocking call)
    result = await check_token(request, tokens)  # making a tryton transaction
    if not result:
        token_result_json = {"Result": "wrong token"}
        return JSONResponse(status_code=200, content=token_result_json)
    ...  # tryton transaction ends, do other things to handle request

Disclaimer: I am using my own async(!) fastapi_tryton driver, which will soon be available on PyPI: fastapi-tryton-async module