Skip to content

GH-1781: Configure ObjectMapper in JsonSerializer/JsonDeserializer #1782

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ subprojects { subproject ->
exclude group: 'org.hamcrest'
}
testImplementation "org.hamcrest:hamcrest-core:$hamcrestVersion"
testImplementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310'
optionalApi "org.assertj:assertj-core:$assertjVersion"
}

Expand Down
44 changes: 44 additions & 0 deletions spring-kafka-docs/src/main/asciidoc/kafka.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -3795,6 +3795,7 @@ Also since version 2.7 `ConsumerPartitionPausedEvent` and `ConsumerPartitionResu
[[serdes]]
==== Serialization, Deserialization, and Message Conversion

[[serdes-overview]]
===== Overview

Apache Kafka provides a high-level API for serializing and deserializing record values as well as their keys.
Expand Down Expand Up @@ -3920,6 +3921,49 @@ They have no effect if you have provided `Serializer` and `Deserializer` instanc
Starting with version 2.2, the type information headers (if added by the serializer) are removed by the deserializer.
You can revert to the previous behavior by setting the `removeTypeHeaders` property to `false`, either directly on the deserializer or with the configuration property described earlier.

[[serdes-customize-object-mapper]]
====== Customize the `ObjectMapper`

Occasionally, the only way to configure the way information is serialized is by customizing the `ObjectMapper` used in (de)serialization.
When instantiated by Apache Kafka, the Spring JSON Serializer and Deserializer use a locally declared `ObjectMapper`.
The only way to customize the `ObjectMapper` was to inject instances into the producer and consumer factories as discussed <<serdes-overview,here>> rather than by using properties.
Since version 2.5.13, a new interface `ObjectMapperCustomizer` has been introduced and implemented by the `JsonSerializer<T>` and `JsonDeserializer<T>` classes.
This makes it simpler to configure the `ObjectMapper` without losing the convenience of the Spring property configuration capabilities.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested rewording - the Apache Foundation require us to mention Apache whenever we reference Kafka.

$ git diff
diff --git a/spring-kafka-docs/src/main/asciidoc/kafka.adoc b/spring-kafka-docs/src/main/asciidoc/kafka.adoc
index 2b1cd6a2..faa0a1e1 100644
--- a/spring-kafka-docs/src/main/asciidoc/kafka.adoc
+++ b/spring-kafka-docs/src/main/asciidoc/kafka.adoc
@@ -3795,6 +3795,7 @@ Also since version 2.7 `ConsumerPartitionPausedEvent` and `ConsumerPartitionResu
 [[serdes]]
 ==== Serialization, Deserialization, and Message Conversion
 
+[[serdes-overview]]
 ===== Overview
 
 Apache Kafka provides a high-level API for serializing and deserializing record values as well as their keys.
@@ -3924,8 +3925,9 @@ You can revert to the previous behavior by setting the `removeTypeHeaders` prope
 ====== Customize the ObjectMapper
 
 Occasionally, the only way to configure the way information is serialized is by customizing the ObjectMapper used in (de)serialization.
-Because Kafka uses its own ObjectMapper separate from the normal Spring-Boot ObjectMapper, customizing the ObjectMapper typically involved (re)implementing various Kafka factories.
-Since 2.7.1, a new interface `ObjectMapperCustomizer` has been introduced and implemented by the `JsonSerializer<T>` and `JsonDeserializer<T>` classes.
+When instantiated by Apache Kafka, the Spring JSON Serializer and Deserializer use a locally declared `ObjectMapper`.
+The only way to customize the `ObjectMapper` was to inject instances into the producer and consumer factories as discussed <<serdes-overview,here>> rather than by using properties.
+Since version 2.5.13, a new interface `ObjectMapperCustomizer` has been introduced and implemented by the `JsonSerializer<T>` and `JsonDeserializer<T>` classes.
 This makes it simpler to configure the `ObjectMapper` without losing the convenience of the Spring property configuration capabilities.
 
 The following is an example of how to provide a simple customized serializer class (derived from `JsonSerializer<T>`) to do the configuration.
