This article collects typical usage examples of the Java class com.alibaba.rocketmq.client.consumer.PullCallback. If you are wondering how the PullCallback class is used in practice, or are looking for working examples of it, the selected code examples below may help.
The PullCallback class belongs to the com.alibaba.rocketmq.client.consumer package. A total of 17 code examples of the class are shown below, sorted by popularity by default.
Example 1: pullMessage
import com.alibaba.rocketmq.client.consumer.PullCallback; // import the required package/class
public PullResult pullMessage(//
final String addr,//
final PullMessageRequestHeader requestHeader,//
final long timeoutMillis,//
final CommunicationMode communicationMode,//
final PullCallback pullCallback//
) throws RemotingException, MQBrokerException, InterruptedException {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, requestHeader);
switch (communicationMode) {
case ONEWAY:
assert false;
return null;
case ASYNC:
this.pullMessageAsync(addr, request, timeoutMillis, pullCallback);
return null;
case SYNC:
return this.pullMessageSync(addr, request, timeoutMillis);
default:
assert false;
break;
}
return null;
}
Developer: y123456yz, Project: reading-and-annotate-rocketmq-3.4.6, Lines of code: 26, Source: MQClientAPIImpl.java
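The calling convention in Example 1 can be sketched without the RocketMQ dependency: a synchronous pull returns its result directly, while an asynchronous pull returns null and reports through the callback. The following is a minimal sketch under stated assumptions: `Mode`, `Callback`, and `fetch` are hypothetical stand-ins for `CommunicationMode`, `PullCallback`, and the remote call, not the real classes. It also throws on unsupported modes instead of using `assert false`, because JVM assertions are disabled unless `-ea` is passed.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-ins for the RocketMQ types; not the real classes.
class PullDispatchSketch {
    enum Mode { SYNC, ASYNC, ONEWAY }

    // Mirrors the two-method shape of PullCallback.
    interface Callback {
        void onSuccess(String result);
        void onException(Throwable e);
    }

    // Same convention as pullMessage: SYNC returns the result,
    // ASYNC returns null and reports through the callback.
    static String pull(Mode mode, Callback callback) {
        switch (mode) {
            case SYNC:
                return fetch();
            case ASYNC:
                CompletableFuture.supplyAsync(PullDispatchSketch::fetch)
                        .whenComplete((result, error) -> {
                            if (error != null) {
                                callback.onException(error);
                            } else {
                                callback.onSuccess(result);
                            }
                        });
                return null;
            default:
                // One-way pulls make no sense: the caller needs the messages back.
                throw new IllegalArgumentException("pull does not support " + mode);
        }
    }

    // Placeholder for the remote call to the broker.
    static String fetch() {
        return "FOUND";
    }

    public static void main(String[] args) throws Exception {
        System.out.println(pull(Mode.SYNC, null));

        CompletableFuture<String> done = new CompletableFuture<>();
        Object returned = pull(Mode.ASYNC, new Callback() {
            @Override public void onSuccess(String r) { done.complete(r); }
            @Override public void onException(Throwable e) { done.completeExceptionally(e); }
        });
        System.out.println(returned + " / " + done.get(1, TimeUnit.SECONDS));
    }
}
```

A caller that passes ASYNC must therefore ignore the return value and do all of its work inside the callback.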
Example 2: pullMessage
import com.alibaba.rocketmq.client.consumer.PullCallback; // import the required package/class
/**
* Pulls messages from the given address.
*
* @param addr broker address
* @param requestHeader
* @param timeoutMillis client-side timeout
* @param communicationMode communication mode: async, sync, or one-way
* @param pullCallback
* @return
* @throws RemotingException
* @throws MQBrokerException
* @throws InterruptedException
*/
public PullResult pullMessage(//
final String addr,//
final PullMessageRequestHeader requestHeader,//
final long timeoutMillis,//
final CommunicationMode communicationMode,//
final PullCallback pullCallback//
) throws RemotingException, MQBrokerException, InterruptedException {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, requestHeader);
switch (communicationMode) {
case ONEWAY:
assert false; // pulling messages must not be one-way
return null;
case ASYNC: // push mode pulls messages asynchronously and requires a callback
this.pullMessageAsync(addr, request, timeoutMillis, pullCallback);
return null;
case SYNC:
return this.pullMessageSync(addr, request, timeoutMillis);
default:
assert false;
break;
}
return null;
}
Developer: medusar, Project: rocketmq-commet, Lines of code: 39, Source: MQClientAPIImpl.java
Example 3: pullMessageAsync
import com.alibaba.rocketmq.client.consumer.PullCallback; // import the required package/class
private void pullMessageAsync(//
final String addr,// 1
final RemotingCommand request,//
final long timeoutMillis,//
final PullCallback pullCallback//
) throws RemotingException, InterruptedException {
// this.remotingClient = new NettyRemotingClient(), so this call actually runs NettyRemotingClient.invokeAsync
this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() {
@Override
public void operationComplete(ResponseFuture responseFuture) {
RemotingCommand response = responseFuture.getResponseCommand();
if (response != null) {
try {
PullResult pullResult = MQClientAPIImpl.this.processPullResponse(response);
assert pullResult != null;
// runs the callback created in DefaultMQPushConsumerImpl.pullMessage (pullCallback.onSuccess)
pullCallback.onSuccess(pullResult);
}
catch (Exception e) {
pullCallback.onException(e);
}
}
else {
if (!responseFuture.isSendRequestOK()) {
pullCallback.onException(new MQClientException("send request failed", responseFuture.getCause()));
}
else if (responseFuture.isTimeout()) {
pullCallback.onException(new MQClientException("wait response timeout " + responseFuture.getTimeoutMillis() + "ms",
responseFuture.getCause()));
}
else {
pullCallback.onException(new MQClientException("unknown reason", responseFuture.getCause()));
}
}
}
});
}
Developer: y123456yz, Project: reading-and-annotate-rocketmq-3.4.6, Lines of code: 39, Source: MQClientAPIImpl.java
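The branch order in Example 3 matters: a present response wins, then a failed send is reported before a timeout, and anything else falls through to a generic error. The following is a minimal sketch of that decision order, assuming a hypothetical `Outcome` class that stands in for the three pieces of `ResponseFuture` state the callback inspects; the returned labels are illustrative and are not the library's exception messages.

```java
// Hypothetical model of the state that operationComplete inspects.
class PullFailureSketch {
    static final class Outcome {
        final String response;       // null when no response arrived
        final boolean sendRequestOk; // false when the request never left the client
        final boolean timedOut;      // true when the wait for a response expired

        Outcome(String response, boolean sendRequestOk, boolean timedOut) {
            this.response = response;
            this.sendRequestOk = sendRequestOk;
            this.timedOut = timedOut;
        }
    }

    // Same check order as Example 3: response, send failure, timeout, unknown.
    static String classify(Outcome o) {
        if (o.response != null) {
            return "success";
        }
        if (!o.sendRequestOk) {
            return "send request failed";
        }
        if (o.timedOut) {
            return "wait response timeout";
        }
        return "unknown reason";
    }

    public static void main(String[] args) {
        System.out.println(classify(new Outcome("body", true, false)));
        System.out.println(classify(new Outcome(null, false, true)));
        System.out.println(classify(new Outcome(null, true, true)));
        System.out.println(classify(new Outcome(null, true, false)));
    }
}
```

Note the second call in `main`: when the send failed and the timer also expired, the send failure is what gets reported, because it is checked first.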
Example 4: postProcess
import com.alibaba.rocketmq.client.consumer.PullCallback; // import the required package/class
@Override
public Object postProcess(Object res, PullCallback t, Object proxy, Method method, Object[] args) {
if (method.getName().equals("onSuccess")) {
doAsyncCap(1);
}
else if (method.getName().equals("onException")) {
doAsyncCap(0);
}
return null;
}
Developer: uavorg, Project: uavstack, Lines of code: 13, Source: RocketmqIT.java
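Example 4 is a hook that runs after a proxied PullCallback method and records the outcome by method name. The same idea can be sketched with the JDK's own `java.lang.reflect.Proxy`. This is a sketch under stated assumptions, not the uavstack implementation: `Callback` is a hypothetical stand-in for `PullCallback`, and the two counters stand in for whatever `doAsyncCap(1)` and `doAsyncCap(0)` record, which appears to be success and failure respectively.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.concurrent.atomic.AtomicInteger;

class CallbackProxySketch {
    // Hypothetical stand-in for PullCallback.
    interface Callback {
        void onSuccess(String result);
        void onException(Throwable e);
    }

    // Stand-ins for the monitoring capture done by doAsyncCap.
    static final AtomicInteger successes = new AtomicInteger();
    static final AtomicInteger failures = new AtomicInteger();

    // Wraps a callback so that every call is forwarded and then counted by method name.
    static Callback instrument(Callback target) {
        InvocationHandler handler = (proxy, method, args) -> {
            Object res = method.invoke(target, args); // forward to the real callback first
            if (method.getName().equals("onSuccess")) {
                successes.incrementAndGet();
            } else if (method.getName().equals("onException")) {
                failures.incrementAndGet();
            }
            return res;
        };
        return (Callback) Proxy.newProxyInstance(
                Callback.class.getClassLoader(),
                new Class<?>[] {Callback.class},
                handler);
    }

    public static void main(String[] args) {
        Callback wrapped = instrument(new Callback() {
            @Override public void onSuccess(String result) { System.out.println("got " + result); }
            @Override public void onException(Throwable e) { System.out.println("failed: " + e.getMessage()); }
        });
        wrapped.onSuccess("FOUND");
        wrapped.onException(new RuntimeException("timeout"));
        System.out.println(successes.get() + " success, " + failures.get() + " failure");
    }
}
```

Dispatching on the method name keeps the hook generic: the same handler shape works for any callback interface, at the cost of silently ignoring methods it does not recognize.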
Example 5: pullMessage
import com.alibaba.rocketmq.client.consumer.PullCallback; // import the required package/class
/**
* Message pull interface.
*/
public PullResult pullMessage(//
final String addr,//
final PullMessageRequestHeader requestHeader,//
final long timeoutMillis,//
final CommunicationMode communicationMode,//
final PullCallback pullCallback//
) throws RemotingException, MQBrokerException, InterruptedException {
// Apply the projectGroupPrefix of the virtual runtime environment
if (!UtilAll.isBlank(projectGroupPrefix)) {
requestHeader.setConsumerGroup(VirtualEnvUtil.buildWithProjectGroup(
requestHeader.getConsumerGroup(), projectGroupPrefix));
requestHeader.setTopic(VirtualEnvUtil.buildWithProjectGroup(requestHeader.getTopic(),
projectGroupPrefix));
}
RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, requestHeader);
switch (communicationMode) {
case ONEWAY:
assert false;
return null;
case ASYNC:
this.pullMessageAsync(addr, request, timeoutMillis, pullCallback);
return null;
case SYNC:
return this.pullMessageSync(addr, request, timeoutMillis);
default:
assert false;
break;
}
return null;
}
Developer: diwayou, Project: rocketmq-all-trans, Lines of code: 38, Source: MQClientAPIImpl.java
Example 6: pullMessageAsync
import com.alibaba.rocketmq.client.consumer.PullCallback; // import the required package/class
private void pullMessageAsync(//
final String addr,// 1
final RemotingCommand request,//
final long timeoutMillis,//
final PullCallback pullCallback//
) throws RemotingException, InterruptedException {
this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() {
@Override
public void operationComplete(ResponseFuture responseFuture) {
RemotingCommand response = responseFuture.getResponseCommand();
if (response != null) {
try {
PullResult pullResult = MQClientAPIImpl.this.processPullResponse(response);
assert pullResult != null;
pullCallback.onSuccess(pullResult);
}
catch (Exception e) {
pullCallback.onException(e);
}
}
else {
if (!responseFuture.isSendRequestOK()) {
pullCallback.onException(new MQClientException("send request failed", responseFuture
.getCause()));
}
else if (responseFuture.isTimeout()) {
pullCallback.onException(new MQClientException("wait response timeout "
+ responseFuture.getTimeoutMillis() + "ms", responseFuture.getCause()));
}
else {
pullCallback.onException(new MQClientException("unknown reason", responseFuture
.getCause()));
}
}
}
});
}
Developer: diwayou, Project: rocketmq-all-trans, Lines of code: 38, Source: MQClientAPIImpl.java
Example 7: pullMessage
import com.alibaba.rocketmq.client.consumer.PullCallback; // import the required package/class
/**
* Message pull interface.
*/
public PullResult pullMessage(//
final String addr,//
final PullMessageRequestHeader requestHeader,//
final long timeoutMillis,//
final CommunicationMode communicationMode,//
final PullCallback pullCallback//
) throws RemotingException, MQBrokerException, InterruptedException {
// Apply the projectGroupPrefix of the virtual runtime environment
if (!UtilAll.isBlank(projectGroupPrefix)) {
requestHeader.setConsumerGroup(VirtualEnvUtil.buildWithProjectGroup(
requestHeader.getConsumerGroup(), projectGroupPrefix));
requestHeader.setTopic(VirtualEnvUtil.buildWithProjectGroup(requestHeader.getTopic(),
projectGroupPrefix));
}
RemotingCommand request =
RemotingCommand.createRequestCommand(MQRequestCode.PULL_MESSAGE_VALUE, requestHeader);
switch (communicationMode) {
case ONEWAY:
assert false;
return null;
case ASYNC:
this.pullMessageAsync(addr, request, timeoutMillis, pullCallback);
return null;
case SYNC:
return this.pullMessageSync(addr, request, timeoutMillis);
default:
assert false;
break;
}
return null;
}
Developer: brucechan0921, Project: RocketMQ-3.0.8, Lines of code: 38, Source: MQClientAPIImpl.java
Example 8: pull
import com.alibaba.rocketmq.client.consumer.PullCallback; // import the required package/class
public void pull(MessageQueue mq, String subExpression, long offset, int maxNums,
PullCallback pullCallback) throws MQClientException, RemotingException, InterruptedException {
pull(mq, subExpression, offset, maxNums, pullCallback,
this.defaultMQPullConsumer.getConsumerPullTimeoutMillis());
}
Developer: y123456yz, Project: reading-and-annotate-rocketmq-3.4.6, Lines of code: 6, Source: DefaultMQPullConsumerImpl.java
Example 9: pullBlockIfNotFound
import com.alibaba.rocketmq.client.consumer.PullCallback; // import the required package/class
public void pullBlockIfNotFound(MessageQueue mq, String subExpression, long offset, int maxNums,
PullCallback pullCallback) throws MQClientException, RemotingException, InterruptedException {
this.pullAsyncImpl(mq, subExpression, offset, maxNums, pullCallback, true, this
.getDefaultMQPullConsumer().getConsumerPullTimeoutMillis());
}
Developer: y123456yz, Project: reading-and-annotate-rocketmq-3.4.6, Lines of code: 6, Source: DefaultMQPullConsumerImpl.java
Example 10: pullKernelImpl
import com.alibaba.rocketmq.client.consumer.PullCallback; // import the required package/class
public PullResult pullKernelImpl(//
final MessageQueue mq,// 1
final String subExpression,// 2
final long subVersion,// 3
final long offset,// 4
final int maxNums,// 5
final int sysFlag,// 6
final long commitOffset,// 7
final long brokerSuspendMaxTimeMillis,// 8
final long timeoutMillis,// 9
final CommunicationMode communicationMode,// 10
final PullCallback pullCallback// 11
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
FindBrokerResult findBrokerResult =
this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),
this.recalculatePullFromWhichNode(mq), false);
if (null == findBrokerResult) {
// PullAPIWrapper.pullKernelImpl calls updateTopicRouteInfoFromNameServer to refresh the topic route info, which is ultimately stored in MQClientInstance.topicRouteTable
this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
findBrokerResult =
this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),
this.recalculatePullFromWhichNode(mq), false);
}
if (findBrokerResult != null) {
int sysFlagInner = sysFlag;
if (findBrokerResult.isSlave()) { // a slave broker was chosen for this pull, so clear the commit-offset system flag
sysFlagInner = PullSysFlag.clearCommitOffsetFlag(sysFlagInner);
}
PullMessageRequestHeader requestHeader = new PullMessageRequestHeader();
requestHeader.setConsumerGroup(this.consumerGroup);
requestHeader.setTopic(mq.getTopic());
requestHeader.setQueueId(mq.getQueueId());
requestHeader.setQueueOffset(offset);
requestHeader.setMaxMsgNums(maxNums);
requestHeader.setSysFlag(sysFlagInner);
requestHeader.setCommitOffset(commitOffset);
requestHeader.setSuspendTimeoutMillis(brokerSuspendMaxTimeMillis);
requestHeader.setSubscription(subExpression);
requestHeader.setSubVersion(subVersion);
String brokerAddr = findBrokerResult.getBrokerAddr();
if (PullSysFlag.hasClassFilterFlag(sysFlagInner)) {
brokerAddr = computPullFromWhichFilterServer(mq.getTopic(), brokerAddr);
}
// start pulling messages from the broker at brokerAddr
PullResult pullResult = this.mQClientFactory.getMQClientAPIImpl().pullMessage(//
brokerAddr,//
requestHeader,//
timeoutMillis,//
communicationMode,// whether to pull from the broker synchronously or asynchronously
pullCallback);
return pullResult;
}
throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
}
Developer: y123456yz, Project: reading-and-annotate-rocketmq-3.4.6, Lines of code: 62, Source: PullAPIWrapper.java
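Example 10 treats `sysFlag` as a bitmask: when the chosen broker is a slave, the commit-offset bit is cleared and the other bits are left alone. The following is a minimal sketch of that bit handling. The flag constants and their bit positions are assumptions made for illustration; check `PullSysFlag` in your RocketMQ version for the actual values.

```java
// Hypothetical re-creation of the flag handling; bit positions are assumed.
class PullFlagSketch {
    static final int FLAG_COMMIT_OFFSET = 0x1;      // assumed: bit 0
    static final int FLAG_SUSPEND = 0x1 << 1;       // assumed: bit 1
    static final int FLAG_SUBSCRIPTION = 0x1 << 2;  // assumed: bit 2
    static final int FLAG_CLASS_FILTER = 0x1 << 3;  // assumed: bit 3

    // Clears only the commit-offset bit and preserves all others.
    static int clearCommitOffsetFlag(int sysFlag) {
        return sysFlag & ~FLAG_COMMIT_OFFSET;
    }

    static boolean hasClassFilterFlag(int sysFlag) {
        return (sysFlag & FLAG_CLASS_FILTER) == FLAG_CLASS_FILTER;
    }

    // Mirrors the slave check in pullKernelImpl.
    static int effectiveFlag(int sysFlag, boolean brokerIsSlave) {
        return brokerIsSlave ? clearCommitOffsetFlag(sysFlag) : sysFlag;
    }

    public static void main(String[] args) {
        int flag = FLAG_COMMIT_OFFSET | FLAG_SUSPEND;          // binary 0011
        System.out.println(effectiveFlag(flag, false));        // prints 3
        System.out.println(effectiveFlag(flag, true));         // prints 2
        System.out.println(hasClassFilterFlag(flag));          // prints false
    }
}
```

Copying the flag into a local variable before clearing the bit, as the original does with `sysFlagInner`, keeps the caller's value unchanged.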
Example 11: pullMessageAsync
import com.alibaba.rocketmq.client.consumer.PullCallback; // import the required package/class
/**
* Pulls messages asynchronously.
*
* @param addr
* @param request
* @param timeoutMillis
* @param pullCallback
* @throws RemotingException
* @throws InterruptedException
*/
private void pullMessageAsync(//
final String addr,// 1
final RemotingCommand request,//
final long timeoutMillis,//
final PullCallback pullCallback//
) throws RemotingException, InterruptedException {
this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() {
@Override
public void operationComplete(ResponseFuture responseFuture) {
RemotingCommand response = responseFuture.getResponseCommand();
/**
* If the responseFuture contains a response
*/
if (response != null) {
try {
/**
* Parse the response into a PullResult
*/
PullResult pullResult = MQClientAPIImpl.this.processPullResponse(response);
assert pullResult != null;
/**
* Invoke the callback's onSuccess method
*/
pullCallback.onSuccess(pullResult);
} catch (Exception e) {
/**
* If parsing the response fails, invoke the onException method
*/
pullCallback.onException(e);
}
} else {
/**
* If the ResponseFuture contains no response, something probably went wrong; check each possible cause in turn
*/
if (!responseFuture.isSendRequestOK()) { // failed to send the request
pullCallback.onException(new MQClientException("send request failed", responseFuture.getCause()));
} else if (responseFuture.isTimeout()) { // timed out waiting for the response
pullCallback.onException(new MQClientException("wait response timeout " + responseFuture.getTimeoutMillis() + "ms",
responseFuture.getCause()));
} else {
pullCallback.onException(new MQClientException("unknown reason", responseFuture.getCause()));
}
}
}
});
}
Developer: medusar, Project: rocketmq-commet, Lines of code: 58, Source: MQClientAPIImpl.java
Example 12: pull
import com.alibaba.rocketmq.client.consumer.PullCallback; // import the required package/class
public void pull(MessageQueue mq, String subExpression, long offset, int maxNums,
PullCallback pullCallback) throws MQClientException, RemotingException, InterruptedException {
this.pullAsyncImpl(mq, subExpression, offset, maxNums, pullCallback, false);
}
Developer: diwayou, Project: rocketmq-all-trans, Lines of code: 5, Source: DefaultMQPullConsumerImpl.java
Example 13: pullBlockIfNotFound
import com.alibaba.rocketmq.client.consumer.PullCallback; // import the required package/class
public void pullBlockIfNotFound(MessageQueue mq, String subExpression, long offset, int maxNums,
PullCallback pullCallback) throws MQClientException, RemotingException, InterruptedException {
this.pullAsyncImpl(mq, subExpression, offset, maxNums, pullCallback, true);
}
Developer: diwayou, Project: rocketmq-all-trans, Lines of code: 5, Source: DefaultMQPullConsumerImpl.java
Example 14: pullKernelImpl
import com.alibaba.rocketmq.client.consumer.PullCallback; // import the required package/class
public PullResult pullKernelImpl(//
final MessageQueue mq,// 1
final String subExpression,// 2
final long subVersion,// 3
final long offset,// 4
final int maxNums,// 5
final int sysFlag,// 6
final long commitOffset,// 7
final long brokerSuspendMaxTimeMillis,// 8
final long timeoutMillis,// 9
final CommunicationMode communicationMode,// 10
final PullCallback pullCallback// 11
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
FindBrokerResult findBrokerResult =
this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),
this.recalculatePullFromWhichNode(mq), false);
if (null == findBrokerResult) {
// TODO this may put too much load on the Name Server and needs tuning
this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
findBrokerResult =
this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),
this.recalculatePullFromWhichNode(mq), false);
}
if (findBrokerResult != null) {
int sysFlagInner = sysFlag;
// A slave may not commit consumer offsets in real time; offsets can be committed on a schedule instead
if (findBrokerResult.isSlave()) {
sysFlagInner = PullSysFlag.clearCommitOffsetFlag(sysFlagInner);
}
PullMessageRequestHeader requestHeader = new PullMessageRequestHeader();
requestHeader.setConsumerGroup(this.consumerGroup);
requestHeader.setTopic(mq.getTopic());
requestHeader.setQueueId(mq.getQueueId());
requestHeader.setQueueOffset(offset);
requestHeader.setMaxMsgNums(maxNums);
requestHeader.setSysFlag(sysFlagInner);
requestHeader.setCommitOffset(commitOffset);
requestHeader.setSuspendTimeoutMillis(brokerSuspendMaxTimeMillis);
requestHeader.setSubscription(subExpression);
requestHeader.setSubVersion(subVersion);
String brokerAddr = findBrokerResult.getBrokerAddr();
if (PullSysFlag.hasClassFilterFlag(sysFlagInner)) {
brokerAddr = computPullFromWhichFilterServer(mq.getTopic(), brokerAddr);
}
PullResult pullResult = this.mQClientFactory.getMQClientAPIImpl().pullMessage(//
brokerAddr,//
requestHeader,//
timeoutMillis,//
communicationMode,//
pullCallback);
return pullResult;
}
throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
}
Developer: diwayou, Project: rocketmq-all-trans, Lines of code: 62, Source: PullAPIWrapper.java
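The broker lookup in Example 14 follows a "look up, refresh once on a miss, look up again" pattern, and the TODO in the code notes that the refresh may load the Name Server. The following is a minimal sketch of that pattern, assuming a hypothetical `RouteLookupSketch` class in which a `Supplier` stands in for the Name Server query and a plain map stands in for the client's route table; the broker name and address are made up for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical model of the lookup in pullKernelImpl; not the RocketMQ classes.
class RouteLookupSketch {
    private final Map<String, String> localRoutes = new HashMap<>();
    private final Supplier<Map<String, String>> nameServer; // stand-in for the Name Server query
    int refreshCount = 0; // how many times the Name Server was asked

    RouteLookupSketch(Supplier<Map<String, String>> nameServer) {
        this.nameServer = nameServer;
    }

    String findBrokerAddress(String brokerName) {
        String addr = localRoutes.get(brokerName);
        if (addr == null) {
            // Miss: refresh the local table once, then retry the lookup.
            refreshCount++;
            localRoutes.putAll(nameServer.get());
            addr = localRoutes.get(brokerName);
        }
        if (addr == null) {
            throw new IllegalStateException("The broker[" + brokerName + "] not exist");
        }
        return addr;
    }

    public static void main(String[] args) {
        Map<String, String> remote = new HashMap<>();
        remote.put("broker-a", "10.0.0.1:10911"); // illustrative address
        RouteLookupSketch lookup = new RouteLookupSketch(() -> remote);
        System.out.println(lookup.findBrokerAddress("broker-a"));
        System.out.println(lookup.findBrokerAddress("broker-a"));
        System.out.println(lookup.refreshCount); // prints 1: the second call hit the local table
    }
}
```

In this sketch every lookup of a broker that does not exist triggers another refresh, which illustrates the kind of load the TODO is concerned about.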
Example 15: pullKernelImpl
import com.alibaba.rocketmq.client.consumer.PullCallback; // import the required package/class
public PullResult pullKernelImpl(//
final MessageQueue mq,// 1
final String subExpression,// 2
final long subVersion,// 3
final long offset,// 4
final int maxNums,// 5
final int sysFlag,// 6
final long commitOffset,// 7
final long brokerSuspendMaxTimeMillis,// 8
final long timeoutMillis,// 9
final CommunicationMode communicationMode,// 10
final PullCallback pullCallback// 11
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
FindBrokerResult findBrokerResult =
this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),
this.recalculatePullFromWhichNode(mq), false);
if (null == findBrokerResult) {
// TODO this may put too much load on the Name Server and needs tuning
this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
findBrokerResult =
this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),
this.recalculatePullFromWhichNode(mq), false);
}
if (findBrokerResult != null) {
int sysFlagInner = sysFlag;
// A slave may not commit consumer offsets in real time; offsets can be committed on a schedule instead
if (findBrokerResult.isSlave()) {
sysFlagInner = PullSysFlag.clearCommitOffsetFlag(sysFlagInner);
}
PullMessageRequestHeader requestHeader = new PullMessageRequestHeader();
requestHeader.setConsumerGroup(this.consumerGroup);
requestHeader.setTopic(mq.getTopic());
requestHeader.setQueueId(mq.getQueueId());
requestHeader.setQueueOffset(offset);
requestHeader.setMaxMsgNums(maxNums);
requestHeader.setSysFlag(sysFlagInner);
requestHeader.setCommitOffset(commitOffset);
requestHeader.setSuspendTimeoutMillis(brokerSuspendMaxTimeMillis);
requestHeader.setSubscription(subExpression);
requestHeader.setSubVersion(subVersion);
PullResult pullResult = this.mQClientFactory.getMQClientAPIImpl().pullMessage(//
findBrokerResult.getBrokerAddr(),//
requestHeader,//
timeoutMillis,//
communicationMode,//
pullCallback);
return pullResult;
}
throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
}
Developer: brucechan0921, Project: RocketMQ-3.0.8, Lines of code: 57, Source: PullAPIWrapper.java
Example 16: preProcess
import com.alibaba.rocketmq.client.consumer.PullCallback; // import the required package/class
@Override
public void preProcess(PullCallback t, Object proxy, Method method, Object[] args) {
}
Developer: uavorg, Project: uavstack, Lines of code: 5, Source: RocketmqIT.java
Example 17: catchInvokeException
import com.alibaba.rocketmq.client.consumer.PullCallback; // import the required package/class
@Override
public void catchInvokeException(PullCallback t, Object proxy, Method method, Object[] args, Throwable e) {
doAsyncCap(0);
}
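Developer: uavorg, Project: uavstack, Lines of code: 7, Source: RocketmqIT.java
Note: The com.alibaba.rocketmq.client.consumer.PullCallback examples in this article were collected from source-code and documentation hosting platforms such as GitHub and MSDocs. The code snippets were selected from open-source projects, and copyright of the source code belongs to the original authors. For distribution and use, refer to the License of the corresponding project. Do not reproduce without permission.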