开发技术分享

  • 文章分类
    • 技术
    • 工作
    • 相册
    • 杂谈
    • 未分类
  • 工具
    • AI 试衣
靡不有初,鲜克有终
[换一句]
  1. 首页
  2. 技术
  3. 正文

Redis 高可用主线 - 分片集群

2023年5月5日 1444点热度

一、分片集群

分片集群(切片集群),就是指启动多个 Redis 实例组成一个集群,然后按照一定的规则,把收到的数据划分成多份,每一份用一个实例来保存。

在面向百万、千万级别的用户规模时,横向扩展的 Redis 分片集群会是一个非常好的选择。但是同时,又引申出来两个问题:

  1. 数据分片后,在多个实例之间如何分布?
  2. 客户端怎么确定想要访问的数据在哪个实例上?

从 3.0 开始,官方提供了一个名为 Redis Cluster 的方案,用于实现分片集群。Redis Cluster 方案中就规定了数据和实例的对应规则。

具体来说,Redis Cluster 方案采用哈希槽(Hash Slot,接下来我会直接称之为 Slot),来处理数据和实例之间的映射关系。在 Redis Cluster 方案中,一个分片集群共有 16384 个哈希槽(2^14),这些哈希槽类似于数据分区,每个键值对都会根据它的 key,被映射到一个哈希槽中。

具体的映射过程分为两大步:首先根据键值对的 key,按照 CRC16 算法计算一个 16 bit 的值;然后,再用这个 16bit 值对 16384 取模,得到 0~16383 范围内的模数,每个模数代表一个相应编号的哈希槽。

那么,这些哈希槽又是如何被映射到具体的 Redis 实例上的呢?

我们在部署 Redis Cluster 方案时,可以使用 cluster create 命令创建集群,此时,Redis 会自动把这些槽平均分布在集群实例上。例当然,我们也可以使用 cluster meet 命令手动建立实例间的连接,形成集群,再使用 cluster addslots 命令,指定每个实例上的哈希槽个数(比如 Redis 实例的内存大小配置不一的情况)。

在手动分配哈希槽时,需要把 16384 个槽都分配完,否则 Redis 集群无法正常工作。

Availability: Redis Cluster is able to survive partitions where the majority of the master nodes are reachable and there is at least one reachable replica for every master node that is no longer reachable. Moreover using replicas migration, masters no longer replicated by any replica will receive one from a master which is covered by multiple replicas.
当集群中大多数的 master 都存活,并且,断连的 master 都有对应的 slave 时,集群仍然可以正常提供服务。
通过 replicas migration,对于没有 slave 的 master 节点,其他拥有多个 slave 节点的 master,会分一个 slave 给它(orphan master)。

客户端如何定位数据?

客户端在本地通过计算 CRC16 就可以知道数据被分配在了哪一个 slot,但是,要进一步定位到实例,还需要知道哈希槽分布在哪个实例上。

  1. 在集群刚刚创建的时候,每个实例通过配置文件(或者命令中的参数)可以知道自己被分配了哪些哈希槽
  2. Redis 实例会把自己的哈希槽信息发给和它相连接的其它实例,来完成哈希槽分配信息的扩散。

当实例之间相互连接后,每个实例就有所有哈希槽的映射关系了;等到客户端和集群实例建立连接后,实例就会把哈希槽的分配信息发给客户端缓存起来。

但是,在集群中,实例和哈希槽的对应关系并不是一成不变的,最常见的变化有两个:

  1. 在集群中,实例有新增或删除,Redis 需要重新分配哈希槽;
  2. 为了负载均衡,Redis 需要把哈希槽在所有实例上重新分布一遍。

此时,实例之间还可以通过相互传递消息,获得最新的哈希槽分配信息,但是,客户端是无法主动感知这些变化的,为此 Redis Cluster 方案提供了一种重定向机制:

当客户端把一个键值对的操作请求发给一个实例时,如果这个实例上并没有这个键值对映射的哈希槽,那么,这个实例就会给客户端返回下面的 MOVED 命令响应结果,这个结果中就包含了新实例的访问地址。

GET hello:key
(error) MOVED 13320 172.16.19.5:6379

然后,客户端就会再次向新实例请求,同时还会更新本地缓存,把 slot 的对应关系更新过来。

在实际应用时,如果 Slot 中的数据比较多,就可能会出现一种情况:客户端发送请求时,Slot 中的数据只有一部分迁移到了新实例,还有部分数据没有迁移。在这种迁移部分完成的情况下,客户端就会收到一条 ASK 报错信息:

GET hello:key
(error) ASK 13320 172.16.19.5:6379

ASK 命令表示两层含义:第一,表明 Slot 数据还在迁移中;第二,ASK 命令把客户端所请求数据的最新实例地址返回给客户端,此时,客户端需要给实例 3 发送 ASKING 命令,然后再发送操作命令。

需要注意的是:和 MOVED 命令不同,ASK 命令并不会更新客户端缓存的哈希槽分配信息。所以,在上图中,如果客户端再次请求 Slot 中的数据,它还是会给原实例发送请求。这也就是说,ASK 命令的作用只是让客户端能给新实例发送一次请求,而不像 MOVED 命令那样,会更改本地缓存,让后续所有命令都发往新实例。

Hash Tag

上面的方法可以高效的处理单个 key 的映射问题,但是,在处理 multi-key 操作的时候,如果他们被分散在不同的实例上,就会影响操作效率(事实上 redis 并没有提供跨实例的 multi-key operation)。

不过 redis 提供了另一种机制:Hash Tag,来处理 multi-key operations:如果 key 中包含大括号{},那么只有括号中的内容会被用来计算 slot。(一些特殊情况,比如多组括号、括号中没有内容等,可以参考redis 官方文档)

比如:键 foo{user1000}bar 和 foo{user1000}tee 会被映射在同一个 slot 中

二、源码:

0. 补充说明

1. PFAIL 和 FAIL

redis 使用 pfail 和 fail 两个状态来标记异常的节点:

pfail(Possible failure):如果 node A 向 node B 发送 ping 命令,超过NODE_TIMEOUT时间之后,仍没有收到 pong 回复,则 A 将 B 标记为 pfail 状态。

fail:如果 node A 发现,集群中超过半数的节点,都认为 node B 是 pfail 的,node A 就会将 node B 标记为 fail 状态。

redis 集群中,每个节点都通过下面的结构,保存着其他节点的状态信息:

从图中可以看到,redis 的核心结构 redisServer 中保存着 cluster 信息,而 cluster 结构中有一个字典,保存着所有的 node 信息。每一个 node 中,都有flags(红色)和fail_report(蓝色)两个字段。

flags:我们假设图中当前节点(redisServer)是 A,那么它表示在 A 的眼中,node1 目前的状态;如果 A 发送 ping 给 node1,超过NODE_TIMEOUT时间之后,仍没有收到 pong 回复,就会给 flags 中增加 pfail 状态。

fail_reports:在 cluster 节点之间定期 ping-pong 的过程中,会交换自己眼中集群节点的状态信息;比如:当前节点 A 会通过 ping-pong 收到大家(node2,node3)眼中 node1 的状态信息;如果 A 发现,node2 认为 node1 是 pfail,A 就会往 node1 的 fail_report 链表中增加一个记录(这里看似是个链表,其实是当 map 用的,如果已经有了,就替换掉);如果 node2 认为 node1 状态是 ok 的,就会清理掉对应的记录(如果有的话)。redis 会检查这个 fail_report,如果它的长度超过了 n/2+1,就给flags增加 fail 状态,即就是:如果集群中超过半数的节点都认为 node1 是 pfail 的,A 就会将它标记为 fail 状态。然后 A 会通知集群中其他节点,其他节点在收到信息之后,也会将 node1 标记为 fail 状态。

2. 选举

currentEpoch:

新节点的初始值是 0,集群中节点在交换信息的过程中,如果发现数据包中的 currentEpoch 字段大于自己的,就会更新自己的 currentEpoch;通过这样的方式,整个集群中的 currentEpoch 值会逐渐趋于一致。

这个参数主要是用在故障转移的过程中,可以粗略理解为投票的轮数:当一个 slave 节点,在执行 clusterCron 的过程中,发现自己 master 故障(FAIL),slave 会把自己的 currentEpoch+1,然后向集群中其他的 master 发起投票请求(其他节点收到这个节点的任意数据包之后,就会更新自己本地的 currentEpoch,这样,集群中的 currentEpoch 就统一更新了)

configEpoch:

configEpoch 的默认初始值是 0,在 failover 过程中,当一个 slave 赢得了足够的选票时,它会先把自己的 configEpoch 设置为发起投票时候的 currentEpoch,然后再执行升主操作。

configEpoch 每次改变之后,都会保存到磁盘上的nodes.conf文件中(slave 的 configEpoch 是从它的 master 那里取得的);cluster 中的节点会通过 ping、pong、meet 等数据包交换它们的 configEpoch,其他节点收到新 master 的信息之后,发现它的 configEpoch 比以前大了,就会依据数据包中的信息来更新本地的信息(诸如 master 身份、slots 等)。

它的主要用途是:当集群中的节点对集群的认知出现差异时,更大的 configEpoch 会胜出(比如一个离线的 master 重新加入集群,它的 configEpoch 会更小,所以其他节点不会认可他)(再比如,如果同时有两个节点声称自己拥有 slot[1] 的所有权,configEpoch 更大的那一个节点会被大家认可)

一般情况下,两个 master 之间 configEpoch 不会重复(因为每一个 epoch,每个 master 只会投一次票,所以只会有一个节点胜出,并将这个 epoch 设置为自己的 configEpoch),但是特殊情况下,比如管理员手动 resharding 刚好遇到了集群选举,就有可能出现 configEpoch 相同的情况。当 configEpoch 相同时,nodeId 较小的那一个,会给自己的 confgEpoch 加一。

达到以下条件时,replica 节点就可以发起选举请求了:

  • replica 的 master 处于 pfail 状态
  • master 中包含有 slot
  • replica 和 master 的数据同步连接断开超过一定的时间

但 replica 并不是立刻发起选举,而是会等待一小段时间再发起选举,这样做是为了让 fail 状态在集群里充分的传播(否则,可能会短时间内发起多次选举,导致短时间内连续切换 master)

DELAY = 500 milliseconds + random delay between 0 and 500 milliseconds + REPLICA_RANK * 1000 milliseconds.
random 是为了让多个 replica 不要同时发起请求;REPLICA_RANK 是根据 replica 同步的 offset 得来的,某个 replica 的 offset 最接近 master,它的 REPLICA_RANK 就是 0,第二接近的 REPLICA_RANK 就是 1,以此类推。

