ホームページ > バックエンド開発 > C#.Net チュートリアル > C言語によるRaft実装(コード付き)

C言語によるRaft実装(コード付き)

烟雨青岚
リリース: 2020-06-18 17:44:57
転載
6571 人が閲覧しました

C言語によるRaft実装(コード付き)

C 言語 Raft 実装 (コード付き)

1. はじめに

この記事では、簡単な Raft 実装を紹介します。 Raft 論文を読んでいれば、実装の詳細は Raft 論文に詳しく記載されているため、この Raft 実装は読みやすいと思いますが、エンジニアリング実装は基本的に Raft 論文の記述を再表現することになります。プログラミング言語で。これが Paxos に対する Raft の最大の利点です。つまり、理解と実装が簡単です。この記事で紹介するRaftの実装はC言語で記述されており、ログ圧縮機能以外の機能は実装されており、メンバー変更の仕組みも比較的シンプルで、一度に1つの設定変更のみをサポートしています。 Raft の原則については、Raft の論文と「Raft Understanding」を参照してください。

2. Raft の基本概念

2.1 ステータス

raft には、リーダー、候補、フォロワーの 3 つの状態があります。これら 3 つの状態間の遷移を次の図に示します。リーダーのみが顧客のリクエストを処理し、ログをフォロワーにコピーする権利を持っています。候補はフォロワーとリーダーの間の中間状態です。クラスター内にリーダーがいない場合、フォロワーは候補状態に入り、クラスターへの投票を開始します。過半数の票を獲得したフォロワーがリーダーになります。

2.2 メッセージ

Raft では、プロトコルの分かりやすさを向上させるために、メッセージ タイプの設定を簡略化し、次の 2 つのリクエストのみを備えています。 [インデント]
requestVote 投票リクエストを開始します。投票を開始する際の候補者のリクエスト。これは、クラスター内の他のフォロワーおよび候補によって受信され、処理されます。
appendEntries ログ リクエストを追加します。ログをフォロワーに追加するときにリーダーによって発行されるリクエスト。 [/indent]

2.3 用語番号

用語番号の用語は、時間の新旧の関係を示すために Raft プロトコルで使用されます。各リーダーの任期は不変ですが、リーダーごとに完全に異なり、時間とともに単調に増加します。リクエスト A の期間がリクエスト B の期間より長い場合、リクエスト B は期限切れになります。

3.Raft の実装

3.1 プロトコル

まず、上記のデータ構造に対応する 4 つの重要なデータ構造を紹介します。そして、appendEntries のリクエストと応答。

/** requestVote 请求投票
   * 竞选者Candidate去竞选Leader时发送给其它node的投票请求。
   * 其它Leader或者Candidate收到term比自己大的投票请求时,会自动变成Follower*/
typedef struct
{
    /** 当前任期号,通过任期号的大小与其它Candidate竞争Leader */
    int term;

    /** 竞选者的id */
    int candidate_id;

    /** 竞选者本地保存的最新一条日志的index */
    int last_log_idx;

    /** 竞选者本地保存的最新一条日志的任期号*/
    int last_log_term;
} msg_requestvote_t;


/** 投票请求的回复response.
  * 该response主要是给返回某个node是否接收了Candidate的投票请求. */
typedef struct
{
    /** node的任期号,Candidate根据投票结果和node的任期号来更新自己的任期号 */
    int term;

    /** 投票结果,如果node给Candidate投票则为true */
    int vote_granted;
} msg_requestvote_response_t;

/**  添加日志请求.
  * Follower可以从该消息中知道哪些日志可以安全地提交到状态机FSM中去。
  * Leader可以将该消息作为心跳消息定期发送。
  * 旧的Leader和Candidate收到该消息后可能会自动变成Follower */
