Mirror of https://gitee.com/dify_ai/dify.git (synced 2025-12-06 19:42:42 +08:00)
api/tests/test_containers_integration_tests/trigger/conftest.py (new file, 173 lines)
@@ -0,0 +1,173 @@
"""
Fixtures for trigger integration tests.

This module provides fixtures for creating test data (tenant, account, app)
and mock objects used across trigger-related tests.
"""

from __future__ import annotations

from collections.abc import Generator
from typing import Any

import pytest
from sqlalchemy.orm import Session

from models.account import Account, Tenant, TenantAccountJoin, TenantAccountRole
from models.model import App


@pytest.fixture
def tenant_and_account(db_session_with_containers: Session) -> Generator[tuple[Tenant, Account], None, None]:
    """
    Create a tenant and account for testing.

    This fixture creates a tenant, account, and their association,
    then cleans up after the test completes.

    Yields:
        tuple[Tenant, Account]: The created tenant and account
    """
    tenant = Tenant(name="trigger-e2e")
    account = Account(name="tester", email="tester@example.com", interface_language="en-US")
    db_session_with_containers.add_all([tenant, account])
    db_session_with_containers.commit()

    join = TenantAccountJoin(tenant_id=tenant.id, account_id=account.id, role=TenantAccountRole.OWNER.value)
    db_session_with_containers.add(join)
    db_session_with_containers.commit()

    yield tenant, account

    # Cleanup
    db_session_with_containers.query(TenantAccountJoin).filter_by(tenant_id=tenant.id).delete()
    db_session_with_containers.query(Account).filter_by(id=account.id).delete()
    db_session_with_containers.query(Tenant).filter_by(id=tenant.id).delete()
    db_session_with_containers.commit()


@pytest.fixture
def app_model(
    db_session_with_containers: Session, tenant_and_account: tuple[Tenant, Account]
) -> Generator[App, None, None]:
    """
    Create an app for testing.

    This fixture creates a workflow app associated with the tenant and account,
    then cleans up after the test completes.

    Yields:
        App: The created app
    """
    tenant, account = tenant_and_account
    app = App(
        tenant_id=tenant.id,
        name="trigger-app",
        description="trigger e2e",
        mode="workflow",
        icon_type="emoji",
        icon="robot",
        icon_background="#FFEAD5",
        enable_site=True,
        enable_api=True,
        api_rpm=100,
        api_rph=1000,
        is_demo=False,
        is_public=False,
        is_universal=False,
        created_by=account.id,
    )
    db_session_with_containers.add(app)
    db_session_with_containers.commit()

    yield app

    # Cleanup - delete related records first
    from models.trigger import AppTrigger, WorkflowSchedulePlan, WorkflowTriggerLog, WorkflowWebhookTrigger
    from models.workflow import Workflow

    db_session_with_containers.query(WorkflowTriggerLog).filter_by(app_id=app.id).delete()
    db_session_with_containers.query(WorkflowSchedulePlan).filter_by(app_id=app.id).delete()
    db_session_with_containers.query(WorkflowWebhookTrigger).filter_by(app_id=app.id).delete()
    db_session_with_containers.query(AppTrigger).filter_by(app_id=app.id).delete()
    db_session_with_containers.query(Workflow).filter_by(app_id=app.id).delete()
    db_session_with_containers.query(App).filter_by(id=app.id).delete()
    db_session_with_containers.commit()


class MockCeleryGroup:
    """Mock for celery group() function that collects dispatched tasks."""

    def __init__(self) -> None:
        self.collected: list[dict[str, Any]] = []
        self._applied = False

    def __call__(self, items: Any) -> MockCeleryGroup:
        self.collected = list(items)
        return self

    def apply_async(self) -> None:
        self._applied = True

    @property
    def applied(self) -> bool:
        return self._applied


class MockCelerySignature:
    """Mock for celery task signature that returns task info dict."""

    def s(self, schedule_id: str) -> dict[str, str]:
        return {"schedule_id": schedule_id}


@pytest.fixture
def mock_celery_group() -> MockCeleryGroup:
    """
    Provide a mock celery group for testing task dispatch.

    Returns:
        MockCeleryGroup: Mock group that collects dispatched tasks
    """
    return MockCeleryGroup()


@pytest.fixture
def mock_celery_signature() -> MockCelerySignature:
    """
    Provide a mock celery signature for testing task dispatch.

    Returns:
        MockCelerySignature: Mock signature generator
    """
    return MockCelerySignature()


class MockPluginSubscription:
    """Mock plugin subscription for testing plugin triggers."""

    def __init__(
        self,
        subscription_id: str = "sub-1",
        tenant_id: str = "tenant-1",
        provider_id: str = "provider-1",
    ) -> None:
        self.id = subscription_id
        self.tenant_id = tenant_id
        self.provider_id = provider_id
        self.credentials: dict[str, str] = {"token": "secret"}
        self.credential_type = "api-key"

    def to_entity(self) -> MockPluginSubscription:
        return self


@pytest.fixture
def mock_plugin_subscription() -> MockPluginSubscription:
    """
    Provide a mock plugin subscription for testing.

    Returns:
        MockPluginSubscription: Mock subscription instance
    """
    return MockPluginSubscription()
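
A minimal sketch of how a test can consume these fixtures and mocks (illustrative only: it assumes the suite's container-backed session fixtures and mirrors the monkeypatch pattern used in the accompanying test module, whose diff follows):

    from schedule import workflow_schedule_task
    from schedule.workflow_schedule_task import poll_workflow_schedules

    def test_dispatch_example(app_model, mock_celery_group, mock_celery_signature, monkeypatch):
        # Swap celery's group() and the task signature for the mocks, so
        # polling collects signature dicts instead of enqueueing real tasks
        monkeypatch.setattr(workflow_schedule_task, "group", mock_celery_group)
        monkeypatch.setattr(workflow_schedule_task, "run_schedule_trigger", mock_celery_signature)

        poll_workflow_schedules()

        # Each collected item is the dict built by MockCelerySignature.s()
        assert all("schedule_id" in sig for sig in mock_celery_group.collected)
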
@@ -5,9 +5,11 @@ import json
 import time
 from datetime import timedelta
 from types import SimpleNamespace
+from typing import Any

 import pytest
-from flask import Response
+from flask import Flask, Response
+from flask.testing import FlaskClient
 from sqlalchemy.orm import Session

 from configs import dify_config
@@ -17,54 +19,29 @@ from core.trigger.debug.event_selectors import PluginTriggerDebugEventPoller, We
 from core.trigger.debug.events import PluginTriggerDebugEvent
 from core.workflow.enums import NodeType
 from libs.datetime_utils import naive_utc_now
-from models.account import Account, Tenant, TenantAccountJoin, TenantAccountRole
+from models.account import Account, Tenant
 from models.enums import AppTriggerStatus, AppTriggerType, CreatorUserRole, WorkflowTriggerStatus
 from models.model import App
 from models.trigger import AppTrigger, WorkflowSchedulePlan, WorkflowTriggerLog, WorkflowWebhookTrigger
 from models.workflow import Workflow
 from schedule import workflow_schedule_task
 from schedule.workflow_schedule_task import poll_workflow_schedules
+from services import feature_service as feature_service_module
+from services.trigger import webhook_service
 from services.workflow_service import WorkflowService
 from tasks import trigger_processing_tasks

+from .conftest import MockCeleryGroup, MockCelerySignature, MockPluginSubscription

-def _create_tenant_and_account(session: Session) -> tuple[Tenant, Account]:
-    tenant = Tenant(name="trigger-e2e")
-    account = Account(name="tester", email="tester@example.com", interface_language="en-US")
-    session.add_all([tenant, account])
-    session.commit()
-
-    join = TenantAccountJoin(tenant_id=tenant.id, account_id=account.id, role=TenantAccountRole.OWNER.value)
-    session.add(join)
-    session.commit()
-    return tenant, account
-
-
-def _create_app(session: Session, tenant: Tenant, account: Account) -> App:
-    app = App(
-        tenant_id=tenant.id,
-        name="trigger-app",
-        description="trigger e2e",
-        mode="workflow",
-        icon_type="emoji",
-        icon="🤖",
-        icon_background="#FFEAD5",
-        enable_site=True,
-        enable_api=True,
-        api_rpm=100,
-        api_rph=1000,
-        is_demo=False,
-        is_public=False,
-        is_universal=False,
-        created_by=account.id,
-    )
-    session.add(app)
-    session.commit()
-    return app
+# Test constants
+WEBHOOK_ID_PRODUCTION = "wh1234567890123456789012"
+WEBHOOK_ID_DEBUG = "whdebug1234567890123456"
+TEST_TRIGGER_URL = "https://trigger.example.com/base"


 def _build_workflow_graph(root_node_id: str, trigger_type: NodeType) -> str:
-    node_data: dict[str, object] = {"type": trigger_type.value, "title": "trigger"}
+    """Build a minimal workflow graph JSON for testing."""
+    node_data: dict[str, Any] = {"type": trigger_type.value, "title": "trigger"}
     if trigger_type == NodeType.TRIGGER_WEBHOOK:
         node_data.update(
             {
@@ -87,11 +64,12 @@ def _build_workflow_graph(root_node_id: str, trigger_type: NodeType) -> str:

 def test_publish_blocks_start_and_trigger_coexistence(
     db_session_with_containers: Session,
-    monkeypatch: pytest.MonkeyPatch
-):
-    """发布时包含 start 与 trigger 节点应直接报错。"""
-    tenant, account = _create_tenant_and_account(db_session_with_containers)
-    app = _create_app(db_session_with_containers, tenant, account)
+    tenant_and_account: tuple[Tenant, Account],
+    app_model: App,
+    monkeypatch: pytest.MonkeyPatch,
+) -> None:
+    """Publishing should fail when both start and trigger nodes coexist."""
+    tenant, account = tenant_and_account

     graph = {
         "nodes": [
@@ -102,7 +80,7 @@ def test_publish_blocks_start_and_trigger_coexistence(
     }
     draft_workflow = Workflow.new(
         tenant_id=tenant.id,
-        app_id=app.id,
+        app_id=app_model.id,
         type="workflow",
         version=Workflow.VERSION_DRAFT,
         graph=json.dumps(graph),
@@ -116,7 +94,6 @@ def test_publish_blocks_start_and_trigger_coexistence(
     db_session_with_containers.commit()

     workflow_service = WorkflowService()
-    from services import feature_service as feature_service_module

     monkeypatch.setattr(
         feature_service_module.FeatureService,
@@ -126,41 +103,50 @@ def test_publish_blocks_start_and_trigger_coexistence(
     monkeypatch.setattr("services.workflow_service.dify_config", SimpleNamespace(BILLING_ENABLED=False))

     with pytest.raises(ValueError, match="Start node and trigger nodes cannot coexist"):
-        workflow_service.publish_workflow(session=db_session_with_containers, app_model=app, account=account)
+        workflow_service.publish_workflow(session=db_session_with_containers, app_model=app_model, account=account)


-def test_trigger_url_uses_config_base(monkeypatch: pytest.MonkeyPatch):
-    """TRIGGER_URL 配置应体现在生成的 webhook 与 plugin endpoint 上。"""
-    monkeypatch.setattr(dify_config, "TRIGGER_URL", "https://trigger.example.com/base")
-    endpoint_module = importlib.reload(importlib.import_module("core.trigger.utils.endpoint"))
+def test_trigger_url_uses_config_base(monkeypatch: pytest.MonkeyPatch) -> None:
+    """TRIGGER_URL config should be reflected in generated webhook and plugin endpoints."""
+    original_url = getattr(dify_config, "TRIGGER_URL", None)

-    assert (
-        endpoint_module.generate_webhook_trigger_endpoint("wh1234567890123456789012")
-        == "https://trigger.example.com/base/triggers/webhook/wh1234567890123456789012"
-    )
-    assert (
-        endpoint_module.generate_webhook_trigger_endpoint("wh1234567890123456789012", True)
-        == "https://trigger.example.com/base/triggers/webhook-debug/wh1234567890123456789012"
-    )
-    assert (
-        endpoint_module.generate_plugin_trigger_endpoint_url("end-1")
-        == "https://trigger.example.com/base/triggers/plugin/end-1"
-    )
-    importlib.reload(importlib.import_module("core.trigger.utils.endpoint"))
+    try:
+        monkeypatch.setattr(dify_config, "TRIGGER_URL", TEST_TRIGGER_URL)
+        endpoint_module = importlib.reload(importlib.import_module("core.trigger.utils.endpoint"))
+
+        assert (
+            endpoint_module.generate_webhook_trigger_endpoint(WEBHOOK_ID_PRODUCTION)
+            == f"{TEST_TRIGGER_URL}/triggers/webhook/{WEBHOOK_ID_PRODUCTION}"
+        )
+        assert (
+            endpoint_module.generate_webhook_trigger_endpoint(WEBHOOK_ID_PRODUCTION, True)
+            == f"{TEST_TRIGGER_URL}/triggers/webhook-debug/{WEBHOOK_ID_PRODUCTION}"
+        )
+        assert (
+            endpoint_module.generate_plugin_trigger_endpoint_url("end-1") == f"{TEST_TRIGGER_URL}/triggers/plugin/end-1"
+        )
+    finally:
+        # Restore original config and reload module
+        if original_url is not None:
+            monkeypatch.setattr(dify_config, "TRIGGER_URL", original_url)
+        importlib.reload(importlib.import_module("core.trigger.utils.endpoint"))


 def test_webhook_trigger_creates_trigger_log(
-    test_client_with_containers, db_session_with_containers: Session, monkeypatch: pytest.MonkeyPatch
-):
-    """生产 webhook 触发应落库触发日志。"""
-    tenant, account = _create_tenant_and_account(db_session_with_containers)
-    app = _create_app(db_session_with_containers, tenant, account)
+    test_client_with_containers: FlaskClient,
+    db_session_with_containers: Session,
+    tenant_and_account: tuple[Tenant, Account],
+    app_model: App,
+    monkeypatch: pytest.MonkeyPatch,
+) -> None:
+    """Production webhook trigger should create a trigger log in the database."""
+    tenant, account = tenant_and_account

     webhook_node_id = "webhook-node"
     graph_json = _build_workflow_graph(webhook_node_id, NodeType.TRIGGER_WEBHOOK)
     published_workflow = Workflow.new(
         tenant_id=tenant.id,
-        app_id=app.id,
+        app_id=app_model.id,
         type="workflow",
         version=Workflow.version_from_datetime(naive_utc_now()),
         graph=graph_json,
@@ -171,19 +157,19 @@ def test_webhook_trigger_creates_trigger_log(
         rag_pipeline_variables=[],
     )
     db_session_with_containers.add(published_workflow)
-    app.workflow_id = published_workflow.id
+    app_model.workflow_id = published_workflow.id
     db_session_with_containers.commit()

     webhook_trigger = WorkflowWebhookTrigger(
-        app_id=app.id,
+        app_id=app_model.id,
         node_id=webhook_node_id,
         tenant_id=tenant.id,
-        webhook_id="wh1234567890123456789012",
+        webhook_id=WEBHOOK_ID_PRODUCTION,
         created_by=account.id,
     )
     app_trigger = AppTrigger(
         tenant_id=tenant.id,
-        app_id=app.id,
+        app_id=app_model.id,
         node_id=webhook_node_id,
         trigger_type=AppTriggerType.TRIGGER_WEBHOOK,
         status=AppTriggerStatus.ENABLED,
@@ -193,7 +179,7 @@ def test_webhook_trigger_creates_trigger_log(
     db_session_with_containers.add_all([webhook_trigger, app_trigger])
     db_session_with_containers.commit()

-    def _fake_trigger_workflow_async(session, user, trigger_data):
+    def _fake_trigger_workflow_async(session: Session, user: Any, trigger_data: Any) -> SimpleNamespace:
         log = WorkflowTriggerLog(
             tenant_id=trigger_data.tenant_id,
             app_id=trigger_data.app_id,
@@ -216,78 +202,67 @@ def test_webhook_trigger_creates_trigger_log(
         session.commit()
         return SimpleNamespace(workflow_trigger_log_id=log.id, task_id=None, status="queued", queue="test")

-    monkeypatch.setattr("services.trigger.webhook_service.AsyncWorkflowService.trigger_workflow_async", _fake_trigger_workflow_async)
+    monkeypatch.setattr(
+        webhook_service.AsyncWorkflowService,
+        "trigger_workflow_async",
+        _fake_trigger_workflow_async,
+    )

     response = test_client_with_containers.post(f"/triggers/webhook/{webhook_trigger.webhook_id}", json={"foo": "bar"})

     assert response.status_code == 200

     db_session_with_containers.expire_all()
-    logs = db_session_with_containers.query(WorkflowTriggerLog).filter_by(app_id=app.id).all()
+    logs = db_session_with_containers.query(WorkflowTriggerLog).filter_by(app_id=app_model.id).all()
     assert logs, "Webhook trigger should create trigger log"


-def test_schedule_poll_dispatches_due_plans(
-    db_session_with_containers: Session, monkeypatch: pytest.MonkeyPatch
-):
-    """schedule 两种配置均应被轮询并派发。"""
-    tenant, account = _create_tenant_and_account(db_session_with_containers)
-    app = _create_app(db_session_with_containers, tenant, account)
+@pytest.mark.parametrize("schedule_type", ["visual", "cron"])
+def test_schedule_poll_dispatches_due_plan(
+    db_session_with_containers: Session,
+    tenant_and_account: tuple[Tenant, Account],
+    app_model: App,
+    mock_celery_group: MockCeleryGroup,
+    mock_celery_signature: MockCelerySignature,
+    monkeypatch: pytest.MonkeyPatch,
+    schedule_type: str,
+) -> None:
+    """Schedule plans (both visual and cron) should be polled and dispatched when due."""
+    tenant, _ = tenant_and_account

-    for idx in ("visual", "cron"):
-        app_trigger = AppTrigger(
-            tenant_id=tenant.id,
-            app_id=app.id,
-            node_id=f"schedule-{idx}",
-            trigger_type=AppTriggerType.TRIGGER_SCHEDULE,
-            status=AppTriggerStatus.ENABLED,
-            title=f"schedule-{idx}",
-        )
-        plan = WorkflowSchedulePlan(
-            app_id=app.id,
-            node_id=f"schedule-{idx}",
-            tenant_id=tenant.id,
-            cron_expression="* * * * *",
-            timezone="UTC",
-            next_run_at=naive_utc_now() - timedelta(minutes=1),
-        )
-        db_session_with_containers.add_all([app_trigger, plan])
+    app_trigger = AppTrigger(
+        tenant_id=tenant.id,
+        app_id=app_model.id,
+        node_id=f"schedule-{schedule_type}",
+        trigger_type=AppTriggerType.TRIGGER_SCHEDULE,
+        status=AppTriggerStatus.ENABLED,
+        title=f"schedule-{schedule_type}",
+    )
+    plan = WorkflowSchedulePlan(
+        app_id=app_model.id,
+        node_id=f"schedule-{schedule_type}",
+        tenant_id=tenant.id,
+        cron_expression="* * * * *",
+        timezone="UTC",
+        next_run_at=naive_utc_now() - timedelta(minutes=1),
+    )
+    db_session_with_containers.add_all([app_trigger, plan])
     db_session_with_containers.commit()

-    dispatched_batches: list[list[dict] | str] = []
     next_time = naive_utc_now() + timedelta(hours=1)

     monkeypatch.setattr(workflow_schedule_task, "calculate_next_run_at", lambda *_args, **_kwargs: next_time)

-    class _DummyGroup:
-        def __init__(self, sink: list[list[dict] | str]):
-            self.sink = sink
-
-        def __call__(self, items):
-            collected = list(items)
-            self.sink.append(collected)
-            return self
-
-        def apply_async(self):
-            self.sink.append("applied")
-
-    class _DummySignature:
-        def s(self, schedule_id: str) -> dict:
-            return {"schedule_id": schedule_id}
-
-    monkeypatch.setattr(workflow_schedule_task, "group", _DummyGroup(dispatched_batches))
-    monkeypatch.setattr(workflow_schedule_task, "run_schedule_trigger", _DummySignature())
+    monkeypatch.setattr(workflow_schedule_task, "group", mock_celery_group)
+    monkeypatch.setattr(workflow_schedule_task, "run_schedule_trigger", mock_celery_signature)

     poll_workflow_schedules()

-    dispatched = [item for item in dispatched_batches if isinstance(item, list)]
-    assert dispatched, "should dispatch signatures for due schedules"
-    scheduled_ids = {sig["schedule_id"] for sig in dispatched[0]}
-    assert scheduled_ids == {plan.id for plan in db_session_with_containers.query(WorkflowSchedulePlan).all()}
+    assert mock_celery_group.collected, f"Should dispatch signatures for due {schedule_type} schedules"
+    scheduled_ids = {sig["schedule_id"] for sig in mock_celery_group.collected}
+    assert plan.id in scheduled_ids


-def test_schedule_visual_debug_poll_generates_event(monkeypatch: pytest.MonkeyPatch):
-    """可视化配置的 schedule 节点应能在单步调试中产出事件。"""
+def test_schedule_visual_debug_poll_generates_event(monkeypatch: pytest.MonkeyPatch) -> None:
+    """Visual mode schedule node should generate event in single-step debug."""
     base_now = naive_utc_now()
     monkeypatch.setattr(event_selectors, "naive_utc_now", lambda: base_now)
     monkeypatch.setattr(
@@ -318,34 +293,23 @@ def test_schedule_visual_debug_poll_generates_event(monkeypatch: pytest.MonkeyPa


 def test_plugin_trigger_dispatches_and_debug_events(
-    test_client_with_containers, monkeypatch: pytest.MonkeyPatch
-):
-    """插件 trigger 端点应派发事件并产生调试事件。"""
+    test_client_with_containers: FlaskClient,
+    mock_plugin_subscription: MockPluginSubscription,
+    monkeypatch: pytest.MonkeyPatch,
+) -> None:
+    """Plugin trigger endpoint should dispatch events and generate debug events."""
     endpoint_id = "1cc7fa12-3f7b-4f6a-9c8d-1234567890ab"

-    class _FakeSubscription:
-        def __init__(self):
-            self.id = "sub-1"
-            self.tenant_id = "tenant-1"
-            self.provider_id = "provider-1"
-            self.credentials = {"token": "secret"}
-            self.credential_type = "api-key"
-
-        def to_entity(self):
-            return self
-
-    subscription = _FakeSubscription()
-    debug_events: list[dict] = []
-    dispatched_payloads: list[dict] = []
+    debug_events: list[dict[str, Any]] = []
+    dispatched_payloads: list[dict[str, Any]] = []

-    # 简化 process_endpoint,仍走实际路由但不依赖真实插件
-    def _fake_process_endpoint(_endpoint_id: str, _request):
+    def _fake_process_endpoint(_endpoint_id: str, _request: Any) -> Response:
         dispatch_data = {
             "user_id": "end-user",
-            "tenant_id": subscription.tenant_id,
+            "tenant_id": mock_plugin_subscription.tenant_id,
             "endpoint_id": _endpoint_id,
-            "provider_id": subscription.provider_id,
-            "subscription_id": subscription.id,
+            "provider_id": mock_plugin_subscription.provider_id,
+            "subscription_id": mock_plugin_subscription.id,
             "timestamp": int(time.time()),
             "events": ["created", "updated"],
             "request_id": f"req-{_endpoint_id}",
@@ -364,14 +328,14 @@ def test_plugin_trigger_dispatches_and_debug_events(
         staticmethod(lambda **kwargs: debug_events.append(kwargs) or 1),
     )

-    def _fake_delay(dispatch_data: dict):
+    def _fake_delay(dispatch_data: dict[str, Any]) -> None:
         dispatched_payloads.append(dispatch_data)
         trigger_processing_tasks.dispatch_trigger_debug_event(
             events=dispatch_data["events"],
             user_id=dispatch_data["user_id"],
             timestamp=dispatch_data["timestamp"],
             request_id=dispatch_data["request_id"],
-            subscription=subscription,
+            subscription=mock_plugin_subscription,
         )

     monkeypatch.setattr(
@@ -383,23 +347,26 @@ def test_plugin_trigger_dispatches_and_debug_events(
     response = test_client_with_containers.post(f"/triggers/plugin/{endpoint_id}", json={"hello": "world"})

     assert response.status_code == 202
-    assert dispatched_payloads, "plugin trigger should enqueue workflow dispatch payload"
-    assert debug_events, "plugin trigger should dispatch debug events"
-    dispatched_events = {event["event"].name for event in debug_events}
-    assert dispatched_events == {"created", "updated"}
+    assert dispatched_payloads, "Plugin trigger should enqueue workflow dispatch payload"
+    assert debug_events, "Plugin trigger should dispatch debug events"
+    dispatched_event_names = {event["event"].name for event in debug_events}
+    assert dispatched_event_names == {"created", "updated"}


 def test_webhook_debug_dispatches_event(
-    test_client_with_containers, db_session_with_containers: Session, monkeypatch: pytest.MonkeyPatch
-):
-    """webhook 单步调试应派发调试事件,并可被轮询拿到。"""
-    tenant, account = _create_tenant_and_account(db_session_with_containers)
-    app = _create_app(db_session_with_containers, tenant, account)
+    test_client_with_containers: FlaskClient,
+    db_session_with_containers: Session,
+    tenant_and_account: tuple[Tenant, Account],
+    app_model: App,
+    monkeypatch: pytest.MonkeyPatch,
+) -> None:
+    """Webhook single-step debug should dispatch debug event and be pollable."""
+    tenant, account = tenant_and_account
     webhook_node_id = "webhook-debug-node"
     graph_json = _build_workflow_graph(webhook_node_id, NodeType.TRIGGER_WEBHOOK)
     draft_workflow = Workflow.new(
         tenant_id=tenant.id,
-        app_id=app.id,
+        app_id=app_model.id,
         type="workflow",
         version=Workflow.VERSION_DRAFT,
         graph=graph_json,
@@ -413,27 +380,27 @@ def test_webhook_debug_dispatches_event(
     db_session_with_containers.commit()

     webhook_trigger = WorkflowWebhookTrigger(
-        app_id=app.id,
+        app_id=app_model.id,
         node_id=webhook_node_id,
         tenant_id=tenant.id,
-        webhook_id="whdebug1234567890123456",
+        webhook_id=WEBHOOK_ID_DEBUG,
         created_by=account.id,
     )
     db_session_with_containers.add(webhook_trigger)
     db_session_with_containers.commit()

-    debug_events: list[dict] = []
+    debug_events: list[dict[str, Any]] = []
     original_dispatch = TriggerDebugEventBus.dispatch
     monkeypatch.setattr(
         "controllers.trigger.webhook.TriggerDebugEventBus.dispatch",
         lambda **kwargs: (debug_events.append(kwargs), original_dispatch(**kwargs))[1],
     )

-    # listener 预先轮询一次,进入等待池
+    # Listener polls first to enter waiting pool
     poller = WebhookTriggerDebugEventPoller(
         tenant_id=tenant.id,
         user_id=account.id,
-        app_id=app.id,
+        app_id=app_model.id,
         node_config=draft_workflow.get_node_config_by_id(webhook_node_id),
         node_id=webhook_node_id,
     )
@@ -446,16 +413,19 @@ def test_webhook_debug_dispatches_event(
     )

     assert response.status_code == 200
-    assert debug_events, "debug event should be sent to event bus"
-    # 第二次轮询应拿到事件
+    assert debug_events, "Debug event should be sent to event bus"
+    # Second poll should get the event
     event = poller.poll()
     assert event is not None
     assert event.workflow_args["inputs"]["webhook_body"]["foo"] == "bar"
-    assert debug_events[0]["pool_key"].endswith(f":{app.id}:{webhook_node_id}")
+    assert debug_events[0]["pool_key"].endswith(f":{app_model.id}:{webhook_node_id}")


-def test_plugin_single_step_debug_flow(flask_app_with_containers, monkeypatch: pytest.MonkeyPatch):
-    """插件单步调试:监听 -> 派发事件 -> poller 收到并返回变量。"""
+def test_plugin_single_step_debug_flow(
+    flask_app_with_containers: Flask,
+    monkeypatch: pytest.MonkeyPatch,
+) -> None:
+    """Plugin single-step debug: listen -> dispatch event -> poller receives and returns variables."""
     tenant_id = "tenant-1"
     app_id = "app-1"
     user_id = "user-1"
@@ -474,7 +444,7 @@ def test_plugin_single_step_debug_flow(flask_app_with_containers, monkeypatch: p
             "parameters": {},
         },
     }
-    # 监听
+    # Start listening
    poller = PluginTriggerDebugEventPoller(
        tenant_id=tenant_id,
        user_id=user_id,
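
The single-step debug tests above share one pattern: a poller registers a listener, the HTTP endpoint (or the event bus directly) dispatches a debug event, and a second poll returns it. A condensed sketch of the webhook variant, using only names that appear in this diff (the app, draft workflow, and webhook trigger rows are assumed to be set up as shown earlier):

    # Register the listener (first poll enters the waiting pool)
    poller = WebhookTriggerDebugEventPoller(
        tenant_id=tenant.id,
        user_id=account.id,
        app_id=app_model.id,
        node_config=draft_workflow.get_node_config_by_id(webhook_node_id),
        node_id=webhook_node_id,
    )
    poller.poll()

    # Fire the debug endpoint; TriggerDebugEventBus.dispatch routes the event
    # into the waiting pool keyed by app and node id
    test_client_with_containers.post(f"/triggers/webhook-debug/{WEBHOOK_ID_DEBUG}", json={"foo": "bar"})

    # The next poll returns the event with the workflow inputs prefilled
    event = poller.poll()
    assert event.workflow_args["inputs"]["webhook_body"]["foo"] == "bar"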