首先:replica 会增加自己的 currentEpoch,然后通过广播 FAILOVER_AUTH_REQUEST 向其他的 master 请求投票,然后等待最多 NODE_TIMEOUT * 2 的时间。针对同一个 master 的故障转移(failover),其他每个 master 在 NODE_TIMEOUT * 2 时间内,只会投一次票。

这里只会投一次票的含义是:
假设 masterA 有两个从节点 replicaA1 和 replicaA2,如果 masterA 挂了,replicaA1 和 replicaA2 会向 masterB 发起投票请求;masterB 只会答复(投票给)其中的一个,对于另一个请求,会任由它超时。
与此同时,如果还有一个 masterC 有两个从节点 replicaC1 和 replicaC2,也向 masterB 发起投票请求,masterB 会查看自己的 lastVoteEpoch,如果一致,则不会投票,如果不一致,就会投票给 replicaC1 或 replicaC2 中的一个。

如果 reply 中的 epoch 小于 currentEpoch,replica 会忽略掉它,因为这表示这是上一轮选举。

如果在 NODE_TIMEOUT * 2 内,某个 replica 获得了超过半数 master 的选票,就说明他赢得了选举。如果任何 replica 都没有拿到足够的选票,则本轮投票无效,replica 会在 NODE_TIMEOUT * 4 时间后发起新一轮投票。

一旦某个 replica 赢得了选举,他将获得一个新的 configEpoch,这个 configEpoch 比其他所有的 master 都大。这之后,它就开始以 master 的身份来 ping-pong 交换数据了,数据包中带 configEpoch 以及它所承载的 slots

当其他 master 节点检测到这段 slots 换了主人,并且携带者更大的 configEpoch 的时候,就会更新自己的配置信息(upgrade their configuration)。

3. Replica migration

Redis 提供了一种 Replica migration 机制,来提高集群的可用性。它主要被用于处理这样的情况:如果一个集群中,某个 master 和它的 replica 都下线了,集群就无法正常提供服务了。

它的主要思路是:集群中每个节点都会检测是否有 orphaned master,当检测到有 orphaned master(即没有 replica 的 master)时,拥有最多数量 replica 并且 nodeId 最小的 master 就会将它的一个 replica 节点,转移给 orphaned master。

这个机制是通过配置文件中的这两个参数来控制的:

  1. cluster-allow-replica-migration:表示是否打开这个功能;
  2. cluster-migration-barrier:数字,表示至少拥有这么多存活的 replica 的 master 才允许执行 migration;

需要注意的是,这个过程是节点自发进行的,而不像选举一样在集群中达成共识,所以,当集群状态不稳定时,有可能出现两个 master 都分配 replica 给 orphaned master 的情况;但是这种情况其实是无害的,并不会对集群造成影响。

4. manual failover

手动故障转移,即管理员在 slave 节点手动发送 CLUSTER FAILOVER 让其晋升为 master:

  1. 用户手动给 slave 发送 CLUSTER FAILOVER 命令。mf_end(manual failover end)被初始化为未来的一个时间点,超过这个时间,本次 failover 就会被抛弃。
  2. slave 发送 MFSTART 消息给 master,让 master 暂停(pause)与 clients 的交互。master 成功 pause 之后,数据包中就会携带 CLUSTERMSG_FLAG0_PAUSED 字段。
  3. slave 等待 master 发送它的 replication offset(消息中携带 CLUSTERMSG_FLAG0_PAUSED)
  4. 如果 master 的 offset 和 slave 自身相同,说明数据已经同步了,mf_can_start 被设置为 1,表示可以开始 failover,之后就和普通的 failover 一样,通过 clusterHandleSlaveFailover 方法执行了(下文会详细介绍)。

1. 接收数据包

在 redis 的启动过程中,initServer 方法会针对 cluster mode 的节点,运行 clusterInit 方法

if (server.cluster_enabled) clusterInit();

在 clusterInit 方法中,通过下面的调用链,为 cluster 专用的 fd: server.cfd设置了一个监听器,每当有其他节点过来连接(或发送信息)的时候,都会通过 clusterReadHandler 来处理后续接收到的信息:

clusterReadHandler的代码如下:

/* 读数据.
 * 数据包的前面保存着数据包的长度信息. 当读取到了完整的数据包( whole packet)之后,
 * 会调用 clusterProcessPacket 方法处理数据 */
void clusterReadHandler(connection *conn) {
    clusterMsg buf[1];
    ssize_t nread;
    clusterMsg *hdr;
    clusterLink *link = connGetPrivateData(conn);
    unsigned int readlen, rcvbuflen;

    while(1) { /* Read as long as there is data to read. */
        rcvbuflen = link->rcvbuf_len;
        if (rcvbuflen < 8) {
            /* 首先,从前 8 bytes 读取数据包长度 */
            readlen = 8 - rcvbuflen;
        } else {
            /* Finally read the full message. */
            hdr = (clusterMsg*) link->rcvbuf;
            if (rcvbuflen == 8) {
                /* Perform some sanity check on the message signature
                 * and length. */
                if (memcmp(hdr->sig,"RCmb",4) != 0 ||
                    ntohl(hdr->totlen) < CLUSTERMSG_MIN_LEN)
                {
                    serverLog(LL_WARNING,
                        "Bad message length or signature received "
                        "from Cluster bus.");
                    handleLinkIOError(link);
                    return;
                }
            }
            readlen = ntohl(hdr->totlen) - rcvbuflen;
            if (readlen > sizeof(buf)) readlen = sizeof(buf);
        }

        nread = connRead(conn,buf,readlen);
        // 如果数据读完了,连接还没断,则继续等待
        if (nread == -1 && (connGetState(conn) == CONN_STATE_CONNECTED)) return; 

        if (nread <= 0) {
            /* I/O error... */
            serverLog(LL_DEBUG,"I/O error reading from node link: %s",
                (nread == 0) ? "connection closed" : connGetLastError(conn));
            handleLinkIOError(link);
            return;
        } else {
            /* 扩充空间,存储读到的数据 */
            size_t unused = link->rcvbuf_alloc - link->rcvbuf_len;
            if ((size_t)nread > unused) {
                size_t required = link->rcvbuf_len + nread;
                /* If less than 1mb, grow to twice the needed size, if larger grow by 1mb. */
                link->rcvbuf_alloc = required < RCVBUF_MAX_PREALLOC ? required * 2: required + RCVBUF_MAX_PREALLOC;
                link->rcvbuf = zrealloc(link->rcvbuf, link->rcvbuf_alloc);
            }
            memcpy(link->rcvbuf + link->rcvbuf_len, buf, nread);
            link->rcvbuf_len += nread;
            hdr = (clusterMsg*) link->rcvbuf;
            rcvbuflen += nread;
        }

        /* 如果拿到了全部的数据,就开始处理 */
        if (rcvbuflen >= 8 && rcvbuflen == ntohl(hdr->totlen)) {
            if (clusterProcessPacket(link)) {
                if (link->rcvbuf_alloc > RCVBUF_INIT_LEN) {
                    zfree(link->rcvbuf);
                    link->rcvbuf = zmalloc(link->rcvbuf_alloc = RCVBUF_INIT_LEN);
                }
                link->rcvbuf_len = 0;
            } else {
                return; /* Link no longer valid. */
            }
        }
    }
}

所有的 packet 都包含下面的部分(common header):

  1. Node ID:node 创建的时候分配的 160 bit 随机字符串(会被写在硬盘上配置文件里,除非删掉文件,或者执行 cluster reset 命令,否则不会改变)
  2. currentEpoch 和 configEpoch: 用于支撑 Redis Cluster 的分布式算法(会在下面做详细介绍)。如果节点是 replica,它的 configEpoch 是 master 的 configEpoch.
  3. flags:包含着 node 的身份(master or replica)等其他信息。
  4. 包含着它自己的 slot 信息的 bitmap
  5. 发送方用于接收连接的 port(used by Redis to accept client commands.)
  6. 发送方用于交换 cluster 信息的 port
  7. 在发送方看来,目前的集群状态 (down or ok).
  8. 如果发送方是 replica,这里会携带 master node ID

2. 处理数据包

下来就是重要的clusterProcessPacket方法了,当前 node 接收到其他 node 发来的完整的消息以后,就会调用这个方法处理:

/* clusterLink:encapsulates(封装)everything needed to talk with a remote node. */
/* clusterMsg:本次通信传递的消息,包括消息本身和消息相关的元数据(长度、版本、sender、ip等等) */
/* clusterNode:代表 cluster 中的一个节点,上面的 link 和 msg 都保存有 node 的相关信息,
 * 也就是说从他们任何一个都可以找到 clusterNode */
