Welcome to OStack Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

asynchronous - Sharing python objects across multiple workers

We have created a service using FastAPI. When our service starts it creates a few python objects that the endpoints then use to store or retrieve data from.

FastAPI in production starts with multiple workers. Our problem is that each worker creates its own object rather than sharing a single one.

The script below shows a (simplified) example of what we are doing, though in our case the usage of Meta() is considerably more complex.

from fastapi import FastAPI, status

class Meta:
    def __init__(self):
        self.count = 0

app = FastAPI()

meta = Meta()

# increases the count variable in the meta object by 1
@app.get("/increment")
async def increment():
    meta.count += 1
    return status.HTTP_200_OK

# returns a json containing the current count from the meta object
@app.get("/report")
async def report():
    return {'count':meta.count}

# resets the count in the meta object to 0
@app.get("/reset")
async def reset():
    meta.count = 0
    return status.HTTP_200_OK


As mentioned above, the problem with multiple workers is that each one will have its own meta object. Please be aware that the issue is not visible when running the api with a single worker.

More explicitly, when we hit the /increment endpoint for the first time we will see only one of the two workers responding to the call (this is correct, we don't want both workers doing the same thing). However, because there are two separate meta objects, only one of the two will be incremented.
When hitting the /report endpoint, depending on which worker responds to the request, either 1 or 0 will be returned.
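
The behaviour described above can be reproduced without FastAPI. The sketch below is an illustration only: it starts separate Python interpreters with the standard subprocess module as stand-ins for uvicorn workers, and the names WORKER_SNIPPET and run_worker are hypothetical. Each process builds its own Meta on start-up, so an increment applied in one process is never seen by another.

```python
import subprocess
import sys

# Code executed by each stand-in "worker" process. Every process creates
# its own Meta, just as each worker does when it imports the application.
WORKER_SNIPPET = """
import sys

class Meta:
    def __init__(self):
        self.count = 0

meta = Meta()
for _ in range(int(sys.argv[1])):
    meta.count += 1
print(meta.count)
"""


def run_worker(increments: int) -> int:
    """Start a fresh interpreter, apply `increments`, and return its count."""
    result = subprocess.run(
        [sys.executable, "-c", WORKER_SNIPPET, str(increments)],
        capture_output=True,
        text=True,
        check=True,
    )
    return int(result.stdout.strip())


if __name__ == "__main__":
    print(run_worker(1))  # the worker that happened to serve /increment
    print(run_worker(0))  # a different worker answering /report
```

The same reasoning applies to app.state: it lives inside the worker process, so moving meta there does not change which process owns it.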

The question then is, how do we get the workers to share and operate on the same object?

As a side question, the problem above affects the /reset endpoint too. If this endpoint is called then only one of the workers will reset its object. Is there a way to force all workers to respond to a single call on an endpoint?

Thanks!

Edit: I forgot to mention that we have tried (with no success) to store the meta object in the app.state instead. Essentially:

app.state.meta = Meta()
...
@app.get("/report")
async def report():
    return {'count':app.state.meta.count}


1 Answer


A Python object cannot be shared directly between processes. The facilities in the multiprocessing module (such as managers or shared memory) are a poor fit for sharing resources between workers: they need a master process to create the resources, and they offer no durability.

The preferred means of sharing resources between workers are:

  • Databases - for persistent resources that need reliable storage and scalability. Examples: PostgreSQL, MariaDB, MongoDB, and many others.
  • Caches (key/value) - for temporary data; faster than databases, but less scalable and often not ACID compliant. Examples: Redis, Memcached, etc.

Below are two very simple examples of how either approach can be used to share data between workers in a FastAPI application. The first uses the aiocache library with Redis as the backend; the second uses the Tortoise ORM library with PostgreSQL as the backend. Since FastAPI is an asynchronous framework, I chose asyncio-based libraries.

The structure of the test project is as follows:

.
├── app_cache.py
├── app_db.py
├── docker-compose.yml
├── __init__.py

Docker-compose file:

For experiments, you can use the following docker-compose file exposing 5432 (Postgres) and 6379 (Redis) ports to localhost.

version: '3'

services:
  database:
    image: postgres:12-alpine
    ports:
      - "5432:5432"
    environment:
      POSTGRES_PASSWORD: test_pass
      POSTGRES_USER: test_user
      POSTGRES_DB: test_db
  redis:
    image: redis:6-alpine
    ports:
      - "6379:6379"

Starting:

docker-compose up -d

Cache (aiocache)

Aiocache provides 3 main entities:

  • backends: Let you specify which backend to use for your cache. Currently supported: SimpleMemoryCache, RedisCache using aioredis, and MemCache using aiomcache.
  • serializers: Serialize and deserialize the data between your code and the backends. This allows you to save any Python object into your cache. Currently supported: StringSerializer, PickleSerializer, JsonSerializer, and MsgPackSerializer. You can also build custom ones.
  • plugins: Implement a hooks system that lets you execute extra behavior before and after each command.

Starting:

uvicorn app_cache:app --host localhost --port 8000 --workers 5

# app_cache.py
import os
from aiocache import Cache
from fastapi import FastAPI, status


app = FastAPI()
cache = Cache(Cache.REDIS, endpoint="localhost", port=6379, namespace="main")


class Meta:
    def __init__(self):
        pass

    async def get_count(self) -> int:
        return await cache.get("count", default=0)

    async def set_count(self, value: int) -> None:
        await cache.set("count", value)

    async def increment_count(self) -> None:
        await cache.increment("count", 1)


meta = Meta()


# increases the count variable in the meta object by 1
@app.post("/increment")
async def increment():
    await meta.increment_count()
    return status.HTTP_200_OK


# returns a json containing the current count from the meta object
@app.get("/report")
async def report():
    count = await meta.get_count()
    return {'count': count, "current_process_id": os.getpid()}


# resets the count in the meta object to 0
@app.post("/reset")
async def reset():
    await meta.set_count(0)
    return status.HTTP_200_OK
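
Note that Meta.increment_count above delegates to cache.increment instead of combining get_count and set_count. The sketch below illustrates why that matters. FakeStore is a hypothetical in-memory stand-in written for this illustration; it is not aiocache and only mimics the shape of get, set and increment. The concurrency is simulated inside one event loop, whereas in production the interleaving would happen between workers talking to Redis.

```python
import asyncio


class FakeStore:
    """Hypothetical in-memory stand-in for a shared key/value store."""

    def __init__(self):
        self._data = {}

    async def get(self, key, default=None):
        await asyncio.sleep(0)  # yield control, like a network round trip
        return self._data.get(key, default)

    async def set(self, key, value):
        await asyncio.sleep(0)  # yield control, like a network round trip
        self._data[key] = value

    async def increment(self, key, delta=1):
        # No await between the read and the write, so no other task can
        # interleave: the store performs the whole update in one step.
        self._data[key] = self._data.get(key, 0) + delta
        return self._data[key]


async def racy_increment(store):
    # Read-modify-write in two round trips: other requests can run in between.
    value = await store.get("count", 0)
    await store.set("count", value + 1)


async def atomic_increment(store):
    # A single store-side operation, analogous to cache.increment("count", 1).
    await store.increment("count", 1)


async def hammer(operation, requests=10):
    """Run `requests` concurrent calls of `operation` and return the count."""
    store = FakeStore()
    await asyncio.gather(*(operation(store) for _ in range(requests)))
    return await store.get("count", 0)


if __name__ == "__main__":
    print("get + set:", asyncio.run(hammer(racy_increment)))
    print("increment:", asyncio.run(hammer(atomic_increment)))
```

With the two-step variant some increments overwrite each other and are lost; with the single-step variant every request is counted.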

Database (Tortoise ORM + PostgreSQL)

Starting: For the sake of simplicity, we first run one worker to create a schema in the database:

uvicorn app_db:app --host localhost --port 8000 --workers 1
[Ctrl-C]
uvicorn app_db:app --host localhost --port 8000 --workers 5

# app_db.py
from fastapi import FastAPI, status
from tortoise import Model, fields
from tortoise.contrib.fastapi import register_tortoise


class MetaModel(Model):
    count = fields.IntField(default=0)


app = FastAPI()


# increases the count variable in the meta object by 1
@app.get("/increment")
async def increment():
    meta, is_created = await MetaModel.get_or_create(id=1)
    meta.count += 1  # better done in a transaction: this read-modify-write can race between workers
    await meta.save()
    return status.HTTP_200_OK


# returns a json containing the current count from the meta object
@app.get("/report")
async def report():
    meta, is_created = await MetaModel.get_or_create(id=1)
    return {'count': meta.count}


# resets the count in the meta object to 0
@app.get("/reset")
async def reset():
    meta, is_created = await MetaModel.get_or_create(id=1)
    meta.count = 0
    await meta.save()
    return status.HTTP_200_OK

register_tortoise(
    app,
    db_url="postgres://test_user:test_pass@localhost:5432/test_db",  # Don't expose login/pass in src, use environment variables
    modules={"models": ["app_db"]},
    generate_schemas=True,
    add_exception_handlers=True,
)
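
The /increment handler above reads the row, adds one in Python and saves it back, which is why it is better done in a transaction. One way to avoid the race is to let the database do the arithmetic in a single UPDATE statement. The sketch below shows the idea under loud assumptions: it uses the standard sqlite3 module and a temporary file as a stand-in for PostgreSQL, two connections stand in for two workers, and every function name is hypothetical.

```python
import os
import sqlite3
import tempfile


def connect(path):
    """Open an independent connection, as each worker process would."""
    conn = sqlite3.connect(path)
    conn.execute(
        "CREATE TABLE IF NOT EXISTS meta "
        "(id INTEGER PRIMARY KEY, count INTEGER NOT NULL)"
    )
    # Make sure the single counter row exists (no-op if it already does).
    conn.execute("INSERT OR IGNORE INTO meta (id, count) VALUES (1, 0)")
    conn.commit()
    return conn


def increment(conn):
    # One UPDATE statement: the database performs the read-modify-write.
    with conn:  # commits on success, rolls back on error
        conn.execute("UPDATE meta SET count = count + 1 WHERE id = 1")


def reset(conn):
    with conn:
        conn.execute("UPDATE meta SET count = 0 WHERE id = 1")


def report(conn):
    rows = conn.execute("SELECT count FROM meta WHERE id = 1").fetchall()
    return rows[0][0]


def demo():
    """Two 'workers' share one counter; a reset by one is seen by the other."""
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "meta.db")
        worker_a = connect(path)
        worker_b = connect(path)
        increment(worker_a)
        increment(worker_b)
        after_increments = report(worker_a)
        reset(worker_b)
        after_reset = report(worker_a)
        worker_a.close()
        worker_b.close()
        return after_increments, after_reset


if __name__ == "__main__":
    print(demo())
```

Because the state lives in the shared store, a reset issued through any one worker is visible to all of them, so there is no need to make every worker respond to the /reset call. In Tortoise ORM the analogous move is probably an update with an F expression, along the lines of MetaModel.filter(id=1).update(count=F("count") + 1) with F imported from tortoise.expressions, but check that against the version you are running.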
