This commit is contained in:
liuhaoran 2024-11-15 18:08:28 +08:00 committed by GitHub
commit 499dee02f5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 41 additions and 10 deletions

View File

@ -68,3 +68,18 @@ class RedisConfig(BaseSettings):
description="Socket timeout in seconds for Redis Sentinel connections",
default=0.1,
)
REDIS_USE_CLUSTERS: bool = Field(
description="Enable Redis Clusters mode for high availability",
default=False,
)
REDIS_CLUSTERS: Optional[str] = Field(
description="Comma-separated list of Redis Clusters nodes (host:port)",
default=None,
)
REDIS_CLUSTERS_PASSWORD: Optional[str] = Field(
description="Password for Redis Clusters authentication (if required)",
default=None,
)

View File

@ -1,11 +1,12 @@
import redis
from redis.cluster import ClusterNode, RedisCluster
from redis.connection import Connection, SSLConnection
from redis.sentinel import Sentinel
from configs import dify_config
class RedisClientWrapper(redis.Redis):
class RedisClientWrapper:
"""
A wrapper class for the Redis client that addresses the issue where the global
`redis_client` variable cannot be updated when a new Redis instance is returned
@ -71,6 +72,12 @@ def init_app(app):
)
master = sentinel.master_for(dify_config.REDIS_SENTINEL_SERVICE_NAME, **redis_params)
redis_client.initialize(master)
elif dify_config.REDIS_USE_CLUSTERS:
nodes = [
ClusterNode(host=node.split(":")[0], port=int(node.split.split(":")[1]))
for node in dify_config.REDIS_CLUSTERS.split(",")
]
redis_client.initialize(RedisCluster(startup_nodes=nodes, password=dify_config.REDIS_CLUSTERS_PASSWORD))
else:
redis_params.update(
{

View File

@ -1,10 +1,12 @@
from unittest.mock import MagicMock
from unittest.mock import MagicMock, patch
import pytest
import redis
from core.entities.provider_entities import ModelLoadBalancingConfiguration
from core.model_manager import LBModelManager
from core.model_runtime.entities.model_entities import ModelType
from extensions.ext_redis import redis_client
@pytest.fixture
@ -38,6 +40,9 @@ def lb_model_manager():
def test_lb_model_manager_fetch_next(mocker, lb_model_manager):
# initialize redis client
redis_client.initialize(redis.Redis())
assert len(lb_model_manager._load_balancing_configs) == 3
config1 = lb_model_manager._load_balancing_configs[0]
@ -55,10 +60,11 @@ def test_lb_model_manager_fetch_next(mocker, lb_model_manager):
start_index += 1
return start_index
mocker.patch("redis.Redis.incr", side_effect=incr)
mocker.patch("redis.Redis.set", return_value=None)
mocker.patch("redis.Redis.expire", return_value=None)
with (
patch.object(redis_client, "incr", side_effect=incr),
patch.object(redis_client, "set", return_value=None),
patch.object(redis_client, "expire", return_value=None),
):
config = lb_model_manager.fetch_next()
assert config == config2

View File

@ -55,6 +55,9 @@ x-shared-env: &shared-api-worker-env
REDIS_SENTINEL_USERNAME: ${REDIS_SENTINEL_USERNAME:-}
REDIS_SENTINEL_PASSWORD: ${REDIS_SENTINEL_PASSWORD:-}
REDIS_SENTINEL_SOCKET_TIMEOUT: ${REDIS_SENTINEL_SOCKET_TIMEOUT:-0.1}
REDIS_CLUSTERS: ${REDIS_CLUSTERS:-}
REDIS_USE_CLUSTERS: ${REDIS_USE_CLUSTERS:-false}
REDIS_CLUSTERS_PASSWORD: ${REDIS_CLUSTERS_PASSWORD:-}
ACCESS_TOKEN_EXPIRE_MINUTES: ${ACCESS_TOKEN_EXPIRE_MINUTES:-60}
CELERY_BROKER_URL: ${CELERY_BROKER_URL:-redis://:difyai123456@redis:6379/1}
BROKER_USE_SSL: ${BROKER_USE_SSL:-false}