kardiachain/go-kardia

[Feature Request] Blockchain reactor for fast sync purpose

Closed this issue · 1 comments

Blockchain reactor implementation details

The reactor will include a demultiplexing routine which will send each message to each sub routine for independent processing. The fast sync protocol logic is decoupled from IO by using three concurrent threads of execution: a scheduler, a processor, and a demuxer. The demuxRoutine acts as "pacemaker" setting the time in which events are expected to be handled. It is responsible for translating between internal events and network IO messages, and for routing events between components. Both the scheduler and processor are structured as finite state machines with input and output events. Input events are received on an unbounded priority queue, with higher priority for error events. Output events are emitted on a blocking, bounded channel. Network IO is handled by the KardiaChain p2p subsystem, where messages are sent in a non-blocking manner.

// Takes the channel as a parameter to avoid race conditions on r.events.
func (r *BlockchainReactor) demux(events <-chan Event) {
	var (
		scheduleFreq = 20 * time.Millisecond
		doScheduleCh = make(chan struct{}, 1)
		doScheduleTk = time.NewTicker(scheduleFreq)
	)
	defer doScheduleTk.Stop()
        ...
	for {
		select {
			case <-doScheduleTk.C:
			        select {
			        case doScheduleCh <- struct{}{}:
			        default:
			        }
                        case <-doScheduleCh:
			        r.scheduler.send(rTrySchedule{time: time.Now()})

                        // Events from peers. Closing the channel signals event loop termination.
		        case event, ok := <-events:
                        ...

			// Incremental events from scheduler
			case event := <-r.scheduler.next():
			...

			// Incremental events from processor
			case event := <-r.processor.next():
			...

			// Terminal event from processor
			case err := <-r.processor.final():
			...
		}
	}
}

The IO component is responsible for exchanging (sending and receiving) fast sync protocol messages with peers. There is one send and one receive routine per peer.

Life cycle management

A set of routines for individual processes allow processes to run in parallel with clear life cycle management. Start, Stop, and AddPeer hooks currently present in the reactor will delegate to the sub-routines allowing them to manage internal state independent without further coupling to the reactor.

func (r *BlockChainReactor) Start() {
	r.events = make(chan Event, chBufferSize)
	go r.scheduler.start()
	go r.processor.start()
        go r.demux(r.events)
	...
}

func (bcR *BlockchainReactor) Receive(...) {
	...
	r.msgs <- msg
	...
}

func (r *BlockchainReactor) Stop() {
	...
	r.msgs <- stop
	...
}

...

func (r *BlockchainReactor) AddPeer(peer p2p.Peer) {
	...
	r.msgs <- bcAddPeerEv{peer.ID}
	...
}

IO handling

An IO handling routine within the reactor will isolate peer communication. Message going through the ioRoutine will usually be one way, using p2p APIs. In the case in which the p2p API such as trySend return errors, the ioRoutine can funnel those message back to the demuxRoutine for distribution to the other routines. For instance errors from the ioRoutine can be consumed by the scheduler to inform better peer selection implementations.

func (sio *switchIO) sendBlockRequest(peerID p2p.ID, height uint64) error {
	...
	msgBytes, err := EncodeMsg(&bcproto.BlockRequest{Height: height})
	if err != nil {
		return err
	}
	queued := peer.TrySend(BlockchainChannel, msgBytes)
	if !queued {
		return fmt.Errorf("send queue full")
	}
	return nil
}

func (sio *switchIO) sendStatusResponse(base uint64, height uint64, peerID p2p.ID) error {
        ...
	msgBytes, err := EncodeMsg(&bcproto.StatusResponse{Height: height, Base: base})
	if err != nil {
		return err
	}
	if queued := peer.TrySend(BlockchainChannel, msgBytes); !queued {
		return fmt.Errorf("peer queue full")
	}
	return nil
}

func (sio *switchIO) sendBlockToPeer(block *types.Block, peerID p2p.ID) error {
	...
	msgBytes, err := EncodeMsg(&bcproto.BlockResponse{Block: bpb})
	if err != nil {
		return err
	}
	if queued := peer.TrySend(BlockchainChannel, msgBytes); !queued {
		return fmt.Errorf("peer queue full")
	}
	return nil
}

