Saturday, September 16, 2023

Best practices for OpenAI Chat apps: Streaming UI

As part of my role the Python advocacy team for Azure, I am now one of the maintainers on several ChatGPT samples, like my simple chat app and this popular chat + search app. In this series of blog posts, I'll share my learnings for writing chat-like applications. My experience is from apps with Python backends, but many of these practices apply cross-language.

Today I want to talk about the importance of streaming in the UI of a chat app, and how we can accomplish that. Streaming doesn't feel like a must-have at first, but users have gotten so accustomed to streaming in ChatGPT-using interfaces like ChatGPT, Bing Chat, and GitHub CoPilot, that they expect it in similar experiences. In addition, streaming can reduce the "time to first answer", as long as your UI is calling the streaming OpenAI API as well. Given it can take several seconds for ChatGPT to respond, we welcome any approaches to answer user's questions faster.

Animated GIF of GitHub CoPilot answering a question about bash

Streaming from the APIs

The openai package makes it easy to optionally stream responses from the API, by way of a stream argument:

chat_coroutine = openai.ChatCompletion.acreate(
    deployment_id="chatgpt",
    model="gpt-3.5-turbo",
    messages=[
      {"role": "system", "content": "You are a helpful assistant."},
      {"role": "user", "content": request_message},
    ],
    stream=True,
)

When stream is true, the response type is an asynchronous generator, so we can use async for to process each of the ChatCompletion chunk objects:

async for event in await chat_coroutine:
    message_chunk = event.choices[0].delta.content

Sending stream from backend to frontend

When we're making a web app, we need a way to send those objects as a stream from the backend to the browser. We can't use a standard HTTP response, since that sends everything at once and closes the connection. The most common approaches for streaming from backends are:

  • WebSockets: Bidirectional communication channel, client or server can push.
  • Server-sent events: An HTTP channel for server to push to client.
  • Readable streams: An HTTP response with a Transfer-encoding header of "chunked", signifying the browser must wait for all chunks.

All of these could potentially be used for a chat app, and I myself have experimented with both server-sent events and readable streams. Behind the scenes, the ChatGPT API actually uses server-sent events, so you'll find code in the openai package for parsing that protocol. However, I now prefer using readable streams for my frontend to backend communication. It's the simplest code setup on both the frontend and backend, and it supports the POST requests that our apps are already sending.

The key is to send the chunks from the backend using the NDJSON (jsonlines) format, and parse that format in the frontend. See my blog post on fetching JSON over streaming HTTP for Python and JavaScript example code.


Achieving a word-by-word effect

With all of that implemented, we have a frontend that reveals the answer gradually:

Animated GIF of answer appearing gradually

Here's what's interesting: despite our frontend receiving chunks of just a few tokens at a time, it appears to reveal almost entire sentences at a time. Why does the frontend UI seem to stream much larger chunks than what it receives? That's likely caused by the browser batching up repaints, deciding that it can wait to display the latest update to the innerHTML of the answer element. Normally that's a great performance enhancement on the browser's side, but it's not ideal in this case.

My colleague Steve Steiner experimented with various ways to force the browser to repaint more frequently, and settled on a technique that uses window.setTimeout() with a delay of 33 milliseconds for each chunk. That does mean that the browser takes overall more time to display a streamed response, but it doesn't end up faster than reading speed. See his PR for implementation details.

Now the frontend displays the answer at the same level of granularity that it receives from the ChatCompletions API:

Animated GIF of answer appearing word by word

Streaming more of the process

Many of our sample apps are RAG apps that "chat on your data", by chaining together calls across vector databases (like Azure Cognitive Search), embedding APIs, and the Chat Completion API. That chain of calls will take longer to process than a single ChatCompletion call, of course, so users may end up waiting longer for their answers.

One suggestion from Steve Steiner is to stream more of the process. Instead of waiting until we had the final answer, we could stream the process of finding the answer, like:

  • Processing your question: "Can you suggest a pizza recipe that incorporates both mushroom and pineapples?"
  • Generated search query "pineapple mushroom pizza recipes"
  • Found three related results from our cookbooks: 1) Mushroom calzone 2) Pineapple ham pizza 3) Mushroom loaf
  • Generating answer to your question...
  • Sure! Here's a recipe for a mushroom pineapple pizza...

We haven't integrated that idea into any of our samples yet, but it's interesting to consider for anyone building chat apps, as a way to keep the user engaged while the backend does additional work.


Making it optional

I just spent all that time talking about streaming, but I want to leave you with one final recommendation: make streaming optional, especially if you are developing a project for others to deploy. There are some web hosts that may not support streaming as readily as others, so developers appreciate the option to turn streaming off. There are also some use cases where streaming may not make sense, and it should be easy for developers (or even users) to turn it off.

Wednesday, September 13, 2023

Best practices for OpenAI Chat apps: Concurrency

As part of my role the Python advocacy team for Azure, I am now one of the maintainers on several ChatGPT samples, like my simple chat app and this popular chat + search app. In this series of blog posts, I'll share my learnings for writing chat-like applications. My experience is from apps with Python backends, but many of these practices apply cross-language.

My first tip is to use an asynchronous backend framework so that your app is capable of fulfilling concurrent requests from users.


The need for concurrency

Why? Let's imagine that we used a synchronous framework, like Flask. We deploy that to a server using gunicorn and several workers. One of those workers receives a POST request to the "/chat" endpoint. That chat endpoint in turns makes a request to the Azure ChatCompletions API. The request can take a while to complete - several seconds! During that time, the worker is tied up and cannot handle any more user requests. We could throw more CPUs and thus workers and threads at the problem, but that's a waste of server resources.

Without concurrency, requests must be handled serially:

Diagram of worker handling requests one after the other

The better approach when our app has long blocking I/O calls is to use an asynchronous framework. That way, when a request has gone out to a potentially slow-to-respond API, the Python program can pause that coroutine and handle a brand new request.

With concurrency, workers can handle new requests during I/O calls:

Diagram of worker handling second request while first request waits for API response

Asynchronous Python backends

We use Quart, the asynchronous version of Flask, for the simple chat quickstart as well as the chat + search app. I've also ported the simple chat to FastAPI, the most popular asynchronous framework for Python.

Our handlers now all have async in front, signifying that they return a Python coroutine instead of a normal function:

async def chat_handler():
    request_message = (await request.get_json())["message"]

When we deploy those apps, we still use gunicorn, but with the uvicorn worker, which is designed for Python ASGI apps. The gunicorn.conf.py configures it like so:

num_cpus = multiprocessing.cpu_count()
workers = (num_cpus * 2) + 1
worker_class = "uvicorn.workers.UvicornWorker"

Asynchronous API calls

To really benefit from the port to an asynchronous framework, we need to make asynchronous calls to all of the APIs, so that a worker can handle a new request whenever an API call is being awaited.

Our API calls to the openai SDK now use await with the acreate variant:

chat_coroutine = openai.ChatCompletion.acreate(
    deployment_id=os.getenv("AZURE_OPENAI_CHATGPT_DEPLOYMENT", "chatgpt"),
    messages=[{"role": "system", "content": "You are a helpful assistant."},
              {"role": "user", "content": request_message}],
    stream=True,
)

For the RAG sample, we also have calls to Azure services like Azure Cognitive Search. To make those asynchronous, we first import the async variant of the credential and client classes in the aio module:

from azure.identity.aio import DefaultAzureCredential
from azure.search.documents.aio import SearchClient

Then the API calls themselves require await to the same function name:

r = await self.search_client.search(query_text)

Monday, September 11, 2023

Mocking async openai package calls with pytest

As part of my role the Python advocacy team for Azure, I am now one of the maintainers on several ChatGPT samples, like my simple chat app and the very popular chat + search app. Both of those samples use Quart, the asynchronous version of Flask, which enables them to use the asynchronous versions of the functions from the openai package.

Making async openai calls

A synchronous call to the streaming ChatCompletion API looks like:

response = openai.ChatCompletion.create(
  messages=[{"role": "system", "content": "You are a helpful assistant."},	
            {"role": "user", "content": request_message}],	
  stream=True)	
An asynchronous call to that same API looks like:
response = await openai.ChatCompletion.acreate(
  messages=[{"role": "system", "content": "You are a helpful assistant."},
            {"role": "user", "content": request_message},],
  stream=True)

The difference is just the addition of await to wait for the results of the asynchronous function (and signal that the process can work on other tasks), along with the change in method name from create to acreate. That's a small difference in our app code, but it's a significant difference when it comes to mocking those calls, so it's worth pointing out.

Mocking a streaming call

In our tests of the apps, we don't want to actually make calls to the OpenAI servers, since that'd require authentication and would use up quota needlessly. Instead, we can mock the calls using the built-in pytest fixture monkeypatch with code that mimics the openai package's response.

Here's the fixture that I use to mock the asynchronous acreate call:

@pytest.fixture
def mock_openai_chatcompletion(monkeypatch):

    class AsyncChatCompletionIterator:
        def __init__(self, answer: str):
            self.answer_index = 0
            self.answer_deltas = answer.split(" ")

        def __aiter__(self):
            return self

        async def __anext__(self):
            if self.answer_index < len(self.answer_deltas):
                answer_chunk = self.answer_deltas[self.answer_index]
                self.answer_index += 1
                return openai.util.convert_to_openai_object(
                    {"choices": [{"delta": {"content": answer_chunk}}]})
            else:
                raise StopAsyncIteration

    async def mock_acreate(*args, **kwargs):
        return AsyncChatCompletionIterator("The capital of France is Paris.")

    monkeypatch.setattr(openai.ChatCompletion, "acreate", mock_acreate)

The final line of that fixture swaps the acreate method with my mock method that returns a class that acts like an asynchronous generator thanks to its __anext__ dunder method. That method returns a chunk of the answer each time it's called, until there are no chunks left.

Mocking non-streaming call

For the other repo, which supports both streaming and non-streaming response, the mock acreate method must account for the non-streaming case by immediately returning the full answer.

    async def mock_acreate(*args, **kwargs):
        messages = kwargs["messages"]
        answer = "The capital of France is Paris."
        if "stream" in kwargs and kwargs["stream"] is True:
            return AsyncChatCompletionIterator(answer)
        else:
            return openai.util.convert_to_openai_object(
                {"choices": [{"message": {"content": answer}}]})

Mocking multiple answers

If necessary, it's possible to make the mock respond with different answers based off the passed on the last message passed in. We need that for the chat + search app, since we also use a ChatGPT call to generate keyword searches based on the user question.

Just change the answer based off the messages keyword arg:

    async def mock_acreate(*args, **kwargs):
        messages = kwargs["messages"]
        if messages[-1]["content"] == "Generate search query for: What is the capital of France?":
            answer = "capital of France"
        else:
            answer = "The capital of France is Paris."

Mocking other openai calls

We also make other calls through the openai package, like to create embeddings. That's a much simpler mock, since there's no streaming involved:

@pytest.fixture
def mock_openai_embedding(monkeypatch):
    async def mock_acreate(*args, **kwargs):
        return {"data": [{"embedding": [0.1, 0.2, 0.3]}]}

    monkeypatch.setattr(openai.Embedding, "acreate", mock_acreate)

More resources

For more context and example tests, view the full tests in the repos:

.