Информация об ошибке
guest7623 opened this issue · 7 comments
Думаю здесь надо вывести в лог или интерактивно ошибку, а так по тихому возвращается пустая строка и не ясно почему
Доработка в работе
Можете сделать тестовый релиз с выводом ошибки в этом месте (очень нужен)) )?
В соседней ветке я выложил на yandex disk тестовую сборку с выводом в лог именно в этом месте. Код репозитория я тоже обновил. Но если проблема в ssl - и в лог не выводится информация, то нужно смотреть именно настройки. В настоящий момент я собрал тестовый стенд с ssl, sasl на последней актуальной версии kafka, но не успел еще проверить работу компоненты. До конца недели в планах проверить.
Настройки одинаковые в компоненте 1с и в python клиенте , но python клиент читает топик, а компонента 1с нет
Python клиент:
from confluent_kafka import Consumer
c = Consumer({
'bootstrap.servers': 'kafka-1.eaist2-f.mos.ru:9192,kafka-2.eaist2-f.mos.ru:9192,kafka-3.eaist2-f.mos.ru:9192',
'ssl.endpoint.identification.algorithm': 'https',
'group.id': 'mygroup1',
'auto.offset.reset': 'beginning',
'enable.ssl.certificate.verification': 'false',
'security.protocol': 'sasl_ssl',
'ssl.ca.location': 'c:\kafka\eaist-ca.cer',
'ssl.key.location': 'c:\kafka\EISNSI_eaist-private.pem',
'ssl.key.password': 'pass1',
'ssl.certificate.location': 'c:\kafka\cert-signed',
'sasl.mechanism': 'PLAIN',
'sasl.username': 'EISNSI',
'sasl.password': 'pass2'
})
c.subscribe(['eaistf2.eaist.uas2.kafka-output.EISNSI.okei.GetDictionaryItem.v-1'])
while True:
msg = c.poll(1.0)
if msg is None:
continue
if msg.error():
print('Consumer error: {}'.format(msg.error()))
continue
print('Received message: {}'.format(msg.value().decode('utf-8')))
break
c.close()
1c:
&НаСервере
Процедура ПолучитьСообщенияНаСервере()
Попытка
Компонента = Новый("AddIn.Integration.simpleKafka1C");
Исключение
Подключено = ПодключитьВнешнююКомпоненту("ОбщийМакет.БП_SimpleKafkaAdapter", "Integration", ТипВнешнейКомпоненты.Native, ТипПодключенияВнешнейКомпоненты.Изолированно);
Если Подключено Тогда
Компонента = Новый("AddIn.Integration.simpleKafka1C");
КонецЕсли;
КонецПопытки;
НастройкиПодключения = Константы.БП_ЕАИСТ_KafkaНастройкиПодключения.Получить().Получить();
Компонента.УстановитьПараметр("ssl.endpoint.identification.algorithm", "https");
Компонента.УстановитьПараметр("group.id", "mygroup1");
Компонента.УстановитьПараметр("auto.offset.reset", "beginning");
Компонента.УстановитьПараметр("enable.ssl.certificate.verification", "false");
Компонента.УстановитьПараметр("security.protocol", "sasl_ssl");
Компонента.УстановитьПараметр("ssl.ca.location", "c:\kafka\eaist-ca.cer");
Компонента.УстановитьПараметр("ssl.key.location", "c:\kafka\EISNSI_eaist-private.pem");
Компонента.УстановитьПараметр("ssl.key.password", "pass1");
Компонента.УстановитьПараметр("ssl.certificate.location", "c:\kafka\cert-signed");
Компонента.УстановитьПараметр("sasl.mechanism", "PLAIN");
Компонента.УстановитьПараметр("sasl.username", "EISNSI");
Компонента.УстановитьПараметр("sasl.password", "pass2");
КаталогЛогов = "c:\Users\USR1CV8\kafka_logs\";
Если ЗначениеЗаполнено(КаталогЛогов) Тогда
Компонента.УстановитьФайлЛогирования(СтрШаблон("%1%2%3.log", КаталогЛогов , ПолучитьРазделительПути(), Новый УникальныйИдентификатор));
КонецЕсли;
Результат = Компонента.ИнициализироватьКонсьюмера("kafka-1.eaist2-f.mos.ru:9192,kafka-2.eaist2-f.mos.ru:9192,kafka-3.eaist2-f.mos.ru:9192", "eaistf2.eaist.uas2.kafka-output.EISNSI.okei.GetDictionaryItem.v-1");
// установка таймаута для ожидания сообщений
Компонента.УстановитьТаймаутОжидания(1000);
Если Не Результат Тогда
ТекстОшибки = "Не удалось инициализировать консьюмера для топика";
ЗаписьЖурналаРегистрации("Интеграция Kafka. Consumer", УровеньЖурналаРегистрации.Ошибка,,, ТекстОшибки);
Сообщить(ТекстОшибки);
Возврат;
КонецЕсли;
ЧтениеJSON = Новый ЧтениеJSON;
пока истина цикл
Попытка
Сообщение = Компонента.Слушать();
Сообщить(Сообщение);
Исключение
Компонента = Неопределено;
сообщить(ОписаниеОшибки());
Прервать;
КонецПопытки;
Прервать;
КонецЦикла;
Компонента.ОстановитьКонсьюмера();
Компонента = Неопределено;
КонецПроцедуры
Удалось прочитать топик с помощью компоненты 1с - проблема оказалась в таймауте, стоял 1000 мс, поставил 15000 мс и тогда прочитал.
Логирование таймаута тоже стоит добавить в код, сейчас насколько понял его нет
Добавлено логирование в релизе 1.1.0
Коллбеки при доставке и статистика так же доступны
statistics.interval.ms - значение настройки, отличное от 0 включает вывод статистики работы клиента и брокера. Расшифровка статистики https://github.com/confluentinc/librdkafka/blob/master/STATISTICS.md
debug - настройка вывода в лог информации:
broker,topic,msg
- детальная информация о продюссере
consumer,cgrp,topic,fetch
- консьюмер.
Полное описание доступно здесь https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md