• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Java MessageListenerConcurrently类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Java中com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently的典型用法代码示例。如果您正苦于以下问题:Java MessageListenerConcurrently类的具体用法?Java MessageListenerConcurrently怎么用?Java MessageListenerConcurrently使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。



MessageListenerConcurrently类属于com.alibaba.rocketmq.client.consumer.listener包,在下文中一共展示了MessageListenerConcurrently类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Java代码示例。

示例1: main

import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently; //导入依赖的package包/类
public static void main(String[] args){
	DefaultMQPushConsumer consumer =
			new DefaultMQPushConsumer("PushConsumer");
	consumer.setNamesrvAddr("127.0.0.1:9876");
	try {
		//订阅PushTopic下Tag为push的消息
		consumer.subscribe("PushTopic", "push");
		//程序第一次启动从消息队列头取数据
		consumer.setConsumeFromWhere(
				ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
		consumer.registerMessageListener(
				new MessageListenerConcurrently() {
					public ConsumeConcurrentlyStatus consumeMessage(
							List<MessageExt> list,
							ConsumeConcurrentlyContext Context) {
						Message msg = list.get(0);
						System.out.println(msg.toString());
						return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
					}
				}
		);
		consumer.start();
	} catch (Exception e) {
		e.printStackTrace();
	}
}
 
开发者ID:youngMen1,项目名称:-Spring-SpringMVC-Mybatis-,代码行数:27,代码来源:Consumer.java


示例2: ConsumeMessageConcurrentlyService

import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently; //导入依赖的package包/类
public ConsumeMessageConcurrentlyService(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl,
        MessageListenerConcurrently messageListener) {
    this.defaultMQPushConsumerImpl = defaultMQPushConsumerImpl;
    this.messageListener = messageListener;

    this.defaultMQPushConsumer = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer();
    this.consumerGroup = this.defaultMQPushConsumer.getConsumerGroup();
    this.consumeRequestQueue = new LinkedBlockingQueue<Runnable>();

    this.consumeExecutor = new ThreadPoolExecutor(//
        this.defaultMQPushConsumer.getConsumeThreadMin(),//
        this.defaultMQPushConsumer.getConsumeThreadMax(),//
        1000 * 60,//
        TimeUnit.MILLISECONDS,//
        this.consumeRequestQueue,//
        new ThreadFactoryImpl("ConsumeMessageThread_"));

    //创建只有一条线程的线程池,他可以在指定延迟后执行线程任务  ScheduledExecutorService定时周期执行指定的任务 线程真正运行submitConsumeRequestLater
    this.scheduledExecutorService =
            Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
                "ConsumeMessageScheduledThread_"));
}
 
开发者ID:y123456yz,项目名称:reading-and-annotate-rocketmq-3.4.6,代码行数:23,代码来源:ConsumeMessageConcurrentlyService.java


示例3: main

import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently; //导入依赖的package包/类
public static void main(String[] args) throws InterruptedException, MQClientException {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");

    consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

    consumer.subscribe("TopicTest", "*");

    consumer.registerMessageListener(new MessageListenerConcurrently() {

        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                ConsumeConcurrentlyContext context) {
            System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });

    consumer.start();

    System.out.println("Consumer Started.");
}
 
开发者ID:y123456yz,项目名称:reading-and-annotate-rocketmq-3.4.6,代码行数:22,代码来源:Consumer.java


示例4: main

import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently; //导入依赖的package包/类
public static void main(String[] args) throws InterruptedException, MQClientException {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupNamecc4");
    String filterCode = MixAll.file2String("/home/admin/MessageFilterImpl.java");
    consumer.subscribe("TopicFilter7", "com.alibaba.rocketmq.example.filter.MessageFilterImpl",
        filterCode);

    consumer.registerMessageListener(new MessageListenerConcurrently() {

        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                ConsumeConcurrentlyContext context) {
            System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });

    consumer.start();

    System.out.println("Consumer Started.");
}
 
开发者ID:y123456yz,项目名称:reading-and-annotate-rocketmq-3.4.6,代码行数:21,代码来源:Consumer.java


示例5: preProcess

import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently; //导入依赖的package包/类
@SuppressWarnings("unchecked")
@Override
public void preProcess(MessageListenerConcurrently t, Object proxy, Method method, Object[] args) {

    List<MessageExt> msgs = (List<MessageExt>) args[0];
    String url = address + "/" + msgs.get(0).getTopic();

    Map<String, Object> params = new HashMap<String, Object>();
    params.put(CaptureConstants.INFO_CLIENT_REQUEST_URL, url);
    params.put(CaptureConstants.INFO_CLIENT_REQUEST_ACTION, "Consumer." + method.getName());
    params.put(CaptureConstants.INFO_CLIENT_APPID, applicationId);
    params.put(CaptureConstants.INFO_CLIENT_TYPE, "rocketmq.client");
    params.put(CaptureConstants.INFO_CAPCONTEXT_TAG, method.getName());

    if (logger.isDebugable()) {
        logger.debug("Invoke START:" + url + ",op=Consumer." + method.getName(), null);
    }

    UAVServer.instance().runMonitorCaptureOnServerCapPoint(CaptureConstants.CAPPOINT_APP_CLIENT,
            Monitor.CapturePhase.PRECAP, params);

}
 