int clusterProcessPacket(clusterLink *link) {
    clusterMsg *hdr = (clusterMsg*) link->rcvbuf;
    uint32_t totlen = ntohl(hdr->totlen);
    uint16_t type = ntohs(hdr->type);
    mstime_t now = mstime();

    // 如果这是一个已知的命令
    if (type < CLUSTERMSG_TYPE_COUNT)
        // 统计各种命令收到的次数
        server.cluster->stats_bus_messages_received[type]++;
        // 消息类型如下:
        // #define CLUSTERMSG_TYPE_PING 0          /* Ping */
        // #define CLUSTERMSG_TYPE_PONG 1          /* Pong (reply to Ping) */
        // #define CLUSTERMSG_TYPE_MEET 2          /* Meet "let's join" message */
        // #define CLUSTERMSG_TYPE_FAIL 3          /* Mark node xxx as failing */
        // #define CLUSTERMSG_TYPE_PUBLISH 4       /* Pub/Sub Publish propagation */
        // #define CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST 5 /* May I failover? */
        // #define CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK 6     /* Yes, you have my vote */
        // #define CLUSTERMSG_TYPE_UPDATE 7        /* Another node slots configuration */
        // #define CLUSTERMSG_TYPE_MFSTART 8       /* Pause clients for manual failover */
        // #define CLUSTERMSG_TYPE_MODULE 9        /* Module cluster API message. */
        // #define CLUSTERMSG_TYPE_PUBLISHSHARD 10 /* Pub/Sub Publish shard propagation */
        // #define CLUSTERMSG_TYPE_COUNT 11        /* Total number of message types. */
    ······
    // 校验 cluster 协议版本(目前 cluster 协议版本默认都是 1)
    if (ntohs(hdr->ver) != CLUSTER_PROTO_VER) {
        return 1;
    }

    uint16_t flags = ntohs(hdr->flags);
    uint64_t senderCurrentEpoch = 0, senderConfigEpoch = 0;
    clusterNode *sender;

    ······
    // 此处省略掉一大段代码,主要是校验各种类型(type)的消息,其消息体的长度是否合法
    ······

    // 查找 sender:根据 node name(clusterMsg->sender)去
    // server.cluster->nodes 字典中查询
    sender = clusterLookupNode(hdr->sender);

    /* 更新最后与这个 node 交互的时间,这是用来检测 timeout 的标志 */
    if (sender) sender->data_received = now;

    // sender 已知 && 已经和 sender 发送过至少一次 ping-pong
    if (sender && !nodeInHandshake(sender)) {

        senderCurrentEpoch = ntohu64(hdr->currentEpoch);
        senderConfigEpoch = ntohu64(hdr->configEpoch);

        /* 如果对方的 currentEpoch 更大,则更新本地 currentEpoch */
        if (senderCurrentEpoch > server.cluster->currentEpoch)
            server.cluster->currentEpoch = senderCurrentEpoch;

        /* 更新这个 sender 的 configEpoch */
        if (senderConfigEpoch > sender->configEpoch) {
            sender->configEpoch = senderConfigEpoch;
            // configEpoch 发生了变化,fsync 到磁盘上(nodes.conf)
            clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|CLUSTER_TODO_FSYNC_CONFIG);
        }

        /* 更新节点的 replication offset  */
        sender->repl_offset = ntohu64(hdr->offset);
        sender->repl_offset_time = now;

        // 这里先介绍下手动把slave升主(manual failover)的流程:
        // 1. 用户手动给 slave 发送 CLUSTER FAILOVER 命令。mf_end(manual failover end)被初始化为未来的一个时间点,超过这个时间,failover 就会被抛弃。
        // 2. slave 发送 MFSTART 消息给 master,让 master 暂停(pause)与 clients 的交互。master 成功 pause 之后,数据包中就会携带 CLUSTERMSG_FLAG0_PAUSED 字段。
        // 3. slave 等待 master 发送它的 replication offset(消息中携带 CLUSTERMSG_FLAG0_PAUSED)
        // 4. 如果 master 的 offset 和 slave 自身相同,mf_can_start 被设置为 1,表示可以开始 failover,之后就和普通的 failover 一样,通过 clusterHandleSlaveFailover 方法执行了。

        /* 如果当前节点是 slave,master 已经 paused(不再接受客户端请求),且发来了 mf_offset
           则更新 mf 状态*/
        if (server.cluster->mf_end &&  // 没有正在执行中的 mf
            nodeIsSlave(myself) &&
            myself->slaveof == sender &&
            hdr->mflags[0] & CLUSTERMSG_FLAG0_PAUSED &&  // 表示 master 已经做好准备,可以开始 failover 了
            server.cluster->mf_master_offset == -1)  // master 尚未发送给 slave 执行 mf 的 offset
        {
            // 更新 offset
            server.cluster->mf_master_offset = sender->repl_offset;
            // 在下个循环开始前执行 mf
            // 在这里标记之后,有两个方式都可以触发后续 mf 流程,一是beforeSleep,二是clusterCron
            clusterDoBeforeSleep(CLUSTER_TODO_HANDLE_MANUALFAILOVER);
            serverLog(LL_WARNING,
                "Received replication offset for paused master manual failover: %lld",
                server.cluster->mf_master_offset);
        }
    }

    /* PING, MEET: 答复 PONG */
    // 注意,发送 PONG 并不一定是因为接收到了 PING
    // 当一个节点需要广播自己的配置信息的时候,也有可能直接发送 PONG 命令
    // 事实上,ping 和 pong 的数据包除了 type 字段不同,其他部分完全相同
    if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_MEET) {
        serverLog(LL_DEBUG,"Ping packet received: %p", (void*)link->node);

        /* 使用 MEET 消息,来更新自身的 ip 地址。通过 socket 信息来发现/更新自己的 ip,
         * 总比硬编码写在配置文件里好一些。
         * 还有一种情况:如果我们的 ip 还没有确定,我们也会从 PING 包里取,即便它是错的,我们也可以从后续的
         * MEET 包里拿到正确的地址。
         */
        if ((type == CLUSTERMSG_TYPE_MEET || myself->ip[0] == '\0') &&
            server.cluster_announce_ip == NULL)
        {
            char ip[NET_IP_STR_LEN];

            if (connSockName(link->conn,ip,sizeof(ip),NULL) != -1 &&
                strcmp(ip,myself->ip))
            {
                memcpy(myself->ip,ip,NET_IP_STR_LEN);
                serverLog(LL_WARNING,"IP address for this node updated to %s",
                    myself->ip);
                // 把配置信息写到本地(即就是 CLUSTER NODES 命令得到的结果)
                clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
            }
        }

        /* 如果这是一个发送 MEET 的新节点,把他添加到 node dict 中。
         * 这里只是简单的构造出 node 结构(录入 ip、port,nodeName 会暂时用一个随机字符串替代)
         * 然后添加到 dict 中,其他信息会在后续的 ping-pong 中,通过接收 PONG 来获取 */
        if (!sender && type == CLUSTERMSG_TYPE_MEET) {
            clusterNode *node;

            node = createClusterNode(NULL,CLUSTER_NODE_HANDSHAKE);
            nodeIp2String(node->ip,link,hdr->myip);
            node->port = ntohs(hdr->port);
            node->pport = ntohs(hdr->pport);
            node->cport = ntohs(hdr->cport);
            clusterAddNode(node);
            clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
        }

        // 处理 MEET 命令携带的 gossip 数据,包括获取对方 POV 中集群的状态、
        // 更新 pong_received 时间、更新其他节点的地址等;这个方法会在下面详细介绍。
        if (!sender && type == CLUSTERMSG_TYPE_MEET)
            clusterProcessGossipSection(hdr,link);

        /* 答复 PONG */
        clusterSendPing(link,CLUSTERMSG_TYPE_PONG);
    }

    /* PING, PONG, MEET: 处理这三类命令中携带的配置信息. */
    if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_PONG ||
        type == CLUSTERMSG_TYPE_MEET)
    {
        serverLog(LL_DEBUG,"%s packet received: %.40s",clusterGetMessageTypeString(type),
            link->node ? link->node->name : "NULL");

        if (link->node) {
            // nodeInHandshake 方法:校验 link->node 的 flag 里面的标志位,就可以知道它是不是正在做第一次 ping-pong
            if (nodeInHandshake(link->node)) {
                /* 如果这个 node 已知,尝试更新它的 ip/port */
                if (sender) {
                    serverLog(LL_VERBOSE,
                        "Handshake: we already know node %.40s, "
                        "updating the address if needed.", sender->name);
                    if (nodeUpdateAddressIfNeeded(sender,link,hdr))
                    {
                        clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
                                             CLUSTER_TODO_UPDATE_STATE);
                    }
                    /* Free this node as we already have it. This will
                     * cause the link to be freed as well. */
                    clusterDelNode(link->node);
                    return 0;
                }

                /* 上面说过,第一次 ping-pong 完成之前,node name 都是随机字符串代替的,
                   在 handshake 过程中,需要更新节点名称 node name */
                clusterRenameNode(link->node, hdr->sender);
                serverLog(LL_DEBUG,"Handshake with node %.40s completed.",link->node->name);
                link->node->flags &= ~CLUSTER_NODE_HANDSHAKE; // 取消“握手中”标志位
                link->node->flags |= flags&(CLUSTER_NODE_MASTER|CLUSTER_NODE_SLAVE);
                clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
            } else if (memcmp(link->node->name,hdr->sender,
                        CLUSTER_NAMELEN) != 0)
            {
                /* 到了这里,说明第一次 ping-pong 已经完成了。
                 * 如果 node ID(node name)对不上号,则断开这个 node 的连接,
                 * 将它标记为没有可用 address 的状态 */
                // 说人话就是:我们向 ip:port 发送 ping,(我们认为它是 nodeA),然而回答里的签名是 nodeB
                // 我们就释放掉这个 node,因为它的信息是错误的
                serverLog(LL_DEBUG,"PONG contains mismatching sender ID. About node %.40s added %d ms ago, having flags %d",
                    link->node->name,(int)(now-(link->node->ctime)),link->node->flags);
                link->node->flags |= CLUSTER_NODE_NOADDR;
                link->node->ip[0] = '\0';
                link->node->port = 0;
                link->node->pport = 0;
                link->node->cport = 0;
                freeClusterLink(link);
                clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
                return 0;
            }
        }

        // 更新 CLUSTER_NODE_NOFAILOVER flag,当有这个 flag 时,即便 master 挂了,
        // 这个 slave 也不会自动晋升为 master(管理员手动执行命令是可以打破这个策略的)
        // 这个参数的来源是配置文件中的 cluster-replica-no-failover。
        if (sender) {
            int nofailover = flags & CLUSTER_NODE_NOFAILOVER;
            sender->flags &= ~CLUSTER_NODE_NOFAILOVER;
            sender->flags |= nofailover;
        }

        /* 接收到 ping 命令:update address */
        if (sender && type == CLUSTERMSG_TYPE_PING &&
            !nodeInHandshake(sender) &&
            // 从 clusterMsg->myip 或者从 clusterLink->fd 中拿到最新的 ip/port,
            // 如果和 node->ip/port 不一致,就更新 node->ip/port,并释放 link。
            // 如果 sender 是自己的 master,还需要改变 replicateof 配置。
            nodeUpdateAddressIfNeeded(sender,link,hdr))
        {
            clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
                                 CLUSTER_TODO_UPDATE_STATE);
        }

        /* 接收到 pong 命令,Update our info about the node */
        if (!link->inbound && type == CLUSTERMSG_TYPE_PONG) {
            link->node->pong_received = now;
            link->node->ping_sent = 0;

            /* 收到 PONG,说明 node 存活,清理掉临时的 PFAIL 状态(possible fail) */

            // 查看 flag 中是否有 CLUSTER_NODE_PFAIL 标志,如果有的话就清理掉 pfail(possible fail) 标志
            if (nodeTimedOut(link->node)) {
                link->node->flags &= ~CLUSTER_NODE_PFAIL;
                clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|CLUSTER_TODO_UPDATE_STATE);

            // 即便已经 FAIL 了,也是有救活的可能的
            } else if (nodeFailed(link->node)) {
                // 当某个 node 被标记为 fail 状态,而我们又收到了它的 pong
                // 就会调用这个方法。当下面任意一条满足时,会清理掉 fail 状态:
                // 1. 它是一个 replica
                // 2. 未持有 slots
                // 3. 是 master 节点,且持有 slots,且离线足够长时间(2 * nodeTimeOut)
                // 第三条的意思是:Apparently no one is going to fix these slots after some time
                // 所以当它又活过来的时候,我们就把它加回集群。
                clearNodeFailureIfNeeded(link->node);
            }
        }

        /* 检测角色切换: slave -> master 或 master -> slave. */
        if (sender) {
            if (!memcmp(hdr->slaveof,CLUSTER_NODE_NULL_NAME,
                sizeof(hdr->slaveof)))
            {
                /* slaveof 为空,说明是 master. */
                clusterSetNodeAsMaster(sender);
            } else {
                /* slave 节点, 先根据 node name 查找它的 master */
                clusterNode *master = clusterLookupNode(hdr->slaveof);

                if (nodeIsMaster(sender)) {
                    // 本地的信息表示它是 master,他自己汇报自己是 slave
                    // 说明 master 转换成了 slave

                    // 清理保存 slot 信息的 bitmap:server.cluster->slots[?]
                    clusterDelNodeSlots(sender);
                    sender->flags &= ~(CLUSTER_NODE_MASTER| CLUSTER_NODE_MIGRATE_TO);
                    sender->flags |= CLUSTER_NODE_SLAVE;

                    clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
                                         CLUSTER_TODO_UPDATE_STATE);
                }

                /* 这个 slave 汇报的 master 与本地记录不一致,更新本地信息 */
                if (master && sender->slaveof != master) {
                    if (sender->slaveof)
                        clusterNodeRemoveSlave(sender->slaveof,sender);
                    clusterNodeAddSlave(master,sender);
                    sender->slaveof = master;
                    clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
                }
            }
        }

        /* 更新 slots 信息*/

        /* 如果本地的 slots 信息和 clusterMsg 中不一致时,需要做一系列的检测: */

        clusterNode *sender_master = NULL; /* Sender or its master if slave. */
        // 注意这里对 dirty_slot 的定义
        int dirty_slots = 0; /* Sender claimed slots don't match my view? */

        if (sender) {
            sender_master = nodeIsMaster(sender) ? sender : sender->slaveof;
            if (sender_master) {
                // 比较两个 bitmap,确认是否有 dirty_slot
                // unsigned char myslots[CLUSTER_SLOTS/8] 其实是一个字符串,c++ 中
                // 一个 char 占一个字节,所以这里 myslots 字符串长度是 16384/8
                dirty_slots = memcmp(sender_master->slots,
                        hdr->myslots,sizeof(hdr->myslots)) != 0;
            }
        }

        /* 1) 如果 sender 是 master, 并且和本地的 slot 信息不一致,判断是否要更新本地信息 */
        if (sender && nodeIsMaster(sender) && dirty_slots)
            // 这个方法会遍历 sender 发送的 slot bitmap(hdr->myslots),
            // 依据 hdr 的新旧程度(senderConfigEpoch)和 slot 信息,可能会:
            // 1. 清理不再持有的 slot 以及这部分 slot 中的数据
            // 2. 绑定 slot 给 sender
            // 3. 本机(或者本机的 master)的 slot 被清空的话,就认 sender 为新的 master
            // 下文会展开读这个方法的源码。
            clusterUpdateSlotsConfigWith(sender,senderConfigEpoch,hdr->myslots);

        /* 2) 相反的,如果 sender 的 configEpoch 更小,那么 sender 说的就不算数了。
         *    这时候我们就需要通知 sender。
         *
         * 这一步是很有用的,因为当 partition 恢复的时候,可能会有一个重新出现的 master,
         * 这时候它的 configEpoch 就很小。比如:
         *
         * A and B are master and slave for slots 1,2,3.
         * A is partitioned away, B gets promoted.
         * B is partitioned away, and A returns available.
         *
         * 通常来说,B 会 PING A 发送它的 slots 和 configEpoch,但是他们之间已经无法通信了,
         * 这时候其他的节点就应该去告知 A,然后 A 会停止 master 角色(或者尝试发起 failover)
          */
        if (sender && dirty_slots) {
            int j;

            for (j = 0; j < CLUSTER_SLOTS; j++) {
                if (bitmapTestBit(hdr->myslots,j)) {
                    // 归属权没有争议的 slot,continue
                    if (server.cluster->slots[j] == sender ||
                        server.cluster->slots[j] == NULL) continue;
                    //
                    if (server.cluster->slots[j]->configEpoch >
                        senderConfigEpoch)
                    {
                        // 通知 sender:你的 slots 信息是错的,正确的应该是xxx
                        serverLog(LL_VERBOSE,
                            "Node %.40s has old slots configuration, sending "
                            "an UPDATE message about %.40s", sender->name, server.cluster->slots[j]->name);
                        clusterSendUpdate(sender->link,
                            server.cluster->slots[j]);

                        // 从作者的注释来看,这里似乎留了个小尾巴
                        // 感觉改成 continue 似乎就可以了
                        /* TODO: instead of exiting the loop send every other
                         * UPDATE packet for other nodes that are the new owner
                         * of sender's slots. */
                        break;
                    }
                }
            }
        }

        /* 如果我们的 config epoch 和 sender 一样,尝试修复 */
        if (sender && nodeIsMaster(myself) && nodeIsMaster(sender) &&
            senderConfigEpoch == myself->configEpoch)
        {   // 这个方法的内容比较简单,但是注释非常长。
            //
            // BACKGROUND:
            //
            // 首先,在 failover 过程中,不同的 slave 之间 config epoch 是不会重复的,因为选举需要
            // 取得大多数 master 的选票。
            // 但是,在手动 resharding 的时候,node 会自己生成新的 config epoch(而不会和其他节点协商)。
            // 通常情况下,手动 resharding 都是在集群稳定的时候进行的,但也有可能恰好发生了 failover,
            // resharding 节点的 config epoch 还没来得及广播,这时候就可能发生冲突。
            //
            // Moreover,当 cluster 创建的时候,所有的节点都使用同一个 config epoch,
            // 这个方法可以自动解决冲突(让所有 node 的 configEpoch 都不相同)
            //
            // 一般情况下(手动操作错误、软件 bugs)导致的 config epoch 冲突是无害的,不过
            // 一旦在 failover 过程中出现就会造成无法预料的影响
            //
            // BEHAVIOR:
            //
            // 如果当前节点的 nodeId 字典排序较小,当前节点就会将集群中最大的 configEpoch + 1
            // 作为自己的 configEpoch。
            //
            // 这样就保证了,即便有多个节点同时冲突,最终也能使集群稳定下来。
            clusterHandleConfigEpochCollision(sender);
        }

        /* 处理 gossip section */
        if (sender) {
            // 处理 ping、pong 命令的中的 gossip 数据,遍历 clusterMsgDataGossip,
            // 其中保存着 sender 眼中,集群里其他节点的状态:
            // a. node 已知(根据 node name 可以从 node 字典中查到):
            //    1. 如果 sender 是 master ,则处理 fail_report(新增或删除),有必要的话标记某个节点为 FAIL 状态
            //    2. 这如果 sender 状态正常,则更新 pong_received
            //    3. 如果 sender 状态异常(fail|pfail),且 addr 发生变化,则更新 addr,断开原来的连接 clusterLink
            //       这样的话,cron 的时候就会重新建立连接了
            // b. node 未知
            //    1. 考虑把 node 加入字典;但是这里不能立刻去连接它的 IP/PORT,因为 IP/PORT 有可能被重用,
            //       这时候我们就有可能加入到别的集群中去。
            clusterProcessGossipSection(hdr,link);
        }
    } else if (type == CLUSTERMSG_TYPE_FAIL) {
        /* CLUSTERMSG_TYPE_FAIL:用于把某个节点标记位为 fail */

        clusterNode *failing;

        if (sender) {
            failing = clusterLookupNode(hdr->data.fail.about.nodename);
            if (failing &&
                !(failing->flags & (CLUSTER_NODE_FAIL|CLUSTER_NODE_MYSELF)))
            {
                serverLog(LL_NOTICE,
                    "FAIL message received from %.40s about %.40s",
                    hdr->sender, hdr->data.fail.about.nodename);
                // 收到了 fial report,将其中的节点标记为 fail
                failing->flags |= CLUSTER_NODE_FAIL;
                failing->fail_time = now;
                failing->flags &= ~CLUSTER_NODE_PFAIL;
                clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|CLUSTER_TODO_UPDATE_STATE);
            }
        } else {
            serverLog(LL_NOTICE,
                "Ignoring FAIL message from unknown node %.40s about %.40s",
                hdr->sender, hdr->data.fail.about.nodename);
        }
    } else if (type == CLUSTERMSG_TYPE_PUBLISH) {
        robj *channel, *message;
        uint32_t channel_len, message_len;

        /* 如果没有订阅者,就没必要创建对象了 */
        if (dictSize(server.pubsub_channels) ||
           dictSize(server.pubsub_patterns))
        {
            ......
            // 找到监听 channel 的 client,发送 message
            pubsubPublishMessage(channel,message);
            ......
        }
    } else if (type == CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST) {
        // 有 slave 发起升主请求,投票
        if (!sender) return 1;  /* We don't know that node. */
        clusterSendFailoverAuthIfNeeded(sender,hdr);

    } else if (type == CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK) {
        // 有人给我们投票

        if (!sender) return 1;  /* We don't know that node. */

        /* 只处理满足下列要求的节点:
         * 1. master 节点,并且持有 slots
         * 2. senderCurrentEpoch ≥ 当前节点发起选举时候的 server.cluster->currentEpoch */
        if (nodeIsMaster(sender) && sender->numslots > 0 &&
            senderCurrentEpoch >= server.cluster->failover_auth_epoch)
        {
            // 票数+1
            server.cluster->failover_auth_count++;
            // 检查票数是否达到阈值
            clusterDoBeforeSleep(CLUSTER_TODO_HANDLE_FAILOVER);
        }
    } else if (type == CLUSTERMSG_TYPE_MFSTART) {
        /* 只有当前节点是 master,而且发送方是自己的 slave 的时候,才是有效的 */
        if (!sender || sender->slaveof != myself) return 1;
        /* 初始化参数 */
        resetManualFailover();
        server.cluster->mf_end = now + CLUSTER_MF_TIMEOUT;
        server.cluster->mf_slave = sender;
        pauseClients(PAUSE_DURING_FAILOVER,
                     now + (CLUSTER_MF_TIMEOUT * CLUSTER_MF_PAUSE_MULT),
                     CLIENT_PAUSE_WRITE);
        serverLog(LL_WARNING,"Manual failover requested by replica %.40s.",
            sender->name);
        /* 这里发送 ping 主要是为了提升 failover 的速度。核心目的是为了发送 repl_offset,
         * 以便 slave 检查数据是否同步完成,是否可以开始发起投票(server.cluster->mf_can_start 设为 1)。
         * (至于 repl_offset 是如何起作用的,可以全文搜索 “Received replication offset for paused” 看看) */
        clusterSendPing(link, CLUSTERMSG_TYPE_PING);
    } else if (type == CLUSTERMSG_TYPE_UPDATE) {
        clusterNode *n;
        uint64_t reportedConfigEpoch = ntohu64(hdr->data.update.nodecfg.configEpoch);

        // 找到 sender
        if (!sender) return 1;  /* We don't know the sender. */
        n = clusterLookupNode(hdr->data.update.nodecfg.nodename);
        if (!n) return 1;   /* We don't know the reported node. */

        // configEpoch 没变,说明没有需要更新的东西
        if (n->configEpoch >= reportedConfigEpoch) return 1; /* Nothing new. */

        // 什么时候会发送 UPDATE:A节点收到B节点的 ping,发现 ping 中携带的 slots 信息过期了(slots不一致且configEpoch小),
        //                       A就会给B发送 UPDATE
        // 即 UPDATE 更新的是 slot 信息,更新的是 server.cluster->slots[x](clusterNode),这里存的一定是 master 节点,
        // 所以发送方发送的时候填写的 nodename 是 master 的,
        // 所以这里 n 应该是 master 节点,如果不是,说明本地配置有误,需要修正。
        /* If in our current config the node is a slave, set it as a master. */
        if (nodeIsSlave(n)) clusterSetNodeAsMaster(n);

        /* 更新 configEpoch. */
        n->configEpoch = reportedConfigEpoch;
        clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
                             CLUSTER_TODO_FSYNC_CONFIG);

        /* 更新 slots */
        clusterUpdateSlotsConfigWith(n,reportedConfigEpoch,
            hdr->data.update.nodecfg.slots);

    } else if (type == CLUSTERMSG_TYPE_MODULE) {
        // module 的代码先不看
    } else {
        serverLog(LL_WARNING,"Received unknown packet type: %d", type);
    }
    return 1;
}

