HTTP 流式服务快速上手

本指南演示如何在 FlowLLM 中搭建一个支持流式响应的 HTTP 服务,覆盖:编写流式 Op、配置流式 Flow、启动服务与客户端流式调用。


一、编写流式 Op

流式 Op 与普通 Op 的主要区别在于使用 self.context.add_stream_chunk() 来发送流式数据块,而不是一次性返回完整结果。基类见 flowllm/core/op/base_async_op.py

from flowllm.core.context import C
from flowllm.core.op import BaseAsyncOp
from flowllm.core.enumeration import ChunkEnum, Role
from flowllm.core.schema import FlowStreamChunk, Message

@C.register_op()
class StreamChatOp(BaseAsyncOp):
    file_path: str = __file__

    async def async_execute(self):
        messages = self.context.messages
        system_prompt = self.context.system_prompt

        # 构建包含系统提示的消息列表
        messages = [Message(role=Role.SYSTEM, content=system_prompt)] + messages

        # 从 LLM 获取流式响应
        async for stream_chunk in self.llm.astream_chat(messages):
            assert isinstance(stream_chunk, FlowStreamChunk)
            # 只处理特定类型的块(答案、思考、错误、工具调用)
            if stream_chunk.chunk_type in [
                ChunkEnum.ANSWER,
                ChunkEnum.THINK,
                ChunkEnum.ERROR,
                ChunkEnum.TOOL
            ]:
                # 将流式块添加到上下文,会自动发送到客户端
                await self.context.add_stream_chunk(stream_chunk)

要点: - 类名需以 Op 结尾(框架有断言) - 用 @C.register_op() 注册后方可在 Flow 中引用 - 流式 Op 继承 BaseAsyncOp 并实现 async_execute - 使用 self.context.add_stream_chunk(stream_chunk) 发送流式数据块 - 流式块类型包括:ANSWER(答案)、THINK(思考过程)、ERROR(错误)、TOOL(工具调用) - 框架会自动处理流式传输,无需手动管理连接


二、编写 yaml config

保存如下配置为: - 覆盖默认:flowllm/flowllm/config/default.yaml - 自定义:项目根新建 my_stream_config.yaml

backend: http
thread_pool_max_workers: 128

http:
  host: "0.0.0.0"
  port: 8002

flow:
  # 流式 HTTP 流程(POST /demo_stream_http_flow)
  demo_stream_http_flow:
    flow_content: GenSystemPromptOp() >> StreamChatOp()
    stream: true  # 关键:标识这是一个流式 Flow
    description: "AI 对话助手(流式返回)"
    input_schema:
      query:
        type: string
        description: "用户问题"
        required: true

llm:
  default:
    backend: openai_compatible
    model_name: qwen3-30b-a3b-instruct-2507
    params:
      temperature: 0.6

embedding_model:
  default:
    backend: openai_compatible
    model_name: text-embedding-v4
    params:
      dimensions: 1024

vector_store:
  default:
    backend: memory
    embedding_model: default

要点: - 关键配置:在 Flow 配置中添加 stream: true 标识,框架会自动将其注册为流式端点 - Flow 的编排在 flow_content 中定义,流式 Op(如 StreamChatOp)可以与其他 Op 组合 - backend: http 指定以 HTTP 服务启动 - input_schema 在 HTTP 模式下可选,建议填写以生成更完善的 OpenAPI 与入参校验 - llm/embedding_model/vector_store 为默认能力,可按需替换 - 更多 Flow 表达式与字段说明,参考: - docs/zh/guide/flow_guide.md - docs/zh/guide/config_guide.md


三、启动服务

确保已安装 FlowLLM,并在.env中设置模型相关环境变量,可直接参考项目根的示例文件 example.env

export FLOW_LLM_API_KEY="sk-xxxx"
export FLOW_LLM_BASE_URL="https://xxxx/v1"
export FLOW_EMBEDDING_API_KEY="sk-xxxx"
export FLOW_EMBEDDING_BASE_URL="https://xxxx/v1"
  • 使用my_stream_config启动:
flowllm config=my_stream_config backend=http

成功后: - 健康检查:GET http://0.0.0.0:8002/health - OpenAPI:GET http://0.0.0.0:8002/docs - 流式接口:POST http://0.0.0.0:8002/demo_stream_http_flow

服务行为(见 flowllm/core/service/http_service.py): - 为每个标记了 stream: true 的 Flow 生成流式 POST /{flow_name} 接口 - 使用 Server-Sent Events (SSE) 格式返回流式数据 - 自动注入 CORS(默认允许所有来源) - 流式数据格式:每行以 data: 开头,包含 JSON 格式的 FlowStreamChunk,结束时发送 data:[DONE]

若使用 demo_stream_http_flowPOST http://0.0.0.0:8002/demo_stream_http_flow


四、客户端调用与测试

使用内置 HttpClient 进行流式调用:

import asyncio
import json
from flowllm.core.utils import HttpClient


async def main():
    async with HttpClient("http://0.0.0.0:8002") as client:
        # 健康检查
        health = await client.health_check()
        print("health:", json.dumps(health, indent=2, ensure_ascii=False))

        # 查看可用 endpoints
        schema = await client.list_endpoints()
        print("endpoints:", list(schema.get("paths", {}).keys()))

        # 流式调用
        print("=" * 50)
        print("流式响应:")
        async for chunk in client.execute_stream_flow(
            "demo_stream_http_flow",
            query="什么是人工智能?"
        ):
            chunk_type = chunk.get("type", "answer")
            chunk_content = chunk.get("content", "")
            if chunk_content:
                # 根据块类型进行不同处理
                if chunk_type == "answer":
                    print(chunk_content, end="", flush=True)  # 实时打印答案
                elif chunk_type == "think":
                    # 思考过程可以单独处理
                    print(f"\n[思考] {chunk_content}", end="", flush=True)
                elif chunk_type == "tool":
                    print(f"\n[工具调用] {chunk_content}")
                elif chunk_type == "error":
                    print(f"\n[错误] {chunk_content}")
        print("\n" + "=" * 50)


if __name__ == "__main__":
    asyncio.run(main())

也可直接运行内置测试脚本(以 demo_stream_http_flow 为例):

python -m flowllm.tests.http_client_test

流式响应格式说明: - 每个 chunk 是一个字典,包含: - type: 块类型(answerthinktoolerror) - content: 块内容(字符串或 JSON) - 流式响应使用 SSE 格式,客户端通过 async for 循环接收 - 当收到 [DONE] 信号时,流式传输结束

使用 curl 测试流式接口

curl -X POST http://localhost:8002/demo_stream_http_flow \
  -H "Content-Type: application/json" \
  -d '{
    "query": "what is ai"
  }'

五、流式 vs 同步接口对比

特性 同步接口 流式接口
配置 无需特殊配置 需设置 stream: true
Op 实现 使用 self.context.response.answer 使用 self.context.add_stream_chunk()
返回方式 一次性返回完整结果 实时返回数据块
适用场景 快速响应、简单查询 长文本生成、实时反馈、思考过程展示
客户端调用 execute_flow() execute_stream_flow()
响应格式 JSON SSE (Server-Sent Events)