Skip to content
This repository was archived by the owner on Jun 27, 2022. It is now read-only.

Commit fb6abcd

Browse files
committed
add read timeout
This commit adds a `set_read_timeout` function similar to Rust's `set_read_timeout`[1]. The differences compared to upstream are: - Operation always succeed. - It takes an i64 instead Duration. [1] https://doc.rust-lang.org/nightly/std/net/struct.UdpSocket.html#method.set_read_timeout
1 parent a326703 commit fb6abcd

File tree

1 file changed

+91
-23
lines changed

1 file changed

+91
-23
lines changed

src/socket.rs

Lines changed: 91 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,12 @@ use std::cmp::{min, max};
22
use std::collections::VecDeque;
33
use std::net::{ToSocketAddrs, SocketAddr, UdpSocket};
44
use std::io::{Result, Error, ErrorKind};
5+
use std::error::Error as ErrorTrait;
56
use util::{now_microseconds, ewma};
67
use packet::{Packet, PacketType, Encodable, Decodable, ExtensionType, HEADER_SIZE};
78
use rand::{self, Rng};
89
use with_read_timeout::WithReadTimeout;
10+
use std::fmt;
911

1012
// For simplicity's sake, let us assume no packet will ever exceed the
1113
// Ethernet maximum transfer unit of 1500 bytes.
@@ -34,23 +36,45 @@ pub enum SocketError {
3436
ConnectionClosed,
3537
ConnectionReset,
3638
ConnectionTimedOut,
39+
UserTimedOut,
3740
InvalidAddress,
3841
InvalidPacket,
3942
InvalidReply,
4043
}
4144