更新slot信息

clusterUpdateSlotsConfigWith

/* 当收到 PING, PONG 或 UPDATE 包时. 其中包括了 node, node configEpoch, slots claimed under this configEpoch.
 *
 * 我们可能需要据此更新本地记录的 slot 信息。
 * 同时,如果有需要的话,会把自身变为对方的 slave。
 *
 * The 'sender' is the node for which we received a configuration update.
 * Sometimes it is not actually the "Sender" of the information, like in the
 * case we receive the info via an UPDATE packet. */
void clusterUpdateSlotsConfigWith(clusterNode *sender, uint64_t senderConfigEpoch, unsigned char *slots) {
    int j;
    clusterNode *curmaster = NULL, *newmaster = NULL;
    /* Dirty slots list 中包含着我们失去所有权、但是仍然有存有数据的 slot。
     * 这通常是由于 failover 或者管理员手动更改了 cluster 配置引起的。
     *
     * 如果消息的内容还不足以将 master 降为 slave,我们需要把失去所有权的 slots 中的 key 都删掉
     * (因为这种情况下,我们会去向 master resync 整个 key space,所以不会有问题)
     * If the update message is not able to demote a master to slave (in this
     * case we'll resync with the master updating the whole key space), we
     * need to delete all the keys in the slots we lost ownership. */
    uint16_t dirty_slots[CLUSTER_SLOTS];
    int dirty_slots_count = 0;

    /* 需要确定 sender 是否是我们的新 master。
     * 确定:
     * 1. 我们的 slots 是否都迁移到 sender 了
     * 2. sender 中除了我们的 slots 之外,没有别的 slots 了 */
    int sender_slots = 0;
    int migrated_our_slots = 0;

    curmaster = nodeIsMaster(myself) ? myself : myself->slaveof;

    if (sender == myself) {
        serverLog(LL_WARNING,"Discarding UPDATE message about myself.");
        return;
    }

    for (j = 0; j < CLUSTER_SLOTS; j++) {
        if (bitmapTestBit(slots,j)) {
            sender_slots++;

            /* The slot is already bound to the sender of this message. */
            if (server.cluster->slots[j] == sender) continue;

            /* 如果这些 slots 将要被迁移过来,continue
             * (这个场景通常是管理员通过 redis-trib 手动触发的). */
            if (server.cluster->importing_slots_from[j]) continue;

            /* 当这个 slot 未分配或发送方的 configEpoch 大于原本的 configEpoch,
             * 本地绑定这个 slot 给消息发送方
             * */
            if (server.cluster->slots[j] == NULL ||
                server.cluster->slots[j]->configEpoch < senderConfigEpoch)
            {
                /* 小丑竟是我自己,如果 slot 中还有 key,则标记为 dirty slot */
                if (server.cluster->slots[j] == myself &&
                    countKeysInSlot(j) &&
                    sender != myself)
                {
                    dirty_slots[dirty_slots_count] = j;
                    dirty_slots_count++;
                }

                /* 本地信息中,拥有这个 slot 的是 currMaster,(而且本地的 configEpoch 小于发送方的 configEpoch)
                 * 则把发送方当做新的 master */
                if (server.cluster->slots[j] == curmaster) {
                    newmaster = sender;
                    migrated_our_slots++;
                }

                // server.cluster->slots[j] = NULL
                clusterDelSlot(j);
                // server.cluster->slots[j] = sender;
                // 这个方法的实现有点意思,和 replica migration 有关,曾经还出过 bug,感兴趣的话可以细看下。
                clusterAddSlot(sender,j);
                clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
                                     CLUSTER_TODO_UPDATE_STATE|
                                     CLUSTER_TODO_FSYNC_CONFIG);
            }
        }
    }

    ......

    if (newmaster && curmaster->numslots == 0 &&
            // 配置文件中的配置,允许自动迁移
            (server.cluster_allow_replica_migration ||
            // sender 声明的所有 slots 当前都在 currMaster 手上
             sender_slots == migrated_our_slots)) {

        /* 整合一下上面的条件,这里就等于是说:
         *  假设当前节点共有 n 个 slots, 根据 clusterMsg 中的信息可知,这 n 个 slots 全部都迁移给了 sender,
         *  而且 ( cluster_allow_replica_migration 配置打开 || sender 中只有这 n 个 slots) */

        /* 再通俗一点说,
         * 当前节点的 slots 全转走了
         * 而且 (cluster_allow_replica_migration 配置打开 || 当前节点和 sender (的slots)发生了等价置换) */

        /* 再再通俗一点说,
         * 当前节点的 slots 全转走了,
         * 要么是:cluster_allow_replica_migration 配置打开,我重新认主
         * 要么是:当前节点和 sender 发生了等价置换,我重新认主 */

        serverLog(LL_WARNING,
            "Configuration change detected. Reconfiguring myself "
            "as a replica of %.40s", sender->name);

        // 自己已经是个空壳了,重新认主
        clusterSetMaster(sender);
        clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
                             CLUSTER_TODO_UPDATE_STATE|
                             CLUSTER_TODO_FSYNC_CONFIG);
    } else if (dirty_slots_count) {

        /* 如果走到这里,说明我们收到了 update message,拿走了一部分当前节点的 slots
         * 但是当前节点还有别的 slots 存在,所以依然是 master.
         *
         * 所以这里需要删除掉失去所有权的 slots 中,所有的 key.
         * This is an optimization since the replication code will anyway
         * flush all the instance data in a faster way. */
        for (j = 0; j < dirty_slots_count; j++)
            delKeysInSlot(dirty_slots[j]);

        // todo 这里有个问题,key value 是何时迁移的,这里直接 del 不会丢数据吗??
    }
}

