feat(sdk): enhance Python SDK with 27 new Service API endpoints (#26401)

Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
This commit is contained in:
lyzno1
2025-09-29 19:22:58 +08:00
committed by GitHub
parent b2bcb6d21a
commit f60aa36fa0
3 changed files with 640 additions and 1 deletions

View File

@@ -4,6 +4,7 @@ from dify_client.client import (
DifyClient,
KnowledgeBaseClient,
WorkflowClient,
WorkspaceClient,
)
__all__ = [
@@ -12,4 +13,5 @@ __all__ = [
"DifyClient",
"KnowledgeBaseClient",
"WorkflowClient",
"WorkspaceClient",
]

View File

@@ -1,5 +1,6 @@
import json
from typing import IO, Literal
from typing import Literal, Union, Dict, List, Any, Optional, IO
import requests
@@ -49,6 +50,18 @@ class DifyClient:
params = {"user": user}
return self._send_request("GET", "/meta", params=params)
def get_app_info(self):
"""Get basic application information including name, description, tags, and mode."""
return self._send_request("GET", "/info")
def get_app_site_info(self):
"""Get application site information."""
return self._send_request("GET", "/site")
def get_file_preview(self, file_id: str):
"""Get file preview by file ID."""
return self._send_request("GET", f"/files/{file_id}/preview")
class CompletionClient(DifyClient):
def create_completion_message(
@@ -144,6 +157,51 @@ class ChatClient(DifyClient):
files = {"file": audio_file}
return self._send_request_with_files("POST", "/audio-to-text", data, files)
# Annotation APIs
def annotation_reply_action(
self,
action: Literal["enable", "disable"],
score_threshold: float,
embedding_provider_name: str,
embedding_model_name: str,
):
"""Enable or disable annotation reply feature."""
# Backend API requires these fields to be non-None values
if score_threshold is None or embedding_provider_name is None or embedding_model_name is None:
raise ValueError("score_threshold, embedding_provider_name, and embedding_model_name cannot be None")
data = {
"score_threshold": score_threshold,
"embedding_provider_name": embedding_provider_name,
"embedding_model_name": embedding_model_name,
}
return self._send_request("POST", f"/apps/annotation-reply/{action}", json=data)
def get_annotation_reply_status(self, action: Literal["enable", "disable"], job_id: str):
"""Get the status of an annotation reply action job."""
return self._send_request("GET", f"/apps/annotation-reply/{action}/status/{job_id}")
def list_annotations(self, page: int = 1, limit: int = 20, keyword: str = ""):
"""List annotations for the application."""
params = {"page": page, "limit": limit}
if keyword:
params["keyword"] = keyword
return self._send_request("GET", "/apps/annotations", params=params)
def create_annotation(self, question: str, answer: str):
"""Create a new annotation."""
data = {"question": question, "answer": answer}
return self._send_request("POST", "/apps/annotations", json=data)
def update_annotation(self, annotation_id: str, question: str, answer: str):
"""Update an existing annotation."""
data = {"question": question, "answer": answer}
return self._send_request("PUT", f"/apps/annotations/{annotation_id}", json=data)
def delete_annotation(self, annotation_id: str):
"""Delete an annotation."""
return self._send_request("DELETE", f"/apps/annotations/{annotation_id}")
class WorkflowClient(DifyClient):
def run(self, inputs: dict, response_mode: Literal["blocking", "streaming"] = "streaming", user: str = "abc-123"):
@@ -157,6 +215,55 @@ class WorkflowClient(DifyClient):
def get_result(self, workflow_run_id):
return self._send_request("GET", f"/workflows/run/{workflow_run_id}")
def get_workflow_logs(
self,
keyword: str = None,
status: Literal["succeeded", "failed", "stopped"] | None = None,
page: int = 1,
limit: int = 20,
created_at__before: str = None,
created_at__after: str = None,
created_by_end_user_session_id: str = None,
created_by_account: str = None,
):
"""Get workflow execution logs with optional filtering."""
params = {"page": page, "limit": limit}
if keyword:
params["keyword"] = keyword
if status:
params["status"] = status
if created_at__before:
params["created_at__before"] = created_at__before
if created_at__after:
params["created_at__after"] = created_at__after
if created_by_end_user_session_id:
params["created_by_end_user_session_id"] = created_by_end_user_session_id
if created_by_account:
params["created_by_account"] = created_by_account
return self._send_request("GET", "/workflows/logs", params=params)
def run_specific_workflow(
self,
workflow_id: str,
inputs: dict,
response_mode: Literal["blocking", "streaming"] = "streaming",
user: str = "abc-123",
):
"""Run a specific workflow by workflow ID."""
data = {"inputs": inputs, "response_mode": response_mode, "user": user}
return self._send_request(
"POST", f"/workflows/{workflow_id}/run", data, stream=True if response_mode == "streaming" else False
)
class WorkspaceClient(DifyClient):
"""Client for workspace-related operations."""
def get_available_models(self, model_type: str):
"""Get available models by model type."""
url = f"/workspaces/current/models/model-types/{model_type}"
return self._send_request("GET", url)
class KnowledgeBaseClient(DifyClient):
def __init__(
@@ -443,3 +550,117 @@ class KnowledgeBaseClient(DifyClient):
data = {"segment": segment_data}
url = f"/datasets/{self._get_dataset_id()}/documents/{document_id}/segments/{segment_id}"
return self._send_request("POST", url, json=data, **kwargs)
# Advanced Knowledge Base APIs
def hit_testing(
self, query: str, retrieval_model: Dict[str, Any] = None, external_retrieval_model: Dict[str, Any] = None
):
"""Perform hit testing on the dataset."""
data = {"query": query}
if retrieval_model:
data["retrieval_model"] = retrieval_model
if external_retrieval_model:
data["external_retrieval_model"] = external_retrieval_model
url = f"/datasets/{self._get_dataset_id()}/hit-testing"
return self._send_request("POST", url, json=data)
def get_dataset_metadata(self):
"""Get dataset metadata."""
url = f"/datasets/{self._get_dataset_id()}/metadata"
return self._send_request("GET", url)
def create_dataset_metadata(self, metadata_data: Dict[str, Any]):
"""Create dataset metadata."""
url = f"/datasets/{self._get_dataset_id()}/metadata"
return self._send_request("POST", url, json=metadata_data)
def update_dataset_metadata(self, metadata_id: str, metadata_data: Dict[str, Any]):
"""Update dataset metadata."""
url = f"/datasets/{self._get_dataset_id()}/metadata/{metadata_id}"
return self._send_request("PATCH", url, json=metadata_data)
def get_built_in_metadata(self):
"""Get built-in metadata."""
url = f"/datasets/{self._get_dataset_id()}/metadata/built-in"
return self._send_request("GET", url)
def manage_built_in_metadata(self, action: str, metadata_data: Dict[str, Any] = None):
"""Manage built-in metadata with specified action."""
data = metadata_data or {}
url = f"/datasets/{self._get_dataset_id()}/metadata/built-in/{action}"
return self._send_request("POST", url, json=data)
def update_documents_metadata(self, operation_data: List[Dict[str, Any]]):
"""Update metadata for multiple documents."""
url = f"/datasets/{self._get_dataset_id()}/documents/metadata"
data = {"operation_data": operation_data}
return self._send_request("POST", url, json=data)
# Dataset Tags APIs
def list_dataset_tags(self):
"""List all dataset tags."""
return self._send_request("GET", "/datasets/tags")
def bind_dataset_tags(self, tag_ids: List[str]):
"""Bind tags to dataset."""
data = {"tag_ids": tag_ids, "target_id": self._get_dataset_id()}
return self._send_request("POST", "/datasets/tags/binding", json=data)
def unbind_dataset_tag(self, tag_id: str):
"""Unbind a single tag from dataset."""
data = {"tag_id": tag_id, "target_id": self._get_dataset_id()}
return self._send_request("POST", "/datasets/tags/unbinding", json=data)
def get_dataset_tags(self):
"""Get tags for current dataset."""
url = f"/datasets/{self._get_dataset_id()}/tags"
return self._send_request("GET", url)
# RAG Pipeline APIs
def get_datasource_plugins(self, is_published: bool = True):
"""Get datasource plugins for RAG pipeline."""
params = {"is_published": is_published}
url = f"/datasets/{self._get_dataset_id()}/pipeline/datasource-plugins"
return self._send_request("GET", url, params=params)
def run_datasource_node(
self,
node_id: str,
inputs: Dict[str, Any],
datasource_type: str,
is_published: bool = True,
credential_id: str = None,
):
"""Run a datasource node in RAG pipeline."""
data = {"inputs": inputs, "datasource_type": datasource_type, "is_published": is_published}
if credential_id:
data["credential_id"] = credential_id
url = f"/datasets/{self._get_dataset_id()}/pipeline/datasource/nodes/{node_id}/run"
return self._send_request("POST", url, json=data, stream=True)
def run_rag_pipeline(
self,
inputs: Dict[str, Any],
datasource_type: str,
datasource_info_list: List[Dict[str, Any]],
start_node_id: str,
is_published: bool = True,
response_mode: Literal["streaming", "blocking"] = "blocking",
):
"""Run RAG pipeline."""
data = {
"inputs": inputs,
"datasource_type": datasource_type,
"datasource_info_list": datasource_info_list,
"start_node_id": start_node_id,
"is_published": is_published,
"response_mode": response_mode,
}
url = f"/datasets/{self._get_dataset_id()}/pipeline/run"
return self._send_request("POST", url, json=data, stream=response_mode == "streaming")
def upload_pipeline_file(self, file_path: str):
"""Upload file for RAG pipeline."""
with open(file_path, "rb") as f:
files = {"file": f}
return self._send_request_with_files("POST", "/datasets/pipeline/file-upload", {}, files)

View File

@@ -0,0 +1,416 @@
#!/usr/bin/env python3
"""
Test suite for the new Service API functionality in the Python SDK.
This test validates the implementation of the missing Service API endpoints
that were added to the Python SDK to achieve complete coverage.
"""
import unittest
from unittest.mock import Mock, patch, MagicMock
import json
from dify_client import (
DifyClient,
ChatClient,
WorkflowClient,
KnowledgeBaseClient,
WorkspaceClient,
)
class TestNewServiceAPIs(unittest.TestCase):
"""Test cases for new Service API implementations."""
def setUp(self):
"""Set up test fixtures."""
self.api_key = "test-api-key"
self.base_url = "https://api.dify.ai/v1"
@patch("dify_client.client.requests.request")
def test_app_info_apis(self, mock_request):
"""Test application info APIs."""
mock_response = Mock()
mock_response.json.return_value = {
"name": "Test App",
"description": "Test Description",
"tags": ["test", "api"],
"mode": "chat",
"author_name": "Test Author",
}
mock_request.return_value = mock_response
client = DifyClient(self.api_key, self.base_url)
# Test get_app_info
result = client.get_app_info()
mock_request.assert_called_with(
"GET",
f"{self.base_url}/info",
json=None,
params=None,
headers={
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json",
},
stream=False,
)
# Test get_app_site_info
client.get_app_site_info()
mock_request.assert_called_with(
"GET",
f"{self.base_url}/site",
json=None,
params=None,
headers={
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json",
},
stream=False,
)
# Test get_file_preview
file_id = "test-file-id"
client.get_file_preview(file_id)
mock_request.assert_called_with(
"GET",
f"{self.base_url}/files/{file_id}/preview",
json=None,
params=None,
headers={
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json",
},
stream=False,
)
@patch("dify_client.client.requests.request")
def test_annotation_apis(self, mock_request):
"""Test annotation APIs."""
mock_response = Mock()
mock_response.json.return_value = {"result": "success"}
mock_request.return_value = mock_response
client = ChatClient(self.api_key, self.base_url)
# Test annotation_reply_action - enable
client.annotation_reply_action(
action="enable",
score_threshold=0.8,
embedding_provider_name="openai",
embedding_model_name="text-embedding-ada-002",
)
mock_request.assert_called_with(
"POST",
f"{self.base_url}/apps/annotation-reply/enable",
json={
"score_threshold": 0.8,
"embedding_provider_name": "openai",
"embedding_model_name": "text-embedding-ada-002",
},
params=None,
headers={
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json",
},
stream=False,
)
# Test annotation_reply_action - disable (now requires same fields as enable)
client.annotation_reply_action(
action="disable",
score_threshold=0.5,
embedding_provider_name="openai",
embedding_model_name="text-embedding-ada-002",
)
# Test annotation_reply_action with score_threshold=0 (edge case)
client.annotation_reply_action(
action="enable",
score_threshold=0.0, # This should work and not raise ValueError
embedding_provider_name="openai",
embedding_model_name="text-embedding-ada-002",
)
# Test get_annotation_reply_status
client.get_annotation_reply_status("enable", "job-123")
# Test list_annotations
client.list_annotations(page=1, limit=20, keyword="test")
# Test create_annotation
client.create_annotation("Test question?", "Test answer.")
# Test update_annotation
client.update_annotation("annotation-123", "Updated question?", "Updated answer.")
# Test delete_annotation
client.delete_annotation("annotation-123")
# Verify all calls were made (8 calls: enable + disable + enable with 0.0 + 5 other operations)
self.assertEqual(mock_request.call_count, 8)
@patch("dify_client.client.requests.request")
def test_knowledge_base_advanced_apis(self, mock_request):
"""Test advanced knowledge base APIs."""
mock_response = Mock()
mock_response.json.return_value = {"result": "success"}
mock_request.return_value = mock_response
dataset_id = "test-dataset-id"
client = KnowledgeBaseClient(self.api_key, self.base_url, dataset_id)
# Test hit_testing
client.hit_testing("test query", {"type": "vector"})
mock_request.assert_called_with(
"POST",
f"{self.base_url}/datasets/{dataset_id}/hit-testing",
json={"query": "test query", "retrieval_model": {"type": "vector"}},
params=None,
headers={
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json",
},
stream=False,
)
# Test metadata operations
client.get_dataset_metadata()
client.create_dataset_metadata({"key": "value"})
client.update_dataset_metadata("meta-123", {"key": "new_value"})
client.get_built_in_metadata()
client.manage_built_in_metadata("enable", {"type": "built_in"})
client.update_documents_metadata([{"document_id": "doc1", "metadata": {"key": "value"}}])
# Test tag operations
client.list_dataset_tags()
client.bind_dataset_tags(["tag1", "tag2"])
client.unbind_dataset_tag("tag1")
client.get_dataset_tags()
# Verify multiple calls were made
self.assertGreater(mock_request.call_count, 5)
@patch("dify_client.client.requests.request")
def test_rag_pipeline_apis(self, mock_request):
"""Test RAG pipeline APIs."""
mock_response = Mock()
mock_response.json.return_value = {"result": "success"}
mock_request.return_value = mock_response
dataset_id = "test-dataset-id"
client = KnowledgeBaseClient(self.api_key, self.base_url, dataset_id)
# Test get_datasource_plugins
client.get_datasource_plugins(is_published=True)
mock_request.assert_called_with(
"GET",
f"{self.base_url}/datasets/{dataset_id}/pipeline/datasource-plugins",
json=None,
params={"is_published": True},
headers={
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json",
},
stream=False,
)
# Test run_datasource_node
client.run_datasource_node(
node_id="node-123",
inputs={"param": "value"},
datasource_type="online_document",
is_published=True,
credential_id="cred-123",
)
# Test run_rag_pipeline with blocking mode
client.run_rag_pipeline(
inputs={"query": "test"},
datasource_type="online_document",
datasource_info_list=[{"id": "ds1"}],
start_node_id="start-node",
is_published=True,
response_mode="blocking",
)
# Test run_rag_pipeline with streaming mode
client.run_rag_pipeline(
inputs={"query": "test"},
datasource_type="online_document",
datasource_info_list=[{"id": "ds1"}],
start_node_id="start-node",
is_published=True,
response_mode="streaming",
)
self.assertEqual(mock_request.call_count, 4)
@patch("dify_client.client.requests.request")
def test_workspace_apis(self, mock_request):
"""Test workspace APIs."""
mock_response = Mock()
mock_response.json.return_value = {
"data": [{"name": "gpt-3.5-turbo", "type": "llm"}, {"name": "gpt-4", "type": "llm"}]
}
mock_request.return_value = mock_response
client = WorkspaceClient(self.api_key, self.base_url)
# Test get_available_models
result = client.get_available_models("llm")
mock_request.assert_called_with(
"GET",
f"{self.base_url}/workspaces/current/models/model-types/llm",
json=None,
params=None,
headers={
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json",
},
stream=False,
)
@patch("dify_client.client.requests.request")
def test_workflow_advanced_apis(self, mock_request):
"""Test advanced workflow APIs."""
mock_response = Mock()
mock_response.json.return_value = {"result": "success"}
mock_request.return_value = mock_response
client = WorkflowClient(self.api_key, self.base_url)
# Test get_workflow_logs
client.get_workflow_logs(keyword="test", status="succeeded", page=1, limit=20)
mock_request.assert_called_with(
"GET",
f"{self.base_url}/workflows/logs",
json=None,
params={"page": 1, "limit": 20, "keyword": "test", "status": "succeeded"},
headers={
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json",
},
stream=False,
)
# Test get_workflow_logs with additional filters
client.get_workflow_logs(
keyword="test",
status="succeeded",
page=1,
limit=20,
created_at__before="2024-01-01",
created_at__after="2023-01-01",
created_by_account="user123",
)
# Test run_specific_workflow
client.run_specific_workflow(
workflow_id="workflow-123", inputs={"param": "value"}, response_mode="streaming", user="user-123"
)
self.assertEqual(mock_request.call_count, 3)
def test_error_handling(self):
"""Test error handling for required parameters."""
client = ChatClient(self.api_key, self.base_url)
# Test annotation_reply_action with missing required parameters would be a TypeError now
# since parameters are required in method signature
with self.assertRaises(TypeError):
client.annotation_reply_action("enable")
# Test annotation_reply_action with explicit None values should raise ValueError
with self.assertRaises(ValueError) as context:
client.annotation_reply_action("enable", None, "provider", "model")
self.assertIn("cannot be None", str(context.exception))
# Test KnowledgeBaseClient without dataset_id
kb_client = KnowledgeBaseClient(self.api_key, self.base_url)
with self.assertRaises(ValueError) as context:
kb_client.hit_testing("test query")
self.assertIn("dataset_id is not set", str(context.exception))
@patch("dify_client.client.open")
@patch("dify_client.client.requests.request")
def test_file_upload_apis(self, mock_request, mock_open):
"""Test file upload APIs."""
mock_response = Mock()
mock_response.json.return_value = {"result": "success"}
mock_request.return_value = mock_response
mock_file = MagicMock()
mock_open.return_value.__enter__.return_value = mock_file
dataset_id = "test-dataset-id"
client = KnowledgeBaseClient(self.api_key, self.base_url, dataset_id)
# Test upload_pipeline_file
client.upload_pipeline_file("/path/to/test.pdf")
mock_open.assert_called_with("/path/to/test.pdf", "rb")
mock_request.assert_called_once()
def test_comprehensive_coverage(self):
"""Test that all previously missing APIs are now implemented."""
# Test DifyClient methods
dify_methods = ["get_app_info", "get_app_site_info", "get_file_preview"]
client = DifyClient(self.api_key)
for method in dify_methods:
self.assertTrue(hasattr(client, method), f"DifyClient missing method: {method}")
# Test ChatClient annotation methods
chat_methods = [
"annotation_reply_action",
"get_annotation_reply_status",
"list_annotations",
"create_annotation",
"update_annotation",
"delete_annotation",
]
chat_client = ChatClient(self.api_key)
for method in chat_methods:
self.assertTrue(hasattr(chat_client, method), f"ChatClient missing method: {method}")
# Test WorkflowClient advanced methods
workflow_methods = ["get_workflow_logs", "run_specific_workflow"]
workflow_client = WorkflowClient(self.api_key)
for method in workflow_methods:
self.assertTrue(hasattr(workflow_client, method), f"WorkflowClient missing method: {method}")
# Test KnowledgeBaseClient advanced methods
kb_methods = [
"hit_testing",
"get_dataset_metadata",
"create_dataset_metadata",
"update_dataset_metadata",
"get_built_in_metadata",
"manage_built_in_metadata",
"update_documents_metadata",
"list_dataset_tags",
"bind_dataset_tags",
"unbind_dataset_tag",
"get_dataset_tags",
"get_datasource_plugins",
"run_datasource_node",
"run_rag_pipeline",
"upload_pipeline_file",
]
kb_client = KnowledgeBaseClient(self.api_key)
for method in kb_methods:
self.assertTrue(hasattr(kb_client, method), f"KnowledgeBaseClient missing method: {method}")
# Test WorkspaceClient methods
workspace_methods = ["get_available_models"]
workspace_client = WorkspaceClient(self.api_key)
for method in workspace_methods:
self.assertTrue(hasattr(workspace_client, method), f"WorkspaceClient missing method: {method}")
if __name__ == "__main__":
unittest.main()