@@ -3958,7 +3960,7 @@ public class CustomJsonSerializer<T> extends JsonSerializer<T> {
 ----
 ====
 
-Then in your properties file, configure your `CustomJsonSerializer<T>` class as the `value-serializer`:
+Then in your Spring Boot properties file, for example, configure your `CustomJsonSerializer<T>` class as the `value-serializer`:
 
 ====
 [source, properties]

The following is an example of how to provide a simple customized serializer class (derived from `JsonSerializer<T>`) to do the configuration.

====
[source, java]
----
public class CustomJsonSerializer<T> extends JsonSerializer<T> {

/**
* constructor for custom json serializer.
*/
public CustomJsonSerializer() {
super();
}

@Override
public ObjectMapper customizeObjectMapper(ObjectMapper mapper) {
mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
return mapper;
}
}
----
====

Then in your Spring Boot properties file, for example, configure your `CustomJsonSerializer<T>` class as the `value-serializer`:

====
[source, properties]
----
spring.kafka.producer.value-serializer=com.example.CustomJsonSerializer
----
====

The same general pattern can be applied to create and configure a `CustomJsonDeserializer<T>` class to configure how Json data is deserialized.

[[serdes-mapping-types]]
====== Mapping Types

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@
* @author Torsten Schleede
* @author Ivan Ponomarev
*/
public class JsonDeserializer<T> implements Deserializer<T> {
public class JsonDeserializer<T> implements Deserializer<T>, ObjectMapperCustomizer {

private static final String KEY_DEFAULT_TYPE_STRING = "spring.json.key.default.type";

Expand Down Expand Up @@ -114,7 +114,7 @@ public class JsonDeserializer<T> implements Deserializer<T> {

protected Jackson2JavaTypeMapper typeMapper = new DefaultJackson2JavaTypeMapper(); // NOSONAR

private ObjectReader reader;
protected CachedObjectReader cachedReader;

private boolean typeMapperExplicitlySet = false;

Expand Down Expand Up @@ -244,6 +244,7 @@ public JsonDeserializer(@Nullable Class<? super T> targetType, ObjectMapper obje

Assert.notNull(objectMapper, "'objectMapper' must not be null.");
this.objectMapper = objectMapper;
this.cachedReader = new CachedObjectReader();
JavaType javaType = null;
if (targetType == null) {
Class<?> genericType = ResolvableType.forClass(getClass()).getSuperType().resolveGeneric(0);
Expand Down Expand Up @@ -288,6 +289,7 @@ public JsonDeserializer(@Nullable JavaType targetType, ObjectMapper objectMapper

Assert.notNull(objectMapper, "'objectMapper' must not be null.");
this.objectMapper = objectMapper;
this.cachedReader = new CachedObjectReader();
initialize(targetType, useHeadersIfPresent);
}

Expand Down Expand Up @@ -369,6 +371,7 @@ public void setTypeResolver(JsonTypeResolver typeResolver) {

@Override
public void configure(Map<String, ?> configs, boolean isKey) {
customizeObjectMapper(configs, this.objectMapper);
setUseTypeMapperForKey(isKey);
setUpTypePrecedence(configs);
setupTarget(configs, isKey);
Expand Down Expand Up @@ -451,10 +454,6 @@ private void initialize(@Nullable JavaType type, boolean useHeadersIfPresent) {
Assert.isTrue(this.targetType != null || useHeadersIfPresent,
"'targetType' cannot be null if 'useHeadersIfPresent' is false");

if (this.targetType != null) {
this.reader = this.objectMapper.readerFor(this.targetType);
}

addTargetPackageToTrusted();
this.typeMapper.setTypePrecedence(useHeadersIfPresent ? TypePrecedence.TYPE_ID : TypePrecedence.INFERRED);
}
Expand Down Expand Up @@ -527,7 +526,7 @@ public T deserialize(String topic, Headers headers, byte[] data) {
this.typeMapper.removeHeaders(headers);
}
if (deserReader == null) {
deserReader = this.reader;
deserReader = this.cachedReader.getReader(this.objectMapper, this.targetType);
}
Assert.state(deserReader != null, "No type information in headers and no default type provided");
try {
Expand All @@ -544,7 +543,7 @@ public T deserialize(String topic, @Nullable byte[] data) {
if (data == null) {
return null;
}
ObjectReader localReader = this.reader;
ObjectReader localReader = this.cachedReader.getReader(this.objectMapper, this.targetType);
if (this.typeResolver != null) {
JavaType javaType = this.typeResolver.resolveType(topic, data, null);
if (javaType != null) {
Expand Down Expand Up @@ -718,4 +717,27 @@ private JsonTypeResolver buildTypeResolver(String methodProperty) {
};
}

protected class CachedObjectReader {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need this?
Why just regular volatile property which we set first time call in that deserialize() not enough for us?
Looks like a this.objectMapper and this.targetType always produce the same state, so we even don't need to worry about synchronization.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The problems are:

  • can't create ObjectWriter in ctor, because there's no way to provide customization of ObjectMapper in ctor
  • can't create ObjectWriter in configure() function, because unit tests indicate it is not guaranteed that configure() will get called
  • construct ObjectWriter at point of need, and possibly suffer performance issues

Therefore making no assumptions about how costly it is to construct an ObjectWriter. Considering that the current serializer constructs the ObjectWriter just once in the constructor, I took the conservative approach and tried to at least cache the object so that if conditions are right it would still only get constructed once. On the other hand, hashcode does have a cost, so if you don't think it matters to performance, I can pull it out.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK. Probably you just didn't see through my suggestion:

private volatile ObjectReader objectReader;
...
public T deserialize(...) {
    if (this.objectReader == null) {
		this.objectReader = this.objectMapper.readerFor(this.targetType);
	}
}

Does it make sense?
Technically your caching is just the same, but without volatile.
And that's why I don't see a reason in that extra instance if we just can deal with volatile and plain if().

Sorry for not being clear enough from the beginning.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is certainly simpler, but I'm not familiar enough with all the supported code flows to decide if it's sufficient or not.
Are there any flows where a serializer might get reconfigured? With this code, once the ObjectReader is constructed, changes to the ObjectMapper will not have any effect on the serialization (which was the original reason for the PR). If that's an acceptable constraint, I'll make the change.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think so: you call customizeObjectMapper() from the configure(Map<String, ?> configs, boolean isKey) which is a Kafka Client API, which is called once from Kafka Client instance, e.g.:

 if (valueSerializer == null) {
                this.valueSerializer = config.getConfiguredInstance(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                                                                                           Serializer.class);
                this.valueSerializer.configure(config.originals(Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId)), false);
            } else {
                config.ignore(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
                this.valueSerializer = valueSerializer;
            }

And you see: they also set config.ignore(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);, which means do not look into that property any more. So, any changes to the ObjectMapper at runtime are not going to be visible anyway.

Sounds reasonable?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On the flip side, changes to the ObjectMapper may or may not be reflected in the hashcode. If not, then the CachedObjectReader won't work any better (or worse) than a volatile.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, will update.


private ObjectReader reader = null;
private int mapperHash = -1;
private int typeHash = -1;

public ObjectReader getReader(ObjectMapper mapper, JavaType type) {
if (null == mapper) {
return null;
}
// If not already cached
if ((null == this.reader) ||
(mapper.hashCode() != this.mapperHash) ||
((null == type && -1 != this.typeHash) ||
((null != type) && (type.hashCode() != this.typeHash)))) {
this.mapperHash = mapper.hashCode();
this.typeHash = (null == type) ? -1 : type.hashCode();
this.reader = mapper.readerFor(type);
}
return this.reader;
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
* @author Gary Russell
* @author Elliot Kennedy
*/
public class JsonSerializer<T> implements Serializer<T> {
public class JsonSerializer<T> implements Serializer<T>, ObjectMapperCustomizer {

/**
* Kafka config property for disabling adding type headers.
Expand All @@ -66,7 +66,9 @@ public class JsonSerializer<T> implements Serializer<T> {

protected boolean addTypeInfo = true; // NOSONAR

private ObjectWriter writer;
protected CachedObjectWriter cachedWriter;

private JavaType targetType;

protected Jackson2JavaTypeMapper typeMapper = new DefaultJackson2JavaTypeMapper(); // NOSONAR

Expand All @@ -91,7 +93,8 @@ public JsonSerializer(TypeReference<? super T> targetType, ObjectMapper objectMa
public JsonSerializer(JavaType targetType, ObjectMapper objectMapper) {
Assert.notNull(objectMapper, "'objectMapper' must not be null.");
this.objectMapper = objectMapper;
this.writer = objectMapper.writerFor(targetType);
this.targetType = targetType;
this.cachedWriter = new CachedObjectWriter();
}

public boolean isAddTypeInfo() {
Expand Down Expand Up @@ -136,6 +139,7 @@ public void setUseTypeMapperForKey(boolean isKey) {

@Override
public void configure(Map<String, ?> configs, boolean isKey) {
customizeObjectMapper(configs, this.objectMapper);
setUseTypeMapperForKey(isKey);
if (configs.containsKey(ADD_TYPE_INFO_HEADERS)) {
Object config = configs.get(ADD_TYPE_INFO_HEADERS);
Expand Down Expand Up @@ -192,7 +196,9 @@ public byte[] serialize(String topic, @Nullable T data) {
return null;
}
try {
return this.writer.writeValueAsBytes(data);
ObjectWriter writer = this.cachedWriter.getWriter(this.objectMapper, this.targetType);
Assert.state(writer != null, "No type information provided");
return writer.writeValueAsBytes(data);
}
catch (IOException ex) {
throw new SerializationException("Can't serialize data [" + data + "] for topic [" + topic + "]", ex);
Expand Down Expand Up @@ -278,4 +284,28 @@ public JsonSerializer<T> typeMapper(Jackson2JavaTypeMapper mapper) {
return this;
}

protected class CachedObjectWriter {

private ObjectWriter writer = null;
private int mapperHash = -1;
private int typeHash = -1;

public ObjectWriter getWriter(ObjectMapper mapper, JavaType type) {
if (null == mapper) {
return null;
}

// If not already cached
if ((null == this.writer) ||
(mapper.hashCode() != this.mapperHash) ||
((null == type && -1 != this.typeHash) ||
((null != type) && (type.hashCode() != this.typeHash)))) {
this.mapperHash = mapper.hashCode();
this.typeHash = (null == type) ? -1 : type.hashCode();
this.writer = mapper.writerFor(type);
}
return this.writer;
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright 2020-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.kafka.support.serializer;

import java.util.Map;

import com.fasterxml.jackson.databind.ObjectMapper;

/**
* Provide hook to configure the ObjectMapper used in Json(De)Serializer.
*
* This will get called from the JsonSerializer.configure() function.
* @author Carl Nygard
* @since 2.5.13
*
*/
public interface ObjectMapperCustomizer {

/**
* Customize (or replace) the ObjectMapper used in serialization.
* @param configs map of configuration values
* @param objectMapper configurable (or replaceable) ObjectMapper instance
* @return the configured (or possibly new) ObjectMapper instance.
* @since 2.5.13
*/
default ObjectMapper customizeObjectMapper(Map<String, ?> configs, ObjectMapper objectMapper) {
// by default, we do nothing
return objectMapper;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Copyright 2016-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.kafka.support.serializer;

import java.util.Map;

import org.springframework.lang.Nullable;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.ObjectMapper;

public class CustomJsonDeserializer<T> extends JsonDeserializer<T> {

/**
* constructor for custom json serializer.
*/
public CustomJsonDeserializer() {
super();
}
public CustomJsonDeserializer(@Nullable JavaType targetType) {
super(targetType);
}
public CustomJsonDeserializer(@Nullable Class<? super T> targetType) {
super(targetType);
}
public CustomJsonDeserializer(@Nullable TypeReference<? super T> targetType) {
super(targetType);
}

@Override
public ObjectMapper customizeObjectMapper(Map<String, ?> configs, ObjectMapper mapper) {
mapper.disable(DeserializationFeature.ADJUST_DATES_TO_CONTEXT_TIME_ZONE);
return mapper;
}

}
Loading