投票

/* Vote for the node asking for our vote if there are the conditions. */
void clusterSendFailoverAuthIfNeeded(clusterNode *node, clusterMsg *request) {
    clusterNode *master = node->slaveof;
    uint64_t requestCurrentEpoch = ntohu64(request->currentEpoch);
    uint64_t requestConfigEpoch = ntohu64(request->configEpoch);
    unsigned char *claimed_slots = request->myslots;
    int force_ack = request->mflags[0] & CLUSTERMSG_FLAG0_FORCEACK;
    int j;

    /* 未持有 slots 的 master 没有投票权 */
    if (nodeIsSlave(myself) || myself->numslots == 0) return;

    /* Request epoch must be >= our currentEpoch.
     * 事实上,在这次请求中的开始,我们已经用其中的信息更新了自身的 currentEpoch
     * 所以不会出现 > 的情况,只有 = 或 <, 这里需要抛弃掉 < 的情况*/
    if (requestCurrentEpoch < server.cluster->currentEpoch) {
        serverLog(LL_WARNING,
            "Failover auth denied to %.40s: reqEpoch (%llu) < curEpoch(%llu)",
            node->name,
            (unsigned long long) requestCurrentEpoch,
            (unsigned long long) server.cluster->currentEpoch);
        return;
    }

    /* 已经投过票了,return */
    if (server.cluster->lastVoteEpoch == server.cluster->currentEpoch) {
        serverLog(LL_WARNING,
                "Failover auth denied to %.40s: already voted for epoch %llu",
                node->name,
                (unsigned long long) server.cluster->currentEpoch);
        return;
    }

    /* sender 必须是 slave,它的 master 必须挂了。
     * (手动 failover 的时候,master 不挂也可以)
     * The master can be non failing if the request is flagged
     * with CLUSTERMSG_FLAG0_FORCEACK (manual failover). */
    if (nodeIsMaster(node) || master == NULL ||
        (!nodeFailed(master) && !force_ack))
    {
        if (nodeIsMaster(node)) {
            serverLog(LL_WARNING,
                    "Failover auth denied to %.40s: it is a master node",
                    node->name);
        } else if (master == NULL) {
            serverLog(LL_WARNING,
                    "Failover auth denied to %.40s: I don't know its master",
                    node->name);
        } else if (!nodeFailed(master)) {
            serverLog(LL_WARNING,
                    "Failover auth denied to %.40s: its master is up",
                    node->name);
        }
        return;
    }

    /* 如果我们最近给 sender 的 master 投过票, 就先不投票。
     * 这其实不是一个必须的限制,只不过加上这个条件之后,整个选举算法会更清晰、简洁一些(more linear) */
    if (mstime() - node->slaveof->voted_time < server.cluster_node_timeout * 2)
    {
        serverLog(LL_WARNING,
                "Failover auth denied to %.40s: "
                "can't vote about this master before %lld milliseconds",
                node->name,
                (long long) ((server.cluster_node_timeout*2)-
                             (mstime() - node->slaveof->voted_time)));
        return;
    }

    /* 请求投票的 slave,它的 configEpoch 必须 >= 当前持有这些 slots 的节点 */
    for (j = 0; j < CLUSTER_SLOTS; j++) {
        if (bitmapTestBit(claimed_slots, j) == 0) continue;
        if (server.cluster->slots[j] == NULL ||
            server.cluster->slots[j]->configEpoch <= requestConfigEpoch)
        {
            continue;
        }
        /* 只要有一个 slots 的 configEpoch > slave,就拒绝投票,return */
        serverLog(LL_WARNING,
                "Failover auth denied to %.40s: "
                "slot %d epoch (%llu) > reqEpoch (%llu)",
                node->name, j,
                (unsigned long long) server.cluster->slots[j]->configEpoch,
                (unsigned long long) requestConfigEpoch);
        return;
    }

    /* 可以投票 */
    server.cluster->lastVoteEpoch = server.cluster->currentEpoch;
    node->slaveof->voted_time = mstime();
    clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|CLUSTER_TODO_FSYNC_CONFIG);
    // 这个方法很简单,直接答复一个 ack 给 slave
    clusterSendFailoverAuth(node);
    serverLog(LL_WARNING, "Failover auth granted to %.40s for epoch %llu",
        node->name, (unsigned long long) server.cluster->currentEpoch);
}

