Wednesday, January 3, 2024

Using FastAPI for an OpenAI chat backend

When building web APIs that make calls to OpenAI servers, we really want a backend that supports concurrency, so that it can handle a new user request while waiting for the OpenAI server response. Since my apps have Python backends, I typically use either Quart, the asynchronous version of Flask, or FastAPI, the most popular asynchronous Python web framework.

In this post, I'm going to walk through a FastAPI backend that makes chat completion calls to OpenAI. Full code is available on GitHub: github.com/pamelafox/chatgpt-backend-fastapi/


Initializing the OpenAI client

In the new (>= 1.0) version of the openai Python package, the first step is to construct a client, using either OpenAI(), AsyncOpenAI(), AzureOpenAI(), or AsyncAzureOpenAI(). Since we're using FastAPI, we should use the Async* variants: AsyncOpenAI() for openai.com accounts or AsyncAzureOpenAI() for Azure OpenAI accounts.

But when do we actually initialize that client? We could do it in every single request, but that would be doing unnecessary work. Ideally, we would do it once, when the app started up on a particular machine, and keep the client in memory for future requests. The way to do that in FastAPI is with lifespan events.

When constructing the FastAPI object, we must point the lifespan parameter at a function.

app = fastapi.FastAPI(docs_url="/", lifespan=lifespan)

That lifespan function must be wrapped with the @contextlib.asynccontextmanager decorator. The body of the function sets up the OpenAI client, stores it as a global, issues a yield to signal it's done setting up, and then closes the client as part of shutdown.

import contextlib
import os

import fastapi
import openai

from .globals import clients


@contextlib.asynccontextmanager
async def lifespan(app: fastapi.FastAPI):
    if os.getenv("OPENAI_KEY"):
        # openai.com OpenAI
        clients["openai"] = openai.AsyncOpenAI(
            api_key=os.getenv("OPENAI_KEY")
        )
    else:
        # Azure OpenAI: auth is more involved, see full code.
        clients["openai"] = openai.AsyncAzureOpenAI(
            api_version="2023-07-01-preview",
            azure_endpoint=os.getenv("AZURE_OPENAI_ENDPOINT"),
            **client_args,
        )

    yield

    await clients["openai"].close()

See full __init__.py.

Unfortunately, FastAPI doesn't have a standard way of defining globals (like Flask/Quart with the g object), so I am storing the client in a dictionary from a shared module. There are some more sophisticated approaches to shared globals in this discussion.
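That shared module can be as simple as a module-level dictionary. Here's a minimal sketch of what such a globals.py could look like (the file in the repo may contain more than this):

# globals.py: a minimal sketch of the shared-module approach.
# The lifespan function fills in clients["openai"] at startup,
# and route handlers read the client back out of the same dictionary.
clients = {}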


Making chat completion API calls

Now that the client is set up, the next step is to create a route that processes a message from the user, sends it to OpenAI, and returns the OpenAI response as an HTTP response.

We start off by defining pydantic models that describe what a request looks like for our chat app. In our case, each HTTP request will contain JSON with two keys, a list of "messages" and a "stream" boolean:

class ChatRequest(pydantic.BaseModel):
    messages: list[Message]
    stream: bool = True

Each message contains a "role" and "content" key, where role defaults to "user". I chose to be consistent with the OpenAI API here, but you could of course define your own input format and do pre-processing as needed.

class Message(pydantic.BaseModel):
    content: str
    role: str = "user"

Then we can define a route that handles chat requests over POST and sends back a non-streaming response:

@router.post("/chat")
async def chat_handler(chat_request: ChatRequest):
	messages = [{"role": "system", "content": system_prompt}] + chat_request.messages
    response = await clients["openai"].chat.completions.create(
    	messages=messages,
    	stream=False,
    )
    return response.model_dump()
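To sanity check the route, you can POST a request body that matches the pydantic models. Here's a hedged sketch using httpx, assuming the app is running locally on port 8000 (the URL and port are assumptions, not part of the repo):

# Hypothetical local test of the /chat endpoint.
import httpx

response = httpx.post(
    "http://localhost:8000/chat",
    json={"messages": [{"content": "Write a haiku about FastAPI"}], "stream": False},
)
print(response.json())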

The auto-generated documentation shows the JSON response as expected:

Screenshot of FastAPI documentation with JSON response from OpenAI chat completion call


Sending back streamed responses

It gets more interesting when we add support for streamed responses, as we need to return a StreamingResponse object pointing at an asynchronous generator function.

We'll add this code inside the "/chat" route:

if chat_request.stream:
    async def response_stream():
        chat_coroutine = clients["openai"].chat.completions.create(
            messages=messages,
            stream=True,
        )
        async for event in await chat_coroutine:
            yield json.dumps(event.model_dump(), ensure_ascii=False) + "\n"

    return fastapi.responses.StreamingResponse(response_stream())

The response_stream() function is an asynchronous generator, since it is defined with async and has a yield inside it. It uses async for to loop through the asynchronous iterable results of the Chat Completion call. For each event it receives, it yields a JSON string with a newline after it. This sort of response is known as "json lines" or "ndjson" and is my preferred approach for streaming JSON over HTTP versus other protocols like server-sent events.
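On the client side, a JSON-lines stream is easy to consume: read the response line by line and parse each line as its own JSON object. Here's a rough sketch using httpx (again, the URL and port are assumptions):

# Hypothetical client for the streamed response: parses one JSON object per line
# as the server yields it, instead of waiting for the full response body.
import json

import httpx

with httpx.stream(
    "POST",
    "http://localhost:8000/chat",
    json={"messages": [{"content": "Write a haiku about FastAPI"}], "stream": True},
    timeout=60,
) as response:
    for line in response.iter_lines():
        if line.strip():
            event = json.loads(line)
            print(event)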

The auto-generated documentation doesn't natively understand streamed JSON lines, but it happily displays it anyways:

Screenshot of FastAPI server response with streamed JSON lines


All together now

You can see the full router code in chat.py. You may also be interested in the tests folder to see how I fully tested the app using pytest, extensive mocks of the OpenAI API, including Azure OpenAI variations, and snapshot testing.
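As a rough illustration of that testing approach (not the actual test code from the repo), you can swap a fake client into the shared clients dictionary and call the handler directly. The module paths and fake classes below are placeholders:

# Hypothetical sketch of testing the non-streaming path with a fake OpenAI client.
# The fastapi_app.* import paths are assumptions, not the repo's real layout.
import pytest

from fastapi_app.chat import ChatRequest, Message, chat_handler  # assumed locations
from fastapi_app.globals import clients


class FakeCompletion:
    def model_dump(self):
        return {"choices": [{"message": {"role": "assistant", "content": "Hi there!"}}]}


class FakeCompletions:
    async def create(self, **kwargs):
        return FakeCompletion()


class FakeChat:
    completions = FakeCompletions()


class FakeOpenAIClient:
    chat = FakeChat()


@pytest.mark.asyncio  # requires the pytest-asyncio plugin
async def test_chat_handler_non_streaming():
    clients["openai"] = FakeOpenAIClient()
    response = await chat_handler(ChatRequest(messages=[Message(content="Hello")], stream=False))
    assert response["choices"][0]["message"]["content"] == "Hi there!"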
