Welcome to OStack Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
451 views
in Technique[技术] by (71.8m points)

mysql - Facing an issue while storing the data in SQL DB using JDBC sink connector

Using a partially open content model in JSONSchema. It means that there will be some mandatory fields(defined in properties) and non-mandatory fields(as defined in the patternProperties) accepted by the topic. please refer to the Console producer for JSON schema and data example. DB Table properties are as below. Now in connector config, I have used "fields.whitelist": "noOfVisits,downloadCount". The connector should pick those fields from the Topic message and store those in DB if those fields are present in the message right? but for each case in DB, it's storing null. Please refer to the DB Table as shown below for the same example input messages.

---> Console Producer:

./kafka-json-schema-console-producer 
    --broker-list localhost:9092 
    --topic DAILY_TEST_STATS_51 
    --property key.schema='{"type":"string"}' --property parse.key=true --property "key.separator=>" 
    --property value.schema='{
  "type": "object",
  "properties": {
    "viewerId": {
      "type": "integer",
      "optional": false
    }
  },
  "patternProperties": {
    "^[a-zA-Z]*$": {
      "type": "integer"
    }
  },
  "additionalProperties": false
}'
"203">{"viewerId":203,"noOfVisits":10,"downloadCount":111}
"204">{"viewerId":204,"downloadCount":111}
"205">{"viewerId":205,"noOfVisits":10}
"206">{"viewerId":206,"noOfVisits":10,"downloadCount":111}
"207">{"viewerId":207,"noOfVisits":10,"downloadCount":111}
"208">{"viewerId":208,"noOfVisits":10,"downloadCount":111}

---> DB Table properties:

CREATE TABLE IF NOT EXISTS `Temp51` (
  `viewerId` int(10) NOT NULL,
  `noOfVisits` int(5) NULL,
  `downloadCount` int(5) NULL,
  PRIMARY KEY (`viewerId`)
);

---> Connector Config:

{
    "name": "temp-jdbc-connector",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "io.confluent.connect.json.JsonSchemaConverter",
        "value.converter.schemas.enable": "true",
        "value.converter.schema.registry.url": "http://localhost:8081",
        "connection.url": "jdbc:mysql://localhost:3306/StreamTest1",
        "connection.user": "root",
        "connection.password": "root",
        "tasks.max": "1",
        "insert.mode": "upsert",
        "topics": "DAILY_TEST_STATS_51",
        "pk.mode": "record_value",
        "pk.fields": "viewerId",
        "table.name.format": "Temp51",
        "fields.whitelist": "noOfVisits,downloadCount"
    }
}

---> DB table: Temp51

# viewerId, noOfVisits, downloadCount
203, , 
204, , 
205, , 
206, , 
207, , 
208, , 

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Answer

0 votes
by (71.8m points)
等待大神答复

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome to OStack Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...