3. clusterCron 定期任务

在 redis 的 serverCron 中,针对 cluster 模式的节点,会运行 clusterCron(10次/秒),它的主要工作是:

(1)向其他节点发送MEET消息,将其加入集群;
(2)每1s会随机选择一个节点,发送 ping 消息;
(3)如果一个节点在超时时间之内仍未收到 ping 包的响应(cluster-node-timeout配置项指定的时间),则将其
标记为 pfail;
(4)检查是否需要进行主从切换,如果需要则执行切换;
(5)检查是否需要进行副本漂移,如果需要,执行副本漂移操作。

对于步骤(1),当在一个集群节点A执行 CLUSTER MEET ip port 命令时,会将“ip-port”指定的节点B加入该集群中,但该命令执行时只是将B的“ip-port”信息保存起来,然后在 clusterCron 函数中才和B节点建立连接并发送MEET类型的数据包
对于步骤(3),Redis集群中节点的故障状态有两种。一种为pfail(Possible failure),当一个节点A未在指定时间收到另一个节点B对ping包的响应时,A节点会将B节点标记为pfail。另一种是,当大多数Master节点确认B为pfail之后,就会将B标记为fail。fail状态的节点才会需要执行主从切换。

void clusterCron(void) {
    dictIterator *di;
    dictEntry *de;
    int update_state = 0;
    int orphaned_masters; /* How many masters there are without ok slaves. */
    int max_slaves; /* Max number of ok slaves for a single master. */
    int this_slaves; /* Number of ok slaves for our master (if we are slave). */
    mstime_t min_pong = 0, now = mstime();
    clusterNode *min_pong_node = NULL;
    static unsigned long long iteration = 0;
    mstime_t handshake_timeout;

    iteration++; /* 计数器,clusterCron 函数运行的次数 */

    /* 在 redis 运行的过程中可以通过 CONFIG SET 命令修改 cluster-announce-ip ,
     * 如果确实修改了 ip,这里需要设置 myself->ip */
    {
        static char *prev_ip = NULL;
        char *curr_ip = server.cluster_announce_ip;
        int changed = 0;

        if (prev_ip == NULL && curr_ip != NULL) changed = 1;
        else if (prev_ip != NULL && curr_ip == NULL) changed = 1;
        else if (prev_ip && curr_ip && strcmp(prev_ip,curr_ip)) changed = 1;

        if (changed) {
            if (prev_ip) zfree(prev_ip);
            prev_ip = curr_ip;

            if (curr_ip) {
                /* 拷贝一份 announce_ip 赋给 prev_id。We always take a copy of the previous IP address, by
                 * duplicating the string. This way later we can check if
                 * the address really changed. */
                prev_ip = zstrdup(prev_ip);
                strncpy(myself->ip,server.cluster_announce_ip,NET_IP_STR_LEN);
                myself->ip[NET_IP_STR_LEN-1] = '\0';
            } else {
                // 没有 ip,就强制置空,等收到 PING 或者 MEET 的时候再更新自己的 ip
                myself->ip[0] = '\0'; /* Force autodetection. */
            }
        }
    }

    // 当处于 handshake 中的节点超时之后,还没有变成正常节点,就会给它T出去。
    // 这里保证这个超时时间不要太短。
    handshake_timeout = server.cluster_node_timeout;
    if (handshake_timeout < 1000) handshake_timeout = 1000;

    /* 再校验一下 cluster_slave_no_failover 标志(禁止 slave 自动 failover,但不禁止管理员手动 failover),
     * 防止用户用 CONFIG SET 对它进行修改 */
    clusterUpdateMyselfFlags();

    /* 用于统计 PFAIL 状态的节点 */
    server.cluster->stats_pfail_nodes = 0;

    // 检查是否有 disconnect 的节点,重新连接它。
    // 顺便更新一些状态信息,程序的其他部分会用到。
    di = dictGetSafeIterator(server.cluster->nodes);
    while((de = dictNext(di)) != NULL) {
        clusterNode *node = dictGetVal(de);

        /* Not interested in reconnecting the link with myself or nodes
         * for which we have no address. */
        if (node->flags & (CLUSTER_NODE_MYSELF|CLUSTER_NODE_NOADDR)) continue;

        // 统计 pfail 节点的数量。会在定期的 PING 命令中将它们找出来并发给其他节点
        if (node->flags & CLUSTER_NODE_PFAIL)
            server.cluster->stats_pfail_nodes++;

        /* HANDSHAKE 状态是有 timeout 限制的,超时就删掉 */
        if (nodeInHandshake(node) && now - node->ctime > handshake_timeout) {
            clusterDelNode(node);
            continue;
        }

        // 重新与断连的 node 建立连接
        if (node->link == NULL) {
            clusterLink *link = createClusterLink(node);
            link->conn = server.tls_cluster ? connCreateTLS() : connCreateSocket();
            connSetPrivateData(link->conn, link);
            if (connConnect(link->conn, node->ip, node->cport, NET_FIRST_BIND_ADDR,
                        clusterLinkConnectHandler) == -1) {
                // 建立连接失败。
                // 如果 node->ping_sent == 0, redis 的失败检测机制(failure detection)将不会运行,
                // 所以我们假装发送了一次 ping,手动给 node->ping_sent 赋值。
                if (node->ping_sent == 0) node->ping_sent = mstime();
                serverLog(LL_DEBUG, "Unable to connect to "
                    "Cluster Node [%s]:%d -> %s", node->ip,
                    node->cport, server.neterr);

                freeClusterLink(link);
                continue;
            }
            node->link = link;
        }
    }
    dictReleaseIterator(di);

    /* 每 1 秒随机 ping 一个节点
     * (iteration 是 cron 任务运行的次数, 而 cron 运行的频率是 10hz)  */
    if (!(iteration % 10)) {
        int j;

        /* 随机检查5个 node,找出 ping-pong 间隔最久的那一个,并向他发送 ping 命令
         * 如果5次都没有找到可用的节点,就不 ping 了*/
        for (j = 0; j < 5; j++) {
            de = dictGetRandomKey(server.cluster->nodes);
            clusterNode *this = dictGetVal(de);

            /* 断连的不 PING,刚 PING 过的不 PING */
            if (this->link == NULL || this->ping_sent != 0) continue;
            if (this->flags & (CLUSTER_NODE_MYSELF|CLUSTER_NODE_HANDSHAKE))
                continue;
            if (min_pong_node == NULL || min_pong > this->pong_received) {
                min_pong_node = this;
                min_pong = this->pong_received;
            }
        }
        if (min_pong_node) {
            serverLog(LL_DEBUG,"Pinging node %.40s", min_pong_node->name);
            clusterSendPing(min_pong_node->link, CLUSTERMSG_TYPE_PING);
        }
    }

    /* 检查所有的节点,判断是否要将其标记为下线状态。
     * 同时:
     * 1. 检查是否有孤立的 master 节点(slave节点全挂);
     * 2. 找出拥有最多 slave 节点的主节点,统计它的 slave 节点数量;
     * 3. 如果自己是从节点,则统计自己的兄弟节点数量 */
    orphaned_masters = 0;
    max_slaves = 0;
    this_slaves = 0;
    di = dictGetSafeIterator(server.cluster->nodes);
    while((de = dictNext(di)) != NULL) {
        clusterNode *node = dictGetVal(de);
        now = mstime(); /* Use an updated time at every iteration. */

        if (node->flags &
            (CLUSTER_NODE_MYSELF|CLUSTER_NODE_NOADDR|CLUSTER_NODE_HANDSHAKE))
                continue;

        /* 检测孤立的 master (为可能的 replication migrate 做准备) */
        if (nodeIsSlave(myself) && nodeIsMaster(node) && !nodeFailed(node)) {
            int okslaves = clusterCountNonFailingSlaves(node);

            /* 孤立的 master 的定义是:
             * 1. 分配有 slot
             * 2. 拥有或者曾经拥有至少一个 slave,并且所有 slave 全挂 */
            if (okslaves == 0 && node->numslots > 0 &&
                node->flags & CLUSTER_NODE_MIGRATE_TO)
            {
                orphaned_masters++;
            }
            if (okslaves > max_slaves) max_slaves = okslaves;
            if (nodeIsSlave(myself) && myself->slaveof == node)
                this_slaves = okslaves;
        }

        /* 如果等待 pong 的时间超过 cluster timeout/2,为了防止连接异常,需要重新和 node 建立连接 */
        mstime_t ping_delay = now - node->ping_sent;
        mstime_t data_delay = now - node->data_received;
        if (node->link && /* is connected */
            now - node->link->ctime >
            server.cluster_node_timeout && /* was not already reconnected */
            node->ping_sent && /* we already sent a ping */
            /* 等待 pong 超过 timeout/2 */
            ping_delay > server.cluster_node_timeout/2 &&
            /* 最后一次从 node 接受任意数据包的时间超过 timeout/2 */
            data_delay > server.cluster_node_timeout/2)
        {
            /* 这里直接释放掉连接信息,下次 cron 的时候会尝试重连 */
            freeClusterLink(node->link);
        }

         /* 为了保证每个 node 最近都 ping 过:
          * 如果没有 active_ping, 且上一次收到 pong 距今已经超过 timeout/2,就再 ping 一次 */
        if (node->link &&
            node->ping_sent == 0 &&
            (now - node->pong_received) > server.cluster_node_timeout/2)
        {
            clusterSendPing(node->link, CLUSTERMSG_TYPE_PING);
            continue;
        }

        /* 如果自己是 master,并且 node 代表的 slave 要求进行手动故障转移(manual failover),
         * 就持续的 ping 它(主要是为了及时同步 repl_offset,数据都同步过去之后,才会执行 mf 后续流程) */
        if (server.cluster->mf_end &&
            nodeIsMaster(myself) &&
            server.cluster->mf_slave == node &&
            node->link)
        {
            clusterSendPing(node->link, CLUSTERMSG_TYPE_PING);
            continue;
        }

        /* 收到 PONG 的时候会将它置位 0, 这里说明和这个 node 最近刚刚通信过,可以跳过了 */
        if (node->ping_sent == 0) continue;

        /* 检查 node 是不是 unreachable.
         *
         * 从 node 接收到的任意数据包,也可以作为 node 存活的证明.
         * (因为 cluster bus 上也在传输数据,所以在业务负载高的时候,也有可能导致 pong 延迟) */
        mstime_t node_delay = (ping_delay < data_delay) ? ping_delay :
                                                          data_delay;
        if (node_delay > server.cluster_node_timeout) {
            /* 超时. 标记位 pfail. */
            if (!(node->flags & (CLUSTER_NODE_PFAIL|CLUSTER_NODE_FAIL))) {
                serverLog(LL_DEBUG,"*** NODE %.40s possibly failing",
                    node->name);
                node->flags |= CLUSTER_NODE_PFAIL;
                update_state = 1;
            }
        }
    }
    dictReleaseIterator(di);

    /* 如果自身是 slave 节点,但是还没有 replication,就去连接 master 并执行 slaveof 命令 */
    if (nodeIsSlave(myself) &&
        server.masterhost == NULL &&
        myself->slaveof &&
        nodeHasAddr(myself->slaveof))
    {
        replicationSetMaster(myself->slaveof->ip, myself->slaveof->port);
    }

    /* 如果 manual failover 超时了,清理掉各种标志位 */
    manualFailoverCheckTimeout();

    if (nodeIsSlave(myself)) {

        // (如果不是 manual failover,这个方法会直接退出)
        // 判断自身的 replic offset 是否和 master 相同,
        // 如果相同,则表示可以开始做 failover 了,记录一个标志位。
        // 后面会在 ae event loop 的 beforeSleep 中,执行 failover
        clusterHandleManualFailover();

        // auto failover
        if (!(server.cluster_module_flags & CLUSTER_MODULE_FLAG_NO_FAILOVER))
            clusterHandleSlaveFailover();

        ......

        /* 如果有孤立的 slave,并且我们是兄弟节点最多的 slave,就尝试升为 master */
        if (orphaned_masters && max_slaves >= 2 && this_slaves == max_slaves &&
            server.cluster_allow_replica_migration)
            clusterHandleSlaveMigration(max_slaves);
    }

    if (update_state || server.cluster->state == CLUSTER_FAIL)
        clusterUpdateState();
}