typedef struct
{
    /** Leader当前的任期号 */
    int term;

    /** 最新日志的前一条日志的index,用于Follower确认与Leader的日志完全一致 */
    int prev_log_idx;

    /** 最新日志的前一条日志的任期号term */
    int prev_log_term;

    /** leader当前已经确认提交到状态机FSM的日志索引index,这意味着Follower也可以安全地将该索引index以前的日志提交 */
    int leader_commit;

    /** 这条添加日志消息携带的日志条数,该实现中最多只有一条 */
    int n_entries;

    /** 这条添加日志消息中携带的日志数组 */
    msg_entry_t* entries;
} msg_appendentries_t;

/** 添加日志回复.
 * 旧的Leader或Candidate收到该消息会变成Follower */
typedef struct
{
    /** 当前任期号 */
    int term;

    /** node成功添加日志时返回ture,即prev_log_index和prev_log_term都比对成功。否则返回false */
    int success;

    /* 下面两个字段不是Raft论文中规定的字段:
    /* 用来优化日志追加过程,以加速日志的追加。Raft原文中的追加过程是一次只能追加一条日志*/

    /** 处理添加日志请求后本地的最大日志索引 */
    int current_idx;

    /** 从添加日志请求中接受的第一条日志索引 */
    int first_idx;
} msg_appendentries_response_t;
ログイン後にコピー

3.2 2 つの重要な抽象化

##raft_server_private_t この構造体は Raft 実装の抽象化であり、ステータスとすべてのデータを保存します。 Raftプロトコルの運用中に必要となります。

typedef struct {
    /* 所有服务器比较固定的状态: */

    /* 服务器最后一次知道的任期号(初始化为 0,持续递增) */
    int current_term;

    /* 记录在当前分期内给哪个Candidate投过票,
       */
    int voted_for;

    /* 日志条目集;每一个条目包含一个用户状态机执行的指令,和收到时的任期号 */
    void* log;

    /* 变动比较频繁的变量: */

    /* 已知的最大的已经被提交的日志条目的索引值 */
    int commit_idx;

    /* 最后被应用到状态机的日志条目索引值(初始化为 0,持续递增) */
    int last_applied_idx;

    /* 三种状态:follower/leader/candidate */
    int state;

    /* 计时器,周期函数每次执行时会递增改值 */
    int timeout_elapsed;

    raft_node_t* nodes;
    int num_nodes;

    int election_timeout;
    int request_timeout;

    /* 保存Leader的信息,没有Leader时为NULL */
    raft_node_t* current_leader;

    /* callbacks,由调用该raft实现的调用者来实现,网络IO和持久存储
     * 都由调用者在callback中实现 */
    raft_cbs_t cb;
    void* udata;

    /* 自己的信息 */
    raft_node_t* node;

    /* 该raft实现每次只进行一个服务器的配置更改,该变量记录raft server
     * 是否正在进行配置更改*/
    int voting_cfg_change_log_idx;
} raft_server_private_t;
ログイン後にコピー

raft_node_private_t クラスター内のマシン ノードの抽象本体には、raft プロトコルの実行中に保存する必要がある他のマシンに関する情報が含まれています

typedef struct
{
    void* udata;  /*一般保存与其它机器的连接信息,由使用者决定怎么实现连接*/

    int next_idx; /*对于每一个服务器,需要发送给他的下一个日志条目的索引值(初始化为领导人最后索引值加一)*/
    int match_idx; /*对于每一个服务器,已经复制给他的日志的最高索引值*/

    int flags; /*有三种取值,是相或的关系 1:该机器有给我投票 2:该机器有投票权  3: 该机器有最新的日志*/

    int id; /*机器对应的id值,这个每台机器在全局都是唯一的*/
} raft_node_private_t;
ログイン後にコピー

# # 3.3 Raft プロトコル プロセス

定期関数

Raft はいくつかのことを定期的に実行する必要があります。たとえば、リーダーは定期的にログを他のサーバーに追加する必要があります。追いつくチャンスがある; すべてのサーバーは、確認されたコミット ログをステート マシンなどに定期的に適用する必要があります。 raft_periodic関数はraft実装内で定期的に呼び出される関数であり、呼び出し周期は1000msです。マシンは、この関数でさまざまな状態でさまざまな処理を実行します。リーダーは定期的にログをフォロワーに同期します。 Follower は、一定期間内に Leader からのハートビート パケットを受信して​​いないかどうかを定期的に検出し、受信している場合は Candidate となり、Leader への投票を開始します。リーダーとフォロワーは両方とも、送信されたログをステート マシン FSM に定期的にコミットします。

/** raft周期性执行的函数,实现raft中的定时器以及定期应用日志到状态机
  */
int raft_periodic(raft_server_t* me_, int msec_since_last_period)
{
    raft_server_private_t* me = (raft_server_private_t*)me_;

    /* 选举计时器;Follower每次收到Leader的心跳后会重置清0,Leader每次发送日志也会清0 */
    me->timeout_elapsed += msec_since_last_period;

    /* Leader周期性地向Follower同步日志 */
    if (me->state == RAFT_STATE_LEADER) 
    {
        if (me->request_timeout <= me->timeout_elapsed)
            raft_send_appendentries_all(me_);
    }
    /* Follower检测选举计时器是否超时 */
    else if (me->election_timeout <= me->timeout_elapsed)
    {
        if (1 < me->num_nodes)
            raft_election_start(me_);
    }

    /* 周期性地将已经确认commit的日志应用到状态机FSM */
    if (me->last_applied_idx < me->commit_idx)
        if (-1 == raft_apply_entry(me_))
            return -1;

    return 0;
}
ログイン後にコピー

候補者になる

クラスター内の各サーバーには選挙タイマーがあります。タイマーのタイムアウト期間内にサーバーがリーダーからハートビートを受信しない場合、クラスターは考慮されます。サーバーにリーダーがいない、またはリーダーがダウンしている場合、サーバーは候補となり、リーダーに立候補するための投票を開始します。次の raft_become_candidate 関数は、サーバーが候補になるための関数です。この関数は主に次のことを行います。内容:
現在の任期番号 (currentTerm) を増やす

#自分に投票する

#選挙タイムアウト タイマーをリセットする

送信 投票を要求する RPC は他のすべてのサーバーに送信されます

/** Follower成为Candidate执行的函数
  */
void raft_become_candidate(raft_server_t* me_)
{
    raft_server_private_t* me = (raft_server_private_t*)me_;
    int i;

    /*自增当前的任期号;给自己投票,设置自己的状态为CANDIDATE*/
    raft_set_current_term(me_, raft_get_current_term(me_) + 1);
    for (i = 0; i < me->num_nodes; i++)
        raft_node_vote_for_me(me->nodes[i], 0);
    raft_vote(me_, me->node);
    me->current_leader = NULL;
    raft_set_state(me_, RAFT_STATE_CANDIDATE);

    /* 重置选举超时计时器。为了防止多个Candidate竞争,将下一次发起投票的时间间隔设置成随机值*/
    /* TODO: this should probably be lower */
    me->timeout_elapsed = rand() % me->election_timeout;

    /*发送请求投票的 RPC 给其他所有服务器*/
    for (i = 0; i < me->num_nodes; i++)
        if (me->node != me->nodes[i] && raft_node_is_voting(me->nodes[i]))
            raft_send_requestvote(me_, me->nodes[i]);
}
ログイン後にコピー

投票リクエストの処理

投票リクエストを処理するロジックは、主に投票に同意するかどうかを決定することです。判定はリクエスト内の用語番号とログ情報鮮度と古さのレベル、同じ用語番号を持つ他のサーバーに投票したかどうかです。以前に投票したことがある場合、再度投票することはできません。1 人につき 1 票のみです。

term > currentTerm の場合、フォロワー モードに切り替えます。 ここで投票リクエストを受信するサーバーは、ネットワーク状態が悪いリーダーであるか、まだ投票リクエストを発行する時間がない候補者である可能性があります。彼らは、自分の任期番号よりも新しい任期番号を持つリクエストを受信した後、次のことを行う必要があります。無条件でフォロワーになり、リーダーが 1 人だけ存在するようにします。

