Java Message Class Code Examples


This article collects typical usage examples of the Java class com.alibaba.rocketmq.common.message.Message. If you are wondering how the Message class is used in practice, or are looking for working examples, the selected code samples below may help.



The Message class belongs to the com.alibaba.rocketmq.common.message package. A total of 20 code examples of the Message class are shown below, sorted by popularity by default.

Example 1: main

import com.alibaba.rocketmq.common.message.Message; // import the required package/class
public static void main(String[] args){
	DefaultMQPushConsumer consumer =
			new DefaultMQPushConsumer("PushConsumer");
	consumer.setNamesrvAddr("127.0.0.1:9876");
	try {
		// Subscribe to messages in PushTopic whose tag is push
		consumer.subscribe("PushTopic", "push");
		// On first startup, consume from the head of the message queue
		consumer.setConsumeFromWhere(
				ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
		consumer.registerMessageListener(
				new MessageListenerConcurrently() {
					@Override
					public ConsumeConcurrentlyStatus consumeMessage(
							List<MessageExt> list,
							ConsumeConcurrentlyContext context) {
						Message msg = list.get(0);
						System.out.println(msg.toString());
						return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
					}
				}
		);
		consumer.start();
	} catch (Exception e) {
		e.printStackTrace();
	}
}
 
Developer ID: youngMen1, Project: -Spring-SpringMVC-Mybatis-, Lines of code: 27, Source: Consumer.java


Example 2: main

import com.alibaba.rocketmq.common.message.Message; // import the required package/class
public static void main(String[] args) {
	DefaultMQProducer producer = new DefaultMQProducer("Producer");
	producer.setNamesrvAddr("127.0.0.1:9876");
	try {
		producer.start();
		long time = System.currentTimeMillis();
		System.out.println("Start: " + time);
		for (int i = 1; i <= 100000; i ++) {
			Message msg = new Message("PushTopic", "push", i + "", "Just for test.".getBytes());
			SendResult result = producer.send(msg);
			//System.out.println("id:" + result.getMsgId() + " result:" + result.getSendStatus());
		}
		System.out.println("Done, elapsed ms: " + (System.currentTimeMillis() - time));
	} catch (Exception e) {
		e.printStackTrace();
	} finally {
		producer.shutdown();
	}
}
 
Developer ID: youngMen1, Project: -Spring-SpringMVC-Mybatis-, Lines of code: 20, Source: Producer.java


Example 3: checkMessage

import com.alibaba.rocketmq.common.message.Message; // import the required package/class
/**
 * Validate message
 *
 * @param msg
 * @param defaultMQProducer
 * @throws com.alibaba.rocketmq.client.exception.MQClientException
 */
public static void checkMessage(Message msg, DefaultMQProducer defaultMQProducer)
        throws MQClientException {
    if (null == msg) {
        throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message is null");
    }
    // topic
    Validators.checkTopic(msg.getTopic());
    // body
    if (null == msg.getBody()) {
        throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body is null");
    }

    if (0 == msg.getBody().length) {
        throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body length is zero");
    }

    if (msg.getBody().length > defaultMQProducer.getMaxMessageSize()) {
        throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL,
                "the message body size over max value, MAX: " + defaultMQProducer.getMaxMessageSize());
    }
}
 
Developer ID: y123456yz, Project: reading-and-annotate-rocketmq-3.4.6, Lines of code: 29, Source: Validators.java


Example 4: executeLocalTransactionBranch

import com.alibaba.rocketmq.common.message.Message; // import the required package/class
@Override
public LocalTransactionState executeLocalTransactionBranch(final Message msg, final Object arg) {
    int value = transactionIndex.getAndIncrement();

    if (value == 0) {
        throw new RuntimeException("Could not find db");
    }
    else if ((value % 5) == 0) {
        return LocalTransactionState.ROLLBACK_MESSAGE;
    }
    else if ((value % 4) == 0) {
        return LocalTransactionState.COMMIT_MESSAGE;
    }

    return LocalTransactionState.UNKNOW;
}
 
Developer ID: y123456yz, Project: reading-and-annotate-rocketmq-3.4.6, Lines of code: 17, Source: TransactionExecuterImpl.java


Example 5: main

import com.alibaba.rocketmq.common.message.Message; // import the required package/class
public static void main(String[] args) throws MQClientException, InterruptedException {
    DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
    producer.start();
    for (int i = 0; i < 1; i++) {
        try {
            Message msg = new Message("TopicTest1",// topic
                "TagA",// tag
                "key113",// key
                ("Hello MetaQ").getBytes());// body
            SendResult sendResult = producer.send(msg);
            System.out.println(sendResult);

            QueryResult queryMessage =
                    producer.queryMessage("TopicTest1", "key113", 10, 0, System.currentTimeMillis());
            for (MessageExt m : queryMessage.getMessageList()) {
                System.out.println(m);
            }
        }
        catch (Exception e) {
            e.printStackTrace();
        }
    }
    producer.shutdown();
}
 
Developer ID: y123456yz, Project: reading-and-annotate-rocketmq-3.4.6, Lines of code: 27, Source: TestProducer.java


Example 6: main

import com.alibaba.rocketmq.common.message.Message; // import the required package/class
public static void main(String[] args) throws MQClientException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("yyzGroup2");
        producer.setNamesrvAddr("10.2.223.157:9876;10.2.223.158:9876;10.2.223.159:9876");
       // producer.setNamesrvAddr("10.2.223.228:9876");
        producer.start();
        for(int i = 0; i < 111111; ++i) {
//            Thread.currentThread().sleep(50);
//            for (String item : array) {
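            Message msg = new Message("yyztest2",// topic
                    "TAG",// tag
                    "ffff",// Note: the message key is very useful for troubleshooting delivery problems; set it to a business attribute related to the message, such as an order ID or product ID.
                    "yang ya zhou".getBytes());// body; by default the send waits until the message is stored successfully.
            SendResult sendResult = null;
            try {// Send synchronously and wait for the message to be stored; the timeout is 3 s.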
                System.out.println("send msg with msgKey:" + msg.getKeys());
                sendResult = producer.send(msg); //DefaultMQProducer.send
            } catch (Exception e) {
                e.printStackTrace();
            }
            System.out.println(sendResult);
            Thread.sleep(300);
        }
        System.out.println(System.getProperty("user.home") );
        producer.shutdown();
    }
 
Developer ID: y123456yz, Project: reading-and-annotate-rocketmq-3.4.6, Lines of code: 26, Source: Producer.java


Example 7: main

import com.alibaba.rocketmq.common.message.Message; // import the required package/class
public static void main(String[] args) throws MQClientException, InterruptedException {
    DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
    producer.start();

    try {
        for (int i = 0; i < 6000000; i++) {
            Message msg = new Message("TopicFilter7",// topic
                "TagA",// tag
                "OrderID001",// key
                ("Hello MetaQ").getBytes());// body

            msg.putUserProperty("SequenceId", String.valueOf(i));

            SendResult sendResult = producer.send(msg);
            System.out.println(sendResult);
        }
    }
    catch (Exception e) {
        e.printStackTrace();
    }

    producer.shutdown();
}
 
Developer ID: y123456yz, Project: reading-and-annotate-rocketmq-3.4.6, Lines of code: 24, Source: Producer.java


Example 8: main

import com.alibaba.rocketmq.common.message.Message; // import the required package/class
public static void main(String[] args) {
    DefaultMQProducer producer = new DefaultMQProducer("Producer");
    producer.setNamesrvAddr("127.0.0.1:9876");
    try {
        producer.start();
        long time = System.currentTimeMillis();
        System.out.println("Start: " + time);

        int a = 100000;

        for (int i = 1; i <= a; i++) {
            Message msg = new Message("PushTopic", "push", i + "", "Just for test.".getBytes());
            SendResult result = producer.send(msg);
            System.out.println("id:" + result.getMsgId() + " result:" + result.getSendStatus());
        }
        System.out.println("Done, elapsed ms: " + (System.currentTimeMillis() - time));
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        producer.shutdown();
    }
}
 
Developer ID: ChangyiHuang, Project: shuzheng, Lines of code: 23, Source: Producer.java


Example 9: writeImpl

import com.alibaba.rocketmq.common.message.Message; // import the required package/class
@Override
protected void writeImpl(PacketFuture pf) throws Exception {
    InsPacket packet = pf.send();
    LOG.info("write packet-{}", packet);

    packet.type(InsPacket.Type.GROUP.ordinal());
    ByteBuffer bytes = codec(packet.version()).encode(packet);
    String topic = packet.ins().toGroup().name();
    List<Co> toCo = packet.ins().toCo();
    if (toCo.isEmpty()) {
        Message multicast = new Message(topic, name(), packet.ins().id(), bytes.array());
        send(multicast, pf);
    } else {
        for (Co co : toCo) {
            Message unicast = new Message(topic, String.valueOf(co.hashCode()), packet.ins().id(), bytes.array());
            send(unicast, pf);
        }
    }
}
 
Developer ID: dzh, Project: coca, Lines of code: 20, Source: RMQGroupChannel.java


Example 10: testScheduledMessageProducer

import com.alibaba.rocketmq.common.message.Message; // import the required package/class
@Test
public void testScheduledMessageProducer() throws Exception {
    // Instantiate a producer to send scheduled messages
    DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");
    // Launch producer
    producer.start();
    int totalMessagesToSend = 100;
    for (int i = 0; i < totalMessagesToSend; i++) {
        Message message = new Message("TestTopic", ("Hello scheduled message " + i).getBytes());
        // This message will be delivered to consumer 10 seconds later.
        message.setDelayTimeLevel(3);
        // Send the message
        producer.send(message);
    }

    // Shutdown producer after use.
    producer.shutdown();
}
 
Developer ID: dzh, Project: coca, Lines of code: 19, Source: TestScheduledMessageProducer.java
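
Example 10 relies on delay level 3 meaning a 10-second delay. That mapping is a broker-side setting (`messageDelayLevel`), so the sketch below only illustrates how a level number could resolve to a duration. It assumes the commonly documented default string `1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h`; a broker may be configured differently. The class `DelayLevelTable` and its methods are hypothetical helpers and not part of the RocketMQ API.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper: maps RocketMQ-style delay levels to milliseconds. */
final class DelayLevelTable {
    // Assumed broker default for messageDelayLevel; check your broker configuration.
    static final String ASSUMED_DEFAULT =
            "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

    /** Parses a space-separated level string into level -> delay in ms (levels start at 1). */
    static Map<Integer, Long> parse(String levels) {
        Map<Integer, Long> table = new LinkedHashMap<>();
        String[] parts = levels.trim().split("\\s+");
        for (int i = 0; i < parts.length; i++) {
            String part = parts[i];
            long amount = Long.parseLong(part.substring(0, part.length() - 1));
            char unit = part.charAt(part.length() - 1);
            table.put(i + 1, amount * unitMillis(unit));
        }
        return table;
    }

    /** Converts a one-letter time unit into milliseconds. */
    private static long unitMillis(char unit) {
        switch (unit) {
            case 's': return 1_000L;
            case 'm': return 60_000L;
            case 'h': return 3_600_000L;
            case 'd': return 86_400_000L;
            default: throw new IllegalArgumentException("unknown time unit: " + unit);
        }
    }

    public static void main(String[] args) {
        Map<Integer, Long> table = parse(ASSUMED_DEFAULT);
        System.out.println("level 3 -> " + table.get(3) + " ms");
    }
}
```

Under that assumed default, level 3 is the third entry (`10s`), which matches the comment in Example 10.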


Example 11: testSyncProducer

import com.alibaba.rocketmq.common.message.Message; // import the required package/class
@Test
public void testSyncProducer() throws Exception {
    // Instantiate with a producer group name.
    DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
    // Launch the instance.
    producer.start();
    for (int i = 0; i < 100; i++) {
        // Create a message instance, specifying topic, tag and message body.
        Message msg = new Message("TopicTest" /* Topic */, "TagA" /* Tag */,
                ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
        );
        // Call send message to deliver message to one of brokers.
        SendResult sendResult = producer.send(msg);
        System.out.printf("%s%n", sendResult);
    }
    // Shut down once the producer instance is no longer in use.
    producer.shutdown();
}
 
Developer ID: dzh, Project: coca, Lines of code: 19, Source: TestProducer.java
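
Example 11 encodes the body with an explicit charset (`RemotingHelper.DEFAULT_CHARSET`), whereas several other examples call `getBytes()` with no argument, which uses the JVM's default charset. The sketch below is independent of RocketMQ and shows the explicit-charset round trip that keeps the producing and consuming sides in agreement on the bytes. It uses `StandardCharsets.UTF_8` on the assumption that the RocketMQ constant names UTF-8; `BodyCharsetSketch` is a hypothetical class name.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical helper: encodes and decodes message bodies with a fixed charset. */
final class BodyCharsetSketch {
    /** Encodes a message body with an explicit charset, independent of JVM defaults. */
    static byte[] encode(String text) {
        return text.getBytes(StandardCharsets.UTF_8);
    }

    /** Decodes a body that was produced by encode(). */
    static String decode(byte[] body) {
        return new String(body, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // The text ends with two non-ASCII characters, written as Unicode escapes.
        String text = "Hello RocketMQ \u4f60\u597d";
        byte[] body = encode(text);
        System.out.println(body.length + " bytes, round trip ok: " + text.equals(decode(body)));
    }
}
```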


Example 12: testAsyncProducer

import com.alibaba.rocketmq.common.message.Message; // import the required package/class
@Test
public void testAsyncProducer() throws Exception {
    // Instantiate with a producer group name.
    DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");
    // Launch the instance.
    producer.start();
    producer.setRetryTimesWhenSendAsyncFailed(0);
    for (int i = 0; i < 100; i++) {
        final int index = i;
        // Create a message instance, specifying topic, tag, key and message body.
        Message msg = new Message("TopicTest", "TagA", "OrderID188", "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
        producer.send(msg, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId());
            }

            @Override
            public void onException(Throwable e) {
                System.out.printf("%-10d Exception %s %n", index, e);
                e.printStackTrace();
            }
        });
    }
}
 
Developer ID: dzh, Project: coca, Lines of code: 26, Source: TestProducer.java


Example 13: testOnewayProducer

import com.alibaba.rocketmq.common.message.Message; // import the required package/class
@Test
public void testOnewayProducer() throws Exception {
    // Instantiate with a producer group name.
    DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");
    // Launch the instance.
    producer.start();
    for (int i = 0; i < 100; i++) {
        // Create a message instance, specifying topic, tag and message body.
        Message msg = new Message("TopicTest" /* Topic */, "TagA" /* Tag */,
                ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
        );
        // Call send message to deliver message to one of brokers.
        producer.sendOneway(msg);

    }
    // Shut down once the producer instance is no longer in use.
    producer.shutdown();
}
 
Developer ID: dzh, Project: coca, Lines of code: 19, Source: TestProducer.java


Example 14: main

import com.alibaba.rocketmq.common.message.Message; // import the required package/class
public static void main(String[] args) throws MQClientException, InterruptedException {
    DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
    producer.start();
    for (int i = 0; i < 1; i++) {
        try {
            Message msg = new Message("TopicTest",// topic
                "TagA",// tag
                "OrderID188",// key
                ("Hello MetaQ").getBytes());// body

            SendResult sendResult = producer.send(msg);
            System.out.println(sendResult);
        }
        catch (Exception e) {
            e.printStackTrace();
        }
    }

    producer.setInstanceName("");
    producer.setProducerGroup("");

    producer.shutdown();
}
 
Developer ID: medusar, Project: rocketmq-commet, Lines of code: 26, Source: Producer.java


Example 15: serializer

import com.alibaba.rocketmq.common.message.Message; // import the required package/class
protected Message serializer(Event event){

	Map<String,String> headers = event.getHeaders();
	// Resolve the topic per event in a local variable so that a header value
	// does not overwrite the configured topic field for later events.
	String eventTopic = topic;

	if(RocketMQSinkConstants.DEFAULT_TOPIC.equals(eventTopic))
		eventTopic = getHeader(headers,RocketMQSinkConstants.PRODUCER_TOPIC,eventTopic);
	String eventkeys = getHeader(headers,RocketMQSinkConstants.MESSAGE_KEYS,keys);
	String eventtags = getHeader(headers,RocketMQSinkConstants.MESSAGE_TAGS,tags);
	byte[] body = event.getBody();

	if (logger.isDebugEnabled()) {
		logger.debug("{Event} " + eventTopic + " : " + eventkeys + " : "+ eventtags + " : "
			+ new String(body));
	}

	Message msg = new Message(eventTopic,eventtags,eventkeys,body);

	return msg;
}
 
Developer ID: bigdatafly, Project: flume-rocketmq-sink, Lines of code: 22, Source: RocketMQSink.java


Example 16: checkMessage

import com.alibaba.rocketmq.common.message.Message; // import the required package/class
/**
 * Validate message
 * 
 * @param msg
 * @param defaultMQProducer
 * @throws com.alibaba.rocketmq.client.exception.MQClientException
 */
public static void checkMessage(Message msg, DefaultMQProducer defaultMQProducer)
        throws MQClientException {
    if (null == msg) {
        throw new MQClientException("the message is null", null);
    }
    // topic
    Validators.checkTopic(msg.getTopic());
    // body
    if (null == msg.getBody()) {
        throw new MQClientException("the message body is null", null);
    }

    if (0 == msg.getBody().length) {
        throw new MQClientException("the message body length is zero", null);
    }

    if (msg.getBody().length > defaultMQProducer.getMaxMessageSize()) {
        throw new MQClientException("the message body size over max value, MAX: "
                + defaultMQProducer.getMaxMessageSize(), null);
    }
}
 
Developer ID: diwayou, Project: rocketmq-all-trans, Lines of code: 29, Source: Validators.java


Example 17: tryToCompressMessage

import com.alibaba.rocketmq.common.message.Message; // import the required package/class
private boolean tryToCompressMessage(final Message msg) {
    byte[] body = msg.getBody();
    if (body != null) {
        if (body.length >= this.defaultMQProducer.getCompressMsgBodyOverHowmuch()) {
            try {
                byte[] data = UtilAll.compress(body, zipCompressLevel);
                if (data != null) {
                    msg.setBody(data);
                    return true;
                }
            }
            catch (IOException e) {
                log.error("tryToCompressMessage exception", e);
                log.warn(msg.toString());
            }
        }
    }

    return false;
}
 
Developer ID: diwayou, Project: rocketmq-all-trans, Lines of code: 21, Source: DefaultMQProducerImpl.java
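
Example 17 calls `UtilAll.compress(body, zipCompressLevel)`, whose implementation is not shown in the excerpt. As a rough illustration of compressing a message body at a chosen level, the sketch below uses the JDK's `Deflater` and `InflaterInputStream`. It assumes zlib deflate and should not be read as the RocketMQ implementation. `BodyCompressionSketch`, `compress`, and `decompress` are hypothetical names.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.zip.Deflater;
import java.util.zip.DeflaterOutputStream;
import java.util.zip.InflaterInputStream;

/** Hypothetical helper: deflate-based compression of a message body. */
final class BodyCompressionSketch {
    /** Compresses src with the given deflate level (0-9). */
    static byte[] compress(byte[] src, int level) {
        Deflater deflater = new Deflater(level);
        ByteArrayOutputStream out = new ByteArrayOutputStream(src.length);
        try (DeflaterOutputStream dos = new DeflaterOutputStream(out, deflater)) {
            dos.write(src);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } finally {
            // A caller-supplied Deflater is not released by the stream, so end it here.
            deflater.end();
        }
        return out.toByteArray();
    }

    /** Restores the original bytes from the output of compress(). */
    static byte[] decompress(byte[] src) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (InflaterInputStream in = new InflaterInputStream(new ByteArrayInputStream(src))) {
            byte[] buf = new byte[4096];
            int n;
            while ((n = in.read(buf)) != -1) {
                out.write(buf, 0, n);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out.toByteArray();
    }

    public static void main(String[] args) {
        byte[] body = new byte[8192];
        Arrays.fill(body, (byte) 'a'); // highly repetitive, so it compresses well
        byte[] packed = compress(body, 5);
        System.out.println(body.length + " bytes -> " + packed.length + " bytes");
        System.out.println("round trip ok: " + Arrays.equals(body, decompress(packed)));
    }
}
```

As in Example 17, a caller would typically compress only when the body length reaches a configured threshold, since small bodies gain little from compression.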


Example 18: send

import com.alibaba.rocketmq.common.message.Message; // import the required package/class
/**
 * KERNEL ASYNC -------------------------------------------------------
 */
public void send(Message msg, MessageQueue mq, SendCallback sendCallback) throws MQClientException,
        RemotingException, InterruptedException {
    // Validity check
    this.makeSureStateOK();
    Validators.checkMessage(msg, this.defaultMQProducer);

    if (!msg.getTopic().equals(mq.getTopic())) {
        throw new MQClientException("message's topic not equal mq's topic", null);
    }

    try {
        this.sendKernelImpl(msg, mq, CommunicationMode.ASYNC, sendCallback);
    }
    catch (MQBrokerException e) {
        throw new MQClientException("unknow exception", e);
    }
}
 
Developer ID: diwayou, Project: rocketmq-all-trans, Lines of code: 21, Source: DefaultMQProducerImpl.java


Example 19: sendMessageBack

import com.alibaba.rocketmq.common.message.Message; // import the required package/class
public void sendMessageBack(MessageExt msg, int delayLevel) throws RemotingException, MQBrokerException,
        InterruptedException, MQClientException {
    try {
        this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(msg,
            this.defaultMQPushConsumer.getConsumerGroup(), delayLevel, 3000);
    }
    catch (Exception e) {
        log.error("sendMessageBack Exception, " + this.defaultMQPushConsumer.getConsumerGroup(), e);

        Message newMsg =
                new Message(MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup()),
                    msg.getBody());

        newMsg.setFlag(msg.getFlag());
        MessageAccessor.setProperties(newMsg, msg.getProperties());
        MessageAccessor.putProperty(newMsg, MessageConst.PROPERTY_RETRY_TOPIC, msg.getTopic());

        this.mQClientFactory.getDefaultMQProducer().send(newMsg);
    }
}
 
Developer ID: diwayou, Project: rocketmq-all-trans, Lines of code: 21, Source: DefaultMQPushConsumerImpl.java


Example 20: sendMessageBack

import com.alibaba.rocketmq.common.message.Message; // import the required package/class
public void sendMessageBack(MessageExt msg, int delayLevel) throws RemotingException, MQBrokerException,
        InterruptedException, MQClientException {
    try {
        this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(msg,
            this.defaultMQPullConsumer.getConsumerGroup(), delayLevel, 3000);
    }
    catch (Exception e) {
        log.error("sendMessageBack Exception, " + this.defaultMQPullConsumer.getConsumerGroup(), e);

        Message newMsg =
                new Message(MixAll.getRetryTopic(this.defaultMQPullConsumer.getConsumerGroup()),
                    msg.getBody());

        newMsg.setFlag(msg.getFlag());
        MessageAccessor.setProperties(newMsg, msg.getProperties());
        MessageAccessor.putProperty(newMsg, MessageConst.PROPERTY_RETRY_TOPIC, msg.getTopic());

        this.mQClientFactory.getDefaultMQProducer().send(newMsg);
    }
}
 
Developer ID: diwayou, Project: rocketmq-all-trans, Lines of code: 21, Source: DefaultMQPullConsumerImpl.java
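
Examples 19 and 20 share the same fallback: when sending the message back to the broker fails, a new `Message` is built for the consumer group's retry topic, the original properties are copied, and the original topic is recorded in a property. The sketch below models only that data flow with plain Java collections. It assumes that `MixAll.getRetryTopic` prefixes the group name with `%RETRY%` and that `MessageConst.PROPERTY_RETRY_TOPIC` is the key `RETRY_TOPIC`; both values are assumptions, and `RetryFallbackSketch` is a hypothetical class.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical model of the retry-topic fallback in the sendMessageBack examples. */
final class RetryFallbackSketch {
    // Assumed to mirror the prefix used by MixAll.getRetryTopic.
    static final String ASSUMED_RETRY_PREFIX = "%RETRY%";
    // Assumed to mirror MessageConst.PROPERTY_RETRY_TOPIC.
    static final String ASSUMED_RETRY_TOPIC_KEY = "RETRY_TOPIC";

    /** Builds the retry topic name for a consumer group. */
    static String retryTopic(String consumerGroup) {
        return ASSUMED_RETRY_PREFIX + consumerGroup;
    }

    /** Copies the original properties and records the original topic, as the catch block does. */
    static Map<String, String> fallbackProperties(String originalTopic,
                                                  Map<String, String> originalProps) {
        Map<String, String> props = new HashMap<>(originalProps);
        props.put(ASSUMED_RETRY_TOPIC_KEY, originalTopic);
        return props;
    }

    public static void main(String[] args) {
        Map<String, String> original = new HashMap<>();
        original.put("KEYS", "OrderID188");
        System.out.println(retryTopic("ConsumerGroupA"));
        System.out.println(fallbackProperties("TopicTest", original));
    }
}
```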



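Note: The com.alibaba.rocketmq.common.message.Message examples in this article were compiled from source-code and documentation platforms such as Github/MSDocs. The snippets were selected from open-source projects, and copyright of the source code belongs to the original authors. For distribution and use, refer to the License of the corresponding project. Do not reproduce without permission.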

