I use the following SQL to create a source table:
create source table topic1(
uuid int,
data varchar
) with (
type = 'kafka',
kafka_topic = 'flow_message_test',
"auto.offset.reset" = latest,
kafka_broker = '10.16.98.10:9092',
kafka_group_id = 'test_json_parser',
value_type = 'json'
);
The data is as follows:
{
  "uuid": 10455,
  "data": ["{\"applyid\":\"1590224987\",\"base_info\":\"{\\\"city_id\\\":201}\"}"]
}
The SQL passes the grammar check, but at runtime it throws the exception below:
java.lang.ClassCastException: java.util.ArrayList cannot be cast to java.lang.String
at org.apache.flink.api.common.typeutils.base.StringSerializer.copy(StringSerializer.java:33)
at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:96)
at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:47)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:577)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:398)
at org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.emitRecord(Kafka010Fetcher.java:89)
at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:154)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:738)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:94)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
at java.lang.Thread.run(Thread.java:748)
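The mismatch appears to be that the "data" field in the message is a JSON array, while the column is declared as a string type. Below is a minimal sketch reproducing the same cast failure, assuming the source uses a Jackson-style JSON parser (the class name and sample message here are just for illustration, not the framework's actual code):

import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.Map;

public class CastRepro {
    public static void main(String[] args) throws Exception {
        // Simplified version of the sample message: "data" is a JSON array.
        String message = "{\"uuid\":10455,\"data\":[\"{\\\"applyid\\\":\\\"1590224987\\\"}\"]}";

        // Jackson maps a JSON array to java.util.ArrayList by default.
        Map<?, ?> row = new ObjectMapper().readValue(message, Map.class);
        Object data = row.get("data");
        System.out.println(data.getClass()); // class java.util.ArrayList

        // This cast fails the same way StringSerializer.copy does above:
        // java.util.ArrayList cannot be cast to java.lang.String
        String s = (String) data;
    }
}

If the framework supports it, declaring "data" with an array/complex type, or having the producer serialize "data" as a plain JSON string, should avoid the mismatch.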