欢迎您访问365答案网,请分享给你的朋友!
生活常识 学习资料

Redis6.2哨兵定时检查源码

时间:2023-06-15

推荐看渐进式解析 Redis 源码 - 哨兵 sentinel他写的很全,我这也是参考他的,顺着他的逻辑来的

文章目录

serverCron(定时任务触发方法)

sentinelTimer(哨兵模式定时器检查)

sentinelCheckTiltCondition(检查是否需要进入TITL模式)sentinelHandleDictOfRedisInstances(对传过来的某一种哨兵实例进行循环执行周期函数)

sentinelHandleRedisInstance (周期性调用函数)

sentinelReconnectInstance(sentinel 和 redis实例的连接)

sentinelSendPing (命令模式建立连接)redisAsyncCommand(订阅模式建立连接) sentinelSendPeriodicCommands(发送监控命令的函数)

sentinelInfoReplyCallback(Info回调函数) serverCron(定时任务触发方法)

Server.c中的serverCron方法里,而serverCron是每秒调用server.hz次,而且如果看到sentinelTimer方法最后面,会根据方程server.hz会变化,serverCron完整方法可以看Redis 6.2定时删除刷新频率源码

if (server.sentinel_mode) sentinelTimer();

sentinelTimer(哨兵模式定时器检查)

