本文整理汇总了Java中com.alibaba.rocketmq.common.message.Message类的典型用法代码示例。如果您正苦于以下问题:Java Message类的具体用法?Java Message怎么用?Java Message使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
Message类属于com.alibaba.rocketmq.common.message包,在下文中一共展示了Message类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Java代码示例。
示例1: main
import com.alibaba.rocketmq.common.message.Message; //导入依赖的package包/类
public static void main(String[] args){
DefaultMQPushConsumer consumer =
new DefaultMQPushConsumer("PushConsumer");
consumer.setNamesrvAddr("127.0.0.1:9876");
try {
//订阅PushTopic下Tag为push的消息
consumer.subscribe("PushTopic", "push");
//程序第一次启动从消息队列头取数据
consumer.setConsumeFromWhere(
ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.registerMessageListener(
new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(
List<MessageExt> list,
ConsumeConcurrentlyContext Context) {
Message msg = list.get(0);
System.out.println(msg.toString());
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}
);
consumer.start();
} catch (Exception e) {
e.printStackTrace();
}
}
开发者ID:youngMen1,项目名称:-Spring-SpringMVC-Mybatis-,代码行数:27,代码来源:Consumer.java
示例2: main
import com.alibaba.rocketmq.common.message.Message; //导入依赖的package包/类
public static void main(String[] args) {
DefaultMQProducer producer = new DefaultMQProducer("Producer");
producer.setNamesrvAddr("127.0.0.1:9876");
try {
producer.start();
long time = System.currentTimeMillis();
System.out.println("开始:" + time);
for (int i = 1; i <= 100000; i ++) {
Message msg = new Message("PushTopic", "push", i + "", "Just for test.".getBytes());
SendResult result = producer.send(msg);
//System.out.println("id:" + result.getMsgId() + " result:" + result.getSendStatus());
}
System.out.println("结束,消耗:" + (System.currentTimeMillis() - time));
} catch (Exception e) {
e.printStackTrace();
} finally {
producer.shutdown();
}
}
开发者ID:youngMen1,项目名称:-Spring-SpringMVC-Mybatis-,代码行数:20,代码来源:Producer.java
示例3: checkMessage
import com.alibaba.rocketmq.common.message.Message; //导入依赖的package包/类
/**
* Validate message
*
* @param msg
* @param defaultMQProducer
* @throws com.alibaba.rocketmq.client.exception.MQClientException
*/
public static void checkMessage(Message msg, DefaultMQProducer defaultMQProducer)
throws MQClientException {
if (null == msg) {
throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message is null");
}
// topic
Validators.checkTopic(msg.getTopic());
// body
if (null == msg.getBody()) {
throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body is null");
}
if (0 == msg.getBody().length) {
throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body length is zero");
}
if (msg.getBody().length > defaultMQProducer.getMaxMessageSize()) {
throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL,
"the message body size over max value, MAX: " + defaultMQProducer.getMaxMessageSize());
}
}
开发者ID:y123456yz,项目名称:reading-and-annotate-rocketmq-3.4.6,代码行数:29,代码来源:Validators.java
示例4: executeLocalTransactionBranch
import com.alibaba.rocketmq.common.message.Message; //导入依赖的package包/类
@Override
public LocalTransactionState executeLocalTransactionBranch(final Message msg, final Object arg) {
int value = transactionIndex.getAndIncrement();
if (value == 0) {
throw new RuntimeException("Could not find db");
}
else if ((value % 5) == 0) {
return LocalTransactionState.ROLLBACK_MESSAGE;
}
else if ((value % 4) == 0) {
return LocalTransactionState.COMMIT_MESSAGE;
}
return LocalTransactionState.UNKNOW;
}
开发者ID:y123456yz,项目名称:reading-and-annotate-rocketmq-3.4.6,代码行数:17,代码来源:TransactionExecuterImpl.java
示例5: main
import com.alibaba.rocketmq.common.message.Message; //导入依赖的package包/类
public static void main(String[] args) throws MQClientException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
producer.start();
for (int i = 0; i < 1; i++)
try {
{
Message msg = new Message("TopicTest1",// topic
"TagA",// tag
"key113",// key
("Hello MetaQ").getBytes());// body
SendResult sendResult = producer.send(msg);
System.out.println(sendResult);
QueryResult queryMessage =
producer.queryMessage("TopicTest1", "key113", 10, 0, System.currentTimeMillis());
for (MessageExt m : queryMessage.getMessageList()) {
System.out.println(m);
}
}
}
catch (Exception e) {
e.printStackTrace();
}
producer.shutdown();
}
开发者ID:y123456yz,项目名称:reading-and-annotate-rocketmq-3.4.6,代码行数:27,代码来源:TestProducer.java
示例6: main
import com.alibaba.rocketmq.common.message.Message; //导入依赖的package包/类
public static void main(String[] args) throws MQClientException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer("yyzGroup2");
producer.setNamesrvAddr("10.2.223.157:9876;10.2.223.158:9876;10.2.223.159:9876");
// producer.setNamesrvAddr("10.2.223.228:9876");
producer.start();
for(int i = 0; i < 111111; ++i) {
// Thread.currentThread().sleep(50);
// for (String item : array) {
Message msg = new Message("yyztest2",// topic
"TAG",// tag
"ffff",// 注意, msgkey对帮助业务排查消息投递问题很有帮助,请设置成和消息有关的业务属性,比如订单id ,商品id .
"yang ya zhou".getBytes());// body //默认会设置等待消息存储成功。
SendResult sendResult = null;
try {//同步发送消息 ,并且等待消息存储成功,超时时间3s .
System.out.println("send msg with msgKey:" + msg.getKeys());
sendResult = producer.send(msg); //DefaultMQProducer.send
} catch (Exception e) {
e.printStackTrace();
}
System.out.println(sendResult);
Thread.sleep(300);
}
System.out.println(System.getProperty("user.home") );
producer.shutdown();
}
开发者ID:y123456yz,项目名称:reading-and-annotate-rocketmq-3.4.6,代码行数:26,代码来源:Producer.java
示例7: main
import com.alibaba.rocketmq.common.message.Message; //导入依赖的package包/类
public static void main(String[] args) throws MQClientException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
producer.start();
try {
for (int i = 0; i < 6000000; i++) {
Message msg = new Message("TopicFilter7",// topic
"TagA",// tag
"OrderID001",// key
("Hello MetaQ").getBytes());// body
msg.putUserProperty("SequenceId", String.valueOf(i));
SendResult sendResult = producer.send(msg);
System.out.println(sendResult);
}
}
catch (Exception e) {
e.printStackTrace();
}
producer.shutdown();
}
开发者ID:y123456yz,项目名称:reading-and-annotate-rocketmq-3.4.6,代码行数:24,代码来源:Producer.java
示例8: main
import com.alibaba.rocketmq.common.message.Message; //导入依赖的package包/类
public static void main(String[] args) {
DefaultMQProducer producer = new DefaultMQProducer("Producer");
producer.setNamesrvAddr("127.0.0.1:9876");
try {
producer.start();
long time = System.currentTimeMillis();
System.out.println("开始:" + time);
int a = 100000;
for (int i = 1; i <= a; i++) {
Message msg = new Message("PushTopic", "push", i + "", "Just for test.".getBytes());
SendResult result = producer.send(msg);
System.out.println("id:" + result.getMsgId() + " result:" + result.getSendStatus());
}
System.out.println("结束,消耗:" + (System.currentTimeMillis() - time));
} catch (Exception e) {
e.printStackTrace();
} finally {
producer.shutdown();
}
}
开发者ID:ChangyiHuang,项目名称:shuzheng,代码行数:23,代码来源:Producer.java
示例9: writeImpl
import com.alibaba.rocketmq.common.message.Message; //导入依赖的package包/类
@Override
protected void writeImpl(PacketFuture pf) throws Exception {
InsPacket packet = pf.send();
LOG.info("write packet-{}", packet);
packet.type(InsPacket.Type.GROUP.ordinal());
ByteBuffer bytes = codec(packet.version()).encode(packet);
String topic = packet.ins().toGroup().name();
List<Co> toCo = packet.ins().toCo();
if (toCo.isEmpty()) {
Message multicast = new Message(topic, name(), packet.ins().id(), bytes.array());
send(multicast, pf);
} else {
for (Co co : toCo) {
Message unicast = new Message(topic, String.valueOf(co.hashCode()), packet.ins().id(), bytes.array());
send(unicast, pf);
}
}
}
开发者ID:dzh,项目名称:coca,代码行数:20,代码来源:RMQGroupChannel.java
示例10: testScheduledMessageProducer
import com.alibaba.rocketmq.common.message.Message; //导入依赖的package包/类
@Test
public void testScheduledMessageProducer() throws Exception {
// Instantiate a producer to send scheduled messages
DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");
// Launch producer
producer.start();
int totalMessagesToSend = 100;
for (int i = 0; i < totalMessagesToSend; i++) {
Message message = new Message("TestTopic", ("Hello scheduled message " + i).getBytes());
// This message will be delivered to consumer 10 seconds later.
message.setDelayTimeLevel(3);
// Send the message
producer.send(message);
}
// Shutdown producer after use.
producer.shutdown();
}
开发者ID:dzh,项目名称:coca,代码行数:19,代码来源:TestScheduledMessageProducer.java
示例11: testSyncProducer
import com.alibaba.rocketmq.common.message.Message; //导入依赖的package包/类
@Test
public void testSyncProducer() throws Exception {
// Instantiate with a producer group name.
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
// Launch the instance.
producer.start();
for (int i = 0; i < 100; i++) {
// Create a message instance, specifying topic, tag and message body.
Message msg = new Message("TopicTest" /* Topic */, "TagA" /* Tag */,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
// Call send message to deliver message to one of brokers.
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
// Shut down once the producer instance is not longer in use.
producer.shutdown();
}
开发者ID:dzh,项目名称:coca,代码行数:19,代码来源:TestProducer.java
示例12: testAsyncProducer
import com.alibaba.rocketmq.common.message.Message; //导入依赖的package包/类
@Test
public void testAsyncProducer() throws Exception {
// Instantiate with a producer group name.
DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");
// Launch the instance.
producer.start();
producer.setRetryTimesWhenSendAsyncFailed(0);
for (int i = 0; i < 100; i++) {
final int index = i;
// Create a message instance, specifying topic, tag and message body.
Message msg = new Message("TopicTest", "TagA", "OrderID188", "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId());
}
@Override
public void onException(Throwable e) {
System.out.printf("%-10d Exception %s %n", index, e);
e.printStackTrace();
}
});
}
}
开发者ID:dzh,项目名称:coca,代码行数:26,代码来源:TestProducer.java
示例13: testOnewayProducer
import com.alibaba.rocketmq.common.message.Message; //导入依赖的package包/类
@Test
public void testOnewayProducer() throws Exception {
// Instantiate with a producer group name.
DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");
// Launch the instance.
producer.start();
for (int i = 0; i < 100; i++) {
// Create a message instance, specifying topic, tag and message body.
Message msg = new Message("TopicTest" /* Topic */, "TagA" /* Tag */,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
// Call send message to deliver message to one of brokers.
producer.sendOneway(msg);
}
// Shut down once the producer instance is not longer in use.
producer.shutdown();
}
开发者ID:dzh,项目名称:coca,代码行数:19,代码来源:TestProducer.java
示例14: main
import com.alibaba.rocketmq.common.message.Message; //导入依赖的package包/类
public static void main(String[] args) throws MQClientException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
producer.start();
for (int i = 0; i < 1; i++)
try {
{
Message msg = new Message("TopicTest",// topic
"TagA",// tag
"OrderID188",// key
("Hello MetaQ").getBytes());// body
SendResult sendResult = producer.send(msg);
System.out.println(sendResult);
}
}
catch (Exception e) {
e.printStackTrace();
}
producer.setInstanceName("");
producer.setProducerGroup("");
producer.shutdown();
}
开发者ID:medusar,项目名称:rocketmq-commet,代码行数:26,代码来源:Producer.java
示例15: serializer
import com.alibaba.rocketmq.common.message.Message; //导入依赖的package包/类
protected Message serializer(Event event){
Map<String,String> headers = event.getHeaders();
String tempTopic = topic;
if(RocketMQSinkConstants.DEFAULT_TOPIC.equals(tempTopic))
topic = getHeader(headers,RocketMQSinkConstants.PRODUCER_TOPIC,tempTopic);
String eventkeys = getHeader(headers,RocketMQSinkConstants.MESSAGE_KEYS,keys);
String eventtags = getHeader(headers,RocketMQSinkConstants.MESSAGE_TAGS,tags);
byte[] body = event.getBody();
if (logger.isDebugEnabled()) {
logger.debug("{Event} " + topic + " : " + eventkeys + " : "+ eventtags + " : "
+ new String(body));
}
Message msg = new Message(topic,eventtags,eventkeys,body);
return msg;
}
开发者ID:bigdatafly,项目名称:flume-rocketmq-sink,代码行数:22,代码来源:RocketMQSink.java
示例16: checkMessage
import com.alibaba.rocketmq.common.message.Message; //导入依赖的package包/类
/**
* message 有效性检查
*
* @param msg
* @param defaultMQProducer
* @throws com.alibaba.rocketmq.client.exception.MQClientException
*/
public static void checkMessage(Message msg, DefaultMQProducer defaultMQProducer)
throws MQClientException {
if (null == msg) {
throw new MQClientException("the message is null", null);
}
// topic
Validators.checkTopic(msg.getTopic());
// body
if (null == msg.getBody()) {
throw new MQClientException("the message body is null", null);
}
if (0 == msg.getBody().length) {
throw new MQClientException("the message body length is zero", null);
}
if (msg.getBody().length > defaultMQProducer.getMaxMessageSize()) {
throw new MQClientException("the message body size over max value, MAX: "
+ defaultMQProducer.getMaxMessageSize(), null);
}
}
开发者ID:diwayou,项目名称:rocketmq-all-trans,代码行数:29,代码来源:Validators.java
示例17: tryToCompressMessage
import com.alibaba.rocketmq.common.message.Message; //导入依赖的package包/类
private boolean tryToCompressMessage(final Message msg) {
byte[] body = msg.getBody();
if (body != null) {
if (body.length >= this.defaultMQProducer.getCompressMsgBodyOverHowmuch()) {
try {
byte[] data = UtilAll.compress(body, zipCompressLevel);
if (data != null) {
msg.setBody(data);
return true;
}
}
catch (IOException e) {
log.error("tryToCompressMessage exception", e);
log.warn(msg.toString());
}
}
}
return false;
}
开发者ID:diwayou,项目名称:rocketmq-all-trans,代码行数:21,代码来源:DefaultMQProducerImpl.java
示例18: send
import com.alibaba.rocketmq.common.message.Message; //导入依赖的package包/类
/**
* KERNEL ASYNC -------------------------------------------------------
*/
public void send(Message msg, MessageQueue mq, SendCallback sendCallback) throws MQClientException,
RemotingException, InterruptedException {
// 有效性检查
this.makeSureStateOK();
Validators.checkMessage(msg, this.defaultMQProducer);
if (!msg.getTopic().equals(mq.getTopic())) {
throw new MQClientException("message's topic not equal mq's topic", null);
}
try {
this.sendKernelImpl(msg, mq, CommunicationMode.ASYNC, sendCallback);
}
catch (MQBrokerException e) {
throw new MQClientException("unknow exception", e);
}
}
开发者ID:diwayou,项目名称:rocketmq-all-trans,代码行数:21,代码来源:DefaultMQProducerImpl.java
示例19: sendMessageBack
import com.alibaba.rocketmq.common.message.Message; //导入依赖的package包/类
public void sendMessageBack(MessageExt msg, int delayLevel) throws RemotingException, MQBrokerException,
InterruptedException, MQClientException {
try {
this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(msg,
this.defaultMQPushConsumer.getConsumerGroup(), delayLevel, 3000);
}
catch (Exception e) {
log.error("sendMessageBack Exception, " + this.defaultMQPushConsumer.getConsumerGroup(), e);
Message newMsg =
new Message(MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup()),
msg.getBody());
newMsg.setFlag(msg.getFlag());
MessageAccessor.setProperties(newMsg, msg.getProperties());
MessageAccessor.putProperty(newMsg, MessageConst.PROPERTY_RETRY_TOPIC, msg.getTopic());
this.mQClientFactory.getDefaultMQProducer().send(newMsg);
}
}
开发者ID:diwayou,项目名称:rocketmq-all-trans,代码行数:21,代码来源:DefaultMQPushConsumerImpl.java
示例20: sendMessageBack
import com.alibaba.rocketmq.common.message.Message; //导入依赖的package包/类
public void sendMessageBack(MessageExt msg, int delayLevel) throws RemotingException, MQBrokerException,
InterruptedException, MQClientException {
try {
this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(msg,
this.defaultMQPullConsumer.getConsumerGroup(), delayLevel, 3000);
}
catch (Exception e) {
log.error("sendMessageBack Exception, " + this.defaultMQPullConsumer.getConsumerGroup(), e);
Message newMsg =
new Message(MixAll.getRetryTopic(this.defaultMQPullConsumer.getConsumerGroup()),
msg.getBody());
newMsg.setFlag(msg.getFlag());
MessageAccessor.setProperties(newMsg, msg.getProperties());
MessageAccessor.putProperty(newMsg, MessageConst.PROPERTY_RETRY_TOPIC, msg.getTopic());
this.mQClientFactory.getDefaultMQProducer().send(newMsg);
}
}
开发者ID:diwayou,项目名称:rocketmq-all-trans,代码行数:21,代码来源:DefaultMQPullConsumerImpl.java
注:本文中的com.alibaba.rocketmq.common.message.Message类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论