A big part of my job in Python advocacy at Microsoft is to create and maintain code samples, like examples of how to deploy to Azure using FastAPI, Flask,
or Django. We've recently undergone an effort to standardize our best practices across samples. Most best practices are straightforward, like using ruff for linting and black for PEP 8 formatting, but there's one area where the jury's still out: dependency management. Here's what we've tried and the ways in which each approach has failed us. I'm writing this post in hopes of getting feedback from other maintainers on the best strategy.
Unpinned package requirements files
Quite a few of our samples simply provide a requirements.txt without versions, such as:
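For example, a file for a FastAPI app might contain just the package names (the particular packages here are illustrative):

```
fastapi
uvicorn[standard]
```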
The benefit of this approach is that a developer installing the requirements will automatically get the latest version of every package. However, that same benefit is also its curse:
What happens when the sample is no longer compatible with the latest version? The goal of our samples is usually somewhat orthogonal to the exact technologies used, like getting an app deployed on App Service, and we generally want to prioritize a working sample over a sample that is using the very latest version. We could say, well, we'll just wait for a bug report from users, and then we'll scramble to fix it. But that assumes users will make reports and that we have the resources to scramble to fix old samples at any point.
What if a developer bases their production code off the sample, and never ends up pinning versions? They may end up deploying that code to production, without tests, and be very sad when they realize their code is broken, and they don't necessarily know what version update caused the breakage.
So we have been trying to move away from the bare package listings, since neither of those situations are good.
Pinned direct dependencies
The next step is a requirements.txt file that pins known working versions of each direct dependency, such as:
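For example, the same file with pinned versions (the exact version numbers are illustrative):

```
fastapi==0.103.1
uvicorn[standard]==0.23.2
```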
With this approach, we also set up a dependabot.yaml file so that GitHub emails us every week when new versions are available, and we run tests in GitHub Actions so that we can use the pass/fail state to reason about whether a version upgrade is safe to merge.
I was pretty happy with this approach, until it all fell apart one day. The quart library brings in the werkzeug library, and a new version came out of the werkzeug library that was incompatible with the pinned version of quart (which was also latest). That meant that every developer who had our sample checked out suddenly saw a funky error upon installing requirements, caused by quart trying to use a feature no longer available in werkzeug. I immediately pinned an issue with workarounds for developers, but I still got DMs and emails from developers trying to figure out this sudden new error in previously working code.
I felt pretty bad as I'd heard developers warning about only pinning direct dependencies, but I'd never experienced an issue like this first-hand. Well, now I have, and I will never forget! I think this kind of situation is particularly painful for code samples, where we have hundreds of developers using code that they didn't originally write, so we don't want to put them in a situation where they have to fix a bug they didn't introduce and lack the context to quickly understand.
Compiled direct & indirect dependencies
I made a pull request for that repo to use pip-tools to compile pinned versions of all dependencies. Here's a snippet of the compiled file:
uvicorn[standard]==0.23.2
    # via -r app/backend/requirements.in
uvloop==0.17.0
    # via uvicorn
watchfiles==0.20.0
    # via uvicorn
websockets==11.0.3
    # via uvicorn
werkzeug==3.0.0
    # via
    #   flask
    #   quart
I assumed naively that I had it all figured out: this was the approach that we should use for all repos going forward! No more randomly introduced errors!
Unfortunately, I started getting reports that Windows users were no longer able to run the local server, with an error message that "uvloop is not supported on Windows". After some digging, I realized that our requirement of uvicorn[standard] brought in certain dependencies only in certain environments, including uvloop for Linux environments. Since I ran pip-compile in a Linux environment, the resulting requirements.txt included uvloop, a package that doesn't work on Windows. Uh oh!
I realized that our app didn't actually need the additional uvloop requirement, so I changed the dependency from uvicorn[standard] to uvicorn, and that resolved that issue. But I was lucky! What if there was a situation where we did need a particular environment-specific dependency? What approach would we use then?
I imagine the answer is to use some other tool that can both pin indirect dependencies while obeying environment conditionals, and I know there are tools like poetry and hatch, but I'm not an expert in them. So, please, I request your help: what approach would avoid the issues we've run into with the three strategies described here? Thank you! 🙏🏼
Way back in January, the very popular Python ORM SQLAlchemy released version 2.0. This version makes SQLAlchemy code much more compatible with Python type checkers.
When you're using an IDE that understands type annotations (like VS Code with the Python extension), you can then get intellisense for those columns, like suggestions for functions that can be called on that data type.
You can also run a tool like mypy or pyright to find out if any of your code is using types incorrectly. For example, imagine I wrote a function to process the BlogPost model above:
def process_blog_posts(posts: list[BlogPost]):
    for post in posts:
        post.title = post.title.upper()
        post.id = post.id.upper()
Then running mypy would let me know if my code was using the typed columns incorrectly:
$ python3 -m mypy main_sqlalchemy.py
main_sqlalchemy.py:30: error: "int" has no attribute "upper" [attr-defined]
Adding support to Flask-SQLAlchemy
I have recently begun to use type annotations more heavily in my code (especially for class and function signatures) so I was excited to try out SQLAlchemy 2.0. But then I realized that almost all of my usage of SQLAlchemy 2.0 was inside Flask apps, using the Flask-SQLAlchemy extension, and at the time, it did not support SQLAlchemy 2.0. What's a girl to do? Add support for it, of course!
I experimented with several ways to support SQLAlchemy 2.0 and eventually settled on a proposal that would be compatible with (hopefully) all the ways to customize SQLAlchemy 2.0 base classes. Users can choose for their base class to inherit from DeclarativeBase or DeclarativeBaseNoMeta, and they can add on MappedAsDataclass if they'd like to use dataclass-like data models.
A few examples:
class Base(DeclarativeBase):
    pass

db = SQLAlchemy(model_class=Base)

class Todo(db.Model):
    id: Mapped[int] = mapped_column(primary_key=True)
    title: Mapped[str] = mapped_column(nullable=True)

class Base(DeclarativeBase, MappedAsDataclass):
    pass

db = SQLAlchemy(model_class=Base)

class Todo(db.Model):
    id: Mapped[int] = mapped_column(init=False, primary_key=True)
    title: Mapped[str] = mapped_column(default=None)
The pull request was rather large, since we decided to default the documentation to 2.0 style classes, plus I parameterized every test to check all the possible base classes. Thanks to helpful reviews from the community (especially lead Flask maintainer David Lord), we were able to merge the PR and release SQLAlchemy 2.0 support on September 11th.
Porting Flask apps to SQLAlchemy 2.0
Since the release, I've been happily porting sample Flask applications over to the new-style models in SQLAlchemy 2.0, and also using the opportunity to make sure our code doesn't use the legacy way of querying data.
Here are a few pull requests that show the changes needed:
Of course, as those are samples, there wasn't a lot of code to change. In a complex production codebase, upgrading all your models will be a much bigger change. Hopefully you have tests written before making the change, so you can ensure the changes are backwards compatible.
Additional resources
As you're upgrading your models to new-style models, make sure you look through both the SQLAlchemy docs and the Flask-SQLAlchemy docs for examples of what you're trying to accomplish. You can even search through each GitHub repository for additional examples, as some situations that aren't in the docs are still covered in unit tests. The SQLAlchemy docs can be daunting in their scope, so I recommend bookmarking their ORM quickstart and Migration cheatsheet.
If you do run into any issues with porting your Flask app to SQLAlchemy 2.0, try to figure out first if it's a Flask-SQLAlchemy issue or a core SQLAlchemy issue. Many of the Flask-SQLAlchemy issue reports are in fact just SQLAlchemy issues. You can discuss SQLAlchemy issues in their GitHub discussions and discuss Flask-SQLAlchemy issues in our GitHub discussions or Discord.
As part of my role on the Python advocacy team for Azure, I am now one of the maintainers of several ChatGPT samples, like my simple chat app and this popular chat + search app. In this series of blog posts, I'll share my learnings from writing chat-like applications. My experience is from apps with Python backends, but many of these practices apply cross-language.
Today's tip for OpenAI apps isn't really specific to OpenAI, but is a good practice for production-grade apps of any type: don't use API keys! If your app is using openai.com's OpenAI service, then you'll have to use keys, but if you're using Azure's OpenAI service, then you can authenticate with Azure Active Directory tokens instead.
The risks of keys
It's tempting to use keys, since the setup looks so straightforward - you only need your endpoint URL and key.
But using API keys in a codebase can lead to all kinds of issues. To name a few:
The key could be accidentally checked into source control, by a developer who replaces the getenv() call with a hardcoded string, or a developer who adds a .env file to a commit.
Once checked into source control, keys are exposed internally and are also at a greater risk of external exposure by malicious actors who gain access to the codebase.
In a large company, multiple developers might unknowingly use the same key, use up each other's resources, and discover their services are failing due to quota errors.
I've seen all of these situations play out, and I don't want them to happen to other developers. A more secure approach is to use authentication tokens, and that's what I use in my samples.
Authenticating to Azure OpenAI with Active Directory
This code authenticates to Azure OpenAI with the openai Python package and Azure Python SDK:
The api_type is set to "azure_ad" so that the openai package knows to send the headers with the Bearer Token set properly.
The code authenticates to Azure using DefaultAzureCredential which will iterate through many possible credential types until it finds a valid Azure login.
The code then gets a token from that credential and sets that as the api_key.
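That flow can be sketched as follows (a minimal sketch rather than the samples' exact code; the credential can be any azure-identity credential object, such as DefaultAzureCredential, and the endpoint and API version are placeholders):

```python
COGNITIVE_SERVICES_SCOPE = "https://cognitiveservices.azure.com/.default"


def azure_ad_openai_settings(credential, endpoint: str) -> dict:
    """Build openai module settings from an Azure AD credential.

    credential: any azure-identity credential object exposing get_token(),
    such as DefaultAzureCredential.
    """
    token = credential.get_token(COGNITIVE_SERVICES_SCOPE)
    return {
        "api_type": "azure_ad",  # tells the openai package to send a Bearer header
        "api_base": endpoint,    # e.g. https://YOUR-RESOURCE.openai.azure.com
        "api_version": "2023-05-15",
        "api_key": token.token,  # the AAD access token goes in the api_key slot
    }
```

In the samples, each of those settings is then assigned onto the openai module (openai.api_type = ..., and so on).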
Accessing OpenAI locally
The next step is to make sure that whoever is running the code has permission to access the OpenAI service. By default, you will not, even if you created the OpenAI service yourself. That's a security measure to make sure you don't accidentally access production resources from a local machine (particularly helpful when your code deals with write operations on databases).
To access an OpenAI resource, you need the "Cognitive Services OpenAI User" role (role ID '5e0bd9bd-7b93-4f28-af87-19fc36ad61bd'). That can be assigned using the Azure Portal, Azure CLI, or ARM/Bicep.
Assigning roles with the Azure CLI
First, set the following environment variables:
PRINCIPAL_ID: The principal ID of your logged in account.
SUBSCRIPTION_ID: The subscription ID of your logged in account.
RESOURCE_GROUP: The resource group of the OpenAI resource.
Then you can run this command using the Azure CLI:
az role assignment create \
    --role "5e0bd9bd-7b93-4f28-af87-19fc36ad61bd" \
    --assignee-object-id "$PRINCIPAL_ID" \
    --scope /subscriptions/"$SUBSCRIPTION_ID"/resourceGroups/"$RESOURCE_GROUP" \
    --assignee-principal-type User
Assigning roles with ARM/Bicep
We use the Azure Developer CLI to deploy all of our samples, which relies on Bicep files to declare the infrastructure-as-code. That results in more repeatable deploys, so it's a great approach for deploying production applications.
This Bicep resource creates the role, assuming a principalId parameter is set:
You can also see how our sample's main.bicep uses a module to set up the role.
Assigning roles with the Azure Portal
If you are unable to use those automated approaches (which are preferred), it's also possible to use the Azure Portal to create the role:
Open the OpenAI resource
Select "Access Control (IAM)" from the left navigation
Select "+ Add" in the top menu
Search for "Cognitive Services OpenAI User" and select it in the results
Select "Assign access to: User, group, or service principal"
Search for your email address
Select "Review and assign"
Accessing OpenAI from production hosts
The next step is to ensure your deployed application can also use a DefaultAzureCredential token to access the OpenAI resource. That requires setting up a Managed Identity and assigning that same role to the Managed identity. There are two kinds of managed identities: system-assigned and user-assigned. All Azure hosting platforms support managed identity. We'll start with App Service and system-assigned identities as an example.
Managed identity for App Service
This is how we create an App Service with a system-assigned identity in Bicep code:
The role assignment process is largely the same for the host as it was for a user, but the principal ID must be set to the managed identity's principal ID instead and the principal type is "ServicePrincipal".
For example, this Bicep assigns the role for an App Service system-assigned identity:
It's also possible to use a system-assigned identity for Azure Container Apps, using a similar approach as above. However, for our samples, we needed to use user-assigned identities so that we could give the same identity access to Azure Container Registry before the ACA app was provisioned. That's the advantage of user-assigned identities: reuse across multiple resources.
First, we create a new identity outside of the ACA Bicep:
When using a user-assigned identity, we need to modify our call to DefaultAzureCredential to tell it which identity to use, since you could potentially have multiple user-assigned identities (not just the single system-assigned identity for the hosting environment).
The following code retrieves the identity's ID from the environment variables and specifies it as the client_id for the Managed Identity credential:
The credentials returned from Azure AD do not last forever, so for any long running script or hosted application, you will need to refresh the tokens. Typically, the Azure Python SDK takes care of that for you, but since we use the openai package for Python apps, we need to implement token refresh ourselves.
For our application that uses the Quart web framework, we define a function that runs before every request to check if the globally stored token is close to expiring. If so, we fetch a new token and store it.
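In both cases, the refresh check boils down to logic like this (a sketch with assumed cache-key names, not the apps' exact code):

```python
import time

TOKEN_SCOPE = "https://cognitiveservices.azure.com/.default"
TOKEN_MAX_AGE = 300  # refresh once the cached token is older than 5 minutes


def ensure_openai_token(cache: dict, credential) -> str:
    """Return a cached Azure AD token, refreshing it when it is near expiry."""
    if "created" not in cache or cache["created"] + TOKEN_MAX_AGE < time.time():
        token = credential.get_token(TOKEN_SCOPE)
        cache["token"] = token.token
        cache["created"] = time.time()
    return cache["token"]
```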
For our script that ingests data from PDFs, we define a similar function that we call before every attempt to use the vector embedding function:
def refresh_openai_token():
    if (
        CACHE_KEY_TOKEN_TYPE in open_ai_token_cache
        and open_ai_token_cache[CACHE_KEY_TOKEN_TYPE] == "azure_ad"
        and open_ai_token_cache[CACHE_KEY_CREATED_TIME] + 300 < time.time()
    ):
        token_cred = open_ai_token_cache[CACHE_KEY_TOKEN_CRED]
        openai.api_key = token_cred.get_token(
            "https://cognitiveservices.azure.com/.default"
        ).token
        open_ai_token_cache[CACHE_KEY_CREATED_TIME] = time.time()
Accessing OpenAI in a local Docker container
At this point, you should be able to access OpenAI both for local development and in production. Unless, that is, you're developing with a local Docker container. By default, a Docker container does not have a way to access any of your local credentials, so you'll see authentication errors in the logs.
It used to be possible to use a workaround with volumes to access the credential, but after Azure started encrypting the local credential, it's now an open question as to how to easily authenticate inside a local container.
Unfortunately, in this case, my current approach is to fall back to using a key for local development in a Docker container. Another interesting approach would be to use a mock ChatGPT service in the local environment, to avoid unnecessarily using up quota.
All together now
As you can see, it's not entirely straightforward to authenticate to OpenAI without keys, depending on how you're developing locally and where you're deploying.
The following code uses a key when it's set in the environment, uses a user-assigned Managed Identity when the identity ID is set in the environment, and otherwise uses DefaultAzureCredential:
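A sketch of that decision logic (the environment variable names here are assumptions for illustration, not necessarily the samples' variable names):

```python
def choose_openai_auth(env) -> str:
    """Pick an auth strategy based on what's configured in the environment."""
    if env.get("AZURE_OPENAI_KEY"):
        return "key"                     # e.g. local Docker development
    if env.get("AZURE_OPENAI_CLIENT_ID"):
        return "user-assigned-identity"  # deployed host with managed identity
    return "default-credential"          # local dev: CLI login, VS Code, etc.
```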
The technologies in this space are changing rapidly, so some of the more tricky aspects of keyless authentication will hopefully be easier in the future. In the meantime, try to avoid keys whenever possible.
As part of my role on the Python advocacy team for Azure, I am now one of the maintainers of several ChatGPT samples, like my simple chat app and this popular chat + search app. In this series of blog posts, I'll share my learnings from writing chat-like applications. My experience is from apps with Python backends, but many of these practices apply cross-language.
Today I want to talk about the importance of streaming in the UI of a chat app, and how we can accomplish that. Streaming doesn't feel like a must-have at first, but users have become so accustomed to streaming in ChatGPT-like interfaces (ChatGPT itself, Bing Chat, GitHub Copilot) that they expect it in similar experiences. In addition, streaming can reduce the "time to first answer", as long as your UI is calling the streaming OpenAI API as well. Given that it can take several seconds for ChatGPT to fully respond, we welcome any approach that answers users' questions faster.
Streaming from the APIs
The openai package makes it easy to optionally stream responses from the API, by way of a stream argument:
When stream is true, the response type is an asynchronous generator, so we can use async for to process each of the ChatCompletion chunk objects:
async for event in await chat_coroutine:
    message_chunk = event.choices[0].delta.content
Sending stream from backend to frontend
When we're making a web app, we need a way to send those objects as a stream from the backend to the browser. We can't use a standard HTTP response, since that sends everything at once and closes the connection. The most common approaches for streaming from backends are:
WebSockets: Bidirectional communication channel, client or server can push.
Server-sent events: An HTTP response with a Content-Type of text/event-stream, where the server pushes events to the client.
Readable streams: An HTTP response with a Transfer-Encoding header of "chunked", signifying the browser can process each chunk as it arrives.
All of these could potentially be used for a chat app, and I myself have experimented with both server-sent events and readable streams. Behind the scenes, the ChatGPT API actually uses server-sent events, so you'll find code in the openai package for parsing that protocol. However, I now prefer using readable streams for my frontend to backend communication. It's the simplest code setup on both the frontend and backend, and it supports the POST requests that our apps are already sending.
The key is to send the chunks from the backend using the NDJSON (jsonlines) format, and parse that format in the frontend. See my blog post on fetching JSON over streaming HTTP for Python and JavaScript example code.
Achieving a word-by-word effect
With all of that implemented, we have a frontend that reveals the answer gradually:
Here's what's interesting: despite our frontend receiving chunks of just a few tokens at a time, it appears to reveal almost entire sentences at a time. Why does the frontend UI seem to stream much larger chunks than what it receives? That's likely caused by the browser batching up repaints, deciding that it can wait to display the latest update to the innerHTML of the answer element. Normally that's a great performance enhancement on the browser's side, but it's not ideal in this case.
My colleague Steve Steiner experimented with various ways to force the browser to repaint more frequently, and settled on a technique that uses window.setTimeout() with a delay of 33 milliseconds for each chunk. That does mean the browser takes more time overall to display a streamed response, but the reveal still doesn't fall behind reading speed. See his PR for implementation details.
Now the frontend displays the answer at the same level of granularity that it receives from the ChatCompletions API:
Streaming more of the process
Many of our sample apps are RAG apps that "chat on your data", by chaining together calls across vector databases (like Azure Cognitive Search), embedding APIs, and the Chat Completion API. That chain of calls will take longer to process than a single ChatCompletion call, of course, so users may end up waiting longer for their answers.
One suggestion from Steve Steiner is to stream more of the process. Instead of waiting until we had the final answer, we could stream the process of finding the answer, like:
Processing your question: "Can you suggest a pizza recipe that incorporates both mushroom and pineapples?"
Found three related results from our cookbooks: 1) Mushroom calzone 2) Pineapple ham pizza 3) Mushroom loaf
Generating answer to your question...
Sure! Here's a recipe for a mushroom pineapple pizza...
We haven't integrated that idea into any of our samples yet, but it's interesting to consider for anyone building chat apps, as a way to keep the user engaged while the backend does additional work.
Making it optional
I just spent all that time talking about streaming, but I want to leave you with one final recommendation: make streaming optional, especially if you are developing a project for others to deploy. There are some web hosts that may not support streaming as readily as others, so developers appreciate the option to turn streaming off. There are also some use cases where streaming may not make sense, and it should be easy for developers (or even users) to turn it off.
As part of my role on the Python advocacy team for Azure, I am now one of the maintainers of several ChatGPT samples, like my simple chat app and this popular chat + search app. In this series of blog posts, I'll share my learnings from writing chat-like applications. My experience is from apps with Python backends, but many of these practices apply cross-language.
My first tip is to use an asynchronous backend framework so that your app is capable of fulfilling concurrent requests from users.
The need for concurrency
Why? Let's imagine that we used a synchronous framework, like Flask. We deploy that to a server using gunicorn and several workers. One of those workers receives a POST request to the "/chat" endpoint. That chat endpoint in turn makes a request to the Azure ChatCompletions API. The request can take a while to complete - several seconds! During that time, the worker is tied up and cannot handle any more user requests. We could throw more CPUs, and thus workers and threads, at the problem, but that's a waste of server resources.
Without concurrency, requests must be handled serially:
The better approach when our app has long blocking I/O calls is to use an asynchronous framework. That way, when a request has gone out to a potentially slow-to-respond API, the Python program can pause that coroutine and handle a brand new request.
With concurrency, workers can handle new requests during I/O calls:
When we deploy those apps, we still use gunicorn, but with the uvicorn worker, which is designed for Python ASGI apps. The gunicorn.conf.py configures it like so:
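A typical gunicorn.conf.py for these apps looks something like this (a representative sketch, not the samples' exact settings):

```python
import multiprocessing

# Use the uvicorn worker so gunicorn can serve an ASGI (async) app
worker_class = "uvicorn.workers.UvicornWorker"

# Common rule of thumb: a couple of workers per CPU core
workers = (multiprocessing.cpu_count() * 2) + 1

bind = "0.0.0.0:8000"
```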
To really benefit from the port to an asynchronous framework, we need to make asynchronous calls to all of the APIs, so that a worker can handle a new request whenever an API call is being awaited.
Our API calls to the openai SDK now use await with the acreate variant:
chat_coroutine = openai.ChatCompletion.acreate(
    deployment_id=os.getenv("AZURE_OPENAI_CHATGPT_DEPLOYMENT", "chatgpt"),
    messages=[
        {"role": "system", "content": "You are a helpful assistant."},
        {"role": "user", "content": request_message},
    ],
    stream=True,
)
For the RAG sample, we also have calls to Azure services like Azure Cognitive Search. To make those asynchronous, we first import the async variant of the credential and client classes in the aio module:
from azure.identity.aio import DefaultAzureCredential
from azure.search.documents.aio import SearchClient
Then the API calls themselves just require adding await, with the function names staying the same:
As part of my role on the Python advocacy team for Azure, I am now one of the maintainers of several ChatGPT samples, like my simple chat app and the very popular chat + search app. Both of those samples use Quart, the asynchronous version of Flask, which enables them to use the asynchronous versions of the functions from the openai package. A synchronous call to the ChatCompletion API looks like this:
response = openai.ChatCompletion.create(
    messages=[
        {"role": "system", "content": "You are a helpful assistant."},
        {"role": "user", "content": request_message},
    ],
    stream=True,
)
An asynchronous call to that same API looks like:
response = await openai.ChatCompletion.acreate(
    messages=[
        {"role": "system", "content": "You are a helpful assistant."},
        {"role": "user", "content": request_message},
    ],
    stream=True,
)
The difference is just the addition of await to wait for the results of the asynchronous function (and signal that the process can work on other tasks), along with the change in method name from create to acreate. That's a small difference in our app code, but it's a significant difference when it comes to mocking those calls, so it's worth pointing out.
Mocking a streaming call
In our tests of the apps, we don't want to actually make calls to the OpenAI servers, since that'd require authentication and would use up quota needlessly. Instead, we can mock the calls using the built-in pytest fixture monkeypatch with code that mimics the openai package's response.
Here's the fixture that I use to mock the asynchronous acreate call:
@pytest.fixture
def mock_openai_chatcompletion(monkeypatch):
    class AsyncChatCompletionIterator:
        def __init__(self, answer: str):
            self.answer_index = 0
            self.answer_deltas = answer.split(" ")

        def __aiter__(self):
            return self

        async def __anext__(self):
            if self.answer_index < len(self.answer_deltas):
                answer_chunk = self.answer_deltas[self.answer_index]
                self.answer_index += 1
                return openai.util.convert_to_openai_object(
                    {"choices": [{"delta": {"content": answer_chunk}}]}
                )
            else:
                raise StopAsyncIteration

    async def mock_acreate(*args, **kwargs):
        return AsyncChatCompletionIterator("The capital of France is Paris.")

    monkeypatch.setattr(openai.ChatCompletion, "acreate", mock_acreate)
The final line of that fixture swaps the acreate method with my mock method that returns a class that acts like an asynchronous iterator thanks to its __anext__ dunder method. That method returns a chunk of the answer each time it's called, until there are no chunks left.
Mocking non-streaming call
For the other repo, which supports both streaming and non-streaming responses, the mock acreate method must also account for the non-streaming case by immediately returning the full answer.
async def mock_acreate(*args, **kwargs):
    messages = kwargs["messages"]
    answer = "The capital of France is Paris."
    if "stream" in kwargs and kwargs["stream"] is True:
        return AsyncChatCompletionIterator(answer)
    else:
        return openai.util.convert_to_openai_object(
            {"choices": [{"message": {"content": answer}}]}
        )
Mocking multiple answers
If necessary, it's possible to make the mock respond with different answers based off the last message passed in. We need that for the chat + search app, since we also use a ChatGPT call to generate keyword searches based on the user question.
Just change the answer based off the messages keyword arg:
async def mock_acreate(*args, **kwargs):
    messages = kwargs["messages"]
    if messages[-1]["content"] == "Generate search query for: What is the capital of France?":
        answer = "capital of France"
    else:
        answer = "The capital of France is Paris."
Mocking other openai calls
We also make other calls through the openai package, like to create embeddings. That's a much simpler mock, since there's no streaming involved:
Recently, as part of my work on Azure OpenAI code samples, I've been experimenting with different ways of streaming data from a server into a website. The best-known technique is WebSockets, but there are also other approaches, like server-sent events and readable streams. A readable stream is the simplest of the options, and works well if your website only needs to stream a response from the server (i.e. it doesn't need bi-directional streaming).
HTTP streaming in Python
To stream an HTTP response, your backend needs to set the "Transfer-Encoding" header to "chunked".
Most web frameworks provide documentation about streaming responses,
such as Flask: Streaming and Quart: Streaming responses. In both Flask and Quart, the response must be a Python generator, so that the server can continually get the next data from the generator until it's exhausted.
This example from the Flask doc streams data from a CSV:
@app.route('/large.csv')
def generate_large_csv():
    def generate():
        for row in iter_all_rows():
            yield f"{','.join(row)}\n"

    return generate(), {"Content-Type": "text/csv"}
This example from the Quart docs is an infinite stream of timestamps:
The standard way to consume HTTP requests in JavaScript is the fetch() function, and fortunately, that function can also be used to consume HTTP streams. When the browser sees that the data is chunked, it sets response.body to a ReadableStream.
This example fetches a URL, treats the response body as a stream, and logs out the output until it's done streaming:
const response = await fetch(url);
const readableStream = response.body;
const reader = readableStream.getReader();
while (true) {
    const { done, value } = await reader.read();
    if (done) break;
    const text = new TextDecoder("utf-8").decode(value);
    console.log("Received ", text);
}
Streaming JSON
You might think it'd be super straightforward to stream JSON: just generate a JSON string on the server, and then JSON.parse the received text on the client. But there's a gotcha: the client could receive multiple JSON objects in the same chunk, and then an attempt to parse as JSON will fail.
The solution: JSON objects separated by new lines, known either as NDJSON or JSONlines.
This expression converts a Python dict to NDJSON, using the std lib json module:
json.dumps(some_dict) + "\n"
Here's how I actually used that, for one of the ChatGPT samples:
@bp.post("/chat")
def chat_handler():
    request_message = request.json["message"]

    def response_stream():
        response = openai.ChatCompletion.create(
            engine=os.getenv("AZURE_OPENAI_CHATGPT_DEPLOYMENT", "chatgpt"),
            messages=[
                {"role": "system", "content": "You are a helpful assistant."},
                {"role": "user", "content": request_message},
            ],
            stream=True,
        )
        for event in response:
            yield json.dumps(event) + "\n"

    return Response(response_stream())
Consuming NDJSON streams in JavaScript
Once the server is outputting NDJSON, then we can write parsing code in JavaScript that splits by newlines and attempts to parse the resulting objects as JSON objects.
const response = await fetch(url);
const readableStream = response.body;
const reader = readableStream.getReader();
let runningText = "";
while (true) {
    const { done, value } = await reader.read();
    if (done) break;
    const text = new TextDecoder("utf-8").decode(value);
    const objects = text.split("\n");
    for (const obj of objects) {
        try {
            runningText += obj;
            let result = JSON.parse(runningText);
            console.log("Received", result);
            runningText = "";
        } catch (e) {
            // Not a valid JSON object yet, keep accumulating
        }
    }
}
Since I need to use this same processing code in multiple Azure OpenAI samples, I packaged that into a tiny npm package called ndjson-readablestream.
Here's how you can use the package from JavaScript to make NDJSON parsing easier:
import readNDJSONStream from "ndjson-readablestream";

const response = await chatApi(request);
if (!response.body) {
    throw Error("No response body");
}
for await (const event of readNDJSONStream(response.body)) {
    console.log("Received", event);
}