Skip to content

Commit 1a0c776

Browse files
authored
Merge pull request #582 from denizevrenci/issue_579
Add assertions and cleanup
2 parents eef60cf + 4d5979c commit 1a0c776

File tree

4 files changed

+82
-84
lines changed

4 files changed

+82
-84
lines changed

aeron-client/src/main/cpp/Image.h

Lines changed: 32 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,9 @@
2525
#include <concurrent/logbuffer/TermBlockScanner.h>
2626
#include <concurrent/status/UnsafeBufferPosition.h>
2727
#include <algorithm>
28+
#include <array>
2829
#include <atomic>
30+
#include <cassert>
2931
#include "LogBuffers.h"
3032

3133
namespace aeron {
@@ -140,38 +142,29 @@ class Image
140142
}
141143

142144
Image(const Image& image) :
145+
m_termBuffers(image.m_termBuffers),
143146
m_header(image.m_header),
144147
m_subscriberPosition(image.m_subscriberPosition),
148+
m_logBuffers(image.m_logBuffers),
145149
m_sourceIdentity(image.m_sourceIdentity),
146150
m_isClosed(image.isClosed()),
147-
m_exceptionHandler(image.m_exceptionHandler)
151+
m_exceptionHandler(image.m_exceptionHandler),
152+
m_correlationId(image.m_correlationId),
153+
m_subscriptionRegistrationId(image.m_subscriptionRegistrationId),
154+
m_joinPosition(image.m_joinPosition),
155+
m_finalPosition(image.m_finalPosition),
156+
m_sessionId(image.m_sessionId),
157+
m_termLengthMask(image.m_termLengthMask),
158+
m_positionBitsToShift(image.m_positionBitsToShift),
159+
m_isEos(image.m_isEos)
148160
{
149-
for (int i = 0; i < LogBufferDescriptor::PARTITION_COUNT; i++)
150-
{
151-
m_termBuffers[i].wrap(image.m_termBuffers[i]);
152-
}
153-
154-
m_subscriberPosition.wrap(image.m_subscriberPosition);
155-
m_logBuffers = image.m_logBuffers;
156-
m_correlationId = image.m_correlationId;
157-
m_subscriptionRegistrationId = image.m_subscriptionRegistrationId;
158-
m_joinPosition = image.m_joinPosition;
159-
m_finalPosition = image.m_finalPosition;
160-
m_sessionId = image.m_sessionId;
161-
m_termLengthMask = image.m_termLengthMask;
162-
m_positionBitsToShift = image.m_positionBitsToShift;
163-
m_isEos = image.m_isEos;
164161
}
165162