手动故障转移

其实 manual failover 的本质还是 failover,所以 redis 其实是把它转化成普通的 failover 来进行处理的,在升主阶段,核心就是看 repl_offset 是否一致,如果一致了,就可以当做普通 failover 处理了,如果不一致,就持续等待。

所以,你可以看到这个方法的最后,clusterDoBeforeSleep(CLUSTER_TODO_HANDLE_MANUALFAILOVER) 其实就是在 clusterBeforeSleep 中再做一次检测;当然与此同时,clusterCron 也会每 100ms 检测一次。直到 repl_offset 一致为止,这时候,flag 就变成 CLUSTER_TODO_HANDLE_FAILOVER 了,redis 就会在下一次 clusterBeforeSleep 中,进行真正的故障转移了。

/* cluster cron 中会调用这个方法,来调整 mf 状态机  */
void clusterHandleManualFailover(void) {

    /* 没有 mf,Return ASAP */
    if (server.cluster->mf_end == 0) return;

    /* If mf_can_start is non-zero, the failover was already triggered so the
     * next steps are performed by clusterHandleSlaveFailover(). */
    if (server.cluster->mf_can_start) return;

    // 为了保证数据不丢失,需要等待 master 发送 repl_offset,才能开始执行
    if (server.cluster->mf_master_offset == -1) return;

    if (server.cluster->mf_master_offset == replicationGetSlaveOffset()) {
        /* 我的 replication offset 和 master paused 之后的 replication offset 一致 */
        server.cluster->mf_can_start = 1;
        serverLog(LL_WARNING,
            "All master replication stream processed, "
            "manual failover can start.");
        clusterDoBeforeSleep(CLUSTER_TODO_HANDLE_FAILOVER);
        return;
    }
    clusterDoBeforeSleep(CLUSTER_TODO_HANDLE_MANUALFAILOVER);
}

新 slave 加入集群

replicationSetMaster方法,我们在高可靠主线-主从复制一节已经介绍过了,主要工作是:

  1. 与 master 建立物理连接
  2. 给 master 发送自己的信息
  3. 从 master 同步数据(从 master 下载 rdb 文件)

4. clusterBeforeSleep

/* 下面这段话要翻译的自然一点还是比较费劲的,借助 ChatGpt3.5 搞一搞:
 * 这个函数在事件处理程序休眠之前被调用。它非常有用,可以执行一些必须马上完成的操作,以响应已经触发的事件,
 * 但是这些操作不能在事件处理程序内部执行,因为可能会有安全问题。此外,也可以在回复客户端之前执行一些可能比较费时的任务。
 */

/* This function is called before the event handler returns to sleep for
 * events. It is useful to perform operations that must be done ASAP in
 * reaction to events fired but that are not safe to perform inside event
 * handlers, or to perform potentially expansive tasks that we need to do
 * a single time before replying to clients. */

void clusterBeforeSleep(void) {

    /* 把 flag 拷贝一份 *
    int flags = server.cluster->todo_before_sleep;

    /* 重置 flags */
    server.cluster->todo_before_sleep = 0;

    // 故障转移
    //
    // 注意看这里,如果是手动故障转移,会在 if 中持续的的检测 repl_offset,直到主从 repl_offset 一致之后,
    // 这个 flag 就变成 CLUSTER_TODO_HANDLE_FAILOVER 了,再次运行 clusterBeforeSleep 的时候,
    // 就会走到 else if 中,执行真正的升主。
    if (flags & CLUSTER_TODO_HANDLE_MANUALFAILOVER) {
        /* 尽快执行手动故障转移,没必要等 100ms 一次的 clusterCron */
        if(nodeIsSlave(myself)) {
            clusterHandleManualFailover();
            if (!(server.cluster_module_flags & CLUSTER_MODULE_FLAG_NO_FAILOVER))
                clusterHandleSlaveFailover();
        }
    } else if (flags & CLUSTER_TODO_HANDLE_FAILOVER) {
        /* 走到这里说明选票足够,开始升主操作 */
        clusterHandleSlaveFailover();
    }

    /* Update the cluster state. */
    if (flags & CLUSTER_TODO_UPDATE_STATE)
        clusterUpdateState();

    /* 将 cluster 配置保存在磁盘上 */
    if (flags & CLUSTER_TODO_SAVE_CONFIG) {
        int fsync = flags & CLUSTER_TODO_FSYNC_CONFIG;
        clusterSaveConfigOrDie(fsync);
    }
}

故障转移

clusterHandleSlaveFailover

/* 这个方法的调用场景是
 * 1. 我们是 slave,master 中有 slots,master 处于 FAIL 状态
 * 2. 手动故障转移,master 发来了 repl_offset 且与本地一致。
 *
 * 方法的目标是:
 * 1) 检查是否可以执行 failover,本地的数据是否是最新的?
 * 2) 向其他 master 发起投票
 * 3) 执行升主操作,并通知其他节点
 */
void clusterHandleSlaveFailover(void) {
    mstime_t data_age;
    mstime_t auth_age = mstime() - server.cluster->failover_auth_time;
    int needed_quorum = (server.cluster->size / 2) + 1;
    int manual_failover = server.cluster->mf_end != 0 &&
                          server.cluster->mf_can_start;
    mstime_t auth_timeout, auth_retry_time;

    server.cluster->todo_before_sleep &= ~CLUSTER_TODO_HANDLE_FAILOVER;

    /* failover_timeout = MAX(NODE_TIMEOUT*2,2000) ms
     * 重试超时是 = 2 * failover_timeout  */
    auth_timeout = server.cluster_node_timeout*2;
    if (auth_timeout < 2000) auth_timeout = 2000;
    auth_retry_time = auth_timeout*2;

    /* 前置的条件检查:
     * 1) 我们是 slave
     * 2) master 处于 FAIL 状态, 或者是 manual failover.
     * 3) 是自动 failover,并且 failover 功能未关闭
     * 4) master 持有 slots */
    if (nodeIsMaster(myself) ||
        myself->slaveof == NULL ||
        (!nodeFailed(myself->slaveof) && !manual_failover) ||
        (server.cluster_slave_no_failover && !manual_failover) ||
        myself->slaveof->numslots == 0)
    {
        /* 记录失败原因:前置条件未达成 */
        server.cluster->cant_failover_reason = CLUSTER_CANT_FAILOVER_NONE;
        return;
    }

    /* 下面这几段代码是针对 auto failover 的:
     * 当 slave 和 master 断连太久时,他们的数据可能差的有点多,这时候就不应该执行 auto failover 了
     */

    /* 1. 计算 data_age: 即当前节点和 master 断连的时间 */
    if (server.repl_state == REPL_STATE_CONNECTED) {
        data_age = (mstime_t)(server.unixtime - server.master->lastinteraction)
                   * 1000;
    } else {
        data_age = (mstime_t)(server.unixtime - server.repl_down_since) * 1000;
    }

    /* 2. 当 master 挂掉之后,需要 cluster_node_timeout 时间,才会被标记位 FAIL,这里要扣除掉这个时间 */
    if (data_age > server.cluster_node_timeout)
        data_age -= server.cluster_node_timeout;

    /* 3. 断连时间的容忍度,这是一个用户配置: cluster_slave_validity_factor */
    if (server.cluster_slave_validity_factor &&
        data_age >
        (((mstime_t)server.repl_ping_slave_period * 1000) +
         (server.cluster_node_timeout * server.cluster_slave_validity_factor)))
    {
        if (!manual_failover) {
            clusterLogCantFailover(CLUSTER_CANT_FAILOVER_DATA_AGE);
            return;
        }
    }

    /* 如果 failover 一直未成功,并且过了 retry 时间,则触发一次新的 failover.
     * auth_age = mstime() - server.cluster->failover_auth_time;  */
    if (auth_age > auth_retry_time) {

        // 设置本轮 failover 的最终超时时间,超过这个时间就要重新开启一轮 failover
        server.cluster->failover_auth_time = mstime() +
            500 + /* 延迟 500ms, 让 FAIL 状态在集群中充分传播 */
            random() % 500; /* 实际延迟 0-500ms */

        server.cluster->failover_auth_count = 0;
        server.cluster->failover_auth_sent = 0;
        // 根据 slave 的 repl_offset 获取 rank
        server.cluster->failover_auth_rank = clusterGetSlaveRank();

        // 给最终超时时间增加 rank*1000,让数据更准确的节点早点开始 failover
        server.cluster->failover_auth_time += server.cluster->failover_auth_rank * 1000;

        /* manual failover 的优先级最高,不需要延迟 */
        if (server.cluster->mf_end) {
            server.cluster->failover_auth_time = mstime();
            server.cluster->failover_auth_rank = 0;
                clusterDoBeforeSleep(CLUSTER_TODO_HANDLE_FAILOVER);
        }
        serverLog(LL_WARNING,
            "Start of election delayed for %lld milliseconds "
            "(rank #%d, offset %lld).",
            server.cluster->failover_auth_time - mstime(),
            server.cluster->failover_auth_rank,
            replicationGetSlaveOffset());
        /* 既然我们规划了一次 election, 就顺道给其他兄弟节点广播一下自己的 offset.
         * (注意,只给兄弟节点发,其他的 master 和 slave 都不发)
         * 如果当前节点的 offset 更大,说明数据更新,其他节点在计算 rank 的时候,就会退让 */
        clusterBroadcastPong(CLUSTER_BROADCAST_LOCAL_SLAVES);
        return;
    }

    /* 如果从其他节点拿到的 offset 更大,就需要更新一下自己的 rank
     * (注意: 只针对 auto failover) */
    if (server.cluster->failover_auth_sent == 0 &&
        server.cluster->mf_end == 0)
    {
        int newrank = clusterGetSlaveRank();
        if (newrank > server.cluster->failover_auth_rank) {
            long long added_delay =
                (newrank - server.cluster->failover_auth_rank) * 1000;
            server.cluster->failover_auth_time += added_delay;
            server.cluster->failover_auth_rank = newrank;
            serverLog(LL_WARNING,
                "Replica rank updated to #%d, added %lld milliseconds of delay.",
                newrank, added_delay);
        }
    }

    /* 还没到指定的时间,return */
    if (mstime() < server.cluster->failover_auth_time) {
        clusterLogCantFailover(CLUSTER_CANT_FAILOVER_WAITING_DELAY);
        return;
    }

    /* 超时,return */
    if (auth_age > auth_timeout) {
        clusterLogCantFailover(CLUSTER_CANT_FAILOVER_EXPIRED);
        return;
    }

    /* 如果没有发起投票,则请求投票!! */
    if (server.cluster->failover_auth_sent == 0) {
        server.cluster->currentEpoch++;
        server.cluster->failover_auth_epoch = server.cluster->currentEpoch;
        serverLog(LL_WARNING,"Starting a failover election for epoch %llu.",
            (unsigned long long) server.cluster->currentEpoch);
        // 消息 type = CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST
        clusterRequestFailoverAuth();
        server.cluster->failover_auth_sent = 1;
        clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
                             CLUSTER_TODO_UPDATE_STATE|
                             CLUSTER_TODO_FSYNC_CONFIG);
        return;
    }

    /* 检查票数 */
    if (server.cluster->failover_auth_count >= needed_quorum) {

        /* 票数足够,终于可以开始 failover 了 */
        serverLog(LL_WARNING, "Failover election won: I'm the new master.");

        // 需要注意的是:
        // 终于,currentEpoch 和 configEpoch 在这里,通过 failover_auth_epoch 关联上了

        // 在上面发起投票的时候,currentEpoch++,并且赋给了 failover_auth_epoch,
        // 这里选举成功之后,又赋给了 configEpoch
        if (myself->configEpoch < server.cluster->failover_auth_epoch) {
            myself->configEpoch = server.cluster->failover_auth_epoch;
            serverLog(LL_WARNING,
                "configEpoch set to %llu after successful failover",
                (unsigned long long) myself->configEpoch);
        }

        // 取代 master,负担起对应的 slots
        clusterFailoverReplaceYourMaster();
    } else {
        clusterLogCantFailover(CLUSTER_CANT_FAILOVER_WAITING_VOTES);
    }
}

