killme2008/Metamorphosis

发送消息的主题不能自定义吗?

Closed this issue · 7 comments

按照wiki文档中消息主题章节的介绍,为了自定义消息主题,在server.ini中添加了[topic=*]的配置,并且设置了numPartitions属性和[system]中的一致,但是在发送消息的时候,还是抛出There is no aviable partition for topic meta-test,maybe you don't publish it at first?异常,我是采用Spring配置管理的MetaqTemplate来发送Metaq消息的,求解释怎么解决

FAQ 里已经说明了

如何发送任意topic?

消息生产者在发送消息前必须publish要发送的topic,这是为了从zookeeper查找到提供该topic服务的broker。如果你想发送任意的topic类型到broker,你可以这样做:

定义一个topic为*
保证*的numPartitions跟[system]模块配置的一致,生产者发送的任意topic都将默认使用[system]配置的分区数,而发送端在发送前选择分区依赖的是*的分区数,因此需要保证两者相等。
消息生产者设置默认topic为*,设置的方法是MessageProducer的setDefaultTopic方法,但是Message的topic即可为任意topic:

producer.setDefaultTopic("*");

切记:不要publish任何要发送消息的topic,否则将进入正常发送流程,导致找不到分区可用。

使用的Spring配置的发送消息的MetaqTemplate的配置如下:
<bean id ="metaqTemplate" class="com.taobao.metamorphosis.client.extension.spring.MetaqTemplate"> <property name="messageSessionFactory" ref="sessionFactory"/> <property name="messageBodyConverter" ref="messageBodyConverter"/> <property name="shareProducer" value="true"/> <property name="defaultTopic" value="*"/> </bean>
server.ini中的配置如下:
[system] brokerId=0 numPartitions=1 serverPort=8123 ........ ....... [topic=*] numPartitions=1 stat=true
所以发送消息的时候没有创建MessageProducer对象实例,而是直接使用metaqTemplate.send(MessageBuilder.withTopic(topic).withBody(msg));方式,这时候就抛出
There is no aviable partition for topic meta-test,maybe you don't publish it at first?这个错误,难道我自定义消息主题,必须使用手动创建MessageProducer实例,然后producer.setDefaultTopic("*"),这样?

确保你没有 publish 过 meta-test

发送代码如下,你看看我哪里像调用过publish方法?
`
public class MetaqPublisher {

Logger logger= LoggerFactory.getLogger(MetaqPublisher.class);


@Autowired
@Qualifier("metaqTemplate")
private MetaqTemplate metaqTemplate;

public MetaqTemplate getMetaqTemplate() {
    return metaqTemplate;
}

public void setMetaqTemplate(MetaqTemplate metaqTemplate) {
    this.metaqTemplate = metaqTemplate;
}

public void sendMsg(String topic,Object msg) {

    try {
            logger.info("请求发送消息了,消息topic:{},消息内容:{}",topic,msg);
            SendResult result=metaqTemplate.send(MessageBuilder.withTopic(topic).withBody(msg));
            if(result.isSuccess()){
                logger.info("消息发送成功了,broker id:{},偏移量:{}",result.getPartition().getBrokerId(),result.getOffset());
            }else {
               logger.info("消息发送失败了,失败原因:{}",result.getErrorMessage());
            }
    } catch (Exception e) {
        e.printStackTrace();
    }
}

}`

我这里的入参topic就要开发者自定义,而且server.ini配置文件中,我就留了一个[topic=*],其他的topic配置都删了,不存在

你先尝试下代码能不能发送,确保走代码是可以的,理论上 spring 跟代码是没有区别的,只是一层包装。

2016-06-29 11:44 GMT+08:00 smart-vole notifications@github.com:

发送代码如下,你看看我哪里像调用过publish方法?
`
public class MetaqPublisher {

Logger logger= LoggerFactory.getLogger(MetaqPublisher.class);

@Autowired
@qualifier("metaqTemplate")
private MetaqTemplate metaqTemplate;

public MetaqTemplate getMetaqTemplate() {
return metaqTemplate;
}

public void setMetaqTemplate(MetaqTemplate metaqTemplate) {
this.metaqTemplate = metaqTemplate;
}

public void sendMsg(String topic,Object msg) {

try {
        logger.info("请求发送消息了,消息topic:{},消息内容:{}",topic,msg);
        SendResult result=metaqTemplate.send(MessageBuilder.withTopic(topic).withBody(msg));
        if(result.isSuccess()){
            logger.info("消息发送成功了,broker id:{},偏移量:{}",result.getPartition().getBrokerId(),result.getOffset());
        }else {
           logger.info("消息发送失败了,失败原因:{}",result.getErrorMessage());
        }
} catch (Exception e) {
    e.printStackTrace();
}

}

}`

