View on GitHub

时光粒子源码

分布式一致性/分布式存储等开源技术探讨, GitHub:https://github.com/timequark

Raft源码分析(三) - commit提交状态跟踪

lastApplied & commitIndex & latLogIndex 示意图

上图中,上半部分说明lastApplied会根据commitIndex,自动追平。当 Leader 中的 index 为 9 的 entry,完成半

数以上replication后,commitIndex := 9,同时,lastApplied还是会自动追平。下面源码中会详细分析。

以下是lastApplied、commitIndex的定义:

storage.py
class Log:
    """Persistent Raft Log on a disk
    Log entries:
        {term: <term>, command: <command>}
        {term: <term>, command: <command>}
        ...
        {term: <term>, command: <command>}

    Entry index is a corresponding line number
    """

    UPDATE_CACHE_EVERY = 5

    def __init__(self, node_id, serializer=None):
        self.filename = os.path.join(config.log_path, '{}.log'.format(node_id.replace(':', '_')))
        os.makedirs(os.path.dirname(self.filename), exist_ok=True)
        open(self.filename, 'a').close()

        self.serializer = serializer or config.serializer
        self.cache = self.read()

        # All States
        # commit_index、last_applied在所有Role角色中均会使用到,分析后得知,目前只有Leader/Follower才会使用到,
        # 其实 Candidate 只是在election时才会用到,是一个临时跳板角色
        """Volatile state on all servers: index of highest log entry known to be committed
        (initialized to 0, increases monotonically)"""
        self.commit_index = 0

        """Volatile state on all servers: index of highest log entry applied to state machine
        (initialized to 0, increases monotonically)"""
        self.last_applied = 0

        # Leaders
        # next_index仅在Leader中使用,记录Follower中的下一个entry的index位置,即append_entries_response中Follower的last_log_index + 1
        """Volatile state on Leaders: for each server, index of the next log entry to send to that server
        (initialized to leader last log index + 1)
            {<follower>:  index, ...}
        """
        self.next_index = None

        """Volatile state on Leaders: for each server, index of highest log entry known to be replicated on server
        (initialized to 0, increases monotonically)
            {<follower>:  index, ...}
        """
        # match_index仅在Leader中使用,记录Follower中已经replication完成的entry index
        self.match_index = None

lastApplied更新逻辑

state.py
def validate_commit_index(func):
    """Apply to State Machine everything up to commit index"""

    @functools.wraps(func)
    def wrapped(self, *args, **kwargs):
        # LIUHAO: Attention here, [log.last_applied, log.commit_index) but not 'log.last_log_index'
        # 这里根据commit_index,last_applied自动追平
        for not_applied in range(self.log.last_applied + 1, self.log.commit_index + 1):
            self.state_machine.apply(self.log[not_applied]['command'])
            self.log.last_applied += 1

            try:
                # LIUHAO: In concurrent requests case, how to make Future can reply Client's execute_command call correctly???
                # 疑点:
                #   self.apply_future 是 Leader.execute_command 中异步等待的变量,在并发情况下,如何保证 apply_future 的有序性呢?
                self.apply_future.set_result(not_applied)
            except (asyncio.futures.InvalidStateError, AttributeError):
                pass

        return func(self, *args, **kwargs)
    return wrapped

Leader 中 commitIndex源码分析

state.py
class Leader(BaseRole):
    ...
    def update_commit_index(self):
        '''
        LIUHAO: Update commit_index of leader when Leader receives responses from Follower
        '''
        commited_on_majority = 0
        # 在当前[commit_index+1, last_log_index + 1)范围内遍历,Leader中的 index 已得到 match_index 半数以
        # 上 Follower 回应,并且,log[index]['term'] 与最新 storage.term 相同时,更新 commit_index
        for index in range(self.log.commit_index + 1, self.log.last_log_index + 1):
            commited_count = len([
                1 for follower in self.log.match_index
                if self.log.match_index[follower] >= index
            ])

            # If index is matched on at least half + self for current term — commit
            # That may cause commit fails upon restart with stale logs
            is_current_term = self.log[index]['term'] == self.storage.term
            if self.state.is_majority(commited_count + 1) and is_current_term:
                commited_on_majority = index

            else:
                break

        if commited_on_majority > self.log.commit_index:
            self.log.commit_index = commited_on_majority

