diff --git a/src/producer/future_producer.rs b/src/producer/future_producer.rs
index 4bdea7e2a..5168d8b1e 100644
--- a/src/producer/future_producer.rs
+++ b/src/producer/future_producer.rs
@@ -132,14 +132,25 @@ pub struct FutureProducerContext<C: ClientContext + 'static> {
     wrapped_context: C,
 }
 
+/// Metadata about a successfully delivered message.
+#[derive(Debug, PartialEq, Eq, Clone, Copy)]
+pub struct MessageInfo {
+    /// Partition of the message.
+    pub partition: i32,
+    /// Offset of the message.
+    pub offset: i64,
+    /// Timestamp of the message.
+    pub timestamp: Timestamp,
+}
+
 /// Represents the result of message production as performed from the
 /// `FutureProducer`.
 ///
 /// If message delivery was successful, `OwnedDeliveryResult` will return the
-/// partition and offset of the message. If the message failed to be delivered
-/// an error will be returned, together with an owned copy of the original
-/// message.
-pub type OwnedDeliveryResult = Result<(i32, i64, Timestamp), (KafkaError, OwnedMessage)>;
+/// message metadata, including the partition, offset, and timestamp of the
+/// message. If the message failed to be delivered, an error will be returned,
+/// together with an owned copy of the original message.
+pub type OwnedDeliveryResult = Result<MessageInfo, (KafkaError, OwnedMessage)>;
 
 // Delegates all the methods calls to the wrapped context.
 impl<C: ClientContext + 'static> ClientContext for FutureProducerContext<C> {
@@ -183,7 +194,11 @@ where
         tx: Box<oneshot::Sender<OwnedDeliveryResult>>,
     ) {
         let owned_delivery_result = match *delivery_result {
-            Ok(ref message) => Ok((message.partition(), message.offset(), message.timestamp())),
+            Ok(ref message) => Ok(MessageInfo {
+                partition: message.partition(),
+                offset: message.offset(),
+                timestamp: message.timestamp(),
+            }),
             Err((ref error, ref message)) => Err((error.clone(), message.detach())),
         };
         let _ = tx.send(owned_delivery_result); // TODO: handle error
diff --git a/tests/test_high_producers.rs b/tests/test_high_producers.rs
index 76e02a5e4..1871714df 100644
--- a/tests/test_high_producers.rs
+++ b/tests/test_high_producers.rs
@@ -9,6 +9,7 @@ use rdkafka::client::DefaultClientContext;
 use rdkafka::config::ClientConfig;
 use rdkafka::error::{KafkaError, RDKafkaErrorCode};
 use rdkafka::message::{Header, Headers, Message, OwnedHeaders};
+use rdkafka::producer::future_producer::MessageInfo;
 use rdkafka::producer::{FutureProducer, FutureRecord, Producer};
 use rdkafka::util::Timeout;
 
@@ -44,7 +45,11 @@ async fn test_future_producer_send() {
     let results: Vec<_> = results.collect().await;
     assert!(results.len() == 10);
     for (i, result) in results.into_iter().enumerate() {
-        let (partition, offset, _) = result.unwrap();
+        let MessageInfo {
+            partition,
+            offset,
+            timestamp: _,
+        } = result.unwrap();
         assert_eq!(partition, 1);
         assert_eq!(offset, i as i64);
     }
diff --git a/tests/utils.rs b/tests/utils.rs
index 0fade0414..1ac034d50 100644
--- a/tests/utils.rs
+++ b/tests/utils.rs
@@ -5,6 +5,7 @@ use std::env::{self, VarError};
 use std::time::Duration;
 
 use rand::Rng;
+use rdkafka::producer::future_producer::MessageInfo;
 use regex::Regex;
 
 use rdkafka::admin::{AdminClient, AdminOptions, NewTopic, TopicReplication};
@@ -148,7 +149,11 @@ where
     let mut message_map = HashMap::new();
     for (id, future) in futures {
         match future.await {
-            Ok((partition, offset, _)) => message_map.insert((partition, offset), id),
+            Ok(MessageInfo {
+                partition,
+                offset,
+                timestamp: _,
+            }) => message_map.insert((partition, offset), id),
             Err((kafka_error, _message)) => panic!("Delivery failed: {}", kafka_error),
         };
     }
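
For reviewers, a minimal sketch of how calling code would consume the new `OwnedDeliveryResult` after this change. It is illustrative only and not part of the patch: the helper name `produce_one`, the topic name, and the payload/key values are hypothetical, and the `FutureProducer` is assumed to be configured elsewhere. Compared with the positional tuple, destructuring `MessageInfo` makes call sites self-documenting and lets callers skip fields by name rather than by position.

```rust
use std::time::Duration;

use rdkafka::producer::future_producer::MessageInfo;
use rdkafka::producer::{FutureProducer, FutureRecord};

// Hypothetical helper: sends one record and reports the delivery metadata.
async fn produce_one(producer: &FutureProducer) {
    let delivery = producer
        .send(
            FutureRecord::to("example_topic") // hypothetical topic name
                .payload("example payload")
                .key("example key"),
            Duration::from_secs(5),
        )
        .await;

    // Destructure the new `MessageInfo` struct instead of the old
    // `(partition, offset, timestamp)` tuple.
    match delivery {
        Ok(MessageInfo {
            partition,
            offset,
            timestamp,
        }) => println!(
            "delivered to partition {} at offset {} ({:?})",
            partition, offset, timestamp
        ),
        Err((err, _original_message)) => eprintln!("delivery failed: {}", err),
    }
}
```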