Skip to content

Commit 83d4369

Browse files
Merge branch 'refs/heads/main' into lauren-wiebenga-s360
2 parents abdb1ac + 8b66fd2 commit 83d4369

File tree

3 files changed

+104
-10
lines changed

3 files changed

+104
-10
lines changed

spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java

Lines changed: 27 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -394,7 +394,7 @@ private FunctionInvocationWrapper invocationWrapperInstance(String functionDefin
394394
@SuppressWarnings("rawtypes")
395395
public class FunctionInvocationWrapper implements Function<Object, Object>, Consumer<Object>, Supplier<Object>, Runnable {
396396

397-
private final Object target;
397+
private Object target;
398398

399399
private Type inputType;
400400

@@ -658,13 +658,21 @@ public <V> Function<Object, V> andThen(Function<? super Object, ? extends V> aft
658658
|| FunctionTypeUtils.isMultipleArgumentType(((FunctionInvocationWrapper) after).outputType)) {
659659
throw new UnsupportedOperationException("Composition of functions with multiple arguments is not supported at the moment");
660660
}
661+
FunctionInvocationWrapper afterWrapper = (FunctionInvocationWrapper) after;
662+
663+
//see GH-1141 for this code snippet
664+
if ((this.getTarget() instanceof Supplier || this.getTarget() instanceof Function) && FunctionTypeUtils.isPublisher(this.getOutputType())
665+
&& afterWrapper.getTarget() instanceof Consumer && !FunctionTypeUtils.isPublisher(afterWrapper.getInputType())) {
666+
Consumer wrapper = new ConsumerWrapper((Consumer) afterWrapper.getTarget());
667+
afterWrapper.target = wrapper;
668+
afterWrapper.inputType = this.outputType;
669+
}
670+
//
661671

662672
this.setSkipOutputConversion(true);
663673
((FunctionInvocationWrapper) after).setSkipOutputConversion(true);
664674
Function rawComposedFunction = v -> ((FunctionInvocationWrapper) after).doApply(doApply(v));
665675

666-
FunctionInvocationWrapper afterWrapper = (FunctionInvocationWrapper) after;
667-
668676
Type composedFunctionType;
669677
if (afterWrapper.outputType == null) {
670678
composedFunctionType = (this.inputType == null) ?
@@ -1551,4 +1559,20 @@ public Object apply(Object t) {
15511559
return t;
15521560
}
15531561
}
1562+
1563+
@SuppressWarnings({ "unchecked", "rawtypes" })
1564+
private static class ConsumerWrapper implements Consumer<Flux<Object>> {
1565+
1566+
private final Consumer targetConsumer;
1567+
1568+
ConsumerWrapper(Consumer targetConsumer) {
1569+
this.targetConsumer = targetConsumer;
1570+
}
1571+
1572+
@Override
1573+
public void accept(Flux messageFlux) {
1574+
messageFlux.doOnNext(this.targetConsumer).subscribe();
1575+
}
1576+
1577+
}
15541578
}

spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/ContextFunctionCatalogAutoConfiguration.java

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import java.util.function.Supplier;
2525
import java.util.stream.Collectors;
2626

27+
import com.fasterxml.jackson.databind.DeserializationFeature;
2728
import com.fasterxml.jackson.databind.ObjectMapper;
2829
import com.fasterxml.jackson.databind.SerializationFeature;
2930
import com.google.gson.Gson;
@@ -213,14 +214,9 @@ private JsonMapper gson(ApplicationContext context) {
213214
}
214215

215216
private JsonMapper jackson(ApplicationContext context) {
216-
ObjectMapper mapper;
217-
try {
218-
mapper = context.getBean(ObjectMapper.class);
219-
}
220-
catch (Exception e) {
221-
mapper = new ObjectMapper();
222-
}
217+
ObjectMapper mapper = new ObjectMapper();
223218
mapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);
219+
mapper.configure(DeserializationFeature.FAIL_ON_TRAILING_TOKENS, true);
224220
return new JacksonMapper(mapper);
225221
}
226222
}

spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/catalog/BeanFactoryAwareFunctionRegistryTests.java

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,37 @@ public void testCompositionWithNonExistingFunction() throws Exception {
140140
assertThat(registration.getNames().iterator().next()).isEqualTo("echo1");
141141
}
142142

