diff --git a/log4j-layout-template-json-test/pom.xml b/log4j-layout-template-json-test/pom.xml
index 7031fd4d80b..51bc8a9660b 100644
--- a/log4j-layout-template-json-test/pom.xml
+++ b/log4j-layout-template-json-test/pom.xml
@@ -43,6 +43,14 @@
org.apache.logging.log4j.layout.template.json.test
org.apache.logging.log4j.core
+
+ 8.15.1
+
@@ -85,6 +93,7 @@
co.elastic.clients
elasticsearch-java
+ ${elastic.version}
test
@@ -129,23 +138,6 @@
-
-
- org.apache.maven.plugins
- maven-failsafe-plugin
-
- true
-
-
-
-
- integration-test
- verify
-
-
-
-
-
org.apache.maven.plugins
maven-surefire-plugin
@@ -170,15 +162,29 @@
docker
+
- false
+
+ linux
+
+
+ env.CI
+ true
+
- 8.10.2
+
+
+ false
+ false
+
- -Xms750m -Xmx750m
+ -Xms750m -Xmx750m
+
@@ -188,7 +194,6 @@
io.fabric8
docker-maven-plugin
- all
true
true
@@ -199,10 +204,11 @@
single-node
false
- ${elastic.java-opts}
+ ${elastic.javaOpts}
- 9200:9200
+
+ localhost:elasticsearch.port:9200
custom
@@ -214,7 +220,11 @@
cyan
- recovered \[0\] indices into cluster_state
+
+
+ 9200
+
+
@@ -232,11 +242,13 @@
logstash
- ${elastic.java-opts}
+ ${elastic.javaOpts}
- 12222:12222
- 12345:12345
+
+ localhost:logstash.gelf.port:12222
+
+ localhost:logstash.tcp.port:12345
[LS]
@@ -248,54 +260,71 @@
--pipeline.batch.size
1
-e
- input {
+ "logstash"
+ use_tcp => true
+ use_udp => false
+ port => 12222
+ type => "gelf"
}
+
+ # Documentation: https://www.elastic.co/guide/en/logstash/current/plugins-inputs-tcp.html
tcp {
- port => 12345
- codec => json
- type => "tcp"
+ port => 12345
+ codec => json
+ type => "tcp"
}
+
}
filter {
if [type] == "gelf" {
# These are GELF/Syslog logging levels as defined in RFC 3164.
- # Map the integer level to its human readable format.
+ # Map the integer level to its human-readable format.
+ # Documentation: https://www.elastic.co/guide/en/logstash/current/plugins-filters-translate.html
translate {
- field => "[level]"
- destination => "[levelName]"
- dictionary => {
- "0" => "EMERG"
- "1" => "ALERT"
- "2" => "CRITICAL"
- "3" => "ERROR"
- "4" => "WARN"
- "5" => "NOTICE"
- "6" => "INFO"
- "7" => "DEBUG"
+ source => "[level]"
+ target => "[levelName]"
+ dictionary => {
+ "0" => "EMERG"
+ "1" => "ALERT"
+ "2" => "CRITICAL"
+ "3" => "ERROR"
+ "4" => "WARN"
+ "5" => "NOTICE"
+ "6" => "INFO"
+ "7" => "DEBUG"
}
}
}
}
+ # Documentation: https://www.elastic.co/guide/en/logstash/current/plugins-filters-elasticsearch.html
output {
# (Un)comment for debugging purposes
- # stdout { codec => rubydebug }
+ # stdout { codec => rubydebug }
elasticsearch {
- hosts => ["http://elasticsearch:9200"]
- index => "log4j"
+ hosts => ["http://elasticsearch:9200"]
+ index => "log4j"
}
- }
+ }
+
+ ]]>
- Successfully started Logstash API endpoint
+
+ localhost
+
+ 12222
+ 12345
+
+
@@ -327,6 +356,11 @@
**/*IT.java
+
+ ${elasticsearch.port}
+ ${logstash.gelf.port}
+ ${logstash.tcp.port}
+
diff --git a/log4j-layout-template-json-test/src/test/java/org/apache/logging/log4j/layout/template/json/LogstashIT.java b/log4j-layout-template-json-test/src/test/java/org/apache/logging/log4j/layout/template/json/LogstashIT.java
index abb86f8ebc0..17b83629755 100644
--- a/log4j-layout-template-json-test/src/test/java/org/apache/logging/log4j/layout/template/json/LogstashIT.java
+++ b/log4j-layout-template-json-test/src/test/java/org/apache/logging/log4j/layout/template/json/LogstashIT.java
@@ -16,6 +16,9 @@
*/
package org.apache.logging.log4j.layout.template.json;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.awaitility.Awaitility.await;
+
import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch._types.ElasticsearchException;
import co.elastic.clients.elasticsearch._types.HealthStatus;
@@ -32,6 +35,7 @@
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.PrintStream;
+import java.net.Socket;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
@@ -41,6 +45,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.Set;
+import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -53,13 +58,11 @@
import org.apache.logging.log4j.core.config.DefaultConfiguration;
import org.apache.logging.log4j.core.impl.Log4jLogEvent;
import org.apache.logging.log4j.core.layout.GelfLayout;
-import org.apache.logging.log4j.core.util.NetUtils;
import org.apache.logging.log4j.layout.template.json.JsonTemplateLayout.EventTemplateAdditionalField;
import org.apache.logging.log4j.layout.template.json.util.ThreadLocalRecyclerFactory;
import org.apache.logging.log4j.message.SimpleMessage;
import org.apache.logging.log4j.status.StatusLogger;
-import org.assertj.core.api.Assertions;
-import org.awaitility.Awaitility;
+import org.apache.logging.log4j.util.Strings;
import org.elasticsearch.client.RestClient;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
@@ -71,14 +74,14 @@
@Execution(ExecutionMode.SAME_THREAD)
class LogstashIT {
+ private static final String LOG_PREFIX = LogstashIT.class.getSimpleName() + ' ';
+
private static final StatusLogger LOGGER = StatusLogger.getLogger();
private static final DefaultConfiguration CONFIGURATION = new DefaultConfiguration();
private static final Charset CHARSET = StandardCharsets.UTF_8;
- private static final String HOST_NAME = NetUtils.getLocalHostname();
-
private static final String SERVICE_NAME = "LogstashIT";
private static final String EVENT_DATASET = SERVICE_NAME + ".log";
@@ -88,7 +91,7 @@ class LogstashIT {
.setCharset(CHARSET)
.setCompressionType(GelfLayout.CompressionType.OFF)
.setIncludeNullDelimiter(true)
- .setHost(HOST_NAME)
+ .setHost(MavenHardcodedConstants.HOST_NAME)
.build();
private static final JsonTemplateLayout JSON_TEMPLATE_GELF_LAYOUT = JsonTemplateLayout.newBuilder()
@@ -99,13 +102,12 @@ class LogstashIT {
.setEventTemplateAdditionalFields(new EventTemplateAdditionalField[] {
EventTemplateAdditionalField.newBuilder()
.setKey("host")
- .setValue(HOST_NAME)
+ .setValue(MavenHardcodedConstants.HOST_NAME)
.build()
})
.build();
- // Note that EcsLayout doesn't support charset configuration, though it uses
- // UTF-8 internally.
+ // Note that `EcsLayout` doesn't support charset configuration, though it uses UTF-8 internally.
private static final EcsLayout ECS_LAYOUT = EcsLayout.newBuilder()
.setConfiguration(CONFIGURATION)
.setServiceName(SERVICE_NAME)
@@ -140,44 +142,85 @@ class LogstashIT {
private static ElasticsearchClient ES_CLIENT;
/**
- * Constants hardcoded in docker-maven-plugin configuration, do not change!
+ * Constants hardcoded in `docker-maven-plugin` configuration, do not change!
*/
private static final class MavenHardcodedConstants {
private MavenHardcodedConstants() {}
- private static final int LS_GELF_INPUT_PORT = 12222;
+ private static final String HOST_NAME = "localhost";
- private static final int LS_TCP_INPUT_PORT = 12345;
+ private static final int LS_GELF_INPUT_PORT = readPort("log4j.logstash.gelf.port");
- private static final int ES_PORT = 9200;
+ private static final int LS_TCP_INPUT_PORT = readPort("log4j.logstash.tcp.port");
+
+ private static final int ES_PORT = readPort("log4j.elasticsearch.port");
private static final String ES_INDEX_NAME = "log4j";
+
+ private static int readPort(final String propertyName) {
+ final String propertyValue = System.getProperty(propertyName);
+ final int port;
+ final String errorMessage = String.format(
+ "was expecting a valid port number in the system property `%s`, found: `%s`",
+ propertyName, propertyValue);
+ try {
+ if (Strings.isBlank(propertyValue) || (port = Integer.parseInt(propertyValue)) < 0 || port >= 0xFFFF) {
+ throw new IllegalArgumentException(errorMessage);
+ }
+ } catch (final NumberFormatException error) {
+ throw new IllegalArgumentException(errorMessage, error);
+ }
+ return port;
+ }
}
@BeforeAll
- public static void initClient() throws IOException {
+ public static void initEsClient() {
- LOGGER.info("instantiating the ES client");
- REST_CLIENT = RestClient.builder(
- HttpHost.create(String.format("http://%s:%d", HOST_NAME, MavenHardcodedConstants.ES_PORT)))
- .build();
+ LOGGER.info(LOG_PREFIX + "instantiating the ES client");
+ final String hostUri =
+ String.format("http://%s:%d", MavenHardcodedConstants.HOST_NAME, MavenHardcodedConstants.ES_PORT);
+ REST_CLIENT = RestClient.builder(HttpHost.create(hostUri)).build();
ES_TRANSPORT = new RestClientTransport(REST_CLIENT, new JacksonJsonpMapper());
ES_CLIENT = new ElasticsearchClient(ES_TRANSPORT);
- LOGGER.info("verifying the ES connection");
- HealthResponse healthResponse = ES_CLIENT.cluster().health();
- Assertions.assertThat(healthResponse.status()).isNotEqualTo(HealthStatus.Red);
+ LOGGER.info(LOG_PREFIX + "verifying the ES connection to `{}`", hostUri);
+ await("ES cluster health")
+ .pollDelay(100, TimeUnit.MILLISECONDS)
+ .atMost(1, TimeUnit.MINUTES)
+ .untilAsserted(() -> {
+ final HealthResponse healthResponse = ES_CLIENT.cluster().health();
+ assertThat(healthResponse.status()).isNotEqualTo(HealthStatus.Red);
+ });
+ }
+
+ @BeforeAll
+ public static void waitForLsInputSockets() {
+ waitForSocketBinding(MavenHardcodedConstants.LS_GELF_INPUT_PORT, "Logstash GELF input");
+ waitForSocketBinding(MavenHardcodedConstants.LS_TCP_INPUT_PORT, "Logstash TCP input");
+ }
+
+ private static void waitForSocketBinding(final int port, final String name) {
+ LOGGER.info(LOG_PREFIX + "verifying socket binding at port {} for {}", port, name);
+ await("socket binding at port " + port)
+ .pollDelay(100, TimeUnit.MILLISECONDS)
+ .atMost(1, TimeUnit.MINUTES)
+ .untilAsserted(() -> {
+ try (final Socket socket = new Socket(MavenHardcodedConstants.HOST_NAME, port)) {
+ assertThat(socket.isConnected()).isTrue();
+ }
+ });
}
@BeforeEach
void deleteIndex() throws IOException {
- LOGGER.info("deleting the ES index");
+ LOGGER.info(LOG_PREFIX + "deleting the ES index");
try {
DeleteIndexResponse deleteIndexResponse = ES_CLIENT
.indices()
.delete(DeleteIndexRequest.of(builder -> builder.index(MavenHardcodedConstants.ES_INDEX_NAME)));
- Assertions.assertThat(deleteIndexResponse.acknowledged()).isTrue();
+ assertThat(deleteIndexResponse.acknowledged()).isTrue();
} catch (ElasticsearchException error) {
if (!error.getMessage().contains("index_not_found_exception")) {
throw new RuntimeException(error);
@@ -209,15 +252,15 @@ private static void testEvents(final List logEvents) throws IOExceptio
try {
// Append events.
- LOGGER.info("appending events");
+ LOGGER.info(LOG_PREFIX + "appending events");
logEvents.forEach(appender::append);
- LOGGER.info("completed appending events");
+ LOGGER.info(LOG_PREFIX + "completed appending events");
// Wait all messages to arrive.
- Awaitility.await()
+ await("message delivery")
.atMost(Duration.ofSeconds(60))
.pollDelay(Duration.ofSeconds(2))
- .until(() -> checkDocumentCount(LOG_EVENT_COUNT));
+ .untilAsserted(() -> assertDocumentCount(LOG_EVENT_COUNT));
// Verify indexed messages.
final Set expectedMessages = logEvents.stream()
@@ -227,7 +270,7 @@ private static void testEvents(final List logEvents) throws IOExceptio
.map(source -> (String) source.get(ES_INDEX_MESSAGE_FIELD_NAME))
.filter(Objects::nonNull)
.collect(Collectors.toSet());
- Assertions.assertThat(actualMessages).isEqualTo(expectedMessages);
+ assertThat(actualMessages).isEqualTo(expectedMessages);
} finally {
appender.stop();
@@ -280,16 +323,16 @@ void test_newlines() throws IOException {
try {
// Append the event.
- LOGGER.info("appending events");
+ LOGGER.info(LOG_PREFIX + "appending events");
appender.append(logEvent1);
appender.append(logEvent2);
- LOGGER.info("completed appending events");
+ LOGGER.info(LOG_PREFIX + "completed appending events");
// Wait the message to arrive.
- Awaitility.await()
+ await("message delivery")
.atMost(Duration.ofSeconds(60))
.pollDelay(Duration.ofSeconds(2))
- .until(() -> checkDocumentCount(2));
+ .untilAsserted(() -> assertDocumentCount(2));
// Verify indexed messages.
final Set expectedMessages = Stream.of(logEvent1, logEvent2)
@@ -299,7 +342,7 @@ void test_newlines() throws IOException {
.map(source -> (String) source.get(ES_INDEX_MESSAGE_FIELD_NAME))
.filter(Objects::nonNull)
.collect(Collectors.toSet());
- Assertions.assertThat(actualMessages).isEqualTo(expectedMessages);
+ assertThat(actualMessages).isEqualTo(expectedMessages);
} finally {
appender.stop();
@@ -336,7 +379,7 @@ void test_GelfLayout() throws IOException {
Collections.emptySet());
// Compare persisted sources.
- Assertions.assertThat(actualSourceByKey).isEqualTo(expectedSourceByKey);
+ assertThat(actualSourceByKey).isEqualTo(expectedSourceByKey);
}
@Test
@@ -370,7 +413,7 @@ void test_EcsLayout() throws IOException {
excludedKeys);
// Compare persisted sources.
- Assertions.assertThat(actualSourceByKey).isEqualTo(expectedSourceByKey);
+ assertThat(actualSourceByKey).isEqualTo(expectedSourceByKey);
}
private static Map appendAndCollect(
@@ -384,15 +427,15 @@ private static Map appendAndCollect(
try {
// Append the event.
- LOGGER.info("appending events");
+ LOGGER.info(LOG_PREFIX + "appending events");
logEvents.forEach(appender::append);
- LOGGER.info("completed appending events");
+ LOGGER.info(LOG_PREFIX + "completed appending events");
// Wait the message to arrive.
- Awaitility.await()
+ await("message delivery")
.atMost(Duration.ofSeconds(60))
.pollDelay(Duration.ofSeconds(2))
- .until(() -> checkDocumentCount(LOG_EVENT_COUNT));
+ .untilAsserted(() -> assertDocumentCount(LOG_EVENT_COUNT));
// Retrieve the persisted messages.
return queryDocuments().stream().collect(Collectors.toMap(keyMapper, (final Map source) -> {
@@ -406,10 +449,10 @@ private static Map appendAndCollect(
}
private static SocketAppender createStartedAppender(final Layout> layout, final int port) {
- LOGGER.info("creating the appender");
+ LOGGER.info(LOG_PREFIX + "creating the appender");
final SocketAppender appender = SocketAppender.newBuilder()
.setConfiguration(CONFIGURATION)
- .setHost(HOST_NAME)
+ .setHost(MavenHardcodedConstants.HOST_NAME)
.setPort(port)
.setReconnectDelayMillis(100)
.setName("LogstashItAppender")
@@ -422,12 +465,27 @@ private static SocketAppender createStartedAppender(final Layout> layout, fina
return appender;
}
- private static boolean checkDocumentCount(int expectedCount) throws IOException {
- final CountResponse countResponse =
- ES_CLIENT.count(builder -> builder.index(MavenHardcodedConstants.ES_INDEX_NAME));
+ private static void assertDocumentCount(final int expectedCount) throws IOException {
+ final CountResponse countResponse;
+ try {
+ countResponse = ES_CLIENT.count(builder -> builder.index(MavenHardcodedConstants.ES_INDEX_NAME));
+ }
+ // Try to enrich the failure with the available list of indices
+ catch (final ElasticsearchException error) {
+ try {
+ if (error.getMessage().contains("index_not_found_exception")) {
+ final Set indexNames =
+ ES_CLIENT.cluster().health().indices().keySet();
+ final String message = String.format("Could not find index! Available index names: %s", indexNames);
+ throw new AssertionError(message, error);
+ }
+ } catch (final Exception suppressed) {
+ error.addSuppressed(suppressed);
+ }
+ throw error;
+ }
final long actualCount = countResponse.count();
- Assertions.assertThat(actualCount).isLessThanOrEqualTo(expectedCount);
- return actualCount == expectedCount;
+ assertThat(actualCount).isEqualTo(expectedCount);
}
private static List