45+
impl fmt::Display for SocketError {
46+
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
47+
write!(f, "{:?}", *self)
48+
}
49+
}
50+
51+
impl ErrorTrait for SocketError {
52+
fn description(&self) -> &str {
53+
use self::SocketError::*;
54+
match *self {
55+
ConnectionClosed => "The socket is closed",
56+
ConnectionReset => "Connection reset by remote peer",
57+
ConnectionTimedOut => "Connection timed out",
58+
UserTimedOut => "User timeout reached",
59+
InvalidAddress => "Invalid address",
60+
InvalidPacket => "Error parsing packet",
61+
InvalidReply => "The remote peer sent an invalid reply",
62+
}
63+
}
64+
}
65+
4266
impl From<SocketError> for Error {
4367
fn from(error: SocketError) -> Error {
4468
use self::SocketError::*;
45-
let (kind, message) = match error {
46-
ConnectionClosed => (ErrorKind::NotConnected, "The socket is closed"),
47-
ConnectionReset => (ErrorKind::ConnectionReset, "Connection reset by remote peer"),
48-
ConnectionTimedOut => (ErrorKind::TimedOut, "Connection timed out"),
49-
InvalidAddress => (ErrorKind::InvalidInput, "Invalid address"),
50-
InvalidPacket => (ErrorKind::Other, "Error parsing packet"),
51-
InvalidReply => (ErrorKind::ConnectionRefused, "The remote peer sent an invalid reply"),
69+
let kind = match error {
70+
ConnectionClosed => ErrorKind::NotConnected,
71+
ConnectionReset => ErrorKind::ConnectionReset,
72+
ConnectionTimedOut | UserTimedOut => ErrorKind::TimedOut,
73+
InvalidAddress => ErrorKind::InvalidInput,
74+
InvalidPacket => ErrorKind::Other,
75+
InvalidReply => ErrorKind::ConnectionRefused,
5276
};
53-
Error::new(kind, message)
77+
Error::new(kind, error)
5478
}
5579
}
5680

@@ -185,6 +209,9 @@ pub struct UtpSocket {
185209

186210
/// Maximum retransmission retries
187211
pub max_retransmission_retries: u32,
212+
213+
/// Used by `set_read_timeout`.
214+
user_read_timeout: i64,
188215
}
189216

190217
impl UtpSocket {
@@ -231,6 +258,7 @@ impl UtpSocket {
231258
congestion_timeout: INITIAL_CONGESTION_TIMEOUT,
232259
cwnd: INIT_CWND * MSS,
233260
max_retransmission_retries: MAX_RETRANSMISSION_RETRIES,
261+
user_read_timeout: 0,
234262
}
235263
}
236264

@@ -335,7 +363,7 @@ impl UtpSocket {
335363
// Receive JAKE
336364
let mut buf = [0; BUF_SIZE];
337365
while self.state != SocketState::Closed {
338-
try!(self.recv(&mut buf));
366+
try!(self.recv(&mut buf, false));
339367
}
340368

341369
Ok(())
@@ -364,7 +392,8 @@ impl UtpSocket {
364392
return Ok((0, self.connected_to));
365393
}
366394

367-
match self.recv(buf) {
395+
let user_read_timeout = self.user_read_timeout;
396+
match self.recv(buf, user_read_timeout != 0) {
368397
Ok((0, _src)) => continue,
369398
Ok(x) => return Ok(x),
370399
Err(e) => return Err(e)
@@ -373,11 +402,32 @@ impl UtpSocket {
373402
}
374403
}
375404

376-
fn recv(&mut self, buf: &mut[u8]) -> Result<(usize, SocketAddr)> {
405+
/// Changes read operations to block for at most the specified number of
406+
/// milliseconds.
407+
pub fn set_read_timeout(&mut self, user_timeout: Option<i64>) {
408+
self.user_read_timeout = match user_timeout {
409+
Some(t) => {
410+
if t > 0 {
411+
t
412+
} else {
413+
0
414+
}
415+
},
416+
None => 0
417+
}
418+
}
419+
420+
fn recv(&mut self, buf: &mut[u8], use_user_timeout: bool)
421+
-> Result<(usize, SocketAddr)> {
377422
let mut b = [0; BUF_SIZE + HEADER_SIZE];
378423
let now = now_microseconds();
379424
let (read, src);
380425
let mut retries = 0;
426+
let user_timeout = if use_user_timeout {
427+
self.user_read_timeout
428+
} else {
429+
0
430+
};
381431

382432
// Try to receive a packet and handle timeouts
383433
loop {
@@ -387,17 +437,32 @@ impl UtpSocket {
387437
return Err(Error::from(SocketError::ConnectionTimedOut));
388438
}
389439

390-
let timeout = if self.state != SocketState::New {
440+
let congestion_timeout = if self.state != SocketState::New {
391441
debug!("setting read timeout of {} ms", self.congestion_timeout);
392442
self.congestion_timeout as i64
393443
} else { 0 };
444+
let timeout = if user_timeout != 0 {
445+
if congestion_timeout != 0 {
446+
use std::cmp::min;
447+
min(congestion_timeout, user_timeout)
448+
} else {
449+
user_timeout
450+
}
451+
} else {
452+
congestion_timeout
453+
};
454+
455+
if user_timeout != 0
456+
&& ((now_microseconds() - now) / 1000) as i64 >= user_timeout {
457+
return Err(Error::from(SocketError::UserTimedOut));
458+
}
394459

395460
match self.socket.recv_timeout(&mut b, timeout) {
396461
Ok((r, s)) => { read = r; src = s; break },
397462
Err(ref e) if (e.kind() == ErrorKind::WouldBlock ||
398463
e.kind() == ErrorKind::TimedOut) => {
399464
debug!("recv_from timed out");
400-
try!(self.handle_receive_timeout());
465+
try!(self.handle_receive_timeout(user_timeout != 0));
401466
},
402467
Err(e) => return Err(e),
403468
};
@@ -438,8 +503,11 @@ impl UtpSocket {
438503
Ok((read, src))
439504
}
440505

441-
fn handle_receive_timeout(&mut self) -> Result<()> {
442-
self.congestion_timeout = self.congestion_timeout * 2;
506+
fn handle_receive_timeout(&mut self, keep_current_timeout: bool)
507+
-> Result<()> {
508+
if !keep_current_timeout {
509+
self.congestion_timeout *= 2
510+
}
443511
self.cwnd = MSS;
444512

445513
// There are three possible cases here:
@@ -605,7 +673,7 @@ impl UtpSocket {
605673
let mut buf = [0u8; BUF_SIZE];
606674
while !self.send_window.is_empty() {
607675
debug!("packets in send window: {}", self.send_window.len());
608-
try!(self.recv(&mut buf));
676+
try!(self.recv(&mut buf, false));
609677
}
610678

611679
Ok(())
@@ -637,7 +705,7 @@ impl UtpSocket {
637705
debug!("self.duplicate_ack_count: {}", self.duplicate_ack_count);
638706
debug!("now_microseconds() - now = {}", now_microseconds() - now);
639707
let mut buf = [0; BUF_SIZE];
640-
try!(self.recv(&mut buf));
708+
try!(self.recv(&mut buf, false));
641709
}
642710
debug!("out: now_microseconds() - now = {}", now_microseconds() - now);
643711

@@ -1355,7 +1423,7 @@ mod test {
13551423
thread::spawn(move || {
13561424
// Make the server listen for incoming connections
13571425
let mut buf = [0u8; BUF_SIZE];
1358-
let _resp = server.recv(&mut buf);
1426+
let _resp = server.recv(&mut buf, false);
13591427
tx.send(server.seq_nr).unwrap();
13601428

13611429
// Close the connection
@@ -1719,7 +1787,7 @@ mod test {
17191787

17201788
let mut buf = [0; BUF_SIZE];
17211789
// Expect SYN
1722-
iotry!(server.recv(&mut buf));
1790+
iotry!(server.recv(&mut buf, false));
17231791

17241792
// Receive data
17251793
let data_packet = match server.socket.recv_from(&mut buf) {
@@ -1792,7 +1860,7 @@ mod test {
17921860
});
17931861

17941862
let mut buf = [0u8; BUF_SIZE];
1795-
server.recv(&mut buf).unwrap();
1863+
server.recv(&mut buf, false).unwrap();
17961864
// After establishing a new connection, the server's ids are a mirror of the client's.
17971865
assert_eq!(server.receiver_connection_id, server.sender_connection_id + 1);
17981866

@@ -1899,7 +1967,7 @@ mod test {
18991967
});
19001968

19011969
let mut buf = [0u8; BUF_SIZE];
1902-
iotry!(server.recv(&mut buf));
1970+
iotry!(server.recv(&mut buf, false));
19031971
// After establishing a new connection, the server's ids are a mirror of the client's.
19041972
assert_eq!(server.receiver_connection_id, server.sender_connection_id + 1);
19051973

@@ -2233,7 +2301,7 @@ mod test {
22332301
let mut buf = [0; BUF_SIZE];
22342302

22352303
// Accept connection
2236-
iotry!(server.recv(&mut buf));
2304+
iotry!(server.recv(&mut buf, false));
22372305

22382306
// Send FIN without acknowledging packets received
22392307
let mut packet = Packet::new();
@@ -2348,7 +2416,7 @@ mod test {
23482416

23492417
// Try to receive ACKs, time out too many times on flush, and fail with `TimedOut`
23502418
let mut buf = [0; BUF_SIZE];
2351-
match server.recv(&mut buf) {
2419+
match server.recv(&mut buf, false) {
23522420
Err(ref e) if e.kind() == ErrorKind::TimedOut => (),
23532421
x => panic!("Expected Err(TimedOut), got {:?}", x),
23542422
}

0 commit comments

Comments
 (0)