本文整理汇总了Java中org.apache.kafka.connect.data.SchemaAndValue类的典型用法代码示例。如果您正苦于以下问题:Java SchemaAndValue类的具体用法?Java SchemaAndValue怎么用?Java SchemaAndValue使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
SchemaAndValue类属于org.apache.kafka.connect.data包,在下文中一共展示了SchemaAndValue类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Java代码示例。
示例1: toSourceRecord
import org.apache.kafka.connect.data.SchemaAndValue; //导入依赖的package包/类
/**
* Convert a message into a Kafka Connect SourceRecord.
*
* @param context the JMS context to use for building messages
* @param topic the Kafka topic
* @param messageBodyJms whether to interpret MQ messages as JMS messages
* @param message the message
*
* @return the Kafka Connect SourceRecord
*
* @throws JMSException Message could not be converted
*/
@Override public SourceRecord toSourceRecord(JMSContext context, String topic, boolean messageBodyJms, Message message) throws JMSException {
byte[] payload;
if (message instanceof BytesMessage) {
payload = message.getBody(byte[].class);
}
else if (message instanceof TextMessage) {
String s = message.getBody(String.class);
payload = s.getBytes(UTF_8);
}
else {
log.error("Unsupported JMS message type {}", message.getClass());
throw new ConnectException("Unsupported JMS message type");
}
SchemaAndValue sv = converter.toConnectData(topic, payload);
return new SourceRecord(null, null, topic, sv.schema(), sv.value());
}
开发者ID:ibm-messaging,项目名称:kafka-connect-mq-source,代码行数:30,代码来源:JsonRecordBuilder.java
示例2: toConnectData
import org.apache.kafka.connect.data.SchemaAndValue; //导入依赖的package包/类
@Override
public SchemaAndValue toConnectData(String topic, byte[] value) {
JsonNode jsonValue;
try {
jsonValue = deserializer.deserialize(topic, value);
} catch (SerializationException e) {
throw new DataException("Converting byte[] to Kafka Connect data failed due to serialization error: ", e);
}
if (enableSchemas && (jsonValue == null || !jsonValue.isObject() || jsonValue.size() != 2 || !jsonValue.has("schema") || !jsonValue.has("payload")))
throw new DataException("JsonConverter with schemas.enable requires \"schema\" and \"payload\" fields and may not contain additional fields." +
" If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.");
// The deserialized data should either be an envelope object containing the schema and the payload or the schema
// was stripped during serialization and we need to fill in an all-encompassing schema.
if (!enableSchemas) {
ObjectNode envelope = JsonNodeFactory.instance.objectNode();
envelope.set("schema", null);
envelope.set("payload", jsonValue);
jsonValue = envelope;
}
return jsonToConnect(jsonValue);
}
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:25,代码来源:JsonConverter.java
示例3: convertMessages
import org.apache.kafka.connect.data.SchemaAndValue; //导入依赖的package包/类
private void convertMessages(ConsumerRecords<byte[], byte[]> msgs) {
for (ConsumerRecord<byte[], byte[]> msg : msgs) {
log.trace("Consuming message with key {}, value {}", msg.key(), msg.value());
SchemaAndValue keyAndSchema = keyConverter.toConnectData(msg.topic(), msg.key());
SchemaAndValue valueAndSchema = valueConverter.toConnectData(msg.topic(), msg.value());
SinkRecord record = new SinkRecord(msg.topic(), msg.partition(),
keyAndSchema.schema(), keyAndSchema.value(),
valueAndSchema.schema(), valueAndSchema.value(),
msg.offset(),
ConnectUtils.checkAndConvertTimestamp(msg.timestamp()),
msg.timestampType());
record = transformationChain.apply(record);
if (record != null) {
messageBatch.add(record);
}
}
}
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:18,代码来源:WorkerSinkTask.java
示例4: parseConnectorStatus
import org.apache.kafka.connect.data.SchemaAndValue; //导入依赖的package包/类
private ConnectorStatus parseConnectorStatus(String connector, byte[] data) {
try {
SchemaAndValue schemaAndValue = converter.toConnectData(topic, data);
if (!(schemaAndValue.value() instanceof Map)) {
log.error("Invalid connector status type {}", schemaAndValue.value().getClass());
return null;
}
@SuppressWarnings("unchecked")
Map<String, Object> statusMap = (Map<String, Object>) schemaAndValue.value();
TaskStatus.State state = TaskStatus.State.valueOf((String) statusMap.get(STATE_KEY_NAME));
String trace = (String) statusMap.get(TRACE_KEY_NAME);
String workerUrl = (String) statusMap.get(WORKER_ID_KEY_NAME);
int generation = ((Long) statusMap.get(GENERATION_KEY_NAME)).intValue();
return new ConnectorStatus(connector, state, trace, workerUrl, generation);
} catch (Exception e) {
log.error("Failed to deserialize connector status", e);
return null;
}
}
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:21,代码来源:KafkaStatusBackingStore.java
示例5: parseTaskStatus
import org.apache.kafka.connect.data.SchemaAndValue; //导入依赖的package包/类
private TaskStatus parseTaskStatus(ConnectorTaskId taskId, byte[] data) {
try {
SchemaAndValue schemaAndValue = converter.toConnectData(topic, data);
if (!(schemaAndValue.value() instanceof Map)) {
log.error("Invalid connector status type {}", schemaAndValue.value().getClass());
return null;
}
@SuppressWarnings("unchecked")
Map<String, Object> statusMap = (Map<String, Object>) schemaAndValue.value();
TaskStatus.State state = TaskStatus.State.valueOf((String) statusMap.get(STATE_KEY_NAME));
String trace = (String) statusMap.get(TRACE_KEY_NAME);
String workerUrl = (String) statusMap.get(WORKER_ID_KEY_NAME);
int generation = ((Long) statusMap.get(GENERATION_KEY_NAME)).intValue();
return new TaskStatus(taskId, state, workerUrl, generation, trace);
} catch (Exception e) {
log.error("Failed to deserialize task status", e);
return null;
}
}
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:20,代码来源:KafkaStatusBackingStore.java
示例6: expectOnePoll
import org.apache.kafka.connect.data.SchemaAndValue; //导入依赖的package包/类
@SuppressWarnings("unchecked")
private IExpectationSetters<Object> expectOnePoll() {
// Currently the SinkTask's put() method will not be invoked unless we provide some data, so instead of
// returning empty data, we return one record. The expectation is that the data will be ignored by the
// response behavior specified using the return value of this method.
EasyMock.expect(consumer.poll(EasyMock.anyLong())).andAnswer(
new IAnswer<ConsumerRecords<byte[], byte[]>>() {
@Override
public ConsumerRecords<byte[], byte[]> answer() throws Throwable {
// "Sleep" so time will progress
time.sleep(1L);
ConsumerRecords<byte[], byte[]> records = new ConsumerRecords<>(
Collections.singletonMap(
new TopicPartition(TOPIC, PARTITION),
Arrays.asList(
new ConsumerRecord<>(TOPIC, PARTITION, FIRST_OFFSET + recordsReturned, TIMESTAMP, TIMESTAMP_TYPE, 0L, 0, 0, RAW_KEY, RAW_VALUE)
)));
recordsReturned++;
return records;
}
});
EasyMock.expect(keyConverter.toConnectData(TOPIC, RAW_KEY)).andReturn(new SchemaAndValue(KEY_SCHEMA, KEY));
EasyMock.expect(valueConverter.toConnectData(TOPIC, RAW_VALUE)).andReturn(new SchemaAndValue(VALUE_SCHEMA, VALUE));
sinkTask.put(EasyMock.anyObject(Collection.class));
return EasyMock.expectLastCall();
}
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:27,代码来源:WorkerSinkTaskThreadedTest.java
示例7: readConnectorState
import org.apache.kafka.connect.data.SchemaAndValue; //导入依赖的package包/类
@Test
public void readConnectorState() {
byte[] value = new byte[0];
KafkaBasedLog<String, byte[]> kafkaBasedLog = mock(KafkaBasedLog.class);
Converter converter = mock(Converter.class);
KafkaStatusBackingStore store = new KafkaStatusBackingStore(new MockTime(), converter, STATUS_TOPIC, kafkaBasedLog);
Map<String, Object> statusMap = new HashMap<>();
statusMap.put("worker_id", WORKER_ID);
statusMap.put("state", "RUNNING");
statusMap.put("generation", 0L);
expect(converter.toConnectData(STATUS_TOPIC, value))
.andReturn(new SchemaAndValue(null, statusMap));
replayAll();
store.read(consumerRecord(0, "status-connector-conn", value));
ConnectorStatus status = new ConnectorStatus(CONNECTOR, ConnectorStatus.State.RUNNING, WORKER_ID, 0);
assertEquals(status, store.get(CONNECTOR));
verifyAll();
}
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:26,代码来源:KafkaStatusBackingStoreTest.java
示例8: readTaskState
import org.apache.kafka.connect.data.SchemaAndValue; //导入依赖的package包/类
@Test
public void readTaskState() {
byte[] value = new byte[0];
KafkaBasedLog<String, byte[]> kafkaBasedLog = mock(KafkaBasedLog.class);
Converter converter = mock(Converter.class);
KafkaStatusBackingStore store = new KafkaStatusBackingStore(new MockTime(), converter, STATUS_TOPIC, kafkaBasedLog);
Map<String, Object> statusMap = new HashMap<>();
statusMap.put("worker_id", WORKER_ID);
statusMap.put("state", "RUNNING");
statusMap.put("generation", 0L);
expect(converter.toConnectData(STATUS_TOPIC, value))
.andReturn(new SchemaAndValue(null, statusMap));
replayAll();
store.read(consumerRecord(0, "status-task-conn-0", value));
TaskStatus status = new TaskStatus(TASK, TaskStatus.State.RUNNING, WORKER_ID, 0);
assertEquals(status, store.get(TASK));
verifyAll();
}
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:26,代码来源:KafkaStatusBackingStoreTest.java
示例9: expectStart
import org.apache.kafka.connect.data.SchemaAndValue; //导入依赖的package包/类
private void expectStart(final List<ConsumerRecord<String, byte[]>> preexistingRecords,
final Map<byte[], Struct> deserializations) throws Exception {
storeLog.start();
PowerMock.expectLastCall().andAnswer(new IAnswer<Object>() {
@Override
public Object answer() throws Throwable {
for (ConsumerRecord<String, byte[]> rec : preexistingRecords)
capturedConsumedCallback.getValue().onCompletion(null, rec);
return null;
}
});
for (Map.Entry<byte[], Struct> deserializationEntry : deserializations.entrySet()) {
// Note null schema because default settings for internal serialization are schema-less
EasyMock.expect(converter.toConnectData(EasyMock.eq(TOPIC), EasyMock.aryEq(deserializationEntry.getKey())))
.andReturn(new SchemaAndValue(null, structToMap(deserializationEntry.getValue())));
}
}
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:18,代码来源:KafkaConfigBackingStoreTest.java
示例10: expectConvertWriteRead
import org.apache.kafka.connect.data.SchemaAndValue; //导入依赖的package包/类
private void expectConvertWriteRead(final String configKey, final Schema valueSchema, final byte[] serialized,
final String dataFieldName, final Object dataFieldValue) {
final Capture<Struct> capturedRecord = EasyMock.newCapture();
if (serialized != null)
EasyMock.expect(converter.fromConnectData(EasyMock.eq(TOPIC), EasyMock.eq(valueSchema), EasyMock.capture(capturedRecord)))
.andReturn(serialized);
storeLog.send(EasyMock.eq(configKey), EasyMock.aryEq(serialized));
PowerMock.expectLastCall();
EasyMock.expect(converter.toConnectData(EasyMock.eq(TOPIC), EasyMock.aryEq(serialized)))
.andAnswer(new IAnswer<SchemaAndValue>() {
@Override
public SchemaAndValue answer() throws Throwable {
if (dataFieldName != null)
assertEquals(dataFieldValue, capturedRecord.getValue().get(dataFieldName));
// Note null schema because default settings for internal serialization are schema-less
return new SchemaAndValue(null, serialized == null ? null : structToMap(capturedRecord.getValue()));
}
});
}
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:20,代码来源:KafkaConfigBackingStoreTest.java
示例11: deserializeImpl
import org.apache.kafka.connect.data.SchemaAndValue; //导入依赖的package包/类
protected Struct deserializeImpl(String topic, byte[] payload) {
/*
* if (isKey){ return deserializeKey(topic, op); }else{ return
* deserializeVal(topic, op); }
*/
if (!isKey && payload == null){
// A delete mutation...
return null;
}
try {
SchemaAndValue res = converter.toConnectData(topic, payload);
Schema schema = res.schema();
Object val = res.value();
Struct struct = (Struct) val;
// Schema will be null for delete mutation vals
if (schema != struct.schema()) {
throw new SerializationException(
"Object schema doesn't match given schema");
}
return struct;
} catch (RuntimeException e) {
throw new SerializationException(
"Error deserializing Avro message ", e);
}
}
开发者ID:rogers,项目名称:change-data-capture,代码行数:27,代码来源:SpecificAvroMutationDeserializer.java
示例12: toConnectData
import org.apache.kafka.connect.data.SchemaAndValue; //导入依赖的package包/类
/** {@inheritDoc} */
@Override public SchemaAndValue toConnectData(String topic, byte[] bytes) {
CacheEvent evt;
try {
evt = deserializer.deserialize(topic, bytes);
}
catch (SerializationException e) {
throw new DataException("Failed to convert to Kafka Connect data due to a serialization error", e);
}
if (evt == null) {
return SchemaAndValue.NULL;
}
return new SchemaAndValue(null, evt);
}
开发者ID:apache,项目名称:ignite,代码行数:17,代码来源:CacheEventConverter.java
示例13: apply
import org.apache.kafka.connect.data.SchemaAndValue; //导入依赖的package包/类
@Override
public R apply(R r) {
final SchemaAndValue headerValue = extractHeader(r);
return r.newRecord(
r.topic(),
r.kafkaPartition(),
headerValue.schema(),
headerValue.value(),
r.valueSchema(),
r.value(),
r.timestamp()
);
}
开发者ID:jcustenborder,项目名称:kafka-connect-rabbitmq,代码行数:15,代码来源:ExtractHeader.java
示例14: toConnectData
import org.apache.kafka.connect.data.SchemaAndValue; //导入依赖的package包/类
@Override
public SchemaAndValue toConnectData(String topic, byte[] value) {
try {
return new SchemaAndValue(Schema.OPTIONAL_STRING_SCHEMA, deserializer.deserialize(topic, value));
} catch (SerializationException e) {
throw new DataException("Failed to deserialize string: ", e);
}
}
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:9,代码来源:StringConverter.java
示例15: testBytesToStringNonUtf8Encoding
import org.apache.kafka.connect.data.SchemaAndValue; //导入依赖的package包/类
@Test
public void testBytesToStringNonUtf8Encoding() throws UnsupportedEncodingException {
converter.configure(Collections.singletonMap("converter.encoding", "UTF-16"), true);
SchemaAndValue data = converter.toConnectData(TOPIC, SAMPLE_STRING.getBytes("UTF-16"));
assertEquals(Schema.OPTIONAL_STRING_SCHEMA, data.schema());
assertEquals(SAMPLE_STRING, data.value());
}
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:8,代码来源:StringConverterTest.java
示例16: jsonToConnect
import org.apache.kafka.connect.data.SchemaAndValue; //导入依赖的package包/类
private SchemaAndValue jsonToConnect(JsonNode jsonValue) {
if (jsonValue == null)
return SchemaAndValue.NULL;
if (!jsonValue.isObject() || jsonValue.size() != 2 || !jsonValue.has(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME) || !jsonValue.has(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME))
throw new DataException("JSON value converted to Kafka Connect must be in envelope containing schema");
Schema schema = asConnectSchema(jsonValue.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
return new SchemaAndValue(schema, convertToConnect(schema, jsonValue.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME)));
}
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:11,代码来源:JsonConverter.java
示例17: testConnectSchemaMetadataTranslation
import org.apache.kafka.connect.data.SchemaAndValue; //导入依赖的package包/类
@Test
public void testConnectSchemaMetadataTranslation() {
// this validates the non-type fields are translated and handled properly
assertEquals(new SchemaAndValue(Schema.BOOLEAN_SCHEMA, true), converter.toConnectData(TOPIC, "{ \"schema\": { \"type\": \"boolean\" }, \"payload\": true }".getBytes()));
assertEquals(new SchemaAndValue(Schema.OPTIONAL_BOOLEAN_SCHEMA, null), converter.toConnectData(TOPIC, "{ \"schema\": { \"type\": \"boolean\", \"optional\": true }, \"payload\": null }".getBytes()));
assertEquals(new SchemaAndValue(SchemaBuilder.bool().defaultValue(true).build(), true),
converter.toConnectData(TOPIC, "{ \"schema\": { \"type\": \"boolean\", \"default\": true }, \"payload\": null }".getBytes()));
assertEquals(new SchemaAndValue(SchemaBuilder.bool().required().name("bool").version(2).doc("the documentation").parameter("foo", "bar").build(), true),
converter.toConnectData(TOPIC, "{ \"schema\": { \"type\": \"boolean\", \"optional\": false, \"name\": \"bool\", \"version\": 2, \"doc\": \"the documentation\", \"parameters\": { \"foo\": \"bar\" }}, \"payload\": true }".getBytes()));
}
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:11,代码来源:JsonConverterTest.java
示例18: bytesToConnect
import org.apache.kafka.connect.data.SchemaAndValue; //导入依赖的package包/类
@Test
public void bytesToConnect() throws UnsupportedEncodingException {
ByteBuffer reference = ByteBuffer.wrap("test-string".getBytes("UTF-8"));
String msg = "{ \"schema\": { \"type\": \"bytes\" }, \"payload\": \"dGVzdC1zdHJpbmc=\" }";
SchemaAndValue schemaAndValue = converter.toConnectData(TOPIC, msg.getBytes());
ByteBuffer converted = ByteBuffer.wrap((byte[]) schemaAndValue.value());
assertEquals(reference, converted);
}
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:9,代码来源:JsonConverterTest.java
示例19: mapToConnectStringKeys
import org.apache.kafka.connect.data.SchemaAndValue; //导入依赖的package包/类
@Test
public void mapToConnectStringKeys() {
byte[] mapJson = "{ \"schema\": { \"type\": \"map\", \"keys\": { \"type\" : \"string\" }, \"values\": { \"type\" : \"int32\" } }, \"payload\": { \"key1\": 12, \"key2\": 15} }".getBytes();
Map<String, Integer> expected = new HashMap<>();
expected.put("key1", 12);
expected.put("key2", 15);
assertEquals(new SchemaAndValue(SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.INT32_SCHEMA).build(), expected), converter.toConnectData(TOPIC, mapJson));
}
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:9,代码来源:JsonConverterTest.java
示例20: mapToConnectNonStringKeys
import org.apache.kafka.connect.data.SchemaAndValue; //导入依赖的package包/类
@Test
public void mapToConnectNonStringKeys() {
byte[] mapJson = "{ \"schema\": { \"type\": \"map\", \"keys\": { \"type\" : \"int32\" }, \"values\": { \"type\" : \"int32\" } }, \"payload\": [ [1, 12], [2, 15] ] }".getBytes();
Map<Integer, Integer> expected = new HashMap<>();
expected.put(1, 12);
expected.put(2, 15);
assertEquals(new SchemaAndValue(SchemaBuilder.map(Schema.INT32_SCHEMA, Schema.INT32_SCHEMA).build(), expected), converter.toConnectData(TOPIC, mapJson));
}
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:9,代码来源:JsonConverterTest.java
注:本文中的org.apache.kafka.connect.data.SchemaAndValue类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论