升主

clusterFailoverReplaceYourMaster

void clusterFailoverReplaceYourMaster(void) {
    int j;
    clusterNode *oldmaster = myself->slaveof;

    if (nodeIsMaster(myself) || oldmaster == NULL) return;

    /* 1) 转为 master. */
    //1. 清理 slaveof 字段
    //2. 去掉 flags 中 slave 标记、增加 master 标记
    clusterSetNodeAsMaster(myself);
    //1. 先清理 masterhost 字段,再断开与 master 的连接(防止系统自动重连)
    //2. 生成新的 replication id,把旧的(从老 master 那里继承来的) id 放到 secondary ID 字段
    //3. 断开与其他 slave 的连接, slave 重连的时候就会更新 replication id
    //4. 重新打开 aof(断连太久导致数据不可信,会关闭 aof)
    replicationUnsetMaster();

    /* 2) 把旧 master 的 slots 都拿过来. */
    for (j = 0; j < CLUSTER_SLOTS; j++) {
        if (clusterNodeGetSlotBit(oldmaster,j)) {
            clusterDelSlot(j);
            clusterAddSlot(myself,j);
        }
    }

    /* 3) 更新配置并保存到磁盘. */
    clusterUpdateState();
    clusterSaveConfigOrDie(1);

    /* 4) 发送 PONG 通知其他所有节点,自己成为新 master 了 */
    clusterBroadcastPong(CLUSTER_BROADCAST_ALL);

    /* 5) 重置 mf 的状态 */
    resetManualFailover();
}

clusterUpdateState

void clusterUpdateState(void) {
    int j, new_state;
    int reachable_masters = 0;
    static mstime_t among_minority_time;
    static mstime_t first_call_time = 0;

    server.cluster->todo_before_sleep &= ~CLUSTER_TODO_UPDATE_STATE;

    /* 如果当前节点是一个 master,并且 cluster 状态是 FAIL 的,
     * 就稍等一会儿在开始写数据,让 cluster 的自动配置先跑起来 */
    if (first_call_time == 0) first_call_time = mstime();
    if (nodeIsMaster(myself) &&
        server.cluster->state == CLUSTER_FAIL &&
        mstime() - first_call_time < CLUSTER_WRITABLE_DELAY) return;

    /* 假设 cluster 的状态是 OK 的,下面会酌情进行修改 */
    new_state = CLUSTER_OK;

    /* 检查所有的 slots 是否都已分配 */
    if (server.cluster_require_full_coverage) {
        for (j = 0; j < CLUSTER_SLOTS; j++) {
            if (server.cluster->slots[j] == NULL ||
                server.cluster->slots[j]->flags & (CLUSTER_NODE_FAIL))
            {
                new_state = CLUSTER_FAIL;
                break;
            }
        }
    }

    /* 计算 cluster size. 要求:
     * 1. 必须是 master
     * 2. 至少持有一个 */
    {
        dictIterator *di;
        dictEntry *de;

        server.cluster->size = 0;
        di = dictGetSafeIterator(server.cluster->nodes);
        while((de = dictNext(di)) != NULL) {
            clusterNode *node = dictGetVal(de);

            if (nodeIsMaster(node) && node->numslots) {
                server.cluster->size++;
                if ((node->flags & (CLUSTER_NODE_FAIL|CLUSTER_NODE_PFAIL)) == 0)
                    reachable_masters++;
            }
        }
        dictReleaseIterator(di);
    }

    /* 可达的 master 小于 (n/2)+1, 说明状态异常 */
    {
        int needed_quorum = (server.cluster->size / 2) + 1;

        if (reachable_masters < needed_quorum) {
            new_state = CLUSTER_FAIL;
            among_minority_time = mstime();
        }
    }

    if (new_state != server.cluster->state) {
        mstime_t rejoin_delay = server.cluster_node_timeout;

        /* 如果当前节点是一个 master,挂掉之后又重新进入了集群,则先别急着恢复状态(处理数据)
         * 稍微 delay 一段时间,保证集群中的配置信息(比如说 failover)充分传播。 */
        if (rejoin_delay > CLUSTER_MAX_REJOIN_DELAY)
            rejoin_delay = CLUSTER_MAX_REJOIN_DELAY;
        if (rejoin_delay < CLUSTER_MIN_REJOIN_DELAY)
            rejoin_delay = CLUSTER_MIN_REJOIN_DELAY;

        if (new_state == CLUSTER_OK &&
            nodeIsMaster(myself) &&
            mstime() - among_minority_time < rejoin_delay)
        {
            return;
        }

        /* 记录日志,并修改状态 */
        serverLog(LL_WARNING,"Cluster state changed: %s",
            new_state == CLUSTER_OK ? "ok" : "fail");
        server.cluster->state = new_state;
    }
}

三、一些特殊的情况

  1. 如果 master 离线一段时间又上线,会怎么样?

新旧 master 都声称自己有同一段 slot,而新 master 的 configEpoch 更大,所以新 master 仍然是 master,旧 master 在广播自己的信息之后,会收到其他节点的 upgrade 请求,旧 master 会更新自己的配置,重新以 replica 的身份加入集群。

  1. 重新加入集群的节点,它的 slot 是如何变化的?

通常来说,触发 failover 之后,旧 master 会变为新 master 的 replica。但是如果发生了 hash slot 的变动,就会通过下面的规则来处理:

A master node will change its configuration to replicate (be a replica of) the node that stole its last hash slot

举例来说:假设一个节点 A 包含有 slot 1,2 ,A 离线之后,slot 1 被分配给 B,slot 2 被分配给 C。如果 A 又重新加入了集群,其他节点在收到 A 的广播之后,会给 A 发送 upgrade 请求,让 A 更新自己的配置,此时 A 会变为 C 的 replica。即:收到 A 广播的 master 节点,会找到当前承载 A 的最后一个 slot(也就是2)的节点(也就是C),让 A 变为 C 的 replica。

其他的 replica 节点也遵守相同的规则:

they reconfigure to replicate the node that stole the last hash slot of its former master.

他们都会被通知变为 C 的 replica(如果没有人工介入的话)

四、参考

不得不说,官方文档 - Cluster Spec 才是最好的老师,很多细节上的问题、边界问题、特殊情况,都在这里有说明;缺点就是纯英文,我断断续续看了快一个月才看完,但是好处也是明显的,整个 cluster 机制,不再像以前一样是一个黑盒了

这里还有一个网友翻译的中文站点,不过机翻痕迹太严重了,基本没法看。

https://blog.csdn.net/Edidaughter/article/details/116403296

https://time.geekbang.org/column/article/276545

标签: redis
最后更新:2026年2月26日

zt52875287

这个人很懒,什么都没留下

点赞
< 上一篇
下一篇 >
文章目录
  • 一、分片集群
  • 二、源码:
    • 0. 补充说明
      • 1. PFAIL 和 FAIL
      • 2. 选举
      • 3. Replica migration
      • 4. manual failover
    • 1. 接收数据包
    • 2. 处理数据包
      • 更新slot信息
      • 投票
    • 3. clusterCron 定期任务
      • 手动故障转移
      • 新 slave 加入集群
    • 4. clusterBeforeSleep
      • 故障转移
      • 升主
      • clusterUpdateState
  • 三、一些特殊的情况
  • 四、参考

Copyright © by zt52875287@gmail.com All Rights Reserved.

Theme Kratos Made By Seaton Jiang

陕ICP备2021009385号-1