add message clean task

This commit is contained in:
jyong 2024-11-12 17:27:47 +08:00
parent a4ab8f225c
commit 4f389dab9d
2 changed files with 56 additions and 10 deletions

View File

@ -0,0 +1,31 @@
"""add_created_at_index_for_messages
Revision ID: 01d6889832f7
Revises: 09a8d1878d9b
Create Date: 2024-11-12 09:25:05.527827
"""
from alembic import op
import models as models
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '01d6889832f7'
down_revision = '09a8d1878d9b'
branch_labels = None
depends_on = None
def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
with op.batch_alter_table('messages', schema=None) as batch_op:
batch_op.create_index('message_created_at_idx', ['created_at'], unique=False)
# ### end Alembic commands ###
def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
with op.batch_alter_table('messages', schema=None) as batch_op:
batch_op.drop_index('message_created_at_idx')
# ### end Alembic commands ###

View File

@ -2,23 +2,32 @@ import datetime
import time import time
import click import click
from werkzeug.exceptions import NotFound
import app import app
from configs import dify_config from configs import dify_config
from core.rag.datasource.vdb.tidb_on_qdrant.tidb_service import TidbService
from extensions.ext_database import db from extensions.ext_database import db
from models.account import Tenant from extensions.ext_redis import redis_client
from models.model import App, Message, MessageAgentThought, MessageAnnotation, MessageChain, MessageFeedback, MessageFile from models.model import (
App,
Message,
MessageAgentThought,
MessageAnnotation,
MessageChain,
MessageFeedback,
MessageFile,
)
from models.web import SavedMessage from models.web import SavedMessage
from services.feature_service import FeatureService from services.feature_service import FeatureService
from extensions.ext_redis import redis_client
from werkzeug.exceptions import NotFound
@app.celery.task(queue="dataset") @app.celery.task(queue="dataset")
def clean_messages(): def clean_messages():
click.echo(click.style("Start clean messages.", fg="green")) click.echo(click.style("Start clean messages.", fg="green"))
start_at = time.perf_counter() start_at = time.perf_counter()
plan_sandbox_clean_message_day = datetime.datetime.now() - datetime.timedelta(days=dify_config.PLAN_SANDBOX_CLEAN_MESSAGE_DAY_SETTING) plan_sandbox_clean_message_day = datetime.datetime.now() - datetime.timedelta(
days=dify_config.PLAN_SANDBOX_CLEAN_MESSAGE_DAY_SETTING
)
page = 1 page = 1
while True: while True:
try: try:
@ -35,7 +44,7 @@ def clean_messages():
if messages.items is None or len(messages.items) == 0: if messages.items is None or len(messages.items) == 0:
break break
for message in messages.items: for message in messages.items:
app = App.query.filter_by(id=message.app_id).first() app = App.query.filter_by(id=message.app_id).first()
features_cache_key = f"features:{app.tenant_id}" features_cache_key = f"features:{app.tenant_id}"
plan_cache = redis_client.get(features_cache_key) plan_cache = redis_client.get(features_cache_key)
if plan_cache is None: if plan_cache is None:
@ -52,12 +61,18 @@ def clean_messages():
db.session.query(MessageAnnotation).filter(MessageAnnotation.message_id == message.id).delete( db.session.query(MessageAnnotation).filter(MessageAnnotation.message_id == message.id).delete(
synchronize_session=False synchronize_session=False
) )
db.session.query(MessageChain).filter(MessageChain.message_id == message.id).delete(synchronize_session=False) db.session.query(MessageChain).filter(MessageChain.message_id == message.id).delete(
synchronize_session=False
)
db.session.query(MessageAgentThought).filter(MessageAgentThought.message_id == message.id).delete( db.session.query(MessageAgentThought).filter(MessageAgentThought.message_id == message.id).delete(
synchronize_session=False synchronize_session=False
) )
db.session.query(MessageFile).filter(MessageFile.message_id == message.id).delete(synchronize_session=False) db.session.query(MessageFile).filter(MessageFile.message_id == message.id).delete(
db.session.query(SavedMessage).filter(SavedMessage.message_id == message.id).delete(synchronize_session=False) synchronize_session=False
)
db.session.query(SavedMessage).filter(SavedMessage.message_id == message.id).delete(
synchronize_session=False
)
db.session.query(Message).filter(Message.id == message.id).delete() db.session.query(Message).filter(Message.id == message.id).delete()
db.session.commit() db.session.commit()
end_at = time.perf_counter() end_at = time.perf_counter()