Sync Modes
Ethereum supports three block synchronisation modes (a small illustrative sketch follows the list):
- full sync: downloads every block header and block body from the network and replays the transactions in each block to regenerate the state data
- fast sync: downloads every block header and block body together with the state data, but does not replay the transactions in the blocks; the downloaded block data is only verified
- light sync: downloads only the block headers, fetching neither block bodies nor state data; the corresponding blocks and state are retrieved from the network on demand
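A minimal, self-contained sketch of how these three modes can be modelled as a Go enum. The type and constant names only loosely mirror downloader.SyncMode; treat them as illustrative, not as the real go-ethereum definitions.

package main

import "fmt"

// SyncMode captures the idea behind downloader.SyncMode: an enum selecting how
// much data is fetched and how much is recomputed locally. Names are illustrative.
type SyncMode uint32

const (
    FullSync  SyncMode = iota // fetch headers + bodies, re-execute every transaction
    FastSync                  // fetch headers, bodies and state, verify instead of re-executing
    LightSync                 // fetch headers only, pull bodies/state on demand
)

func (m SyncMode) String() string {
    switch m {
    case FullSync:
        return "full"
    case FastSync:
        return "fast"
    case LightSync:
        return "light"
    default:
        return "unknown"
    }
}

func main() {
    for _, m := range []SyncMode{FullSync, FastSync, LightSync} {
        fmt.Println(m)
    }
}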
Block Download
The block download flow works as follows:
Synchronisation starts in Synchronise. findAncestor locates the common ancestor shared with the selected peer, and syncing proceeds from that height. Several goroutines are then started to download the different kinds of data: headers, bodies and receipts. For any given height, say block 100, the header must be downloaded and validated before the corresponding body and receipts can be fetched. Each of these downloads is largely driven by fetchParts, which coordinates a number of channels and involves quite a few callback functions.
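The following is a minimal sketch of that pipeline, assuming nothing about the real downloader types: a header "fetcher" feeds a channel, and the body and receipt "fetchers" only work on heights whose header has already arrived. All names and the block range are illustrative.

package main

import (
    "fmt"
    "sync"
)

func main() {
    const origin = 100 // pretend findAncestor returned 99, so syncing starts at 100
    headers := make(chan uint64, 16)
    bodies := make(chan uint64, 16)

    var wg sync.WaitGroup
    wg.Add(3)

    // header fetcher: drives the whole pipeline
    go func() {
        defer wg.Done()
        defer close(headers)
        for n := uint64(origin); n < origin+5; n++ {
            fmt.Println("fetched header ", n)
            headers <- n
        }
    }()
    // body fetcher: only sees heights whose header already arrived
    go func() {
        defer wg.Done()
        defer close(bodies)
        for n := range headers {
            fmt.Println("fetched body   ", n)
            bodies <- n
        }
    }()
    // receipt fetcher: runs behind the body fetcher in the same way
    go func() {
        defer wg.Done()
        for n := range bodies {
            fmt.Println("fetched receipt", n)
        }
    }()
    wg.Wait()
}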
Source Code Analysis
Data Structures
The Downloader struct is defined as follows:
// filedir: go-ethereum-1.10.2\eth\downloader\downloader.go L96
type Downloader struct {
    // WARNING: The `rttEstimate` and `rttConfidence` fields are accessed atomically.
    // On 32 bit platforms, only 64-bit aligned fields can be atomic. The struct is
    // guaranteed to be so aligned, so take advantage of that. For more information,
    // see https://golang.org/pkg/sync/atomic/#pkg-note-BUG.
    rttEstimate   uint64 // Round trip time to target for download requests
    rttConfidence uint64 // Confidence in the estimated RTT (unit: millionths to allow atomic ops)

    mode uint32         // Synchronisation mode defining the strategy used (per sync cycle), use d.getMode() to get the SyncMode
    mux  *event.TypeMux // Event multiplexer to announce sync operation events

    checkpoint uint64   // Checkpoint block number to enforce head against (e.g. fast sync)
    genesis    uint64   // Genesis block number to limit sync to (e.g. light client CHT)
    queue      *queue   // Scheduler for selecting the hashes to download
    peers      *peerSet // Set of active peers from which download can proceed

    stateDB    ethdb.Database  // Database to state sync into (and deduplicate via)
    stateBloom *trie.SyncBloom // Bloom filter for fast trie node and contract code existence checks

    // Statistics
    syncStatsChainOrigin uint64 // Origin block number where syncing started at
    syncStatsChainHeight uint64 // Highest block number known when syncing started
    syncStatsState       stateSyncStats
    syncStatsLock        sync.RWMutex // Lock protecting the sync stats fields

    lightchain LightChain
    blockchain BlockChain

    // Callbacks
    dropPeer peerDropFn // Drops a peer for misbehaving

    // Status
    synchroniseMock func(id string, hash common.Hash) error // Replacement for synchronise during testing
    synchronising   int32
    notified        int32
    committed       int32
    ancientLimit    uint64 // The maximum block number which can be regarded as ancient data.

    // Channels
    headerCh      chan dataPack        // Channel receiving inbound block headers (headers downloaded from the network are delivered here)
    bodyCh        chan dataPack        // Channel receiving inbound block bodies (bodies downloaded from the network are delivered here)
    receiptCh     chan dataPack        // Channel receiving inbound receipts (receipts downloaded from the network are delivered here)
    bodyWakeCh    chan bool            // Channel to signal the block body fetcher of new tasks
    receiptWakeCh chan bool            // Channel to signal the receipt fetcher of new tasks
    headerProcCh  chan []*types.Header // Channel to feed the header processor new tasks

    // State sync
    pivotHeader *types.Header // Pivot block header to dynamically push the syncing state root
    pivotLock   sync.RWMutex  // Lock protecting pivot header reads from updates

    snapSync   bool         // Whether to run state sync over the snap protocol
    SnapSyncer *snap.Syncer // TODO(karalabe): make private! hack for now

    stateSyncStart chan *stateSync // Starts a new state fetcher
    trackStateReq  chan *stateReq
    stateCh        chan dataPack // Channel receiving inbound node state data (state downloaded from the network is delivered here)

    // Cancellation and termination
    cancelPeer string         // Identifier of the peer currently being used as the master (cancel on drop)
    cancelCh   chan struct{}  // Channel to cancel mid-flight syncs
    cancelLock sync.RWMutex   // Lock to protect the cancel channel and peer in delivers
    cancelWg   sync.WaitGroup // Make sure all fetcher goroutines have exited.

    quitCh   chan struct{} // Quit channel to signal termination
    quitLock sync.Mutex    // Lock to prevent double closes

    // Testing hooks
    syncInitHook     func(uint64, uint64)  // Method to call upon initiating a new sync run
    bodyFetchHook    func([]*types.Header) // Method to call upon starting a block body fetch
    receiptFetchHook func([]*types.Header) // Method to call upon starting a receipt fetch
    chainInsertHook  func([]*fetchResult)  // Method to call upon inserting a chain of blocks (possibly in multiple invocations)
}
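As the warning at the top of the struct notes, fields such as rttEstimate and mode are read and written atomically, which is also why the comment on mode points to d.getMode(). A tiny stand-alone sketch of that pattern, with illustrative names rather than the real geth types:

package main

import (
    "fmt"
    "sync/atomic"
)

// downloader holds the sync mode in a plain uint32 so it can be read and written
// atomically while fetcher goroutines are running. Names are illustrative.
type downloader struct {
    mode uint32
}

func (d *downloader) setMode(m uint32) { atomic.StoreUint32(&d.mode, m) }
func (d *downloader) getMode() uint32  { return atomic.LoadUint32(&d.mode) }

func main() {
    d := &downloader{}
    d.setMode(1) // e.g. fast sync for this cycle
    fmt.Println("mode:", d.getMode())
}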
Constructor
New initialises a Downloader object:
// New creates a new downloader to fetch hashes and blocks from remote peers.
func New(checkpoint uint64, stateDb ethdb.Database, stateBloom *trie.SyncBloom, mux *event.TypeMux, chain BlockChain, lightchain LightChain, dropPeer peerDropFn) *Downloader {
    if lightchain == nil {
        lightchain = chain
    }
    dl := &Downloader{
        stateDB:        stateDb,
        stateBloom:     stateBloom,
        mux:            mux,
        checkpoint:     checkpoint,
        queue:          newQueue(blockCacheMaxItems, blockCacheInitialItems),
        peers:          newPeerSet(),
        rttEstimate:    uint64(rttMaxEstimate),
        rttConfidence:  uint64(1000000),
        blockchain:     chain,
        lightchain:     lightchain,
        dropPeer:       dropPeer,
        headerCh:       make(chan dataPack, 1),
        bodyCh:         make(chan dataPack, 1),
        receiptCh:      make(chan dataPack, 1),
        bodyWakeCh:     make(chan bool, 1),
        receiptWakeCh:  make(chan bool, 1),
        headerProcCh:   make(chan []*types.Header, 1),
        quitCh:         make(chan struct{}),
        stateCh:        make(chan dataPack),
        SnapSyncer:     snap.NewSyncer(stateDb),
        stateSyncStart: make(chan *stateSync),
        syncStatsState: stateSyncStats{
            processed: rawdb.ReadFastTrieProgress(stateDb),
        },
        trackStateReq: make(chan *stateReq),
    }
    go dl.qosTuner()     // compute rttEstimate and rttConfidence
    go dl.stateFetcher() // start listening for state-fetcher tasks
    return dl
}
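A self-contained sketch of the constructor pattern used by New: default lightchain to the full chain when nil is passed (light clients reuse the same object) and start the long-running helper goroutines before returning. Everything below (chain, fullChain, newDownloader) is an illustrative stand-in, not the go-ethereum API.

package main

import "fmt"

type chain interface{ Name() string }

type fullChain struct{}

func (fullChain) Name() string { return "full" }

type downloader struct {
    blockchain chain
    lightchain chain
    quitCh     chan struct{}
}

func newDownloader(blockchain, lightchain chain) *downloader {
    if lightchain == nil {
        lightchain = blockchain // callers without a light chain reuse the full chain
    }
    d := &downloader{blockchain: blockchain, lightchain: lightchain, quitCh: make(chan struct{})}
    go d.stateFetcher() // background loop, mirrors the "go dl.stateFetcher()" call in New
    return d
}

// stateFetcher stands in for the real background loop; it simply waits for shutdown.
func (d *downloader) stateFetcher() { <-d.quitCh }

func main() {
    d := newDownloader(fullChain{}, nil)
    fmt.Println(d.lightchain.Name()) // "full": lightchain defaulted to the block chain
    close(d.quitCh)
}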
Starting a Sync
Block synchronisation starts in the Synchronise function, which simply delegates to synchronise; if the sync fails with one of a fixed set of errors, the offending peer is dropped:
// Synchronise tries to sync up our local block chain with a remote peer, both
// adding various sanity checks as well as wrapping it with various log entries.
func (d *Downloader) Synchronise(id string, head common.Hash, td *big.Int, mode SyncMode) error {
    err := d.synchronise(id, head, td, mode)

    switch err {
    case nil, errBusy, errCanceled:
        return err
    }
    if errors.Is(err, errInvalidChain) || errors.Is(err, errBadPeer) || errors.Is(err, errTimeout) ||
        errors.Is(err, errStallingPeer) || errors.Is(err, errUnsyncedPeer) || errors.Is(err, errEmptyHeaderSet) ||
        errors.Is(err, errPeersUnavailable) || errors.Is(err, errTooOld) || errors.Is(err, errInvalidAncestor) {
        log.Warn("Synchronisation failed, dropping peer", "peer", id, "err", err)
        if d.dropPeer == nil {
            // The dropPeer method is nil when `--copydb` is used for a local copy.
            // Timeouts can occur if e.g. compaction hits at the wrong time, and can be ignored
            log.Warn("Downloader wants to drop peer, but peerdrop-function is not set", "peer", id)
        } else {
            d.dropPeer(id)
        }
        return err
    }
    log.Warn("Synchronisation failed, retrying", "err", err)
    return err
}
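The interesting part of Synchronise is how errors are triaged: benign results are returned as-is, a fixed set of sentinel errors marks the peer as misbehaving and drops it, and anything else is simply retried. A stand-alone sketch of that pattern; the sentinel names mirror the ones used above, the rest is illustrative.

package main

import (
    "errors"
    "fmt"
)

var (
    errBusy         = errors.New("busy")
    errBadPeer      = errors.New("action from bad peer ignored")
    errStallingPeer = errors.New("peer is stalling")
)

// classify mimics the triage done in Synchronise: ignore, drop the peer, or retry.
func classify(err error) string {
    switch {
    case err == nil, errors.Is(err, errBusy):
        return "ignore"
    case errors.Is(err, errBadPeer), errors.Is(err, errStallingPeer):
        return "drop peer"
    default:
        return "retry"
    }
}

func main() {
    fmt.Println(classify(fmt.Errorf("%w: returned headers 0 != requested 2", errBadPeer))) // drop peer
    fmt.Println(classify(errors.New("temporary network hiccup")))                          // retry
}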
The synchronise function is implemented as follows:
// synchronise will select the peer and use it for synchronising. If an empty string is given
// it will use the best peer possible and synchronize if its TD is higher than our own. If any of the
// checks fail an error will be returned. This method is synchronous
func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode SyncMode) error {
    // Mock out the synchronisation if testing
    if d.synchroniseMock != nil {
        return d.synchroniseMock(id, hash)
    }
    // Make sure only one goroutine is ever allowed past this point at once
    if !atomic.CompareAndSwapInt32(&d.synchronising, 0, 1) {
        return errBusy
    }
    defer atomic.StoreInt32(&d.synchronising, 0)

    // Post a user notification of the sync (only once per session)
    if atomic.CompareAndSwapInt32(&d.notified, 0, 1) {
        log.Info("Block synchronisation started")
    }
    // If we are already full syncing, but have a fast-sync bloom filter laying
    // around, make sure it doesn't use memory any more. This is a special case
    // when the user attempts to fast sync a new empty network.
    if mode == FullSync && d.stateBloom != nil {
        d.stateBloom.Close()
    }
    // If snap sync was requested, create the snap scheduler and switch to fast
    // sync mode. Long term we could drop fast sync or merge the two together,
    // but until snap becomes prevalent, we should support both. TODO(karalabe).
    if mode == SnapSync {
        if !d.snapSync {
            log.Warn("Enabling snapshot sync prototype")
            d.snapSync = true
        }
        mode = FastSync
    }
    // Reset the queue, peer set and wake channels to clean any internal leftover state
    d.queue.Reset(blockCacheMaxItems, blockCacheInitialItems) // reset the queue state
    d.peers.Reset()                                           // reset the peer set state

    for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} { // drain d.bodyWakeCh and d.receiptWakeCh
        select {
        case <-ch:
        default:
        }
    }
    for _, ch := range []chan dataPack{d.headerCh, d.bodyCh, d.receiptCh} { // drain d.headerCh, d.bodyCh and d.receiptCh
        for empty := false; !empty; {
            select {
            case <-ch:
            default:
                empty = true
            }
        }
    }
    for empty := false; !empty; { // drain headerProcCh
        select {
        case <-d.headerProcCh:
        default:
            empty = true
        }
    }
    // Create cancel channel for aborting mid-flight and mark the master peer
    d.cancelLock.Lock()
    d.cancelCh = make(chan struct{})
    d.cancelPeer = id
    d.cancelLock.Unlock()

    defer d.Cancel() // No matter what, we can't leave the cancel channel open

    // Atomically set the requested sync mode
    atomic.StoreUint32(&d.mode, uint32(mode))

    // Retrieve the origin peer and initiate the downloading process
    p := d.peers.Peer(id)
    if p == nil {
        return errUnknownPeer
    }
    return d.syncWithPeer(p, hash, td) // start block synchronisation from the given peer and head hash
}
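Two small patterns inside synchronise are worth isolating: the CompareAndSwap guard that lets only one sync run at a time, and the non-blocking select used to drain wake channels left over from a previous cycle. A minimal sketch with illustrative names:

package main

import (
    "errors"
    "fmt"
    "sync/atomic"
)

var errBusy = errors.New("busy")

type downloader struct {
    synchronising int32
    bodyWakeCh    chan bool
    receiptWakeCh chan bool
}

func (d *downloader) synchronise() error {
    // Only one goroutine may pass this point at a time.
    if !atomic.CompareAndSwapInt32(&d.synchronising, 0, 1) {
        return errBusy
    }
    defer atomic.StoreInt32(&d.synchronising, 0)

    // Drain stale wake signals so the new run starts from a clean state.
    for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} {
        select {
        case <-ch:
        default:
        }
    }
    return nil
}

func main() {
    d := &downloader{bodyWakeCh: make(chan bool, 1), receiptWakeCh: make(chan bool, 1)}
    d.bodyWakeCh <- true         // leftover signal from an earlier cycle
    fmt.Println(d.synchronise()) // <nil>
    fmt.Println(d.synchronise()) // <nil> again: the flag was reset when the first call returned
}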
The syncWithPeer function is shown below:
// filedir:go-ethereum-1.10.2\\\\eth\\\\downloader\\\\downloader.go L448// syncWithPeer starts a block synchronization based on the hash chain from the// specified peer and head hash.func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td *big.Int) (err error) {d.mux.Post(StartEvent{})defer func() {// reset on errorif err != nil {d.mux.Post(FailedEvent{err})} else {latest := d.lightchain.CurrentHeader()d.mux.Post(DoneEvent{latest})}}()if p.version < 64 {return fmt.Errorf(\\\"%w: advertized %d < required %d\\\", errTooOld, p.version, 64)}mode := d.getMode()log.Debug(\\\"Synchronising with the network\\\", \\\"peer\\\", p.id, \\\"eth\\\", p.version, \\\"head\\\", hash, \\\"td\\\", td, \\\"mode\\\", mode)defer func(start time.Time) {log.Debug(\\\"Synchronisation terminated\\\", \\\"elapsed\\\", common.PrettyDuration(time.Since(start)))}(time.Now())// Look up the sync boundaries: the common ancestor and the target blocklatest, pivot, err := d.fetchHead(p)if err != nil {return err}if mode == FastSync && pivot == nil {// If no pivot block was returned, the head is below the min full block// threshold (i.e. new chian). In that case we won\\\'t really fast sync// anyway, but still need a valid pivot block to avoid some code hitting// nil panics on an access.pivot = d.blockchain.CurrentBlock().Header()}height := latest.Number.Uint64()origin, err := d.findAncestor(p, latest) // 通过findAncestor来获取共同祖先,以便找到一个开始同步的点if err != nil {return err}d.syncStatsLock.Lock()if d.syncStatsChainHeight <= origin || d.syncStatsChainOrigin > origin {d.syncStatsChainOrigin = origin}d.syncStatsChainHeight = heightd.syncStatsLock.Unlock()// Ensure our origin point is below any fast sync pivot pointif mode == FastSync {if height <= uint64(fsMinFullBlocks) { // 如果对端节点的height小于64,则共同祖先更新为0origin = 0} else { // 否则更新pivot为对端节点height-64pivotNumber := pivot.Number.Uint64()if pivotNumber <= origin { // 如果pivot小于共同祖先,则更新共同祖先为pivot的前一个origin = pivotNumber - 1}// Write out the pivot into the database so a rollback beyond it will// reenable fast syncrawdb.WriteLastPivotNumber(d.stateDB, pivotNumber)}}d.committed = 1if mode == FastSync && pivot.Number.Uint64() != 0 {d.committed = 0}if mode == FastSync {// Set the ancient data limitation.// If we are running fast sync, all block data older than ancientLimit will be// written to the ancient store. More recent data will be written to the active// database and will wait for the freezer to migrate.//// If there is a checkpoint available, then calculate the ancientLimit through// that. Otherwise calculate the ancient limit through the advertised height// of the remote peer.//// The reason for picking checkpoint first is that a malicious peer can give us// a fake (very high) height, forcing the ancient limit to also be very high.// The peer would start to feed us valid blocks until head, resulting in all of// the blocks might be written into the ancient store. 
A following mini-reorg// could cause issues.if d.checkpoint != 0 && d.checkpoint > fullMaxForkAncestry+1 {d.ancientLimit = d.checkpoint} else if height > fullMaxForkAncestry+1 {d.ancientLimit = height - fullMaxForkAncestry - 1} else {d.ancientLimit = 0}frozen, _ := d.stateDB.Ancients() // Ignore the error here since light client can also hit here.// If a part of blockchain data has already been written into active store,// disable the ancient style insertion explicitly.if origin >= frozen && frozen != 0 {d.ancientLimit = 0log.Info(\\\"Disabling direct-ancient mode\\\", \\\"origin\\\", origin, \\\"ancient\\\", frozen-1)} else if d.ancientLimit > 0 {log.Debug(\\\"Enabling direct-ancient mode\\\", \\\"ancient\\\", d.ancientLimit)}// Rewind the ancient store and blockchain if reorg happens.if origin+1 < frozen {if err := d.lightchain.SetHead(origin + 1); err != nil {return err}}}// Initiate the sync using a concurrent header and content retrieval algorithmd.queue.Prepare(origin+1, mode) // 更新queue的值从共同祖先+1开始,即从共同祖先开始sync区块if d.syncInitHook != nil {d.syncInitHook(origin, height)}fetchers := []func() error{func() error { return d.fetchHeaders(p, origin+1) }, // Headers are always retrievedfunc() error { return d.fetchBodies(origin + 1) }, // Bodies are retrieved during normal and fast syncfunc() error { return d.fetchReceipts(origin + 1) }, // Receipts are retrieved during fast syncfunc() error { return d.processHeaders(origin+1, td) },}if mode == FastSync { //根据模式的不同,增加新的处理逻辑d.pivotLock.Lock()d.pivotHeader = pivotd.pivotLock.Unlock()fetchers = append(fetchers, func() error { return d.processFastSyncContent() })} else if mode == FullSync {fetchers = append(fetchers, d.processFullSyncContent)}return d.spawnSync(fetchers)}
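One detail of syncWithPeer worth isolating is how the ancient-data limit is chosen during fast sync: a trusted checkpoint takes precedence over the peer-advertised height, so a lying peer cannot force an arbitrarily high limit. A simplified sketch of that decision; the constant value is illustrative (geth derives it from its immutability threshold):

package main

import "fmt"

const fullMaxForkAncestry = 90000 // illustrative value for the fork-ancestry window

// ancientLimit mirrors the checkpoint-first decision made in syncWithPeer.
func ancientLimit(checkpoint, height uint64) uint64 {
    switch {
    case checkpoint != 0 && checkpoint > fullMaxForkAncestry+1:
        return checkpoint // trusted checkpoint wins
    case height > fullMaxForkAncestry+1:
        return height - fullMaxForkAncestry - 1 // fall back to the advertised height
    default:
        return 0 // young chain: nothing counts as "ancient"
    }
}

func main() {
    fmt.Println(ancientLimit(0, 12_000_000))          // derived from the advertised height
    fmt.Println(ancientLimit(11_500_000, 12_000_000)) // checkpoint takes precedence
    fmt.Println(ancientLimit(0, 50_000))              // 0: chain too short
}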
spawnSync starts one goroutine per fetcher and then blocks, waiting for the first fetcher to fail:
// spawnSync runs d.process and all given fetcher functions to completion in
// separate goroutines, returning the first error that appears.
func (d *Downloader) spawnSync(fetchers []func() error) error {
    errc := make(chan error, len(fetchers))
    d.cancelWg.Add(len(fetchers))
    for _, fn := range fetchers {
        fn := fn
        go func() { defer d.cancelWg.Done(); errc <- fn() }()
    }
    // Wait for the first error, then terminate the others.
    var err error
    for i := 0; i < len(fetchers); i++ {
        if i == len(fetchers)-1 {
            // Close the queue when all fetchers have exited.
            // This will cause the block processor to end when
            // it has processed the queue.
            d.queue.Close()
        }
        if err = <-errc; err != nil && err != errCanceled {
            break
        }
    }
    d.queue.Close()
    d.Cancel()
    return err
}
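For reference, the same fan-out idea can be written as a stand-alone helper. This simplified version waits for every fetcher and reports the first error, whereas the real spawnSync additionally breaks out early and tears the run down via d.Cancel() and d.queue.Close():

package main

import (
    "errors"
    "fmt"
    "sync"
)

// spawnSync runs every fetcher in its own goroutine and returns the first error.
func spawnSync(fetchers []func() error) error {
    errc := make(chan error, len(fetchers))
    var wg sync.WaitGroup
    wg.Add(len(fetchers))
    for _, fn := range fetchers {
        fn := fn // capture the loop variable (pre-Go 1.22 idiom, as in the original)
        go func() { defer wg.Done(); errc <- fn() }()
    }
    var err error
    for i := 0; i < len(fetchers); i++ {
        if e := <-errc; e != nil && err == nil {
            err = e
        }
    }
    wg.Wait()
    return err
}

func main() {
    err := spawnSync([]func() error{
        func() error { return nil },
        func() error { return errors.New("body fetcher failed") },
    })
    fmt.Println(err)
}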
Synchronising State
The state is Ethereum's world state, which holds the balance and other data of every account. The stateFetcher loop manages the active state sync and accepts requests on its behalf:
// filedir: go-ethereum-1.10.2\eth\downloader\statesync.go
// stateFetcher manages the active state sync and accepts requests
// on its behalf.
func (d *Downloader) stateFetcher() {
    for {
        select {
        case s := <-d.stateSyncStart:
            for next := s; next != nil; {
                next = d.runStateSync(next)
            }
        case <-d.stateCh:
            // Ignore state responses while no sync is running.
        case <-d.quitCh:
            return
        }
    }
}
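The inner "for next != nil" loop is the part that is easy to miss: a running state sync can hand over directly to a successor, for example when the pivot moves. A minimal sketch of that hand-over loop with illustrative types:

package main

import "fmt"

type stateSync struct{ root string }

// runStateSync stands in for the real function: it either finishes (returns nil)
// or hands over to the next sync that was requested while it was running.
func runStateSync(s *stateSync, handover <-chan *stateSync) *stateSync {
    fmt.Println("syncing state for root", s.root)
    select {
    case next := <-handover: // a new root was requested mid-run
        return next
    default:
        return nil // finished normally
    }
}

func main() {
    start := make(chan *stateSync, 2)
    start <- &stateSync{root: "0xaaaa"}
    start <- &stateSync{root: "0xbbbb"} // queued while the first is still running

    s := <-start
    for next := s; next != nil; {
        next = runStateSync(next, start)
    }
}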
runStateSync runs a state synchronisation until it either completes or is asked to switch over to another root hash:
// runStateSync runs a state synchronisation until it completes or another root// hash is requested to be switched over to.func (d *Downloader) runStateSync(s *stateSync) *stateSync {var (active = make(map[string]*stateReq) // Currently in-flight requestsfinished []*stateReq // Completed or failed requeststimeout = make(chan *stateReq) // Timed out active requests)log.Trace(\\\"State sync starting\\\", \\\"root\\\", s.root)defer func() {// Cancel active request timers on exit. Also set peers to idle so they\\\'re// available for the next sync.for _, req := range active {req.timer.Stop()req.peer.SetNodeDataIdle(int(req.nItems), time.Now())}}()go s.run()defer s.Cancel()// Listen for peer departure events to cancel assigned taskspeerDrop := make(chan *peerConnection, 1024)peerSub := s.d.peers.SubscribePeerDrops(peerDrop)defer peerSub.Unsubscribe()for {// Enable sending of the first buffered element if there is one.var (deliverReq *stateReqdeliverReqCh chan *stateReq)if len(finished) > 0 {deliverReq = finished[0]deliverReqCh = s.deliver}select {// The stateSync lifecycle:case next := <-d.stateSyncStart:d.spindownStateSync(active, finished, timeout, peerDrop)return nextcase <-s.done:d.spindownStateSync(active, finished, timeout, peerDrop)return nil// Send the next finished request to the current sync:case deliverReqCh <- deliverReq:// Shift out the first request, but also set the emptied slot to nil for GCcopy(finished, finished[1:])finished[len(finished)-1] = nilfinished = finished[:len(finished)-1]// Handle incoming state packs:case pack := <-d.stateCh:// Discard any data not requested (or previously timed out)req := active[pack.PeerId()]if req == nil {log.Debug(\\\"Unrequested node data\\\", \\\"peer\\\", pack.PeerId(), \\\"len\\\", pack.Items())continue}// Finalize the request and queue up for processingreq.timer.Stop()req.response = pack.(*statePack).statesreq.delivered = time.Now()finished = append(finished, req)delete(active, pack.PeerId())// Handle dropped peer connections:case p := <-peerDrop:// Skip if no request is currently pendingreq := active[p.id]if req == nil {continue}// Finalize the request and queue up for processingreq.timer.Stop()req.dropped = truereq.delivered = time.Now()finished = append(finished, req)delete(active, p.id)// Handle timed-out requests:case req := <-timeout:// If the peer is already requesting something else, ignore the stale timeout.// This can happen when the timeout and the delivery happens simultaneously,// causing both pathways to trigger.if active[req.peer.id] != req {continue}req.delivered = time.Now()// Move the timed out data back into the download queuefinished = append(finished, req)delete(active, req.peer.id)// Track outgoing state requests:case req := <-d.trackStateReq:// If an active request already exists for this peer, we have a problem. In// theory the trie node schedule must never assign two requests to the same// peer. In practice however, a peer might receive a request, disconnect and// immediately reconnect before the previous times out. 
In this case the first// request is never honored, alas we must not silently overwrite it, as that// causes valid requests to go missing and sync to get stuck.if old := active[req.peer.id]; old != nil {log.Warn(\\\"Busy peer assigned new state fetch\\\", \\\"peer\\\", old.peer.id)// Move the previous request to the finished setold.timer.Stop()old.dropped = trueold.delivered = time.Now()finished = append(finished, old)}// Start a timer to notify the sync loop if the peer stalled.req.timer = time.AfterFunc(req.timeout, func() {timeout <- req})active[req.peer.id] = req}}}
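runStateSync relies on a classic Go trick: deliverReqCh stays nil while there is nothing finished to deliver, and a send on a nil channel blocks forever, so that select case is effectively disabled without any extra branching. A small stand-alone demonstration (the default case here stands in for the other select arms of the real loop):

package main

import "fmt"

func main() {
    finished := []string{} // pretend these are completed state requests

    for round := 0; round < 2; round++ {
        var (
            deliverReq   string
            deliverReqCh chan string // nil: the delivery case is disabled
        )
        results := make(chan string, 1)
        if len(finished) > 0 {
            deliverReq = finished[0]
            deliverReqCh = results // non-nil: the delivery case is enabled
        }
        select {
        case deliverReqCh <- deliverReq: // blocks forever when deliverReqCh is nil
            fmt.Println("delivered", <-results)
            finished = finished[1:]
        default:
            fmt.Println("nothing finished yet")
            finished = append(finished, "req-1") // a request completes in the meantime
        }
    }
}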
Fetching the Head
// fetchHead retrieves the head header and prior pivot block (if available) from
// a remote peer.
func (d *Downloader) fetchHead(p *peerConnection) (head *types.Header, pivot *types.Header, err error) {
    p.log.Debug("Retrieving remote chain head")
    mode := d.getMode()

    // Request the advertised remote head block and wait for the response
    latest, _ := p.peer.Head()
    fetch := 1
    if mode == FastSync {
        fetch = 2 // head + pivot headers
    }
    go p.peer.RequestHeadersByHash(latest, fetch, fsMinFullBlocks-1, true)

    ttl := d.requestTTL()
    timeout := time.After(ttl)
    for {
        select {
        case <-d.cancelCh:
            return nil, nil, errCanceled

        case packet := <-d.headerCh:
            // Discard anything not from the origin peer
            if packet.PeerId() != p.id {
                log.Debug("Received headers from incorrect peer", "peer", packet.PeerId())
                break
            }
            // Make sure the peer gave us at least one and at most the requested headers
            headers := packet.(*headerPack).headers
            if len(headers) == 0 || len(headers) > fetch {
                return nil, nil, fmt.Errorf("%w: returned headers %d != requested %d", errBadPeer, len(headers), fetch)
            }
            // The first header needs to be the head, validate against the checkpoint
            // and request. If only 1 header was returned, make sure there's no pivot
            // or there was not one requested.
            head := headers[0]
            if (mode == FastSync || mode == LightSync) && head.Number.Uint64() < d.checkpoint {
                return nil, nil, fmt.Errorf("%w: remote head %d below checkpoint %d", errUnsyncedPeer, head.Number, d.checkpoint)
            }
            if len(headers) == 1 {
                if mode == FastSync && head.Number.Uint64() > uint64(fsMinFullBlocks) {
                    return nil, nil, fmt.Errorf("%w: no pivot included along head header", errBadPeer)
                }
                p.log.Debug("Remote head identified, no pivot", "number", head.Number, "hash", head.Hash())
                return head, nil, nil
            }
            // At this point we have 2 headers in total and the first is the
            // validated head of the chain. Check the pivot number and return,
            pivot := headers[1]
            if pivot.Number.Uint64() != head.Number.Uint64()-uint64(fsMinFullBlocks) {
                return nil, nil, fmt.Errorf("%w: remote pivot %d != requested %d", errInvalidChain, pivot.Number, head.Number.Uint64()-uint64(fsMinFullBlocks))
            }
            return head, pivot, nil

        case <-timeout:
            p.log.Debug("Waiting for head header timed out", "elapsed", ttl)
            return nil, nil, errTimeout

        case <-d.bodyCh:
        case <-d.receiptCh:
            // Out of bounds delivery, ignore
        }
    }
}
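With fast sync, fetchHead asks for two headers in a single request: the advertised head and, via a skip of fsMinFullBlocks-1, the block exactly fsMinFullBlocks below it, which becomes the pivot candidate. A small sketch of that arithmetic; fsMinFullBlocks is 64 in the version quoted here:

package main

import "fmt"

const fsMinFullBlocks = 64

// expectedPivot returns the pivot height the downloader expects alongside the head,
// or false when the chain is too short for a pivot to be requested at all.
func expectedPivot(head uint64) (uint64, bool) {
    if head <= fsMinFullBlocks {
        return 0, false
    }
    return head - fsMinFullBlocks, true
}

func main() {
    if pivot, ok := expectedPivot(1_000_000); ok {
        fmt.Println("head 1000000 -> pivot", pivot) // 999936
    }
    if _, ok := expectedPivot(10); !ok {
        fmt.Println("head 10 -> no pivot header requested")
    }
}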
Processing Headers
// processHeaders takes batches of retrieved headers from an input channel and// keeps processing and scheduling them into the header chain and downloader\\\'s// queue until the stream ends or a failure occurs.func (d *Downloader) processHeaders(origin uint64, td *big.Int) error {// Keep a count of uncertain headers to roll backvar (rollback uint64 // Zero means no rollback (fine as you can\\\'t unroll the genesis)rollbackErr errormode = d.getMode())defer func() {if rollback > 0 {lastHeader, lastFastBlock, lastBlock := d.lightchain.CurrentHeader().Number, common.Big0, common.Big0if mode != LightSync {lastFastBlock = d.blockchain.CurrentFastBlock().Number()lastBlock = d.blockchain.CurrentBlock().Number()}if err := d.lightchain.SetHead(rollback - 1); err != nil { // -1 to target the parent of the first uncertain block// We\\\'re already unwinding the stack, only print the error to make it more visiblelog.Error(\\\"Failed to roll back chain segment\\\", \\\"head\\\", rollback-1, \\\"err\\\", err)}curFastBlock, curBlock := common.Big0, common.Big0if mode != LightSync {curFastBlock = d.blockchain.CurrentFastBlock().Number()curBlock = d.blockchain.CurrentBlock().Number()}log.Warn(\\\"Rolled back chain segment\\\",\\\"header\\\", fmt.Sprintf(\\\"%d->%d\\\", lastHeader, d.lightchain.CurrentHeader().Number),\\\"fast\\\", fmt.Sprintf(\\\"%d->%d\\\", lastFastBlock, curFastBlock),\\\"block\\\", fmt.Sprintf(\\\"%d->%d\\\", lastBlock, curBlock), \\\"reason\\\", rollbackErr)}}()// Wait for batches of headers to processgotHeaders := falsefor {select {case <-d.cancelCh:rollbackErr = errCanceledreturn errCanceledcase headers := <-d.headerProcCh:// Terminate header processing if we synced upif len(headers) == 0 {// Notify everyone that headers are fully processedfor _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} {select {case ch <- false:case <-d.cancelCh:}}// If no headers were retrieved at all, the peer violated its TD promise that it had a// better chain compared to ours. The only exception is if its promised blocks were// already imported by other means (e.g. fetcher)://// R <remote peer>, L <local node>: Both at block 10// R: Mine block 11, and propagate it to L// L: Queue block 11 for import// L: Notice that R\\\'s head and TD increased compared to ours, start sync// L: Import of block 11 finishes// L: Sync begins, and finds common ancestor at 11// L: Request new headers up from 11 (R\\\'s TD was higher, it must have something)// R: Nothing to giveif mode != LightSync {head := d.blockchain.CurrentBlock()if !gotHeaders && td.Cmp(d.blockchain.GetTd(head.Hash(), head.NumberU64())) > 0 {return errStallingPeer}}// If fast or light syncing, ensure promised headers are indeed delivered. This is// needed to detect scenarios where an attacker feeds a bad pivot and then bails out// of delivering the post-pivot blocks that would flag the invalid content.//// This check cannot be executed \\\"as is\\\" for full imports, since blocks may still be// queued for processing when the header download completes. 
However, as long as the// peer gave us something useful, we\\\'re already happy/progressed (above check).if mode == FastSync || mode == LightSync {head := d.lightchain.CurrentHeader()if td.Cmp(d.lightchain.GetTd(head.Hash(), head.Number.Uint64())) > 0 {return errStallingPeer}}// Disable any rollback and returnrollback = 0return nil}// Otherwise split the chunk of headers into batches and process themgotHeaders = truefor len(headers) > 0 {// Terminate if something failed in between processing chunksselect {case <-d.cancelCh:rollbackErr = errCanceledreturn errCanceleddefault:}// Select the next chunk of headers to importlimit := maxHeadersProcessif limit > len(headers) {limit = len(headers)}chunk := headers[:limit]// In case of header only syncing, validate the chunk immediatelyif mode == FastSync || mode == LightSync {// If we\\\'re importing pure headers, verify based on their recentnessvar pivot uint64d.pivotLock.RLock()if d.pivotHeader != nil {pivot = d.pivotHeader.Number.Uint64()}d.pivotLock.RUnlock()frequency := fsHeaderCheckFrequencyif chunk[len(chunk)-1].Number.Uint64()+uint64(fsHeaderForceVerify) > pivot {frequency = 1}if n, err := d.lightchain.InsertHeaderChain(chunk, frequency); err != nil {rollbackErr = err// If some headers were inserted, track them as uncertainif (mode == FastSync || frequency > 1) && n > 0 && rollback == 0 {rollback = chunk[0].Number.Uint64()}log.Warn(\\\"Invalid header encountered\\\", \\\"number\\\", chunk[n].Number, \\\"hash\\\", chunk[n].Hash(), \\\"parent\\\", chunk[n].ParentHash, \\\"err\\\", err)return fmt.Errorf(\\\"%w: %v\\\", errInvalidChain, err)}// All verifications passed, track all headers within the alloted limitsif mode == FastSync {head := chunk[len(chunk)-1].Number.Uint64()if head-rollback > uint64(fsHeaderSafetyNet) {rollback = head - uint64(fsHeaderSafetyNet)} else {rollback = 1}}}// Unless we\\\'re doing light chains, schedule the headers for associated content retrievalif mode == FullSync || mode == FastSync {// If we\\\'ve reached the allowed number of pending headers, stall a bitfor d.queue.PendingBlocks() >= maxQueuedHeaders || d.queue.PendingReceipts() >= maxQueuedHeaders {select {case <-d.cancelCh:rollbackErr = errCanceledreturn errCanceledcase <-time.After(time.Second):}}// Otherwise insert the headers for content retrievalinserts := d.queue.Schedule(chunk, origin)if len(inserts) != len(chunk) {rollbackErr = fmt.Errorf(\\\"stale headers: len inserts %v len(chunk) %v\\\", len(inserts), len(chunk))return fmt.Errorf(\\\"%w: stale headers\\\", errBadPeer)}}headers = headers[limit:]origin += uint64(limit)}// Update the highest block number we know if a higher one is found.d.syncStatsLock.Lock()if d.syncStatsChainHeight < origin {d.syncStatsChainHeight = origin - 1}d.syncStatsLock.Unlock()// Signal the content downloaders of the availablility of new tasksfor _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} {select {case ch <- true:default:}}}}}
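For fast and light sync, processHeaders only spot-checks headers, so it keeps a rolling window of "uncertain" headers that can be unwound if a later batch turns out to be invalid. A simplified sketch of how that rollback point is chosen; fsHeaderSafetyNet is 2048 in the version quoted here, and the real code also takes the previously tracked rollback value into account:

package main

import "fmt"

const fsHeaderSafetyNet = 2048

// rollbackPoint keeps roughly the last fsHeaderSafetyNet inserted headers uncertain.
func rollbackPoint(lastInserted uint64) uint64 {
    if lastInserted > fsHeaderSafetyNet {
        return lastInserted - fsHeaderSafetyNet
    }
    return 1 // never roll back past the genesis block
}

func main() {
    fmt.Println(rollbackPoint(10_000)) // 7952: anything above this may still be unwound
    fmt.Println(rollbackPoint(100))    // 1: the whole young chain stays uncertain
}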
Synchronising Bodies
// fetchBodies iteratively downloads the scheduled block bodies, taking any
// available peers, reserving a chunk of blocks for each, waiting for delivery
// and also periodically checking for timeouts.
func (d *Downloader) fetchBodies(from uint64) error {
    log.Debug("Downloading block bodies", "origin", from)

    var (
        deliver = func(packet dataPack) (int, error) {
            pack := packet.(*bodyPack)
            return d.queue.DeliverBodies(pack.peerID, pack.transactions, pack.uncles)
        }
        expire   = func() map[string]int { return d.queue.ExpireBodies(d.requestTTL()) }
        fetch    = func(p *peerConnection, req *fetchRequest) error { return p.FetchBodies(req) }
        capacity = func(p *peerConnection) int { return p.BlockCapacity(d.requestRTT()) }
        setIdle  = func(p *peerConnection, accepted int, deliveryTime time.Time) { p.SetBodiesIdle(accepted, deliveryTime) }
    )
    err := d.fetchParts(d.bodyCh, deliver, d.bodyWakeCh, expire,
        d.queue.PendingBlocks, d.queue.InFlightBlocks, d.queue.ReserveBodies,
        d.bodyFetchHook, fetch, d.queue.CancelBodies, capacity, d.peers.BodyIdlePeers, setIdle, "bodies")

    log.Debug("Block body download terminated", "err", err)
    return err
}
// DeliverBodies injects a new batch of block bodies received from a remote node.
func (d *Downloader) DeliverBodies(id string, transactions [][]*types.Transaction, uncles [][]*types.Header) error {
    return d.deliver(d.bodyCh, &bodyPack{id, transactions, uncles}, bodyInMeter, bodyDropMeter)
}
// deliver injects a new batch of data received from a remote node.
func (d *Downloader) deliver(destCh chan dataPack, packet dataPack, inMeter, dropMeter metrics.Meter) (err error) {
    // Update the delivery metrics for both good and failed deliveries
    inMeter.Mark(int64(packet.Items()))
    defer func() {
        if err != nil {
            dropMeter.Mark(int64(packet.Items()))
        }
    }()
    // Deliver or abort if the sync is canceled while queuing
    d.cancelLock.RLock()
    cancel := d.cancelCh
    d.cancelLock.RUnlock()
    if cancel == nil {
        return errNoSyncActive
    }
    select {
    case destCh <- packet:
        return nil
    case <-cancel:
        return errNoSyncActive
    }
}
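The essential behaviour of deliver is the cancel-aware hand-off: a response is pushed to the fetcher's channel unless the sync is torn down first, so the protocol handler that called DeliverBodies never blocks on a dead sync. A stand-alone sketch with illustrative types:

package main

import (
    "errors"
    "fmt"
)

var errNoSyncActive = errors.New("no sync active")

// deliver pushes a packet to the destination channel or gives up if the sync is cancelled.
func deliver(destCh chan<- string, packet string, cancel <-chan struct{}) error {
    if cancel == nil {
        return errNoSyncActive
    }
    select {
    case destCh <- packet:
        return nil
    case <-cancel:
        return errNoSyncActive
    }
}

func main() {
    dest := make(chan string, 1)
    cancel := make(chan struct{})
    fmt.Println(deliver(dest, "bodies#1", cancel)) // <nil>

    close(cancel)                                  // the sync is torn down, dest is still full
    fmt.Println(deliver(dest, "bodies#2", cancel)) // "no sync active": the cancel case wins
}

Both the body and the receipt download are then driven by fetchParts, shown next, which wires these per-type callbacks together.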
func (d *Downloader) fetchParts(deliveryCh chan dataPack, deliver func(dataPack) (int, error), wakeCh chan bool,expire func() map[string]int, pending func() int, inFlight func() bool, reserve func(*peerConnection, int) (*fetchRequest, bool, bool),fetchHook func([]*types.Header), fetch func(*peerConnection, *fetchRequest) error, cancel func(*fetchRequest), capacity func(*peerConnection) int,idle func() ([]*peerConnection, int), setIdle func(*peerConnection, int, time.Time), kind string) error {// Create a ticker to detect expired retrieval tasksticker := time.NewTicker(100 * time.Millisecond)defer ticker.Stop()update := make(chan struct{}, 1)// Prepare the queue and fetch block parts until the block header fetcher\\\'s donefinished := falsefor {select {case <-d.cancelCh:return errCanceledcase packet := <-deliveryCh:deliveryTime := time.Now()// If the peer was previously banned and failed to deliver its pack// in a reasonable time frame, ignore its message.if peer := d.peers.Peer(packet.PeerId()); peer != nil {// Deliver the received chunk of data and check chain validityaccepted, err := deliver(packet)if errors.Is(err, errInvalidChain) {return err}// Unless a peer delivered something completely else than requested (usually// caused by a timed out request which came through in the end), set it to// idle. If the delivery\\\'s stale, the peer should have already been idled.if !errors.Is(err, errStaleDelivery) {setIdle(peer, accepted, deliveryTime)}// Issue a log to the user to see what\\\'s going onswitch {case err == nil && packet.Items() == 0:peer.log.Trace(\\\"Requested data not delivered\\\", \\\"type\\\", kind)case err == nil:peer.log.Trace(\\\"Delivered new batch of data\\\", \\\"type\\\", kind, \\\"count\\\", packet.Stats())default:peer.log.Debug(\\\"Failed to deliver retrieved data\\\", \\\"type\\\", kind, \\\"err\\\", err)}}// Blocks assembled, try to update the progressselect {case update <- struct{}{}:default:}case cont := <-wakeCh:// The header fetcher sent a continuation flag, check if it\\\'s doneif !cont {finished = true}// Headers arrive, try to update the progressselect {case update <- struct{}{}:default:}case <-ticker.C:// Sanity check update the progressselect {case update <- struct{}{}:default:}case <-update:// Short circuit if we lost all our peersif d.peers.Len() == 0 {return errNoPeers}// Check for fetch request timeouts and demote the responsible peersfor pid, fails := range expire() {if peer := d.peers.Peer(pid); peer != nil {// If a lot of retrieval elements expired, we might have overestimated the remote peer or perhaps// ourselves. Only reset to minimal throughput but don\\\'t drop just yet. If even the minimal times// out that sync wise we need to get rid of the peer.//// The reason the minimum threshold is 2 is because the downloader tries to estimate the bandwidth// and latency of a peer separately, which requires pushing the measures capacity a bit and seeing// how response times reacts, to it always requests one more than the minimum (i.e. min 2).if fails > 2 {peer.log.Trace(\\\"Data delivery timed out\\\", \\\"type\\\", kind)setIdle(peer, 0, time.Now())} else {peer.log.Debug(\\\"Stalling delivery, dropping\\\", \\\"type\\\", kind)if d.dropPeer == nil {// The dropPeer method is nil when `--copydb` is used for a local copy.// Timeouts can occur if e.g. 
compaction hits at the wrong time, and can be ignoredpeer.log.Warn(\\\"Downloader wants to drop peer, but peerdrop-function is not set\\\", \\\"peer\\\", pid)} else {d.dropPeer(pid)// If this peer was the master peer, abort sync immediatelyd.cancelLock.RLock()master := pid == d.cancelPeerd.cancelLock.RUnlock()if master {d.cancel()return errTimeout}}}}}// If there\\\'s nothing more to fetch, wait or terminateif pending() == 0 {if !inFlight() && finished {log.Debug(\\\"Data fetching completed\\\", \\\"type\\\", kind)return nil}break}// Send a download request to all idle peers, until throttledprogressed, throttled, running := false, false, inFlight()idles, total := idle()pendCount := pending()for _, peer := range idles {// Short circuit if throttling activatedif throttled {break}// Short circuit if there is no more available task.if pendCount = pending(); pendCount == 0 {break}// Reserve a chunk of fetches for a peer. A nil can mean either that// no more headers are available, or that the peer is known not to// have them.request, progress, throttle := reserve(peer, capacity(peer))if progress {progressed = true}if throttle {throttled = truethrottleCounter.Inc(1)}if request == nil {continue}if request.From > 0 {peer.log.Trace(\\\"Requesting new batch of data\\\", \\\"type\\\", kind, \\\"from\\\", request.From)} else {peer.log.Trace(\\\"Requesting new batch of data\\\", \\\"type\\\", kind, \\\"count\\\", len(request.Headers), \\\"from\\\", request.Headers[0].Number)}// Fetch the chunk and make sure any errors return the hashes to the queueif fetchHook != nil {fetchHook(request.Headers)}if err := fetch(peer, request); err != nil {// Although we could try and make an attempt to fix this, this error really// means that we\\\'ve double allocated a fetch task to a peer. If that is the// case, the internal state of the downloader and the queue is very wrong so// better hard crash and note the error instead of silently accumulating into// a much bigger issue.panic(fmt.Sprintf(\\\"%v: %s fetch assignment failed\\\", peer, kind))}running = true}// Make sure that we have peers available for fetching. If all peers have been tried// and all failed throw an errorif !progressed && !throttled && !running && len(idles) == total && pendCount > 0 {return errPeersUnavailable}}}}
Synchronising Receipts
// fetchReceipts iteratively downloads the scheduled block receipts, taking any
// available peers, reserving a chunk of receipts for each, waiting for delivery
// and also periodically checking for timeouts.
func (d *Downloader) fetchReceipts(from uint64) error {
    log.Debug("Downloading transaction receipts", "origin", from)

    var (
        deliver = func(packet dataPack) (int, error) {
            pack := packet.(*receiptPack)
            return d.queue.DeliverReceipts(pack.peerID, pack.receipts)
        }
        expire   = func() map[string]int { return d.queue.ExpireReceipts(d.requestTTL()) }
        fetch    = func(p *peerConnection, req *fetchRequest) error { return p.FetchReceipts(req) }
        capacity = func(p *peerConnection) int { return p.ReceiptCapacity(d.requestRTT()) }
        setIdle  = func(p *peerConnection, accepted int, deliveryTime time.Time) {
            p.SetReceiptsIdle(accepted, deliveryTime)
        }
    )
    err := d.fetchParts(d.receiptCh, deliver, d.receiptWakeCh, expire,
        d.queue.PendingReceipts, d.queue.InFlightReceipts, d.queue.ReserveReceipts,
        d.receiptFetchHook, fetch, d.queue.CancelReceipts, capacity, d.peers.ReceiptIdlePeers, setIdle, "receipts")

    log.Debug("Transaction receipt download terminated", "err", err)
    return err
}
// DeliverReceipts injects a new batch of receipts received from a remote node.
func (d *Downloader) DeliverReceipts(id string, receipts [][]*types.Receipt) error {
    return d.deliver(d.receiptCh, &receiptPack{id, receipts}, receiptInMeter, receiptDropMeter)
}
The deliver helper used here is the same one shown in the block body section above.
Processing Content
Fetch results accumulated in the queue are finally imported into the chain: processFullSyncContent handles full sync, while processFastSyncContent splits the results around the pivot block for fast sync.
// processFullSyncContent takes fetch results from the queue and imports them into the chain.
func (d *Downloader) processFullSyncContent() error {
    for {
        results := d.queue.Results(true)
        if len(results) == 0 {
            return nil
        }
        if d.chainInsertHook != nil {
            d.chainInsertHook(results)
        }
        if err := d.importBlockResults(results); err != nil {
            return err
        }
    }
}
// processFastSyncContent takes fetch results from the queue and writes them to the// database. It also controls the synchronisation of state nodes of the pivot block.func (d *Downloader) processFastSyncContent() error {// Start syncing state of the reported head block. This should get us most of// the state of the pivot block.d.pivotLock.RLock()sync := d.syncState(d.pivotHeader.Root)d.pivotLock.RUnlock()defer func() {// The `sync` object is replaced every time the pivot moves. We need to// defer close the very last active one, hence the lazy evaluation vs.// calling defer sync.Cancel() !!!sync.Cancel()}()closeOnErr := func(s *stateSync) {if err := s.Wait(); err != nil && err != errCancelStateFetch && err != errCanceled && err != snap.ErrCancelled {d.queue.Close() // wake up Results}}go closeOnErr(sync)// To cater for moving pivot points, track the pivot block and subsequently// accumulated download results separately.var (oldPivot *fetchResult // Locked in pivot block, might change eventuallyoldTail []*fetchResult // Downloaded content after the pivot)for {// Wait for the next batch of downloaded data to be available, and if the pivot// block became stale, move the goalpostresults := d.queue.Results(oldPivot == nil) // Block if we\\\'re not monitoring pivot stalenessif len(results) == 0 {// If pivot sync is done, stopif oldPivot == nil {return sync.Cancel()}// If sync failed, stopselect {case <-d.cancelCh:sync.Cancel()return errCanceleddefault:}}if d.chainInsertHook != nil {d.chainInsertHook(results)}// If we haven\\\'t downloaded the pivot block yet, check pivot staleness// notifications from the header downloaderd.pivotLock.RLock()pivot := d.pivotHeaderd.pivotLock.RUnlock()if oldPivot == nil {if pivot.Root != sync.root {sync.Cancel()sync = d.syncState(pivot.Root)go closeOnErr(sync)}} else {results = append(append([]*fetchResult{oldPivot}, oldTail...), results...)}// Split around the pivot block and process the two sides via fast/full syncif atomic.LoadInt32(&d.committed) == 0 {latest := results[len(results)-1].Header// If the height is above the pivot block by 2 sets, it means the pivot// become stale in the network and it was garbage collected, move to a// new pivot.//// Note, we have `reorgProtHeaderDelay` number of blocks withheld, Those// need to be taken into account, otherwise we\\\'re detecting the pivot move// late and will drop peers due to unavailable state!!!if height := latest.Number.Uint64(); height >= pivot.Number.Uint64()+2*uint64(fsMinFullBlocks)-uint64(reorgProtHeaderDelay) {log.Warn(\\\"Pivot became stale, moving\\\", \\\"old\\\", pivot.Number.Uint64(), \\\"new\\\", height-uint64(fsMinFullBlocks)+uint64(reorgProtHeaderDelay))pivot = results[len(results)-1-fsMinFullBlocks+reorgProtHeaderDelay].Header // must exist as lower old pivot is uncommittedd.pivotLock.Lock()d.pivotHeader = pivotd.pivotLock.Unlock()// Write out the pivot into the database so a rollback beyond it will// reenable fast syncrawdb.WriteLastPivotNumber(d.stateDB, pivot.Number.Uint64())}}P, beforeP, afterP := splitAroundPivot(pivot.Number.Uint64(), results)if err := d.commitFastSyncData(beforeP, sync); err != nil {return err}if P != nil {// If new pivot block found, cancel old state retrieval and restartif oldPivot != P {sync.Cancel()sync = d.syncState(P.Header.Root)go closeOnErr(sync)oldPivot = P}// Wait for completion, occasionally checking for pivot stalenessselect {case <-sync.done:if sync.err != nil {return sync.err}if err := d.commitPivotBlock(P); err != nil {return err}oldPivot = nilcase 
<-time.After(time.Second):oldTail = afterPcontinue}}// Fast sync done, pivot commit done, full importif err := d.importBlockResults(afterP); err != nil {return err}}}
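The pivot handling above hinges on splitAroundPivot: completed results are divided into the blocks before the pivot (committed as fast-sync data), the pivot itself (whose state must be fully synced before it is committed), and the blocks after it (imported with full processing). A simplified stand-alone version of that split; fetchResult here is an illustrative stand-in for the real type:

package main

import "fmt"

type fetchResult struct{ number uint64 }

// splitAroundPivot partitions results into before-pivot, the pivot, and after-pivot.
func splitAroundPivot(pivot uint64, results []*fetchResult) (p *fetchResult, before, after []*fetchResult) {
    for _, r := range results {
        switch {
        case r.number < pivot:
            before = append(before, r)
        case r.number == pivot:
            p = r
        default:
            after = append(after, r)
        }
    }
    return p, before, after
}

func main() {
    var results []*fetchResult
    for n := uint64(98); n <= 102; n++ {
        results = append(results, &fetchResult{number: n})
    }
    p, before, after := splitAroundPivot(100, results)
    fmt.Println(p.number, len(before), len(after)) // 100 2 2
}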