Skip to content

Track last error atomically #5

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jul 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 4 additions & 6 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@
use std::{
net::{IpAddr, Ipv6Addr, SocketAddr},
sync::atomic::{AtomicUsize, Ordering},
time::{Duration, Instant},
};

pub use crate::cmsg::{AsPtr, EcnCodepoint, Source, Transmit};
use imp::LastSendError;
use tracing::warn;

mod cmsg;
Expand Down Expand Up @@ -94,20 +94,18 @@ impl Default for RecvMeta {
}

/// Log at most 1 IO error per minute
const IO_ERROR_LOG_INTERVAL: Duration = std::time::Duration::from_secs(60);
const IO_ERROR_LOG_INTERVAL: u64 = 60;

/// Logs a warning message when sendmsg fails
///
/// Logging will only be performed if at least [`IO_ERROR_LOG_INTERVAL`]
/// has elapsed since the last error was logged.
fn log_sendmsg_error<B: AsPtr<u8>>(
last_send_error: &mut Instant,
last_send_error: LastSendError,
err: impl core::fmt::Debug,
transmit: &Transmit<B>,
) {
let now = Instant::now();
if now.saturating_duration_since(*last_send_error) > IO_ERROR_LOG_INTERVAL {
*last_send_error = now;
if last_send_error.should_log() {
warn!(
"sendmsg error: {:?}, Transmit: {{ destination: {:?}, src_ip: {:?}, enc: {:?}, len: {:?}, segment_size: {:?} }}",
err, transmit.dst, transmit.src, transmit.ecn, transmit.contents.len(), transmit.segment_size);
Expand Down
79 changes: 59 additions & 20 deletions src/unix.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,12 @@ use std::{
mem::{self, MaybeUninit},
net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6},
os::{fd::AsFd, unix::io::AsRawFd},
sync::atomic::AtomicUsize,
sync::{
atomic::{AtomicU64, AtomicUsize, Ordering},
Arc,
},
task::{Context, Poll},
time::Instant,
time::SystemTime,
};

use crate::cmsg::{AsPtr, EcnCodepoint, Source, Transmit};
Expand All @@ -31,7 +34,7 @@ type IpTosTy = libc::c_int;
#[derive(Debug)]
pub struct UdpSocket {
io: tokio::net::UdpSocket,
last_send_error: Instant,
last_send_error: LastSendError,
}

impl AsRawFd for UdpSocket {
Expand All @@ -46,16 +49,51 @@ impl AsFd for UdpSocket {
}
}

#[derive(Clone, Debug)]
pub(crate) struct LastSendError(Arc<AtomicU64>);

impl Default for LastSendError {
fn default() -> Self {
let now = Self::now();
Self(Arc::new(AtomicU64::new(
now.checked_sub(2 * IO_ERROR_LOG_INTERVAL).unwrap_or(now),
)))
}
}

impl LastSendError {
fn now() -> u64 {
SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.as_secs()
}

/// Determine whether the last error was more than `IO_ERROR_LOG_INTERVAL`
/// seconds ago. If so, update the last error time and return true.
///
/// Note: if the system clock regresses more tha `IO_ERROR_LOG_INTERVAL`,
/// this function may impose an additional delay on log message emission.
/// Similarly, if it advances, messages may be emitted prematurely.
pub(crate) fn should_log(&self) -> bool {
let now = Self::now();
self.0
.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |cur| {
(now.saturating_sub(cur) > IO_ERROR_LOG_INTERVAL).then_some(now)
})
.is_ok()
}
}

impl UdpSocket {
/// Creates a new UDP socket from a previously created `std::net::UdpSocket`
pub fn from_std(socket: std::net::UdpSocket) -> io::Result<UdpSocket> {
socket.set_nonblocking(true)?;

init(SockRef::from(&socket))?;
let now = Instant::now();
Ok(UdpSocket {
io: tokio::net::UdpSocket::from_std(socket)?,
last_send_error: now.checked_sub(2 * IO_ERROR_LOG_INTERVAL).unwrap_or(now),
last_send_error: LastSendError::default(),
})
}

Expand All @@ -67,10 +105,9 @@ impl UdpSocket {
pub async fn bind<A: ToSocketAddrs>(addr: A) -> io::Result<UdpSocket> {
let io = tokio::net::UdpSocket::bind(addr).await?;
init(SockRef::from(&io))?;
let now = Instant::now();
Ok(UdpSocket {
io,
last_send_error: now.checked_sub(2 * IO_ERROR_LOG_INTERVAL).unwrap_or(now),
last_send_error: LastSendError::default(),
})
}

Expand Down Expand Up @@ -195,13 +232,13 @@ impl UdpSocket {
///
/// [`sendmmsg`]: https://linux.die.net/man/2/sendmmsg
pub async fn send_mmsg<B: AsPtr<u8>>(
&mut self,
&self,
state: &UdpState,
transmits: &[Transmit<B>],
) -> Result<usize, io::Error> {
let n = loop {
self.io.writable().await?;
let last_send_error = &mut self.last_send_error;
let last_send_error = self.last_send_error.clone();
let io = &self.io;
match io.try_io(Interest::WRITABLE, || {
send(state, SockRef::from(io), last_send_error, transmits)
Expand Down Expand Up @@ -278,11 +315,15 @@ impl UdpSocket {
transmits: &[Transmit<B>],
) -> Poll<io::Result<usize>> {
loop {
let last_send_error = &mut self.last_send_error;
ready!(self.io.poll_send_ready(cx))?;
let io = &self.io;
if let Ok(res) = io.try_io(Interest::WRITABLE, || {
send(state, SockRef::from(io), last_send_error, transmits)
send(
state,
SockRef::from(io),
self.last_send_error.clone(),
transmits,
)
}) {
return Poll::Ready(Ok(res));
}
Expand Down Expand Up @@ -353,7 +394,7 @@ pub mod sync {
#[derive(Debug)]
pub struct UdpSocket {
io: std::net::UdpSocket,
last_send_error: Instant,
last_send_error: LastSendError,
}

impl AsRawFd for UdpSocket {
Expand All @@ -372,21 +413,19 @@ pub mod sync {
pub fn from_std(socket: std::net::UdpSocket) -> io::Result<Self> {
init(SockRef::from(&socket))?;
socket.set_nonblocking(false)?;
let now = Instant::now();
Ok(Self {
io: socket,
last_send_error: now.checked_sub(2 * IO_ERROR_LOG_INTERVAL).unwrap_or(now),
last_send_error: LastSendError::default(),
})
}
/// create a new UDP socket and attempt to bind to `addr`
pub fn bind<A: std::net::ToSocketAddrs>(addr: A) -> io::Result<Self> {
let io = std::net::UdpSocket::bind(addr)?;
init(SockRef::from(&io))?;
io.set_nonblocking(false)?;
let now = Instant::now();
Ok(Self {
io,
last_send_error: now.checked_sub(2 * IO_ERROR_LOG_INTERVAL).unwrap_or(now),
last_send_error: LastSendError::default(),
})
}
/// sets nonblocking mode
Expand Down Expand Up @@ -474,7 +513,7 @@ pub mod sync {
send(
state,
SockRef::from(&self.io),
&mut self.last_send_error,
self.last_send_error.clone(),
transmits,
)
}
Expand Down Expand Up @@ -681,7 +720,7 @@ fn send_msg<B: AsPtr<u8>>(
fn send<B: AsPtr<u8>>(
state: &UdpState,
io: SockRef<'_>,
last_send_error: &mut Instant,
last_send_error: LastSendError,
transmits: &[Transmit<B>],
) -> io::Result<usize> {
use std::ptr;
Expand Down Expand Up @@ -802,7 +841,7 @@ fn send_msg<B: AsPtr<u8>>(
fn send<B: AsPtr<u8>>(
_state: &UdpState,
io: SockRef<'_>,
last_send_error: &mut Instant,
last_send_error: LastSendError,
transmits: &[Transmit<B>],
) -> io::Result<usize> {
let mut hdr: libc::msghdr = unsafe { mem::zeroed() };
Expand All @@ -828,7 +867,7 @@ fn send<B: AsPtr<u8>>(
// Those are not fatal errors, since the
// configuration can be dynamically changed.
// - Destination unreachable errors have been observed for other
log_sendmsg_error(last_send_error, e, &transmits[sent]);
log_sendmsg_error(last_send_error.clone(), e, &transmits[sent]);
sent += 1;
}
}
Expand Down
Loading