如果term < currentTerm返回false。请求中的term比自己的term还要小,说明是一个过时的请求,则不给它投票返回false。

如果 term == currentTerm,请求中的日志信息不比本地日志旧,并且尚未给其它Candidate投过票,那么就投票给他

/** 处理投票请求 
*/
int raft_recv_requestvote(raft_server_t* me_,
                          raft_node_t* node,
                          msg_requestvote_t* vr,
                          msg_requestvote_response_t *r)
{
    raft_server_private_t* me = (raft_server_private_t*)me_;

    /*如果请求中term > 本地currentTerm, 则转为Follower模式*/
    if (raft_get_current_term(me_) < vr->term)
    {
        raft_set_current_term(me_, vr->term);
        raft_become_follower(me_);
    }

    /*如果需要投票,则回复true,即将r->vote_granted = 1;*/
    if (__should_grant_vote(me, vr))
    {
        assert(!(raft_is_leader(me_) || raft_is_candidate(me_)));

        /*同意投票--本地记录给哪个服务器投了票,并设置response中的vote_granted为1*/
        raft_vote_for_nodeid(me_, vr->candidate_id);
        r->vote_granted = 1;

        /* there must be in an election. */
        me->current_leader = NULL;

        me->timeout_elapsed = 0;
    }
    else
        r->vote_granted = 0;

    __log(me_, node, "node requested vote: %d replying: %s",
          node, r->vote_granted == 1 ? "granted" : "not granted");

    /*更新本地保存的任期号,与请求中的保持一致*/
    r->term = raft_get_current_term(me_);
    return 0;
}

/** 检查是否满足投票的条件 
*/
static int __should_grant_vote(raft_server_private_t* me, msg_requestvote_t* vr)
{
    /**请求中的任期号term比本地term要小,不给投票*/
    if (vr->term < raft_get_current_term((void*)me))
        return 0;

    /*如果已经投过票了,返回false*/
    /* TODO: if voted for is candiate return 1 (if below checks pass) */
    if (raft_already_voted((void*)me))
        return 0;

    /* 下面代码检查请求中日志信息是否比本地日志新*/

    /* 获取本地最新的日志索引 */
    int current_idx = raft_get_current_idx((void*)me);

    /* 本地日志为空,请求中的日志信息绝对比本地要新,返回true */
    if (0 == current_idx)
        return 1;

    /* 如果本地最新日志中的任期号比请求中的last_log_term要小,则返回true */
    raft_entry_t* e = raft_get_entry_from_idx((void*)me, current_idx);
    if (e->term < vr->last_log_term)
        return 1;

    /* 本地最新日志中的任期号与请求中的last_log_term相等,则比较日志索引,索引比较大的说明日志比较新*/
    if (vr->last_log_term == e->term && current_idx <= vr->last_log_idx)
        return 1;

    /*果本地最新日志中的任期号比请求中的last_log_term要大,则返回false */
    return 0;
}
ログイン後にコピー
  • 收到投票回复 Candidate收到投票回复后,检查是否给自己投了票,如果投了票则统计当前收到的投票总数,超过一半则成为Leader
/** 处理投票恢复 
*/
int raft_recv_requestvote_response(raft_server_t* me_,
                                   raft_node_t* node,
                                   msg_requestvote_response_t* r)
{
    raft_server_private_t* me = (raft_server_private_t*)me_;

    __log(me_, node, "node responded to requestvote status: %s",
          r->vote_granted == 1 ? "granted" : "not granted");

    /* Oh~我不是Candidate,直接返回 */
    if (!raft_is_candidate(me_))
    {
        return 0;
    }
    /* response中的任期号比自己的大,说明自己的term已经过时,无条件转为Follower */
    else if (raft_get_current_term(me_) < r->term)
    {
        raft_set_current_term(me_, r->term);
        raft_become_follower(me_);
        return 0;
    }
    /* response中的任期号比自己小,说明收到了一个过时的response,忽略即可。
     * 当网络比较差的时候容易出现这种情况 */
    else if (raft_get_current_term(me_) != r->term)
    {
        return 0;
    }

    __log(me_, node, "node responded to requestvote: %d status: %s ct:%d rt:%d",
          node, r->vote_granted == 1 ? "granted" : "not granted",
          me->current_term,
          r->term);

    /* Yeah~给我投票了 */
    if (1 == r->vote_granted)
    {
        /* 记录给自己投票的服务器信息 */
        if (node)
            raft_node_vote_for_me(node, 1);
        int votes = raft_get_nvotes_for_me(me_);
        /* 如果给自己投票的服务器超过了总数的一般,则成为Leader */
        if (raft_votes_is_majority(me->num_nodes, votes))
            raft_become_leader(me_);
    }

    return 0;
}
ログイン後にコピー