那 match_index 是什么时候更新的?来看一下 Leader.on_receive_append_entries_response 方法:

    @validate_commit_index
    @validate_term
    def on_receive_append_entries_response(self, data):
        sender_id = self.state.get_sender_id(data['sender'])

        # Count all unqiue responses per particular heartbeat interval
        # and step down via <step_down_timer> if leader doesn't get majority of responses for
        # <step_down_missed_heartbeats> heartbeats

        if data['request_id'] in self.response_map:
            self.response_map[data['request_id']].add(sender_id)

            if self.state.is_majority(len(self.response_map[data['request_id']]) + 1):
                self.step_down_timer.reset()
                del self.response_map[data['request_id']]

        if not data['success']:
            # LIUHAO: next_index is descreasing. Maybe in order to tolerant the follower to recover log data and catch up Leader
            self.log.next_index[sender_id] = max(self.log.next_index[sender_id] - 1, 1)

        else:
            # LIUHAO: Trace next_index, match_index for follower inside Leader.
            # append_entries 成功时,
            # next_index[follower_id] 更新为Follower的last_log_index+1,
            # match_index[follower_id]更新为Follower的last_log_index
            self.log.next_index[sender_id] = data['last_log_index'] + 1
            self.log.match_index[sender_id] = data['last_log_index']
            # 更新commit_index
            self.update_commit_index()

        # Send AppendEntries RPC to continue updating fast-forward log (data['success'] == False)
        # or in case there are new entries to sync (data['success'] == data['updated'] == True)
        if self.log.last_log_index >= self.log.next_index[sender_id]:
            # LIUHAO: Continue to send data to the follower
            asyncio.ensure_future(self.append_entries(destination=sender_id), loop=self.loop)

Follower 中commitIndex源码分析

state.py
class Follower(BaseRole):
    ...
    @validate_commit_index
    @validate_term
    def on_receive_append_entries(self, data):
        self.state.set_leader(data['leader_id'])

        # Reply False if log doesn’t contain an entry at prev_log_index whose term matches prev_log_term
        try:
            prev_log_index = data['prev_log_index']
            # 检查Leader侧提供的Follower的prev_log_index、Leader的term,与本地相比,是否有效
            # 如果无效,则直接返回 False
            # 注意:
            # raft白皮书有提到,无效时,可以携带Follower的 last_log_index,给到 Leader 侧,这样做可以使
            # Leader 侧快速定位 Follower 的 next_index,进而减少Leader侧无效的 append_entries 通信次数
            if prev_log_index > self.log.last_log_index or (
                prev_log_index and self.log[prev_log_index]['term'] != data['prev_log_term']
            ):
                response = {
                    'type': 'append_entries_response',
                    'term': self.storage.term,
                    'success': False,

                    'request_id': data['request_id']
                }
                # 异步回应Leader
                asyncio.ensure_future(self.state.send(response, data['sender']), loop=self.loop)
                return
        except IndexError:
            pass

        # If an existing entry conflicts with a new one (same index but different terms),
        # delete the existing entry and all that follow it
        # 将Leader发过来的entries数据,存至Log中 new_index 开始的位置
        new_index = data['prev_log_index'] + 1
        try:
            # 有冲突时,直接擦除至尾部,向Leader看齐
            if self.log[new_index]['term'] != data['term'] or (
                self.log.last_log_index != prev_log_index
            ):
                self.log.erase_from(new_index)
        except IndexError:
            pass
            # LIUHAO: TODO
            # 'log.write' will append entries to its tail. Should we reply Leader False message???

        # It's always one entry for now
        for entry in data['entries']:
            self.log.write(entry['term'], entry['command'])

        # Update commit index if necessary
        # 注意这里的条件,Follower的commit_index 小于 Leader的commit_index时,才更新
        # 问题:
        # Follower的commit_index 大于 Leader的commit_index时,如何处理?
        # 思考:
        # 大于的情形有可能是 Follower 曾经是 Leader,commit_index 比较新 ,因为某些原因降级成 Follower。
        # 但是,这种情形也不合理,因为 Leader 的 commit_index 只有收到过半Follower的 append_entries_response 后才会更新,
        # 如此,Follower 的 commit_index 一定是小于 Leader 的 commit_index,直至 Leader 同步完最后一个 last_log_index 
        # 的 entry,Follower 的 commit_index 等于 Leader 的 commit_index(因为  Leader 的 update_commit_index 遍历范围
        # [commit_index+1, last_log_index+1) 时 index 最大值为 last_log_index )。 
        if self.log.commit_index < data['commit_index']:
            self.log.commit_index = min(data['commit_index'], self.log.last_log_index)

        # Respond True since entry matching prev_log_index and prev_log_term was found
        response = {
            'type': 'append_entries_response',
            'term': self.storage.term,
            'success': True,

            'last_log_index': self.log.last_log_index, # LIUHAO: Here, 'log.last_log_index' will be updated for that more than 1 entry be appended to the Log list 
            'request_id': data['request_id']
        }
        asyncio.ensure_future(self.state.send(response, data['sender']), loop=self.loop)

        # 重置选举定时器
        self.election_timer.reset()

上一篇

下一篇