我这里的入参topic就要开发者自定义,而且server.ini配置文件中,我就留了一个[topic=*],其他的topic配置都删了,不存在


You are receiving this because you modified the open/close state.
Reply to this email directly, view it on GitHub
#126 (comment),
or mute the thread
https://github.com/notifications/unsubscribe/AAA3PiLDEzUGnmTZcYZilobpXDACtMwpks5qQeoJgaJpZM4JAuTB
.

庄晓丹
Email: killme2008@gmail.com xzhuang@avos.com
Site: http://fnil.net
Twitter: @killme2008

走代码是可以的,但是Spring还是不行,不知道你们是不是单元测试过Spring这种自定义消息主题发送的方式.....

直接走代码发送示例如下:

` public class Publisher {

public static void main(String[] args) throws Exception {
    final MetaClientConfig metaClientConfig = new MetaClientConfig();
    final ZkUtils.ZKConfig zkConfig = new ZkUtils.ZKConfig();
    zkConfig.zkConnect = "127.0.0.1:2181";
    metaClientConfig.setZkConfig(zkConfig);
    MessageSessionFactory sessionFactory = new MetaMessageSessionFactory(metaClientConfig);
    MessageProducer producer = sessionFactory.createProducer();
    producer.setDefaultTopic("*");
    SendResult sendResult=producer.sendMessage(new Message("myTopic", "AAA".getBytes()));
    if (!sendResult.isSuccess())
    {
        System.err.println("自定义主题消息发送失败了,失败原因:" + sendResult.getErrorMessage());
    }
    else {
        System.out.println("自定义主题消息发送成功了,Broker id: " + sendResult.getPartition());
    }
}

}`

打印的日志信息如下:

`13:22:03.381 [notify-remoting-reactor-0] DEBUG c.t.gecko.service.impl.GeckoHandler - 连接建立,远端信息:192.168.20.162:8123

13:22:03.383 [notify-remoting-reactor-0] DEBUG c.t.g.core.core.impl.AbstractSession - session started

13:22:03.384 [main] WARN c.t.m.c.producer.ProducerZooKeeper - Connected to meta://192.168.20.162:8123

13:22:03.384 [main] WARN c.t.m.c.producer.ProducerZooKeeper - End receiving broker changes for topic *

13:22:03.474 [notify-remoting-reactor-3] DEBUG c.t.m.n.MetamorphosisWireFormatType$MetaCodecFactory - Receive command:result 200 25 -2147483648
13:22:03.477 [notify-remoting-reactor-3] DEBUG c.t.g.core.core.impl.AbstractSession - read 52 bytes from channel

自定义主题消息发送成功了,Broker id: 0-1

13:22:03.805 [notify-remoting-ScanAllConnection-1-thread-1] DEBUG
c.t.g.service.impl.DefaultConnection - 移除0个无效回调`

但是走Spring代理就是不行,而且查看Spring的代码,metaqTemplate.send(MessageBuilder builder)
方法,你们的实现如下:
`
public SendResult send(MessageBuilder builder) throws InterruptedException {

    Message msg = builder.build(this.messageBodyConverter);
    String topic = msg.getTopic();
    MessageProducer producer = this.getOrCreateProducer(topic);

    try {
        return producer.sendMessage(msg);
    } catch (MetaClientException var6) {
        return new SendResult(false, (Partition)null, -1L, ExceptionUtils.getFullStackTrace(var6));
    }
}`

其实方法的第3行,也是创建了MessageProducer的实例,在public MessageProducer getOrCreateProducer(final String topic) {....}这个方法的最后,其实还是进行了setDefaultTopic()和publish()操作,你们的代码如下:

` if(this.sharedProducer == null) {
synchronized(this) {
if(this.sharedProducer == null) {
this.sharedProducer = this.messageSessionFactory.createProducer();
if(!StringUtils.isBlank(this.defaultTopic)) {
this.sharedProducer.setDefaultTopic(this.defaultTopic);
}
}
}
}

        this.sharedProducer.publish(topic);
        return this.sharedProducer;`