from core.ops.ops_trace_manager import OpsTraceManager, provider_config_map from extensions.ext_database import db from models.model import App, TraceAppConfig class OpsService: @classmethod def get_tracing_app_config(cls, app_id: str, tracing_provider: str): """ Get tracing app config :param app_id: app id :param tracing_provider: tracing provider :return: """ trace_config_data: TraceAppConfig = db.session.query(TraceAppConfig).filter( TraceAppConfig.app_id == app_id, TraceAppConfig.tracing_provider == tracing_provider ).first() if not trace_config_data: return None # decrypt_token and obfuscated_token tenant_id = db.session.query(App).filter(App.id == app_id).first().tenant_id decrypt_tracing_config = OpsTraceManager.decrypt_tracing_config(tenant_id, tracing_provider, trace_config_data.tracing_config) if tracing_provider == 'langfuse' and ('project_key' not in decrypt_tracing_config or not decrypt_tracing_config.get('project_key')): project_key = OpsTraceManager.get_trace_config_project_key(decrypt_tracing_config, tracing_provider) decrypt_tracing_config['project_key'] = project_key decrypt_tracing_config = OpsTraceManager.obfuscated_decrypt_token(tracing_provider, decrypt_tracing_config) trace_config_data.tracing_config = decrypt_tracing_config return trace_config_data.to_dict() @classmethod def create_tracing_app_config(cls, app_id: str, tracing_provider: str, tracing_config: dict): """ Create tracing app config :param app_id: app id :param tracing_provider: tracing provider :param tracing_config: tracing config :return: """ if tracing_provider not in provider_config_map.keys() and tracing_provider: return {"error": f"Invalid tracing provider: {tracing_provider}"} config_class, other_keys = provider_config_map[tracing_provider]['config_class'], \ provider_config_map[tracing_provider]['other_keys'] default_config_instance = config_class(**tracing_config) for key in other_keys: if key in tracing_config and tracing_config[key] == "": tracing_config[key] = getattr(default_config_instance, key, None) # api check if not OpsTraceManager.check_trace_config_is_effective(tracing_config, tracing_provider): return {"error": "Invalid Credentials"} # get project key project_key = OpsTraceManager.get_trace_config_project_key(tracing_config, tracing_provider) # check if trace config already exists trace_config_data: TraceAppConfig = db.session.query(TraceAppConfig).filter( TraceAppConfig.app_id == app_id, TraceAppConfig.tracing_provider == tracing_provider ).first() if trace_config_data: return None # get tenant id tenant_id = db.session.query(App).filter(App.id == app_id).first().tenant_id tracing_config = OpsTraceManager.encrypt_tracing_config(tenant_id, tracing_provider, tracing_config) if tracing_provider == 'langfuse': tracing_config['project_key'] = project_key trace_config_data = TraceAppConfig( app_id=app_id, tracing_provider=tracing_provider, tracing_config=tracing_config, ) db.session.add(trace_config_data) db.session.commit() return {"result": "success"} @classmethod def update_tracing_app_config(cls, app_id: str, tracing_provider: str, tracing_config: dict): """ Update tracing app config :param app_id: app id :param tracing_provider: tracing provider :param tracing_config: tracing config :return: """ if tracing_provider not in provider_config_map.keys(): raise ValueError(f"Invalid tracing provider: {tracing_provider}") # check if trace config already exists current_trace_config = db.session.query(TraceAppConfig).filter( TraceAppConfig.app_id == app_id, TraceAppConfig.tracing_provider == tracing_provider ).first() if not current_trace_config: return None # get tenant id tenant_id = db.session.query(App).filter(App.id == app_id).first().tenant_id tracing_config = OpsTraceManager.encrypt_tracing_config( tenant_id, tracing_provider, tracing_config, current_trace_config.tracing_config ) # api check # decrypt_token decrypt_tracing_config = OpsTraceManager.decrypt_tracing_config(tenant_id, tracing_provider, tracing_config) if not OpsTraceManager.check_trace_config_is_effective(decrypt_tracing_config, tracing_provider): raise ValueError("Invalid Credentials") current_trace_config.tracing_config = tracing_config db.session.commit() return current_trace_config.to_dict() @classmethod def delete_tracing_app_config(cls, app_id: str, tracing_provider: str): """ Delete tracing app config :param app_id: app id :param tracing_provider: tracing provider :return: """ trace_config = db.session.query(TraceAppConfig).filter( TraceAppConfig.app_id == app_id, TraceAppConfig.tracing_provider == tracing_provider ).first() if not trace_config: return None db.session.delete(trace_config) db.session.commit() return True