添加日志请求 Leader除了在收到客户端请求后会发起添加日志请求,还会在周期函数raft_periodic中发起添加日志请求。Leader维护了所有Follower的日志情况,如果Follower的日志比较旧,就会周期性地给它发送添加日志请求。关于日志怎么同步和保持一致性的原理,可以阅读raft论文5.3节--日志复制。简单地说就是,Leader在给Follower发送一条日志N时,会顺带将前一条日志M的信息也带过去。Follower会检查请求中前一条日志M的信息与本地相同索引的日志是否吻合,如果吻合说明本地在M以前的所有日志都是和Leader一致的(raft论文中使用递归法证明,因为所有日志都是按照同样的规则添加的)。

/** 给某个Follower发送添加日志请求
  */
int raft_send_appendentries(raft_server_t* me_, raft_node_t* node)
{
    raft_server_private_t* me = (raft_server_private_t*)me_;

    assert(node);
    assert(node != me->node);

    /* callback函数,实现网络发送功能,由使用该raft实现的调用者实现网络IO功能*/
    if (!(me->cb.send_appendentries))
        return -1;

    /* 初始化请求的参数-- 当前任期号、最新日志索引 */
    msg_appendentries_t ae;
    ae.term = me->current_term;
    ae.leader_commit = raft_get_commit_idx(me_);
    ae.prev_log_idx = 0;
    ae.prev_log_term = 0;
    ae.n_entries = 0;
    ae.entries = NULL;

    /* 根据记录的Follower的日志信息,获取要发给Follower的下一条日志索引 */
    int next_idx = raft_node_get_next_idx(node);

    msg_entry_t mety;

    /* 添加下一条日志的内容*/
    raft_entry_t* ety = raft_get_entry_from_idx(me_, next_idx);
    if (ety)
    {
        mety.term = ety->term;
        mety.id = ety->id;
        mety.type = ety->type;
        mety.data.len = ety->data.len;
        mety.data.buf = ety->data.buf;
        ae.entries = &mety;
        // TODO: we want to send more than 1 at a time
        ae.n_entries = 1;
    }

    /* 添加要添加日志的前一条日志信息,用来做日志一致性检查,关于怎么保证
     * Leader和Follower日志的一致性,可参看raft论文第5.3节--日志复制*/
    if (1 < next_idx)
    {
        raft_entry_t* prev_ety = raft_get_entry_from_idx(me_, next_idx - 1);
        ae.prev_log_idx = next_idx - 1;
        if (prev_ety)
            ae.prev_log_term = prev_ety->term;
    }

    __log(me_, node, "sending appendentries node: ci:%d t:%d lc:%d pli:%d plt:%d",
          raft_get_current_idx(me_),
          ae.term,
          ae.leader_commit,
          ae.prev_log_idx,
          ae.prev_log_term);

    /* 调用callback发送请求,callback由该raft实现的调用者来实现*/
    me->cb.send_appendentries(me_, me->udata, node, &ae);

    return 0;
}
ログイン後にコピー

处理添加日志请求 所有的服务器都有可能收到添加日志请求,比如过时的Leader和Candidate以及正常运行的Follower。处理添加日志请求的过程主要就是验证请求中的日志是否比本地日志新的过程。

