人们说MCP的下一步时AG-UI,那么AG-UI到底是什么呢?
其实看他的定义就知道,代理+前端。我最近开发时应用时发现Cursor对前端的支持很糟糕,我需要新的工具和方法!!

以下是一些AG-UI的尝试,希望能帮助我解决这一难题。

什么是AG-UI

AG-UI(Agent-User Interaction Protocol)的开源协议由CopilotKit提供,你可以把它想象成一套“通用语言”,专门用来解决AI代理和前端应用之间的沟通难题。

AG-UI

简单来说,AG-UI就像是在AI代理和你的App界面之间搭了一座“鹊桥”!我喜欢这个想法,不管它好用与否,他就是我需要的。
https://docs.agentwire.io/introduction
https://github.com/ag-ui-protocol/ag-ui
这是它的官方信息

AG-UI的优点

1、实时代币流
2、工具使用进度
3、状态差异和补丁
4、错误和生命周期事件
5、多代理切换

它的架构很重要,也就是说:前端既是代理又是UI。让它可以自动交互。我开发一个产品如果调用一个MCP还需要多种方法验证是否真实的调用和返回数据,如果它能解决我的问题,那我就可以节约很多时间。
AG-UI

目前尽管多步骤代理工作流(multi-step agentic workflows)很强大,把代理放到app时还是会遇到一些问题,人们常提到的场景有:
1、当想逐个令牌地传输 LLM 响应,而无需构建自定义 WebSocket 服务器
2、实时显示工具执行进度,并在不阻断或丢失上下文的情况下暂停以等待人工反馈
3、同步大型、不断变化的对象(例如代码或表格),而无需将所有内容重新发送到 UI
4、允许用户在代理运行过程中中断、取消或回复,而不会丢失上下文

而且每个代理后端有自己的工具调用机制、ReAct规划、状态差异(state diff)、输出格式等等。比如使用LangGraph时,前端会实现自定义的WebSocket逻辑、复杂的JSON以及LangGraph专用的UI适配——如果要迁移到CrewAI,这一切必须要因地制宜对应地调整。可扩展性的缺乏可见一斑。

**随着我对CURSOR的使用,我实在太爱这个产品了,我认为它代表着AI Agent的产品的未来的方向。**想做出这样的AI应用产品,工具的进化是必不可少的。

截至目前它能支持的Framework有:
AG-UI

如何使用AG-UI

我这边先放一个官网的例子,我也准备用它开发一些好玩的玩意,之后单开一个板块吧。

直接在代理框架中实施 AG-UI 事件
在本教程中,您将学习如何构建与 AG-UI 协议兼容的 HTTP 端点。
Prerequisites 先决条件

确保已安装 Python 和 Poetry。Poetry 是一款用于 Python 依赖性管理和打包的工具

https://python-poetry.org/docs/#installing-with-pipx

1
pipx install poetry

或者pipx 可以安装不同版本的 Poetry,语法与 pip 相同

1
2
pipx install --suffix=@1.8.4 poetry==1.8.4
poetry@1.8.4 --version

用Poetry创建新项目
首先,让我们创建一个新项目,并为依赖关系管理设置 Poetry:

1
poetry new my-endpoint && cd my-endpoint

Install Dependencies 安装依赖项

1
poetry add ag-ui openai fastapi uvicorn

使用 FastAPI 创建基本端点

创建一个名为 my_endpoint/main.py 的新文件,其中包含以下代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
from fastapi import FastAPI, Request  # 导入FastAPI和Request模块,用于创建Web API服务和处理HTTP请求
import json # 导入json模块,用于处理JSON格式数据
from ag_ui.core.types import RunAgentInput # 导入AG-UI核心类型中的RunAgentInput,用于处理代理输入数据

app = FastAPI(title="AG-UI Endpoint") # 创建FastAPI应用实例,设置应用标题为"AG-UI Endpoint"

@app.post("/awp") # 定义一个POST请求路由,路径为"/awp"
async def my_endpoint(): # 定义一个异步处理函数作为端点
return { "message": "Hello World" } # 返回一个包含问候消息的JSON响应

if __name__ == "__main__": # 如果直接运行此脚本(而非作为模块导入)
import uvicorn # 导入uvicorn服务器,用于运行FastAPI应用
uvicorn.run(app, host="0.0.0.0", port=8000) # 启动uvicorn服务器,监听所有网络接口的8000端口

运行并测试终端
用以下命令启动服务器