开发者ID:uavorg,项目名称:uavstack,代码行数:23,代码来源:RocketmqIT.java


示例6: consume

import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently; //导入依赖的package包/类
public void consume(String topic, String subExpression) throws MQClientException {

        consumer.subscribe(topic, "*");
//        consumer.subscribe(topic, "TagA || TagB");
//        consumer.subscribe(topic, "*");

        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {

                for(MessageExt msg : msgs) {
                    if (msg.getTags() != null && msg.getTags().equals("TagA")) {
                        // 执行TagA的消费
                    }
                    else if (msg.getTags() != null && msg.getTags().equals("TagB")) {
                        // 执行TagB的消费
                    }
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

    }
 
开发者ID:TFdream,项目名称:mq-in-action,代码行数:27,代码来源:MqPushConsumer.java


示例7: testScheduledMessageConsumer

import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently; //导入依赖的package包/类
@Before
public void testScheduledMessageConsumer() throws Exception {
    // Instantiate message consumer
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ExampleConsumer");
    // Subscribe topics
    consumer.subscribe("TestTopic", "*");
    // Register message listener
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) {
            for (MessageExt message : messages) {
                // Print approximate delay time period
                System.out.println("Receive message[msgId=" + message.getMsgId() + "] "
                        + (System.currentTimeMillis() - message.getStoreTimestamp()) + "ms later");
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    // Launch consumer
    consumer.start();
}
 
开发者ID:dzh,项目名称:coca,代码行数:22,代码来源:TestScheduledMessageProducer.java


示例8: testBroadcastConsumer

import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently; //导入依赖的package包/类
@Before
public void testBroadcastConsumer() throws Exception {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("example_group_name");

    consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

    // set to broadcast mode
    consumer.setMessageModel(MessageModel.BROADCASTING);

    consumer.subscribe("TopicTest", "TagA || TagC || TagD");

    consumer.registerMessageListener(new MessageListenerConcurrently() {

        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            System.out.printf(Thread.currentThread().getName() + " Receive New Messages: " + msgs + "%n");
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });

    consumer.start();
    System.out.printf("Broadcast Consumer Started.%n");
}
 
开发者ID:dzh,项目名称:coca,代码行数:24,代码来源:TestBroadcastProducer.java


示例9: ConsumeMessageConcurrentlyService

import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently; //导入依赖的package包/类
public ConsumeMessageConcurrentlyService(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl,
                                         MessageListenerConcurrently messageListener) {
    this.defaultMQPushConsumerImpl = defaultMQPushConsumerImpl;
    this.messageListener = messageListener;

    this.defaultMQPushConsumer = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer();
    this.consumerGroup = this.defaultMQPushConsumer.getConsumerGroup();
    this.consumeRequestQueue = new LinkedBlockingQueue<Runnable>();

    /***
     * 消费消息的线程池
     */
    this.consumeExecutor = new ThreadPoolExecutor(//
            this.defaultMQPushConsumer.getConsumeThreadMin(),//
            this.defaultMQPushConsumer.getConsumeThreadMax(),//
            1000 * 60,//
            TimeUnit.MILLISECONDS,//
            this.consumeRequestQueue,//
            new ThreadFactoryImpl("ConsumeMessageThread_"));

    this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ConsumeMessageScheduledThread_"));
}
 
开发者ID:medusar,项目名称:rocketmq-commet,代码行数:23,代码来源:ConsumeMessageConcurrentlyService.java


示例10: testStartupTwice

import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently; //导入依赖的package包/类
@Test
public void testStartupTwice() throws MQClientException {
    DefaultMQPushConsumer consumer = getConsumer("S_fundmng_demo_producer", "TopicTest-fundmng");
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        public ConsumeConcurrentlyStatus consumeMessage(final List<MessageExt> msgs,
                                                        final ConsumeConcurrentlyContext context) {
            System.out.println("Consumer1:" + JSON.toJSONString(msgs));
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    consumer.start();
    System.out.println("start 1");
    consumer.start();
    System.out.println("start 2");

    LockSupport.park();
}
 
开发者ID:medusar,项目名称:rocketmq-commet,代码行数:18,代码来源:DoubleConsumerTest.java


示例11: RocketMQTracerConsumer

import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently; //导入依赖的package包/类
public RocketMQTracerConsumer() throws MQClientException {
	this.consumer = new DefaultMQPushConsumer("MonitorConsumer");
	this.consumer.setNamesrvAddr("127.0.0.1:9876");
	this.consumer.subscribe("dst_span_topic", "*");

	this.consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
	this.consumer.registerMessageListener(new MessageListenerConcurrently() {
		@Override
		public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
			Map _r = new HashMap<>();
			for(MessageExt mess : msgs){
				JSONArray array = (JSONArray)JSONArray.parse(mess.getBody());
				for(int i = 0;i<array.size();i++ ){
					_r = (Map)array.get(i);
					_r.put("index_name", "application");
					_r.put("type_name", "monitor");
					storage.doStorage(_r);
				}
			}
			
			return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
		}
	});

	this.consumer.start();
}
 
开发者ID:O2O-Market,项目名称:Market-monitor,代码行数:27,代码来源:RocketMQTracerConsumer.java


示例12: ConsumeMessageConcurrentlyService

import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently; //导入依赖的package包/类
public ConsumeMessageConcurrentlyService(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl,
        MessageListenerConcurrently messageListener) {
    this.defaultMQPushConsumerImpl = defaultMQPushConsumerImpl;
    this.messageListener = messageListener;

    this.defaultMQPushConsumer = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer();
    this.consumerGroup = this.defaultMQPushConsumer.getConsumerGroup();
    this.consumeRequestQueue = new LinkedBlockingQueue<Runnable>();

    this.consumeExecutor = new ThreadPoolExecutor(//
        this.defaultMQPushConsumer.getConsumeThreadMin(),//
        this.defaultMQPushConsumer.getConsumeThreadMax(),//
        1000 * 60,//
        TimeUnit.MILLISECONDS,//
        this.consumeRequestQueue,//
        new ThreadFactoryImpl("ConsumeMessageThread_"));

    this.scheduledExecutorService =
            Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
                "ConsumeMessageScheduledThread_"));
}
 
开发者ID:diwayou,项目名称:rocketmq-all-trans,代码行数:22,代码来源:ConsumeMessageConcurrentlyService.java


示例13: createMessageListener

import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently; //导入依赖的package包/类
private MessageListener createMessageListener() {
    return new MessageListenerConcurrently() {
        AtomicLong consumeTimes = new AtomicLong(0);


        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                ConsumeConcurrentlyContext context) {

            this.consumeTimes.incrementAndGet();
            if ((this.consumeTimes.get() % 11) == 0) {
                System.out.println("Delay 0========Receive New Messages: " + msgs);
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
            else if ((this.consumeTimes.get() % 22) == 0) {
                System.out.println(Thread.currentThread().getName()
                        + "Delay 5========Receive New Messages: " + msgs);
                context.setDelayLevelWhenNextConsume(5);
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
            System.out.println("No Delay========Receive New Messages: " + msgs);
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    };
}
 
开发者ID:diwayou,项目名称:rocketmq-all-trans,代码行数:26,代码来源:PushConsumerTest.java


示例14: main

import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently; //导入依赖的package包/类
public static void main(String[] args) throws InterruptedException, MQClientException {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
    /**
     * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br>
     * 如果非第一次启动,那么按照上次消费的位置继续消费
     */
    consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

    consumer.subscribe("TopicTest", "*");

    consumer.registerMessageListener(new MessageListenerConcurrently() {

        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                ConsumeConcurrentlyContext context) {
            System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });

    consumer.start();

    System.out.println("Consumer Started.");
}
 
开发者ID:diwayou,项目名称:rocketmq-all-trans,代码行数:25,代码来源:Consumer.java


示例15: main

import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently; //导入依赖的package包/类
public static void main(String[] args) throws InterruptedException, MQClientException {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupNamecc4");
    consumer.setNamesrvAddr("10.235.170.7:9877");
    /**
     * 使用Java代码,在服务器做消息过滤
     */
    consumer.subscribe("TopicFilter7", MessageFilterImpl.class.getCanonicalName());

    consumer.registerMessageListener(new MessageListenerConcurrently() {

        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                ConsumeConcurrentlyContext context) {
            System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });

    /**
     * Consumer对象在使用之前必须要调用start初始化,初始化一次即可<br>
     */
    consumer.start();

    System.out.println("Consumer Started.");
}
 
开发者ID:diwayou,项目名称:rocketmq-all-trans,代码行数:26,代码来源:Consumer.java


示例16: main

import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently; //导入依赖的package包/类
public static void main(String[] args) throws InterruptedException, MQClientException {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");

    consumer.subscribe("TopicTest", "*");

    consumer.registerMessageListener(new MessageListenerConcurrently() {

        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                ConsumeConcurrentlyContext context) {
            System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });

    consumer.start();

    System.out.println("Consumer Started.");
}
 
开发者ID:brucechan0921,项目名称:RocketMQ-3.0.8,代码行数:20,代码来源:Consumer.java


示例17: startConsumer

import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently; //导入依赖的package包/类
private void startConsumer() throws Exception {
	
	consumer = new DefaultMQPushConsumer(CONSUMER_GROUP);
	consumer.setNamesrvAddr(NAME_SERVER_ADDRESS);
	consumer.subscribe(TOPIC, TAG);
	consumer.setConsumeFromWhere(CONSUME_FROM_FIRST_OFFSET);
	
	consumer.registerMessageListener(new MessageListenerConcurrently() {
		@Override
		public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
			Message message = msgs.get(0);
			return consumeMixMessage(message);
		}
	});
	
	consumer.start();
}
 
开发者ID:colddew,项目名称:mix-web,代码行数:18,代码来源:RocketMQService.java


示例18: main

import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently; //导入依赖的package包/类
public static void main(String[] args) throws InterruptedException, MQClientException {
    CommandLine commandLine = buildCommandline(args);
    if (commandLine != null) {
        String group = commandLine.getOptionValue('g');
        String topic = commandLine.getOptionValue('t');
        String subscription = commandLine.getOptionValue('s');
        final String returnFailedHalf = commandLine.getOptionValue('f');

        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(group);
        consumer.setInstanceName(Long.toString(System.currentTimeMillis()));

        consumer.subscribe(topic, subscription);

        consumer.registerMessageListener(new MessageListenerConcurrently() {
            AtomicLong consumeTimes = new AtomicLong(0);


            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                    ConsumeConcurrentlyContext context) {
                long currentTimes = this.consumeTimes.incrementAndGet();

                System.out.printf("%-8d %s\n", currentTimes, msgs);

                if (Boolean.parseBoolean(returnFailedHalf)) {
                    if ((currentTimes % 2) == 0) {
                        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                    }
                }

                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();

        System.out.println("Consumer Started.");
    }
}
 
开发者ID:y123456yz,项目名称:reading-and-annotate-rocketmq-3.4.6,代码行数:40,代码来源:Consumer.java


示例19: main

import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently; //导入依赖的package包/类
public static void main(String[] args) throws InterruptedException, MQClientException {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("yyzGroup23");
    consumer.setNamesrvAddr("10.2.223.157:9876;10.2.223.158:9876;10.2.223.159:9876");
   // consumer.setNamesrvAddr("10.2.223.228:9876");
    //consumer.subscribe("my-topic-2", "*", new GroovyScript(groovyScript));
    consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
    //consumer.setMessageModel(MessageModel.CLUSTERING);
    consumer.subscribe("yyztest2", "*");

    consumer.registerMessageListener(new MessageListenerConcurrently() {

        @Override  //consumeMessage在ConsumeMessageConcurrentlyService中的接口consumeMessageDirectly中執行該函數
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                ConsumeConcurrentlyContext context) {
            if(msgs == null || msgs.size() == 0){
                System.out.println("not get msgs"); /* Add Trace Log */
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
            System.out.println("consumeMessage recv {} Msgs:"+msgs.size());
            for (MessageExt msg : msgs) { /* Add Trace Log */
                System.out.println("Msgid:{}" +msg.getMsgId());
            }

            for(MessageExt messageExt: msgs) {
                System.out.println("recv msg with topic:" + messageExt.getTopic() + ",msgTag:" + messageExt.getTags() +  ", body:" + new String(messageExt.getBody()));
            }
             /* 如果返回不是成功,则该msgs消息会在内存中,offset还是在上次的位置 */
            //业务处理消息后,对返回值的检查在ConsumeRequest.run-> ConsumeMessageConcurrentlyService.processConsumeResult 中
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });

    consumer.start();

    System.out.println("Consumer Started.");
}
 
开发者ID:y123456yz,项目名称:reading-and-annotate-rocketmq-3.4.6,代码行数:37,代码来源:PushConsumer.java


示例20: postProcess

import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently; //导入依赖的package包/类
@Override
public Object postProcess(Object res, MessageListenerConcurrently t, Object proxy, Method method,
        Object[] args) {

    doCap(1, method.getName());
    return null;
}
 
开发者ID:uavorg,项目名称:uavstack,代码行数:8,代码来源:RocketmqIT.java



注:本文中的com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Java IPageSite类代码示例发布时间:2022-05-21
下一篇:
Java Xembler类代码示例发布时间:2022-05-21
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap