diff --git a/api/migrations/versions/2024_11_12_0925-01d6889832f7_add_created_at_index_for_messages.py b/api/migrations/versions/2024_11_12_0925-01d6889832f7_add_created_at_index_for_messages.py new file mode 100644 index 0000000000..d94508edcf --- /dev/null +++ b/api/migrations/versions/2024_11_12_0925-01d6889832f7_add_created_at_index_for_messages.py @@ -0,0 +1,31 @@ +"""add_created_at_index_for_messages + +Revision ID: 01d6889832f7 +Revises: 09a8d1878d9b +Create Date: 2024-11-12 09:25:05.527827 + +""" +from alembic import op +import models as models +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision = '01d6889832f7' +down_revision = '09a8d1878d9b' +branch_labels = None +depends_on = None + + +def upgrade(): + # ### commands auto generated by Alembic - please adjust! ### + with op.batch_alter_table('messages', schema=None) as batch_op: + batch_op.create_index('message_created_at_idx', ['created_at'], unique=False) + # ### end Alembic commands ### + + +def downgrade(): + # ### commands auto generated by Alembic - please adjust! ### + with op.batch_alter_table('messages', schema=None) as batch_op: + batch_op.drop_index('message_created_at_idx') + # ### end Alembic commands ### diff --git a/api/schedule/clean_messages.py b/api/schedule/clean_messages.py index 60de154b41..72ee2a8901 100644 --- a/api/schedule/clean_messages.py +++ b/api/schedule/clean_messages.py @@ -2,23 +2,32 @@ import datetime import time import click +from werkzeug.exceptions import NotFound import app from configs import dify_config -from core.rag.datasource.vdb.tidb_on_qdrant.tidb_service import TidbService from extensions.ext_database import db -from models.account import Tenant -from models.model import App, Message, MessageAgentThought, MessageAnnotation, MessageChain, MessageFeedback, MessageFile +from extensions.ext_redis import redis_client +from models.model import ( + App, + Message, + MessageAgentThought, + MessageAnnotation, + MessageChain, + MessageFeedback, + MessageFile, +) from models.web import SavedMessage from services.feature_service import FeatureService -from extensions.ext_redis import redis_client -from werkzeug.exceptions import NotFound + @app.celery.task(queue="dataset") def clean_messages(): click.echo(click.style("Start clean messages.", fg="green")) start_at = time.perf_counter() - plan_sandbox_clean_message_day = datetime.datetime.now() - datetime.timedelta(days=dify_config.PLAN_SANDBOX_CLEAN_MESSAGE_DAY_SETTING) + plan_sandbox_clean_message_day = datetime.datetime.now() - datetime.timedelta( + days=dify_config.PLAN_SANDBOX_CLEAN_MESSAGE_DAY_SETTING + ) page = 1 while True: try: @@ -35,7 +44,7 @@ def clean_messages(): if messages.items is None or len(messages.items) == 0: break for message in messages.items: - app = App.query.filter_by(id=message.app_id).first() + app = App.query.filter_by(id=message.app_id).first() features_cache_key = f"features:{app.tenant_id}" plan_cache = redis_client.get(features_cache_key) if plan_cache is None: @@ -52,12 +61,18 @@ def clean_messages(): db.session.query(MessageAnnotation).filter(MessageAnnotation.message_id == message.id).delete( synchronize_session=False ) - db.session.query(MessageChain).filter(MessageChain.message_id == message.id).delete(synchronize_session=False) + db.session.query(MessageChain).filter(MessageChain.message_id == message.id).delete( + synchronize_session=False + ) db.session.query(MessageAgentThought).filter(MessageAgentThought.message_id == message.id).delete( synchronize_session=False ) - db.session.query(MessageFile).filter(MessageFile.message_id == message.id).delete(synchronize_session=False) - db.session.query(SavedMessage).filter(SavedMessage.message_id == message.id).delete(synchronize_session=False) + db.session.query(MessageFile).filter(MessageFile.message_id == message.id).delete( + synchronize_session=False + ) + db.session.query(SavedMessage).filter(SavedMessage.message_id == message.id).delete( + synchronize_session=False + ) db.session.query(Message).filter(Message.id == message.id).delete() db.session.commit() end_at = time.perf_counter()