func (sio *switchIO) sendBlockNotFound(height uint64, peerID p2p.ID) error {
	peer := sio.sw.Peers().Get(peerID)
	if peer == nil {
		return fmt.Errorf("peer not found")
	}
	msgBytes, err := EncodeMsg(&bcproto.NoBlockResponse{Height: height})
	if err != nil {
		return err
	}

	if queued := peer.TrySend(BlockchainChannel, msgBytes); !queued {
		return fmt.Errorf("peer queue full")
	}

	return nil
}

func (sio *switchIO) trySwitchToConsensus(state cstate.LastestBlockState, skipWAL bool) bool {
	conR, ok := sio.sw.Reactor("CONSENSUS").(consensusReactor)
	if ok {
		conR.SwitchToConsensus(state, skipWAL)
	}
	return ok
}

func (sio *switchIO) broadcastStatusRequest() error {
	...
	sio.sw.Broadcast(BlockchainChannel, msgBytes)
	return nil
}

Processor

The processor is responsible for ordering, verifying and executing blocks. The Processor will maintain an internal map queue referring to the blocks waiting to be processed. As a set of blocks arrive unordered, the Processor will check if it has height+1 necessary to process the next block. The processor also maintains the interface processorContext in order to verify, apply and save new blocks.

type pcState struct {
	// blocks waiting to be processed
	queue blockQueue

	// draining indicates that the next rProcessBlock event with a queue miss constitutes completion
	draining bool

	// the number of blocks successfully synced by the processor
	blocksSynced int

	// the processorContext which contains the processor dependencies
	context processorContext
}

type processorContext interface {
	applyBlock(blockID types.BlockID, block *types.Block) error
	verifyCommit(chainID string, blockID types.BlockID, height uint64, commit *types.Commit) error
	saveBlock(block *types.Block, blockParts *types.PartSet, seenCommit *types.Commit)
	kaiState() cstate.LastestBlockState
	setState(cstate.LastestBlockState)
}

In Kardia blockchain, the commit for block (signed votes messages) h is contained in block h+1, and thus a node performing fast sync must receive two sequential blocks before it can verify fully the first one. If verification succeeds, the first block is accepted; if it fails, both blocks are rejected, since it is not known which block was faulty. When the node rejects a block, it
suspects the sending peer of being faulty and evicts this peer from the set of peers. The same happens when a peer does not reply within a predefined time interval.

// nextTwo returns the next two unverified blocks
func (state *pcState) nextTwo() (queueItem, queueItem, error) {
	if first, ok := state.queue[state.height()+1]; ok {
		if second, ok := state.queue[state.height()+2]; ok {
			return first, second, nil
		}
	}
	return queueItem{}, queueItem{}, fmt.Errorf("not found")
}

