@@ -61,6 +61,8 @@ public class JsonDeserializer<T> : AsyncDeserializer<T, JsonSchema> where T : cl
61
61
62
62
private JsonSchemaValidator validator = new JsonSchemaValidator ( ) ;
63
63
64
+ private Schema schemaJson = null ;
65
+
64
66
private JsonSchema schema = null ;
65
67
66
68
private bool validate = true ;
@@ -142,6 +144,7 @@ public JsonDeserializer(ISchemaRegistryClient schemaRegistryClient, Schema schem
142
144
JsonSchemaResolver utils = new JsonSchemaResolver (
143
145
schemaRegistryClient , schema , this . jsonSchemaGeneratorSettings ) ;
144
146
JsonSchema jsonSchema = utils . GetResolvedSchema ( ) . Result ;
147
+ this . schemaJson = schema ;
145
148
this . schema = jsonSchema ;
146
149
}
147
150
@@ -184,8 +187,10 @@ public override async Task<T> DeserializeAsync(ReadOnlyMemory<byte> data, bool i
184
187
185
188
try
186
189
{
187
- Schema writerSchema = null ;
188
- JsonSchema writerSchemaJson = null ;
190
+ Schema writerSchemaJson = null ;
191
+ JsonSchema writerSchema = null ;
192
+ Schema readerSchemaJson ;
193
+ JsonSchema readerSchema ;
189
194
T value ;
190
195
IList < Migration > migrations = new List < Migration > ( ) ;
191
196
using ( var stream = new MemoryStream ( array ) )
@@ -201,22 +206,34 @@ public override async Task<T> DeserializeAsync(ReadOnlyMemory<byte> data, bool i
201
206
202
207
if ( schemaRegistryClient != null )
203
208
{
204
- ( writerSchema , writerSchemaJson ) = await GetSchema ( subject , writerId ) ;
209
+ ( writerSchemaJson , writerSchema ) = await GetSchema ( subject , writerId ) ;
205
210
if ( subject == null )
206
211
{
207
- subject = GetSubjectName ( topic , isKey , writerSchemaJson . Title ) ;
212
+ subject = GetSubjectName ( topic , isKey , writerSchema . Title ) ;
208
213
if ( subject != null )
209
214
{
210
215
latestSchema = await GetReaderSchema ( subject )
211
216
. ConfigureAwait ( continueOnCapturedContext : false ) ;
212
217
}
213
218
}
214
219
}
215
-
220
+
216
221
if ( latestSchema != null )
217
222
{
218
- migrations = await GetMigrations ( subject , writerSchema , latestSchema )
223
+ migrations = await GetMigrations ( subject , writerSchemaJson , latestSchema )
219
224
. ConfigureAwait ( continueOnCapturedContext : false ) ;
225
+ readerSchemaJson = latestSchema ;
226
+ readerSchema = await GetParsedSchema ( latestSchema ) ;
227
+ }
228
+ else if ( schema != null )
229
+ {
230
+ readerSchemaJson = schemaJson ;
231
+ readerSchema = schema ;
232
+ }
233
+ else
234
+ {
235
+ readerSchemaJson = writerSchemaJson ;
236
+ readerSchema = writerSchema ;
220
237
}
221
238
222
239
if ( migrations . Count > 0 )
@@ -231,9 +248,9 @@ public override async Task<T> DeserializeAsync(ReadOnlyMemory<byte> data, bool i
231
248
. ContinueWith ( t => ( JToken ) t . Result )
232
249
. ConfigureAwait ( continueOnCapturedContext : false ) ;
233
250
234
- if ( schema != null && validate )
251
+ if ( readerSchema != null && validate )
235
252
{
236
- var validationResult = validator . Validate ( json , schema ) ;
253
+ var validationResult = validator . Validate ( json , readerSchema ) ;
237
254
if ( validationResult . Count > 0 )
238
255
{
239
256
throw new InvalidDataException ( "Schema validation failed for properties: [" +
@@ -250,10 +267,10 @@ public override async Task<T> DeserializeAsync(ReadOnlyMemory<byte> data, bool i
250
267
using ( var jsonReader = new StreamReader ( jsonStream , Encoding . UTF8 ) )
251
268
{
252
269
string serializedString = jsonReader . ReadToEnd ( ) ;
253
-
254
- if ( schema != null && validate )
270
+
271
+ if ( readerSchema != null && validate )
255
272
{
256
- var validationResult = validator . Validate ( serializedString , schema ) ;
273
+ var validationResult = validator . Validate ( serializedString , readerSchema ) ;
257
274
258
275
if ( validationResult . Count > 0 )
259
276
{
@@ -266,14 +283,10 @@ public override async Task<T> DeserializeAsync(ReadOnlyMemory<byte> data, bool i
266
283
jsonSchemaGeneratorSettingsSerializerSettings ) ;
267
284
}
268
285
}
269
-
270
- // A schema is not required to deserialize json messages.
271
- // TODO: add validation capability.
272
286
}
273
- if ( writerSchema != null )
287
+
288
+ if ( readerSchema != null )
274
289
{
275
- Schema readerSchemaJson = latestSchema ?? writerSchema ;
276
- JsonSchema readerSchema = latestSchema != null ? await GetParsedSchema ( latestSchema ) : writerSchemaJson ;
277
290
FieldTransformer fieldTransformer = async ( ctx , transform , message ) =>
278
291
{
279
292
return await JsonUtils . Transform ( ctx , readerSchema , "$" , message , transform ) . ConfigureAwait ( false ) ;
@@ -283,8 +296,8 @@ public override async Task<T> DeserializeAsync(ReadOnlyMemory<byte> data, bool i
283
296
readerSchemaJson , value , fieldTransformer )
284
297
. ContinueWith ( t => ( T ) t . Result )
285
298
. ConfigureAwait ( continueOnCapturedContext : false ) ;
286
- }
287
-
299
+ }
300
+
288
301
return value ;
289
302
}
290
303
catch ( AggregateException e )
0 commit comments