Skip to content

Commit c0e840e

Browse files
authored
fix: require explicit muxer start before handshake (#1002)
This removes a race condition that caused random failures when the remote peer sent a handshake message before we have registered the protocol Signed-off-by: Aurora Gaffney <[email protected]>
1 parent 7ed7244 commit c0e840e

File tree

2 files changed

+21
-12
lines changed

2 files changed

+21
-12
lines changed

connection.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -341,6 +341,7 @@ func (c *Connection) setupConnection() error {
341341
} else {
342342
c.handshake.Client.Start()
343343
}
344+
c.muxer.StartOnce()
344345
// Wait for handshake completion or error
345346
select {
346347
case <-c.doneChan:

muxer/muxer.go

Lines changed: 20 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,15 @@ func (m *Muxer) Start() {
111111
}
112112
}
113113

114+
// StartOnce unblocks the read loop for one iteration. This is generally used to perform the handshake before registering
115+
// additional protocols and calling Start
116+
func (m *Muxer) StartOnce() {
117+
select {
118+
case m.startChan <- false:
119+
default:
120+
}
121+
}
122+
114123
// Stop shuts down the muxer
115124
func (m *Muxer) Stop() {
116125
m.onceStop.Do(func() {
@@ -262,6 +271,17 @@ func (m *Muxer) readLoop() {
262271
return
263272
default:
264273
}
274+
// Wait until the muxer is started to continue
275+
if !started {
276+
select {
277+
case <-m.doneChan:
278+
// Break out of read loop if we're shutting down
279+
return
280+
case v := <-m.startChan:
281+
// We block again on the next iteration of we get 'false' from startChan
282+
started = v
283+
}
284+
}
265285
header := SegmentHeader{}
266286
if err := binary.Read(m.conn, binary.BigEndian, &header); err != nil {
267287
if errors.Is(err, io.ErrClosedPipe) {
@@ -334,17 +354,5 @@ func (m *Muxer) readLoop() {
334354
return
335355
}
336356
recvChan <- msg
337-
338-
// Wait until the muxer is started to continue
339-
// We don't want to read more than one segment until the handshake is complete
340-
if !started {
341-
select {
342-
case <-m.doneChan:
343-
// Break out of read loop if we're shutting down
344-
return
345-
case <-m.startChan:
346-
started = true
347-
}
348-
}
349357
}
350358
}

0 commit comments

Comments
 (0)