2881099/FreeRedis

Is there a plan to support sharded Pub/Sub?

caoyang1024 opened this issue · 13 comments

Sharded Pub/Sub

From Redis 7.0, sharded Pub/Sub is introduced in which shard channels are assigned to slots by the same algorithm used to assign keys to slots. A shard message must be sent to a node that owns the slot the shard channel is hashed to. The cluster makes sure the published shard messages are forwarded to all nodes in the shard, so clients can subscribe to a shard channel by connecting to either the master responsible for the slot, or to any of its replicas. SSUBSCRIBE, SUNSUBSCRIBE and SPUBLISH are used to implement sharded Pub/Sub.

Sharded Pub/Sub helps to scale the usage of Pub/Sub in cluster mode. It restricts the propagation of messages to be within the shard of a cluster. Hence, the amount of data passing through the cluster bus is limited in comparison to global Pub/Sub where each message propagates to each node in the cluster. This allows users to horizontally scale the Pub/Sub usage by adding more shards.
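To make the slot assignment above concrete, here is a minimal C# sketch of the key-to-slot mapping from the Redis Cluster specification: CRC16 (XMODEM variant) of the name, modulo 16384, with hash-tag handling. A shard channel name is hashed the same way as a key. The names ClusterSlot, Crc16 and Of are hypothetical and are not part of FreeRedis.

```csharp
using System;
using System.Text;

// Hypothetical helper, not part of FreeRedis: computes the cluster slot
// that a key or shard channel name hashes to.
public static class ClusterSlot
{
    // CRC16/XMODEM: polynomial 0x1021, initial value 0, no bit reflection.
    public static ushort Crc16(ReadOnlySpan<byte> data)
    {
        ushort crc = 0;
        foreach (byte b in data)
        {
            crc ^= (ushort)(b << 8);
            for (int bit = 0; bit < 8; bit++)
            {
                crc = (crc & 0x8000) != 0
                    ? (ushort)(((crc << 1) ^ 0x1021) & 0xFFFF)
                    : (ushort)((crc << 1) & 0xFFFF);
            }
        }
        return crc;
    }

    // Returns the slot (0..16383) for a key or shard channel name.
    public static int Of(string name)
    {
        // Hash tag rule: if the name contains "{...}" with at least one
        // character between the first '{' and the next '}', only that
        // substring is hashed.
        int open = name.IndexOf('{');
        if (open >= 0)
        {
            int close = name.IndexOf('}', open + 1);
            if (close > open + 1)
            {
                name = name.Substring(open + 1, close - open - 1);
            }
        }
        return Crc16(Encoding.UTF8.GetBytes(name)) % 16384;
    }
}
```

With this sketch, ClusterSlot.Of("Price") evaluates to 7599, which is the slot named in the MOVED reply reported later in this thread. A shard message for that channel has to be sent to a node in the shard that owns slot 7599.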

Is there a plan to support this feature?
Thanks.

Yes. I'll add the commands in the next few days.

Please try v1.1.8 and remember to report back.

  • .SPublish
  • .PubSubShardChannels
  • .PubSubShardNumSub
  • .SSubscribe
  • .SUnSubscribe

Please try all of the methods above and then report back.

I saw this in the docs:
public static RedisClient cli = new RedisClient(
new ConnectionStringBuilder[] { "192.168.0.2:7001", "192.168.0.2:7002", "192.168.0.2:7003" }
); // assume 3 masters + 3 slaves
To connect to a cluster, the addresses of the 3 masters are provided.

If I provide the address of only one master node, will that also work?
Thanks.

Yes, that works.

9/6/2023 2:37:16 AM +00:00
Unhandled exception. System.ArgumentOutOfRangeException: Non-negative number required. (Parameter 'maxValue')
at System.Random.ThrowMaxValueMustBeNonNegative()
at System.Random.ThreadSafeRandom.Next(Int32 )
at FreeRedisTest.Program.Main(String[] args) in D:\Work\FreeRedisTest\Program.cs:line 44
at FreeRedisTest.Program.<Main>(String[] args)

I ran into this problem.

sample test code:

using System.Text.Json;
using FreeRedis;

public class Program
{
    private static async Task Main(string[] args)
    {
        RedisClient cli = new(new ConnectionStringBuilder[]
        {
            "xxxxxxxx.amazonaws.com:6379,password=xxxxxxxxxxxxx"
        });

        List<string> priceList = new();

        for (int i = 0; i < 100; i++)
        {
            var price = new Price
            {
                Id = Guid.NewGuid(),
                Symbol = $"test{i}",
                Bid = 131323.32m,
                Ask = 131323.35m,
                BidVolume = 24242,
                AskVolume = 24432,
                Source = "fake",
            };

            priceList.Add(JsonSerializer.Serialize(price));
        }

        while (true)
        {
            for (int i = 0; i < 100; i++)
            {
                foreach (var price in priceList)
                {
                    await cli.PublishAsync("Price", price);

                    Console.WriteLine($"Price: {DateTimeOffset.UtcNow}");

                    await Task.Delay(Random.Shared.Next(400 - 600));
                }

                Console.WriteLine($"now: {DateTimeOffset.UtcNow}");
            }
            break;
        }
    }
}

public record Price
{
    public Guid Id { get; set; }
    public string Symbol { get; set; }
    public decimal Bid { get; set; }
    public decimal Ask { get; set; }
    public decimal BidVolume { get; set; }
    public decimal AskVolume { get; set; }
    public string Source { get; set; }
}

Unhandled exception. System.ArgumentOutOfRangeException: Non-negative number required. (Parameter 'maxValue')
at System.Random.ThrowMaxValueMustBeNonNegative()
at System.Random.ThreadSafeRandom.Next(Int32 )

Is the error coming from your Random.Shared.Next(400 - 600) call?
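The expression 400 - 600 evaluates to -200, so the call is Random.Shared.Next(-200), and the single-argument overload rejects a negative maxValue with the ArgumentOutOfRangeException shown in the stack trace. Assuming the intent was a delay between 400 and 600 milliseconds, a minimal sketch of the fix uses the two-argument overload (the Jitter class and NextDelayMs method are hypothetical names):

```csharp
using System;

// Hypothetical helper illustrating the fix; requires .NET 6+ for Random.Shared.
public static class Jitter
{
    // Returns a delay in milliseconds in the range [400, 600):
    // the lower bound is inclusive, the upper bound is exclusive.
    public static int NextDelayMs() => Random.Shared.Next(400, 600);
}
```

In the sample program the delay line would then read await Task.Delay(Jitter.NextDelayMs()); or simply await Task.Delay(Random.Shared.Next(400, 600));.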

await _cli.SPublishAsync("Price", price);

FreeRedis.RedisServerException: 'MOVED 7599 127.0.0.1:7001'

Is this expected?

'MOVED 7599 127.0.0.1:7001'

That error is reported by the cluster. Would it be convenient for you to reference the source code on your side?
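For context, a MOVED reply means the node that received the command does not own the hash slot involved; the reply carries the slot number and the endpoint of the node that does. Here, SPUBLISH on channel Price reached a node that does not own slot 7599. Cluster-aware clients typically react by refreshing their slot map and retrying against the indicated node. The following is a minimal sketch of parsing such a reply, assuming the host:port endpoint form seen above; Redirect and RedirectParser are hypothetical names and do not describe how FreeRedis handles redirections internally.

```csharp
using System;

// Hypothetical types for illustration only.
public readonly record struct Redirect(string Kind, int Slot, string Host, int Port);

public static class RedirectParser
{
    // Parses "MOVED <slot> <host>:<port>" or "ASK <slot> <host>:<port>".
    // Returns false for any other error text.
    public static bool TryParse(string error, out Redirect redirect)
    {
        redirect = default;
        string[] parts = error.Split(' ', StringSplitOptions.RemoveEmptyEntries);
        if (parts.Length != 3 || (parts[0] != "MOVED" && parts[0] != "ASK"))
        {
            return false;
        }
        if (!int.TryParse(parts[1], out int slot))
        {
            return false;
        }
        // Split on the last ':' so that the port is always the final segment.
        int colon = parts[2].LastIndexOf(':');
        if (colon <= 0 || !int.TryParse(parts[2].AsSpan(colon + 1), out int port))
        {
            return false;
        }
        redirect = new Redirect(parts[0], slot, parts[2][..colon], port);
        return true;
    }
}
```

Calling RedirectParser.TryParse("MOVED 7599 127.0.0.1:7001", out var r) yields slot 7599, host 127.0.0.1 and port 7001, which is the node the publish would need to be retried against.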

Could you join the group or add me on QQ, and provide a test server?

I'll test these methods.

'MOVED 7599 127.0.0.1:7001'

That error is reported by the cluster. Would it be convenient for you to reference the source code on your side?

Sure, I'll test it as soon as I can. (I no longer have QQ...)

WeChat: q2881099