This year, we're seeing the rise of "reasoning models": models that include an additional thinking process in order to generate their answer. Reasoning models can produce more accurate answers and can answer more complex questions. Some of those models, like o1 and o3, do the reasoning behind the scenes and only report how many tokens it took them (quite a few!).
The DeepSeek-R1 model is interesting because it reveals its reasoning process along the way. When we can see the "thoughts" of a model, we can see how we might approach the question ourselves in the future, and we can also get a better idea of how to get better answers from that model. We learn both how to think with the model, and how to think without it.
So, if we want to build an app using a transparent reasoning model like DeepSeek-R1, we ideally want our app to have special handling for the thoughts, to make the difference between the reasoning and the answer clear to the user. It's also very important for a user-facing app to stream the response, since otherwise a user will have to wait a very long time for both the reasoning and the answer to come down the wire.
Here's an app with streamed, collapsible thoughts:

You can deploy that app yourself from github.com/Azure-Samples/deepseek-python today, or you can keep reading to see how it's built.
Deploying DeepSeek-R1 on Azure
We first deploy a DeepSeek-R1 model on Azure, using Bicep files (infrastructure-as-code) that provision a new Azure AI Services resource with the DeepSeek-R1 deployment. This deployment is what's called a "serverless model", so we only pay for what we use (as opposed to dedicated endpoints, where we pay by the hour).
var aiServicesNameAndSubdomain = '${resourceToken}-aiservices'
module aiServices 'br/public:avm/res/cognitive-services/account:0.7.2' = {
  name: 'deepseek'
  scope: resourceGroup
  params: {
    name: aiServicesNameAndSubdomain
    location: aiServicesResourceLocation
    tags: tags
    kind: 'AIServices'
    customSubDomainName: aiServicesNameAndSubdomain
    sku: 'S0'
    publicNetworkAccess: 'Enabled'
    deployments: [
      {
        name: aiServicesDeploymentName
        model: {
          format: 'DeepSeek'
          name: 'DeepSeek-R1'
          version: '1'
        }
        sku: {
          name: 'GlobalStandard'
          capacity: 1
        }
      }
    ]
    disableLocalAuth: disableKeyBasedAuth
    roleAssignments: [
      {
        principalId: principalId
        principalType: 'User'
        roleDefinitionIdOrName: 'Cognitive Services User'
      }
    ]
  }
}
We give both our local developer account and our application backend role-based access to use the deployment, by assigning the "Cognitive Services User" role. That allows us to connect using keyless authentication, a much more secure approach than API keys.
Connecting to DeepSeek-R1 on Azure from Python
We have a few different options for making API requests to a DeepSeek-R1 serverless deployment on Azure:
- HTTP calls, using the Azure AI Model Inference REST API and a Python package like requests or aiohttp (a minimal sketch follows this list)
- Azure AI Inference client library for Python, a package designed especially for making calls with that inference API
- OpenAI Python API library, which is focused on supporting OpenAI models but can also be used with any models that are compatible with the OpenAI HTTP API, which includes Azure AI models like DeepSeek-R1
- Any of your favorite Python LLM packages that have support for OpenAI-compatible APIs, like Langchain, Litellm, etc.
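For illustration, here's a minimal sketch of the first option using the requests package. This is not from the sample: the /chat/completions path and api-version query parameter follow the Azure AI Model Inference REST API conventions used later in this post, and the bearer token is assumed to come from the keyless authentication setup shown below.

import os
import requests

# Hypothetical sketch: call the serverless endpoint directly over HTTP.
endpoint = os.environ["AZURE_INFERENCE_ENDPOINT"]  # https://<resource-name>.services.ai.azure.com/models
token = "<bearer-token>"  # e.g. obtained via azure-identity, as shown later in this post

response = requests.post(
    f"{endpoint}/chat/completions",
    params={"api-version": "2024-05-01-preview"},
    headers={"Authorization": f"Bearer {token}"},
    json={
        "model": os.environ["AZURE_DEEPSEEK_DEPLOYMENT"],
        "messages": [{"role": "user", "content": "What is 1+1?"}],
    },
)
response.raise_for_status()
print(response.json()["choices"][0]["message"]["content"])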
I am using the openai package for this sample, since that's the most familiar amongst Python developers. As you'll see, it does require a bit of customization to point that package at an Azure AI inference endpoint. We need to change:
- Base URL: Instead of pointing to the openai.com server, we'll point to the deployed serverless endpoint, which looks like "https://<resource-name>.services.ai.azure.com/models"
- API key: Instead of providing an OpenAI key (or Azure AI services key), we're going to pass in an OAuth token, and we will set the token every time we make a request, to make sure that it has not expired.
- API version: The Azure AI Inference APIs require an API version string, which allows for versioning of API responses. You can see that API version in the API reference. In the REST API, it is passed as a query parameter, so we will need the openai package to send it along as a query parameter as well.
First, we need to acquire a token provider for our current credential, using the azure-identity package:
import os

from azure.identity.aio import AzureDeveloperCliCredential, ManagedIdentityCredential, get_bearer_token_provider

if os.getenv("RUNNING_IN_PRODUCTION"):
    azure_credential = ManagedIdentityCredential(client_id=os.environ["AZURE_CLIENT_ID"])
else:
    azure_credential = AzureDeveloperCliCredential(tenant_id=os.environ["AZURE_TENANT_ID"])

token_provider = get_bearer_token_provider(
    azure_credential, "https://cognitiveservices.azure.com/.default"
)
That code uses either ManagedIdentityCredential when it's running in production (on Azure Container Apps, with a user-assigned identity) or AzureDeveloperCliCredential when it's running locally. The token_provider function returns a token string every time we call it.
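As a quick sketch (assuming an async context, and not part of the sample), the provider is simply awaited whenever a fresh token is needed; azure-identity takes care of refreshing it as necessary.

# Hypothetical usage sketch: await the provider to get a bearer token string
# for the Cognitive Services scope. Avoid logging full tokens in a real app.
async def get_token_preview() -> str:
    token = await token_provider()
    return token[:10] + "..."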
Now we can create an AsyncOpenAI client by passing the correct base URL, token, and API version:
from openai import AsyncOpenAI

openai_client = AsyncOpenAI(
    base_url=os.environ["AZURE_INFERENCE_ENDPOINT"],
    api_key=await token_provider(),
    default_query={"api-version": "2024-05-01-preview"},
)
Making chat completion requests
When we receive a question from the user, we prepare to call the chat completions API with that question. Before issuing the request, we make sure we're using a non-expired token by setting the api_key of the client to a token string from the token provider function. Behind the scenes, it will make sure that the token has not yet expired, and fetch a new one as necessary.
openai_client.api_key = await token_provider()
Now we're ready to call the chat completions API:
chat_coroutine = openai_client.chat.completions.create(
    model=os.getenv("AZURE_DEEPSEEK_DEPLOYMENT"),
    messages=all_messages,
    stream=True,
)
You'll notice that instead of the typical model name that we send in when using OpenAI, we send in the deployment name. For convenience, I often name deployments the same as the model, so that they will match even if I mistakenly pass in the model name.
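For example, a small sketch of that convention (the default value here is an assumption, based on naming the deployment after the model):

import os

# If the deployment is named after the model, this value works as either
# the deployment name or the model name.
deployment_name = os.getenv("AZURE_DEEPSEEK_DEPLOYMENT", "DeepSeek-R1")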
Streaming the response from the backend
As I've discussed previously on this blog, we should always use streaming responses when building user-facing chat applications, to reduce perceived latency and improve the user experience.
To receive a streamed response from the chat completions API, we specified stream=True in the call above. Then, as we receive each event from the server, we check whether the content is the special "<think>" start token or "</think>" end token. When we know the model is currently in a thinking mode, we pass down the content chunks in a "reasoning_content" field. Otherwise, we pass down the content chunks in the "content" field.
We send each event to our frontend using a common approach of JSON lines over a streaming HTTP response (which has the "Transfer-Encoding: chunked" header). That means the client receives one JSON object per line for each event, and can easily parse them out. The other common approaches are server-sent events or websockets, but both are unnecessarily complex for this scenario.
is_thinking = False
async for update in await chat_coroutine:
    if update.choices:
        content = update.choices[0].delta.content
        if content == "<think>":
            # Start of the reasoning section
            is_thinking = True
            update.choices[0].delta.content = None
            update.choices[0].delta.reasoning_content = ""
        elif content == "</think>":
            # End of the reasoning section
            is_thinking = False
            update.choices[0].delta.content = None
            update.choices[0].delta.reasoning_content = ""
        elif content:
            if is_thinking:
                yield json.dumps(
                    {"delta": {"content": None, "reasoning_content": content, "role": "assistant"}},
                    ensure_ascii=False,
                ) + "\n"
            else:
                yield json.dumps(
                    {"delta": {"content": content, "reasoning_content": None, "role": "assistant"}},
                    ensure_ascii=False,
                ) + "\n"
Rendering the streamed response in the frontend
The frontend code makes a standard fetch() request to the backend route, passing in the message history:
const response = await fetch("/chat/stream", {
    method: "POST",
    headers: {"Content-Type": "application/json"},
    body: JSON.stringify({messages: messages})
});
To process the streaming JSON lines that are returned from the server, I brought in my tiny ndjson-readablestream package, which uses ReadableStream along with JSON.parse to make it easy to iterate over each JSON object as it comes in. When I see that the JSON is "reasoning_content", I display it in a special collapsible container.
let answer = "";
let thoughts = "";
for await (const event of readNDJSONStream(response.body)) {
if (!event.delta) {
continue;
}
if (event.delta.reasoning_content) {
thoughts += event.delta.reasoning_content;
if (thoughts.trim().length > 0) {
// Only show thoughts if they are more than just whitespace
messageDiv.querySelector(".loading-bar").style.display = "none";
messageDiv.querySelector(".thoughts").style.display = "block";
messageDiv.querySelector(".thoughts-content").innerHTML = converter.makeHtml(thoughts);
}
} else {
messageDiv.querySelector(".loading-bar").style.display = "none";
answer += event.delta.content;
messageDiv.querySelector(".answer-content").innerHTML = converter.makeHtml(answer);
}
messageDiv.scrollIntoView();
if (event.error) {
messageDiv.innerHTML = "Error: " + event.error;
}
}
All together now
The full code is available in github.com/Azure-Samples/deepseek-python. Here are the key files for the code snippets shown in this blog post:
| File | Purpose |
|---|---|
| infra/main.bicep | Bicep files for deployment |
| src/quartapp/chat.py | Quart app with the client setup and streaming chat route |
| src/quartapp/templates/index.html | Webpage with HTML/JS for rendering stream |