Implement netperf for "mc support perf net" #14397
Conversation
Force-pushed from 6850f68 to 42de653
Besides the points mentioned, it should detect whether a speedtest is already running, and not start a new one.
Force-pushed from 14ef922 to 910d68e
Here is an example of how stats can be collected safely. When the request comes in, the server decides on a random ID. The first request that comes with that ID to any server starts recording bytes. I added some timekeeping on first/last connection. Not sure how much it is needed. This is not order dependent, and ensures you don't have multiple tests running at the same time.

```go
var globalNetperfRX netperfStats

// netperfStats is used for collecting stats for netperf.
type netperfStats struct {
	received  uint64
	id        uint64
	first     int64
	last      int64
	connected int32
	mu        sync.Mutex
}

// recorder will return a counter linked to the id.
// If the ID is unknown and no other counters are running,
// a new one will be initialized.
// The returned pointer must be atomically incremented.
// If nil is returned, another recording is already running.
// Whenever a non-nil value is returned, release() must be called.
func (n *netperfStats) recorder(id uint64) *uint64 {
	n.mu.Lock()
	defer n.mu.Unlock()
	if id == n.id {
		atomic.AddInt32(&n.connected, 1)
		return &n.received
	}
	if atomic.LoadInt32(&n.connected) > 0 {
		return nil
	}
	// Replace with new, no one is connected.
	n.id = id
	n.received = 0
	n.connected++
	n.first = time.Now().UnixNano()
	n.last = 0
	return &n.received
}

// getRecorded returns statistics recorded on the specific ID.
func (n *netperfStats) getRecorded(id uint64) (uint64, time.Duration, error) {
	n.mu.Lock()
	defer n.mu.Unlock()
	if n.id != id {
		return 0, 0, errors.New("id not found")
	}
	if n.connected > 0 {
		return 0, 0, errors.New("clients are still connected")
	}
	return n.received, time.Duration(n.last-n.first) * time.Nanosecond, nil
}

// release a connection from the stats.
func (n *netperfStats) release() {
	atomic.AddInt32(&n.connected, -1)
	atomic.StoreInt64(&n.last, time.Now().UnixNano())
}

// DevNull - everything goes to io.Discard.
func (s *peerRESTServer) DevNull(w http.ResponseWriter, r *http.Request) {
	if !s.IsValid(w, r) {
		s.writeErrorResponse(w, errors.New("invalid request"))
		return
	}
	id, err := strconv.ParseUint(r.Form.Get("id"), 10, 64)
	if err != nil {
		w.WriteHeader(http.StatusBadRequest)
		return
	}
	rx := globalNetperfRX.recorder(id)
	if rx == nil {
		w.WriteHeader(http.StatusConflict)
		return
	}
	defer globalNetperfRX.release()
	for {
		n, err := io.CopyN(io.Discard, r.Body, 128*humanize.KiByte)
		atomic.AddUint64(rx, uint64(n))
		if err != nil {
			break
		}
	}
}
```
Force-pushed from 910d68e to 84e295a
@klauspost this implementation/code is not worth it. It covers the start of the test but not the end, i.e. after the 10 sec timeout we have to wait until peers stop sending data if we want to be accurate towards the end of the test. But that extends the duration for RX to more than 10 seconds. So for TX the duration will be 10 seconds but for RX it will be more than 10 seconds. Adjusting the calculation for these timing variations adds complexity. The "net perf" test's purpose is to get a quick rough estimate of the network capability to rule out glaring network issues. The code implemented in this PR is good enough, as shown by the tests.
@krishnasrinivas The timing is purely optional. But it will at least record when you have a hard time starting at the same time. Starting in sync is very hard. Warp gives an absolute start time 3 seconds in the future, but that requires that clocks are in sync, which certainly isn't perfect, though we check that they are within 1 second of absolute time. The main change is to make the current system fault tolerant, not risk multiple tests running at once, and ensure that stats aren't dropped in high-latency cases. We could also add a max concurrent count, and maybe only average over that. Let me give that a shot.
@krishnasrinivas This solves the timing problem by only recording between the last connect and the first disconnect.