1
poetry run uvicorn my_endpoint.main:app --reload

在另一个终端中,使用 curl 测试终端是否正在运行:

1
curl -X POST http://localhost:8000/awp

您应该会看到如下回复:

1
{ "message": "Hello World" }

解析 AG-UI 输入

接下来,让我们更新端点,使用 RunAgentInput Pydantic 模型正确解析传入的 AG-UI 请求:

1
2
3
4
5
6
7
8
9
10
from fastapi import FastAPI, Request, HTTPException
from ag_ui.core import RunAgentInput, Message

app = FastAPI(title="AG-UI Endpoint")

@app.post("/awp")
async def my_endpoint(input_data: RunAgentInput):
thread_id = input_data.thread_id

return { "message": "Hello World from " + thread_id }

FastAPI 会根据 RunAgentInput 模式自动验证传入的请求。如果请求与预期格式不符,它将返回 422 验证错误,并详细说明出错的原因。

添加事件流 Add Event Streaming

AG-UI 支持使用服务器发送事件(SSE)进行事件流处理,跟MCP是一样的。让我们修改 /awp 端点,将事件流传回客户端:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
from fastapi import FastAPI, Request, HTTPException  # 导入FastAPI相关模块,用于创建API服务
from fastapi.responses import StreamingResponse # 导入StreamingResponse用于实现流式响应
from ag_ui.core import RunAgentInput, Message, EventType, RunStartedEvent, RunFinishedEvent # 导入AG-UI核心组件,包括输入模型和事件类型
from ag_ui.encoder import EventEncoder # 导入事件编码器,用于格式化SSE事件

app = FastAPI(title="AG-UI Endpoint") # 创建FastAPI应用实例,设置标题为"AG-UI Endpoint"

@app.post("/awp") # 定义POST路由"/awp",用于接收AG-UI的请求
async def my_endpoint(input_data: RunAgentInput): # 定义异步端点函数,接收RunAgentInput类型的输入数据
async def event_generator(): # 定义异步生成器函数,用于生成事件流
# Create an event encoder to properly format SSE events
encoder = EventEncoder() # 创建事件编码器实例,用于正确格式化SSE事件

# Send run started event
yield encoder.encode( # 生成并编码运行开始事件
RunStartedEvent( # 创建运行开始事件对象
type=EventType.RUN_STARTED, # 设置事件类型为运行开始
thread_id=input_data.thread_id, # 使用输入数据中的线程ID
run_id=input_data.run_id # 使用输入数据中的运行ID
)
)

# Send run finished event
yield encoder.encode( # 生成并编码运行结束事件
RunFinishedEvent( # 创建运行结束事件对象
type=EventType.RUN_FINISHED, # 设置事件类型为运行结束
thread_id=input_data.thread_id, # 使用输入数据中的线程ID
run_id=input_data.run_id # 使用输入数据中的运行ID
)
)

return StreamingResponse( # 返回流式响应
event_generator(), # 使用事件生成器作为响应内容
media_type="text/event-stream" # 设置媒体类型为服务器发送事件(SSE)
)

if __name__ == "__main__": # 如果直接运行此脚本
import uvicorn # 导入uvicorn服务器
uvicorn.run(app, host="0.0.0.0", port=8000) # 启动uvicorn服务器,监听所有网络接口的8000端口

太棒了!我们已经在发送 RunStartedEvent 和 RunFinishedEvent 事件,这为我们提供了一个基本的 AG-UI 兼容端点。现在,让我们让它做些有用的事情。

Implementing Basic Chat 实现基本聊天

让我们增强我们的端点,以调用 OpenAI 的 API 并将响应作为 AG-UI 事件流传回去:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
from fastapi import FastAPI, Request  # 导入FastAPI和Request,用于创建Web API服务和处理HTTP请求
from fastapi.responses import StreamingResponse # 导入StreamingResponse,用于实现流式响应功能
from ag_ui.core import ( # 从ag_ui.core导入所需的核心组件
RunAgentInput, # 运行代理输入模型,定义了接收请求的数据结构
Message, # 消息模型,用于表示聊天消息
EventType, # 事件类型枚举,定义了不同类型的事件
RunStartedEvent, # 运行开始事件,表示代理开始运行
RunFinishedEvent, # 运行结束事件,表示代理完成运行
TextMessageStartEvent, # 文本消息开始事件,表示开始发送文本消息
TextMessageContentEvent, # 文本消息内容事件,表示发送文本消息的内容部分
TextMessageEndEvent # 文本消息结束事件,表示文本消息发送完成
)
from ag_ui.encoder import EventEncoder # 导入事件编码器,用于将事件对象编码为SSE格式
import uuid # 导入uuid模块,用于生成唯一标识符
from openai import OpenAI # 导入OpenAI客户端,用于调用OpenAI API

