Make it easier to configure the JsonDeserializer ObjectMapper without losing the JsonDeserializer configuration loaded from application.yaml #1703

Closed
jbx1 opened this issue Feb 8, 2021 · 16 comments

@jbx1 commented Feb 8, 2021

Currently it is very cumbersome to configure the ObjectMapper of the JsonDeserializer. One would incorrectly assume that ObjectMapper configuration set through application.yaml, such as the following, would work:

spring.jackson:
    deserialization.ADJUST_DATES_TO_CONTEXT_TIME_ZONE: false

But it doesn't: the JsonDeserializer uses its own ObjectMapper, ignoring the one configured for the rest of the application. More details seem to be in #680.

The only alternative here, as indicated in the documentation, is to define your own JsonDeserializer bean, like this:

@Bean
Deserializer<Object> jsonDeserializer(ObjectMapper objectMapper) {
    return new JsonDeserializer<>(objectMapper);
}

However, this means that the JsonDeserializer configuration passed through application.yaml is now lost.

After lots of searching, and seeking help at https://stackoverflow.com/questions/66061869/how-to-make-spring-kafka-jsondeserializer-retain-the-timezone-offset-when-deseri/ (thanks to @artembilan), the only solution to this is to do something like the following:

  @Bean
  DefaultKafkaConsumerFactory kafkaConsumerFactory(KafkaProperties properties, ObjectMapper objectMapper) {
    Map<String, Object> consumerProperties = properties.buildConsumerProperties();
    JsonDeserializer<Object> jsonDeserializer = new JsonDeserializer<>(objectMapper);
    // configure() applies the spring.json.* properties from the YAML; false = value deserializer, not key
    jsonDeserializer.configure(consumerProperties, false);

    return new DefaultKafkaConsumerFactory(consumerProperties,
            new StringDeserializer(), jsonDeserializer);
  }

For some reason the following type-safe way does not work either, which makes things even more confusing:

  @Bean
  ConsumerFactory<String, Object> kafkaConsumerFactory(
      KafkaProperties properties, ObjectMapper objectMapper) {

    Map<String, Object> configs = properties.buildConsumerProperties();
    JsonDeserializer<Object> jsonDeserializer = new JsonDeserializer<>(objectMapper);
    jsonDeserializer.configure(configs, false);

    return new DefaultKafkaConsumerFactory<>(
        properties.buildConsumerProperties(), new StringDeserializer(), jsonDeserializer);
  }

This is not intuitive at all, and difficult to come up with on your own without knowing the internals (apart from the ugly Object generic type).

If it is not possible to use the same ObjectMapper as the Spring context, maybe the relevant ObjectMapper properties could be passed through spring.kafka.consumer.properties, just like spring.json.value.default.type.
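
For context, a minimal sketch of that existing route, assuming Boot's bracketed map-key binding; the type com.example.MyEvent is just an example, and the commented-out Jackson-style key is hypothetical - no such key exists today:

spring:
  kafka:
    consumer:
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      properties:
        "[spring.json.value.default.type]": com.example.MyEvent  # existing spring.json.* key; example type
        # hypothetical key of the kind proposed above - does not exist today:
        # "[spring.jackson.deserialization.adjust-dates-to-context-time-zone]": "false"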

@artembilan (Member)

I think this one has to be handled on the Spring Boot side: when we have a property like spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer (or the same for the key), we just ignore such a property, auto-configure a JsonDeserializer instance (not forgetting to call configure()), and inject it into the auto-configured DefaultKafkaConsumerFactory. This way we would honor the dependency management from Spring Boot and the configuration in properties, and follow one of the main Spring Boot goals - less boilerplate code for infrastructure!

I mean the issue must go to Spring Boot, but that's my vision of how it should be.
Let's see what my other teammates say on the matter!
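
Purely as an illustration of that proposal, a minimal sketch of what such Boot-side auto-configuration might look like - a hypothetical class, not actual Spring Boot code (KafkaProperties appears here only because hypothetical Boot-internal code could use it):

import java.util.Map;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.consumer.ConsumerConfig;

import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;

@Configuration
class JsonDeserializerAutoConfigurationSketch {

    @Bean
    DefaultKafkaConsumerFactory<Object, Object> kafkaConsumerFactory(
            KafkaProperties properties, ObjectMapper objectMapper) {

        Map<String, Object> configs = properties.buildConsumerProperties();
        DefaultKafkaConsumerFactory<Object, Object> factory =
                new DefaultKafkaConsumerFactory<>(configs);

        // If the user asked for JsonDeserializer via properties, build it from the
        // context ObjectMapper instead, and call configure() so the spring.json.*
        // properties from the YAML still apply (false = value side, not key).
        if (JsonDeserializer.class.getName()
                .equals(configs.get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG))) {
            JsonDeserializer<Object> valueDeserializer = new JsonDeserializer<>(objectMapper);
            valueDeserializer.configure(configs, false);
            factory.setValueDeserializer(valueDeserializer);
        }
        return factory;
    }
}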

@garyrussell (Contributor)

>   ConsumerFactory<String, Object> kafkaConsumerFactory(
>       KafkaProperties properties, ObjectMapper objectMapper) {

The Boot team doesn't consider KafkaProperties to be a public API and discourages its use in user code.

The preferred mechanism is to use something like this...

@Component
class Customizer {

    Customizer(DefaultKafkaConsumerFactory<?, ?> factory, ObjectMapper mapper) {
        factory.setValueDeserializer(...);
    }

}

i.e. customize the auto-configured factory.

But yes, auto-configuration of the serializer/deserializer would have to be done in Boot.

@garyrussell (Contributor)

In spring-cloud-stream we added this customizer - the binder detects if one of these beans is present and calls it.

https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/blob/32939e30fe79e9108a7bde61ed679e727dd8ebc3/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/config/ClientFactoryCustomizer.java#L29-L37
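
The linked interface looks roughly like this (paraphrased from memory, not copied verbatim; see the link above for the authoritative definition):

// Paraphrased sketch of spring-cloud-stream-binder-kafka's ClientFactoryCustomizer;
// consult the linked source for the actual definition.
public interface ClientFactoryCustomizer {

	default void configure(ConsumerFactory<?, ?> consumerFactory) {
	}

	default void configure(ProducerFactory<?, ?> producerFactory) {
	}

}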

Perhaps Boot could do something similar.

@jbx1 (Author) commented Feb 18, 2021

> The preferred mechanism is to use something like this...
>
> @Component
> class Customizer { ... }
>
> i.e. customize the auto-configured factory.

Can you point to the right documentation for this? I can't find how to actually get this to work.

@garyrussell (Contributor)

"Can't find how to actually get this to work." is not very helpful; you should state what issues you saw.

Turns out I was wrong; Boot wires the consumer factory as a ConsumerFactory, not a DefaultKafkaConsumerFactory (which is a bit surprising, because they prefer to narrow types as much as possible).

This is not currently documented, but it's just normal Spring bean wiring. This works...

@SpringBootApplication
public class Kgh1703Application {

	public static void main(String[] args) {
		SpringApplication.run(Kgh1703Application.class, args);
	}

	@KafkaListener(id = "kgh1703", topics = "kgh1703")
	public void listen(String in) {
		System.out.println(in);
	}

}

@Configuration
class CustomizeIt {

	CustomizeIt(ConsumerFactory<Object, Object> cf, ObjectMapper mapper) {
		((DefaultKafkaConsumerFactory<Object, Object>) cf).setValueDeserializer(new JsonDeserializer<>(mapper));
	}

}
The startup log then shows:

2021-02-18 10:17:04.010  INFO 61928 --- [           main] o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values: 
        ...
	key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
        ...
	value.deserializer = class org.springframework.kafka.support.serializer.JsonDeserializer

@artembilan (Member)

See my comment in the mentioned SO thread:

configure(properties.buildConsumerProperties(), false).

The main point of that question is to keep as much configuration as possible in the YAML file and let Spring Boot auto-wire the JsonDeserializer properly. So setValueDeserializer(new JsonDeserializer<>(mapper)) alone is not enough - configure() must be called to pick up the properties from the YAML.

@garyrussell (Contributor)

I don't see much value in configuring the deserializer via properties when you can create it directly. Such properties rarely change, and are only set via properties because Kafka normally instantiates the deserializer itself.

The Boot team discourages direct use of KafkaProperties in user code; they do not guarantee it won't change, even with point releases.

@artembilan (Member)

Well, this issue is really just an outcome of that SO thread discussion...
I'm not saying to use KafkaProperties in user code; I'm saying that such a feature for JsonDeserializer must be done in the Spring Boot auto-configuration for Kafka. Meanwhile, what we suggest is just a workaround.

We probably just need to move this issue to Spring Boot, since there is really nothing more to do from this project's perspective.

@jbx1 (Author) commented Feb 18, 2021

> @Configuration
> class CustomizeIt { ... }

Correct me if I am wrong, but this still doesn't seem to solve the original problem. The JsonDeserializer is still not getting the properties it was originally configured with from the application.yml file. The whole point here is that it might have already been preconfigured with properties, for example the spring.json.value.default.type.

@garyrussell (Contributor)

That is correct.

I don't see much value in configuring the deserializer via properties when you can create it directly. Such properties rarely change, and are only set via properties because Kafka normally instantiates the deserializer itself.

@jbx1 (Author) commented Feb 18, 2021

> That is correct.
>
> I don't see much value in configuring the deserializer via properties when you can create it directly.

Well, that's quite a subjective opinion. If it is not supposed to be used that way, then why provide it in the first place? It is going to be confusing for another developer (or even myself after a few months) to try setting a property and have it not work, because somewhere hidden in the code there is a bean instantiating a new deserializer without any consideration of the other properties that should be set. So essentially the ones listed here will be ignored: https://docs.spring.io/spring-kafka/api/constant-values.html

Yes, these properties rarely change, but sometimes they do. Case in point: this whole thread started because I noticed that my application wasn't deserializing dates into the right timezones, so I naively tried to just set ADJUST_DATES_TO_CONTEXT_TIME_ZONE, expecting it to just work as it usually does in other contexts, but in this case it didn't.

@garyrussell (Contributor)

Fair point. I don't personally agree with the Boot position (that KafkaProperties should not be accessed by user code), but they asked me to remove any references to it from the project documentation.

This should work for you (and comply with Boot's position)...

@Configuration
class CustomizeIt {

	CustomizeIt(ConsumerFactory<Object, Object> cf, ObjectMapper mapper) {
		JsonDeserializer<Object> valueDeserializer = new JsonDeserializer<>(mapper);
		// apply the spring.json.* consumer properties; false = value deserializer
		valueDeserializer.configure(cf.getConfigurationProperties(), false);
		((DefaultKafkaConsumerFactory<Object, Object>) cf).setValueDeserializer(valueDeserializer);
	}

}
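
If the key side needs the same treatment, a symmetric sketch (the boolean passed to configure() is Kafka's isKey flag, so true configures the key deserializer and picks up the spring.json.key.* properties):

import com.fasterxml.jackson.databind.ObjectMapper;

import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;

@Configuration
class CustomizeKeysToo {

	CustomizeKeysToo(ConsumerFactory<Object, Object> cf, ObjectMapper mapper) {
		JsonDeserializer<Object> keyDeserializer = new JsonDeserializer<>(mapper);
		// true = configure as the key deserializer
		keyDeserializer.configure(cf.getConfigurationProperties(), true);
		((DefaultKafkaConsumerFactory<Object, Object>) cf).setKeyDeserializer(keyDeserializer);
	}

}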

@erizzo commented Aug 26, 2021

In a Spring Cloud Stream (Boot) application I've tried both the @Configuration constructor approach and a ClientFactoryCustomizer bean, but neither works. I've verified in a debugger that the constructor is called and that the injected ObjectMapper has my custom mix-in; however, the ClientFactoryCustomizer bean is never called.

@Configuration
class KafkaClientConfig {

	KafkaClientConfig(ConsumerFactory<Object, Object> consumerFactory, ObjectMapper mapper) {
		JsonDeserializer<Object> valueDeserializer = new JsonDeserializer<>(mapper);
		valueDeserializer.configure(consumerFactory.getConfigurationProperties(), false);
		((DefaultKafkaConsumerFactory<Object, Object>) consumerFactory).setValueDeserializer(valueDeserializer);
	}

	@Bean
	public ClientFactoryCustomizer kafkaClientCustomizer(ObjectMapper mapper) {
		return new ClientFactoryCustomizer() {
			@Override
			public void configure(ConsumerFactory<?, ?> cf) {
				Deserializer<Object> valueDeserializer = new JsonDeserializer<>(mapper);
				valueDeserializer.configure(cf.getConfigurationProperties(), false);
				((DefaultKafkaConsumerFactory<Object, Object>) cf).setValueDeserializer(valueDeserializer);
			}
		};

	}
}

Based on another discussion thread, I also defined the following bean (typed as the org.apache.kafka Deserializer interface); it is being called, but does not work either.

    @Bean
    public Deserializer<?> kafkaDeserializer(ObjectMapper objectMapper) {
    	return new JsonDeserializer<>(objectMapper);
    }

None of these resolves the deserialization errors (which are solved by the custom mix-in that I add to the application context's ObjectMapper). I know the mix-in works because I added it a while back to fix the same deserialization error in other places in the app (not when deserializing messages from Kafka).

@garyrussell (Contributor)

Don't comment on old, closed issues; the ClientFactoryCustomizer has nothing to do with this project. However, I just tested it, and the customizer is called as expected...

@SpringBootApplication
public class Kgh17031Application {

	public static void main(String[] args) {
		SpringApplication.run(Kgh17031Application.class, args);
	}

	@Bean
	Consumer<String> input() {
		return System.out::println;
	}

	@Bean
	ClientFactoryCustomizer cust() {
		return new ClientFactoryCustomizer() {

			@Override
			public void configure(ConsumerFactory<?, ?> cf) {
				System.out.println("here");
			}

		};
	}

}

If you can't figure out what's wrong, I suggest you ask a question on Stack Overflow, showing a minimal, complete, reproducible example that exhibits the behavior you see.

If you can prove it to be a bug, open an issue against the binder, not here.

@hartmut-co-uk commented Feb 9, 2022

@garyrussell what are the preconditions for a default ConsumerFactory bean to be initialised, following the spring.kafka.* auto-config?

No matter what I try, I'm getting:

    a bean of type 'org.springframework.kafka.core.ConsumerFactory' that could not be found.
    a bean of type 'org.springframework.kafka.core.DefaultKafkaConsumerFactory' that could not be found.

On the contrary, the KafkaProperties can be injected.

Sample code:

@SpringBootApplication
class PocKafkaApplication

fun main(args: Array<String>) {
    runApplication<PocKafkaApplication>(*args)
}

@EnableKafka
@Configuration
class KafkaConsumerConfig(val properties: KafkaProperties, val factory: DefaultKafkaConsumerFactory<Any?, Any?>) {

    @Bean
    fun notesConsumerFactory(): ConsumerFactory<String?, Note?> = DefaultKafkaConsumerFactory<String?, Note?>(
//        properties.buildConsumerProperties(), // this works fine
        factory.configurationProperties,  // this is how it should be done???
        StringDeserializer(),
        JsonDeserializer(Note::class.java)
    )

    @Bean
    fun notesKafkaListenerContainerFactory(): ConcurrentKafkaListenerContainerFactory<String, Note>? {
        val factory: ConcurrentKafkaListenerContainerFactory<String, Note> =
            ConcurrentKafkaListenerContainerFactory<String, Note>()
        factory.consumerFactory = notesConsumerFactory()
        return factory
    }
}

@Service
class NoteConsumer() {

    @KafkaListener(topics = ["notes-1"], containerFactory = "notesKafkaListenerContainerFactory", batch = "true")
    fun notesListener(data: List<ConsumerRecord<String, Note?>>) {
        println(data.size)
        data.mapNotNull { r -> r.value() }
            .forEach { println(it.id) }
    }
}

data class Note(
    val id: Long,
    val text: String,
)

@garyrussell (Contributor)

Don't ask questions in closed issues; use the Discussions tab (or Stack Overflow).

See KafkaAutoConfiguration; the only condition is @ConditionalOnMissingBean(ConsumerFactory.class), so it's hard to say what's wrong without an actual (minimal) project to try.
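
For reference, a paraphrased sketch of the shape of that auto-configured bean (not the actual KafkaAutoConfiguration source; consult your Boot version):

// Paraphrased from the description above, not the real Boot code.
// The key point: any user-defined ConsumerFactory bean makes this one back off.
@Configuration
class KafkaAutoConfigurationShapeSketch {

	private final KafkaProperties properties;

	KafkaAutoConfigurationShapeSketch(KafkaProperties properties) {
		this.properties = properties;
	}

	@Bean
	@ConditionalOnMissingBean(ConsumerFactory.class)
	ConsumerFactory<?, ?> kafkaConsumerFactory() {
		return new DefaultKafkaConsumerFactory<>(this.properties.buildConsumerProperties());
	}

}

Note that notesConsumerFactory in the sample above is itself a ConsumerFactory bean, which would make the auto-configured one back off; that may be related to the missing DefaultKafkaConsumerFactory.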

If you run with --debug you should get a log of auto-configuration activity.

If you can't figure it out; start a discussion and provide a minimal, complete, reproducible example.