```go
var globalNetperfRX netperfStats

// netperfStats is used for collecting stats for netperf.
type netperfStats struct {
	id               uint64 // id of current test
	rxCounter        uint64 // temporary counter
	received         uint64 // received between lastConnected and firstDisconnect
	lastConnected    int64  // time when last client connected
	firstDisconnect  int64  // time when first client disconnected
	connected        int32  // n connected
	someDisconnected int32  // 0 when none have disconnected yet, 1 when any has disconnected
	mu               sync.Mutex
}

// recorder will return a counter linked to the id.
// If the ID is unknown and no other counters are running,
// a new one will be initialized.
// The returned pointer must be atomically incremented.
// If nil is returned, another recording is already running.
// Whenever a non-nil value is returned, release() must be called.
func (n *netperfStats) recorder(id uint64) *uint64 {
	n.mu.Lock()
	defer n.mu.Unlock()
	if id == n.id {
		atomic.AddInt32(&n.connected, 1)
		// Reset timing.
		atomic.StoreUint64(&n.rxCounter, 0)
		atomic.StoreInt64(&n.lastConnected, time.Now().UnixNano())
		return &n.rxCounter
	}
	if atomic.LoadInt32(&n.connected) > 0 {
		return nil
	}
	// Replace with new, no one is connected.
	n.id = id
	n.rxCounter = 0
	n.connected++
	n.lastConnected = time.Now().UnixNano()
	n.firstDisconnect = 0
	n.someDisconnected = 0
	return &n.rxCounter
}

// getRecorded returns statistics recorded on the specific ID.
func (n *netperfStats) getRecorded(id uint64) (uint64, time.Duration, error) {
	n.mu.Lock()
	defer n.mu.Unlock()
	if n.id != id {
		return 0, 0, errors.New("id not found")
	}
	if n.connected > 0 {
		return 0, 0, errors.New("clients are still connected")
	}
	return n.received, time.Duration(n.firstDisconnect-n.lastConnected) * time.Nanosecond, nil
}

// release a connection from the stats.
func (n *netperfStats) release() {
	atomic.AddInt32(&n.connected, -1)
	// Store when the first client disconnects.
	if atomic.CompareAndSwapInt32(&n.someDisconnected, 0, 1) {
		r := atomic.LoadUint64(&n.rxCounter)
		atomic.StoreUint64(&n.received, r)
		atomic.StoreInt64(&n.firstDisconnect, time.Now().UnixNano())
	}
}

// DevNull - everything goes to io.Discard.
func (s *peerRESTServer) DevNull(w http.ResponseWriter, r *http.Request) {
	if !s.IsValid(w, r) {
		s.writeErrorResponse(w, errors.New("invalid request"))
		return
	}
	id, err := strconv.ParseUint(r.Form.Get("id"), 10, 64)
	if err != nil {
		w.WriteHeader(http.StatusBadRequest)
		return
	}
	rx := globalNetperfRX.recorder(id)
	if rx == nil {
		w.WriteHeader(http.StatusConflict)
		return
	}
	defer globalNetperfRX.release()
	for {
		n, err := io.CopyN(io.Discard, r.Body, 128*humanize.KiByte)
		atomic.AddUint64(rx, uint64(n))
		if err != nil {
			break
		}
	}
}
```

Note that you can have
@klauspost my point is that the implementation in the PR is the best approach because of code simplicity. I have added locking as well, so that only one instance of netperf is running. My tests in the lab show that it reports correct throughput numbers. If there are any shortcomings in the implementation in the future, we can fix them at that point. netperf's purpose is not to give highly accurate numbers, but to give a quick rough estimate of the network throughput capabilities of the servers.
@krishnasrinivas "Works on my machine" is not good enough. We need to be reliable in many setups. What happens with 50 machines? 500? Implementing it is trivial: you only really need to send a random ID, and you get actual peak performance, as well as an indication of data quality.
Force-pushed from a383848 to 85a052b
Rest of it looks good to me.
Force-pushed from e3bbf14 to c1f19f5
@klauspost can you check?
👍🏼
Co-authored-by: Klaus Post <klauspost@gmail.com>
Description
Support for mesh style throughput test
Motivation and Context
How to test this PR?
`mc support perf net ALIAS`
Related mc PR: minio/mc#3998
Types of changes
Checklist:
commit-id or PR # here)