app = FastAPI(title="AG-UI Endpoint") # 创建FastAPI应用实例,设置标题为"AG-UI Endpoint"

@app.post("/awp") # 定义POST路由"/awp",用于接收AG-UI的请求
async def my_endpoint(input_data: RunAgentInput): # 定义异步端点函数,接收RunAgentInput类型的输入数据
async def event_generator(): # 定义异步生成器函数,用于生成事件流
# Create an event encoder to properly format SSE events
encoder = EventEncoder() # 创建事件编码器实例,用于正确格式化SSE事件

# Send run started event
yield encoder.encode( # 生成并编码运行开始事件
RunStartedEvent( # 创建运行开始事件对象
type=EventType.RUN_STARTED, # 设置事件类型为运行开始
thread_id=input_data.thread_id, # 使用输入数据中的线程ID
run_id=input_data.run_id # 使用输入数据中的运行ID
)
)

# Initialize OpenAI client
client = OpenAI() # 初始化OpenAI客户端,用于调用OpenAI的API服务

# Generate a message ID for the assistant's response
message_id = uuid.uuid4() # 生成一个UUID作为消息ID,确保每条消息都有唯一标识

# Send text message start event
yield encoder.encode( # 生成并编码文本消息开始事件
TextMessageStartEvent( # 创建文本消息开始事件对象
type=EventType.TEXT_MESSAGE_START, # 设置事件类型为文本消息开始
message_id=message_id, # 设置消息ID
role="assistant" # 设置角色为助手,表示这是AI助手的回复
)
)

# Create a streaming completion request
stream = client.chat.completions.create( # 创建流式聊天完成请求
model="gpt-3.5-turbo", # 使用gpt-3.5-turbo模型
messages=openai_messages, # 设置消息历史,这个变量需要在实际使用时定义
stream=True # 启用流式响应,逐步返回生成的内容
)

# Process the streaming response and send content events
for chunk in stream: # 遍历流式响应中的每个数据块
if hasattr(chunk.choices[0].delta, "content") and chunk.choices[0].delta.content: # 检查数据块是否包含内容
content = chunk.choices[0].delta.content # 获取数据块中的内容
yield encoder.encode( # 生成并编码文本消息内容事件
TextMessageContentEvent( # 创建文本消息内容事件对象
type=EventType.TEXT_MESSAGE_CONTENT, # 设置事件类型为文本消息内容
message_id=message_id, # 设置消息ID,与开始事件相同
delta=content # 设置增量内容,即当前数据块的文本
)
)

# Send text message end event
yield encoder.encode( # 生成并编码文本消息结束事件
TextMessageEndEvent( # 创建文本消息结束事件对象
type=EventType.TEXT_MESSAGE_END, # 设置事件类型为文本消息结束
message_id=message_id # 设置消息ID,与开始事件相同
)
)

# Send run finished event
yield encoder.encode( # 生成并编码运行结束事件
RunFinishedEvent( # 创建运行结束事件对象
type=EventType.RUN_FINISHED, # 设置事件类型为运行结束
thread_id=input_data.thread_id, # 使用输入数据中的线程ID
run_id=input_data.run_id # 使用输入数据中的运行ID
)
)

return StreamingResponse( # 返回流式响应
event_generator(), # 使用事件生成器作为响应内容
media_type="text/event-stream" # 设置媒体类型为服务器发送事件(SSE)
)

if __name__ == "__main__": # 如果直接运行此脚本
import uvicorn # 导入uvicorn服务器
uvicorn.run(app, host="0.0.0.0", port=8000) # 启动uvicorn服务器,监听所有网络接口的8000端口
)

return StreamingResponse(
event_generator(),
media_type="text/event-stream"
)

if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)

您需要将 OpenAI API 密钥设置为环境变量,然后重启服务器:

1
2
export OPENAI_API_KEY=your-api-key
poetry run uvicorn my_endpoint.main:app --reload

该实施方案创建了一个功能齐全的 AG-UI 端点,可实时处理信息并回传响应流。

动手试试吧