import json import logging from typing import Optional, Union from configs import dify_config from core.tools.entities.api_entities import UserTool, UserToolProvider from core.tools.entities.common_entities import I18nObject from core.tools.entities.tool_bundle import ApiToolBundle from core.tools.entities.tool_entities import ( ApiProviderAuthType, ToolParameter, ToolProviderCredentials, ToolProviderType, ) from core.tools.provider.api_tool_provider import ApiToolProviderController from core.tools.provider.builtin_tool_provider import BuiltinToolProviderController from core.tools.provider.workflow_tool_provider import WorkflowToolProviderController from core.tools.tool.tool import Tool from core.tools.tool.workflow_tool import WorkflowTool from core.tools.utils.configuration import ToolConfigurationManager from models.tools import ApiToolProvider, BuiltinToolProvider, WorkflowToolProvider logger = logging.getLogger(__name__) class ToolTransformService: @staticmethod def get_tool_provider_icon_url(provider_type: str, provider_name: str, icon: str) -> Union[str, dict]: """ get tool provider icon url """ url_prefix = dify_config.CONSOLE_API_URL + "/console/api/workspaces/current/tool-provider/" if provider_type == ToolProviderType.BUILT_IN.value: return url_prefix + "builtin/" + provider_name + "/icon" elif provider_type in {ToolProviderType.API.value, ToolProviderType.WORKFLOW.value}: try: return json.loads(icon) except: return {"background": "#252525", "content": "\ud83d\ude01"} return "" @staticmethod def repack_provider(provider: Union[dict, UserToolProvider]): """ repack provider :param provider: the provider dict """ if isinstance(provider, dict) and "icon" in provider: provider["icon"] = ToolTransformService.get_tool_provider_icon_url( provider_type=provider["type"], provider_name=provider["name"], icon=provider["icon"] ) elif isinstance(provider, UserToolProvider): provider.icon = ToolTransformService.get_tool_provider_icon_url( provider_type=provider.type.value, provider_name=provider.name, icon=provider.icon ) @staticmethod def builtin_provider_to_user_provider( provider_controller: BuiltinToolProviderController, db_provider: Optional[BuiltinToolProvider], decrypt_credentials: bool = True, ) -> UserToolProvider: """ convert provider controller to user provider """ result = UserToolProvider( id=provider_controller.identity.name, author=provider_controller.identity.author, name=provider_controller.identity.name, description=I18nObject( en_US=provider_controller.identity.description.en_US, zh_Hans=provider_controller.identity.description.zh_Hans, pt_BR=provider_controller.identity.description.pt_BR, ja_JP=provider_controller.identity.description.ja_JP, ), icon=provider_controller.identity.icon, label=I18nObject( en_US=provider_controller.identity.label.en_US, zh_Hans=provider_controller.identity.label.zh_Hans, pt_BR=provider_controller.identity.label.pt_BR, ja_JP=provider_controller.identity.label.ja_JP, ), type=ToolProviderType.BUILT_IN, masked_credentials={}, is_team_authorization=False, tools=[], labels=provider_controller.tool_labels, ) # get credentials schema schema = provider_controller.get_credentials_schema() for name, value in schema.items(): result.masked_credentials[name] = ToolProviderCredentials.CredentialsType.default(value.type) # check if the provider need credentials if not provider_controller.need_credentials: result.is_team_authorization = True result.allow_delete = False elif db_provider: result.is_team_authorization = True if decrypt_credentials: credentials = db_provider.credentials # init tool configuration tool_configuration = ToolConfigurationManager( tenant_id=db_provider.tenant_id, provider_controller=provider_controller ) # decrypt the credentials and mask the credentials decrypted_credentials = tool_configuration.decrypt_tool_credentials(credentials=credentials) masked_credentials = tool_configuration.mask_tool_credentials(credentials=decrypted_credentials) result.masked_credentials = masked_credentials result.original_credentials = decrypted_credentials return result @staticmethod def api_provider_to_controller( db_provider: ApiToolProvider, ) -> ApiToolProviderController: """ convert provider controller to user provider """ # package tool provider controller controller = ApiToolProviderController.from_db( db_provider=db_provider, auth_type=ApiProviderAuthType.API_KEY if db_provider.credentials["auth_type"] == "api_key" else ApiProviderAuthType.NONE, ) return controller @staticmethod def workflow_provider_to_controller(db_provider: WorkflowToolProvider) -> WorkflowToolProviderController: """ convert provider controller to provider """ return WorkflowToolProviderController.from_db(db_provider) @staticmethod def workflow_provider_to_user_provider( provider_controller: WorkflowToolProviderController, labels: Optional[list[str]] = None ): """ convert provider controller to user provider """ return UserToolProvider( id=provider_controller.provider_id, author=provider_controller.identity.author, name=provider_controller.identity.name, description=I18nObject( en_US=provider_controller.identity.description.en_US, zh_Hans=provider_controller.identity.description.zh_Hans, ), icon=provider_controller.identity.icon, label=I18nObject( en_US=provider_controller.identity.label.en_US, zh_Hans=provider_controller.identity.label.zh_Hans, ), type=ToolProviderType.WORKFLOW, masked_credentials={}, is_team_authorization=True, tools=[], labels=labels or [], ) @staticmethod def api_provider_to_user_provider( provider_controller: ApiToolProviderController, db_provider: ApiToolProvider, decrypt_credentials: bool = True, labels: Optional[list[str]] = None, ) -> UserToolProvider: """ convert provider controller to user provider """ username = "Anonymous" try: username = db_provider.user.name except Exception as e: logger.exception(f"failed to get user name for api provider {db_provider.id}: {str(e)}") # add provider into providers credentials = db_provider.credentials result = UserToolProvider( id=db_provider.id, author=username, name=db_provider.name, description=I18nObject( en_US=db_provider.description, zh_Hans=db_provider.description, ), icon=db_provider.icon, label=I18nObject( en_US=db_provider.name, zh_Hans=db_provider.name, ), type=ToolProviderType.API, masked_credentials={}, is_team_authorization=True, tools=[], labels=labels or [], ) if decrypt_credentials: # init tool configuration tool_configuration = ToolConfigurationManager( tenant_id=db_provider.tenant_id, provider_controller=provider_controller ) # decrypt the credentials and mask the credentials decrypted_credentials = tool_configuration.decrypt_tool_credentials(credentials=credentials) masked_credentials = tool_configuration.mask_tool_credentials(credentials=decrypted_credentials) result.masked_credentials = masked_credentials return result @staticmethod def tool_to_user_tool( tool: Union[ApiToolBundle, WorkflowTool, Tool], credentials: Optional[dict] = None, tenant_id: Optional[str] = None, labels: Optional[list[str]] = None, ) -> UserTool: """ convert tool to user tool """ if isinstance(tool, Tool): # fork tool runtime tool = tool.fork_tool_runtime( runtime={ "credentials": credentials, "tenant_id": tenant_id, } ) # get tool parameters parameters = tool.parameters or [] # get tool runtime parameters runtime_parameters = tool.get_runtime_parameters() or [] # override parameters current_parameters = parameters.copy() for runtime_parameter in runtime_parameters: found = False for index, parameter in enumerate(current_parameters): if parameter.name == runtime_parameter.name and parameter.form == runtime_parameter.form: current_parameters[index] = runtime_parameter found = True break if not found and runtime_parameter.form == ToolParameter.ToolParameterForm.FORM: current_parameters.append(runtime_parameter) return UserTool( author=tool.identity.author, name=tool.identity.name, label=tool.identity.label, description=tool.description.human, parameters=current_parameters, labels=labels, ) if isinstance(tool, ApiToolBundle): return UserTool( author=tool.author, name=tool.operation_id, label=I18nObject(en_US=tool.operation_id, zh_Hans=tool.operation_id), description=I18nObject(en_US=tool.summary or "", zh_Hans=tool.summary or ""), parameters=tool.parameters, labels=labels, )