From 84a44c60053ff20fa25d6f7d916da218bceca3d6 Mon Sep 17 00:00:00 2001 From: sonhv0212 Date: Wed, 11 Dec 2024 09:39:20 +0700 Subject: [PATCH 1/2] p2p: add flag to config bucket size of DHT --- cmd/ronin/main.go | 1 + cmd/utils/flags.go | 7 +++++++ eth/ethconfig/config.go | 3 +++ p2p/discover/common.go | 1 + p2p/discover/lookup.go | 6 +++--- p2p/discover/table.go | 30 ++++++++++++++++++------------ p2p/discover/table_test.go | 6 +++--- p2p/discover/table_util_test.go | 6 +++--- p2p/discover/v4_lookup_test.go | 4 ++-- p2p/discover/v4_udp.go | 8 ++++---- p2p/discover/v4_udp_test.go | 6 +++--- p2p/discover/v5_udp.go | 2 +- p2p/server.go | 4 ++++ 13 files changed, 53 insertions(+), 31 deletions(-) diff --git a/cmd/ronin/main.go b/cmd/ronin/main.go index 5144bc14a9..f10154b970 100644 --- a/cmd/ronin/main.go +++ b/cmd/ronin/main.go @@ -180,6 +180,7 @@ var ( utils.DisableRoninProtocol, utils.AdditionalChainEventFlag, utils.DBEngineFlag, + utils.DHTBucketSizeFlag, } rpcFlags = []cli.Flag{ diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index 1a5377eeb0..949e081bab 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -871,6 +871,12 @@ var ( Usage: "Sets DNS discovery entry points (use \"\" to disable DNS)", Category: flags.NetworkingCategory, } + DHTBucketSizeFlag = &cli.IntFlag{ + Name: "dht.bucketsize", + Usage: "Size of each DHT bucket", + Value: 16, + Category: flags.NetworkingCategory, + } // ATM the url is left to the user and deployment to JSpathFlag = &flags.DirectoryFlag{ @@ -1542,6 +1548,7 @@ func SetP2PConfig(ctx *cli.Context, cfg *p2p.Config) { cfg.NoDiscovery = true cfg.DiscoveryV5 = false } + cfg.DHTBucketSize = ctx.Int(DHTBucketSizeFlag.Name) } // SetNodeConfig applies node-related command line flags to the config. diff --git a/eth/ethconfig/config.go b/eth/ethconfig/config.go index 5e64069127..f629fe1a83 100644 --- a/eth/ethconfig/config.go +++ b/eth/ethconfig/config.go @@ -228,6 +228,9 @@ type Config struct { // Send additional chain event EnableAdditionalChainEvent bool + + // Size of each bucket in DHT + DHTBucketSize int } // CreateConsensusEngine creates a consensus engine for the given chain configuration. diff --git a/p2p/discover/common.go b/p2p/discover/common.go index 0872b1fa2a..7008b19e2a 100644 --- a/p2p/discover/common.go +++ b/p2p/discover/common.go @@ -78,6 +78,7 @@ type Config struct { ValidSchemes enr.IdentityScheme // allowed identity schemes Clock mclock.Clock FilterFunction NodeFilterFunc // function for filtering ENR entries + DHTBucketSize int // size of each bucket in DHT } func (cfg Config) withDefaults() Config { diff --git a/p2p/discover/lookup.go b/p2p/discover/lookup.go index 9ab4a71ce7..7ab1240b9a 100644 --- a/p2p/discover/lookup.go +++ b/p2p/discover/lookup.go @@ -73,7 +73,7 @@ func (it *lookup) advance() bool { for _, n := range nodes { if n != nil && !it.seen[n.ID()] { it.seen[n.ID()] = true - it.result.push(n, bucketSize) + it.result.push(n, defaultBucketSize) it.replyBuffer = append(it.replyBuffer, n) } } @@ -104,7 +104,7 @@ func (it *lookup) startQueries() bool { // The first query returns nodes from the local table. if it.queries == -1 { - closest := it.tab.findnodeByID(it.result.target, bucketSize, false) + closest := it.tab.findnodeByID(it.result.target, defaultBucketSize, false) // Avoid finishing the lookup too quickly if table is empty. It'd be better to wait // for the table to fill in this case, but there is no good mechanism for that // yet. @@ -151,7 +151,7 @@ func (it *lookup) query(n *node, reply chan<- []*node) { // Remove the node from the local table if it fails to return anything useful too // many times, but only if there are enough other nodes in the bucket. dropped := false - if fails >= maxFindnodeFailures && it.tab.bucketLen(n.ID()) >= bucketSize/2 { + if fails >= maxFindnodeFailures && it.tab.bucketLen(n.ID()) >= defaultBucketSize/2 { dropped = true it.tab.delete(n) } diff --git a/p2p/discover/table.go b/p2p/discover/table.go index 9e16f18d27..88fa36ef69 100644 --- a/p2p/discover/table.go +++ b/p2p/discover/table.go @@ -41,7 +41,7 @@ import ( const ( alpha = 3 // Kademlia concurrency factor - bucketSize = 16 // Kademlia bucket size + defaultBucketSize = 16 // Kademlia bucket size maxReplacements = 10 // Size of per-bucket replacement list maxWorkerTask = 90 // Maximum number of worker tasks timeoutWorkerTaskClose = 1 * time.Second // Timeout for waiting workerPoolTask is refill full @@ -67,11 +67,12 @@ const ( // itself up-to-date by verifying the liveness of neighbors and requesting their node // records when announcements of a new record version are received. type Table struct { - mutex sync.Mutex // protects buckets, bucket content, nursery, rand - buckets [nBuckets]*bucket // index of known nodes by distance - nursery []*node // bootstrap nodes - rand *mrand.Rand // source of randomness, periodically reseeded - ips netutil.DistinctNetSet + mutex sync.Mutex // protects buckets, bucket content, nursery, rand + buckets [nBuckets]*bucket // index of known nodes by distance + nursery []*node // bootstrap nodes + rand *mrand.Rand // source of randomness, periodically reseeded + ips netutil.DistinctNetSet + bucketSize int log log.Logger db *enode.DB // database of known nodes @@ -105,7 +106,7 @@ type bucket struct { index int } -func newTable(t transport, db *enode.DB, bootnodes []*enode.Node, log log.Logger, filter NodeFilterFunc) (*Table, error) { +func newTable(t transport, db *enode.DB, bootnodes []*enode.Node, log log.Logger, filter NodeFilterFunc, bucketSize int) (*Table, error) { tab := &Table{ net: t, db: db, @@ -118,6 +119,7 @@ func newTable(t transport, db *enode.DB, bootnodes []*enode.Node, log log.Logger ips: netutil.DistinctNetSet{Subnet: tableSubnet, Limit: tableIPLimit}, log: log, enrFilter: filter, + bucketSize: bucketSize, } if err := tab.setFallbackNodes(bootnodes); err != nil { return nil, err @@ -131,6 +133,10 @@ func newTable(t transport, db *enode.DB, bootnodes []*enode.Node, log log.Logger for i := 0; i < maxWorkerTask; i++ { tab.workerPoolTask <- struct{}{} } + // Set default bucket size if bucketSize is not set + if tab.bucketSize <= 0 { + tab.bucketSize = defaultBucketSize + } tab.seedRand() tab.loadSeedNodes() @@ -138,8 +144,8 @@ func newTable(t transport, db *enode.DB, bootnodes []*enode.Node, log log.Logger return tab, nil } -func newMeteredTable(t transport, db *enode.DB, bootnodes []*enode.Node, log log.Logger, filter NodeFilterFunc) (*Table, error) { - tab, err := newTable(t, db, bootnodes, log, filter) +func newMeteredTable(t transport, db *enode.DB, bootnodes []*enode.Node, log log.Logger, filter NodeFilterFunc, bucketSize int) (*Table, error) { + tab, err := newTable(t, db, bootnodes, log, filter, bucketSize) if err != nil { return nil, err } @@ -557,7 +563,7 @@ func (tab *Table) addSeenNodeSync(n *node) { // Already in bucket, don't add. return } - if len(b.entries) >= bucketSize { + if len(b.entries) >= tab.bucketSize { // Bucket full, maybe add as replacement. tab.addReplacement(b, n) return @@ -636,7 +642,7 @@ func (tab *Table) addVerifiedNodeSync(n *node) { // Already in bucket, moved to front. return } - if len(b.entries) >= bucketSize { + if len(b.entries) >= tab.bucketSize { // Bucket full, maybe add as replacement. tab.addReplacement(b, n) return @@ -646,7 +652,7 @@ func (tab *Table) addVerifiedNodeSync(n *node) { return } // Add to front of bucket. - b.entries, _ = pushNode(b.entries, n, bucketSize) + b.entries, _ = pushNode(b.entries, n, tab.bucketSize) b.replacements = deleteNode(b.replacements, n) n.addedAt = time.Now() if tab.nodeAddedHook != nil { diff --git a/p2p/discover/table_test.go b/p2p/discover/table_test.go index d6e9653775..42779cd728 100644 --- a/p2p/discover/table_test.go +++ b/p2p/discover/table_test.go @@ -79,7 +79,7 @@ func testPingReplace(t *testing.T, newNodeIsResponding, lastInBucketIsResponding tab.mutex.Lock() defer tab.mutex.Unlock() - wantSize := bucketSize + wantSize := defaultBucketSize if !lastInBucketIsResponding && !newNodeIsResponding { wantSize-- } @@ -102,7 +102,7 @@ func TestBucket_bumpNoDuplicates(t *testing.T) { Rand: rand.New(rand.NewSource(time.Now().Unix())), Values: func(args []reflect.Value, rand *rand.Rand) { // generate a random list of nodes. this will be the content of the bucket. - n := rand.Intn(bucketSize-1) + 1 + n := rand.Intn(defaultBucketSize-1) + 1 nodes := make([]*node, n) for i := range nodes { nodes[i] = nodeAtDistance(enode.ID{}, 200, intIP(200)) @@ -296,7 +296,7 @@ func (*closeTest) Generate(rand *rand.Rand, size int) reflect.Value { t := &closeTest{ Self: gen(enode.ID{}, rand).(enode.ID), Target: gen(enode.ID{}, rand).(enode.ID), - N: rand.Intn(bucketSize), + N: rand.Intn(defaultBucketSize), } for _, id := range gen([]enode.ID{}, rand).([]enode.ID) { r := new(enr.Record) diff --git a/p2p/discover/table_util_test.go b/p2p/discover/table_util_test.go index 5da68e72e1..618558a075 100644 --- a/p2p/discover/table_util_test.go +++ b/p2p/discover/table_util_test.go @@ -43,7 +43,7 @@ func init() { func newTestTable(t transport) (*Table, *enode.DB) { db, _ := enode.OpenDB("") - tab, _ := newTable(t, db, nil, log.Root(), nil) + tab, _ := newTable(t, db, nil, log.Root(), nil, defaultBucketSize) go tab.loop() return tab, db } @@ -100,10 +100,10 @@ func intIP(i int) net.IP { func fillBucket(tab *Table, n *node) (last *node) { ld := enode.LogDist(tab.self().ID(), n.ID()) b := tab.bucket(n.ID()) - for len(b.entries) < bucketSize { + for len(b.entries) < tab.bucketSize { b.entries = append(b.entries, nodeAtDistance(tab.self().ID(), ld, intIP(ld))) } - return b.entries[bucketSize-1] + return b.entries[tab.bucketSize-1] } // fillTable adds nodes the table to the end of their corresponding bucket diff --git a/p2p/discover/v4_lookup_test.go b/p2p/discover/v4_lookup_test.go index a00de9ca18..2653a10889 100644 --- a/p2p/discover/v4_lookup_test.go +++ b/p2p/discover/v4_lookup_test.go @@ -58,8 +58,8 @@ func TestUDPv4_Lookup(t *testing.T) { for _, e := range results { t.Logf(" ld=%d, %x", enode.LogDist(lookupTestnet.target.id(), e.ID()), e.ID().Bytes()) } - if len(results) != bucketSize { - t.Errorf("wrong number of results: got %d, want %d", len(results), bucketSize) + if len(results) != defaultBucketSize { + t.Errorf("wrong number of results: got %d, want %d", len(results), defaultBucketSize) } checkLookupResults(t, lookupTestnet, results) } diff --git a/p2p/discover/v4_udp.go b/p2p/discover/v4_udp.go index 267a0ffd60..bf1e9080a4 100644 --- a/p2p/discover/v4_udp.go +++ b/p2p/discover/v4_udp.go @@ -142,7 +142,7 @@ func ListenV4(c UDPConn, ln *enode.LocalNode, cfg Config) (*UDPv4, error) { log: cfg.Log, } - tab, err := newMeteredTable(t, ln.Database(), cfg.Bootnodes, t.log, cfg.FilterFunction) + tab, err := newMeteredTable(t, ln.Database(), cfg.Bootnodes, t.log, cfg.FilterFunction, cfg.DHTBucketSize) if err != nil { return nil, err } @@ -303,7 +303,7 @@ func (t *UDPv4) findnode(toid enode.ID, toaddr *net.UDPAddr, target v4wire.Pubke // Add a matcher for 'neighbours' replies to the pending reply queue. The matcher is // active until enough nodes have been received. - nodes := make([]*node, 0, bucketSize) + nodes := make([]*node, 0, defaultBucketSize) nreceived := 0 rm := t.pending(toid, toaddr.IP, v4wire.NeighborsPacket, func(r v4wire.Packet) (matched bool, requestDone bool) { reply := r.(*v4wire.Neighbors) @@ -316,7 +316,7 @@ func (t *UDPv4) findnode(toid enode.ID, toaddr *net.UDPAddr, target v4wire.Pubke } nodes = append(nodes, n) } - return true, nreceived >= bucketSize + return true, nreceived >= defaultBucketSize }) t.send(toaddr, toid, &v4wire.Findnode{ Target: target, @@ -721,7 +721,7 @@ func (t *UDPv4) handleFindnode(h *packetHandlerV4, from *net.UDPAddr, fromID eno // Determine closest nodes. target := enode.ID(crypto.Keccak256Hash(req.Target[:])) - closest := t.tab.findnodeByID(target, bucketSize, true).entries + closest := t.tab.findnodeByID(target, defaultBucketSize, true).entries // Send neighbors in chunks with at most maxNeighbors per packet // to stay below the packet size limit. diff --git a/p2p/discover/v4_udp_test.go b/p2p/discover/v4_udp_test.go index 6a51fc563f..ec3edf1057 100644 --- a/p2p/discover/v4_udp_test.go +++ b/p2p/discover/v4_udp_test.go @@ -258,7 +258,7 @@ func TestUDPv4_findnode(t *testing.T) { // take care not to overflow any bucket. nodes := &nodesByDistance{target: testTarget.ID()} live := make(map[enode.ID]bool) - numCandidates := 2 * bucketSize + numCandidates := 2 * defaultBucketSize for i := 0; i < numCandidates; i++ { key := newkey() ip := net.IP{10, 13, 0, byte(i)} @@ -278,12 +278,12 @@ func TestUDPv4_findnode(t *testing.T) { test.table.db.UpdateLastPongReceived(remoteID, test.remoteaddr.IP, time.Now()) // check that closest neighbors are returned. - expected := test.table.findnodeByID(testTarget.ID(), bucketSize, true) + expected := test.table.findnodeByID(testTarget.ID(), defaultBucketSize, true) test.packetIn(nil, &v4wire.Findnode{Target: testTarget, Expiration: futureExp}) waitNeighbors := func(want []*node) { test.waitPacketOut(func(p *v4wire.Neighbors, to *net.UDPAddr, hash []byte) { if len(p.Nodes) != len(want) { - t.Errorf("wrong number of results: got %d, want %d", len(p.Nodes), bucketSize) + t.Errorf("wrong number of results: got %d, want %d", len(p.Nodes), defaultBucketSize) } for i, n := range p.Nodes { if n.ID.ID() != want[i].ID() { diff --git a/p2p/discover/v5_udp.go b/p2p/discover/v5_udp.go index 4d88fb6147..8bf642f46d 100644 --- a/p2p/discover/v5_udp.go +++ b/p2p/discover/v5_udp.go @@ -164,7 +164,7 @@ func newUDPv5(conn UDPConn, ln *enode.LocalNode, cfg Config) (*UDPv5, error) { closeCtx: closeCtx, cancelCloseCtx: cancelCloseCtx, } - tab, err := newMeteredTable(t, t.db, cfg.Bootnodes, cfg.Log, cfg.FilterFunction) + tab, err := newMeteredTable(t, t.db, cfg.Bootnodes, cfg.Log, cfg.FilterFunction, cfg.DHTBucketSize) if err != nil { return nil, err } diff --git a/p2p/server.go b/p2p/server.go index 4b437dc3b7..2577a32218 100644 --- a/p2p/server.go +++ b/p2p/server.go @@ -168,6 +168,9 @@ type Config struct { Logger log.Logger `toml:",omitempty"` clock mclock.Clock + + // Size of each bucket in DHT + DHTBucketSize int } // Server manages all peer connections. @@ -630,6 +633,7 @@ func (srv *Server) setupDiscovery() error { Unhandled: unhandled, Log: srv.log, FilterFunction: f, + DHTBucketSize: srv.Config.DHTBucketSize, } ntab, err := discover.ListenV4(conn, srv.localnode, cfg) if err != nil { From d6e1f1cc9c43a992c1e6f9be657d1a5e7702c039 Mon Sep 17 00:00:00 2001 From: sonhv0212 Date: Thu, 12 Dec 2024 00:18:12 +0700 Subject: [PATCH 2/2] p2psim: Add network simulation for DHT --- cmd/p2psim/main.go | 425 +++++++++++++++++++++++++++++ p2p/discover/table.go | 2 +- p2p/discover/v4_udp.go | 14 + p2p/server.go | 9 + p2p/simulations/adapters/exec.go | 15 + p2p/simulations/adapters/inproc.go | 208 +++++++++++++- p2p/simulations/adapters/types.go | 101 +++++-- p2p/simulations/dht/dht.go | 122 +++++++++ p2p/simulations/dht/dht.sh | 43 +++ p2p/simulations/http.go | 73 +++++ p2p/simulations/http_test.go | 2 + p2p/simulations/network.go | 14 + p2p/simulations/network_test.go | 2 + 13 files changed, 1002 insertions(+), 28 deletions(-) create mode 100644 p2p/simulations/dht/dht.go create mode 100755 p2p/simulations/dht/dht.sh diff --git a/cmd/p2psim/main.go b/cmd/p2psim/main.go index 451b0d942d..c1383ebf70 100644 --- a/cmd/p2psim/main.go +++ b/cmd/p2psim/main.go @@ -40,10 +40,15 @@ import ( "encoding/json" "fmt" "io" + "math/rand" "os" + "os/signal" "strings" + "syscall" "text/tabwriter" + "time" + "github.com/ethereum/go-ethereum/cmd/utils" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/p2p/enode" @@ -102,6 +107,32 @@ func main() { Usage: "load a network snapshot from stdin", Action: loadSnapshot, }, + { + Name: "network", + Usage: "manage the simulation network", + Subcommands: []*cli.Command{ + { + Name: "start", + Usage: "start all nodes in the network", + Action: startNetwork, + }, + { + Name: "peer-stats", + Usage: "show peer stats", + Action: getNetworkPeerStats, + }, + { + Name: "dht", + Usage: "Get all nodes in the DHT of all nodes", + Action: getAllDHT, + }, + { + Name: "peers", + Usage: "Get all peers of all nodes", + Action: getAllNodePeersInfo, + }, + }, + }, { Name: "node", Usage: "manage simulation nodes", @@ -132,6 +163,98 @@ func main() { Value: "", Usage: "node private key (hex encoded)", }, + &cli.BoolFlag{ + Name: "sim.dialer", + Usage: "Use the simulation dialer", + }, + &cli.BoolFlag{ + Name: "fake.iplistener", + Usage: "Use the fake listener to random remote ip when accepting connections", + }, + &cli.BoolFlag{ + Name: "start", + Usage: "start the node after creating successfully", + }, + &cli.BoolFlag{ + Name: "autofill.bootnodes", + Usage: "autofill bootnodes with existing bootnodes from manager", + }, + &cli.StringFlag{ + Name: "node.type", + Value: "default", + Usage: "Set node type (default, outbound, dirty, bootnode)", + }, + &cli.BoolFlag{ + Name: "enable.enrfilter", + Usage: "Enable ENR filter when adding nodes to the DHT", + }, + utils.NoDiscoverFlag, + utils.DHTBucketSizeFlag, + utils.BootnodesFlag, + utils.MaxPeersFlag, + }, + }, + { + Name: "create-multi", + Usage: "create a node", + Action: createMultiNode, + Flags: []cli.Flag{ + &cli.StringFlag{ + Name: "name", + Value: "", + Usage: "node name", + }, + &cli.IntFlag{ + Name: "count", + Value: 1, + Usage: "number of nodes to create", + }, + &cli.StringFlag{ + Name: "services", + Value: "", + Usage: "node services (comma separated)", + }, + &cli.BoolFlag{ + Name: "sim.dialer", + Usage: "Use the simulation dialer", + }, + &cli.BoolFlag{ + Name: "fake.iplistener", + Usage: "Use the fake listener to random remote ip when accepting connections", + }, + &cli.BoolFlag{ + Name: "start", + Usage: "start the node after creating successfully", + }, + &cli.BoolFlag{ + Name: "autofill.bootnodes", + Usage: "autofill bootnodes with existing bootnodes from manager", + }, + &cli.StringFlag{ + Name: "node.type", + Value: "default", + Usage: "Set node type (default, outbound, dirty, bootnode)", + }, + &cli.BoolFlag{ + Name: "enable.enrfilter", + Usage: "Enable ENR filter when adding nodes to the DHT", + }, + &cli.DurationFlag{ + Name: "interval", + Usage: "create interval", + }, + &cli.IntFlag{ + Name: "dirty.rate", + Usage: "Rate of dirty nodes", + }, + &cli.IntFlag{ + Name: "only.outbound.rate", + Usage: "Rate of nodes that only allow outbound connections", + }, + utils.NoDiscoverFlag, + utils.DHTBucketSizeFlag, + utils.BootnodesFlag, + utils.MaxPeersFlag, }, }, { @@ -176,6 +299,29 @@ func main() { }, }, }, + { + Name: "peer-stats", + Usage: "show peer stats", + ArgsUsage: "", + Action: getNodePeerStats, + }, + }, + }, + { + Name: "log-stats", + Usage: "log peer stats to a CSV file", + Action: startLogStats, + Flags: []cli.Flag{ + &cli.StringFlag{ + Name: "file", + Usage: "output file", + Value: "stats.csv", + }, + &cli.DurationFlag{ + Name: "interval", + Usage: "log interval", + Value: 15 * time.Second, + }, }, }, } @@ -287,6 +433,31 @@ func createNode(ctx *cli.Context) error { config.ID = enode.PubkeyToIDV4(&privKey.PublicKey) config.PrivateKey = privKey } + if ctx.Bool(utils.NoDiscoverFlag.Name) { + config.NoDiscovery = true + } + if ctx.Bool("sim.dialer") { + config.UseTCPDialer = false + } else { + config.UseTCPDialer = true + } + if ctx.Bool("fake.iplistener") { + config.UseFakeIPListener = true + } + config.BootstrapNodeURLs = ctx.String(utils.BootnodesFlag.Name) + if ctx.Bool("autofill.bootnodes") { + bootnodeURLs, err := getBootnodes() + if err != nil { + return err + } + if bootnodeURLs != "" { + config.BootstrapNodeURLs += "," + bootnodeURLs + } + } + config.MaxPeers = ctx.Int(utils.MaxPeersFlag.Name) + config.DHTBucketSize = ctx.Int(utils.DHTBucketSizeFlag.Name) + config.DisableTCPListener = ctx.Bool("only.outbound") + config.EnableENRFilter = ctx.Bool("enable.enrfilter") if services := ctx.String("services"); services != "" { config.Lifecycles = strings.Split(services, ",") } @@ -295,6 +466,112 @@ func createNode(ctx *cli.Context) error { return err } fmt.Fprintln(ctx.App.Writer, "Created", node.Name) + + // Start node if needed + if ctx.Bool("start") { + if err := client.StartNode(node.Name); err != nil { + return err + } + fmt.Fprintln(ctx.App.Writer, "Started", node.Name) + } + + return nil +} + +func getBootnodes() (string, error) { + nodes, err := client.GetNodes() + if err != nil { + return "", err + } + + bootnodes := make([]string, 0) + for _, node := range nodes { + if strings.HasPrefix(node.Name, "bootnode") { + bootnodes = append(bootnodes, node.Enode) + } + } + + return strings.Join(bootnodes, ","), nil +} + +func createMultiNode(ctx *cli.Context) error { + if ctx.Args().Len() != 0 { + return cli.ShowCommandHelp(ctx, ctx.Command.Name) + } + + t := time.Now() + + createInterval := ctx.Duration("interval") + bootNodeURLs := ctx.String(utils.BootnodesFlag.Name) + if ctx.Bool("autofill.bootnodes") { + existedBootnodeURLs, err := getBootnodes() + if err != nil { + return err + } + if existedBootnodeURLs != "" { + bootNodeURLs += "," + existedBootnodeURLs + } + } + + // Create nodes + count := ctx.Int("count") + outboundRate := ctx.Int("only.outbound.rate") + dirtyRate := ctx.Int("dirty.rate") + per := make([]int, 0) + for i := 0; i < count; i++ { + if i < outboundRate*count/100 { + per = append(per, 1) + } else if i < (outboundRate+dirtyRate)*count/100 { + per = append(per, 2) + } else { + per = append(per, 0) + } + } + rand.Shuffle(len(per), func(i, j int) { per[i], per[j] = per[j], per[i] }) + + isBootnode := ctx.String("node.type") == "bootnode" + + for i := 0; i < count; i++ { + var nodeName string + if isBootnode { + nodeName = fmt.Sprintf("bootnode-%d-%d", t.Unix(), i) + ctx.Set(utils.BootnodesFlag.Name, "") + } else { + nodeType := per[i%len(per)] + switch nodeType { + case 1: + ctx.Set("only.outbound", "true") + ctx.Set("node.type", "outbound") + ctx.Set("services", "valid") + nodeName = fmt.Sprintf("outbound-%d-%d", t.Unix(), i) + case 2: + ctx.Set("only.outbound", "false") + ctx.Set("node.type", "dirty") + ctx.Set("services", "invalid") + nodeName = fmt.Sprintf("dirty-%d-%d", t.Unix(), i) + default: + ctx.Set("only.outbound", "false") + ctx.Set("node.type", "default") + ctx.Set("services", "valid") + nodeName = fmt.Sprintf("node-%d-%d", t.Unix(), i) + } + } + ctx.Set("name", nodeName) + for { + if err := createNode(ctx); err != nil { + fmt.Fprintln(ctx.App.Writer, "Failed to create node", nodeName, err) + // Try to create the node again + client.DeleteNode(nodeName) + time.Sleep(500 * time.Millisecond) + } else { + break + } + } + if createInterval > 0 { + time.Sleep(createInterval) + } + } + return nil } @@ -429,3 +706,151 @@ func rpcSubscribe(client *rpc.Client, out io.Writer, method string, args ...stri } } } + +func startNetwork(ctx *cli.Context) error { + if ctx.Args().Len() != 0 { + return cli.ShowCommandHelp(ctx, ctx.Command.Name) + } + if err := client.StartNetwork(); err != nil { + return err + } + fmt.Fprintln(ctx.App.Writer, "Started network") + return nil +} + +func getNodePeerStats(ctx *cli.Context) error { + if ctx.Args().Len() != 1 { + return cli.ShowCommandHelp(ctx, ctx.Command.Name) + } + nodeName := ctx.Args().Get(0) + stats, err := client.GetNodePeerStats(nodeName) + if err != nil { + return err + } + fmt.Fprintln(ctx.App.Writer, "Peer stats of", ctx.String("node")) + fmt.Fprintln(ctx.App.Writer, "Peer count: ", stats.PeerCount) + fmt.Fprintln(ctx.App.Writer, "Tried: ", stats.Tried) + fmt.Fprintln(ctx.App.Writer, "Failed: ", stats.Failed) + fmt.Fprintln(ctx.App.Writer, "Nodes count: ", stats.DifferentNodesDiscovered) + fmt.Fprintln(ctx.App.Writer, "DHT: ", stats.DHTBuckets) + return nil +} + +func getNetworkPeerStats(ctx *cli.Context) error { + if ctx.Args().Len() != 0 { + return cli.ShowCommandHelp(ctx, ctx.Command.Name) + } + stats, err := client.GetAllNodePeerStats() + if err != nil { + return err + } + for nodeID, stats := range stats { + fmt.Fprintln(ctx.App.Writer, "Peer stats of", nodeID) + fmt.Fprintln(ctx.App.Writer, "Peer count: ", stats.PeerCount) + fmt.Fprintln(ctx.App.Writer, "Tried: ", stats.Tried) + fmt.Fprintln(ctx.App.Writer, "Failed: ", stats.Failed) + fmt.Fprintln(ctx.App.Writer, "Nodes count: ", stats.DifferentNodesDiscovered) + fmt.Fprintln(ctx.App.Writer, "DHT: ", stats.DHTBuckets) + } + return nil +} + +func getAllDHT(ctx *cli.Context) error { + if ctx.Args().Len() != 0 { + return cli.ShowCommandHelp(ctx, ctx.Command.Name) + } + + nodes, err := client.GetNodes() + if err != nil { + return err + } + nodeID2Name := make(map[string]string) + for _, node := range nodes { + nodeID2Name[node.ID] = node.Name + } + + dht, err := client.GetAllNodeDHT() + if err != nil { + return err + } + for nodeName, buckets := range dht { + fmt.Fprintf(ctx.App.Writer, "%s: ", nodeName) + for _, bucket := range buckets { + fmt.Fprintf(ctx.App.Writer, "[") + for _, node := range bucket { + fmt.Fprintf(ctx.App.Writer, "%s ", nodeID2Name[node.ID().String()]) + } + fmt.Fprintf(ctx.App.Writer, "],") + } + fmt.Fprintf(ctx.App.Writer, "\n") + } + return nil +} + +func getAllNodePeersInfo(ctx *cli.Context) error { + if ctx.Args().Len() != 0 { + return cli.ShowCommandHelp(ctx, ctx.Command.Name) + } + + nodes, err := client.GetNodes() + if err != nil { + return err + } + nodeID2Name := make(map[string]string) + for _, node := range nodes { + nodeID2Name[node.ID] = node.Name + } + + peers, err := client.GetAllNodePeersInfo() + if err != nil { + return err + } + for nodeName, peerInfos := range peers { + fmt.Fprintf(ctx.App.Writer, "%s: ", nodeName) + for _, peerInfo := range peerInfos { + fmt.Fprintf(ctx.App.Writer, "(%s %v), ", nodeID2Name[peerInfo.ID], peerInfo.Network.Inbound) + } + fmt.Fprintf(ctx.App.Writer, "\n") + } + return nil +} + +func startLogStats(ctx *cli.Context) error { + if ctx.Args().Len() != 0 { + return cli.ShowCommandHelp(ctx, ctx.Command.Name) + } + csvFile := ctx.String("file") + f, err := os.OpenFile(csvFile, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0644) + if err != nil { + return err + } + defer f.Close() + + sig := make(chan os.Signal, 1) + signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM) + timer := time.NewTicker(ctx.Duration("interval")) + + f.WriteString("node,timestamp,type,value\n") + +loop: + for { + select { + case <-sig: + return nil + case <-timer.C: + stats, err := client.GetAllNodePeerStats() + if err != nil { + fmt.Fprintln(ctx.App.Writer, err) + goto loop + } + for nodeID, stats := range stats { + t := time.Now() + f.WriteString(fmt.Sprintf("%s,%d,%s,%d\n", nodeID, t.Unix(), "PeerCount", stats.PeerCount)) + f.WriteString(fmt.Sprintf("%s,%d,%s,%d\n", nodeID, t.Unix(), "Tried", stats.Tried)) + f.WriteString(fmt.Sprintf("%s,%d,%s,%d\n", nodeID, t.Unix(), "Failed", stats.Failed)) + f.WriteString(fmt.Sprintf("%s,%d,%s,%d\n", nodeID, t.Unix(), "DifferentNodesDiscovered", stats.DifferentNodesDiscovered)) + f.WriteString(fmt.Sprintf("%s,%d,%s,%d\n", nodeID, t.Unix(), "DHTBuckets", stats.DHTBuckets)) + } + } + } +} diff --git a/p2p/discover/table.go b/p2p/discover/table.go index 88fa36ef69..aa3c488426 100644 --- a/p2p/discover/table.go +++ b/p2p/discover/table.go @@ -587,7 +587,7 @@ func (tab *Table) filterNode(n *node) bool { } if node, err := tab.net.RequestENR(unwrapNode(n)); err != nil { tab.log.Debug("ENR request failed", "id", n.ID(), "addr", n.addr(), "err", err) - return false + return true } else if !tab.enrFilter(node.Record()) { tab.log.Trace("ENR record filter out", "id", n.ID(), "addr", n.addr()) return true diff --git a/p2p/discover/v4_udp.go b/p2p/discover/v4_udp.go index bf1e9080a4..16e7d4d1aa 100644 --- a/p2p/discover/v4_udp.go +++ b/p2p/discover/v4_udp.go @@ -155,6 +155,20 @@ func ListenV4(c UDPConn, ln *enode.LocalNode, cfg Config) (*UDPv4, error) { return t, nil } +func (t *UDPv4) NodesInDHT() [][]enode.Node { + if t == nil || t.tab == nil { + return nil + } + nodes := make([][]enode.Node, len(t.tab.buckets)) + for i, bucket := range t.tab.buckets { + nodes[i] = make([]enode.Node, len(bucket.entries)) + for j, entry := range bucket.entries { + nodes[i][j] = entry.Node + } + } + return nodes +} + // Self returns the local node. func (t *UDPv4) Self() *enode.Node { return t.localNode.Node() diff --git a/p2p/server.go b/p2p/server.go index 2577a32218..5bf67483b1 100644 --- a/p2p/server.go +++ b/p2p/server.go @@ -308,6 +308,15 @@ func (c *conn) set(f connFlag, val bool) { } } +// SetListenFunc sets the function used to accept inbound connections (testing only) +func (srv *Server) SetListenFunc(f func(network, addr string) (net.Listener, error)) { + srv.listenFunc = f +} + +func (srv *Server) UDPv4() *discover.UDPv4 { + return srv.ntab +} + // LocalNode returns the local node record. func (srv *Server) LocalNode() *enode.LocalNode { return srv.localnode diff --git a/p2p/simulations/adapters/exec.go b/p2p/simulations/adapters/exec.go index 35ccdfb068..e4d80cb2ae 100644 --- a/p2p/simulations/adapters/exec.go +++ b/p2p/simulations/adapters/exec.go @@ -364,6 +364,21 @@ func (n *ExecNode) Snapshots() (map[string][]byte, error) { return snapshots, n.client.Call(&snapshots, "simulation_snapshot") } +// Empty PeerStats +func (n *ExecNode) PeerStats() *PeerStats { + return &PeerStats{} +} + +// Empty DHT +func (n *ExecNode) NodesInDHT() [][]enode.Node { + return nil +} + +// Empty PeersInfo +func (n *ExecNode) PeersInfo() []*p2p.PeerInfo { + return nil +} + // execNodeConfig is used to serialize the node configuration so it can be // passed to the child process as a JSON encoded environment variable type execNodeConfig struct { diff --git a/p2p/simulations/adapters/inproc.go b/p2p/simulations/adapters/inproc.go index 1cb26a8ea0..bf4eb733a2 100644 --- a/p2p/simulations/adapters/inproc.go +++ b/p2p/simulations/adapters/inproc.go @@ -20,16 +20,21 @@ import ( "context" "errors" "fmt" - "math" + "math/rand" "net" + "strings" "sync" + "time" + "github.com/ethereum/go-ethereum/core/forkid" "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/node" "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/p2p/enode" + "github.com/ethereum/go-ethereum/p2p/enr" "github.com/ethereum/go-ethereum/p2p/simulations/pipes" + "github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/rpc" "github.com/gorilla/websocket" ) @@ -91,14 +96,32 @@ func (s *SimAdapter) NewNode(config *NodeConfig) (Node, error) { return nil, err } + p2pCfg := p2p.Config{ + PrivateKey: config.PrivateKey, + MaxPeers: config.MaxPeers, + NoDiscovery: config.NoDiscovery, + EnableMsgEvents: config.EnableMsgEvents, + DHTBucketSize: config.DHTBucketSize, + } + if !config.DisableTCPListener { + p2pCfg.ListenAddr = fmt.Sprintf(":%d", config.Port) + } + if len(config.BootstrapNodeURLs) > 0 { + for _, url := range strings.Split(config.BootstrapNodeURLs, ",") { + if len(url) == 0 { + continue + } + n, err := enode.Parse(enode.ValidSchemes, url) + if err != nil { + log.Warn("invalid bootstrap node URL", "url", url, "err", err) + continue + } + p2pCfg.BootstrapNodes = append(p2pCfg.BootstrapNodes, n) + } + } + n, err := node.New(&node.Config{ - P2P: p2p.Config{ - PrivateKey: config.PrivateKey, - MaxPeers: math.MaxInt32, - NoDiscovery: true, - Dialer: s, - EnableMsgEvents: config.EnableMsgEvents, - }, + P2P: p2pCfg, ExternalSigner: config.ExternalSigner, Logger: log.New("node.id", id.String()), }) @@ -106,6 +129,10 @@ func (s *SimAdapter) NewNode(config *NodeConfig) (Node, error) { return nil, err } + if config.UseFakeIPListener { + n.Server().SetListenFunc(listenFakeAddrFunc) + } + simNode := &SimNode{ ID: id, config: config, @@ -113,6 +140,34 @@ func (s *SimAdapter) NewNode(config *NodeConfig) (Node, error) { adapter: s, running: make(map[string]node.Lifecycle), } + if !config.UseTCPDialer { + n.Server().Dialer = s + } else { + simNode.dialer = &wrapTCPDialerStats{ + d: &net.Dialer{Timeout: 15 * time.Second}, + resultCh: make(chan resultDial, 10000), + } + n.Server().Dialer = simNode.dialer + } + + if config.EnableENRFilter { + n.Server().SetFilter(func(id forkid.ID) error { + var eth struct { + ForkID forkid.ID + Rest []rlp.RawValue `rlp:"tail"` + } + if err := n.Server().Self().Record().Load(enr.WithEntry("eth", ð)); err != nil { + log.Warn("failed to load eth entry", "err", err) + return err + } + + if id == eth.ForkID { + return nil + } + return forkid.ErrLocalIncompatibleOrStale + }) + } + s.nodes[id] = simNode return simNode, nil } @@ -162,6 +217,8 @@ func (s *SimAdapter) GetNode(id enode.ID) (*SimNode, bool) { // net.Pipe (see SimAdapter.Dial), running devp2p protocols directly over that // pipe type SimNode struct { + ctx context.Context + cancel context.CancelFunc lock sync.RWMutex ID enode.ID config *NodeConfig @@ -170,6 +227,11 @@ type SimNode struct { running map[string]node.Lifecycle client *rpc.Client registerOnce sync.Once + dialer *wrapTCPDialerStats + + // Track different nodes discovered by the node + discoveredNodes sync.Map + differentNodeCount int } // Close closes the underlaying node.Node to release @@ -240,6 +302,15 @@ func (sn *SimNode) Snapshots() (map[string][]byte, error) { // Start registers the services and starts the underlying devp2p node func (sn *SimNode) Start(snapshots map[string][]byte) error { + sn.lock.Lock() + if sn.cancel != nil { + sn.lock.Unlock() + return errors.New("node already started") + } + + sn.ctx, sn.cancel = context.WithCancel(context.Background()) + sn.lock.Unlock() + // ensure we only register the services once in the case of the node // being stopped and then started again var regErr error @@ -282,6 +353,8 @@ func (sn *SimNode) Start(snapshots map[string][]byte) error { sn.client = client sn.lock.Unlock() + go sn.trackDiscoveredNode() + return nil } @@ -292,6 +365,10 @@ func (sn *SimNode) Stop() error { sn.client.Close() sn.client = nil } + if sn.cancel != nil { + sn.cancel() + sn.cancel = nil + } sn.lock.Unlock() return sn.node.Close() } @@ -351,3 +428,118 @@ func (sn *SimNode) NodeInfo() *p2p.NodeInfo { } return server.NodeInfo() } + +// PeerStats returns statistics about the node's peers +func (sn *SimNode) PeerStats() *PeerStats { + if sn.dialer == nil || sn.node.Server() == nil || sn.node.Server().UDPv4() == nil { + return &PeerStats{} + } + + nodesCount := 0 + sn.discoveredNodes.Range(func(_, _ interface{}) bool { + nodesCount++ + return true + }) + buckets := sn.node.Server().UDPv4().NodesInDHT() + bucketSizes := make([]int, len(buckets)) + for i, bucket := range buckets { + bucketSizes[i] = len(bucket) + } + return &PeerStats{ + PeerCount: sn.node.Server().PeerCount(), + Failed: sn.dialer.failed, + Tried: sn.dialer.tried, + DifferentNodesDiscovered: nodesCount, + DHTBuckets: bucketSizes, + } +} + +// NodesInDHT returns the nodes in the DHT buckets +func (sn *SimNode) NodesInDHT() [][]enode.Node { + if sn.node.Server() == nil || sn.node.Server().UDPv4() == nil { + return nil + } + return sn.node.Server().UDPv4().NodesInDHT() +} + +// PeersInfo returns information about the node's peers +func (sn *SimNode) PeersInfo() []*p2p.PeerInfo { + if sn.node.Server() == nil { + return nil + } + return sn.node.Server().PeersInfo() +} + +func (sn *SimNode) trackDiscoveredNode() { + if sn.dialer == nil { + return + } + + for { + select { + case <-sn.ctx.Done(): + return + case r := <-sn.dialer.resultCh: + if _, ok := sn.discoveredNodes.LoadOrStore(r.node, struct{}{}); !ok { + sn.differentNodeCount++ + } + if r.err != nil { + log.Info("dial failed", "node", r.node, "err", r.err) + sn.dialer.failed++ + } + log.Info("dial tried", "from", sn.ID, "to", r.node) + sn.dialer.tried++ + } + } +} + +func listenFakeAddrFunc(network, laddr string) (net.Listener, error) { + l, err := net.Listen(network, laddr) + if err != nil { + return nil, err + } + fakeAddr := &net.TCPAddr{IP: net.IP{byte(rand.Intn(255)), byte(rand.Intn(255)), byte(rand.Intn(255)), byte(rand.Intn(255))}, Port: rand.Intn(65535)} + return &fakeAddrListener{l, fakeAddr}, nil +} + +// fakeAddrListener is a listener that creates connections with a mocked remote address. +type fakeAddrListener struct { + net.Listener + remoteAddr net.Addr +} + +type fakeAddrConn struct { + net.Conn + remoteAddr net.Addr +} + +func (l *fakeAddrListener) Accept() (net.Conn, error) { + c, err := l.Listener.Accept() + if err != nil { + return nil, err + } + return &fakeAddrConn{c, l.remoteAddr}, nil +} + +func (c *fakeAddrConn) RemoteAddr() net.Addr { + return c.remoteAddr +} + +type wrapTCPDialerStats struct { + d *net.Dialer + failed int + tried int + resultCh chan resultDial +} + +type resultDial struct { + err error + node enode.ID +} + +func (d wrapTCPDialerStats) Dial(ctx context.Context, dest *enode.Node) (net.Conn, error) { + nodeAddr := &net.TCPAddr{IP: dest.IP(), Port: dest.TCP()} + conn, err := d.d.DialContext(ctx, "tcp", nodeAddr.String()) + d.resultCh <- resultDial{err, dest.ID()} + return conn, err +} diff --git a/p2p/simulations/adapters/types.go b/p2p/simulations/adapters/types.go index aeb8ef7772..c9062f64ba 100644 --- a/p2p/simulations/adapters/types.go +++ b/p2p/simulations/adapters/types.go @@ -42,7 +42,6 @@ import ( // * SimNode - An in-memory node // * ExecNode - A child process node // * DockerNode - A Docker container node -// type Node interface { // Addr returns the node's address (e.g. an Enode URL) Addr() []byte @@ -65,6 +64,15 @@ type Node interface { // Snapshots creates snapshots of the running services Snapshots() (map[string][]byte, error) + + // PeerStats returns the node's peer statistics + PeerStats() *PeerStats + + // NodesInDHT returns all nodes in the DHT + NodesInDHT() [][]enode.Node + + // PeersInfo returns information about the node's peers + PeersInfo() []*p2p.PeerInfo } // NodeAdapter is used to create Nodes in a simulation network @@ -119,7 +127,8 @@ type NodeConfig struct { // function to sanction or prevent suggesting a peer Reachable func(id enode.ID) bool - Port uint16 + Port uint16 + DisableTCPListener bool // LogFile is the log file name of the p2p node at runtime. // @@ -131,34 +140,71 @@ type NodeConfig struct { // // The default verbosity is INFO. LogVerbosity log.Lvl + + // NoDiscovery disables the peer discovery mechanism (manual peer addition) + NoDiscovery bool + + // Use default TCP dialer + UseTCPDialer bool + + // UseFakeIPListener is used to fake the remote IP address when accepting incoming connections + UseFakeIPListener bool + + // DHTBucketSize is the bucket size of DHT + DHTBucketSize int + + // BootstrapNodes is the list of bootstrap nodes + BootstrapNodeURLs string + + // MaxPeers is the maximum number of peers + MaxPeers int + + // EnableENRFilter enables the ENR filter when adding node into the DHT + EnableENRFilter bool } // nodeConfigJSON is used to encode and decode NodeConfig as JSON by encoding // all fields as strings type nodeConfigJSON struct { - ID string `json:"id"` - PrivateKey string `json:"private_key"` - Name string `json:"name"` - Lifecycles []string `json:"lifecycles"` - Properties []string `json:"properties"` - EnableMsgEvents bool `json:"enable_msg_events"` - Port uint16 `json:"port"` - LogFile string `json:"logfile"` - LogVerbosity int `json:"log_verbosity"` + ID string `json:"id"` + PrivateKey string `json:"private_key"` + Name string `json:"name"` + Lifecycles []string `json:"lifecycles"` + Properties []string `json:"properties"` + EnableMsgEvents bool `json:"enable_msg_events"` + Port uint16 `json:"port"` + DisableTCPListener bool `json:"disable_tcp_listener"` + LogFile string `json:"logfile"` + LogVerbosity int `json:"log_verbosity"` + NoDiscovery bool `json:"no_discovery"` + UseTCPDialer bool `json:"use_tcp_dialer"` + UseFakeIPListener bool `json:"use_fake_ip_listener"` + DHTBucketSize int `json:"dht_bucket_size"` + BootstrapNodeURLs string `json:"bootstrap_node_urls"` + MaxPeers int `json:"max_peers"` + EnableENRFilter bool `json:"enable_enr_filter"` } // MarshalJSON implements the json.Marshaler interface by encoding the config // fields as strings func (n *NodeConfig) MarshalJSON() ([]byte, error) { confJSON := nodeConfigJSON{ - ID: n.ID.String(), - Name: n.Name, - Lifecycles: n.Lifecycles, - Properties: n.Properties, - Port: n.Port, - EnableMsgEvents: n.EnableMsgEvents, - LogFile: n.LogFile, - LogVerbosity: int(n.LogVerbosity), + ID: n.ID.String(), + Name: n.Name, + Lifecycles: n.Lifecycles, + Properties: n.Properties, + Port: n.Port, + DisableTCPListener: n.DisableTCPListener, + EnableMsgEvents: n.EnableMsgEvents, + LogFile: n.LogFile, + LogVerbosity: int(n.LogVerbosity), + NoDiscovery: n.NoDiscovery, + UseTCPDialer: n.UseTCPDialer, + UseFakeIPListener: n.UseFakeIPListener, + DHTBucketSize: n.DHTBucketSize, + BootstrapNodeURLs: n.BootstrapNodeURLs, + MaxPeers: n.MaxPeers, + EnableENRFilter: n.EnableENRFilter, } if n.PrivateKey != nil { confJSON.PrivateKey = hex.EncodeToString(crypto.FromECDSA(n.PrivateKey)) @@ -196,9 +242,17 @@ func (n *NodeConfig) UnmarshalJSON(data []byte) error { n.Lifecycles = confJSON.Lifecycles n.Properties = confJSON.Properties n.Port = confJSON.Port + n.DisableTCPListener = confJSON.DisableTCPListener n.EnableMsgEvents = confJSON.EnableMsgEvents n.LogFile = confJSON.LogFile n.LogVerbosity = log.Lvl(confJSON.LogVerbosity) + n.NoDiscovery = confJSON.NoDiscovery + n.UseTCPDialer = confJSON.UseTCPDialer + n.UseFakeIPListener = confJSON.UseFakeIPListener + n.DHTBucketSize = confJSON.DHTBucketSize + n.BootstrapNodeURLs = confJSON.BootstrapNodeURLs + n.MaxPeers = confJSON.MaxPeers + n.EnableENRFilter = confJSON.EnableENRFilter return nil } @@ -324,3 +378,12 @@ func (n *NodeConfig) initEnode(ip net.IP, tcpport int, udpport int) error { func (n *NodeConfig) initDummyEnode() error { return n.initEnode(net.IPv4(127, 0, 0, 1), int(n.Port), 0) } + +// PeerStats is a struct that holds the statistics of a node's peers +type PeerStats struct { + PeerCount int `json:"peer_count"` + Tried int `json:"tried"` + DifferentNodesDiscovered int `json:"different_nodes_discovered"` + Failed int `json:"failed"` + DHTBuckets []int `json:"dht_buckets"` +} diff --git a/p2p/simulations/dht/dht.go b/p2p/simulations/dht/dht.go new file mode 100644 index 0000000000..5ba4a474e4 --- /dev/null +++ b/p2p/simulations/dht/dht.go @@ -0,0 +1,122 @@ +package main + +import ( + "context" + "flag" + "fmt" + "net/http" + "os" + + "github.com/ethereum/go-ethereum/core/forkid" + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/node" + "github.com/ethereum/go-ethereum/p2p" + "github.com/ethereum/go-ethereum/p2p/enr" + "github.com/ethereum/go-ethereum/p2p/simulations" + "github.com/ethereum/go-ethereum/p2p/simulations/adapters" + "github.com/ethereum/go-ethereum/rlp" +) + +var ( + verbosity = flag.Int("verbosity", 3, "logging verbosity") + port = flag.Int("port", 8888, "port to listen on") +) + +var ( + validETHEntry = mockETHEntry{ForkID: forkid.ID{Hash: [4]byte{1, 2, 3, 4}}} + invalidETHEntry = mockETHEntry{ForkID: forkid.ID{Hash: [4]byte{5, 6, 7, 8}}} +) + +func main() { + flag.Parse() + + log.Root().SetHandler(log.LvlFilterHandler(log.Lvl(*verbosity), log.StreamHandler(os.Stdout, log.TerminalFormat(false)))) + + services := map[string]adapters.LifecycleConstructor{ + "valid": func(ctx *adapters.ServiceContext, stack *node.Node) (node.Lifecycle, error) { + s := newMockService("valid") + s.SetAttributes([]enr.Entry{validETHEntry}) + stack.RegisterProtocols(s.Protocols()) + return s, nil + }, + "invalid": func(ctx *adapters.ServiceContext, stack *node.Node) (node.Lifecycle, error) { + s := newMockService("invalid") + s.SetAttributes([]enr.Entry{invalidETHEntry}) + stack.RegisterProtocols(s.Protocols()) + return s, nil + }, + } + adapters.RegisterLifecycles(services) + + adapter := adapters.NewSimAdapter(services) + + log.Info("starting simulation server", "port", *port) + network := simulations.NewNetwork(adapter, &simulations.NetworkConfig{ + DefaultService: "valid", + }) + if err := http.ListenAndServe(fmt.Sprintf(":%d", *port), simulations.NewServer(network)); err != nil { + log.Crit("error starting simulation server", "err", err) + } +} + +type mockService struct { + name string + attrs []enr.Entry + ctx context.Context + cancel context.CancelFunc +} + +func newMockService(name string) *mockService { + s := &mockService{ + name: name, + } + s.ctx, s.cancel = context.WithCancel(context.Background()) + return s +} + +func (s *mockService) SetAttributes(attrs []enr.Entry) { + s.attrs = attrs +} + +func (s *mockService) Protocols() []p2p.Protocol { + return []p2p.Protocol{{ + Name: s.name, + Version: 1, + Length: 1, + Run: s.Run, + NodeInfo: s.Info, + Attributes: s.attrs, + }} +} + +func (s *mockService) Start() error { + return nil +} + +func (s *mockService) Stop() error { + s.cancel() + return nil +} + +func (s *mockService) Info() interface{} { + return nil +} + +func (s *mockService) Run(peer *p2p.Peer, rw p2p.MsgReadWriter) error { + if !peer.RunningCap(s.name, []uint{1}) { + log.Error("peer does not support protocol", "peer", peer.ID()) + return fmt.Errorf("peer does not support protocol %s", s.name) + } + + <-s.ctx.Done() + return nil +} + +type mockETHEntry struct { + ForkID forkid.ID + Rest []rlp.RawValue `rlp:"tail"` +} + +func (e mockETHEntry) ENRKey() string { + return "eth" +} diff --git a/p2p/simulations/dht/dht.sh b/p2p/simulations/dht/dht.sh new file mode 100755 index 0000000000..f20e628a64 --- /dev/null +++ b/p2p/simulations/dht/dht.sh @@ -0,0 +1,43 @@ +#!/bin/bash + +main_cmd="go run ." +p2psim_cmd="go run ../../../cmd/p2psim" + +distribution=(150 100 100) + +benchmark() { + local bucket_size=$1 + local sleep_time=$2 + local other=$3 + local test_name=$4 + + pids=() + echo "Start server $test_name..." + $main_cmd > tmp_$test_name.log 2> tmp_$test_name.err & + echo "Start stats $test_name..." + $p2psim_cmd log-stats --file ./stats_$test_name.csv & + + echo "Start bootnodes $test_name..." + $p2psim_cmd node create-multi --count 2 --fake.iplistener --start --dht.bucketsize $bucket_size --node.type bootnode $other + + for num_node in ${distribution[@]}; do + echo "Start $num_node nodes..." + $p2psim_cmd node create-multi --count $num_node --fake.iplistener --start --dht.bucketsize 16 --autofill.bootnodes --interval 1s --dirty.rate 60 --only.outbound.rate 20 $other + echo "Sleep $sleep_time..." + sleep $sleep_time + done + + $p2psim_cmd network dht > dht_$test_name.log + $p2psim_cmd network peers > peers_$test_name.log + + echo "Kill server and stats $test_name..." + kill -9 $(lsof -t -i:8888) + ps aux | grep "p2psim" | grep -v "grep" | awk '{print $2}' | xargs kill -9 + + sleep 10 +} + +benchmark 16 1200 "" "16_disable_filter" +benchmark 16 1200 "--enable.enrfilter" "16_enable_filter" +benchmark 256 1200 "" "32_disable_filter" +benchmark 256 1200 "--enable.enrfilter" "32_enable_filter" diff --git a/p2p/simulations/http.go b/p2p/simulations/http.go index 27ed5b75d2..605d0bb9fe 100644 --- a/p2p/simulations/http.go +++ b/p2p/simulations/http.go @@ -179,6 +179,11 @@ func (c *Client) CreateNode(config *adapters.NodeConfig) (*p2p.NodeInfo, error) return node, c.Post("/nodes", config, node) } +// DeleteNode deletes a node from the network +func (c *Client) DeleteNode(nodeID string) error { + return c.Delete(fmt.Sprintf("/nodes/%s", nodeID)) +} + // GetNode returns details of a node func (c *Client) GetNode(nodeID string) (*p2p.NodeInfo, error) { node := &p2p.NodeInfo{} @@ -211,6 +216,28 @@ func (c *Client) RPCClient(ctx context.Context, nodeID string) (*rpc.Client, err return rpc.DialWebsocket(ctx, fmt.Sprintf("%s/nodes/%s/rpc", baseURL, nodeID), "") } +// GetNodePeerStats returns the peer stats of a node +func (c *Client) GetNodePeerStats(nodeID string) (*adapters.PeerStats, error) { + stats := &adapters.PeerStats{} + return stats, c.Get(fmt.Sprintf("/peerstats/%s", nodeID), stats) +} + +// GetAllNodePeerStats returns the peer stats of all nodes +func (c *Client) GetAllNodePeerStats() (map[string]*adapters.PeerStats, error) { + stats := make(map[string]*adapters.PeerStats) + return stats, c.Get("/peerstats", &stats) +} + +func (c *Client) GetAllNodeDHT() (map[string][][]enode.Node, error) { + nodesDHT := make(map[string][][]enode.Node) + return nodesDHT, c.Get("/dht", &nodesDHT) +} + +func (c *Client) GetAllNodePeersInfo() (map[string][]*p2p.PeerInfo, error) { + peersInfo := make(map[string][]*p2p.PeerInfo) + return peersInfo, c.Get("/peers", &peersInfo) +} + // Get performs a HTTP GET request decoding the resulting JSON response // into "out" func (c *Client) Get(path string, out interface{}) error { @@ -296,6 +323,11 @@ func NewServer(network *Network) *Server { s.POST("/nodes/:nodeid/conn/:peerid", s.ConnectNode) s.DELETE("/nodes/:nodeid/conn/:peerid", s.DisconnectNode) s.GET("/nodes/:nodeid/rpc", s.NodeRPC) + s.GET("/peerstats/:nodeid", s.GetNodePeerStats) + s.GET("/peerstats", s.GetAllNodePeerStats) + s.DELETE("/nodes/:nodeid", s.DeleteNode) + s.GET("/dht", s.GetAllNodeDHT) + s.GET("/peers", s.GetAllNodePeersInfo) return s } @@ -592,6 +624,13 @@ func (s *Server) GetNode(w http.ResponseWriter, req *http.Request) { s.JSON(w, http.StatusOK, node.NodeInfo()) } +// DeleteNode deletes a node from the network +func (s *Server) DeleteNode(w http.ResponseWriter, req *http.Request) { + node := req.Context().Value("node").(*Node) + s.network.DeleteNode(node.NodeInfo().Name) + s.JSON(w, http.StatusOK, node.NodeInfo()) +} + // StartNode starts a node func (s *Server) StartNode(w http.ResponseWriter, req *http.Request) { node := req.Context().Value("node").(*Node) @@ -642,6 +681,40 @@ func (s *Server) DisconnectNode(w http.ResponseWriter, req *http.Request) { s.JSON(w, http.StatusOK, node.NodeInfo()) } +// GetNodePeerStats returns the peer stats of a node +func (s *Server) GetNodePeerStats(w http.ResponseWriter, req *http.Request) { + node := req.Context().Value("node").(*Node) + s.JSON(w, http.StatusOK, node.PeerStats()) +} + +// GetAllNodePeerStats returns the peer stats of all nodes +func (s *Server) GetAllNodePeerStats(w http.ResponseWriter, req *http.Request) { + stats := make(map[string]*adapters.PeerStats) + for _, node := range s.network.GetNodes() { + stats[node.Config.Name] = node.PeerStats() + } + + s.JSON(w, http.StatusOK, stats) +} + +func (s *Server) GetAllNodeDHT(w http.ResponseWriter, req *http.Request) { + nodesDHT := make(map[string][][]enode.Node) + for _, node := range s.network.GetNodes() { + nodesDHT[node.Config.Name] = node.NodesInDHT() + } + + s.JSON(w, http.StatusOK, nodesDHT) +} + +func (s *Server) GetAllNodePeersInfo(w http.ResponseWriter, req *http.Request) { + peersInfo := make(map[string][]*p2p.PeerInfo) + for _, node := range s.network.GetNodes() { + peersInfo[node.Config.Name] = node.PeersInfo() + } + + s.JSON(w, http.StatusOK, peersInfo) +} + // Options responds to the OPTIONS HTTP method by returning a 200 OK response // with the "Access-Control-Allow-Headers" header set to "Content-Type" func (s *Server) Options(w http.ResponseWriter, req *http.Request) { diff --git a/p2p/simulations/http_test.go b/p2p/simulations/http_test.go index 6d7f0b6d7a..d63747cc3e 100644 --- a/p2p/simulations/http_test.go +++ b/p2p/simulations/http_test.go @@ -621,6 +621,8 @@ func TestHTTPSnapshot(t *testing.T) { nodes := make([]*p2p.NodeInfo, nodeCount) for i := 0; i < nodeCount; i++ { config := adapters.RandomNodeConfig() + // No need to use TCP for this test + config.DisableTCPListener = true node, err := client.CreateNode(config) if err != nil { t.Fatalf("error creating node: %s", err) diff --git a/p2p/simulations/network.go b/p2p/simulations/network.go index 962910dd25..e18266a5c1 100644 --- a/p2p/simulations/network.go +++ b/p2p/simulations/network.go @@ -584,6 +584,20 @@ func (net *Network) getRandomNode(ids []enode.ID, excludeIDs []enode.ID) *Node { return net.getNode(filtered[rand.Intn(l)]) } +// DeleteNode deletes the node with the given ID from the network +func (net *Network) DeleteNode(name string) { + net.lock.Lock() + defer net.lock.Unlock() + + for i, node := range net.Nodes { + if node.Config.Name == name { + delete(net.nodeMap, node.ID()) + net.Nodes = append(net.Nodes[:i], net.Nodes[i+1:]...) + break + } + } +} + func filterIDs(ids []enode.ID, excludeIDs []enode.ID) []enode.ID { exclude := make(map[enode.ID]bool) for _, id := range excludeIDs { diff --git a/p2p/simulations/network_test.go b/p2p/simulations/network_test.go index d5651441a2..79af296eb4 100644 --- a/p2p/simulations/network_test.go +++ b/p2p/simulations/network_test.go @@ -64,6 +64,8 @@ func TestSnapshot(t *testing.T) { ids := make([]enode.ID, nodeCount) for i := 0; i < nodeCount; i++ { conf := adapters.RandomNodeConfig() + // No need to use TCP for this test + conf.DisableTCPListener = true node, err := network.NewNodeWithConfig(conf) if err != nil { t.Fatalf("error creating node: %s", err)