Java MessageExt Class Code Examples

This article collects typical usage examples of com.alibaba.rocketmq.common.message.MessageExt in Java. If you are wondering how the MessageExt class is used in practice, or are looking for working examples of it, the selected snippets below may help.



The MessageExt class belongs to the com.alibaba.rocketmq.common.message package. Twenty code examples of the class are shown below, sorted by popularity by default.

Example 1: main

import com.alibaba.rocketmq.common.message.MessageExt; // import the required package/class
public static void main(String[] args){
	DefaultMQPushConsumer consumer =
			new DefaultMQPushConsumer("PushConsumer");
	consumer.setNamesrvAddr("127.0.0.1:9876");
	try {
		// Subscribe to messages in PushTopic whose tag is "push"
		consumer.subscribe("PushTopic", "push");
		// On first start, consume from the head of the message queue
		consumer.setConsumeFromWhere(
				ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
		consumer.registerMessageListener(
				new MessageListenerConcurrently() {
					public ConsumeConcurrentlyStatus consumeMessage(
							List<MessageExt> list,
							ConsumeConcurrentlyContext context) {
						// Print every message in the batch, not only the first one
						for (MessageExt msg : list) {
							System.out.println(msg);
						}
						return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
					}
				}
		);
		consumer.start();
	} catch (Exception e) {
		e.printStackTrace();
	}
}
 
Developer ID: youngMen1, Project: -Spring-SpringMVC-Mybatis-, Lines of code: 27, Source: Consumer.java


Example 2: makeMessageToCosumeAgain

import com.alibaba.rocketmq.common.message.MessageExt; // import the required package/class
public void makeMessageToCosumeAgain(List<MessageExt> msgs) {
    try {
        this.lockTreeMap.writeLock().lockInterruptibly();
        try {
            for (MessageExt msg : msgs) {
                this.msgTreeMapTemp.remove(msg.getQueueOffset());
                this.msgTreeMap.put(msg.getQueueOffset(), msg);
            }
        }
        finally {
            this.lockTreeMap.writeLock().unlock();
        }
    }
    catch (InterruptedException e) {
        log.error("makeMessageToCosumeAgain exception", e);
    }
}
 
Developer ID: y123456yz, Project: reading-and-annotate-rocketmq-3.4.6, Lines of code: 18, Source: ProcessQueue.java
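
The pattern in Example 2 (move messages from a temporary offset-ordered map back into the main one while holding a write lock) can be tried without a broker. The following is a minimal sketch using only the JDK; `RequeueSketch`, `Msg`, `pending`, `inFlight`, `add` and `take` are hypothetical names introduced for illustration, and it uses `lock()` rather than the `lockInterruptibly()` call seen in the example.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;

final class RequeueSketch {
    /** Hypothetical stand-in for MessageExt; only the queue offset is modeled. */
    static final class Msg {
        final long queueOffset;
        Msg(long queueOffset) { this.queueOffset = queueOffset; }
    }

    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    // Messages waiting to be consumed, ordered by queue offset.
    final TreeMap<Long, Msg> pending = new TreeMap<>();
    // Messages already handed to a consumer but not yet confirmed.
    final TreeMap<Long, Msg> inFlight = new TreeMap<>();

    /** Adds a message to the pending map. */
    void add(Msg m) {
        lock.writeLock().lock();
        try {
            pending.put(m.queueOffset, m);
        } finally {
            lock.writeLock().unlock();
        }
    }

    /** Hands out the lowest-offset pending message, or null if none is left. */
    Msg take() {
        lock.writeLock().lock();
        try {
            Map.Entry<Long, Msg> first = pending.pollFirstEntry();
            if (first == null) {
                return null;
            }
            inFlight.put(first.getKey(), first.getValue());
            return first.getValue();
        } finally {
            lock.writeLock().unlock();
        }
    }

    /** Moves the given messages back to pending so they are consumed again. */
    void requeue(List<Msg> msgs) {
        lock.writeLock().lock();
        try {
            for (Msg m : msgs) {
                inFlight.remove(m.queueOffset);
                pending.put(m.queueOffset, m);
            }
        } finally {
            lock.writeLock().unlock();
        }
    }
}
```

Because both maps are keyed by queue offset, a requeued message returns to its original position in the ordering rather than to the end.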


Example 3: consumeMessageDirectly

import com.alibaba.rocketmq.common.message.MessageExt; // import the required package/class
private RemotingCommand consumeMessageDirectly(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
    final RemotingCommand response = RemotingCommand.createResponseCommand(null);
    final ConsumeMessageDirectlyResultRequestHeader requestHeader =
            (ConsumeMessageDirectlyResultRequestHeader) request
                .decodeCommandCustomHeader(ConsumeMessageDirectlyResultRequestHeader.class);

    final MessageExt msg = MessageDecoder.decode(ByteBuffer.wrap(request.getBody()));

    ConsumeMessageDirectlyResult result =
            this.mqClientFactory.consumeMessageDirectly(msg, requestHeader.getConsumerGroup(), requestHeader.getBrokerName());

    if (null != result) {
        response.setCode(ResponseCode.SUCCESS);
        response.setBody(result.encode());
    }
    else {
        response.setCode(ResponseCode.SYSTEM_ERROR);
        response.setRemark(String.format("The Consumer Group <%s> not exist in this consumer", requestHeader.getConsumerGroup()));
    }

    return response;
}
 
Developer ID: y123456yz, Project: reading-and-annotate-rocketmq-3.4.6, Lines of code: 23, Source: ClientRemotingProcessor.java


Example 4: viewMessage

import com.alibaba.rocketmq.common.message.MessageExt; // import the required package/class
public MessageExt viewMessage(final String addr, final long phyoffset, final long timeoutMillis) throws RemotingException,
        MQBrokerException, InterruptedException {
    ViewMessageRequestHeader requestHeader = new ViewMessageRequestHeader();
    requestHeader.setOffset(phyoffset);
    RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.VIEW_MESSAGE_BY_ID, requestHeader);

    RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
    assert response != null;
    switch (response.getCode()) {
    case ResponseCode.SUCCESS: {
        ByteBuffer byteBuffer = ByteBuffer.wrap(response.getBody());
        MessageExt messageExt = MessageDecoder.decode(byteBuffer);
        return messageExt;
    }
    default:
        break;
    }

    throw new MQBrokerException(response.getCode(), response.getRemark());
}
 
Developer ID: y123456yz, Project: reading-and-annotate-rocketmq-3.4.6, Lines of code: 21, Source: MQClientAPIImpl.java


Example 5: checkLocalTransactionState

import com.alibaba.rocketmq.common.message.MessageExt; // import the required package/class
@Override
public LocalTransactionState checkLocalTransactionState(MessageExt msg) {
    System.out.println("server checking TrMsg " + msg.toString());

    int value = transactionIndex.getAndIncrement();
    if ((value % 6) == 0) {
        throw new RuntimeException("Could not find db");
    }
    else if ((value % 5) == 0) {
        return LocalTransactionState.ROLLBACK_MESSAGE;
    }
    else if ((value % 4) == 0) {
        return LocalTransactionState.COMMIT_MESSAGE;
    }

    return LocalTransactionState.UNKNOW;
}
 
Developer ID: y123456yz, Project: reading-and-annotate-rocketmq-3.4.6, Lines of code: 18, Source: TransactionCheckListenerImpl.java
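
The branch order in Example 5 decides the outcome whenever a counter value is divisible by more than one of 6, 5 and 4. The sketch below isolates that logic so it can be run on its own; `TransactionCheckSketch`, `State` and `stateFor` are hypothetical names, and the local enum only mirrors the constant names used in the example.

```java
final class TransactionCheckSketch {
    /** Local stand-in that mirrors the constant names used in Example 5. */
    enum State { COMMIT_MESSAGE, ROLLBACK_MESSAGE, UNKNOW }

    /** Same branch order as the example: 6 is tested first, then 5, then 4. */
    static State stateFor(int value) {
        if (value % 6 == 0) {
            // Simulates a failing lookup, as in the example.
            throw new RuntimeException("Could not find db");
        } else if (value % 5 == 0) {
            return State.ROLLBACK_MESSAGE;
        } else if (value % 4 == 0) {
            return State.COMMIT_MESSAGE;
        }
        return State.UNKNOW;
    }
}
```

With this ordering, 12 throws even though it is divisible by 4, and 20 rolls back rather than commits. If the counter starts at 0 (an assumption, since the field's initial value is not shown), the very first check throws.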


Example 6: main

import com.alibaba.rocketmq.common.message.MessageExt; // import the required package/class
public static void main(String[] args) throws InterruptedException, MQClientException {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");

    consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

    consumer.subscribe("TopicTest", "*");

    consumer.registerMessageListener(new MessageListenerConcurrently() {

        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                ConsumeConcurrentlyContext context) {
            System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });

    consumer.start();

    System.out.println("Consumer Started.");
}
 
Developer ID: y123456yz, Project: reading-and-annotate-rocketmq-3.4.6, Lines of code: 22, Source: Consumer.java


Example 7: main

import com.alibaba.rocketmq.common.message.MessageExt; // import the required package/class
public static void main(String[] args) throws MQClientException, InterruptedException {
    DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
    producer.start();
    for (int i = 0; i < 1; i++) {
        try {
            Message msg = new Message("TopicTest1", // topic
                "TagA", // tag
                "key113", // key
                "Hello MetaQ".getBytes()); // body
            SendResult sendResult = producer.send(msg);
            System.out.println(sendResult);

            // Query up to 10 messages with this key, stored between timestamp 0 and now
            QueryResult queryMessage =
                    producer.queryMessage("TopicTest1", "key113", 10, 0, System.currentTimeMillis());
            for (MessageExt m : queryMessage.getMessageList()) {
                System.out.println(m);
            }
        }
        catch (Exception e) {
            e.printStackTrace();
        }
    }
    producer.shutdown();
}
 
Developer ID: y123456yz, Project: reading-and-annotate-rocketmq-3.4.6, Lines of code: 27, Source: TestProducer.java


Example 8: main

import com.alibaba.rocketmq.common.message.MessageExt; // import the required package/class
public static void main(String[] args) throws InterruptedException, MQClientException {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupNamecc4");
    String filterCode = MixAll.file2String("/home/admin/MessageFilterImpl.java");
    consumer.subscribe("TopicFilter7", "com.alibaba.rocketmq.example.filter.MessageFilterImpl",
        filterCode);

    consumer.registerMessageListener(new MessageListenerConcurrently() {

        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                ConsumeConcurrentlyContext context) {
            System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });

    consumer.start();

    System.out.println("Consumer Started.");
}
 
Developer ID: y123456yz, Project: reading-and-annotate-rocketmq-3.4.6, Lines of code: 21, Source: Consumer.java


Example 9: preProcess

import com.alibaba.rocketmq.common.message.MessageExt; // import the required package/class
@SuppressWarnings("unchecked")
@Override
public void preProcess(MessageListenerOrderly t, Object proxy, Method method, Object[] args) {

    List<MessageExt> msgs = (List<MessageExt>) args[0];
    String url = address + "/" + msgs.get(0).getTopic();
    Map<String, Object> params = new HashMap<String, Object>();
    params.put(CaptureConstants.INFO_CLIENT_REQUEST_URL, url);
    params.put(CaptureConstants.INFO_CLIENT_REQUEST_ACTION, "Consumer." + method.getName());
    params.put(CaptureConstants.INFO_CLIENT_APPID, applicationId);
    params.put(CaptureConstants.INFO_CLIENT_TYPE, "rabbitmq.client");
    params.put(CaptureConstants.INFO_CAPCONTEXT_TAG, method.getName());

    if (logger.isDebugable()) {
        logger.debug("Invoke START:" + url + ",op=Consumer." + method.getName(), null);
    }

    UAVServer.instance().runMonitorCaptureOnServerCapPoint(CaptureConstants.CAPPOINT_APP_CLIENT,
            Monitor.CapturePhase.PRECAP, params);

}
 
Developer ID: uavorg, Project: uavstack, Lines of code: 22, Source: RocketmqIT.java


Example 10: handleRocketMqMessage

import com.alibaba.rocketmq.common.message.MessageExt; // import the required package/class
private void handleRocketMqMessage(List<MessageExt> msgs, MQMessageListener messageListner) {

    for (MessageExt msg : msgs) {
        MQMessage ceMessage = new MQMessage();

        byte[] msgBody = msg.getBody();
        ceMessage.setMessage(msgBody);
        try {
            messageListner.handle(ceMessage);
        }
        catch (Exception e) {
            // Log the failure and continue with the next message in the batch
            log.err(this, "MsgId=" + msg.getMsgId() + ",Topic=" + msg.getTopic() + ",MsgBornTimeStamp="
                    + msg.getBornTimestamp() + " handling failed: " + e.getMessage(), e);
        }
    }

}
 
Developer ID: uavorg, Project: uavstack, Lines of code: 18, Source: RocketMQConsumer.java


Example 11: consume

import com.alibaba.rocketmq.common.message.MessageExt; // import the required package/class
public void consume(String topic, String subExpression) throws MQClientException {

        // Subscribe with the caller's tag expression, e.g. "*" or "TagA || TagB"
        consumer.subscribe(topic, subExpression);

        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {

                for (MessageExt msg : msgs) {
                    // Constant-first equals() is null-safe when a message has no tag
                    if ("TagA".equals(msg.getTags())) {
                        // Handle TagA messages
                    }
                    else if ("TagB".equals(msg.getTags())) {
                        // Handle TagB messages
                    }
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

    }
 
Developer ID: TFdream, Project: mq-in-action, Lines of code: 27, Source: MqPushConsumer.java


Example 12: consumeMessage

import com.alibaba.rocketmq.common.message.MessageExt; // import the required package/class
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
        ConsumeConcurrentlyContext context) {
    for (MessageExt msg : msgs) {
        byte[] body = msg.getBody();
        // A two-byte body of zeros is the end signal
        if (body.length == 2 && body[0] == 0 && body[1] == 0) {
            LOG.error("Young:Got the end signal");
            _collector.emit("stop", new Values("stop"));
            continue;
        }
        // Note: every branch below returns, so only the first
        // non-signal message of the batch is handled
        if (msg.getTopic().equals(RaceConfig.MqPayTopic)) {
            return doPayTopic(body);
        } else if (msg.getTopic().equals(RaceConfig.MqTaobaoTradeTopic)) {
            putTaobaoTradeToTair(body);
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        } else if (msg.getTopic().equals(RaceConfig.MqTmallTradeTopic)) {
            putTmallTradeToTair(body);
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        } else {
            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }
    }
    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
 
Developer ID: yangliguang, Project: preliminary.demo, Lines of code: 27, Source: RaceSentenceSpout.java


Example 13: consumeMessage

import com.alibaba.rocketmq.common.message.MessageExt; // import the required package/class
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
    for (MessageExt msg : msgs) {
        byte[] body = msg.getBody();
        // A two-byte body of zeros is the end signal
        if (body.length == 2 && body[0] == 0 && body[1] == 0) {
            LOG.error("Young:Got the end signal");
            _collector.emit("stop", new Values("stop"));
            continue;
        }
        // Note: every branch below returns, so only the first
        // non-signal message of the batch is handled
        if (msg.getTopic().equals(RaceConfig.MqPayTopic)) {
            return doPayTopic(body);
        } else if (msg.getTopic().equals(RaceConfig.MqTaobaoTradeTopic)) {
            putTaobaoTradeToTair(body);
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        } else if (msg.getTopic().equals(RaceConfig.MqTmallTradeTopic)) {
            putTmallTradeToTair(body);
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        } else {
            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }
    }
    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
 
Developer ID: yangliguang, Project: preliminary.demo, Lines of code: 25, Source: SpoutLocal.java
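
Examples 12 and 13 return from inside the loop, which is harmless when the listener receives one message at a time but skips the rest of a larger batch. One possible alternative, sketched here with JDK types only, is to visit every message and report a single status at the end. `BatchStatusSketch`, `Status`, `isEndSignal`, `consumeBatch` and the `handler` predicate are hypothetical names, and the local enum only mirrors the two status names used in the examples.

```java
import java.util.List;
import java.util.function.Predicate;

final class BatchStatusSketch {
    /** Local stand-in that mirrors the two status names used in the examples. */
    enum Status { CONSUME_SUCCESS, RECONSUME_LATER }

    /** Same end-signal test as in the examples: a two-byte body of zeros. */
    static boolean isEndSignal(byte[] body) {
        return body != null && body.length == 2 && body[0] == 0 && body[1] == 0;
    }

    /**
     * Passes every non-signal body to the handler and reports one status
     * for the whole batch: success only if the handler accepted all of them.
     */
    static Status consumeBatch(List<byte[]> bodies, Predicate<byte[]> handler) {
        Status result = Status.CONSUME_SUCCESS;
        for (byte[] body : bodies) {
            if (isEndSignal(body)) {
                continue; // control message, nothing to process
            }
            if (!handler.test(body)) {
                result = Status.RECONSUME_LATER; // remember the failure, keep going
            }
        }
        return result;
    }
}
```

The trade-off is that a single failed message marks the whole batch for retry, so handlers written this way should tolerate seeing the same message more than once.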


Example 14: testScheduledMessageConsumer

import com.alibaba.rocketmq.common.message.MessageExt; // import the required package/class
@Before
public void testScheduledMessageConsumer() throws Exception {
    // Instantiate message consumer
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ExampleConsumer");
    // Subscribe topics
    consumer.subscribe("TestTopic", "*");
    // Register message listener
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) {
            for (MessageExt message : messages) {
                // Print approximate delay time period
                System.out.println("Receive message[msgId=" + message.getMsgId() + "] "
                        + (System.currentTimeMillis() - message.getStoreTimestamp()) + "ms later");
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    // Launch consumer
    consumer.start();
}
 
Developer ID: dzh, Project: coca, Lines of code: 22, Source: TestScheduledMessageProducer.java


Example 15: testBroadcastConsumer

import com.alibaba.rocketmq.common.message.MessageExt; // import the required package/class
@Before
public void testBroadcastConsumer() throws Exception {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("example_group_name");

    consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

    // set to broadcast mode
    consumer.setMessageModel(MessageModel.BROADCASTING);

    consumer.subscribe("TopicTest", "TagA || TagC || TagD");

    consumer.registerMessageListener(new MessageListenerConcurrently() {

        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            // Pass values as arguments so a '%' in a message cannot break the format string
            System.out.printf("%s Receive New Messages: %s%n", Thread.currentThread().getName(), msgs);
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });

    consumer.start();
    System.out.printf("Broadcast Consumer Started.%n");
}
 
Developer ID: dzh, Project: coca, Lines of code: 24, Source: TestBroadcastProducer.java


Example 16: consumeMessage

import com.alibaba.rocketmq.common.message.MessageExt; // import the required package/class
@Override
public ConsumeConcurrentlyStatus consumeMessage(String strBody, MessageExt msg,
		ConsumeConcurrentlyContext context)
{
	// TODO: improve the logging
	for (Map<String, String> map : matching)
	{
		Map<String, String> params = new HashMap<String, String>();

		params.put("Topic", msg.getTopic());
		params.put("Tags", msg.getTags());

		if (externalCall == null) params.put("Body", strBody);
		else params.put("Body", externalCall.MessageConsumer(strBody, msg, context));

		return sendMqTags(map, msg.getTags(), params);
	}
	return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;

}
 
Developer ID: atliwen, Project: rocketMqCurrency, Lines of code: 21, Source: ExternalCallConcurrentlyStatus.java


Example 17: viewMessage

import com.alibaba.rocketmq.common.message.MessageExt; // import the required package/class
public MessageExt viewMessage(final String addr, final long phyoffset, final long timeoutMillis) throws RemotingException,
        MQBrokerException, InterruptedException {
    ViewMessageRequestHeader requestHeader = new ViewMessageRequestHeader();
    requestHeader.setOffset(phyoffset);
    RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.VIEW_MESSAGE_BY_ID, requestHeader);

    RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
    assert response != null;
    switch (response.getCode()) {
        case ResponseCode.SUCCESS: {
            ByteBuffer byteBuffer = ByteBuffer.wrap(response.getBody());
            MessageExt messageExt = MessageDecoder.decode(byteBuffer);
            return messageExt;
        }
        default:
            break;
    }

    throw new MQBrokerException(response.getCode(), response.getRemark());
}
 
Developer ID: medusar, Project: rocketmq-commet, Lines of code: 21, Source: MQClientAPIImpl.java


Example 18: testStartupTwice

import com.alibaba.rocketmq.common.message.MessageExt; // import the required package/class
@Test
public void testStartupTwice() throws MQClientException {
    DefaultMQPushConsumer consumer = getConsumer("S_fundmng_demo_producer", "TopicTest-fundmng");
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        public ConsumeConcurrentlyStatus consumeMessage(final List<MessageExt> msgs,
                                                        final ConsumeConcurrentlyContext context) {
            System.out.println("Consumer1:" + JSON.toJSONString(msgs));
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    consumer.start();
    System.out.println("start 1");
    consumer.start();
    System.out.println("start 2");

    LockSupport.park();
}
 
Developer ID: medusar, Project: rocketmq-commet, Lines of code: 18, Source: DoubleConsumerTest.java


Example 19: consumed

import com.alibaba.rocketmq.common.message.MessageExt; // import the required package/class
public boolean consumed(final MessageExt msg, final String group) throws RemotingException, MQClientException, InterruptedException,
        MQBrokerException {
    ConsumeStats cstats = this.examineConsumeStats(group);

    ClusterInfo ci = this.examineBrokerClusterInfo();

    Iterator<Entry<MessageQueue, OffsetWrapper>> it = cstats.getOffsetTable().entrySet().iterator();
    while (it.hasNext()) {
        Entry<MessageQueue, OffsetWrapper> next = it.next();
        MessageQueue mq = next.getKey();
        if (mq.getTopic().equals(msg.getTopic()) && mq.getQueueId() == msg.getQueueId()) {
            BrokerData brokerData = ci.getBrokerAddrTable().get(mq.getBrokerName());
            if (brokerData != null) {
                String addr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
                if (addr.equals(RemotingUtil.socketAddress2String(msg.getStoreHost()))) {
                    if (next.getValue().getConsumerOffset() > msg.getQueueOffset()) {
                        return true;
                    }
                }
            }
        }
    }

    return false;
}
 
Developer ID: medusar, Project: rocketmq-commet, Lines of code: 26, Source: DefaultMQAdminExtImpl.java
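
The core of Example 19 is a single comparison: a message counts as consumed when the group's committed offset for its queue is strictly greater than the message's queue offset. The sketch below isolates that comparison with plain JDK types. `ConsumedCheckSketch`, `key` and the `committedOffsets` map are hypothetical names, and the broker-address check from the example is left out.

```java
import java.util.Map;

final class ConsumedCheckSketch {
    /** Hypothetical map key that identifies one queue of one topic. */
    static String key(String topic, int queueId) {
        return topic + "@" + queueId;
    }

    /**
     * Returns true when the committed offset recorded for the message's queue
     * is strictly greater than the message's own queue offset.
     */
    static boolean consumed(Map<String, Long> committedOffsets,
                            String topic, int queueId, long msgQueueOffset) {
        Long committed = committedOffsets.get(key(topic, queueId));
        // No entry means this group has no recorded progress on that queue.
        return committed != null && committed > msgQueueOffset;
    }
}
```

The strict comparison matches the example: a message whose queue offset equals the committed offset is still reported as not consumed.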


Example 20: createBodyFile

import com.alibaba.rocketmq.common.message.MessageExt; // import the required package/class
private static String createBodyFile(MessageExt msg) throws IOException {
    DataOutputStream dos = null;

    try {
        String bodyTmpFilePath = "/tmp/rocketmq/msgbodys";
        File file = new File(bodyTmpFilePath);
        if (!file.exists()) {
            file.mkdirs();
        }
        bodyTmpFilePath = bodyTmpFilePath + "/" + msg.getMsgId();
        dos = new DataOutputStream(new FileOutputStream(bodyTmpFilePath));
        dos.write(msg.getBody());
        return bodyTmpFilePath;
    }
    finally {
        if (dos != null)
            dos.close();
    }
}
 
Developer ID: medusar, Project: rocketmq-commet, Lines of code: 20, Source: QueryMsgByIdSubCommand.java
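
On Java 7 and later, the manual `finally` cleanup in Example 20 can be expressed with try-with-resources. The following is a minimal sketch using `java.nio.file`, not the project's own code; `BodyFileSketch` and `writeBody` are hypothetical names, and the directory is passed in by the caller instead of being hard-coded.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;

final class BodyFileSketch {
    /**
     * Writes a message body to dir/msgId and returns the resulting path.
     * Missing parent directories are created first.
     */
    static Path writeBody(Path dir, String msgId, byte[] body) throws IOException {
        Files.createDirectories(dir); // no-op if the directory already exists
        Path target = dir.resolve(msgId);
        // try-with-resources closes the stream even if write() throws
        try (OutputStream out = Files.newOutputStream(target)) {
            out.write(body);
        }
        return target;
    }
}
```

A call such as `BodyFileSketch.writeBody(Paths.get("/tmp/rocketmq/msgbodys"), msg.getMsgId(), msg.getBody())` would reproduce the behavior of the example, assuming the message ID is usable as a file name.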



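Note: The com.alibaba.rocketmq.common.message.MessageExt examples in this article were collected from source-code and documentation platforms such as GitHub and MSDocs. The snippets were selected from open-source projects, and copyright remains with the original authors. For distribution and use, refer to the License of the corresponding project. Do not reproduce without permission.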