void sentinelTimer(void) { //先检查 sentinel 是否需要进入 TILT 模式,更新最近一次执行 sentinel 模式的时间 sentinelCheckTiltCondition(); // 对 sentinel 监控的所有主节点进行递归执行周期函数 sentinelHandleDictOfRedisInstances(sentinel.masters); // 运行队列中等待的脚本,比如发送 PING、HELLO 等 sentinelRunPendingscripts(); // 清理已经执行的脚本,重新执行错误的脚本 sentinelCollectTerminatedscripts(); // 清除执行超时的脚本,等下一次周期在 sentinelCollectTerminatedscripts() 中执行 sentinelKillTimedoutscripts(); //不断改变 Redis 定期任务的执行频率,以便使每个 sentinel 节点都不同步,这种不确定性可以避免 sentinel 在同一时间开始完全继续保持同步, //当被要求进行投票时,一次又一次在同一时间进行投票,因为脑裂导致有可能没有胜选者 server.hz = CONFIG_DEFAULT_HZ + rand() % CONFIG_DEFAULT_HZ;}

sentinelCheckTiltCondition(检查是否需要进入TITL模式)

void sentinelCheckTiltCondition(void) { mstime_t now = mstime(); //最近一次执行周期函数过了多久 mstime_t delta = now - sentinel.previous_time; //如果时间为负值,或者相隔时间超过2000毫秒,则进入TITL模式 if (delta < 0 || delta > SENTINEL_TILT_TRIGGER) { //进入TITL模式 sentinel.tilt = 1; //TITL模式开始时间 sentinel.tilt_start_time = mstime(); sentinelEvent(LL_WARNING,"+tilt",NULL,"#tilt mode entered"); } //设置最近一次sentinel时间处理程序的时间 sentinel.previous_time = mstime();}

sentinelHandleDictOfRedisInstances(对传过来的某一种哨兵实例进行循环执行周期函数)

void sentinelHandleDictOfRedisInstances(dict *instances) { dictIterator *di; dictEntry *de; sentinelRedisInstance *switch_to_promoted = NULL; //获取监控对象实例迭代器 di = dictGetIterator(instances); // 遍历监控对象实例 while((de = dictNext(di)) != NULL) { // 获取实例 sentinelRedisInstance *ri = dictGetVal(de); // 对指定的实例执行周期操作 sentinelHandleRedisInstance(ri); // 如果当前实例为sentinel主服务器节点 if (ri->flags & SRI_MASTER) { // 递归对当前sentinel实例的从节点执行周期函数 sentinelHandleDictOfRedisInstances(ri->slaves); // 递归对当前实例的 sentinel 节点执行周期函数 sentinelHandleDictOfRedisInstances(ri->sentinels); // 如果当前实例处于完成故障转移状态而且所有从节点已经完成了对新主节点的同步操作,那么设置 主从转换标识 if (ri->failover_state == SENTINEL_FAILOVER_STATE_UPDATe_CONFIG) { switch_to_promoted = ri; } } } // 如果设置了主从转换标识 if (switch_to_promoted) // 将原来的主节点从表中删除,使用晋升的新主节点替代,那么所有 从节点 和 旧主节点 都从属当前新的主节点 sentinelFailoverSwitchToPromotedSlave(switch_to_promoted); // 释放迭代器 dictReleaseIterator(di);}

sentinelHandleRedisInstance (周期性调用函数)

void sentinelHandleRedisInstance(sentinelRedisInstance *ri) { // 对所有类型的实例进行操作 // 为所有 sentinel 和 实例之间建立网络通信 sentinelReconnectInstance(ri); // 定期发送 PING、PONG、PUBLISH 等命令 sentinelSendPeriodicCommands(ri); if (sentinel.tilt) { // 如果 TILT 模式的时间还没到(默认 1000*30毫秒),则直接返回 if (mstime()-sentinel.tilt_start_time < SENTINEL_TILT_PERIOD) return; // 超过时间,则退出 TILT 模式 sentinel.tilt = 0; sentinelEvent(LL_WARNING,"-tilt",NULL,"#tilt mode exited"); } // 对于各实例进行下线检测,判断是否处于主观下线状态 sentinelCheckSubjectivelyDown(ri); // 主从节点不做什么操作,目前是空白的,可能后续版本会增加内容 if (ri->flags & (SRI_MASTER|SRI_SLAVE)) { } // 主服务器节点 if (ri->flags & SRI_MASTER) { // 检查是否客观下线 sentinelCheckObjectivelyDown(ri); // 如果处于客观下线,则进一步进行故障转移状态判断 if (sentinelStartFailoverIfNeeded(ri)) // 向其他 sentinel 节点发送 SENTINEL is-master-down-by-addr 确认是否不可达,如果能从其他 sentinel 节点足够多的票数,则标记为客观下线,触发故障转移 sentinelAskMasterStateToOtherSentinels(ri,SENTINEL_ASK_FORCED); // 执行故障转移操作 sentinelFailoverStateMachine(ri); // 主节点没有处于客观下线的状态,那么也要尝试发送 SENTINEL is-master-down-by-addr 给所有的 sentinel 获取回复,因为主节点如果有回复延迟等等状况,可以通过该命令,更新一些主节点状态 sentinelAskMasterStateToOtherSentinels(ri,SENTINEL_NO_FLAGS); }}

sentinelReconnectInstance(sentinel 和 redis实例的连接)

在载入配置时,主节点实例加入字典是默认关闭的,所以调用这个函数用于sentinel 和 redis实例的连接,有两种模式用于建立连接,一种是命令模式,一种是订阅模式

void sentinelReconnectInstance(sentinelRedisInstance *ri) { // 如果实例没有断开连接,直接返回 if (ri->link->disconnected == 0) return; // 如果 port == 0,则意味着 地址非法 if (ri->addr->port == 0) return; instancelink *link = ri->link; mstime_t now = mstime(); // 如果最近一次重连的时间距离现在太短,小于 1秒,则直接返回 if (now - ri->link->last_reconn_time < SENTINEL_PING_PERIOD) return; ri->link->last_reconn_time = now; // 命令模式,发送命令,跟下面的订阅连接区分 if (link->cc == NULL) { // 建立连接并绑定到实例 link->cc = redisAsyncConnectBind(ri->addr->ip,ri->addr->port,NET_FIRST_BIND_ADDR); if (link->cc && !link->cc->err) anetCloexec(link->cc->c.fd); if (!link->cc) {//建立连接失败 sentinelEvent(LL_DEBUG,"-cmd-link-reconnection",ri,"%@ #Failed to establish connection"); } else if (!link->cc->err && server.tls_replication && (instancelinkNegotiateTLS(link->cc) == C_ERR)) { //初始化 TLS 失败 sentinelEvent(LL_DEBUG,"-cmd-link-reconnection",ri,"%@ #Failed to initialize TLS"); instancelinkCloseConnection(link,link->cc); } else if (link->cc->err) { // 连接错误,发送事件通知,并关闭连接 sentinelEvent(LL_DEBUG,"-cmd-link-reconnection",ri,"%@ #%s", link->cc->errstr); instancelinkCloseConnection(link,link->cc); } else { // 重置连接属性 link->pending_commands = 0; link->cc_conn_time = mstime(); link->cc->data = link; // 将服务器的事件循环关联到连接的上下文 redisAeAttach(server.el,link->cc); // 设置确立连接的回调函数 redisAsyncSetConnectCallback(link->cc, sentinellinkEstablishedCallback); // 设置断开连接的回调函数 redisAsyncSetDisconnectCallback(link->cc, sentinelDisconnectCallback); // 发送 AUTH 命令认证 sentinelSendAuthIfNeeded(ri,link->cc); // 发送连接名字 sentinelSetClientName(ri,link->cc,"cmd"); // 立即发送 PING 命令 sentinelSendPing(ri); } } // 订阅模式 // 针对 主从节点建立 订阅连接 if ((ri->flags & (SRI_MASTER|SRI_SLAVE)) && link->pc == NULL) { // 建立连接并绑定到实例 link->pc = redisAsyncConnectBind(ri->addr->ip,ri->addr->port,NET_FIRST_BIND_ADDR); if (link->pc && !link->pc->err) anetCloexec(link->pc->c.fd); if (!link->pc) { //建立连接失败 sentinelEvent(LL_DEBUG,"-pubsub-link-reconnection",ri,"%@ #Failed to establish connection"); } else if (!link->pc->err && server.tls_replication && (instancelinkNegotiateTLS(link->pc) == C_ERR)) {//初始化 TLS 失败 sentinelEvent(LL_DEBUG,"-pubsub-link-reconnection",ri,"%@ #Failed to initialize TLS"); } else if (link->pc->err) {// 连接错误,发送事件通知,并关闭连接 sentinelEvent(LL_DEBUG,"-pubsub-link-reconnection",ri,"%@ #%s", link->pc->errstr); instancelinkCloseConnection(link,link->pc); } else { int retval; 重置连接属性 link->pc_conn_time = mstime(); link->pc->data = link; // 将服务器的事件循环关联到连接的上下文中 redisAeAttach(server.el,link->pc); // 设置确立连接的回调函数 redisAsyncSetConnectCallback(link->pc, sentinellinkEstablishedCallback); // 设置断开连接的回调函数 redisAsyncSetDisconnectCallback(link->pc, sentinelDisconnectCallback); // 发送 AUTH 命令认证 sentinelSendAuthIfNeeded(ri,link->pc); sentinelSetClientName(ri,link->pc,"pubsub"); // 发送订阅 __sentinel__:hello 频道的命令,设置回调函数sentinelReceiveHelloMessages 处理回复, //sentinelReceiveHelloMessages 是处理 pub/sub 的频道返回信息的回调函数,可以发现订阅同一 master 的 sentinel 节点 retval = redisAsyncCommand(link->pc, sentinelReceiveHelloMessages, ri, "%s %s", sentinelInstanceMapCommand(ri,"SUBSCRIBE"), SENTINEL_HELLO_CHANNEL); // 订阅频道出错 if (retval != C_OK) { // 关闭连接 instancelinkCloseConnection(link,link->pc); return; } } } if (link->cc && (ri->flags & SRI_SENTINEL || link->pc)) link->disconnected = 0;}

sentinelSendPing (命令模式建立连接)

int sentinelSendPing(sentinelRedisInstance *ri) { int retval = redisAsyncCommand(ri->link->cc, sentinelPingReplyCallback, ri, "%s", sentinelInstanceMapCommand(ri,"PING")); if (retval == C_OK) { ri->link->pending_commands++; ri->link->last_ping_time = mstime(); if (ri->link->act_ping_time == 0) ri->link->act_ping_time = ri->link->last_ping_time; return 1; } else { return 0; }}

发送ping命令之后通过回调函数sentinelPingReplyCallback

void sentinelPingReplyCallback(redisAsyncContext *c, void *reply, void *privdata) { sentinelRedisInstance *ri = privdata; instancelink *link = c->data; redisReply *r; // 如果没有响应或者连接问题,直接返回 if (!reply || !link) return; // 到这里证明已经响应,那么未回复命令个数 -1 link->pending_commands--; r = reply; // 如果实例处于 REDIS_REPLY_STATUS 或者 REDIS_REPLY_ERROR 状态 if (r->type == REDIS_REPLY_STATUS || r->type == REDIS_REPLY_ERROR) { if (strncmp(r->str,"PONG",4) == 0 || strncmp(r->str,"LOADING",7) == 0 || strncmp(r->str,"MASTERDOWN",10) == 0) { // 更新回复时间 link->last_avail_time = mstime(); // 接收到信息后,设置最近发送 PING 命令时间为 0 link->act_ping_time = 0; } else { if (strncmp(r->str,"BUSY",4) == 0 && (ri->flags & SRI_S_DOWN) && !(ri->flags & SRI_script_KILL_SENT)) { // 异步发送一个 script kill 命令,并设置回调函数 if (redisAsyncCommand(ri->link->cc, sentinelDiscardReplyCallback, ri, "%s KILL", sentinelInstanceMapCommand(ri,"script")) == C_OK) { // 发送成功,但是 未回复命令就要 +1 了 ri->link->pending_commands++; } // 改变状态为 SRI_script_KILL_SENT ri->flags |= SRI_script_KILL_SENT; } } } // 更新最近一次收到 PING 回复的时间 link->last_pong_time = mstime();}

redisAsyncCommand(订阅模式建立连接)

既然是根据订阅模式建立连接,那就应该有生产和订阅两部分组成,redisAsyncCommand这个方法不管是发送生产还是订阅,都是调用这个方法,只是参数不同

int sentinelSendHello(sentinelRedisInstance *ri) { char ip[NET_IP_STR_LEN]; char payload[NET_IP_STR_LEN+1024]; int retval; char *announce_ip; int announce_port; sentinelRedisInstance *master = (ri->flags & SRI_MASTER) ? ri : ri->master; sentinelAddr *master_addr = sentinelGetCurrentMasterAddress(master); if (ri->link->disconnected) return C_ERR; if (sentinel.announce_ip) { announce_ip = sentinel.announce_ip; } else { if (anetFdToString(ri->link->cc->c.fd,ip,sizeof(ip),NULL,FD_TO_SOCK_NAME) == -1) return C_ERR; announce_ip = ip; } if (sentinel.announce_port) announce_port = sentinel.announce_port; else if (server.tls_replication && server.tls_port) announce_port = server.tls_port; else announce_port = server.port; snprintf(payload,sizeof(payload), "%s,%d,%s,%llu," "%s,%s,%d,%llu", announce_ip, announce_port, sentinel.myid, (unsigned long long) sentinel.current_epoch, master->name,announceSentinelAddr(master_addr),master_addr->port, (unsigned long long) master->config_epoch); retval = redisAsyncCommand(ri->link->cc, sentinelPublishReplyCallback, ri, "%s %s %s", sentinelInstanceMapCommand(ri,"PUBLISH"), SENTINEL_HELLO_CHANNEL,payload); if (retval != C_OK) return C_ERR; ri->link->pending_commands++; return C_OK;}

生产操作

往__sentinel__:hello 频道发送的操作不在sentinelReconnectInstance方法里执行,而是在下面介绍sentinelSendPeriodicCommands方法下的sentinelSendHello方法,这个方法是是发送的操作,

int sentinelSendHello(sentinelRedisInstance *ri) { char ip[NET_IP_STR_LEN]; char payload[NET_IP_STR_LEN+1024]; int retval; char *announce_ip; int announce_port; // 主服务器节点实例 sentinelRedisInstance *master = (ri->flags & SRI_MASTER) ? ri : ri->master; sentinelAddr *master_addr = sentinelGetCurrentMasterAddress(master); // 如果处于关闭状态,则返回 if (ri->link->disconnected) return C_ERR; if (sentinel.announce_ip) { announce_ip = sentinel.announce_ip; } else { //否则直接使用主服务器节点地址 if (anetFdToString(ri->link->cc->c.fd,ip,sizeof(ip),NULL,FD_TO_SOCK_NAME) == -1) return C_ERR; // 获取主节点地址 announce_ip = ip; } // 获取端口 if (sentinel.announce_port) announce_port = sentinel.announce_port; else if (server.tls_replication && server.tls_port) announce_port = server.tls_port; else announce_port = server.port; // 相当于组装 hello 信息,包括 sentinel 地址端口、runid 和 纪元版本信息,主服务器节点 别名、地址端口 和 纪元版本信息 snprintf(payload,sizeof(payload), "%s,%d,%s,%llu," "%s,%s,%d,%llu", announce_ip, announce_port, sentinel.myid, (unsigned long long) sentinel.current_epoch, master->name,announceSentinelAddr(master_addr),master_addr->port, (unsigned long long) master->config_epoch); // 执行 PUBLISH,发送 hello 信息,并设置回调函数 retval = redisAsyncCommand(ri->link->cc, sentinelPublishReplyCallback, ri, "%s %s %s", sentinelInstanceMapCommand(ri,"PUBLISH"), SENTINEL_HELLO_CHANNEL,payload); if (retval != C_OK) return C_ERR; // 未回复命令个数+1 ri->link->pending_commands++; return C_OK;}

订阅操作

redisAsyncCommand这个方法主要是发送订阅 __sentinel__:hello 频道的命令,而发送生产是下面sentinelSendPeriodicCommands方法调用sentinelSendHello方法这里不做说明,感兴趣自己看一下源码吧
设置回调函数sentinelReceiveHelloMessages 处理回复,这里重点看一下回调函数

void sentinelReceiveHelloMessages(redisAsyncContext *c, void *reply, void *privdata) { sentinelRedisInstance *ri = privdata; redisReply *r; UNUSED(c); if (!reply || !ri) return; r = reply; ri->link->pc_last_activity = mstime(); if (r->type != REDIS_REPLY_ARRAY || r->elements != 3 || r->element[0]->type != REDIS_REPLY_STRING || r->element[1]->type != REDIS_REPLY_STRING || r->element[2]->type != REDIS_REPLY_STRING || strcmp(r->element[0]->str,"message") != 0) return; if (strstr(r->element[2]->str,sentinel.myid) != NULL) return; sentinelProcessHelloMessage(r->element[2]->str, r->element[2]->len);}

sentinelProcessHelloMessage方法是实际对订阅收到的消息做的处理

void sentinelProcessHelloMessage(char *hello, int hello_len) { int numtokens, port, removed, master_port; uint64_t current_epoch, master_config_epoch; char **token = sdssplitlen(hello, hello_len, ",", 1, &numtokens); sentinelRedisInstance *si, *master; if (numtokens == 8) { master = sentinelGetMasterByName(token[4]); if (!master) goto cleanup; port = atoi(token[1]); master_port = atoi(token[6]); si = getSentinelRedisInstanceByAddrAndRunID( master->sentinels,token[0],port,token[2]); current_epoch = strtoull(token[3],NULL,10); master_config_epoch = strtoull(token[7],NULL,10); if (!si) { removed = removeMatchingSentinelFromMaster(master,token[2]); if (removed) { sentinelEvent(LL_NOTICE,"+sentinel-address-switch",master, "%@ ip %s port %d for %s", token[0],port,token[2]); } else { sentinelRedisInstance *other = getSentinelRedisInstanceByAddrAndRunID( master->sentinels, token[0],port,NULL); if (other) { sentinelEvent(LL_NOTICE,"+sentinel-invalid-addr",other,"%@"); other->addr->port = 0; sentinelUpdateSentinelAddressInAllMasters(other); } } si = createSentinelRedisInstance(token[2],SRI_SENTINEL, token[0],port,master->quorum,master); if (si) { if (!removed) sentinelEvent(LL_NOTICE,"+sentinel",si,"%@"); si->runid = sdsnew(token[2]); sentinelTryConnectionSharing(si); if (removed) sentinelUpdateSentinelAddressInAllMasters(si); sentinelFlushConfig(); } } if (current_epoch > sentinel.current_epoch) { sentinel.current_epoch = current_epoch; sentinelFlushConfig(); sentinelEvent(LL_WARNING,"+new-epoch",master,"%llu", (unsigned long long) sentinel.current_epoch); } if (si && master->config_epoch < master_config_epoch) { master->config_epoch = master_config_epoch; if (master_port != master->addr->port || !sentinelAddrEqualsHostname(master->addr, token[5])) { sentinelAddr *old_addr; sentinelEvent(LL_WARNING,"+config-update-from",si,"%@"); sentinelEvent(LL_WARNING,"+switch-master", master,"%s %s %d %s %d", master->name, announceSentinelAddr(master->addr), master->addr->port, token[5], master_port); old_addr = dupSentinelAddr(master->addr); sentinelResetMasterAndChangeAddress(master, token[5], master_port); sentinelCallClientReconfscript(master, SENTINEL_OBSERVER,"start", old_addr,master->addr); releaseSentinelAddr(old_addr); } } if (si) si->last_hello_time = mstime(); }cleanup: sdsfreesplitres(token,numtokens);}

sentinelSendPeriodicCommands(发送监控命令的函数)

void sentinelSendPeriodicCommands(sentinelRedisInstance *ri) { mstime_t now = mstime(); mstime_t info_period, ping_period; int retval; if (ri->link->disconnected) return; // 对于不是 INFO、PING、PUBLISH 等关键命令的时候,会有 SENTINEL_MAX_PENDING_COMMANDS 的限制,这样的设计是为了冗余保护措施, //如果检测到超时条件,连接将被断开重新连接,每个实例已经发送未回复的命令不能超过 SENTINEL_MAX_PENDING_COMMANDS(默认100),否则直接返回 if (ri->link->pending_commands >= SENTINEL_MAX_PENDING_COMMANDS * ri->link->refcount) return; if ((ri->flags & SRI_SLAVE) && ((ri->master->flags & (SRI_O_DOWN|SRI_FAILOVER_IN_PROGRESS)) || (ri->master_link_down_time != 0))) { // 监控频率修改为 1秒,即上面说的每秒 info_period = 1000; } else { // 否则默认的 10秒 info_period = SENTINEL_INFO_PERIOD; } // 主观下线判断时间 ping_period = ri->down_after_period; // 最小1秒的频率 if (ping_period > SENTINEL_PING_PERIOD) ping_period = SENTINEL_PING_PERIOD; if ((ri->flags & SRI_SENTINEL) == 0 && (ri->info_refresh == 0 || (now - ri->info_refresh) > info_period)) { // 发送 INFO 命令 retval = redisAsyncCommand(ri->link->cc, sentinelInfoReplyCallback, ri, "%s", sentinelInstanceMapCommand(ri,"INFO")); // 已经发送未回复的命令数 +1 if (retval == C_OK) ri->link->pending_commands++; } if ((now - ri->link->last_pong_time) > ping_period && (now - ri->link->last_ping_time) > ping_period/2) {// 如果发送和回复 PING 命令超时 // 立即发送一个 PING 命令,并更新 act_ping_time sentinelSendPing(ri); } if ((now - ri->last_pub_time) > SENTINEL_PUBLISH_PERIOD) {//如果发送 PUBLISH 的命令超时 // 立即发送 PUBLISH 信息给实例 sentinelSendHello(ri); }}

sentinelInfoReplyCallback(Info回调函数)

void sentinelInfoReplyCallback(redisAsyncContext *c, void *reply, void *privdata) { sentinelRedisInstance *ri = privdata; instancelink *link = c->data; redisReply *r; if (!reply || !link) return; link->pending_commands--; r = reply; if (r->type == REDIS_REPLY_STRING) sentinelRefreshInstanceInfo(ri,r->str);}

实际的回调函数是执行sentinelRefreshInstanceInfo方法,收集响应消息和处理角色变化

void sentinelRefreshInstanceInfo(sentinelRedisInstance *ri, const char *info) { sds *lines; int numlines, j; int role = 0; sdsfree(ri->info); ri->info = sdsnew(info); // 重置从节点复制断开时间,防止在 INFO 输出中找不到该信息 ri->master_link_down_time = 0; // 一行一行的进行遍历,并把行数放到 numlines 中 lines = sdssplitlen(info,strlen(info),"rn",2,&numlines); // 逐行处理 for (j = 0; j < numlines; j++) { sentinelRedisInstance *slave; // 当前行内容 sds l = lines[j]; // runid 这行的格式为 "run_id:de4a10ef73028f75e4bd9d6e345544b8ed5e9e09rn" if (sdslen(l) >= 47 && !memcmp(l,"run_id:",7)) { // 如果原 runid 为空,则更新为当前 l 内容中的 run_id 内容 if (ri->runid == NULL) { ri->runid = sdsnewlen(l+7,40); } else { // 如果原 runid 不为空,则更新 if (strncmp(ri->runid,l+7,40) != 0) { // 事件通知 sentinelEvent(LL_NOTICE,"+reboot",ri,"%@"); // 清空原来 runid sdsfree(ri->runid); // 重赋值 ri->runid = sdsnewlen(l+7,40); } } } //读取从服务器节点的 ip 和 port 信息 if ((ri->flags & SRI_MASTER) && sdslen(l) >= 7 && !memcmp(l,"slave",5) && isdigit(l[5])) { char *ip, *port, *end; if (strstr(l,"ip=") == NULL) { // 老版本,分别定位 ip 和 port 信息 ip = strchr(l,':'); if (!ip) continue; ip++; port = strchr(ip,','); if (!port) continue; *port = ''; port++; end = strchr(port,','); if (!end) continue; *end = ''; } else { ip = strstr(l,"ip="); if (!ip) continue; ip += 3; port = strstr(l,"port="); if (!port) continue; port += 5; end = strchr(ip,','); if (end) *end = ''; end = strchr(port,','); if (end) *end = ''; } if (sentinelRedisInstanceLookupSlave(ri,ip,atoi(port)) == NULL) { if ((slave = createSentinelRedisInstance(NULL,SRI_SLAVE,ip, atoi(port), ri->quorum, ri)) != NULL) { // 发送事件通知 sentinelEvent(LL_NOTICE,"+slave",slave,"%@"); // 刷新 config sentinelFlushConfig(); } } } if (sdslen(l) >= 32 && !memcmp(l,"master_link_down_since_seconds",30)) { ri->master_link_down_time = strtoll(l+31,NULL,10)*1000; } if (sdslen(l) >= 11 && !memcmp(l,"role:master",11)) role = SRI_MASTER; else if (sdslen(l) >= 10 && !memcmp(l,"role:slave",10)) role = SRI_SLAVE; // 如果是从服务器节点 if (role == SRI_SLAVE) { if (sdslen(l) >= 12 && !memcmp(l,"master_host:",12)) { if (ri->slave_master_host == NULL || strcasecmp(l+12,ri->slave_master_host)) { sdsfree(ri->slave_master_host); ri->slave_master_host = sdsnew(l+12); ri->slave_conf_change_time = mstime(); } } if (sdslen(l) >= 12 && !memcmp(l,"master_port:",12)) { int slave_master_port = atoi(l+12); if (ri->slave_master_port != slave_master_port) { ri->slave_master_port = slave_master_port; ri->slave_conf_change_time = mstime(); } } if (sdslen(l) >= 19 && !memcmp(l,"master_link_status:",19)) { ri->slave_master_link_status = (strcasecmp(l+19,"up") == 0) ? SENTINEL_MASTER_link_STATUS_UP : SENTINEL_MASTER_link_STATUS_DOWN; } if (sdslen(l) >= 15 && !memcmp(l,"slave_priority:",15)) ri->slave_priority = atoi(l+15); if (sdslen(l) >= 18 && !memcmp(l,"slave_repl_offset:",18)) ri->slave_repl_offset = strtoull(l+18,NULL,10); if (sdslen(l) >= 18 && !memcmp(l,"replica_announced:",18)) ri->replica_announced = atoi(l+18); } } // 获取最新的 INFO 时间 ri->info_refresh = mstime(); sdsfreesplitres(lines,numlines); // 如果角色变换了,更新角色和角色变换时间 if (role != ri->role_reported) { ri->role_reported_time = mstime(); ri->role_reported = role; if (role == SRI_SLAVE) ri->slave_conf_change_time = mstime(); sentinelEvent(LL_VERBOSE, ((ri->flags & (SRI_MASTER|SRI_SLAVE)) == role) ? "+role-change" : "-role-change", ri, "%@ new reported role is %s", role == SRI_MASTER ? "master" : "slave"); } if (sentinel.tilt) return; //如果实例本身是主服务器节点但是 INFO 回复的却是从服务器节点,原因一般是 sentinel 判断不可达,发生了故障转移所致,这里不进行处理 if ((ri->flags & SRI_MASTER) && role == SRI_SLAVE) { } // 如果实例本身是从服务器节点但是 INFO 回复的却是主服务器节点,那就是因为故障转移给升级到主服务器节点 if ((ri->flags & SRI_SLAVE) && role == SRI_MASTER) { // 如果当前为被晋升的从服务器节点,并且主服务器节点在等待从服务器节点去晋升为主服务器节点 if ((ri->flags & SRI_PROMOTED) && (ri->master->flags & SRI_FAILOVER_IN_PROGRESS) && (ri->master->failover_state == SENTINEL_FAILOVER_STATE_WAIT_PROMOTION)) { // 要确保这个从服务器节点被配置为主节点,并且将纪元设置为故障转移优先级最高的纪元,这样才能强制其他 sentinel 节点取更新他们的配置,更新旧的主节点的纪元 ri->master->config_epoch = ri->master->failover_epoch; // 设置旧的主节点的故障转移状态为SENTINEL_FAILOVER_STATE_RECONF_SLAVES,该状态会向旧的主节点所属的从节点发送通知让他们向新的主节点发起复制操作 ri->master->failover_state = SENTINEL_FAILOVER_STATE_RECONF_SLAVES; // 设置故障转移状态改变的时间 ri->master->failover_state_change_time = mstime(); // 刷新配置 sentinelFlushConfig(); // 事件通知 sentinelEvent(LL_WARNING,"+promoted-slave",ri,"%@"); // 如果开启了故障模拟标识,而且是晋升之后发生故障 if (sentinel.simfailure_flags &SENTINEL_SIMFAILURE_CRASH_AFTER_PROMOTION) // 那么就退出程序 sentinelSimFailureCrash(); // 事件通知 sentinelEvent(LL_WARNING,"+failover-state-reconf-slaves",ri->master,"%@"); // 重新配置脚本属性,放入脚本队列 sentinelCallClientReconfscript(ri->master,SENTINEL_LEADER,"start",ri->master->addr,ri->addr); // 强行发送 hello 给所有的 redis 节点 和 sentinel 实例去关联指定的主服务器节点实例 sentinelForceHelloUpdateForMaster(ri->master); } else { // 从节点晋升为了主节点,我们要强制其重新配置成从节点,等待一些时间,然后无论如何都要让其接收新的配置,默认计算等待的时间为 8s mstime_t wait_time = SENTINEL_PUBLISH_PERIOD*4; // 当前实例已经成为主节点,且在最近的 wait_time 毫秒内,如果当前实例没有出现下线状态,并且实例的主节点实例看起来很健壮,并且距离角色更新的时间已经超过缓冲的 wait_time if (!(ri->flags & SRI_PROMOTED) && sentinelMasterLooksSane(ri->master) && sentinelRedisInstanceNoDownFor(ri,wait_time) && mstime() - ri->role_reported_time > wait_time) { // 发送 slaveof 命令,使其成为主节点 int retval = sentinelSendSlaveOf(ri,ri->master->addr); if (retval == C_OK) sentinelEvent(LL_NOTICE,"+convert-to-slave",ri,"%@"); } } } // 如果当前实例为从服务器节点,而且 INFO 的回复确实也是 从服务器节点,但是 主服务器节点的 ip/port 发生了改变,那么需要 从服务器节点重新指向并复制修改过 ip/port 信息的主节点 if ((ri->flags & SRI_SLAVE) && role == SRI_SLAVE && (ri->slave_master_port != ri->master->addr->port || !sentinelAddrEqualsHostname(ri->master->addr, ri->slave_master_host))) { // 故障转移超时时间 mstime_t wait_time = ri->master->failover_timeout; // wait_time 时间内当前实例没有出现下线状态,而且主节点变换 ip/port 信息已经超过了 wait_time if (sentinelMasterLooksSane(ri->master) && sentinelRedisInstanceNoDownFor(ri,wait_time) && mstime() - ri->slave_conf_change_time > wait_time) { // 改变 slaveof 的目标地址 int retval = sentinelSendSlaveOf(ri,ri->master->addr); if (retval == C_OK) sentinelEvent(LL_NOTICE,"+fix-slave-config",ri,"%@"); } } // 如果当前实例为从服务器节点,并且 INFO 传递过来的信息确实也是从节点,并且 sentinel 向当前实例发送了 slaveof 命令 或者 当前实例已经在同步新的主节点了 if ((ri->flags & SRI_SLAVE) && role == SRI_SLAVE && (ri->flags & (SRI_RECONF_SENT|SRI_RECONF_INPROG))) { // 将 SRI_RECONF_SENT 状态改为 SRI_RECONF_INPROG,因为 sentinel 向 实例发送了 slaveof 命令,但是当前实例所属主节点的地址已经和新主节点地址相同,所以,将状态给为 SRI_RECONF_INPROG,表示正在同步复制操作 if ((ri->flags & SRI_RECONF_SENT) && ri->slave_master_host && sentinelAddrEqualsHostname(ri->master->promoted_slave->addr, ri->slave_master_host) && ri->slave_master_port == ri->master->promoted_slave->addr->port) { ri->flags &= ~SRI_RECONF_SENT; ri->flags |= SRI_RECONF_INPROG; sentinelEvent(LL_NOTICE,"+slave-reconf-inprog",ri,"%@"); } // 将状态由 SRI_RECONF_INPROG 改为 SRI_RECONF_DONE,表示同步完成 if ((ri->flags & SRI_RECONF_INPROG) && ri->slave_master_link_status == SENTINEL_MASTER_link_STATUS_UP) { ri->flags &= ~SRI_RECONF_INPROG; ri->flags |= SRI_RECONF_DONE; sentinelEvent(LL_NOTICE,"+slave-reconf-done",ri,"%@"); } }}

Copyright © 2016-2020 www.365daan.com All Rights Reserved. 365答案网 版权所有 备案号:

部分内容来自互联网,版权归原作者所有,如有冒犯请联系我们,我们将在三个工作时内妥善处理。