/*
1. 处理任期号的三种情况(大于等于和小于)
2. 处理prev log不一致的情况,返回包中告诉Leader自己目前的log情况
3. 处理添加日志成功的情况-- 保存新日志并更新current_idx和commit_idx
*/
int raft_recv_appendentries(
    raft_server_t* me_,
    raft_node_t* node,
    msg_appendentries_t* ae,
    msg_appendentries_response_t *r
    )
{
    raft_server_private_t* me = (raft_server_private_t*)me_;

    me->timeout_elapsed = 0;

    if (0 < ae->n_entries)
        __log(me_, node, "recvd appendentries from: %lx, t:%d ci:%d lc:%d pli:%d plt:%d #%d",
              node,
              ae->term,
              raft_get_current_idx(me_),
              ae->leader_commit,
              ae->prev_log_idx,
              ae->prev_log_term,
              ae->n_entries);

    r->term = me->current_term;

    /* 处理任期号 */
    /* currentTerm == ae->term,当自己是Candidate时收到term与自己相等的请求,
     * 说明已经有其它Candidate成为了Leader,自己无条件变成Follower*/
    if (raft_is_candidate(me_) && me->current_term == ae->term)
    {
        me->voted_for = -1;
        raft_become_follower(me_);
    }
    /* currentTerm < ae->term. 自己的任期号已经落后Leader,无条件成为Follower,并且更新自己的term*/
    else if (me->current_term < ae->term)
    {
        raft_set_current_term(me_, ae->term);
        r->term = ae->term;
        raft_become_follower(me_);
    }
    /* currentTerm > ae->term. 说明收到一个过时Leader的请求,直接回包告诉它最新的term */
    else if (ae->term < me->current_term)
    {
        /* 1. Reply false if term < currentTerm (§5.1) */
        __log(me_, node, "AE term %d is less than current term %d",
              ae->term, me->current_term);
        goto fail_with_current_idx;
    }


    /* NOTE: the log starts at 1 */
    /* 检查请求中prev_log_idx日志的term与本地对应索引的term是否一致 */
    if (0 < ae->prev_log_idx)
    {
        raft_entry_t* e = raft_get_entry_from_idx(me_, ae->prev_log_idx);

        /*  本地在prev_log_idx位置还不存在日志,说明日志已经落后Leader了,返回false
         *    并告诉leader自己当前日志的位置,这样Leader知道下一次该发哪条日志过来了*/
        if (!e)
        {
            __log(me_, node, "AE no log at prev_idx %d", ae->prev_log_idx);
            goto fail_with_current_idx;
        }
        if (raft_get_current_idx(me_) < ae->prev_log_idx)
            goto fail_with_current_idx;

        /* 本地在prev_log_idx位置的日志的term与请求中的prev_log_term不一致,
         * 此时本地无条件删除本地与请求不一致的日志,并向Leader返回删除后的日志位置*/
        if (e->term != ae->prev_log_term)
        {
            __log(me_, node, "AE term doesn&#39;t match prev_term (ie. %d vs %d) ci:%d pli:%d",
                  e->term, ae->prev_log_term, raft_get_current_idx(me_), ae->prev_log_idx);
            assert(me->commit_idx < ae->prev_log_idx);
            /* Delete all the following log entries because they don&#39;t match */
            log_delete(me->log, ae->prev_log_idx);
            r->current_idx = ae->prev_log_idx - 1;
            goto fail;
        }
    }

    /* 本地的日志比Leader要多。当本地服务器曾经是Leader,收到了很多客户端请求
     * 并还没来得及同步时会出现这种情况。这时本地无条件删除比Leader多的日志 */
    if (ae->n_entries == 0 && 0 < ae->prev_log_idx && ae->prev_log_idx + 1 < raft_get_current_idx(me_))
    {
        assert(me->commit_idx < ae->prev_log_idx + 1);
        log_delete(me->log, ae->prev_log_idx + 1);
    }

    r->current_idx = ae->prev_log_idx;

    /* 下面for循环跳过请求中已经在本地添加过的日志*/
    int i;
    for (i = 0; i < ae->n_entries; i++)
    {
        msg_entry_t* ety = &ae->entries[i];
        int ety_index = ae->prev_log_idx + 1 + i;
        raft_entry_t* existing_ety = raft_get_entry_from_idx(me_, ety_index);
        r->current_idx = ety_index;
        if (existing_ety && existing_ety->term != ety->term)
        {
            assert(me->commit_idx < ety_index);
            log_delete(me->log, ety_index);
            break;
        }
        else if (!existing_ety)
            break;
    }

    /* 下面for循环将请求中确认的新日志添加到本地 */
    for (; i < ae->n_entries; i++)
    {
        int e = raft_append_entry(me_, &ae->entries[i]);
        if (-1 == e)
            goto fail_with_current_idx;

        r->current_idx = ae->prev_log_idx + 1 + i;
    }

    /* 4. 请求中携带了Leader已经提交到状态机的日志索引,本地同样也更新这个索引,将其
     *    设置为本地最大日志索引和leader_commit中的较小者*/
    if (raft_get_commit_idx(me_) < ae->leader_commit)
    {
        int last_log_idx = max(raft_get_current_idx(me_), 1);
        raft_set_commit_idx(me_, min(last_log_idx, ae->leader_commit));
    }

    /* 更新Leader信息 */
    me->current_leader = node;

    r->success = 1;
    r->first_idx = ae->prev_log_idx + 1;
    return 0;

fail_with_current_idx:
    r->current_idx = raft_get_current_idx(me_);
fail:
    r->success = 0;
    r->first_idx = 0;
    return -1;
}
ログイン後にコピー

