0%

Tikv的BatchSystem

BatchSystem是Tikv实现multi-raft的基石,本文介绍BatchSystem的实现。BatchSystem本身是一个抽象出来的通用的模块,不牵涉业务逻辑(multi-raft),方便单独介绍。

总体结构 (1)

FsmBasicMailboxScheduler (2)

Fsm是一个trait,由业务逻辑来实现。Fsm的运转是靠Message来驱动的,所以就有了BasicMailbox。一个Fsm和一个BasicMailbox绑定:BasicMailbox用于接收驱动FsmMessageFsm是发到BasicMailboxMessage的owner。

1
2
3
4
pub struct BasicMailbox<Owner: Fsm> {
sender: mpsc::LooseBoundedSender<Owner::Message>,
state: Arc<FsmState<Owner>>,
}

其中sender提供发送Message的功能,state里面实际上包裹了一个Fsm(除了Fsm本身还包含Fsm的状态)。可以从BasicMailbox中把Fsm拿出来(take_fsm函数),也可以还回去(release函数)。

当需要驱动Fsm的时候,就通过以下两个函数向关联的BasicMailbox发一个Message

1
2
3
4
5
6
7
8
9
10
11
12
13
impl<Owner: Fsm> BasicMailbox<Owner> {
pub fn force_send<S: FsmScheduler<Fsm = Owner>>(
&self,
msg: Owner::Message,
scheduler: &S,
) -> Result<(), SendError<Owner::Message>> {...}

pub fn try_send<S: FsmScheduler<Fsm = Owner>>(
&self,
msg: Owner::Message,
scheduler: &S,
) -> Result<(), TrySendError<Owner::Message>> {...}
}

发送到BasicMailboxMessage立即驱动Fsm吗?不是的,Fsm经过调度,才能最终被Message驱动。这也是force_sendtry_send的第二个参数的用途。这两个函数是这样工作的:把msg放入sender;把FsmBasicMailbox里拿出来,扔给Scheduler,这时Fsm处于NOTIFYSTATE_NOTIFIED状态(调度之前,FsmBasicMailbox中的时候处于NOTIFYSTATE_IDLE状态。将来,FsmMessage之后就会被还回BasicMailbox,那时又回到NOTIFYSTATE_IDLE状态。

Scheduler里面包含一个channel的发送端。当BasicMailbox接收到一个Message的时候,就把对应的Fsm扔到这个channel里。下文再说channel的接收端以及如何消费里面的Fsm

Router (3)

Router包含上文介绍过的3个组件:

  • BasicMailbox: 一个Control BasicMailbox和若干个Normal BasicMailboxRouter提供的接口是:向Control BasicMailbox或某个(由参数addr指定)Normal BasicMailbox发送一个Message;除此之外,还有广播,即向把Message发给所有BasicMailbox
  • Fsm: 每个BasicMailbox都关联一个Fsm,即有一个Control Fsm和若干个Normal Fsm
  • Scheduler: 一个Control Scheduler和一个Normal Scheduler;当一个Message发到一个BasicMailbox的时候,就把对应的Fsm拿出来,让Scheduler调度(即放进Schedulerchannel里);

PollerPollHandler (4)

前文说过,每个Scheduler有一个channel,有Message发给一个Fsm(发送到其关联的BasicMailbox)的时候,Fsm就被放进channelPoller就是这个channel的消费者:

1
2
3
4
5
6
struct Poller<N: Fsm, C: Fsm, Handler> {
router: Router<N, C, NormalScheduler<N, C>, ControlScheduler<N, C>>,
fsm_receiver: channel::Receiver<FsmTypes<N, C>>,
handler: Handler,
max_batch_size: usize,
}

其中fsm_receiver就是channel的接收端。一个Poller就是一个线程,线程中运行Poller::poll()函数。这个函数是一个大循环,每一轮都是从fsm_receiver中接收一批Fsm(可能是Control Fsm也可能是Normal Fsm,这些Fsm上有Message),由结构体Batch表示,然后处理它们。BatcySystem的名字就来源于此:一次接收并处理一批Fsm。详细点说:

  • 上面router里有$M$个Fsm/BasicMailbox(在Tikv中,一个region对应一个Normal Fsm/BasicMailbox,此外还有一个Control Fsm/BasicMailbox);
  • 下面线程池里有$N$个Poller
  • 当一个Fsm(无论Normal Fsm还是Control Fsm)的BasicMailbox接收到Message的时候,这个Fsm就被Scheduler发给线程池去处理;
  • 每个线程(Poller::pool())一次拿一批Fsm来处理,处理完之后再还给BasicMailbox(等BasicMailbox再接收到Message的时候,再处理);

注意,这里面有两个消息通道:

    1. Fsm被发往Poller的消息通道:发送端在Scheduler里,接收端在Poller里;
    1. Message被发往Fsm的消息通道:发送端在BasicMailbox里,接收端在Fsm里(PeerFsmStoreFsmreceiver);

如上所述,Poller::poll()的主要工作是从fsm_receiver中接收一批Fsm并处理它们。但,处理的逻辑是业务层的事,所以这里由一个trait(PollHandler)表示处理逻辑。由于fsm_receiver接收到的既有Control Fsm也有Normal Fsm,所以PollHandler定义了handle_controlhandle_normal接口。

1
2
3
4
5
6
7
pub trait PollHandler<N, C> {
fn begin(&mut self, batch_size: usize);
fn handle_control(&mut self, control: &mut C) -> Option<usize>;
fn handle_normal(&mut self, normal: &mut N) -> Option<usize>;
fn end(&mut self, batch: &mut [Box<N>]);
//......
}

Poller接收到一批Fsm之后:

  • 调用begin()
  • 对每个Fsm调用handle_control()handle_normal()
  • 调用end()

这些都是业务层实现的。可以想象,业务层实现handle_normal()handle_control()的时候,应该是从Fsm接收Message,处理它,驱动Fsm

BatchSystem (5)

BatchSystem就是把上述东西组合起来,其spawn函数的工作就是创建$N$个Poller实例,并启动线程。

小结 (6)

本文梳理一下BatchSystem的逻辑,后文就不用再陷入这些细节,聚焦于Tikv的业务逻辑:一个消息发给一个PeerFsmStoreFsm的时候,直接去看对应的PollHandler实现即可。

写的不错,有赏!