diff --git a/api/core/model_runtime/model_providers/anthropic/llm/claude-3-5-haiku-20241022.yaml b/api/core/model_runtime/model_providers/anthropic/llm/claude-3-5-haiku-20241022.yaml index cae4c67e4a..892146f6a5 100644 --- a/api/core/model_runtime/model_providers/anthropic/llm/claude-3-5-haiku-20241022.yaml +++ b/api/core/model_runtime/model_providers/anthropic/llm/claude-3-5-haiku-20241022.yaml @@ -4,7 +4,6 @@ label: model_type: llm features: - agent-thought - - vision - tool-call - stream-tool-call model_properties: diff --git a/api/core/model_runtime/model_providers/bedrock/llm/anthropic.claude-3-5-haiku-v1.yaml b/api/core/model_runtime/model_providers/bedrock/llm/anthropic.claude-3-5-haiku-v1.yaml index 35fc8d0d11..9d693dcd48 100644 --- a/api/core/model_runtime/model_providers/bedrock/llm/anthropic.claude-3-5-haiku-v1.yaml +++ b/api/core/model_runtime/model_providers/bedrock/llm/anthropic.claude-3-5-haiku-v1.yaml @@ -4,7 +4,6 @@ label: model_type: llm features: - agent-thought - - vision - tool-call - stream-tool-call model_properties: diff --git a/api/core/model_runtime/model_providers/bedrock/llm/us.anthropic.claude-3-5-haiku-v1.yaml b/api/core/model_runtime/model_providers/bedrock/llm/us.anthropic.claude-3-5-haiku-v1.yaml index a9b66b1925..9781965555 100644 --- a/api/core/model_runtime/model_providers/bedrock/llm/us.anthropic.claude-3-5-haiku-v1.yaml +++ b/api/core/model_runtime/model_providers/bedrock/llm/us.anthropic.claude-3-5-haiku-v1.yaml @@ -4,7 +4,6 @@ label: model_type: llm features: - agent-thought - - vision - tool-call - stream-tool-call model_properties: diff --git a/api/core/model_runtime/model_providers/openrouter/llm/claude-3-5-haiku.yaml b/api/core/model_runtime/model_providers/openrouter/llm/claude-3-5-haiku.yaml index 773befbec5..de45093a72 100644 --- a/api/core/model_runtime/model_providers/openrouter/llm/claude-3-5-haiku.yaml +++ b/api/core/model_runtime/model_providers/openrouter/llm/claude-3-5-haiku.yaml @@ -4,7 +4,6 @@ label: model_type: llm features: - agent-thought - - vision - tool-call - stream-tool-call model_properties: diff --git a/api/core/ops/entities/config_entity.py b/api/core/ops/entities/config_entity.py index 5c79867571..3454ab20e6 100644 --- a/api/core/ops/entities/config_entity.py +++ b/api/core/ops/entities/config_entity.py @@ -54,3 +54,6 @@ class LangSmithConfig(BaseTracingConfig): raise ValueError("endpoint must start with https://") return v + + +OPS_FILE_PATH = "ops_trace/" diff --git a/api/core/ops/ops_trace_manager.py b/api/core/ops/ops_trace_manager.py index 764944f799..1421c6ee61 100644 --- a/api/core/ops/ops_trace_manager.py +++ b/api/core/ops/ops_trace_manager.py @@ -6,12 +6,13 @@ import threading import time from datetime import timedelta from typing import Any, Optional, Union -from uuid import UUID +from uuid import UUID, uuid4 from flask import current_app from core.helper.encrypter import decrypt_token, encrypt_token, obfuscated_token from core.ops.entities.config_entity import ( + OPS_FILE_PATH, LangfuseConfig, LangSmithConfig, TracingProviderEnum, @@ -28,8 +29,9 @@ from core.ops.entities.trace_entity import ( ) from core.ops.langfuse_trace.langfuse_trace import LangFuseDataTrace from core.ops.langsmith_trace.langsmith_trace import LangSmithDataTrace -from core.ops.utils import get_message_data +from core.ops.utils import convert_datetime_to_str, get_message_data from extensions.ext_database import db +from extensions.ext_storage import storage from models.model import App, AppModelConfig, Conversation, Message, MessageAgentThought, MessageFile, TraceAppConfig from models.workflow import WorkflowAppLog, WorkflowRun from tasks.ops_trace_task import process_trace_tasks @@ -740,10 +742,19 @@ class TraceQueueManager: def send_to_celery(self, tasks: list[TraceTask]): with self.flask_app.app_context(): for task in tasks: + file_id = uuid4().hex trace_info = task.execute() task_data = { "app_id": task.app_id, "trace_info_type": type(trace_info).__name__, "trace_info": trace_info.model_dump() if trace_info else {}, } - process_trace_tasks.delay(task_data) + task_data = convert_datetime_to_str(task_data) + json_data = json.dumps(task_data, ensure_ascii=False).encode("utf-8") + file_path = f"{OPS_FILE_PATH}{task.app_id}/{file_id}.json" + storage.save(file_path, json_data) + file_info = { + "file_id": file_id, + "app_id": task.app_id, + } + process_trace_tasks.delay(file_info) diff --git a/api/core/ops/utils.py b/api/core/ops/utils.py index 3cd3fb5756..82401a9de8 100644 --- a/api/core/ops/utils.py +++ b/api/core/ops/utils.py @@ -43,3 +43,17 @@ def replace_text_with_content(data): return [replace_text_with_content(item) for item in data] else: return data + + +def convert_datetime_to_str(data): + if isinstance(data, dict): + for key, value in data.items(): + if isinstance(value, datetime): + data[key] = value.isoformat() + elif isinstance(value, dict): + data[key] = convert_datetime_to_str(value) + elif isinstance(value, list): + data[key] = [convert_datetime_to_str(item) if isinstance(item, dict | list) else item for item in value] + elif isinstance(data, list): + data = [convert_datetime_to_str(item) if isinstance(item, dict | list) else item for item in data] + return data diff --git a/api/tasks/ops_trace_task.py b/api/tasks/ops_trace_task.py index 260069c6e2..848bae25aa 100644 --- a/api/tasks/ops_trace_task.py +++ b/api/tasks/ops_trace_task.py @@ -1,17 +1,19 @@ +import json import logging -import time from celery import shared_task from flask import current_app +from core.ops.entities.config_entity import OPS_FILE_PATH from core.ops.entities.trace_entity import trace_info_info_map from core.rag.models.document import Document +from extensions.ext_storage import storage from models.model import Message from models.workflow import WorkflowRun @shared_task(queue="ops_trace") -def process_trace_tasks(tasks_data): +def process_trace_tasks(file_info): """ Async process trace tasks :param tasks_data: List of dictionaries containing task data @@ -20,9 +22,12 @@ def process_trace_tasks(tasks_data): """ from core.ops.ops_trace_manager import OpsTraceManager - trace_info = tasks_data.get("trace_info") - app_id = tasks_data.get("app_id") - trace_info_type = tasks_data.get("trace_info_type") + app_id = file_info.get("app_id") + file_id = file_info.get("file_id") + file_path = f"{OPS_FILE_PATH}{app_id}/{file_id}.json" + file_data = storage.load(file_path) + trace_info = json.loads(file_data).get("trace_info") + trace_info_type = json.loads(file_data).get("trace_info_type") trace_instance = OpsTraceManager.get_ops_trace_instance(app_id) if trace_info.get("message_data"): @@ -39,6 +44,7 @@ def process_trace_tasks(tasks_data): if trace_type: trace_info = trace_type(**trace_info) trace_instance.trace(trace_info) - end_at = time.perf_counter() except Exception: - logging.exception("Processing trace tasks failed") + logging.exception(f"Processing trace tasks failed, app_id: {app_id}") + finally: + storage.delete(file_path)