diff --git a/log4j-layout-template-json-test/pom.xml b/log4j-layout-template-json-test/pom.xml index 7031fd4d80b..51bc8a9660b 100644 --- a/log4j-layout-template-json-test/pom.xml +++ b/log4j-layout-template-json-test/pom.xml @@ -43,6 +43,14 @@ org.apache.logging.log4j.layout.template.json.test org.apache.logging.log4j.core + + 8.15.1 + @@ -85,6 +93,7 @@ co.elastic.clients elasticsearch-java + ${elastic.version} test @@ -129,23 +138,6 @@ - - - org.apache.maven.plugins - maven-failsafe-plugin - - true - - - - - integration-test - verify - - - - - org.apache.maven.plugins maven-surefire-plugin @@ -170,15 +162,29 @@ docker + - false + + linux + + + env.CI + true + - 8.10.2 + + + false + false + - -Xms750m -Xmx750m + -Xms750m -Xmx750m + @@ -188,7 +194,6 @@ io.fabric8 docker-maven-plugin - all true true @@ -199,10 +204,11 @@ single-node false - ${elastic.java-opts} + ${elastic.javaOpts} - 9200:9200 + + localhost:elasticsearch.port:9200 custom @@ -214,7 +220,11 @@ cyan - recovered \[0\] indices into cluster_state + + + 9200 + + @@ -232,11 +242,13 @@ logstash - ${elastic.java-opts} + ${elastic.javaOpts} - 12222:12222 - 12345:12345 + + localhost:logstash.gelf.port:12222 + + localhost:logstash.tcp.port:12345 [LS] @@ -248,54 +260,71 @@ --pipeline.batch.size 1 -e - input { + "logstash" + use_tcp => true + use_udp => false + port => 12222 + type => "gelf" } + + # Documentation: https://www.elastic.co/guide/en/logstash/current/plugins-inputs-tcp.html tcp { - port => 12345 - codec => json - type => "tcp" + port => 12345 + codec => json + type => "tcp" } + } filter { if [type] == "gelf" { # These are GELF/Syslog logging levels as defined in RFC 3164. - # Map the integer level to its human readable format. + # Map the integer level to its human-readable format. + # Documentation: https://www.elastic.co/guide/en/logstash/current/plugins-filters-translate.html translate { - field => "[level]" - destination => "[levelName]" - dictionary => { - "0" => "EMERG" - "1" => "ALERT" - "2" => "CRITICAL" - "3" => "ERROR" - "4" => "WARN" - "5" => "NOTICE" - "6" => "INFO" - "7" => "DEBUG" + source => "[level]" + target => "[levelName]" + dictionary => { + "0" => "EMERG" + "1" => "ALERT" + "2" => "CRITICAL" + "3" => "ERROR" + "4" => "WARN" + "5" => "NOTICE" + "6" => "INFO" + "7" => "DEBUG" } } } } + # Documentation: https://www.elastic.co/guide/en/logstash/current/plugins-filters-elasticsearch.html output { # (Un)comment for debugging purposes - # stdout { codec => rubydebug } + # stdout { codec => rubydebug } elasticsearch { - hosts => ["http://elasticsearch:9200"] - index => "log4j" + hosts => ["http://elasticsearch:9200"] + index => "log4j" } - } + } + + ]]> - Successfully started Logstash API endpoint + + localhost + + 12222 + 12345 + + @@ -327,6 +356,11 @@ **/*IT.java + + ${elasticsearch.port} + ${logstash.gelf.port} + ${logstash.tcp.port} + diff --git a/log4j-layout-template-json-test/src/test/java/org/apache/logging/log4j/layout/template/json/LogstashIT.java b/log4j-layout-template-json-test/src/test/java/org/apache/logging/log4j/layout/template/json/LogstashIT.java index abb86f8ebc0..17b83629755 100644 --- a/log4j-layout-template-json-test/src/test/java/org/apache/logging/log4j/layout/template/json/LogstashIT.java +++ b/log4j-layout-template-json-test/src/test/java/org/apache/logging/log4j/layout/template/json/LogstashIT.java @@ -16,6 +16,9 @@ */ package org.apache.logging.log4j.layout.template.json; +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; + import co.elastic.clients.elasticsearch.ElasticsearchClient; import co.elastic.clients.elasticsearch._types.ElasticsearchException; import co.elastic.clients.elasticsearch._types.HealthStatus; @@ -32,6 +35,7 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.PrintStream; +import java.net.Socket; import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; import java.time.Duration; @@ -41,6 +45,7 @@ import java.util.Map; import java.util.Objects; import java.util.Set; +import java.util.concurrent.TimeUnit; import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -53,13 +58,11 @@ import org.apache.logging.log4j.core.config.DefaultConfiguration; import org.apache.logging.log4j.core.impl.Log4jLogEvent; import org.apache.logging.log4j.core.layout.GelfLayout; -import org.apache.logging.log4j.core.util.NetUtils; import org.apache.logging.log4j.layout.template.json.JsonTemplateLayout.EventTemplateAdditionalField; import org.apache.logging.log4j.layout.template.json.util.ThreadLocalRecyclerFactory; import org.apache.logging.log4j.message.SimpleMessage; import org.apache.logging.log4j.status.StatusLogger; -import org.assertj.core.api.Assertions; -import org.awaitility.Awaitility; +import org.apache.logging.log4j.util.Strings; import org.elasticsearch.client.RestClient; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; @@ -71,14 +74,14 @@ @Execution(ExecutionMode.SAME_THREAD) class LogstashIT { + private static final String LOG_PREFIX = LogstashIT.class.getSimpleName() + ' '; + private static final StatusLogger LOGGER = StatusLogger.getLogger(); private static final DefaultConfiguration CONFIGURATION = new DefaultConfiguration(); private static final Charset CHARSET = StandardCharsets.UTF_8; - private static final String HOST_NAME = NetUtils.getLocalHostname(); - private static final String SERVICE_NAME = "LogstashIT"; private static final String EVENT_DATASET = SERVICE_NAME + ".log"; @@ -88,7 +91,7 @@ class LogstashIT { .setCharset(CHARSET) .setCompressionType(GelfLayout.CompressionType.OFF) .setIncludeNullDelimiter(true) - .setHost(HOST_NAME) + .setHost(MavenHardcodedConstants.HOST_NAME) .build(); private static final JsonTemplateLayout JSON_TEMPLATE_GELF_LAYOUT = JsonTemplateLayout.newBuilder() @@ -99,13 +102,12 @@ class LogstashIT { .setEventTemplateAdditionalFields(new EventTemplateAdditionalField[] { EventTemplateAdditionalField.newBuilder() .setKey("host") - .setValue(HOST_NAME) + .setValue(MavenHardcodedConstants.HOST_NAME) .build() }) .build(); - // Note that EcsLayout doesn't support charset configuration, though it uses - // UTF-8 internally. + // Note that `EcsLayout` doesn't support charset configuration, though it uses UTF-8 internally. private static final EcsLayout ECS_LAYOUT = EcsLayout.newBuilder() .setConfiguration(CONFIGURATION) .setServiceName(SERVICE_NAME) @@ -140,44 +142,85 @@ class LogstashIT { private static ElasticsearchClient ES_CLIENT; /** - * Constants hardcoded in docker-maven-plugin configuration, do not change! + * Constants hardcoded in `docker-maven-plugin` configuration, do not change! */ private static final class MavenHardcodedConstants { private MavenHardcodedConstants() {} - private static final int LS_GELF_INPUT_PORT = 12222; + private static final String HOST_NAME = "localhost"; - private static final int LS_TCP_INPUT_PORT = 12345; + private static final int LS_GELF_INPUT_PORT = readPort("log4j.logstash.gelf.port"); - private static final int ES_PORT = 9200; + private static final int LS_TCP_INPUT_PORT = readPort("log4j.logstash.tcp.port"); + + private static final int ES_PORT = readPort("log4j.elasticsearch.port"); private static final String ES_INDEX_NAME = "log4j"; + + private static int readPort(final String propertyName) { + final String propertyValue = System.getProperty(propertyName); + final int port; + final String errorMessage = String.format( + "was expecting a valid port number in the system property `%s`, found: `%s`", + propertyName, propertyValue); + try { + if (Strings.isBlank(propertyValue) || (port = Integer.parseInt(propertyValue)) < 0 || port >= 0xFFFF) { + throw new IllegalArgumentException(errorMessage); + } + } catch (final NumberFormatException error) { + throw new IllegalArgumentException(errorMessage, error); + } + return port; + } } @BeforeAll - public static void initClient() throws IOException { + public static void initEsClient() { - LOGGER.info("instantiating the ES client"); - REST_CLIENT = RestClient.builder( - HttpHost.create(String.format("http://%s:%d", HOST_NAME, MavenHardcodedConstants.ES_PORT))) - .build(); + LOGGER.info(LOG_PREFIX + "instantiating the ES client"); + final String hostUri = + String.format("http://%s:%d", MavenHardcodedConstants.HOST_NAME, MavenHardcodedConstants.ES_PORT); + REST_CLIENT = RestClient.builder(HttpHost.create(hostUri)).build(); ES_TRANSPORT = new RestClientTransport(REST_CLIENT, new JacksonJsonpMapper()); ES_CLIENT = new ElasticsearchClient(ES_TRANSPORT); - LOGGER.info("verifying the ES connection"); - HealthResponse healthResponse = ES_CLIENT.cluster().health(); - Assertions.assertThat(healthResponse.status()).isNotEqualTo(HealthStatus.Red); + LOGGER.info(LOG_PREFIX + "verifying the ES connection to `{}`", hostUri); + await("ES cluster health") + .pollDelay(100, TimeUnit.MILLISECONDS) + .atMost(1, TimeUnit.MINUTES) + .untilAsserted(() -> { + final HealthResponse healthResponse = ES_CLIENT.cluster().health(); + assertThat(healthResponse.status()).isNotEqualTo(HealthStatus.Red); + }); + } + + @BeforeAll + public static void waitForLsInputSockets() { + waitForSocketBinding(MavenHardcodedConstants.LS_GELF_INPUT_PORT, "Logstash GELF input"); + waitForSocketBinding(MavenHardcodedConstants.LS_TCP_INPUT_PORT, "Logstash TCP input"); + } + + private static void waitForSocketBinding(final int port, final String name) { + LOGGER.info(LOG_PREFIX + "verifying socket binding at port {} for {}", port, name); + await("socket binding at port " + port) + .pollDelay(100, TimeUnit.MILLISECONDS) + .atMost(1, TimeUnit.MINUTES) + .untilAsserted(() -> { + try (final Socket socket = new Socket(MavenHardcodedConstants.HOST_NAME, port)) { + assertThat(socket.isConnected()).isTrue(); + } + }); } @BeforeEach void deleteIndex() throws IOException { - LOGGER.info("deleting the ES index"); + LOGGER.info(LOG_PREFIX + "deleting the ES index"); try { DeleteIndexResponse deleteIndexResponse = ES_CLIENT .indices() .delete(DeleteIndexRequest.of(builder -> builder.index(MavenHardcodedConstants.ES_INDEX_NAME))); - Assertions.assertThat(deleteIndexResponse.acknowledged()).isTrue(); + assertThat(deleteIndexResponse.acknowledged()).isTrue(); } catch (ElasticsearchException error) { if (!error.getMessage().contains("index_not_found_exception")) { throw new RuntimeException(error); @@ -209,15 +252,15 @@ private static void testEvents(final List logEvents) throws IOExceptio try { // Append events. - LOGGER.info("appending events"); + LOGGER.info(LOG_PREFIX + "appending events"); logEvents.forEach(appender::append); - LOGGER.info("completed appending events"); + LOGGER.info(LOG_PREFIX + "completed appending events"); // Wait all messages to arrive. - Awaitility.await() + await("message delivery") .atMost(Duration.ofSeconds(60)) .pollDelay(Duration.ofSeconds(2)) - .until(() -> checkDocumentCount(LOG_EVENT_COUNT)); + .untilAsserted(() -> assertDocumentCount(LOG_EVENT_COUNT)); // Verify indexed messages. final Set expectedMessages = logEvents.stream() @@ -227,7 +270,7 @@ private static void testEvents(final List logEvents) throws IOExceptio .map(source -> (String) source.get(ES_INDEX_MESSAGE_FIELD_NAME)) .filter(Objects::nonNull) .collect(Collectors.toSet()); - Assertions.assertThat(actualMessages).isEqualTo(expectedMessages); + assertThat(actualMessages).isEqualTo(expectedMessages); } finally { appender.stop(); @@ -280,16 +323,16 @@ void test_newlines() throws IOException { try { // Append the event. - LOGGER.info("appending events"); + LOGGER.info(LOG_PREFIX + "appending events"); appender.append(logEvent1); appender.append(logEvent2); - LOGGER.info("completed appending events"); + LOGGER.info(LOG_PREFIX + "completed appending events"); // Wait the message to arrive. - Awaitility.await() + await("message delivery") .atMost(Duration.ofSeconds(60)) .pollDelay(Duration.ofSeconds(2)) - .until(() -> checkDocumentCount(2)); + .untilAsserted(() -> assertDocumentCount(2)); // Verify indexed messages. final Set expectedMessages = Stream.of(logEvent1, logEvent2) @@ -299,7 +342,7 @@ void test_newlines() throws IOException { .map(source -> (String) source.get(ES_INDEX_MESSAGE_FIELD_NAME)) .filter(Objects::nonNull) .collect(Collectors.toSet()); - Assertions.assertThat(actualMessages).isEqualTo(expectedMessages); + assertThat(actualMessages).isEqualTo(expectedMessages); } finally { appender.stop(); @@ -336,7 +379,7 @@ void test_GelfLayout() throws IOException { Collections.emptySet()); // Compare persisted sources. - Assertions.assertThat(actualSourceByKey).isEqualTo(expectedSourceByKey); + assertThat(actualSourceByKey).isEqualTo(expectedSourceByKey); } @Test @@ -370,7 +413,7 @@ void test_EcsLayout() throws IOException { excludedKeys); // Compare persisted sources. - Assertions.assertThat(actualSourceByKey).isEqualTo(expectedSourceByKey); + assertThat(actualSourceByKey).isEqualTo(expectedSourceByKey); } private static Map appendAndCollect( @@ -384,15 +427,15 @@ private static Map appendAndCollect( try { // Append the event. - LOGGER.info("appending events"); + LOGGER.info(LOG_PREFIX + "appending events"); logEvents.forEach(appender::append); - LOGGER.info("completed appending events"); + LOGGER.info(LOG_PREFIX + "completed appending events"); // Wait the message to arrive. - Awaitility.await() + await("message delivery") .atMost(Duration.ofSeconds(60)) .pollDelay(Duration.ofSeconds(2)) - .until(() -> checkDocumentCount(LOG_EVENT_COUNT)); + .untilAsserted(() -> assertDocumentCount(LOG_EVENT_COUNT)); // Retrieve the persisted messages. return queryDocuments().stream().collect(Collectors.toMap(keyMapper, (final Map source) -> { @@ -406,10 +449,10 @@ private static Map appendAndCollect( } private static SocketAppender createStartedAppender(final Layout layout, final int port) { - LOGGER.info("creating the appender"); + LOGGER.info(LOG_PREFIX + "creating the appender"); final SocketAppender appender = SocketAppender.newBuilder() .setConfiguration(CONFIGURATION) - .setHost(HOST_NAME) + .setHost(MavenHardcodedConstants.HOST_NAME) .setPort(port) .setReconnectDelayMillis(100) .setName("LogstashItAppender") @@ -422,12 +465,27 @@ private static SocketAppender createStartedAppender(final Layout layout, fina return appender; } - private static boolean checkDocumentCount(int expectedCount) throws IOException { - final CountResponse countResponse = - ES_CLIENT.count(builder -> builder.index(MavenHardcodedConstants.ES_INDEX_NAME)); + private static void assertDocumentCount(final int expectedCount) throws IOException { + final CountResponse countResponse; + try { + countResponse = ES_CLIENT.count(builder -> builder.index(MavenHardcodedConstants.ES_INDEX_NAME)); + } + // Try to enrich the failure with the available list of indices + catch (final ElasticsearchException error) { + try { + if (error.getMessage().contains("index_not_found_exception")) { + final Set indexNames = + ES_CLIENT.cluster().health().indices().keySet(); + final String message = String.format("Could not find index! Available index names: %s", indexNames); + throw new AssertionError(message, error); + } + } catch (final Exception suppressed) { + error.addSuppressed(suppressed); + } + throw error; + } final long actualCount = countResponse.count(); - Assertions.assertThat(actualCount).isLessThanOrEqualTo(expectedCount); - return actualCount == expectedCount; + assertThat(actualCount).isEqualTo(expectedCount); } private static List> queryDocuments() throws IOException { @@ -441,7 +499,7 @@ private static List> queryDocuments() throws IOException { return searchResponse.hits().hits().stream() .map(hit -> { @SuppressWarnings("unchecked") - Map source = hit.source(); + final Map source = hit.source(); return source; }) .collect(Collectors.toList()); diff --git a/log4j-parent/pom.xml b/log4j-parent/pom.xml index debb807c3ad..304231465a5 100644 --- a/log4j-parent/pom.xml +++ b/log4j-parent/pom.xml @@ -79,7 +79,6 @@ 1.2.15 3.4.4 - 8.15.1 0.9.0 7.0.5 3.0.22 @@ -382,12 +381,6 @@ ${disruptor.version} - - co.elastic.clients - elasticsearch-java - ${elasticsearch-java.version} - - org.zapodot embedded-ldap-junit