I have a Kafka topic and I would like to feed it with Avro data (currently in JSON). I know the "proper" way to do this is to use Schema Registry, but for testing purposes I would like to make it work without it.
So I am sending the Avro data as Array[Byte] instead of regular JSON objects:
import java.io.{ByteArrayOutputStream, File}
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericDatumWriter}
import org.apache.avro.io.EncoderFactory

// parse the schema once and serialize the record with a binary encoder
val schema = new Schema.Parser().parse(new File("mySchema.avsc"))
val writer = new GenericDatumWriter[GenericData.Record](schema)
val out = new ByteArrayOutputStream
val encoder = EncoderFactory.get.binaryEncoder(out, null)
writer.write(myAvroData, encoder)
encoder.flush()
out.close()
out.toByteArray
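For reference, the resulting Array[Byte] is then sent to the topic with a plain byte-array producer. This is only a minimal sketch: the bootstrap.servers value and the avroBytes variable (standing for the result of out.toByteArray above) are placeholders, and the topic name is the one from the connector configuration below.

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.serialization.ByteArraySerializer

val props = new Properties()
props.put("bootstrap.servers", "localhost:9092") // placeholder
props.put("key.serializer", classOf[ByteArraySerializer].getName)
props.put("value.serializer", classOf[ByteArraySerializer].getName)

val producer = new KafkaProducer[Array[Byte], Array[Byte]](props)
// avroBytes stands for the Array[Byte] returned by out.toByteArray above
producer.send(new ProducerRecord[Array[Byte], Array[Byte]]("user_sync", avroBytes))
producer.flush()
producer.close()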
The schema is embedded within each message; how can I make this work with Kafka Connect? My current Kafka Connect configuration has the following properties (data is written to S3 as json.gz files), and I want to write Parquet files instead:
{
"name": "someName",
"connector.class": "io.confluent.connect.s3.S3SinkConnector",
"tasks.max": "120",
"topics": "user_sync",
"s3.region": "someRegion",
"s3.bucket.name": "someBucket",
"s3.part.size": "5242880",
"s3.compression.type": "gzip",
"filename.offset.zero.pad.width": "20",
"flush.size": "5000",
"rotate.interval.ms": "600000",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
"storage.class": "io.confluent.connect.s3.storage.S3Storage",
"format.class": "io.confluent.connect.s3.format.json.JsonFormat",
"partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
"path.format": "YYYY/MM/dd/HH",
"timezone" : "UTC",
"locale": "en",
"partition.duration.ms": "600000",
"timestamp.extractor": "RecordField",
"timestamp.field" : "ts",
"schema.compatibility": "NONE"
I suppose I need to change "format.class" to "io.confluent.connect.hdfs.parquet.ParquetFormat"? But is that enough?
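In other words, the only line I would change in the configuration above is the following (just my guess, everything else left untouched):

"format.class": "io.confluent.connect.hdfs.parquet.ParquetFormat"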
Thanks a lot!