func (state *pcState) handle(event Event) (Event, error) {
        switch event := event.(type) {
        case rProcessBlock:
		// verify if +second+ last commit "confirms" +first+ block
		err = state.context.verifyCommit(kaiState.ChainID, firstID, first.Height(), second.LastCommit())
		if err != nil {
			state.purgePeer(firstItem.peerID)
			if firstItem.peerID != secondItem.peerID {
				state.purgePeer(secondItem.peerID)
			}
			return pcBlockVerificationFailure{
					height: first.Height(), firstPeerID: firstItem.peerID, secondPeerID: secondItem.peerID},
				nil
		}
...

Furthermore, it informs the scheduler whether a block processing was successful (pcBlockProcessed) or it has led to an error (pcBlockVerificationFailure).

Scheduler

The scheduler contains the business logic for tracking peers and determining which block to request from whom. The scheduler needs to maintain state on:

  • The state blockState of every block seem up to height of maxHeight
  • The set of peers and their peer state peerState
  • which peers have which blocks
  • which blocks have been requested from which peers
type blockState int

const (
	blockStateUnknown   blockState = iota + 1 // no known peer has this block
	blockStateNew                             // indicates that a peer has reported having this block
	blockStatePending                         // indicates that this block has been requested from a peer
	blockStateReceived                        // indicates that this block has been received by a peer
	blockStateProcessed                       // indicates that this block has been applied
)

type scheduler struct {
	initHeight uint64

	// next block that needs to be processed. All blocks with smaller height are
	// in Processed state.
	height uint64

	// lastAdvance tracks the last time a block execution happened.
	// syncTimeout is the maximum time the scheduler waits to advance in the fast sync process before finishing.
	// This covers the cases where there are no peers or all peers have a lower height.
	lastAdvance time.Time
	syncTimeout time.Duration

	// a map of peerID to scheduler specific peer struct `scPeer` used to keep
	// track of peer specific state
	peers       map[p2p.ID]*scPeer
	peerTimeout time.Duration   // maximum response time from a peer otherwise prune
	minRecvRate int64             // minimum receive rate from peer otherwise prune

	// the maximum number of blocks that should be New, Received or Pending at any point
	// in time. This is used to enforce a limit on the blockStates map.
	targetPending int
	// a list of blocks to be scheduled (New), Pending or Received. Its length should be
	// smaller than targetPending.
	blockStates map[uint64]blockState

	// a map of heights to the peer we are waiting a response from
	pendingBlocks map[uint64]p2p.ID

	// the time at which a block was put in blockStatePending
	pendingTime map[uint64]time.Time

	// a map of heights to the peers that put the block in blockStateReceived
	receivedBlocks map[uint64]p2p.ID
}

The scheduler receives relevant protocol messages from peers (for example bcBlockResponse and bcStatusResponse), but also internal events that are the result of the block processing in the processor (the events carry the information of whether
a block was successfully processed or there was an error). The scheduler schedules block requests by emitting internal events (scBlockRequest) and also informs the processor about internal processing, for example, when block response is received (scBlockReceived) or if there is an error in peer behaviour (scPeerError). It is configured to maintain a target n of in flight messages and will use feedback from bcBlockResponse, bcStatusResponse and scPeerError to produce an optimal assignment of rTrySchedule at each doScheduleCh ticker.

func (sc *scheduler) handle(event Event) (Event, error) {
	switch event := event.(type) {
	case bcResetState:
		nextEvent, err := sc.handleResetState(event)
		return nextEvent, err
	case bcStatusResponse:
		nextEvent, err := sc.handleStatusResponse(event)
		return nextEvent, err
	case bcBlockResponse:
		nextEvent, err := sc.handleBlockResponse(event)
		return nextEvent, err
	case bcNoBlockResponse:
		nextEvent, err := sc.handleNoBlockResponse(event)
		return nextEvent, err
	case rTrySchedule:
		nextEvent, err := sc.handleTrySchedule(event)
		return nextEvent, err
	case bcAddNewPeer:
		nextEvent, err := sc.handleAddNewPeer(event)
		return nextEvent, err
	case bcRemovePeer:
		nextEvent, err := sc.handleRemovePeer(event)
		return nextEvent, err
	case rTryPrunePeer:
		nextEvent, err := sc.handleTryPrunePeer(event)
		return nextEvent, err
	case pcBlockProcessed:
		nextEvent, err := sc.handleBlockProcessed(event)
		return nextEvent, err
	case pcBlockVerificationFailure:
		nextEvent, err := sc.handleBlockProcessError(event)
		return nextEvent, err
	default:
		return scSchedulerFail{reason: fmt.Errorf("unknown event %v", event)}, nil
	}
}
...

// lowest block in sc.blockStates with state == blockStateNew or -1 if no new blocks
func (sc *scheduler) nextHeightToSchedule() uint64 {
	var min uint64 = math.MaxUint64
	for height, state := range sc.blockStates {
		if state == blockStateNew && height < min {
			min = height
		}
	}
	if min == math.MaxUint64 {
		min = 0
	}
	return min
}

func (sc *scheduler) handleTrySchedule(event rTrySchedule) (Event, error) {
	if time.Since(sc.lastAdvance) > sc.syncTimeout {
		return scFinishedEv{reason: "timeout, no advance"}, nil
	}

	nextHeight := sc.nextHeightToSchedule()
	if nextHeight == 0 {
		return noOp, nil
	}

	bestPeerID, err := sc.selectPeer(nextHeight)
	if err != nil {
		return scSchedulerFail{reason: err}, nil
	}
	if err := sc.markPending(bestPeerID, nextHeight, event.time); err != nil {
		return scSchedulerFail{reason: err}, nil // XXX: peerError might be more appropriate
	}
	return scBlockRequest{peerID: bestPeerID, height: nextHeight}, nil
}
...

type scPeer struct {
	peerID p2p.ID

	// initialized as New when peer is added, updated to Ready when statusUpdate is received,
	// updated to Removed when peer is removed
	state peerState

	base        uint64 // updated when statusResponse is received
	height      uint64 // updated when statusResponse is received
	lastTouched time.Time
	lastRate    int64 // last receive rate in bytes
}

Once the Fastsync protocol terminates, this is signaled to the KardiaChain consensus component (denoted ConsensusManager) with a trySwitchToConsensus event.

Blockchain reactor implemented at #129 in order to resolve this issue.