Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

p2p: add flag to config bucket size of DHT #643

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
425 changes: 425 additions & 0 deletions cmd/p2psim/main.go

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions cmd/ronin/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ var (
utils.DisableRoninProtocol,
utils.AdditionalChainEventFlag,
utils.DBEngineFlag,
utils.DHTBucketSizeFlag,
}

rpcFlags = []cli.Flag{
Expand Down
7 changes: 7 additions & 0 deletions cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -871,6 +871,12 @@ var (
Usage: "Sets DNS discovery entry points (use \"\" to disable DNS)",
Category: flags.NetworkingCategory,
}
DHTBucketSizeFlag = &cli.IntFlag{
Name: "dht.bucketsize",
Usage: "Size of each DHT bucket",
Value: 16,
Category: flags.NetworkingCategory,
}

// ATM the url is left to the user and deployment to
JSpathFlag = &flags.DirectoryFlag{
Expand Down Expand Up @@ -1542,6 +1548,7 @@ func SetP2PConfig(ctx *cli.Context, cfg *p2p.Config) {
cfg.NoDiscovery = true
cfg.DiscoveryV5 = false
}
cfg.DHTBucketSize = ctx.Int(DHTBucketSizeFlag.Name)
}

// SetNodeConfig applies node-related command line flags to the config.
Expand Down
3 changes: 3 additions & 0 deletions eth/ethconfig/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,9 @@ type Config struct {

// Send additional chain event
EnableAdditionalChainEvent bool

// Size of each bucket in DHT
DHTBucketSize int
}

// CreateConsensusEngine creates a consensus engine for the given chain configuration.
Expand Down
1 change: 1 addition & 0 deletions p2p/discover/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ type Config struct {
ValidSchemes enr.IdentityScheme // allowed identity schemes
Clock mclock.Clock
FilterFunction NodeFilterFunc // function for filtering ENR entries
DHTBucketSize int // size of each bucket in DHT
}

func (cfg Config) withDefaults() Config {
Expand Down
6 changes: 3 additions & 3 deletions p2p/discover/lookup.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func (it *lookup) advance() bool {
for _, n := range nodes {
if n != nil && !it.seen[n.ID()] {
it.seen[n.ID()] = true
it.result.push(n, bucketSize)
it.result.push(n, defaultBucketSize)
it.replyBuffer = append(it.replyBuffer, n)
}
}
Expand Down Expand Up @@ -104,7 +104,7 @@ func (it *lookup) startQueries() bool {

// The first query returns nodes from the local table.
if it.queries == -1 {
closest := it.tab.findnodeByID(it.result.target, bucketSize, false)
closest := it.tab.findnodeByID(it.result.target, defaultBucketSize, false)
// Avoid finishing the lookup too quickly if table is empty. It'd be better to wait
// for the table to fill in this case, but there is no good mechanism for that
// yet.
Expand Down Expand Up @@ -151,7 +151,7 @@ func (it *lookup) query(n *node, reply chan<- []*node) {
// Remove the node from the local table if it fails to return anything useful too
// many times, but only if there are enough other nodes in the bucket.
dropped := false
if fails >= maxFindnodeFailures && it.tab.bucketLen(n.ID()) >= bucketSize/2 {
if fails >= maxFindnodeFailures && it.tab.bucketLen(n.ID()) >= defaultBucketSize/2 {
dropped = true
it.tab.delete(n)
}
Expand Down
32 changes: 19 additions & 13 deletions p2p/discover/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ import (

const (
alpha = 3 // Kademlia concurrency factor
bucketSize = 16 // Kademlia bucket size
defaultBucketSize = 16 // Kademlia bucket size
maxReplacements = 10 // Size of per-bucket replacement list
maxWorkerTask = 90 // Maximum number of worker tasks
timeoutWorkerTaskClose = 1 * time.Second // Timeout for waiting workerPoolTask is refill full
Expand All @@ -67,11 +67,12 @@ const (
// itself up-to-date by verifying the liveness of neighbors and requesting their node
// records when announcements of a new record version are received.
type Table struct {
mutex sync.Mutex // protects buckets, bucket content, nursery, rand
buckets [nBuckets]*bucket // index of known nodes by distance
nursery []*node // bootstrap nodes
rand *mrand.Rand // source of randomness, periodically reseeded
ips netutil.DistinctNetSet
mutex sync.Mutex // protects buckets, bucket content, nursery, rand
buckets [nBuckets]*bucket // index of known nodes by distance
nursery []*node // bootstrap nodes
rand *mrand.Rand // source of randomness, periodically reseeded
ips netutil.DistinctNetSet
bucketSize int

log log.Logger
db *enode.DB // database of known nodes
Expand Down Expand Up @@ -105,7 +106,7 @@ type bucket struct {
index int
}

func newTable(t transport, db *enode.DB, bootnodes []*enode.Node, log log.Logger, filter NodeFilterFunc) (*Table, error) {
func newTable(t transport, db *enode.DB, bootnodes []*enode.Node, log log.Logger, filter NodeFilterFunc, bucketSize int) (*Table, error) {
tab := &Table{
net: t,
db: db,
Expand All @@ -118,6 +119,7 @@ func newTable(t transport, db *enode.DB, bootnodes []*enode.Node, log log.Logger
ips: netutil.DistinctNetSet{Subnet: tableSubnet, Limit: tableIPLimit},
log: log,
enrFilter: filter,
bucketSize: bucketSize,
}
if err := tab.setFallbackNodes(bootnodes); err != nil {
return nil, err
Expand All @@ -131,15 +133,19 @@ func newTable(t transport, db *enode.DB, bootnodes []*enode.Node, log log.Logger
for i := 0; i < maxWorkerTask; i++ {
tab.workerPoolTask <- struct{}{}
}
// Set default bucket size if bucketSize is not set
if tab.bucketSize <= 0 {
tab.bucketSize = defaultBucketSize
}

tab.seedRand()
tab.loadSeedNodes()

return tab, nil
}

func newMeteredTable(t transport, db *enode.DB, bootnodes []*enode.Node, log log.Logger, filter NodeFilterFunc) (*Table, error) {
tab, err := newTable(t, db, bootnodes, log, filter)
func newMeteredTable(t transport, db *enode.DB, bootnodes []*enode.Node, log log.Logger, filter NodeFilterFunc, bucketSize int) (*Table, error) {
tab, err := newTable(t, db, bootnodes, log, filter, bucketSize)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -557,7 +563,7 @@ func (tab *Table) addSeenNodeSync(n *node) {
// Already in bucket, don't add.
return
}
if len(b.entries) >= bucketSize {
if len(b.entries) >= tab.bucketSize {
// Bucket full, maybe add as replacement.
tab.addReplacement(b, n)
return
Expand All @@ -581,7 +587,7 @@ func (tab *Table) filterNode(n *node) bool {
}
if node, err := tab.net.RequestENR(unwrapNode(n)); err != nil {
tab.log.Debug("ENR request failed", "id", n.ID(), "addr", n.addr(), "err", err)
return false
return true
} else if !tab.enrFilter(node.Record()) {
tab.log.Trace("ENR record filter out", "id", n.ID(), "addr", n.addr())
return true
Expand Down Expand Up @@ -636,7 +642,7 @@ func (tab *Table) addVerifiedNodeSync(n *node) {
// Already in bucket, moved to front.
return
}
if len(b.entries) >= bucketSize {
if len(b.entries) >= tab.bucketSize {
// Bucket full, maybe add as replacement.
tab.addReplacement(b, n)
return
Expand All @@ -646,7 +652,7 @@ func (tab *Table) addVerifiedNodeSync(n *node) {
return
}
// Add to front of bucket.
b.entries, _ = pushNode(b.entries, n, bucketSize)
b.entries, _ = pushNode(b.entries, n, tab.bucketSize)
b.replacements = deleteNode(b.replacements, n)
n.addedAt = time.Now()
if tab.nodeAddedHook != nil {
Expand Down
6 changes: 3 additions & 3 deletions p2p/discover/table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func testPingReplace(t *testing.T, newNodeIsResponding, lastInBucketIsResponding

tab.mutex.Lock()
defer tab.mutex.Unlock()
wantSize := bucketSize
wantSize := defaultBucketSize
if !lastInBucketIsResponding && !newNodeIsResponding {
wantSize--
}
Expand All @@ -102,7 +102,7 @@ func TestBucket_bumpNoDuplicates(t *testing.T) {
Rand: rand.New(rand.NewSource(time.Now().Unix())),
Values: func(args []reflect.Value, rand *rand.Rand) {
// generate a random list of nodes. this will be the content of the bucket.
n := rand.Intn(bucketSize-1) + 1
n := rand.Intn(defaultBucketSize-1) + 1
nodes := make([]*node, n)
for i := range nodes {
nodes[i] = nodeAtDistance(enode.ID{}, 200, intIP(200))
Expand Down Expand Up @@ -296,7 +296,7 @@ func (*closeTest) Generate(rand *rand.Rand, size int) reflect.Value {
t := &closeTest{
Self: gen(enode.ID{}, rand).(enode.ID),
Target: gen(enode.ID{}, rand).(enode.ID),
N: rand.Intn(bucketSize),
N: rand.Intn(defaultBucketSize),
}
for _, id := range gen([]enode.ID{}, rand).([]enode.ID) {
r := new(enr.Record)
Expand Down
6 changes: 3 additions & 3 deletions p2p/discover/table_util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func init() {

func newTestTable(t transport) (*Table, *enode.DB) {
db, _ := enode.OpenDB("")
tab, _ := newTable(t, db, nil, log.Root(), nil)
tab, _ := newTable(t, db, nil, log.Root(), nil, defaultBucketSize)
go tab.loop()
return tab, db
}
Expand Down Expand Up @@ -100,10 +100,10 @@ func intIP(i int) net.IP {
func fillBucket(tab *Table, n *node) (last *node) {
ld := enode.LogDist(tab.self().ID(), n.ID())
b := tab.bucket(n.ID())
for len(b.entries) < bucketSize {
for len(b.entries) < tab.bucketSize {
b.entries = append(b.entries, nodeAtDistance(tab.self().ID(), ld, intIP(ld)))
}
return b.entries[bucketSize-1]
return b.entries[tab.bucketSize-1]
}

// fillTable adds nodes the table to the end of their corresponding bucket
Expand Down
4 changes: 2 additions & 2 deletions p2p/discover/v4_lookup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@ func TestUDPv4_Lookup(t *testing.T) {
for _, e := range results {
t.Logf(" ld=%d, %x", enode.LogDist(lookupTestnet.target.id(), e.ID()), e.ID().Bytes())
}
if len(results) != bucketSize {
t.Errorf("wrong number of results: got %d, want %d", len(results), bucketSize)
if len(results) != defaultBucketSize {
t.Errorf("wrong number of results: got %d, want %d", len(results), defaultBucketSize)
}
checkLookupResults(t, lookupTestnet, results)
}
Expand Down
22 changes: 18 additions & 4 deletions p2p/discover/v4_udp.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ func ListenV4(c UDPConn, ln *enode.LocalNode, cfg Config) (*UDPv4, error) {
log: cfg.Log,
}

tab, err := newMeteredTable(t, ln.Database(), cfg.Bootnodes, t.log, cfg.FilterFunction)
tab, err := newMeteredTable(t, ln.Database(), cfg.Bootnodes, t.log, cfg.FilterFunction, cfg.DHTBucketSize)
if err != nil {
return nil, err
}
Expand All @@ -155,6 +155,20 @@ func ListenV4(c UDPConn, ln *enode.LocalNode, cfg Config) (*UDPv4, error) {
return t, nil
}

func (t *UDPv4) NodesInDHT() [][]enode.Node {
if t == nil || t.tab == nil {
return nil
}
nodes := make([][]enode.Node, len(t.tab.buckets))
for i, bucket := range t.tab.buckets {
nodes[i] = make([]enode.Node, len(bucket.entries))
for j, entry := range bucket.entries {
nodes[i][j] = entry.Node
}
}
return nodes
}

// Self returns the local node.
func (t *UDPv4) Self() *enode.Node {
return t.localNode.Node()
Expand Down Expand Up @@ -303,7 +317,7 @@ func (t *UDPv4) findnode(toid enode.ID, toaddr *net.UDPAddr, target v4wire.Pubke

// Add a matcher for 'neighbours' replies to the pending reply queue. The matcher is
// active until enough nodes have been received.
nodes := make([]*node, 0, bucketSize)
nodes := make([]*node, 0, defaultBucketSize)
nreceived := 0
rm := t.pending(toid, toaddr.IP, v4wire.NeighborsPacket, func(r v4wire.Packet) (matched bool, requestDone bool) {
reply := r.(*v4wire.Neighbors)
Expand All @@ -316,7 +330,7 @@ func (t *UDPv4) findnode(toid enode.ID, toaddr *net.UDPAddr, target v4wire.Pubke
}
nodes = append(nodes, n)
}
return true, nreceived >= bucketSize
return true, nreceived >= defaultBucketSize
})
t.send(toaddr, toid, &v4wire.Findnode{
Target: target,
Expand Down Expand Up @@ -721,7 +735,7 @@ func (t *UDPv4) handleFindnode(h *packetHandlerV4, from *net.UDPAddr, fromID eno

// Determine closest nodes.
target := enode.ID(crypto.Keccak256Hash(req.Target[:]))
closest := t.tab.findnodeByID(target, bucketSize, true).entries
closest := t.tab.findnodeByID(target, defaultBucketSize, true).entries

// Send neighbors in chunks with at most maxNeighbors per packet
// to stay below the packet size limit.
Expand Down
6 changes: 3 additions & 3 deletions p2p/discover/v4_udp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ func TestUDPv4_findnode(t *testing.T) {
// take care not to overflow any bucket.
nodes := &nodesByDistance{target: testTarget.ID()}
live := make(map[enode.ID]bool)
numCandidates := 2 * bucketSize
numCandidates := 2 * defaultBucketSize
for i := 0; i < numCandidates; i++ {
key := newkey()
ip := net.IP{10, 13, 0, byte(i)}
Expand All @@ -278,12 +278,12 @@ func TestUDPv4_findnode(t *testing.T) {
test.table.db.UpdateLastPongReceived(remoteID, test.remoteaddr.IP, time.Now())

// check that closest neighbors are returned.
expected := test.table.findnodeByID(testTarget.ID(), bucketSize, true)
expected := test.table.findnodeByID(testTarget.ID(), defaultBucketSize, true)
test.packetIn(nil, &v4wire.Findnode{Target: testTarget, Expiration: futureExp})
waitNeighbors := func(want []*node) {
test.waitPacketOut(func(p *v4wire.Neighbors, to *net.UDPAddr, hash []byte) {
if len(p.Nodes) != len(want) {
t.Errorf("wrong number of results: got %d, want %d", len(p.Nodes), bucketSize)
t.Errorf("wrong number of results: got %d, want %d", len(p.Nodes), defaultBucketSize)
}
for i, n := range p.Nodes {
if n.ID.ID() != want[i].ID() {
Expand Down
2 changes: 1 addition & 1 deletion p2p/discover/v5_udp.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ func newUDPv5(conn UDPConn, ln *enode.LocalNode, cfg Config) (*UDPv5, error) {
closeCtx: closeCtx,
cancelCloseCtx: cancelCloseCtx,
}
tab, err := newMeteredTable(t, t.db, cfg.Bootnodes, cfg.Log, cfg.FilterFunction)
tab, err := newMeteredTable(t, t.db, cfg.Bootnodes, cfg.Log, cfg.FilterFunction, cfg.DHTBucketSize)
if err != nil {
return nil, err
}
Expand Down
13 changes: 13 additions & 0 deletions p2p/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,9 @@ type Config struct {
Logger log.Logger `toml:",omitempty"`

clock mclock.Clock

// Size of each bucket in DHT
DHTBucketSize int
}

// Server manages all peer connections.
Expand Down Expand Up @@ -305,6 +308,15 @@ func (c *conn) set(f connFlag, val bool) {
}
}

// SetListenFunc sets the function used to accept inbound connections (testing only)
func (srv *Server) SetListenFunc(f func(network, addr string) (net.Listener, error)) {
srv.listenFunc = f
}

func (srv *Server) UDPv4() *discover.UDPv4 {
return srv.ntab
}

// LocalNode returns the local node record.
func (srv *Server) LocalNode() *enode.LocalNode {
return srv.localnode
Expand Down Expand Up @@ -630,6 +642,7 @@ func (srv *Server) setupDiscovery() error {
Unhandled: unhandled,
Log: srv.log,
FilterFunction: f,
DHTBucketSize: srv.Config.DHTBucketSize,
}
ntab, err := discover.ListenV4(conn, srv.localnode, cfg)
if err != nil {
Expand Down
15 changes: 15 additions & 0 deletions p2p/simulations/adapters/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,21 @@ func (n *ExecNode) Snapshots() (map[string][]byte, error) {
return snapshots, n.client.Call(&snapshots, "simulation_snapshot")
}

// Empty PeerStats
func (n *ExecNode) PeerStats() *PeerStats {
return &PeerStats{}
}

// Empty DHT
func (n *ExecNode) NodesInDHT() [][]enode.Node {
return nil
}

// Empty PeersInfo
func (n *ExecNode) PeersInfo() []*p2p.PeerInfo {
return nil
}

// execNodeConfig is used to serialize the node configuration so it can be
// passed to the child process as a JSON encoded environment variable
type execNodeConfig struct {
Expand Down
Loading
Loading