Using a partially open content model in JSONSchema. It means that there will be some mandatory fields(defined in properties) and non-mandatory fields(as defined in the patternProperties) accepted by the topic. please refer to the Console producer for JSON schema and data example.
DB Table properties are as below. Now in connector config, I have used "fields.whitelist": "noOfVisits,downloadCount". The connector should pick those fields from the Topic message and store those in DB if those fields are present in the message right? but for each case in DB, it's storing null. Please refer to the DB Table as shown below for the same example input messages.
---> Console Producer:
./kafka-json-schema-console-producer
--broker-list localhost:9092
--topic DAILY_TEST_STATS_51
--property key.schema='{"type":"string"}' --property parse.key=true --property "key.separator=>"
--property value.schema='{
"type": "object",
"properties": {
"viewerId": {
"type": "integer",
"optional": false
}
},
"patternProperties": {
"^[a-zA-Z]*$": {
"type": "integer"
}
},
"additionalProperties": false
}'
"203">{"viewerId":203,"noOfVisits":10,"downloadCount":111}
"204">{"viewerId":204,"downloadCount":111}
"205">{"viewerId":205,"noOfVisits":10}
"206">{"viewerId":206,"noOfVisits":10,"downloadCount":111}
"207">{"viewerId":207,"noOfVisits":10,"downloadCount":111}
"208">{"viewerId":208,"noOfVisits":10,"downloadCount":111}
---> DB Table properties:
CREATE TABLE IF NOT EXISTS `Temp51` (
`viewerId` int(10) NOT NULL,
`noOfVisits` int(5) NULL,
`downloadCount` int(5) NULL,
PRIMARY KEY (`viewerId`)
);
---> Connector Config:
{
"name": "temp-jdbc-connector",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "io.confluent.connect.json.JsonSchemaConverter",
"value.converter.schemas.enable": "true",
"value.converter.schema.registry.url": "http://localhost:8081",
"connection.url": "jdbc:mysql://localhost:3306/StreamTest1",
"connection.user": "root",
"connection.password": "root",
"tasks.max": "1",
"insert.mode": "upsert",
"topics": "DAILY_TEST_STATS_51",
"pk.mode": "record_value",
"pk.fields": "viewerId",
"table.name.format": "Temp51",
"fields.whitelist": "noOfVisits,downloadCount"
}
}
---> DB table: Temp51
# viewerId, noOfVisits, downloadCount
203, ,
204, ,
205, ,
206, ,
207, ,
208, ,