From 7ee0cf99a7a5201bd0595b4fc21b83eb5dd1103c Mon Sep 17 00:00:00 2001 From: Bui Quang Minh Date: Wed, 20 Nov 2024 10:45:34 +0700 Subject: [PATCH] eth/fetcher: don't skip block/header when parent is not found Currently, we simply skip importing block/header when parent block/header is not found. However, since multiple blocks can be imported in parallel, the not found parent might be due to the fact that the parent import does not finish yet. This leads to a suitation that the correct block in canonical chain is skipped and the node gets stuck until the peer timeout. We observe this behavior when there are reorgs and block import is time consuming. This commit fixes it by creating a new queue for those missing parent blocks and re-import them after the parent is imported. --- eth/fetcher/block_fetcher.go | 153 ++++++++++++++++++++++++++--------- 1 file changed, 116 insertions(+), 37 deletions(-) diff --git a/eth/fetcher/block_fetcher.go b/eth/fetcher/block_fetcher.go index fd547ebce..20de26f8c 100644 --- a/eth/fetcher/block_fetcher.go +++ b/eth/fetcher/block_fetcher.go @@ -20,6 +20,7 @@ package fetcher import ( "errors" "math/rand" + "sync" "time" "github.com/ethereum/go-ethereum/common" @@ -32,10 +33,11 @@ import ( ) const ( - lightTimeout = time.Millisecond // Time allowance before an announced header is explicitly requested - arriveTimeout = 500 * time.Millisecond // Time allowance before an announced block/transaction is explicitly requested - gatherSlack = 100 * time.Millisecond // Interval used to collate almost-expired announces with fetches - fetchTimeout = 5 * time.Second // Maximum allotted time to return an explicitly requested block/transaction + lightTimeout = time.Millisecond // Time allowance before an announced header is explicitly requested + arriveTimeout = 500 * time.Millisecond // Time allowance before an announced block/transaction is explicitly requested + gatherSlack = 100 * time.Millisecond // Interval used to collate almost-expired announces with fetches + fetchTimeout = 5 * time.Second // Maximum allotted time to return an explicitly requested block/transaction + cleanMissingParentInterval = 30 * time.Second // Interval to clean missing parent mapping ) const ( @@ -183,6 +185,10 @@ type BlockFetcher struct { queues map[string]int // Per peer block counts to prevent memory exhaustion queued map[common.Hash]*blockOrHeaderInject // Set of already queued blocks (to dedup imports) + missingParentLock sync.Mutex // Protect missingParent mapping from concurrent use + missingParent map[common.Hash][]common.Hash // Mapping from parent hash to slice of block hashes of missing parent blocks + importMissingParent chan common.Hash + // Callbacks getHeader HeaderRetrievalFn // Retrieves a header from the local chain getBlock blockRetrievalFn // Retrieves a block from the local chain @@ -209,30 +215,32 @@ func NewBlockFetcher(light bool, getHeader HeaderRetrievalFn, getBlock blockRetr ) *BlockFetcher { return &BlockFetcher{ - light: light, - notify: make(chan *blockAnnounce), - inject: make(chan *blockOrHeaderInject), - headerFilter: make(chan chan *headerFilterTask), - bodyFilter: make(chan chan *bodyFilterTask), - done: make(chan common.Hash), - quit: make(chan struct{}), - announces: make(map[string]int), - announced: make(map[common.Hash][]*blockAnnounce), - fetching: make(map[common.Hash]*blockAnnounce), - fetched: make(map[common.Hash][]*blockAnnounce), - completing: make(map[common.Hash]*blockAnnounce), - queue: prque.New(nil), - queues: make(map[string]int), - queued: make(map[common.Hash]*blockOrHeaderInject), - getHeader: getHeader, - getBlock: getBlock, - verifyHeader: verifyHeader, - verifyBlobHeader: verifyBlobHeader, - broadcastBlock: broadcastBlock, - chainHeight: chainHeight, - insertHeaders: insertHeaders, - insertChain: insertChain, - dropPeer: dropPeer, + light: light, + notify: make(chan *blockAnnounce), + inject: make(chan *blockOrHeaderInject), + headerFilter: make(chan chan *headerFilterTask), + bodyFilter: make(chan chan *bodyFilterTask), + done: make(chan common.Hash), + quit: make(chan struct{}), + announces: make(map[string]int), + announced: make(map[common.Hash][]*blockAnnounce), + fetching: make(map[common.Hash]*blockAnnounce), + fetched: make(map[common.Hash][]*blockAnnounce), + completing: make(map[common.Hash]*blockAnnounce), + queue: prque.New(nil), + queues: make(map[string]int), + queued: make(map[common.Hash]*blockOrHeaderInject), + missingParent: make(map[common.Hash][]common.Hash), + importMissingParent: make(chan common.Hash, blockLimit), + getHeader: getHeader, + getBlock: getBlock, + verifyHeader: verifyHeader, + verifyBlobHeader: verifyBlobHeader, + broadcastBlock: broadcastBlock, + chainHeight: chainHeight, + insertHeaders: insertHeaders, + insertChain: insertChain, + dropPeer: dropPeer, } } @@ -344,13 +352,15 @@ func (f *BlockFetcher) FilterBodies(peer string, transactions [][]*types.Transac func (f *BlockFetcher) loop() { // Iterate the block fetching until a quit is requested var ( - fetchTimer = time.NewTimer(0) - completeTimer = time.NewTimer(0) + fetchTimer = time.NewTimer(0) + completeTimer = time.NewTimer(0) + cleanMissingParentTicker = time.NewTicker(cleanMissingParentInterval) ) <-fetchTimer.C // clear out the channel <-completeTimer.C defer fetchTimer.Stop() defer completeTimer.Stop() + defer cleanMissingParentTicker.Stop() for { // Clean up any expired block fetches @@ -378,7 +388,9 @@ func (f *BlockFetcher) loop() { } // Otherwise if fresh and still unknown, try and import if (number+maxUncleDist < height) || (f.light && f.getHeader(hash) != nil) || (!f.light && f.getBlock(hash) != nil) { + f.missingParentLock.Lock() f.forgetBlock(hash) + f.missingParentLock.Unlock() continue } if f.light { @@ -442,7 +454,9 @@ func (f *BlockFetcher) loop() { case hash := <-f.done: // A pending import finished, remove all traces of the notification f.forgetHash(hash) + f.missingParentLock.Lock() f.forgetBlock(hash) + f.missingParentLock.Unlock() case <-fetchTimer.C: // At least one block's timer ran out, check for needing retrieval @@ -684,6 +698,28 @@ func (f *BlockFetcher) loop() { f.enqueue(announce.origin, nil, block, sidecars) } } + + case hash := <-f.importMissingParent: + if op := f.queued[hash]; op != nil { + if f.light { + f.importHeaders(op.origin, op.header) + } else { + f.importBlocks(op.origin, op.block, op.sidecars) + } + } + case <-cleanMissingParentTicker.C: + height := f.chainHeight() + f.missingParentLock.Lock() + for _, blocks := range f.missingParent { + for _, block := range blocks { + if op := f.queued[block]; op != nil { + if op.number()+maxUncleDist < height { + f.forgetBlock(block) + } + } + } + } + f.missingParentLock.Unlock() } } } @@ -780,13 +816,16 @@ func (f *BlockFetcher) importHeaders(peer string, header *types.Header) { log.Debug("Importing propagated header", "peer", peer, "number", header.Number, "hash", hash) go func() { - defer func() { f.done <- hash }() - // If the parent's unknown, abort insertion + // If the parent's unknown, queue for later processing when parent block is imported parent := f.getHeader(header.ParentHash) if parent == nil { log.Debug("Unknown parent of propagated header", "peer", peer, "number", header.Number, "hash", hash, "parent", header.ParentHash) + f.missingParentLock.Lock() + f.missingParent[header.ParentHash] = append(f.missingParent[header.ParentHash], hash) + f.missingParentLock.Unlock() return } + defer func() { f.done <- hash }() // Validate the header and if something went wrong, drop the peer if err := f.verifyHeader(header); err != nil && err != consensus.ErrFutureBlock { log.Debug("Propagated header verification failed", "peer", peer, "number", header.Number, "hash", hash, "err", err) @@ -798,6 +837,14 @@ func (f *BlockFetcher) importHeaders(peer string, header *types.Header) { log.Debug("Propagated header import failed", "peer", peer, "number", header.Number, "hash", hash, "err", err) return } + f.missingParentLock.Lock() + nextBlockHashes, ok := f.missingParent[hash] + f.missingParentLock.Unlock() + if ok { + for _, nextBlockHash := range nextBlockHashes { + f.importMissingParent <- nextBlockHash + } + } // Invoke the testing hook if needed if f.importedHook != nil { f.importedHook(header, nil) @@ -814,14 +861,17 @@ func (f *BlockFetcher) importBlocks(peer string, block *types.Block, sidecars [] // Run the import on a new thread log.Debug("Importing propagated block", "peer", peer, "number", block.Number(), "hash", hash) go func() { - defer func() { f.done <- hash }() - - // If the parent's unknown, abort insertion - parent := f.getBlock(block.ParentHash()) + // If the parent's unknown, queue for later processing when parent block is imported + parentHash := block.ParentHash() + parent := f.getBlock(parentHash) if parent == nil { - log.Debug("Unknown parent of propagated block", "peer", peer, "number", block.Number(), "hash", hash, "parent", block.ParentHash()) + log.Debug("Unknown parent of propagated block", "peer", peer, "number", block.Number(), "hash", hash, "parent", parentHash) + f.missingParentLock.Lock() + f.missingParent[parentHash] = append(f.missingParent[parentHash], hash) + f.missingParentLock.Unlock() return } + defer func() { f.done <- hash }() // Quickly validate the header and propagate the block if it passes err := f.verifyHeader(block.Header()) if err == nil { @@ -853,6 +903,14 @@ func (f *BlockFetcher) importBlocks(peer string, block *types.Block, sidecars [] blockAnnounceOutTimer.UpdateSince(block.ReceivedAt) go f.broadcastBlock(block, nil, false) + f.missingParentLock.Lock() + nextBlockHashes, ok := f.missingParent[hash] + f.missingParentLock.Unlock() + if ok { + for _, nextBlockHash := range nextBlockHashes { + f.importMissingParent <- nextBlockHash + } + } // Invoke the testing hook if needed if f.importedHook != nil { f.importedHook(nil, block) @@ -906,6 +964,7 @@ func (f *BlockFetcher) forgetHash(hash common.Hash) { // forgetBlock removes all traces of a queued block from the fetcher's internal // state. +// The caller must hold the missingParentLock. func (f *BlockFetcher) forgetBlock(hash common.Hash) { if insert := f.queued[hash]; insert != nil { f.queues[insert.origin]-- @@ -913,5 +972,25 @@ func (f *BlockFetcher) forgetBlock(hash common.Hash) { delete(f.queues, insert.origin) } delete(f.queued, hash) + var parentHash common.Hash + if f.light { + parentHash = insert.header.ParentHash + } else { + parentHash = insert.block.ParentHash() + } + blocks := f.missingParent[parentHash] + for i, block := range blocks { + if block == hash { + // Swap with the last element then decrease the length + blocks[i] = blocks[len(blocks)-1] + blocks = blocks[:len(blocks)-1] + break + } + } + if len(blocks) == 0 { + delete(f.missingParent, parentHash) + } else { + f.missingParent[parentHash] = blocks + } } }