mirror of
https://github.com/langgenius/dify.git
synced 2024-11-16 03:32:23 +08:00
17fd773a30
Co-authored-by: -LAN- <laipz8200@outlook.com>
102 lines
3.6 KiB
Python
102 lines
3.6 KiB
Python
from typing import Optional, Union
|
|
|
|
from core.app.entities.app_invoke_entities import InvokeFrom
|
|
from extensions.ext_database import db
|
|
from libs.infinite_scroll_pagination import InfiniteScrollPagination
|
|
from models.account import Account
|
|
from models.model import App, EndUser
|
|
from models.web import PinnedConversation
|
|
from services.conversation_service import ConversationService
|
|
|
|
|
|
class WebConversationService:
    """Conversation listing and pin/unpin operations for one user of an app.

    A pin is a ``PinnedConversation`` row identified by app id, conversation
    id, creator role (``"account"`` or ``"end_user"``) and creator id.
    """

    @staticmethod
    def _creator_role(user: Union[Account, EndUser]) -> str:
        """Return the ``created_by_role`` value stored for *user*."""
        return "account" if isinstance(user, Account) else "end_user"

    @classmethod
    def _pin_query(cls, app_model: App, user: Union[Account, EndUser]):
        """Build the query selecting every pin *user* owns inside *app_model*."""
        return db.session.query(PinnedConversation).filter(
            PinnedConversation.app_id == app_model.id,
            PinnedConversation.created_by_role == cls._creator_role(user),
            PinnedConversation.created_by == user.id,
        )

    @classmethod
    def pagination_by_last_id(
        cls,
        app_model: App,
        user: Optional[Union[Account, EndUser]],
        last_id: Optional[str],
        limit: int,
        invoke_from: InvokeFrom,
        pinned: Optional[bool] = None,
        sort_by="-updated_at",
    ) -> InfiniteScrollPagination:
        """Page through conversations, optionally filtered by pinned state.

        Args:
            app_model: App whose conversations are listed.
            user: Requesting account or end user; may be ``None``.
            last_id: Cursor forwarded unchanged to ``ConversationService``.
            limit: Page size forwarded unchanged to ``ConversationService``.
            invoke_from: Invocation source forwarded unchanged.
            pinned: ``True`` restricts the listing to the user's pinned
                conversations, ``False`` excludes them, ``None`` applies no
                pin filter at all.
            sort_by: Sort key forwarded unchanged.

        Returns:
            Whatever ``ConversationService.pagination_by_last_id`` returns for
            the computed ``include_ids`` / ``exclude_ids``.
        """
        include_ids = None
        exclude_ids = None

        if pinned is not None:
            if user is None:
                # Without a user there is nobody who could own a pin; treating
                # the pin set as empty avoids dereferencing ``user.id``.
                pinned_conversation_ids = []
            else:
                pinned_rows = (
                    cls._pin_query(app_model, user)
                    .order_by(PinnedConversation.created_at.desc())
                    .all()
                )
                pinned_conversation_ids = [row.conversation_id for row in pinned_rows]

            if pinned:
                include_ids = pinned_conversation_ids
            else:
                exclude_ids = pinned_conversation_ids

        return ConversationService.pagination_by_last_id(
            app_model=app_model,
            user=user,
            last_id=last_id,
            limit=limit,
            invoke_from=invoke_from,
            include_ids=include_ids,
            exclude_ids=exclude_ids,
            sort_by=sort_by,
        )

    @classmethod
    def pin(cls, app_model: App, conversation_id: str, user: Optional[Union[Account, EndUser]]):
        """Pin *conversation_id* for *user*; do nothing if it is already pinned.

        A ``None`` user is a no-op because a pin row needs an owner.
        The conversation is resolved through
        ``ConversationService.get_conversation`` before the row is created;
        any exception that call raises propagates to the caller.
        """
        if user is None:
            return

        already_pinned = (
            cls._pin_query(app_model, user)
            .filter(PinnedConversation.conversation_id == conversation_id)
            .first()
        )
        if already_pinned:
            return

        # NOTE(review): presumably this validates that the conversation exists
        # and is visible to the user -- confirm against ConversationService.
        conversation = ConversationService.get_conversation(
            app_model=app_model, conversation_id=conversation_id, user=user
        )

        db.session.add(
            PinnedConversation(
                app_id=app_model.id,
                conversation_id=conversation.id,
                created_by_role=cls._creator_role(user),
                created_by=user.id,
            )
        )
        db.session.commit()

    @classmethod
    def unpin(cls, app_model: App, conversation_id: str, user: Optional[Union[Account, EndUser]]):
        """Remove *user*'s pin on *conversation_id*; do nothing if none exists.

        A ``None`` user is a no-op because no pin row can belong to it.
        """
        if user is None:
            return

        existing_pin = (
            cls._pin_query(app_model, user)
            .filter(PinnedConversation.conversation_id == conversation_id)
            .first()
        )
        if not existing_pin:
            return

        db.session.delete(existing_pin)
        db.session.commit()
|