feat: add stream output.

Novice
2025-12-04 17:57:19 +08:00
parent 0fd4e4a46e
commit f4592828d7
19 changed files with 1038 additions and 7 deletions

View File

@@ -369,7 +369,6 @@ class WorkflowRunNodeExecutionListApi(Resource):
@login_required
@account_initialization_required
@get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW])
-@marshal_with(workflow_run_node_execution_list_model)
def get(self, app_model: App, run_id):
"""
Get workflow run node execution list

View File

@@ -70,7 +70,7 @@ from core.workflow.runtime import GraphRuntimeState
from core.workflow.system_variable import SystemVariable
from extensions.ext_database import db
from libs.datetime_utils import naive_utc_now
-from models import Account, Conversation, EndUser, Message, MessageFile
+from models import Account, Conversation, EndUser, LLMGenerationDetail, Message, MessageFile
from models.enums import CreatorUserRole
from models.workflow import Workflow
@@ -835,6 +835,75 @@ class AdvancedChatAppGenerateTaskPipeline(GraphRuntimeStateSupport):
]
session.add_all(message_files)
# Save merged LLM generation detail from all LLM nodes
self._save_generation_detail(session=session, message=message)
def _save_generation_detail(self, *, session: Session, message: Message) -> None:
"""
Save merged LLM generation detail for Chatflow.
Merges generation details from all LLM nodes in the workflow.
"""
if not message.workflow_run_id:
return
# Query all LLM generation details for this workflow run
generation_details = (
session.query(LLMGenerationDetail)
.filter_by(workflow_run_id=message.workflow_run_id)
.all()
)
if not generation_details:
return
# Merge all generation details
merged_reasoning: list[str] = []
merged_tool_calls: list[dict] = []
merged_sequence: list[dict] = []
for detail in generation_details:
# Note: a more sophisticated implementation could track which parts of the
# answer came from which LLM node; here only reasoning and tool calls are
# merged per node (the answer is added as a single content segment below)
reasoning_list = detail.reasoning_content_list
tool_calls_list = detail.tool_calls_list
for reasoning in reasoning_list:
merged_reasoning.append(reasoning)
merged_sequence.append({"type": "reasoning", "index": len(merged_reasoning) - 1})
for tool_call in tool_calls_list:
merged_tool_calls.append(tool_call)
merged_sequence.append({"type": "tool_call", "index": len(merged_tool_calls) - 1})
# Only save if there's meaningful data
if not merged_reasoning and not merged_tool_calls:
return
# Add content segment for the final answer
answer = message.answer or ""
if answer:
merged_sequence.insert(0, {"type": "content", "start": 0, "end": len(answer)})
# Check if generation detail already exists for this message
existing = session.query(LLMGenerationDetail).filter_by(message_id=message.id).first()
if existing:
existing.reasoning_content = json.dumps(merged_reasoning) if merged_reasoning else None
existing.tool_calls = json.dumps(merged_tool_calls) if merged_tool_calls else None
existing.sequence = json.dumps(merged_sequence) if merged_sequence else None
else:
generation_detail = LLMGenerationDetail(
tenant_id=self._application_generate_entity.app_config.tenant_id,
app_id=self._application_generate_entity.app_config.app_id,
message_id=message.id,
reasoning_content=json.dumps(merged_reasoning) if merged_reasoning else None,
tool_calls=json.dumps(merged_tool_calls) if merged_tool_calls else None,
sequence=json.dumps(merged_sequence) if merged_sequence else None,
)
session.add(generation_detail)
def _seed_graph_runtime_state_from_queue_manager(self) -> None:
"""Bootstrap the cached runtime state from the queue manager when present."""
candidate = self._base_task_pipeline.queue_manager.graph_runtime_state

View File

@@ -0,0 +1,70 @@
"""
LLM Generation Detail entities.
Defines the structure for storing and transmitting LLM generation details
including reasoning content, tool calls, and their sequence.
"""
from typing import Literal
from pydantic import BaseModel, Field
class ContentSegment(BaseModel):
"""Represents a content segment in the generation sequence."""
type: Literal["content"] = "content"
start: int = Field(..., description="Start position in the text")
end: int = Field(..., description="End position in the text")
class ReasoningSegment(BaseModel):
"""Represents a reasoning segment in the generation sequence."""
type: Literal["reasoning"] = "reasoning"
index: int = Field(..., description="Index into reasoning_content array")
class ToolCallSegment(BaseModel):
"""Represents a tool call segment in the generation sequence."""
type: Literal["tool_call"] = "tool_call"
index: int = Field(..., description="Index into tool_calls array")
SequenceSegment = ContentSegment | ReasoningSegment | ToolCallSegment
class ToolCallDetail(BaseModel):
"""Represents a tool call with its arguments and result."""
id: str = Field(default="", description="Unique identifier for the tool call")
name: str = Field(..., description="Name of the tool")
arguments: str = Field(default="", description="JSON string of tool arguments")
result: str = Field(default="", description="Result from the tool execution")
class LLMGenerationDetailData(BaseModel):
"""
Domain model for LLM generation detail.
Contains the structured data for reasoning content, tool calls,
and their display sequence.
"""
reasoning_content: list[str] = Field(default_factory=list, description="List of reasoning segments")
tool_calls: list[ToolCallDetail] = Field(default_factory=list, description="List of tool call details")
sequence: list[SequenceSegment] = Field(default_factory=list, description="Display order of segments")
def is_empty(self) -> bool:
"""Check if there's any meaningful generation detail."""
return not self.reasoning_content and not self.tool_calls
def to_response_dict(self) -> dict:
"""Convert to dictionary for API response."""
return {
"reasoning_content": self.reasoning_content,
"tool_calls": [tc.model_dump() for tc in self.tool_calls],
"sequence": [seg.model_dump() for seg in self.sequence],
}
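
As a usage sketch (not part of the diff), the domain model above can be built and serialized like this, assuming the entities are imported from this module:

```python
detail = LLMGenerationDetailData(
    reasoning_content=["Considering the question..."],
    tool_calls=[ToolCallDetail(name="search", arguments='{"q": "dify"}', result="...")],
    sequence=[
        ContentSegment(start=0, end=42),
        ReasoningSegment(index=0),
        ToolCallSegment(index=0),
    ],
)
assert not detail.is_empty()
payload = detail.to_response_dict()  # JSON-safe dict for the API layer
```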

View File

@@ -58,7 +58,7 @@ from core.prompt.utils.prompt_template_parser import PromptTemplateParser
from events.message_event import message_was_created
from extensions.ext_database import db
from libs.datetime_utils import naive_utc_now
-from models.model import AppMode, Conversation, Message, MessageAgentThought
+from models.model import AppMode, Conversation, LLMGenerationDetail, Message, MessageAgentThought
logger = logging.getLogger(__name__)
@@ -407,11 +407,92 @@ class EasyUIBasedGenerateTaskPipeline(BasedGenerateTaskPipeline):
)
)
# Save LLM generation detail (reasoning content and/or tool calls)
self._save_generation_detail(session=session, message=message, llm_result=llm_result)
message_was_created.send(
message,
application_generate_entity=self._application_generate_entity,
)
def _save_generation_detail(
self, *, session: Session, message: Message, llm_result: LLMResult
) -> None:
"""
Save LLM generation detail for Completion/Chat/Agent-Chat applications.
For Agent-Chat, also merges MessageAgentThought records.
"""
import json
reasoning_list: list[str] = []
tool_calls_list: list[dict] = []
sequence: list[dict] = []
answer = message.answer or ""
# Check if this is Agent-Chat mode by looking for agent thoughts
agent_thoughts = (
session.query(MessageAgentThought)
.filter_by(message_id=message.id)
.order_by(MessageAgentThought.position.asc())
.all()
)
if agent_thoughts:
# Agent-Chat mode: merge MessageAgentThought records
content_pos = 0
for thought in agent_thoughts:
# Add thought/reasoning
if thought.thought:
reasoning_list.append(thought.thought)
sequence.append({"type": "reasoning", "index": len(reasoning_list) - 1})
# Add tool calls
if thought.tool:
tool_calls_list.append({
"name": thought.tool,
"arguments": thought.tool_input or "",
"result": thought.observation or "",
})
sequence.append({"type": "tool_call", "index": len(tool_calls_list) - 1})
# Add answer content if present
if thought.answer:
start = content_pos
end = content_pos + len(thought.answer)
sequence.append({"type": "content", "start": start, "end": end})
content_pos = end
else:
# Completion/Chat mode: use reasoning_content from llm_result
reasoning_content = llm_result.reasoning_content
if reasoning_content:
reasoning_list = [reasoning_content]
# Content comes first, then reasoning
if answer:
sequence.append({"type": "content", "start": 0, "end": len(answer)})
sequence.append({"type": "reasoning", "index": 0})
# Only save if there's meaningful generation detail
if not reasoning_list and not tool_calls_list:
return
# Check if generation detail already exists
existing = session.query(LLMGenerationDetail).filter_by(message_id=message.id).first()
if existing:
existing.reasoning_content = json.dumps(reasoning_list) if reasoning_list else None
existing.tool_calls = json.dumps(tool_calls_list) if tool_calls_list else None
existing.sequence = json.dumps(sequence) if sequence else None
else:
generation_detail = LLMGenerationDetail(
tenant_id=self._application_generate_entity.app_config.tenant_id,
app_id=self._application_generate_entity.app_config.app_id,
message_id=message.id,
reasoning_content=json.dumps(reasoning_list) if reasoning_list else None,
tool_calls=json.dumps(tool_calls_list) if tool_calls_list else None,
sequence=json.dumps(sequence) if sequence else None,
)
session.add(generation_detail)
def _handle_stop(self, event: QueueStopEvent):
"""
Handle stop.

View File

@@ -29,6 +29,7 @@ from models import (
Account,
CreatorUserRole,
EndUser,
LLMGenerationDetail,
WorkflowNodeExecutionModel,
WorkflowNodeExecutionTriggeredFrom,
)
@@ -457,6 +458,88 @@ class SQLAlchemyWorkflowNodeExecutionRepository(WorkflowNodeExecutionRepository)
session.merge(db_model)
session.flush()
# Save LLMGenerationDetail for LLM nodes with successful execution
if (
domain_model.node_type == NodeType.LLM
and domain_model.status == WorkflowNodeExecutionStatus.SUCCEEDED
and domain_model.outputs is not None
):
self._save_llm_generation_detail(session, domain_model)
def _save_llm_generation_detail(self, session, execution: WorkflowNodeExecution) -> None:
"""
Save LLM generation detail for LLM nodes.
Extracts reasoning_content, tool_calls, and sequence from outputs and metadata.
"""
outputs = execution.outputs or {}
metadata = execution.metadata or {}
# Extract reasoning_content from outputs
reasoning_content = outputs.get("reasoning_content")
reasoning_list: list[str] = []
if reasoning_content:
# reasoning_content could be a string or already a list
if isinstance(reasoning_content, str):
reasoning_list = [reasoning_content] if reasoning_content else []
elif isinstance(reasoning_content, list):
reasoning_list = reasoning_content
# Extract tool_calls from metadata.agent_log
tool_calls_list: list[dict] = []
agent_log = metadata.get(WorkflowNodeExecutionMetadataKey.AGENT_LOG)
if agent_log and isinstance(agent_log, list):
for log in agent_log:
# Each log entry has label, data, status, etc.
log_data = log.data if hasattr(log, "data") else log.get("data", {})
if log_data.get("tool_name"):
tool_calls_list.append({
"id": log_data.get("tool_call_id", ""),
"name": log_data.get("tool_name", ""),
"arguments": json.dumps(log_data.get("tool_args", {})),
"result": str(log_data.get("output", "")),
})
# Build sequence based on content, reasoning, and tool_calls
sequence: list[dict] = []
text = outputs.get("text", "")
# For now, use a simple sequence: content -> reasoning -> tool_calls
# This can be enhanced later to track actual streaming order
if text:
sequence.append({"type": "content", "start": 0, "end": len(text)})
for i, _ in enumerate(reasoning_list):
sequence.append({"type": "reasoning", "index": i})
for i, _ in enumerate(tool_calls_list):
sequence.append({"type": "tool_call", "index": i})
# Only save if there's meaningful generation detail
if not reasoning_list and not tool_calls_list:
return
# Check if generation detail already exists for this node execution
existing = session.query(LLMGenerationDetail).filter_by(
workflow_run_id=execution.workflow_execution_id,
node_id=execution.node_id,
).first()
if existing:
# Update existing record
existing.reasoning_content = json.dumps(reasoning_list) if reasoning_list else None
existing.tool_calls = json.dumps(tool_calls_list) if tool_calls_list else None
existing.sequence = json.dumps(sequence) if sequence else None
else:
# Create new record
generation_detail = LLMGenerationDetail(
tenant_id=self._tenant_id,
app_id=self._app_id,
workflow_run_id=execution.workflow_execution_id,
node_id=execution.node_id,
reasoning_content=json.dumps(reasoning_list) if reasoning_list else None,
tool_calls=json.dumps(tool_calls_list) if tool_calls_list else None,
sequence=json.dumps(sequence) if sequence else None,
)
session.add(generation_detail)
def get_db_models_by_workflow_run(
self,
workflow_run_id: str,

View File

@@ -341,6 +341,11 @@ class LLMNode(Node):
"reasoning_content": reasoning_content,
"usage": jsonable_encoder(usage),
"finish_reason": finish_reason,
"generation": {
"content": clean_text,
"reasoning_content": [reasoning_content] if reasoning_content else [],
"tool_calls": [],
},
}
if structured_output:
outputs["structured_output"] = structured_output.structured_output
@@ -1561,6 +1566,18 @@ class LLMNode(Node):
is_final=True,
)
# Build generation field from agent_logs
tool_calls_for_generation = []
for log in agent_logs:
if log.label == "Tool Call":
tool_call_data = {
"id": log.data.get("tool_call_id", ""),
"name": log.data.get("tool_name", ""),
"arguments": json.dumps(log.data.get("tool_args", {})),
"result": log.data.get("output", ""),
}
tool_calls_for_generation.append(tool_call_data)
# Complete with results
yield StreamCompletedEvent(
node_run_result=NodeRunResult(
@@ -1570,6 +1587,11 @@ class LLMNode(Node):
"files": ArrayFileSegment(value=files),
"usage": jsonable_encoder(usage),
"finish_reason": finish_reason,
"generation": {
"reasoning_content": [],
"tool_calls": tool_calls_for_generation,
"sequence": [],
},
},
metadata={
WorkflowNodeExecutionMetadataKey.AGENT_LOG: agent_logs,

View File

@@ -89,6 +89,7 @@ message_detail_fields = {
"status": fields.String,
"error": fields.String,
"parent_message_id": fields.String,
"generation_detail": fields.Raw,
}
feedback_stat_fields = {"like": fields.Integer, "dislike": fields.Integer}

View File

@@ -68,6 +68,7 @@ message_fields = {
"message_files": fields.List(fields.Nested(message_file_fields)),
"status": fields.String,
"error": fields.String,
"generation_detail": fields.Raw,
}
message_infinite_scroll_pagination_fields = {

View File

@@ -129,6 +129,7 @@ workflow_run_node_execution_fields = {
"inputs_truncated": fields.Boolean,
"outputs_truncated": fields.Boolean,
"process_data_truncated": fields.Boolean,
"generation_detail": fields.Raw,
}
workflow_run_node_execution_list_fields = {

api/llm_generation.md (new file, 432 lines)
View File

@@ -0,0 +1,432 @@
# LLM Generation Detail Storage Design

## Background

We need to store the detailed content of an LLM generation, including:

- `content`: the text the model outputs
- `reasoning_content`: the model's reasoning process (possibly several segments)
- `tool_calls`: tool invocation records

For Workflow/Chatflow this surfaces as a new `generation` output variable on the LLM node; for basic apps, the information is attached to the Message.

The core challenge is that these pieces are **interleaved** at generation time:

```
content_1 → reasoning_1 → content_2 → tool_call_1 → content_3
```

Streaming output can simply be rendered in arrival order, but redisplaying stored output requires preserving that ordering information.
---
## Application Types

Dify has five application types, and storage differs between them:

| App type   | Has Message | Has WorkflowRun | Pipeline class                         |
| ---------- | ----------- | --------------- | -------------------------------------- |
| Completion | ✅           | ❌               | `EasyUIBasedGenerateTaskPipeline`       |
| Chat       | ✅           | ❌               | `EasyUIBasedGenerateTaskPipeline`       |
| Agent-Chat | ✅           | ❌               | `EasyUIBasedGenerateTaskPipeline`       |
| Chatflow   | ✅           | ✅               | `AdvancedChatAppGenerateTaskPipeline`   |
| Workflow   | ❌           | ✅               | `WorkflowAppGenerateTaskPipeline`       |

The first four all have a Message; only Workflow does not. Hence the design:

- Apps with a Message → associate via `message_id`
- Workflow → associate via `workflow_run_id + node_id`
---
## Table Schema

```python
class LLMGenerationDetail(Base):
    """
    Store LLM generation details, including the reasoning process and tool calls.

    Association (choose exactly one):
    - Apps with a Message: use message_id (one-to-one)
    - Workflow: use workflow_run_id + node_id (one run may contain several LLM nodes)
    """

    __tablename__ = "llm_generation_details"
    __table_args__ = (
        sa.PrimaryKeyConstraint("id", name="llm_generation_detail_pkey"),
        sa.Index("idx_llm_generation_detail_message", "message_id"),
        sa.Index("idx_llm_generation_detail_workflow", "workflow_run_id", "node_id"),
    )

    id: Mapped[str] = mapped_column(StringUUID, default=lambda: str(uuid4()))
    tenant_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
    app_id: Mapped[str] = mapped_column(StringUUID, nullable=False)

    # Association fields (choose one)
    message_id: Mapped[str | None] = mapped_column(StringUUID, nullable=True, unique=True)
    workflow_run_id: Mapped[str | None] = mapped_column(StringUUID, nullable=True)
    node_id: Mapped[str | None] = mapped_column(String(255), nullable=True)

    # Core data, stored as JSON strings
    reasoning_content: Mapped[str | None] = mapped_column(LongText)  # ["reasoning 1", "reasoning 2", ...]
    tool_calls: Mapped[str | None] = mapped_column(LongText)  # [{name, arguments, result}, ...]
    sequence: Mapped[str | None] = mapped_column(LongText)  # ordering information

    created_at: Mapped[datetime] = mapped_column(DateTime, nullable=False, server_default=func.current_timestamp())
```
### The `sequence` field format

Records display order. `content` segments use start/end offsets into the answer text; `reasoning` and `tool_call` segments use an index into the corresponding array:
```json
[
{ "type": "content", "start": 0, "end": 100 },
{ "type": "reasoning", "index": 0 },
{ "type": "content", "start": 100, "end": 200 },
{ "type": "tool_call", "index": 0 },
{ "type": "content", "start": 200, "end": 350 }
]
```
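
As a sketch of the intended contract (not part of the commit), a consumer could rebuild the display order from a stored row like this, assuming the three JSON columns have already been parsed into a dict:

```python
def replay(answer: str, detail: dict) -> list[str]:
    """Walk the sequence and emit segments in display order."""
    parts: list[str] = []
    for seg in detail["sequence"]:
        if seg["type"] == "content":
            parts.append(answer[seg["start"]:seg["end"]])
        elif seg["type"] == "reasoning":
            parts.append("[reasoning] " + detail["reasoning_content"][seg["index"]])
        elif seg["type"] == "tool_call":
            call = detail["tool_calls"][seg["index"]]
            parts.append(f"[tool] {call['name']} -> {call['result']}")
    return parts
```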
---
## Storage and Queries per Application Type

### Completion / Chat

These call the LLM directly; the flow is the simplest.

- **Write location**: `EasyUIBasedGenerateTaskPipeline._save_message()`
- **Data source**: the `llm_result` object
- **Query**: `WHERE message_id = ?`

These two app types may not produce reasoning_content or tool_calls yet (it depends on the LLM), so the fields can simply be reserved for now.
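
For reference, the non-agent branch of `_save_generation_detail` in the pipeline diff above boils down to the following sketch: the content segment first, then a single reasoning segment:

```python
# Condensed from the Completion/Chat branch of
# EasyUIBasedGenerateTaskPipeline._save_generation_detail in this commit.
reasoning_list: list[str] = []
sequence: list[dict] = []
answer = message.answer or ""
if llm_result.reasoning_content:
    reasoning_list = [llm_result.reasoning_content]
    if answer:  # content comes first, then reasoning
        sequence.append({"type": "content", "start": 0, "end": len(answer)})
    sequence.append({"type": "reasoning", "index": 0})
```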
### Agent-Chat

Agent mode calls the LLM over multiple rounds, and each round's thought and tool calls are stored in the `MessageAgentThought` table. On save, these records are merged into a single `LLMGenerationDetail`.

- **Write location**: `EasyUIBasedGenerateTaskPipeline._save_message()`
- **Data source**: merged `MessageAgentThought` records
- **Query**: `WHERE message_id = ?`

Merge logic:
```python
def _save_agent_generation_detail(self, session: Session, message: Message):
    # Fetch every round, ordered by position
    thoughts = (
        session.query(MessageAgentThought)
        .where(MessageAgentThought.message_id == message.id)
        .order_by(MessageAgentThought.position.asc())
        .all()
    )
    if not thoughts:
        return

    reasoning_content = []
    tool_calls = []
    sequence = []
    content_pos = 0
    for thought in thoughts:
        # Thought / reasoning
        if thought.thought:
            reasoning_content.append(thought.thought)
            sequence.append({"type": "reasoning", "index": len(reasoning_content) - 1})
        # Tool call
        if thought.tool:
            tool_calls.append({
                "name": thought.tool,
                "arguments": thought.tool_input or "",
                "result": thought.observation or ""
            })
            sequence.append({"type": "tool_call", "index": len(tool_calls) - 1})
        # Answer content: track offsets into the concatenated answer text
        if thought.answer:
            start = content_pos
            end = content_pos + len(thought.answer)
            sequence.append({"type": "content", "start": start, "end": end})
            content_pos = end

    detail = LLMGenerationDetail(
        message_id=message.id,
        reasoning_content=json.dumps(reasoning_content),
        tool_calls=json.dumps(tool_calls),
        sequence=json.dumps(sequence),
        # plus tenant_id / app_id from the application entity
    )
    session.add(detail)
```
### Chatflow

Runs through the Workflow engine but ultimately produces a Message. The answer comes from `_task_state.answer`; if the Answer node references the generation of several LLM nodes, those details must be merged, as sketched below.

- **Write location**: `AdvancedChatAppGenerateTaskPipeline._save_message()`
- **Data source**: the merged result from `_task_state`
- **Query**: `WHERE message_id = ?`
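
A condensed sketch of that merge, following `_save_generation_detail` in the `AdvancedChatAppGenerateTaskPipeline` diff above (the `reasoning_content_list` / `tool_calls_list` helpers are assumed to deserialize the JSON columns):

```python
merged_reasoning: list[str] = []
merged_tool_calls: list[dict] = []
merged_sequence: list[dict] = []
for detail in generation_details:  # one row per LLM node in this workflow run
    for reasoning in detail.reasoning_content_list:
        merged_reasoning.append(reasoning)
        merged_sequence.append({"type": "reasoning", "index": len(merged_reasoning) - 1})
    for tool_call in detail.tool_calls_list:
        merged_tool_calls.append(tool_call)
        merged_sequence.append({"type": "tool_call", "index": len(merged_tool_calls) - 1})
```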
### Workflow

No Message; each LLM node stores one Detail row when it finishes executing.

- **Write location**: `LLMNode._run()`, on node completion
- **Data source**: the single LLM node's outputs
- **Queries**:
  - One node: `WHERE workflow_run_id = ? AND node_id = ?`
  - All LLM nodes: `WHERE workflow_run_id = ?`
```python
detail = LLMGenerationDetail(
    message_id=None,
    workflow_run_id=self._workflow_run_id,
    node_id=self._node_id,
    reasoning_content=json.dumps(reasoning_contents),
    tool_calls=json.dumps(tool_calls),
    sequence=json.dumps(sequence),
    # plus tenant_id / app_id
)
```
---
## Workflow/Chatflow Examples

### Chatflow: merge on write, read directly

Chatflow has a Message, so generation_detail is merged at write time, following the order in which the answer references the nodes.
```
[Start] → [LLM_1] → [LLM_2] → [Answer]
answer = {{#llm_1.generation#}} + {{#llm_2.generation#}}
```
**Write**: in `_save_message()`, merge every referenced LLM node's generation in the answer's reference order
```python
# Store the merged result
LLMGenerationDetail(
    message_id=message.id,
    workflow_run_id=None,
    node_id=None,
    reasoning_content='["llm_1 reasoning", "llm_2 reasoning"]',  # merged in order
    tool_calls='[{...}, {...}]',  # merged in order
    sequence='[...]',  # merged in order
)
```
**Read**: returned directly by the message API; no extra processing needed
```python
# The existing message API is extended to return generation_detail
{
    "id": "msg_123",
    "answer": "final answer...",
    "generation_detail": {
        "reasoning_content": ["llm_1 reasoning", "llm_2 reasoning"],
        "tool_calls": [...],
        "sequence": [...]
    }
}
```
The frontend simply renders segments in `sequence` order; it never needs to know which node each piece came from.
### Workflow: store per node, auto-attach in the node detail API

Workflow has no Message; each LLM node stores its own Detail row.
```
[Start] → [LLM_1] → [LLM_2] → [End]
```
**Write**: one row saved when each LLM node finishes
```python
# LLM_1 finished
LLMGenerationDetail(
    workflow_run_id="run_123",
    node_id="llm_1",
    # ...
)
# LLM_2 finished
LLMGenerationDetail(
    workflow_run_id="run_123",
    node_id="llm_2",
    # ...
)
```
**Read**: in the node execution detail API, the backend automatically queries and attaches the `generation_detail` field
```python
# Inside the node detail endpoint
def get_node_execution(node_execution_id):
    node_execution = ...
    # For LLM nodes, attach generation_detail automatically
    if node_execution.node_type == NodeType.LLM:
        generation_detail = (
            db.session.query(LLMGenerationDetail)
            .filter_by(
                workflow_run_id=node_execution.workflow_run_id,
                node_id=node_execution.node_id,
            )
            .first()
        )
        if generation_detail:
            node_execution.generation_detail = generation_detail.to_dict()
    return node_execution
```
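
Returning a whole run's node list this way would be an N+1; the `LLMGenerationService` added in this commit batches the lookup per run instead, roughly as follows (a sketch; see the service for the full field mapping):

```python
from sqlalchemy import select

# One query for all LLM nodes in the run, then an in-memory join by node_id.
details_by_node = {
    d.node_id: d.to_dict()
    for d in session.scalars(
        select(LLMGenerationDetail).where(LLMGenerationDetail.workflow_run_id == run_id)
    )
    if d.node_id
}
for node_execution in node_executions:
    node_execution.generation_detail = details_by_node.get(node_execution.node_id)
```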
---
## API Design

### Apps with a Message (Completion / Chat / Agent-Chat / Chatflow)

No new endpoint; the existing message APIs return a `generation_detail` field directly:
```json
{
  "id": "msg_123",
  "answer": "final answer...",
  "generation_detail": {
    "reasoning_content": ["reasoning 1", "reasoning 2"],
    "tool_calls": [{ "name": "search", "arguments": "{}", "result": "..." }],
    "sequence": [
      { "type": "content", "start": 0, "end": 100 },
      { "type": "reasoning", "index": 0 },
      { "type": "tool_call", "index": 0 }
    ]
  }
}
```
### Workflow

Workflow has no Message, and no new endpoint is added. The existing **node execution detail endpoint** auto-attaches the `generation_detail` field:

```
GET /apps/{app_id}/workflow-runs/{run_id}/node-executions/{node_execution_id}
```

Response:
```json
{
  "id": "node_exec_123",
  "node_id": "llm_1",
  "node_type": "llm",
  "inputs": {...},
  "outputs": {
    "text": "final answer...",
    "reasoning_content": "reasoning..."
  },
  "execution_metadata": {...},
  "generation_detail": {
    "reasoning_content": ["reasoning 1", "reasoning 2"],
    "tool_calls": [{ "name": "search", "arguments": "{}", "result": "..." }],
    "sequence": [
      { "type": "content", "start": 0, "end": 100 },
      { "type": "reasoning", "index": 0 },
      { "type": "tool_call", "index": 0 }
    ]
  }
}
```
The `generation_detail` field is attached by the backend from the `LLMGenerationDetail` table; only LLM nodes carry it.

### Frontend logic

The frontend never checks `node_type`; it only checks whether the response contains a `generation_detail` field:

- **Present** → render the generation detail panel
- **Absent** → render nothing

Benefits of this design:

1. **Trustworthy**: `generation_detail` comes from a backend table lookup rather than from parsing `outputs`, so users cannot forge it
2. **Simple logic**: the frontend needs no extra validation; render it if present, skip it otherwise
3. **Fewer requests**: no separate call is needed to fetch generation information
---
## Summary

### Completion / Chat

- **Association**: `message_id`
- **Write location**: `EasyUIBasedGenerateTaskPipeline._save_message()`
- **Query**: message APIs auto-attach `generation_detail`

### Agent-Chat

- **Association**: `message_id`
- **Write location**: `EasyUIBasedGenerateTaskPipeline._save_message()` (merging AgentThought records)
- **Query**: message APIs auto-attach `generation_detail`

### Chatflow

- **Association**: `message_id`
- **Write location**: `AdvancedChatAppGenerateTaskPipeline._save_message()`
- **Query**: message APIs auto-attach `generation_detail`

### Workflow

- **Association**: `workflow_run_id + node_id`
- **Write location**: `LLMNode._run()`, on node completion
- **Query**: node execution detail API auto-attaches `generation_detail`
---
## Data Flow

### Apps with a Message

```
request → pipeline → LLM call → save Message
                        └──→ save LLMGenerationDetail (message_id)

API response: the message APIs auto-attach the generation_detail field
```

### Workflow

```
request → WorkflowRun → LLM_1 node → save NodeExecution
              │              └──→ save LLMGenerationDetail (workflow_run_id, node_id="llm_1")
              └──→ LLM_2 node → save NodeExecution
                       └──→ save LLMGenerationDetail (workflow_run_id, node_id="llm_2")

API response: the node execution detail API auto-attaches the generation_detail field
```
---
## Design Notes

### Unified storage

All application types store into the same `LLMGenerationDetail` table, keeping things consistent.

### Auto-attach

The backend queries and attaches the `generation_detail` field when returning data; the frontend makes no extra request.

### Trustworthy

`generation_detail` comes from a backend table lookup rather than from parsing `outputs`, so it cannot be forged by users via a Code node or similar.

### Simple frontend logic

The frontend only checks whether the response contains a `generation_detail` field: render if present, skip if not. No `node_type` checks or structure validation are needed.

View File

@@ -0,0 +1,47 @@
"""add llm generation detail table.
Revision ID: 0ecb3c8c6062
Revises: 09cfdda155d1
Create Date: 2025-12-04 10:25:46.883416
"""
from alembic import op
import models as models
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql
# revision identifiers, used by Alembic.
revision = '0ecb3c8c6062'
down_revision = '09cfdda155d1'
branch_labels = None
depends_on = None
def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.create_table('llm_generation_details',
sa.Column('id', models.types.StringUUID(), nullable=False),
sa.Column('tenant_id', models.types.StringUUID(), nullable=False),
sa.Column('app_id', models.types.StringUUID(), nullable=False),
sa.Column('message_id', models.types.StringUUID(), nullable=True),
sa.Column('workflow_run_id', models.types.StringUUID(), nullable=True),
sa.Column('node_id', sa.String(length=255), nullable=True),
sa.Column('reasoning_content', models.types.LongText(), nullable=True),
sa.Column('tool_calls', models.types.LongText(), nullable=True),
sa.Column('sequence', models.types.LongText(), nullable=True),
sa.Column('created_at', sa.DateTime(), server_default=sa.text('CURRENT_TIMESTAMP'), nullable=False),
sa.PrimaryKeyConstraint('id', name='llm_generation_detail_pkey'),
sa.UniqueConstraint('message_id', name=op.f('llm_generation_details_message_id_key'))
)
with op.batch_alter_table('llm_generation_details', schema=None) as batch_op:
batch_op.create_index('idx_llm_generation_detail_message', ['message_id'], unique=False)
batch_op.create_index('idx_llm_generation_detail_workflow', ['workflow_run_id', 'node_id'], unique=False)
# ### end Alembic commands ###
def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.drop_table('llm_generation_details')
# ### end Alembic commands ###

View File

@@ -49,6 +49,7 @@ from .model import (
EndUser,
IconType,
InstalledApp,
LLMGenerationDetail,
Message,
MessageAgentThought,
MessageAnnotation,
@@ -154,6 +155,7 @@ __all__ = [
"IconType",
"InstalledApp",
"InvitationCode",
"LLMGenerationDetail",
"LoadBalancingModelConfig",
"Message",
"MessageAgentThought",

View File

@@ -31,6 +31,8 @@ from .provider_ids import GenericProviderID
from .types import LongText, StringUUID
if TYPE_CHECKING:
from core.app.entities.llm_generation_entities import LLMGenerationDetailData
from .workflow import Workflow
@@ -1165,6 +1167,17 @@ class Message(Base):
.all()
)
@property
def generation_detail(self) -> dict[str, Any] | None:
"""
Get LLM generation detail for this message.
Returns the detail as a dictionary or None if not found.
"""
detail = db.session.query(LLMGenerationDetail).filter_by(message_id=self.id).first()
if detail:
return detail.to_dict()
return None
@property
def retriever_resources(self) -> Any:
return self.message_metadata_dict.get("retriever_resources") if self.message_metadata else []
@@ -2007,3 +2020,73 @@ class TraceAppConfig(TypeBase):
"created_at": str(self.created_at) if self.created_at else None,
"updated_at": str(self.updated_at) if self.updated_at else None,
}
class LLMGenerationDetail(Base):
"""
Store LLM generation details including reasoning process and tool calls.
Association (choose one):
- For apps with Message: use message_id (one-to-one)
- For Workflow: use workflow_run_id + node_id (one run may have multiple LLM nodes)
"""
__tablename__ = "llm_generation_details"
__table_args__ = (
sa.PrimaryKeyConstraint("id", name="llm_generation_detail_pkey"),
sa.Index("idx_llm_generation_detail_message", "message_id"),
sa.Index("idx_llm_generation_detail_workflow", "workflow_run_id", "node_id"),
)
id: Mapped[str] = mapped_column(StringUUID, default=lambda: str(uuid4()))
tenant_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
app_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
# Association fields (choose one)
message_id: Mapped[str | None] = mapped_column(StringUUID, nullable=True, unique=True)
workflow_run_id: Mapped[str | None] = mapped_column(StringUUID, nullable=True)
node_id: Mapped[str | None] = mapped_column(String(255), nullable=True)
# Core data as JSON strings
reasoning_content: Mapped[str | None] = mapped_column(LongText)
tool_calls: Mapped[str | None] = mapped_column(LongText)
sequence: Mapped[str | None] = mapped_column(LongText)
created_at: Mapped[datetime] = mapped_column(sa.DateTime, nullable=False, server_default=func.current_timestamp())
def to_domain_model(self) -> "LLMGenerationDetailData":
"""Convert to Pydantic domain model with proper validation."""
from core.app.entities.llm_generation_entities import LLMGenerationDetailData
return LLMGenerationDetailData(
reasoning_content=json.loads(self.reasoning_content) if self.reasoning_content else [],
tool_calls=json.loads(self.tool_calls) if self.tool_calls else [],
sequence=json.loads(self.sequence) if self.sequence else [],
)
def to_dict(self) -> dict[str, Any]:
"""Convert to dictionary for API response."""
return self.to_domain_model().to_response_dict()
@classmethod
def from_domain_model(
cls,
data: "LLMGenerationDetailData",
*,
tenant_id: str,
app_id: str,
message_id: str | None = None,
workflow_run_id: str | None = None,
node_id: str | None = None,
) -> "LLMGenerationDetail":
"""Create from Pydantic domain model."""
return cls(
tenant_id=tenant_id,
app_id=app_id,
message_id=message_id,
workflow_run_id=workflow_run_id,
node_id=node_id,
reasoning_content=json.dumps(data.reasoning_content) if data.reasoning_content else None,
tool_calls=json.dumps([tc.model_dump() for tc in data.tool_calls]) if data.tool_calls else None,
sequence=json.dumps([seg.model_dump() for seg in data.sequence]) if data.sequence else None,
)

View File

@@ -0,0 +1,119 @@
"""
LLM Generation Detail Service.
Provides methods to query and attach generation details to workflow node executions
and messages, avoiding N+1 query problems.
"""
from collections.abc import Sequence
from sqlalchemy import select
from sqlalchemy.orm import Session
from core.app.entities.llm_generation_entities import LLMGenerationDetailData
from models import LLMGenerationDetail, WorkflowNodeExecutionModel
class LLMGenerationService:
"""Service for handling LLM generation details."""
def __init__(self, session: Session):
self._session = session
def get_generation_details_for_workflow_run(
self,
workflow_run_id: str,
) -> dict[str, LLMGenerationDetailData]:
"""
Batch query generation details for all LLM nodes in a workflow run.
Returns dict mapping node_id to LLMGenerationDetailData.
"""
stmt = select(LLMGenerationDetail).where(LLMGenerationDetail.workflow_run_id == workflow_run_id)
details = self._session.scalars(stmt).all()
return {detail.node_id: detail.to_domain_model() for detail in details if detail.node_id}
def get_generation_detail_for_message(self, message_id: str) -> LLMGenerationDetailData | None:
"""Query generation detail for a specific message."""
stmt = select(LLMGenerationDetail).where(LLMGenerationDetail.message_id == message_id)
detail = self._session.scalars(stmt).first()
return detail.to_domain_model() if detail else None
def get_generation_details_for_messages(
self,
message_ids: list[str],
) -> dict[str, LLMGenerationDetailData]:
"""Batch query generation details for multiple messages."""
if not message_ids:
return {}
stmt = select(LLMGenerationDetail).where(LLMGenerationDetail.message_id.in_(message_ids))
details = self._session.scalars(stmt).all()
return {detail.message_id: detail.to_domain_model() for detail in details if detail.message_id}
def attach_generation_details_to_node_executions(
self,
node_executions: Sequence[WorkflowNodeExecutionModel],
workflow_run_id: str,
) -> list[dict]:
"""
Attach generation details to node executions and return as dicts.
Queries generation details in batch and attaches them to the corresponding
node executions, avoiding N+1 queries.
"""
generation_details = self.get_generation_details_for_workflow_run(workflow_run_id)
return [
{
"id": node.id,
"index": node.index,
"predecessor_node_id": node.predecessor_node_id,
"node_id": node.node_id,
"node_type": node.node_type,
"title": node.title,
"inputs": node.inputs_dict,
"process_data": node.process_data_dict,
"outputs": node.outputs_dict,
"status": node.status,
"error": node.error,
"elapsed_time": node.elapsed_time,
"execution_metadata": node.execution_metadata_dict,
"extras": node.extras,
"created_at": int(node.created_at.timestamp()) if node.created_at else None,
"created_by_role": node.created_by_role,
"created_by_account": _serialize_account(node.created_by_account),
"created_by_end_user": _serialize_end_user(node.created_by_end_user),
"finished_at": int(node.finished_at.timestamp()) if node.finished_at else None,
"inputs_truncated": node.inputs_truncated,
"outputs_truncated": node.outputs_truncated,
"process_data_truncated": node.process_data_truncated,
"generation_detail": generation_details[node.node_id].to_response_dict()
if node.node_id in generation_details
else None,
}
for node in node_executions
]
def _serialize_account(account) -> dict | None:
"""Serialize Account to dict for API response."""
if not account:
return None
return {
"id": account.id,
"name": account.name,
"email": account.email,
}
def _serialize_end_user(end_user) -> dict | None:
"""Serialize EndUser to dict for API response."""
if not end_user:
return None
return {
"id": end_user.id,
"type": end_user.type,
"is_anonymous": end_user.is_anonymous,
"session_id": end_user.session_id,
}

View File

@@ -1,8 +1,9 @@
import threading
from collections.abc import Sequence
from typing import Any
from sqlalchemy import Engine
-from sqlalchemy.orm import sessionmaker
+from sqlalchemy.orm import Session, sessionmaker
import contexts
from extensions.ext_database import db
@@ -17,6 +18,7 @@ from models import (
)
from repositories.api_workflow_run_repository import APIWorkflowRunRepository
from repositories.factory import DifyAPIRepositoryFactory
from services.llm_generation_service import LLMGenerationService
class WorkflowRunService:
@@ -137,9 +139,9 @@ class WorkflowRunService:
app_model: App,
run_id: str,
user: Account | EndUser,
-) -> Sequence[WorkflowNodeExecutionModel]:
+) -> list[dict[str, Any]]:
"""
-Get workflow run node execution list
+Get workflow run node execution list with generation details attached.
"""
workflow_run = self.get_workflow_run(app_model, run_id)
@@ -154,8 +156,16 @@ class WorkflowRunService:
if tenant_id is None:
raise ValueError("User tenant_id cannot be None")
-return self._node_execution_service_repo.get_executions_by_workflow_run(
+node_executions = self._node_execution_service_repo.get_executions_by_workflow_run(
tenant_id=tenant_id,
app_id=app_model.id,
workflow_run_id=run_id,
)
# Attach generation details using batch query
with Session(db.engine) as session:
generation_service = LLMGenerationService(session)
return generation_service.attach_generation_details_to_node_executions(
node_executions=node_executions,
workflow_run_id=run_id,
)

View File

@@ -131,6 +131,10 @@ export const LLM_OUTPUT_STRUCT: Var[] = [
variable: 'usage',
type: VarType.object,
},
{
variable: 'generation',
type: VarType.object,
},
]
export const KNOWLEDGE_RETRIEVAL_OUTPUT_STRUCT: Var[] = [

View File

@@ -324,6 +324,11 @@ const Panel: FC<NodePanelProps<LLMNodeType>> = ({
type='object'
description={t(`${i18nPrefix}.outputVars.usage`)}
/>
<VarItem
name='generation'
type='object'
description={t(`${i18nPrefix}.outputVars.generation`)}
/>
{inputs.structured_output_enabled && (
<>
<Split className='mt-3' />

View File

@@ -522,6 +522,7 @@ const translation = {
output: 'Generate content',
reasoning_content: 'Reasoning Content',
usage: 'Model Usage Information',
generation: 'Generation details including reasoning, tool calls and their sequence',
},
singleRun: {
variable: 'Variable',

View File

@@ -522,6 +522,7 @@ const translation = {
output: '生成内容',
reasoning_content: '推理内容',
usage: '模型用量信息',
generation: '生成详情,包含推理内容、工具调用及其顺序',
},
singleRun: {
variable: '变量',