Compare commits

...

14 Commits

Author SHA1 Message Date
Novice
51d6607832 chore: delete the unused change 2025-02-24 10:08:22 +08:00
Novice
a8091ee540 fix: default value got wrong source handle 2025-02-24 10:02:41 +08:00
Novice
9bda0848a2 Merge branch 'main' into fix/condition-stream-output 2025-02-24 09:36:10 +08:00
Novice
338fe7d439 Merge branch 'chore/graph-merge-conflict' 2025-02-24 09:11:54 +08:00
Novice
96a0922e56 Merge branch 'fix/condition-stream-output-error' 2025-02-24 09:11:12 +08:00
Novice
e26381ef30 Merge branch 'fix/condition-stream-output' 2025-02-24 09:11:06 +08:00
Novice
12fb4fec8d chore: Revert the code from conflict resolution. 2025-02-21 11:13:40 +08:00
Novice
90af04428d chore: Restore the parts that were overwritten during conflict resolution. 2025-02-21 11:09:04 +08:00
Novice
f0e5b52cd2 chore: Restore the parts that were overwritten during conflict resolution. 2025-02-21 11:06:41 +08:00
Novice
ccb0b02b58 fix: unit test error 2025-02-21 11:06:12 +08:00
Novice
96691eb066 Merge branch 'main' into fix/condition-stream-output-error 2025-02-20 13:55:15 +08:00
Novice
1c332f4dc9 fix: conditional node in parallel execution cause streaming output to fail. 2025-02-14 13:46:10 +08:00
Novice Lee
302d87eded fix: solve the ci problem 2025-02-10 08:57:50 +08:00
Novice Lee
c518d1b373 fix: fail-branch stream output error 2025-02-08 16:18:52 +08:00
2 changed files with 11 additions and 7 deletions

View File

@@ -730,8 +730,10 @@ class GraphEngine:
)
should_continue_retry = False
elif run_result.status == WorkflowNodeExecutionStatus.SUCCEEDED:
if node_instance.should_continue_on_error and self.graph.edge_mapping.get(
node_instance.node_id
if (
node_instance.should_continue_on_error
and self.graph.edge_mapping.get(node_instance.node_id)
and node_instance.node_data.error_strategy is ErrorStrategy.FAIL_BRANCH
):
run_result.edge_source_handle = FailBranchSourceHandle.SUCCESS
if run_result.metadata and run_result.metadata.get(NodeRunMetadataKey.TOTAL_TOKENS):

View File

@@ -82,7 +82,7 @@ class AnswerStreamProcessor(StreamProcessor):
:param event: node run succeeded event
:return:
"""
for answer_node_id, position in self.route_position.items():
for answer_node_id in self.route_position:
# all depends on answer node id not in rest node ids
if event.route_node_state.node_id != answer_node_id and (
answer_node_id not in self.rest_node_ids
@@ -155,11 +155,13 @@ class AnswerStreamProcessor(StreamProcessor):
for answer_node_id, route_position in self.route_position.items():
if answer_node_id not in self.rest_node_ids:
continue
# exclude current node id
answer_dependencies = self.generate_routes.answer_dependencies
if event.node_id in answer_dependencies[answer_node_id]:
answer_dependencies[answer_node_id].remove(event.node_id)
answer_dependencies_ids = answer_dependencies.get(answer_node_id, [])
# all depends on answer node id not in rest node ids
if all(
dep_id not in self.rest_node_ids for dep_id in self.generate_routes.answer_dependencies[answer_node_id]
):
if all(dep_id not in self.rest_node_ids for dep_id in answer_dependencies_ids):
if route_position >= len(self.generate_routes.answer_generate_route[answer_node_id]):
continue