166-
Image& operator=(Image& image)
163+
Image& operator=(const Image& image)
167164
{
168-
for (int i = 0; i < LogBufferDescriptor::PARTITION_COUNT; i++)
169-
{
170-
m_termBuffers[i].wrap(image.m_termBuffers[i]);
171-
}
172-
165+
m_termBuffers = image.m_termBuffers;
173166
m_header = image.m_header;
174-
m_subscriberPosition.wrap(image.m_subscriberPosition);
167+
m_subscriberPosition = image.m_subscriberPosition;
175168
m_logBuffers = image.m_logBuffers;
176169
m_sourceIdentity = image.m_sourceIdentity;
177170
m_isClosed = image.isClosed();
@@ -355,8 +348,9 @@ class Image
355348
{
356349
const std::int64_t position = m_subscriberPosition.get();
357350
const std::int32_t termOffset = (std::int32_t) position & m_termLengthMask;
358-
AtomicBuffer &termBuffer = m_termBuffers[LogBufferDescriptor::indexByPosition(
359-
position, m_positionBitsToShift)];
351+
const int index = LogBufferDescriptor::indexByPosition(position, m_positionBitsToShift);
352+
assert(index >= 0 && index < LogBufferDescriptor::PARTITION_COUNT);
353+
AtomicBuffer &termBuffer = m_termBuffers[index];
360354
TermReader::ReadOutcome readOutcome;
361355

362356
TermReader::read(readOutcome, termBuffer, termOffset, fragmentHandler, fragmentLimit, m_header, m_exceptionHandler);
@@ -395,8 +389,9 @@ class Image
395389
int fragmentsRead = 0;
396390
std::int64_t initialPosition = m_subscriberPosition.get();
397391
std::int32_t initialOffset = (std::int32_t) initialPosition & m_termLengthMask;
398-
AtomicBuffer &termBuffer = m_termBuffers[LogBufferDescriptor::indexByPosition(
399-
initialPosition, m_positionBitsToShift)];
392+
const int index = LogBufferDescriptor::indexByPosition(initialPosition, m_positionBitsToShift);
393+
assert(index >= 0 && index < LogBufferDescriptor::PARTITION_COUNT);
394+
AtomicBuffer &termBuffer = m_termBuffers[index];
400395
std::int32_t resultingOffset = initialOffset;
401396
const util::index_t capacity = termBuffer.capacity();
402397

@@ -489,8 +484,9 @@ class Image
489484
int fragmentsRead = 0;
490485
std::int64_t initialPosition = m_subscriberPosition.get();
491486
std::int32_t initialOffset = (std::int32_t) initialPosition & m_termLengthMask;
492-
AtomicBuffer &termBuffer = m_termBuffers[LogBufferDescriptor::indexByPosition(
493-
initialPosition, m_positionBitsToShift)];
487+
const int index = LogBufferDescriptor::indexByPosition(initialPosition, m_positionBitsToShift);
488+
assert(index >= 0 && index < LogBufferDescriptor::PARTITION_COUNT);
489+
AtomicBuffer &termBuffer = m_termBuffers[index];
494490
std::int32_t resultingOffset = initialOffset;
495491
const std::int64_t capacity = termBuffer.capacity();
496492
const std::int32_t endOffset =
@@ -587,8 +583,9 @@ class Image
587583
std::int32_t initialOffset = static_cast<std::int32_t>(initialPosition & m_termLengthMask);
588584
std::int32_t offset = initialOffset;
589585
std::int64_t position = initialPosition;
590-
AtomicBuffer &termBuffer = m_termBuffers[LogBufferDescriptor::indexByPosition(
591-
initialPosition, m_positionBitsToShift)];
586+
const int index = LogBufferDescriptor::indexByPosition(initialPosition, m_positionBitsToShift);
587+
assert(index >= 0 && index < LogBufferDescriptor::PARTITION_COUNT);
588+
AtomicBuffer &termBuffer = m_termBuffers[index];
592589
const util::index_t capacity = termBuffer.capacity();
593590

594591
m_header.buffer(termBuffer);
@@ -677,8 +674,9 @@ class Image
677674
{
678675
const std::int64_t position = m_subscriberPosition.get();
679676
const std::int32_t termOffset = (std::int32_t) position & m_termLengthMask;
680-
AtomicBuffer &termBuffer = m_termBuffers[LogBufferDescriptor::indexByPosition(
681-
position, m_positionBitsToShift)];
677+
const int index = LogBufferDescriptor::indexByPosition(position, m_positionBitsToShift);
678+
assert(index >= 0 && index < LogBufferDescriptor::PARTITION_COUNT);
679+
AtomicBuffer &termBuffer = m_termBuffers[index];
682680
const std::int32_t limitOffset = std::min(termOffset + blockLengthLimit, termBuffer.capacity());
683681
const std::int32_t resultingOffset = TermBlockScanner::scan(termBuffer, termOffset, limitOffset);
684682
const std::int32_t length = resultingOffset - termOffset;
@@ -723,7 +721,7 @@ class Image
723721
/// @endcond
724722

725723
private:
726-
AtomicBuffer m_termBuffers[LogBufferDescriptor::PARTITION_COUNT];
724+
std::array<AtomicBuffer, LogBufferDescriptor::PARTITION_COUNT> m_termBuffers;
727725
Header m_header;
728726
Position<UnsafeBufferPosition> m_subscriberPosition;
729727
std::shared_ptr<LogBuffers> m_logBuffers;

aeron-client/src/main/cpp/concurrent/AtomicBuffer.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ class AtomicBuffer
8888
{
8989
}
9090

91-
AtomicBuffer& operator=(AtomicBuffer& buffer) = default;
91+
AtomicBuffer& operator=(const AtomicBuffer& buffer) = default;
9292

9393
// this class does not own the memory. It simply overlays it.
9494
virtual ~AtomicBuffer() = default;

aeron-client/src/main/cpp/concurrent/logbuffer/Header.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ class Header
3838

3939
Header(const Header& header) = default;
4040

41-
Header& operator=(Header& header)
41+
Header& operator=(const Header& header)
4242
{
4343
m_context = header.m_context;
4444
m_buffer.wrap(header.m_buffer);

0 commit comments

Comments
 (0)