143+
@Test
144+
public void testCompositionReactiveSupplierWithImplicitConsumer() throws Exception {
145+
FunctionCatalog catalog = this.configureCatalog(CompositionReactiveSupplierWithConsumer.class);
146+
FunctionInvocationWrapper function = catalog.lookup("supplyPrimitive|consume");
147+
function.apply(null);
148+
assertThat(CompositionReactiveSupplierWithConsumer.results.size()).isEqualTo(2);
149+
assertThat(CompositionReactiveSupplierWithConsumer.results.get(0)).isEqualTo(1);
150+
assertThat(CompositionReactiveSupplierWithConsumer.results.get(1)).isEqualTo(2);
151+
CompositionReactiveSupplierWithConsumer.results.clear();
152+
153+
function = catalog.lookup("supplyMessage|consume");
154+
function.apply(null);
155+
assertThat(CompositionReactiveSupplierWithConsumer.results.size()).isEqualTo(2);
156+
assertThat(CompositionReactiveSupplierWithConsumer.results.get(0)).isEqualTo(1);
157+
assertThat(CompositionReactiveSupplierWithConsumer.results.get(1)).isEqualTo(2);
158+
CompositionReactiveSupplierWithConsumer.results.clear();
159+
160+
function = catalog.lookup("functionMessage|consume");
161+
function.apply(Flux.fromArray(new Message[] {MessageBuilder.withPayload("ricky").build(), MessageBuilder.withPayload("bubbles").build()}));
162+
assertThat(CompositionReactiveSupplierWithConsumer.results.size()).isEqualTo(2);
163+
assertThat(CompositionReactiveSupplierWithConsumer.results.get(0)).isEqualTo("RICKY");
164+
assertThat(CompositionReactiveSupplierWithConsumer.results.get(1)).isEqualTo("BUBBLES");
165+
CompositionReactiveSupplierWithConsumer.results.clear();
166+
167+
function = catalog.lookup("functionPrimitive|consume");
168+
function.apply(Flux.fromArray(new String[] {"ricky", "bubbles"}));
169+
assertThat(CompositionReactiveSupplierWithConsumer.results.size()).isEqualTo(2);
170+
assertThat(CompositionReactiveSupplierWithConsumer.results.get(0)).isEqualTo("RICKY");
171+
assertThat(CompositionReactiveSupplierWithConsumer.results.get(1)).isEqualTo("BUBBLES");
172+
}
173+
143174
@SuppressWarnings({ "rawtypes", "unchecked" })
144175
@Test
145176
public void testMessageWithArrayAsPayload() throws Exception {
@@ -1539,6 +1570,49 @@ public Function<String, String> echo2() {
15391570
}
15401571
}
15411572

1573+
@EnableAutoConfiguration
1574+
@Configuration // s-c-f-1141
1575+
@SuppressWarnings({"unchecked", "rawtypes"})
1576+
public static class CompositionReactiveSupplierWithConsumer {
1577+
private static List results = new ArrayList<>();
1578+
1579+
@Bean
1580+
public Function<Flux<String>, Flux<String>> functionPrimitive() {
1581+
return flux -> flux.map(v -> v.toUpperCase());
1582+
}
1583+
1584+
@Bean
1585+
public Function<Flux<Message<String>>, Flux<Message<String>>> functionMessage() {
1586+
return flux -> flux.map(v -> MessageBuilder.withPayload(v.getPayload().toUpperCase()).build());
1587+
}
1588+
1589+
@Bean
1590+
public Supplier<Flux<Message<Integer>>> supplyMessage() {
1591+
return () -> {
1592+
return Flux.fromArray(
1593+
new Message[] { MessageBuilder.withPayload(1).build(), MessageBuilder.withPayload(2).build() });
1594+
};
1595+
}
1596+
1597+
@Bean
1598+
public Supplier<Flux<Integer>> supplyPrimitive() {
1599+
return () -> {
1600+
return Flux.fromArray(
1601+
new Integer[] { 1, 2});
1602+
};
1603+
}
1604+
1605+
@Bean
1606+
public Consumer consume() {
1607+
return v -> {
1608+
if (v instanceof Message vMessage) {
1609+
v = vMessage.getPayload();
1610+
}
1611+
results.add(v);
1612+
};
1613+
}
1614+
}
1615+
15421616
@EnableAutoConfiguration
15431617
@Configuration
15441618
public static class MessageWithArrayAsPayload {

0 commit comments

Comments
 (0)