Skip to content

Commit c97943a

Browse files
committed
[#noissue] Fixed redis pubsub test
1 parent f99c438 commit c97943a

File tree

2 files changed

+34
-33
lines changed

2 files changed

+34
-33
lines changed

redis/src/main/java/com/navercorp/pinpoint/pubsub/endpoint/PubSubFluxClientImpl.java

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -60,12 +60,8 @@ public Flux<S> request(D demand) {
6060

6161
static class LongTermSubConsumer<S> implements SubConsumer<SupplyMessage<S>> {
6262

63-
private static final Comparator<SupplyMessage<?>> supplyComparator = new Comparator<>() {
64-
@Override
65-
public int compare(SupplyMessage<?> o1, SupplyMessage<?> o2) {
66-
return o1.getSequence() - o2.getSequence();
67-
}
68-
};
63+
private static final Comparator<SupplyMessage<?>> supplyComparator =
64+
Comparator.comparing(el -> el.getSequence());
6965

7066
final Sinks.Many<S> sink;
7167
final Identifier demandId;
@@ -94,15 +90,15 @@ public boolean consume(SupplyMessage<S> supply) {
9490
if (supply.getSequence() == nextSequence) {
9591
consume0(supply);
9692
nextSequence += 1;
97-
} else {
98-
supplies.add(supply);
9993
while (supplies.peek() != null && supplies.peek().getSequence() == nextSequence) {
10094
final SupplyMessage<S> pended = supplies.poll();
10195
if (pended != null) {
10296
consume0(pended);
10397
nextSequence += 1;
10498
}
10599
}
100+
} else {
101+
supplies.add(supply);
106102
}
107103
}
108104
return true;

redis/src/test/java/com/navercorp/pinpoint/redis/pubsub/RedisReqResTest.java

Lines changed: 30 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121
import com.navercorp.pinpoint.pubsub.endpoint.PubSubServerFactory;
2222
import com.navercorp.pinpoint.pubsub.endpoint.PubSubServiceDescriptor;
2323
import com.navercorp.pinpoint.redis.stream.RedisStreamConfig;
24+
import org.junit.jupiter.api.AfterAll;
25+
import org.junit.jupiter.api.BeforeAll;
2426
import org.junit.jupiter.api.DisplayName;
2527
import org.junit.jupiter.api.Test;
2628
import org.springframework.context.ApplicationContext;
@@ -41,6 +43,28 @@
4143
@DisplayName("req/res based on redis")
4244
public class RedisReqResTest {
4345

46+
private static GenericContainer<?> redisContainer;
47+
48+
@BeforeAll
49+
@SuppressWarnings("resource")
50+
public static void beforeAll() {
51+
redisContainer = new GenericContainer<>(DockerImageName.parse("redis:7.0"))
52+
.withExposedPorts(6379)
53+
.withReuse(true);
54+
redisContainer.start();
55+
56+
System.setProperty("spring.data.redis.host", redisContainer.getHost());
57+
System.setProperty("spring.redis.host", redisContainer.getHost());
58+
System.setProperty("spring.data.redis.port", redisContainer.getMappedPort(6379).toString());
59+
System.setProperty("spring.redis.port", redisContainer.getMappedPort(6379).toString());
60+
}
61+
62+
@AfterAll
63+
public static void afterAll() {
64+
redisContainer.stop();
65+
redisContainer.close();
66+
}
67+
4468
@DisplayName("req/res based on redis pubsub")
4569
@Test
4670
public void testRedisPubSub() {
@@ -54,31 +78,11 @@ public void testRedisStreamPubSub() {
5478
}
5579

5680
private void testConfigClass(Class<?> configClass) {
57-
runWithRedisContainer(() -> {
58-
final ApplicationContext context = new AnnotationConfigApplicationContext(configClass);
59-
testServerClientFactory(
60-
context.getBean(PubSubServerFactory.class),
61-
context.getBean(PubSubClientFactory.class)
62-
);
63-
});
64-
}
65-
66-
@SuppressWarnings("resource")
67-
private void runWithRedisContainer(Runnable r) {
68-
try (final GenericContainer<?> redisContainer = new GenericContainer<>(DockerImageName.parse("redis:7.0"))
69-
.withExposedPorts(6379)
70-
.withReuse(true)
71-
) {
72-
redisContainer.start();
73-
System.setProperty("spring.data.redis.host", redisContainer.getHost());
74-
System.setProperty("spring.redis.host", redisContainer.getHost());
75-
System.setProperty("spring.data.redis.port", redisContainer.getMappedPort(6379).toString());
76-
System.setProperty("spring.redis.port", redisContainer.getMappedPort(6379).toString());
77-
78-
r.run();
79-
80-
redisContainer.stop();
81-
}
81+
final ApplicationContext context = new AnnotationConfigApplicationContext(configClass);
82+
testServerClientFactory(
83+
context.getBean(PubSubServerFactory.class),
84+
context.getBean(PubSubClientFactory.class)
85+
);
8286
}
8387

8488
private void testServerClientFactory(
@@ -99,6 +103,7 @@ private void testServerClientFactory(
99103
PubSubServiceDescriptor.flux("range", Integer.class, Integer.class);
100104
serverFactory.build(el -> Flux.range(0, el), rangeService).afterPropertiesSet();
101105
assertThat(syncRequestFlux(clientFactory, rangeService, 5)).isEqualTo(List.of(0, 1, 2, 3, 4));
106+
assertThat(syncRequestFlux(clientFactory, rangeService, 3)).isEqualTo(List.of(0, 1, 2));
102107
}
103108

104109
private <D, S> S syncRequestMono(

0 commit comments

Comments
 (0)