处理添加日志请求回复 Leader收到添加日志回复后,可以知道下面这些信息:

自己是不是已经过时(current_term < response->term即为过时)

follower是否成功添加日志,如果添加失败,则减小发给follower的日志索引nextIndex再重试;如果添加成功则更新本地记录的follower日志信息,并检查日志是否最新,如果不是最新则继续发送添加日志请求。

新机器的日志添加,详见3.4节-- 成员变更

/** 处理添加日志请求回复
  * /
int raft_recv_appendentries_response(raft_server_t* me_,
                                     raft_node_t* node,
                                     msg_appendentries_response_t* r)
{
    raft_server_private_t* me = (raft_server_private_t*)me_;

    __log(me_, node,
          "received appendentries response %s ci:%d rci:%d 1stidx:%d",
          r->success == 1 ? "SUCCESS" : "fail",
          raft_get_current_idx(me_),
          r->current_idx,
          r->first_idx);

    /* 过时的回复 -- 忽略 */
    if (r->current_idx != 0 && r->current_idx <= raft_node_get_match_idx(node))
        return 0;

    /* oh~我不是Leader */
    if (!raft_is_leader(me_))
        return -1;

    /* 回复中的term比自己的要大,说明自己是一个过时的Leader,无条件转为Follower */
    if (me->current_term < r->term)
    {
        raft_set_current_term(me_, r->term);
        raft_become_follower(me_);
        return 0;
    }
    /* 过时的回复,网络状况不好时会出现 */
    else if (me->current_term != r->term)
        return 0;

    /* stop processing, this is a node we don&#39;t have in our configuration */
    if (!node)
        return 0;

    /* 由于日志不一致导致添加日志不成功*/
    if (0 == r->success)
    {
        assert(0 <= raft_node_get_next_idx(node));

        /* 将nextIdex减*/
        int next_idx = raft_node_get_next_idx(node);
        assert(0 <= next_idx);
        /* Follower的日志数量还远远少于Leader,将nextIdex设为回复中的current_idx+1和Leader
         * 当前索引中较小的一个,一般回复中的current_idx+1会比较小*/
        if (r->current_idx < next_idx - 1)
            raft_node_set_next_idx(node, min(r->current_idx + 1, raft_get_current_idx(me_)));
        /* Follower的日志数量和Leader差不多,但是比对前一条日志时失败,这种情况将next_idx减1
         * 重试*/
        else
            raft_node_set_next_idx(node, next_idx - 1);

        /* 使用更新后的nextIdx重新发送添加日志请求 */
        raft_send_appendentries(me_, node);
        return 0;
    }

    assert(r->current_idx <= raft_get_current_idx(me_));

    /* 下面处理添加日志请求的情况 */
    /* 更新本地记录的Follower的日志情况 */
    raft_node_set_next_idx(node, r->current_idx + 1);
    raft_node_set_match_idx(node, r->current_idx);

    /* 如果是新加入的机器,则判断它的日志是否是最新,如果达到了最新,则赋予它投票权,
     * 这里逻辑的详细解释在第3.4节 -- 成员变更*/
    if (!raft_node_is_voting(node) &&
        -1 == me->voting_cfg_change_log_idx &&
        raft_get_current_idx(me_) <= r->current_idx + 1 &&
        me->cb.node_has_sufficient_logs &&
        0 == raft_node_has_sufficient_logs(node)
        )
    {
        raft_node_set_has_sufficient_logs(node);
        me->cb.node_has_sufficient_logs(me_, me->udata, node);
    }

    /* 如果一条日志回复成功的数量超过一半,则将日志提交commit,即允许应用到状态机 */
    int votes = 1; /* include me */
    int point = r->current_idx;
    int i;
    for (i = 0; i < me->num_nodes; i++)
    {
        if (me->node == me->nodes[i] || !raft_node_is_voting(me->nodes[i]))
            continue;

        int match_idx = raft_node_get_match_idx(me->nodes[i]);

        if (0 < match_idx)
        {
            raft_entry_t* ety = raft_get_entry_from_idx(me_, match_idx);
            /*如果follower已经添加了索引大于等于r->current_idx的日志,则vote加1*/
            if (ety->term == me->current_term && point <= match_idx)
                votes++;
        }
    }

    /* 投票数大于所有服务器的一半,则将日志提交 */
    if (me->num_nodes / 2 < votes && raft_get_commit_idx(me_) < point)
        raft_set_commit_idx(me_, point);

    /* 如果follower的日志还没有最新,那么继续发送添加日志请求 */
    if (raft_get_entry_from_idx(me_, raft_node_get_next_idx(node)))
        raft_send_appendentries(me_, node);

    /* periodic applies committed entries lazily */

    return 0;
}
ログイン後にコピー

