[Feature Request] Blockchain reactor for fast sync purpose
Blockchain reactor implementation details
The reactor will include a demultiplexing routine which sends each message to each subroutine for independent processing. The fast sync protocol logic is decoupled from IO by using three concurrent threads of execution: a scheduler, a processor, and a demuxer. The demuxRoutine acts as a "pacemaker", setting the time at which events are expected to be handled. It is responsible for translating between internal events and network IO messages, and for routing events between components. Both the scheduler and processor are structured as finite state machines with input and output events. Input events are received on an unbounded priority queue, with higher priority for error events. Output events are emitted on a blocking, bounded channel. Network IO is handled by the KardiaChain p2p subsystem, where messages are sent in a non-blocking manner.
```go
// Takes the channel as a parameter to avoid race conditions on r.events.
func (r *BlockchainReactor) demux(events <-chan Event) {
    var (
        scheduleFreq = 20 * time.Millisecond
        doScheduleCh = make(chan struct{}, 1)
        doScheduleTk = time.NewTicker(scheduleFreq)
    )
    defer doScheduleTk.Stop()
    ...
    for {
        select {
        case <-doScheduleTk.C:
            select {
            case doScheduleCh <- struct{}{}:
            default:
            }
        case <-doScheduleCh:
            r.scheduler.send(rTrySchedule{time: time.Now()})

        // Events from peers. Closing the channel signals event loop termination.
        case event, ok := <-events:
            ...

        // Incremental events from scheduler
        case event := <-r.scheduler.next():
            ...

        // Incremental events from processor
        case event := <-r.processor.next():
            ...

        // Terminal event from processor
        case err := <-r.processor.final():
            ...
        }
    }
}
```
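As a rough illustration of the prioritized input queue mentioned above, a minimal sketch could look like the following. The eventQueue type, the priority rule, and the Event/errorEvent placeholders are assumptions for this sketch, not the actual implementation.

```go
// Illustrative only: an unbounded input queue that drains error events
// before regular events, matching the "higher priority for error events"
// requirement. Event and errorEvent stand in for the reactor's event types.
type Event interface{}

type errorEvent struct{ err error }

type eventQueue struct {
    errors []Event // error events, served first
    others []Event // all remaining events, served in FIFO order
}

func (q *eventQueue) push(e Event) {
    if _, isErr := e.(errorEvent); isErr {
        q.errors = append(q.errors, e)
        return
    }
    q.others = append(q.others, e)
}

func (q *eventQueue) pop() (Event, bool) {
    if len(q.errors) > 0 {
        e := q.errors[0]
        q.errors = q.errors[1:]
        return e, true
    }
    if len(q.others) > 0 {
        e := q.others[0]
        q.others = q.others[1:]
        return e, true
    }
    return nil, false
}
```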
The IO component is responsible for exchanging (sending and receiving) fast sync protocol messages with peers. There is one send and one receive routine per peer.
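The per-peer send and receive routines could be shaped roughly as below; peerConn, sendRoutine and recvRoutine are illustrative names for this sketch, not the p2p subsystem's actual API.

```go
// Illustrative per-peer IO routines. peerConn stands in for whatever the
// p2p layer exposes for a single peer; the channel wiring is an assumption.
type peerConn interface {
    Send(msg []byte) error
    Receive() ([]byte, error)
}

// sendRoutine drains outgoing messages for one peer until the channel is
// closed or a send fails, at which point the error is reported upstream.
func sendRoutine(conn peerConn, out <-chan []byte, errc chan<- error) {
    for msg := range out {
        if err := conn.Send(msg); err != nil {
            errc <- err
            return
        }
    }
}

// recvRoutine forwards messages received from one peer to the demuxer
// until the connection returns an error.
func recvRoutine(conn peerConn, in chan<- []byte, errc chan<- error) {
    for {
        msg, err := conn.Receive()
        if err != nil {
            errc <- err
            return
        }
        in <- msg
    }
}
```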
Life cycle management
A set of routines for individual processes allows processes to run in parallel with clear life cycle management. The Start, Stop, and AddPeer hooks currently present in the reactor will delegate to the sub-routines, allowing them to manage internal state independently without further coupling to the reactor.
```go
func (r *BlockchainReactor) Start() {
    r.events = make(chan Event, chBufferSize)
    go r.scheduler.start()
    go r.processor.start()
    go r.demux(r.events)
    ...
}

func (r *BlockchainReactor) Receive(...) {
    ...
    r.msgs <- msg
    ...
}

func (r *BlockchainReactor) Stop() {
    ...
    r.msgs <- stop
    ...
}

...

func (r *BlockchainReactor) AddPeer(peer p2p.Peer) {
    ...
    r.msgs <- bcAddPeerEv{peer.ID}
    ...
}
```
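One way to give the scheduler and processor a uniform life cycle is to wrap each state machine in a small routine driver. The routine type below is a sketch under that assumption; it reuses the illustrative Event and errorEvent types from the earlier sketch, and the handler signature, channel sizes and method names are all assumptions.

```go
// Illustrative life-cycle wrapper for a sub-routine (scheduler or processor).
type routine struct {
    input   chan Event                 // events fed in by the demuxer
    output  chan Event                 // incremental events consumed by the demuxer
    done    chan struct{}              // closed when the event loop exits
    handler func(Event) (Event, error) // the state machine's handle function
}

func newRoutine(handler func(Event) (Event, error), bufSize int) *routine {
    return &routine{
        input:   make(chan Event, bufSize),
        output:  make(chan Event, bufSize),
        done:    make(chan struct{}),
        handler: handler,
    }
}

// start runs the event loop until the input channel is closed by stop().
func (rt *routine) start() {
    defer close(rt.done)
    for ev := range rt.input {
        out, err := rt.handler(ev)
        if err != nil {
            rt.output <- errorEvent{err: err}
            continue
        }
        rt.output <- out
    }
}

func (rt *routine) send(ev Event)      { rt.input <- ev }
func (rt *routine) next() <-chan Event { return rt.output }
func (rt *routine) stop()              { close(rt.input) }
func (rt *routine) wait()              { <-rt.done }
```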
IO handling
An IO handling routine within the reactor will isolate peer communication. Messages going through the ioRoutine will usually be one way, using p2p APIs. In cases where a p2p API such as trySend returns an error, the ioRoutine can funnel those messages back to the demuxRoutine for distribution to the other routines. For instance, errors from the ioRoutine can be consumed by the scheduler to inform better peer selection implementations.
```go
func (sio *switchIO) sendBlockRequest(peerID p2p.ID, height uint64) error {
    ...
    msgBytes, err := EncodeMsg(&bcproto.BlockRequest{Height: height})
    if err != nil {
        return err
    }
    queued := peer.TrySend(BlockchainChannel, msgBytes)
    if !queued {
        return fmt.Errorf("send queue full")
    }
    return nil
}

func (sio *switchIO) sendStatusResponse(base uint64, height uint64, peerID p2p.ID) error {
    ...
    msgBytes, err := EncodeMsg(&bcproto.StatusResponse{Height: height, Base: base})
    if err != nil {
        return err
    }
    if queued := peer.TrySend(BlockchainChannel, msgBytes); !queued {
        return fmt.Errorf("peer queue full")
    }
    return nil
}

func (sio *switchIO) sendBlockToPeer(block *types.Block, peerID p2p.ID) error {
    ...
    msgBytes, err := EncodeMsg(&bcproto.BlockResponse{Block: bpb})
    if err != nil {
        return err
    }
    if queued := peer.TrySend(BlockchainChannel, msgBytes); !queued {
        return fmt.Errorf("peer queue full")
    }
    return nil
}

func (sio *switchIO) sendBlockNotFound(height uint64, peerID p2p.ID) error {
    peer := sio.sw.Peers().Get(peerID)
    if peer == nil {
        return fmt.Errorf("peer not found")
    }
    msgBytes, err := EncodeMsg(&bcproto.NoBlockResponse{Height: height})
    if err != nil {
        return err
    }
    if queued := peer.TrySend(BlockchainChannel, msgBytes); !queued {
        return fmt.Errorf("peer queue full")
    }
    return nil
}

func (sio *switchIO) trySwitchToConsensus(state cstate.LastestBlockState, skipWAL bool) bool {
    conR, ok := sio.sw.Reactor("CONSENSUS").(consensusReactor)
    if ok {
        conR.SwitchToConsensus(state, skipWAL)
    }
    return ok
}

func (sio *switchIO) broadcastStatusRequest() error {
    ...
    sio.sw.Broadcast(BlockchainChannel, msgBytes)
    return nil
}
```
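To make the error funneling described above concrete: instead of handling a failed trySend inside the IO routine, the reactor could turn it into an event for the demuxer. The ioSendError type, the r.io field and the requestBlock helper below are assumptions for this sketch.

```go
// Illustrative only: a failed block request is converted into an event and
// pushed back through the demuxer so the scheduler can react to it
// (for example by selecting a different peer or pruning this one).
type ioSendError struct {
    peerID p2p.ID
    err    error
}

func (r *BlockchainReactor) requestBlock(peerID p2p.ID, height uint64) {
    if err := r.io.sendBlockRequest(peerID, height); err != nil {
        r.events <- ioSendError{peerID: peerID, err: err}
    }
}
```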
Processor
The processor is responsible for ordering, verifying and executing blocks. The processor will maintain an internal map, queue, referring to the blocks waiting to be processed. As blocks arrive unordered, the processor will check whether it has the block at height+1 necessary to process the next block. The processor also maintains a processorContext interface in order to verify, apply and save new blocks.
```go
type pcState struct {
    // blocks waiting to be processed
    queue blockQueue

    // draining indicates that the next rProcessBlock event with a queue miss constitutes completion
    draining bool

    // the number of blocks successfully synced by the processor
    blocksSynced int

    // the processorContext which contains the processor dependencies
    context processorContext
}

type processorContext interface {
    applyBlock(blockID types.BlockID, block *types.Block) error
    verifyCommit(chainID string, blockID types.BlockID, height uint64, commit *types.Commit) error
    saveBlock(block *types.Block, blockParts *types.PartSet, seenCommit *types.Commit)
    kaiState() cstate.LastestBlockState
    setState(cstate.LastestBlockState)
}
```
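A concrete processorContext would mostly forward to the block executor, block store and current state. The sketch below assumes small blockApplier, blockVerifier and blockStore interfaces standing in for those dependencies; none of these names are final.

```go
// Illustrative processorContext implementation; the interfaces and the
// pContext type are assumptions sketching the wiring to real dependencies.
type blockApplier interface {
    ApplyBlock(state cstate.LastestBlockState, blockID types.BlockID, block *types.Block) (cstate.LastestBlockState, error)
}

type blockVerifier interface {
    VerifyCommit(chainID string, blockID types.BlockID, height uint64, commit *types.Commit) error
}

type blockStore interface {
    SaveBlock(block *types.Block, blockParts *types.PartSet, seenCommit *types.Commit)
}

type pContext struct {
    applier  blockApplier
    verifier blockVerifier
    store    blockStore
    state    cstate.LastestBlockState
}

func (pc *pContext) applyBlock(blockID types.BlockID, block *types.Block) error {
    newState, err := pc.applier.ApplyBlock(pc.state, blockID, block)
    if err != nil {
        return err
    }
    pc.state = newState
    return nil
}

func (pc *pContext) verifyCommit(chainID string, blockID types.BlockID, height uint64, commit *types.Commit) error {
    return pc.verifier.VerifyCommit(chainID, blockID, height, commit)
}

func (pc *pContext) saveBlock(block *types.Block, blockParts *types.PartSet, seenCommit *types.Commit) {
    pc.store.SaveBlock(block, blockParts, seenCommit)
}

func (pc *pContext) kaiState() cstate.LastestBlockState  { return pc.state }
func (pc *pContext) setState(s cstate.LastestBlockState) { pc.state = s }
```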
In the Kardia blockchain, the commit for block h (the signed vote messages) is contained in block h+1, and thus a node performing fast sync must receive two sequential blocks before it can fully verify the first one. If verification succeeds, the first block is accepted; if it fails, both blocks are rejected, since it is not known which block was faulty. When the node rejects a block, it suspects the sending peer of being faulty and evicts this peer from the set of peers. The same happens when a peer does not reply within a predefined time interval.
```go
// nextTwo returns the next two unverified blocks
func (state *pcState) nextTwo() (queueItem, queueItem, error) {
    if first, ok := state.queue[state.height()+1]; ok {
        if second, ok := state.queue[state.height()+2]; ok {
            return first, second, nil
        }
    }
    return queueItem{}, queueItem{}, fmt.Errorf("not found")
}

func (state *pcState) handle(event Event) (Event, error) {
    switch event := event.(type) {
    case rProcessBlock:
        ...
        // verify if +second+ last commit "confirms" +first+ block
        err = state.context.verifyCommit(kaiState.ChainID, firstID, first.Height(), second.LastCommit())
        if err != nil {
            state.purgePeer(firstItem.peerID)
            if firstItem.peerID != secondItem.peerID {
                state.purgePeer(secondItem.peerID)
            }
            return pcBlockVerificationFailure{
                height: first.Height(), firstPeerID: firstItem.peerID, secondPeerID: secondItem.peerID,
            }, nil
        }
        ...
```
Furthermore, it informs the scheduler whether block processing was successful (pcBlockProcessed) or led to an error (pcBlockVerificationFailure).
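For reference, these processor output events only need to carry enough information for the scheduler to update its block and peer state. The field layout below is a sketch consistent with the handle snippet above; the exact fields may differ in the implementation.

```go
// Illustrative shapes for the processor's per-block result events.
type pcBlockProcessed struct {
    height uint64
    peerID p2p.ID // peer that supplied the block that was just applied
}

type pcBlockVerificationFailure struct {
    height       uint64
    firstPeerID  p2p.ID // peer that sent the block which failed verification
    secondPeerID p2p.ID // peer that sent the block carrying the commit used for verification
}
```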
Scheduler
The scheduler contains the business logic for tracking peers and determining which block to request from whom. The scheduler needs to maintain state on:
- The state (blockState) of every block seen up to height maxHeight
- The set of peers and their peer state (peerState)
- Which peers have which blocks
- Which blocks have been requested from which peers
```go
type blockState int

const (
    blockStateUnknown   blockState = iota + 1 // no known peer has this block
    blockStateNew                             // indicates that a peer has reported having this block
    blockStatePending                         // indicates that this block has been requested from a peer
    blockStateReceived                        // indicates that this block has been received from a peer
    blockStateProcessed                       // indicates that this block has been applied
)

type scheduler struct {
    initHeight uint64

    // next block that needs to be processed. All blocks with smaller height are
    // in Processed state.
    height uint64

    // lastAdvance tracks the last time a block execution happened.
    // syncTimeout is the maximum time the scheduler waits to advance in the fast sync process before finishing.
    // This covers the cases where there are no peers or all peers have a lower height.
    lastAdvance time.Time
    syncTimeout time.Duration

    // a map of peerID to scheduler specific peer struct `scPeer` used to keep
    // track of peer specific state
    peers       map[p2p.ID]*scPeer
    peerTimeout time.Duration // maximum response time from a peer otherwise prune
    minRecvRate int64         // minimum receive rate from peer otherwise prune

    // the maximum number of blocks that should be New, Received or Pending at any point
    // in time. This is used to enforce a limit on the blockStates map.
    targetPending int

    // a list of blocks to be scheduled (New), Pending or Received. Its length should be
    // smaller than targetPending.
    blockStates map[uint64]blockState

    // a map of heights to the peer we are waiting a response from
    pendingBlocks map[uint64]p2p.ID

    // the time at which a block was put in blockStatePending
    pendingTime map[uint64]time.Time

    // a map of heights to the peers that put the block in blockStateReceived
    receivedBlocks map[uint64]p2p.ID
}
```
The scheduler receives relevant protocol messages from peers (for example bcBlockResponse and bcStatusResponse), but also internal events that are the result of block processing in the processor (the events carry the information of whether a block was successfully processed or there was an error). The scheduler schedules block requests by emitting internal events (scBlockRequest) and also informs the processor about internal processing, for example when a block response is received (scBlockReceived) or when there is an error in peer behaviour (scPeerError). It is configured to maintain a target number n of in-flight messages and will use feedback from bcBlockResponse, bcStatusResponse and scPeerError to produce an optimal assignment of rTrySchedule on each doScheduleCh tick.
```go
func (sc *scheduler) handle(event Event) (Event, error) {
    switch event := event.(type) {
    case bcResetState:
        nextEvent, err := sc.handleResetState(event)
        return nextEvent, err
    case bcStatusResponse:
        nextEvent, err := sc.handleStatusResponse(event)
        return nextEvent, err
    case bcBlockResponse:
        nextEvent, err := sc.handleBlockResponse(event)
        return nextEvent, err
    case bcNoBlockResponse:
        nextEvent, err := sc.handleNoBlockResponse(event)
        return nextEvent, err
    case rTrySchedule:
        nextEvent, err := sc.handleTrySchedule(event)
        return nextEvent, err
    case bcAddNewPeer:
        nextEvent, err := sc.handleAddNewPeer(event)
        return nextEvent, err
    case bcRemovePeer:
        nextEvent, err := sc.handleRemovePeer(event)
        return nextEvent, err
    case rTryPrunePeer:
        nextEvent, err := sc.handleTryPrunePeer(event)
        return nextEvent, err
    case pcBlockProcessed:
        nextEvent, err := sc.handleBlockProcessed(event)
        return nextEvent, err
    case pcBlockVerificationFailure:
        nextEvent, err := sc.handleBlockProcessError(event)
        return nextEvent, err
    default:
        return scSchedulerFail{reason: fmt.Errorf("unknown event %v", event)}, nil
    }
}

...

// nextHeightToSchedule returns the lowest height in sc.blockStates with
// state == blockStateNew, or 0 if there are no new blocks.
func (sc *scheduler) nextHeightToSchedule() uint64 {
    var min uint64 = math.MaxUint64
    for height, state := range sc.blockStates {
        if state == blockStateNew && height < min {
            min = height
        }
    }
    if min == math.MaxUint64 {
        min = 0
    }
    return min
}

func (sc *scheduler) handleTrySchedule(event rTrySchedule) (Event, error) {
    if time.Since(sc.lastAdvance) > sc.syncTimeout {
        return scFinishedEv{reason: "timeout, no advance"}, nil
    }

    nextHeight := sc.nextHeightToSchedule()
    if nextHeight == 0 {
        return noOp, nil
    }

    bestPeerID, err := sc.selectPeer(nextHeight)
    if err != nil {
        return scSchedulerFail{reason: err}, nil
    }
    if err := sc.markPending(bestPeerID, nextHeight, event.time); err != nil {
        return scSchedulerFail{reason: err}, nil // XXX: peerError might be more appropriate
    }
    return scBlockRequest{peerID: bestPeerID, height: nextHeight}, nil
}

...

type scPeer struct {
    peerID p2p.ID

    // initialized as New when peer is added, updated to Ready when statusUpdate is received,
    // updated to Removed when peer is removed
    state peerState

    base        uint64 // updated when statusResponse is received
    height      uint64 // updated when statusResponse is received
    lastTouched time.Time
    lastRate    int64 // last receive rate in bytes
}
```
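selectPeer is the natural place for smarter peer selection strategies to plug in. As one possible heuristic (purely illustrative, assuming a peerStateReady constant and the scPeer fields above), the scheduler could pick the Ready peer covering the requested height that has the fewest outstanding requests:

```go
// Illustrative selectPeer: among Ready peers whose [base, height] range
// covers the requested height, pick the one with the fewest pending
// requests. This is a sketch of one heuristic, not the final policy.
func (sc *scheduler) selectPeer(height uint64) (p2p.ID, error) {
    pending := make(map[p2p.ID]int)
    for _, pID := range sc.pendingBlocks {
        pending[pID]++
    }

    var (
        best     p2p.ID
        bestLoad int
        found    bool
    )
    for pID, peer := range sc.peers {
        if peer.state != peerStateReady || height < peer.base || height > peer.height {
            continue
        }
        if load := pending[pID]; !found || load < bestLoad {
            best, bestLoad, found = pID, load, true
        }
    }
    if !found {
        var none p2p.ID
        return none, fmt.Errorf("no peer able to serve height %d", height)
    }
    return best, nil
}
```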
Once the Fastsync protocol terminates, this is signaled to the KardiaChain consensus component (denoted ConsensusManager) with a trySwitchToConsensus event.
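Inside the reactor this boils down to handling the scheduler's terminal event in demux, roughly as sketched below. The handleFinished helper, the r.io and r.logger fields, and the stop() methods are assumptions for this sketch.

```go
// Illustrative terminal handling once fast sync has finished: hand the
// final state to the consensus reactor and shut down the sub-routines.
func (r *BlockchainReactor) handleFinished(state cstate.LastestBlockState, skipWAL bool) {
    if !r.io.trySwitchToConsensus(state, skipWAL) {
        r.logger.Error("failed to switch to consensus reactor")
    }
    r.scheduler.stop()
    r.processor.stop()
}
```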