Skip to content

Commit 8ccd2ac

Browse files
author
Scott Hutton
committed
Track last error atomically
Eliminate the need for UdpSocket to be passed mutably into send_mmsg().
1 parent 4b1e816 commit 8ccd2ac

File tree

2 files changed

+59
-26
lines changed

2 files changed

+59
-26
lines changed

src/lib.rs

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,10 @@
22
use std::{
33
net::{IpAddr, Ipv6Addr, SocketAddr},
44
sync::atomic::{AtomicUsize, Ordering},
5-
time::{Duration, Instant},
65
};
76

87
pub use crate::cmsg::{AsPtr, EcnCodepoint, Source, Transmit};
8+
use imp::LastSendError;
99
use tracing::warn;
1010

1111
mod cmsg;
@@ -94,20 +94,18 @@ impl Default for RecvMeta {
9494
}
9595

9696
/// Log at most 1 IO error per minute
97-
const IO_ERROR_LOG_INTERVAL: Duration = std::time::Duration::from_secs(60);
97+
const IO_ERROR_LOG_INTERVAL: u64 = 60;
9898

9999
/// Logs a warning message when sendmsg fails
100100
///
101101
/// Logging will only be performed if at least [`IO_ERROR_LOG_INTERVAL`]
102102
/// has elapsed since the last error was logged.
103103
fn log_sendmsg_error<B: AsPtr<u8>>(
104-
last_send_error: &mut Instant,
104+
last_send_error: LastSendError,
105105
err: impl core::fmt::Debug,
106106
transmit: &Transmit<B>,
107107
) {
108-
let now = Instant::now();
109-
if now.saturating_duration_since(*last_send_error) > IO_ERROR_LOG_INTERVAL {
110-
*last_send_error = now;
108+
if last_send_error.should_log() {
111109
warn!(
112110
"sendmsg error: {:?}, Transmit: {{ destination: {:?}, src_ip: {:?}, enc: {:?}, len: {:?}, segment_size: {:?} }}",
113111
err, transmit.dst, transmit.src, transmit.ecn, transmit.contents.len(), transmit.segment_size);

src/unix.rs

Lines changed: 55 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,12 @@ use std::{
44
mem::{self, MaybeUninit},
55
net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6},
66
os::{fd::AsFd, unix::io::AsRawFd},
7-
sync::atomic::AtomicUsize,
7+
sync::{
8+
atomic::{AtomicU64, AtomicUsize, Ordering},
9+
Arc,
10+
},
811
task::{Context, Poll},
9-
time::Instant,
12+
time::SystemTime,
1013
};
1114

1215
use crate::cmsg::{AsPtr, EcnCodepoint, Source, Transmit};
@@ -31,7 +34,7 @@ type IpTosTy = libc::c_int;
3134
#[derive(Debug)]
3235
pub struct UdpSocket {
3336
io: tokio::net::UdpSocket,
34-
last_send_error: Instant,
37+
last_send_error: LastSendError,
3538
}
3639

3740
impl AsRawFd for UdpSocket {
@@ -46,16 +49,47 @@ impl AsFd for UdpSocket {
4649
}
4750
}
4851

52+
#[derive(Clone, Debug)]
53+
pub(crate) struct LastSendError(Arc<AtomicU64>);
54+
55+
impl Default for LastSendError {
56+
fn default() -> Self {
57+
let now = Self::now();
58+
Self(Arc::new(AtomicU64::new(
59+
now.checked_sub(2 * IO_ERROR_LOG_INTERVAL).unwrap_or(now),
60+
)))
61+
}
62+
}
63+
64+
impl LastSendError {
65+
fn now() -> u64 {
66+
SystemTime::now()
67+
.duration_since(SystemTime::UNIX_EPOCH)
68+
.unwrap()
69+
.as_secs()
70+
}
71+
72+
// Determine whether the last error was more than IO_ERROR_LOG_INTERVAL
73+
// seconds ago. If so, update the last error time and return true.
74+
pub(crate) fn should_log(&self) -> bool {
75+
let now = Self::now();
76+
self.0
77+
.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |cur| {
78+
((now - cur) > IO_ERROR_LOG_INTERVAL).then_some(now)
79+
})
80+
.is_ok()
81+
}
82+
}
83+
4984
impl UdpSocket {
5085
/// Creates a new UDP socket from a previously created `std::net::UdpSocket`
5186
pub fn from_std(socket: std::net::UdpSocket) -> io::Result<UdpSocket> {
5287
socket.set_nonblocking(true)?;
5388

5489
init(SockRef::from(&socket))?;
55-
let now = Instant::now();
5690
Ok(UdpSocket {
5791
io: tokio::net::UdpSocket::from_std(socket)?,
58-
last_send_error: now.checked_sub(2 * IO_ERROR_LOG_INTERVAL).unwrap_or(now),
92+
last_send_error: LastSendError::default(),
5993
})
6094
}
6195

@@ -67,10 +101,9 @@ impl UdpSocket {
67101
pub async fn bind<A: ToSocketAddrs>(addr: A) -> io::Result<UdpSocket> {
68102
let io = tokio::net::UdpSocket::bind(addr).await?;
69103
init(SockRef::from(&io))?;
70-
let now = Instant::now();
71104
Ok(UdpSocket {
72105
io,
73-
last_send_error: now.checked_sub(2 * IO_ERROR_LOG_INTERVAL).unwrap_or(now),
106+
last_send_error: LastSendError::default(),
74107
})
75108
}
76109

@@ -195,13 +228,13 @@ impl UdpSocket {
195228
///
196229
/// [`sendmmsg`]: https://linux.die.net/man/2/sendmmsg
197230
pub async fn send_mmsg<B: AsPtr<u8>>(
198-
&mut self,
231+
&self,
199232
state: &UdpState,
200233
transmits: &[Transmit<B>],
201234
) -> Result<usize, io::Error> {
202235
let n = loop {
203236
self.io.writable().await?;
204-
let last_send_error = &mut self.last_send_error;
237+
let last_send_error = self.last_send_error.clone();
205238
let io = &self.io;
206239
match io.try_io(Interest::WRITABLE, || {
207240
send(state, SockRef::from(io), last_send_error, transmits)
@@ -278,11 +311,15 @@ impl UdpSocket {
278311
transmits: &[Transmit<B>],
279312
) -> Poll<io::Result<usize>> {
280313
loop {
281-
let last_send_error = &mut self.last_send_error;
282314
ready!(self.io.poll_send_ready(cx))?;
283315
let io = &self.io;
284316
if let Ok(res) = io.try_io(Interest::WRITABLE, || {
285-
send(state, SockRef::from(io), last_send_error, transmits)
317+
send(
318+
state,
319+
SockRef::from(io),
320+
self.last_send_error.clone(),
321+
transmits,
322+
)
286323
}) {
287324
return Poll::Ready(Ok(res));
288325
}
@@ -353,7 +390,7 @@ pub mod sync {
353390
#[derive(Debug)]
354391
pub struct UdpSocket {
355392
io: std::net::UdpSocket,
356-
last_send_error: Instant,
393+
last_send_error: LastSendError,
357394
}
358395

359396
impl AsRawFd for UdpSocket {
@@ -372,21 +409,19 @@ pub mod sync {
372409
pub fn from_std(socket: std::net::UdpSocket) -> io::Result<Self> {
373410
init(SockRef::from(&socket))?;
374411
socket.set_nonblocking(false)?;
375-
let now = Instant::now();
376412
Ok(Self {
377413
io: socket,
378-
last_send_error: now.checked_sub(2 * IO_ERROR_LOG_INTERVAL).unwrap_or(now),
414+
last_send_error: LastSendError::default(),
379415
})
380416
}
381417
/// create a new UDP socket and attempt to bind to `addr`
382418
pub fn bind<A: std::net::ToSocketAddrs>(addr: A) -> io::Result<Self> {
383419
let io = std::net::UdpSocket::bind(addr)?;
384420
init(SockRef::from(&io))?;
385421
io.set_nonblocking(false)?;
386-
let now = Instant::now();
387422
Ok(Self {
388423
io,
389-
last_send_error: now.checked_sub(2 * IO_ERROR_LOG_INTERVAL).unwrap_or(now),
424+
last_send_error: LastSendError::default(),
390425
})
391426
}
392427
/// sets nonblocking mode
@@ -474,7 +509,7 @@ pub mod sync {
474509
send(
475510
state,
476511
SockRef::from(&self.io),
477-
&mut self.last_send_error,
512+
self.last_send_error.clone(),
478513
transmits,
479514
)
480515
}
@@ -681,7 +716,7 @@ fn send_msg<B: AsPtr<u8>>(
681716
fn send<B: AsPtr<u8>>(
682717
state: &UdpState,
683718
io: SockRef<'_>,
684-
last_send_error: &mut Instant,
719+
last_send_error: LastSendError,
685720
transmits: &[Transmit<B>],
686721
) -> io::Result<usize> {
687722
use std::ptr;
@@ -802,7 +837,7 @@ fn send_msg<B: AsPtr<u8>>(
802837
fn send<B: AsPtr<u8>>(
803838
_state: &UdpState,
804839
io: SockRef<'_>,
805-
last_send_error: &mut Instant,
840+
last_send_error: LastSendError,
806841
transmits: &[Transmit<B>],
807842
) -> io::Result<usize> {
808843
let mut hdr: libc::msghdr = unsafe { mem::zeroed() };
@@ -828,7 +863,7 @@ fn send<B: AsPtr<u8>>(
828863
// Those are not fatal errors, since the
829864
// configuration can be dynamically changed.
830865
// - Destination unreachable errors have been observed for other
831-
log_sendmsg_error(last_send_error, e, &transmits[sent]);
866+
log_sendmsg_error(last_send_error.clone(), e, &transmits[sent]);
832867
sent += 1;
833868
}
834869
}

0 commit comments

Comments
 (0)