3.3 成员变更

成员的变更都是以日志的形式下发的。添加的新成员分两阶段进行,第一阶段中新成员没有有投票权,但是有接收日志的权力;当它的日志同步到最新后就进入到第二阶段,由Leader赋予投票权,从而成为集群中完整的一员。删除成员相对比较简单,所有服务器收到删除成员的日志后,立马将该成员的信息从本地抹除。

添加成员过程

  1. 管理员向Leader发送添加成员命令

  2. Leader添加一条 RAFT_LOGTYPE_ADD_NONVOTING_NODE日志,即添加没有投票权的服务器。该日志与其它普通日志一样同步给集群中其它服务器。收到该日志的服务器在本地保存该新成员的信息。

  3. 当新成员的日志同步到最新后,Leader添加一条 RAFT_LOGTYPE_ADD_NODE日志,即有投票权的服务器,同样地,该日志与其它普通日志一样同步给集群中其它服务器。收到该日志的服务器在本地保存该新成员的信息,以后的投票活动会将新成员考虑进去。

删除成员过程

  1. 管理员向Leader发送删除成员命令。

  2. Leader添加一条 RAFT_LOGTYPE_REMOVE_NODE 日志,并跟普通日志一样同步给其它服务器。收到该日志的服务器立即将被成员信息从本地删除。

感谢大家的阅读,希望大家收益多多。

推荐教程:《C语言

以上がC言語によるRaft実装(コード付き)の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

関連ラベル:
ソース:itbbs.cn
このウェブサイトの声明
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。
最新の問題
人気のチュートリアル
詳細>
最新のダウンロード
詳細>
ウェブエフェクト
公式サイト
サイト素材
フロントエンドテンプレート