本文整理汇总了Java中com.alibaba.otter.canal.protocol.Message类的典型用法代码示例。如果您正苦于以下问题:Java Message类的具体用法?Java Message怎么用?Java Message使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
Message类属于com.alibaba.otter.canal.protocol包,在下文中一共展示了Message类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Java代码示例。
示例1: printSummary
import com.alibaba.otter.canal.protocol.Message; //导入依赖的package包/类
private void printSummary(Message message, long batchId, int size) {
long memsize = 0;
for (CanalEntry.Entry entry : message.getEntries()) {
memsize += entry.getHeader().getEventLength();
}
String startPosition = null;
String endPosition = null;
if (!CollectionUtils.isEmpty(message.getEntries())) {
startPosition = buildPositionForDump(message.getEntries().get(0));
endPosition = buildPositionForDump(message.getEntries().get(message.getEntries().size() - 1));
}
logger.info(context_format, new Object[]{batchId, size, memsize, format.format(new Date()), startPosition,
endPosition});
}
开发者ID:zhongchengxcr,项目名称:canal-elasticsearch,代码行数:17,代码来源:SelectorTask.java
示例2: consumerMessage
import com.alibaba.otter.canal.protocol.Message; //导入依赖的package包/类
/**
* 消费当前消息
*/
private void consumerMessage(Message message) {
log.debug("canal instance: " + handle.instanceName() + " get message entry size " + message.getEntries().size());
try {
for (CanalEntry.Entry e : message.getEntries()) {
if (e.getEntryType() != CanalEntry.EntryType.ROWDATA || !e.hasStoreValue()) continue;
CanalEntry.Header header = e.getHeader();
if (header.getExecuteTime() < startRtTime
|| header.getEventType().getNumber() > CanalEntry.EventType.DELETE_VALUE
|| !handle.startHandle(header)) continue;
try {
CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(e.getStoreValue());
if (rowChange.getIsDdl()) continue;
handle.rowChangeHandle(rowChange);
} catch (InvalidProtocolBufferException e1) {
log.error("canal instance: " + handle.instanceName() + " parse store value have exception: ", e1);
}
}
handle.ack(message.getId());
} finally {
handle.finishMessageHandle();
}
}
开发者ID:wxingyl,项目名称:search-commons,代码行数:26,代码来源:CanalExecutor.java
示例3: printSummary
import com.alibaba.otter.canal.protocol.Message; //导入依赖的package包/类
private void printSummary(Message message, long batchId, int size) {
long memsize = 0;
for (Entry entry : message.getEntries()) {
memsize += entry.getHeader().getEventLength();
}
String startPosition = null;
String endPosition = null;
if (!CollectionUtils.isEmpty(message.getEntries())) {
startPosition = buildPositionForDump(message.getEntries().get(0));
endPosition = buildPositionForDump(message.getEntries().get(message.getEntries().size() - 1));
}
SimpleDateFormat format = new SimpleDateFormat(DATE_FORMAT);
logger.info(context_format, new Object[] { batchId, size, memsize, format.format(new Date()), startPosition,
endPosition });
}
开发者ID:alibaba,项目名称:canal,代码行数:18,代码来源:AbstractCanalClientTest.java
示例4: receiveMessages
import com.alibaba.otter.canal.protocol.Message; //导入依赖的package包/类
private Message receiveMessages() throws IOException {
Packet p = Packet.parseFrom(readNextPacket());
switch (p.getType()) {
case MESSAGES: {
if (!p.getCompression().equals(Compression.NONE)) {
throw new CanalClientException("compression is not supported in this connector");
}
Messages messages = Messages.parseFrom(p.getBody());
Message result = new Message(messages.getBatchId());
for (ByteString byteString : messages.getMessagesList()) {
result.addEntry(Entry.parseFrom(byteString));
}
return result;
}
case ACK: {
Ack ack = Ack.parseFrom(p.getBody());
throw new CanalClientException("something goes wrong with reason: " + ack.getErrorMessage());
}
default: {
throw new CanalClientException("unexpected packet type: " + p.getType());
}
}
}
开发者ID:alibaba,项目名称:canal,代码行数:25,代码来源:SimpleCanalConnector.java
示例5: get
import com.alibaba.otter.canal.protocol.Message; //导入依赖的package包/类
public Message get(int batchSize) throws CanalClientException {
int times = 0;
while (times < retryTimes) {
try {
Message msg = currentConnector.get(batchSize);
return msg;
} catch (Throwable t) {
logger.warn(String.format("something goes wrong when getting data from server:%s",
currentConnector != null ? currentConnector.getAddress() : "null"), t);
times++;
restart();
logger.info("restart the connector for next round retry.");
}
}
throw new CanalClientException("failed to fetch the data after " + times + " times retry");
}
开发者ID:alibaba,项目名称:canal,代码行数:17,代码来源:ClusterCanalConnector.java
示例6: getWithoutAck
import com.alibaba.otter.canal.protocol.Message; //导入依赖的package包/类
public Message getWithoutAck(int batchSize) throws CanalClientException {
int times = 0;
while (times < retryTimes) {
try {
Message msg = currentConnector.getWithoutAck(batchSize);
return msg;
} catch (Throwable t) {
logger.warn(String.format("something goes wrong when getWithoutAck data from server:%s",
currentConnector.getAddress()), t);
times++;
restart();
logger.info("restart the connector for next round retry.");
}
}
throw new CanalClientException("failed to fetch the data after " + times + " times retry");
}
开发者ID:alibaba,项目名称:canal,代码行数:17,代码来源:ClusterCanalConnector.java
示例7: testGet
import com.alibaba.otter.canal.protocol.Message; //导入依赖的package包/类
@Test
public void testGet() {
int maxEmptyCount = 10;
int emptyCount = 0;
int totalCount = 0;
server.subscribe(clientIdentity);
while (emptyCount < maxEmptyCount) {
Message message = server.get(clientIdentity, 11);
if (CollectionUtils.isEmpty(message.getEntries())) {
emptyCount++;
try {
Thread.sleep(emptyCount * 300L);
} catch (InterruptedException e) {
Assert.fail();
}
System.out.println("empty count : " + emptyCount);
} else {
emptyCount = 0;
totalCount += message.getEntries().size();
}
}
System.out.println("!!!!!! testGet totalCount : " + totalCount);
server.unsubscribe(clientIdentity);
}
开发者ID:alibaba,项目名称:canal,代码行数:27,代码来源:BaseCanalServerWithEmbededTest.java
示例8: trans
import com.alibaba.otter.canal.protocol.Message; //导入依赖的package包/类
@Override
public ElasticsearchMetadata trans(Message message) {
List<CanalEntry.Entry> entries = message.getEntries();
ElasticsearchMetadata elasticsearchMetadata = TotoroObjectPool.esMetadata();
elasticsearchMetadata.setBatchId(message.getId());
elasticsearchMetadata.setTransForm(this);
if (entries != null && entries.size() > 0) {
EsEntryArrayList esEntryList = TotoroObjectPool.esEntryArrayList();
entries.forEach(entry -> {
if (messageFilterChain.filter(entry)) {
try {
ElasticsearchMetadata.EsEntry esEntry = getElasticsearchMetadata(entry);
esEntryList.add(esEntry);
} catch (InvalidProtocolBufferException e) {
e.printStackTrace();
}
}
});
elasticsearchMetadata.setEsEntries(esEntryList);
}
logger.info("Trans form complete message id =====> {}", message.getId());
if (logger.isDebugEnabled()) {
logger.debug("Trans form complete elasticsearch metadata =====> {}", elasticsearchMetadata.toString());
}
return elasticsearchMetadata;
}
开发者ID:zhongchengxcr,项目名称:canal-elasticsearch,代码行数:30,代码来源:TotoroTransForm.java
示例9: putMessage
import com.alibaba.otter.canal.protocol.Message; //导入依赖的package包/类
public void putMessage(Message e) throws InterruptedException {
if (rollBack.state() == true) {
selectorMessageQueue.put(e);
} else {
logger.info("The rollback happened =============> discard message , batchId :{}", e.getId());
}
}
开发者ID:zhongchengxcr,项目名称:canal-elasticsearch,代码行数:8,代码来源:TotoroChannel.java
示例10: main
import com.alibaba.otter.canal.protocol.Message; //导入依赖的package包/类
public static void main(String[] args) throws InterruptedException {
CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("127.0.0.1", 11111),
"totoro",
"",
"");
connector.connect();
connector.subscribe();
int emptyTimes = 0;
while (running) {
Message message = connector.getWithoutAck(5 * 1024);
if (message == null || message.getId() == -1L) {
applyWait(emptyTimes++);
} else {
//logger.info(message.toString());
long messageId = message.getId();
System.out.println("消息id:" + messageId);
Thread.sleep(1000);
connector.rollback();
}
}
}
开发者ID:zhongchengxcr,项目名称:canal-elasticsearch,代码行数:30,代码来源:LockTest.java
示例11: process
import com.alibaba.otter.canal.protocol.Message; //导入依赖的package包/类
protected void process() {
int batchSize = 5 * 1024;
while (running) {
try {
MDC.put("destination", destination);
connector.connect();
connector.subscribe();
while (running) {
Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
long batchId = message.getId();
int size = message.getEntries().size();
if (batchId == -1 || size == 0) {
// try {
// Thread.sleep(1000);
// } catch (InterruptedException e) {
// }
} else {
printSummary(message, batchId, size);
printEntry(message.getEntries());
}
connector.ack(batchId); // 提交确认
// connector.rollback(batchId); // 处理失败, 回滚数据
}
} catch (Exception e) {
logger.error("process error!", e);
} finally {
connector.disconnect();
MDC.remove("destination");
}
}
}
开发者ID:alibaba,项目名称:canal,代码行数:33,代码来源:AbstractCanalClientTest.java
示例12: getWithoutAck
import com.alibaba.otter.canal.protocol.Message; //导入依赖的package包/类
public Message getWithoutAck(int batchSize, Long timeout, TimeUnit unit) throws CanalClientException {
waitClientRunning();
try {
int size = (batchSize <= 0) ? 1000 : batchSize;
long time = (timeout == null || timeout < 0) ? -1 : timeout; // -1代表不做timeout控制
if (unit == null) {
unit = TimeUnit.MILLISECONDS;
}
writeWithHeader(Packet.newBuilder()
.setType(PacketType.GET)
.setBody(Get.newBuilder()
.setAutoAck(false)
.setDestination(clientIdentity.getDestination())
.setClientId(String.valueOf(clientIdentity.getClientId()))
.setFetchSize(size)
.setTimeout(time)
.setUnit(unit.ordinal())
.build()
.toByteString())
.build()
.toByteArray());
return receiveMessages();
} catch (IOException e) {
throw new CanalClientException(e);
}
}
开发者ID:alibaba,项目名称:canal,代码行数:28,代码来源:SimpleCanalConnector.java
示例13: testGetWithoutAck
import com.alibaba.otter.canal.protocol.Message; //导入依赖的package包/类
@Test
public void testGetWithoutAck() {
int maxEmptyCount = 10;
int emptyCount = 0;
int totalCount = 0;
server.subscribe(clientIdentity);
while (emptyCount < maxEmptyCount) {
Message message = server.getWithoutAck(clientIdentity, 11);
if (CollectionUtils.isEmpty(message.getEntries())) {
emptyCount++;
try {
Thread.sleep(emptyCount * 300L);
} catch (InterruptedException e) {
Assert.fail();
}
System.out.println("empty count : " + emptyCount);
} else {
emptyCount = 0;
totalCount += message.getEntries().size();
server.ack(clientIdentity, message.getId());
}
}
System.out.println("!!!!!! testGetWithoutAck totalCount : " + totalCount);
server.unsubscribe(clientIdentity);
}
开发者ID:alibaba,项目名称:canal,代码行数:28,代码来源:BaseCanalServerWithEmbededTest.java
示例14: fetchRows
import com.alibaba.otter.canal.protocol.Message; //导入依赖的package包/类
public Message fetchRows() {
return fetchRows(this.canalConf.getBatchSize());
}
开发者ID:lackhurt,项目名称:flume-canal-source,代码行数:4,代码来源:CanalClient.java
示例15: getTotoroTransForm
import com.alibaba.otter.canal.protocol.Message; //导入依赖的package包/类
private TotoroTransForm getTotoroTransForm(Message message) {
TotoroTransForm transForm = (TotoroTransForm) TotoroObjectPool.transForm();
transForm.setEsAdapter(esAdapter);
transForm.setMessage(message);
return transForm;
}
开发者ID:zhongchengxcr,项目名称:canal-elasticsearch,代码行数:7,代码来源:TransFormTask.java
示例16: setMessage
import com.alibaba.otter.canal.protocol.Message; //导入依赖的package包/类
public TotoroTransForm setMessage(Message message) {
this.message = message;
return this;
}
开发者ID:zhongchengxcr,项目名称:canal-elasticsearch,代码行数:5,代码来源:TotoroTransForm.java
示例17: takeMessage
import com.alibaba.otter.canal.protocol.Message; //导入依赖的package包/类
public Message takeMessage() throws InterruptedException {
return selectorMessageQueue.take();
}
开发者ID:zhongchengxcr,项目名称:canal-elasticsearch,代码行数:4,代码来源:TotoroChannel.java
示例18: run
import com.alibaba.otter.canal.protocol.Message; //导入依赖的package包/类
@Override
public void run() {
running = true;
totoroSelector.start();
totoroSelector.rollback();
logger.info("Selector task start .......");
Message message;
rollBack.set(true);
while (running) {
try {
//出现回滚立即停止
message = totoroSelector.selector();
/**
* 当前处理回滚的调度模型,可以保证在消费端出错的时候、正确处理回滚,并正确应答和继续消费数据。
*
* 当发生回滚时,首先 consumer task 会将 rollback 设置为true ,自己停止工作,等待唤醒
* 然后 trans task 也会同样挂起
* channel会拒绝接受 message 和 future ,对于已经提交的 future 会尝试取消
*
* 到此 除了 selector task 以外 的所有线程 全部尽最大努力去停止处理消息,但注意此时还没有回滚
*
* 因为 selector 是循环获取数据,每次循环都会判断 rollback 状态,一旦发现rollback状态,跳出循环
* 返回到 selector task里面 ,task会感知到回滚状态 ,清空渠道中的消息 ,并回滚 至最后一个未应答的
* 消费点,然后丢弃本条消息,重新获取一次消息(回滚的消息)
* 当上面所有工作 都做完了,便完成了回滚 ,selector task 改变回滚状态,重新正常工作
*
* 粗略测试结果 : 40000条数据,单机测试,当 batchId 能被2整除的时候回滚
* 在不真正消费数据的前提下(消费端直接应答),处理性能非常好
*
*/
if (rollBack.state() == false) {
totoroSelector.rollback();
logger.info("The rollback happened =============> discard message , batchId :{}", message.getId());
//丢弃刚才的消息
message = totoroSelector.selector();
channel.clearMessage();
rollBack.set(true);
}
long batchId = message.getId();
int size = message.getEntries().size();
if (batchId == -1 || size == 0) {
message = null;//help gc
} else {
logger.info("Put message into channel =====> batchId :{}", message.getId());
if (logger.isDebugEnabled()) {
printSummary(message, batchId, size);
printEntry(message.getEntries());
}
//将消息放入管道
channel.putMessage(message);
}
} catch (InterruptedException e) {
logger.error("Selector task has been interrupted ", e);
running = false;
break;
}
}
}
开发者ID:zhongchengxcr,项目名称:canal-elasticsearch,代码行数:65,代码来源:SelectorTask.java
示例19: selector
import com.alibaba.otter.canal.protocol.Message; //导入依赖的package包/类
@Override
public Message selector() throws InterruptedException {
if (!running) {
throw new RuntimeException("CanalEmbedSelector has benn not start");
}
Message message = null;
int emptyTimes = 0;
if (batchTimeout < 0) {
while (running) {
message = connector.getWithoutAck(batchSize);
if (message == null || message.getId() == -1L) {
if (rollBack.state() == false) {
break;
} else {
applyWait(emptyTimes++);
}
} else {
break;
}
}
if (!running) {
throw new InterruptedException();
}
} else {
while (running) {
message = connector.getWithoutAck(batchSize, batchTimeout, TimeUnit.SECONDS);
if (message == null || message.getId() == -1L) {
continue;
} else {
break;
}
}
if (!running) {
throw new InterruptedException();
}
}
return message;
}
开发者ID:zhongchengxcr,项目名称:canal-elasticsearch,代码行数:45,代码来源:CanalEmbedSelector.java
示例20: get
import com.alibaba.otter.canal.protocol.Message; //导入依赖的package包/类
public Message get(int batchSize) throws CanalClientException {
return get(batchSize, null, null);
}
开发者ID:alibaba,项目名称:canal,代码行数:4,代码来源:SimpleCanalConnector.java
注:本文中的com.alibaba.otter.canal.protocol.Message类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论