NuclearAPK/Simple-Kafka_Adapter

Информация об ошибке

guest7623 opened this issue · 7 comments

https://github.com/NuclearAPK/Simple-Kafka_Adapter/blob/a2230fa97d1cc5f0b983e8400207942c836da8cb/src/SimpleKafka1C.cpp#LL374C14-L374C14

Думаю здесь надо вывести в лог или интерактивно ошибку, а так по тихому возвращается пустая строка и не ясно почему

Доработка в работе

Можете сделать тестовый релиз с выводом ошибки в этом месте (очень нужен)) )?

В соседней ветке я выложил на yandex disk тестовую сборку с выводом в лог именно в этом месте. Код репозитория я тоже обновил. Но если проблема в ssl - и в лог не выводится информация, то нужно смотреть именно настройки. В настоящий момент я собрал тестовый стенд с ssl, sasl на последней актуальной версии kafka, но не успел еще проверить работу компоненты. До конца недели в планах проверить.

Настройки одинаковые в компоненте 1с и в python клиенте , но python клиент читает топик, а компонента 1с нет
Python клиент:

from confluent_kafka import Consumer

c = Consumer({
    'bootstrap.servers': 'kafka-1.eaist2-f.mos.ru:9192,kafka-2.eaist2-f.mos.ru:9192,kafka-3.eaist2-f.mos.ru:9192',
    'ssl.endpoint.identification.algorithm': 'https',
    'group.id': 'mygroup1',
    'auto.offset.reset': 'beginning',
    'enable.ssl.certificate.verification': 'false',
    'security.protocol': 'sasl_ssl',
    'ssl.ca.location': 'c:\kafka\eaist-ca.cer',    
    'ssl.key.location': 'c:\kafka\EISNSI_eaist-private.pem',
    'ssl.key.password': 'pass1',
    'ssl.certificate.location': 'c:\kafka\cert-signed',
    'sasl.mechanism': 'PLAIN',
    'sasl.username': 'EISNSI',
    'sasl.password': 'pass2'
})

c.subscribe(['eaistf2.eaist.uas2.kafka-output.EISNSI.okei.GetDictionaryItem.v-1'])

while True:
    msg = c.poll(1.0)

    if msg is None:
        continue
    if msg.error():
        print('Consumer error: {}'.format(msg.error()))
        continue

    print('Received message: {}'.format(msg.value().decode('utf-8')))
    break

c.close()

1c:

&НаСервере
Процедура ПолучитьСообщенияНаСервере()
	
	Попытка
		Компонента = Новый("AddIn.Integration.simpleKafka1C");
	Исключение
		Подключено = ПодключитьВнешнююКомпоненту("ОбщийМакет.БП_SimpleKafkaAdapter", "Integration", ТипВнешнейКомпоненты.Native, ТипПодключенияВнешнейКомпоненты.Изолированно);
		Если Подключено Тогда
			Компонента = Новый("AddIn.Integration.simpleKafka1C");
		КонецЕсли;
	КонецПопытки;
	
	НастройкиПодключения = Константы.БП_ЕАИСТ_KafkaНастройкиПодключения.Получить().Получить();
	
	Компонента.УстановитьПараметр("ssl.endpoint.identification.algorithm", "https");
	Компонента.УстановитьПараметр("group.id", "mygroup1");
	Компонента.УстановитьПараметр("auto.offset.reset", "beginning");
	Компонента.УстановитьПараметр("enable.ssl.certificate.verification", "false");	
	Компонента.УстановитьПараметр("security.protocol", "sasl_ssl");
	Компонента.УстановитьПараметр("ssl.ca.location", "c:\kafka\eaist-ca.cer");
	Компонента.УстановитьПараметр("ssl.key.location", "c:\kafka\EISNSI_eaist-private.pem");
	Компонента.УстановитьПараметр("ssl.key.password", "pass1");
	Компонента.УстановитьПараметр("ssl.certificate.location", "c:\kafka\cert-signed");
	Компонента.УстановитьПараметр("sasl.mechanism", "PLAIN");
	Компонента.УстановитьПараметр("sasl.username", "EISNSI");
	Компонента.УстановитьПараметр("sasl.password", "pass2");
	
	КаталогЛогов = "c:\Users\USR1CV8\kafka_logs\";
	Если ЗначениеЗаполнено(КаталогЛогов) Тогда
		Компонента.УстановитьФайлЛогирования(СтрШаблон("%1%2%3.log", КаталогЛогов , ПолучитьРазделительПути(), Новый УникальныйИдентификатор));
	КонецЕсли;
		
	Результат = Компонента.ИнициализироватьКонсьюмера("kafka-1.eaist2-f.mos.ru:9192,kafka-2.eaist2-f.mos.ru:9192,kafka-3.eaist2-f.mos.ru:9192", "eaistf2.eaist.uas2.kafka-output.EISNSI.okei.GetDictionaryItem.v-1");
		
	// установка таймаута для ожидания сообщений
	Компонента.УстановитьТаймаутОжидания(1000);	
	
	Если Не Результат Тогда  
		ТекстОшибки = "Не удалось инициализировать консьюмера для топика";  
		ЗаписьЖурналаРегистрации("Интеграция Kafka. Consumer", УровеньЖурналаРегистрации.Ошибка,,, ТекстОшибки);
		Сообщить(ТекстОшибки);
		Возврат;
	КонецЕсли;
	
	ЧтениеJSON = Новый ЧтениеJSON;
	
	пока истина цикл
		Попытка
			Сообщение = Компонента.Слушать(); 
			Сообщить(Сообщение);
		Исключение                	
			Компонента = Неопределено;                                                         
			сообщить(ОписаниеОшибки());
			Прервать;
		КонецПопытки;
		Прервать;
	КонецЦикла;
	
	Компонента.ОстановитьКонсьюмера();	
	Компонента = Неопределено;
	
КонецПроцедуры

Удалось прочитать топик с помощью компоненты 1с - проблема оказалась в таймауте, стоял 1000 мс, поставил 15000 мс и тогда прочитал.
Логирование таймаута тоже стоит добавить в код, сейчас насколько понял его нет

Добавлено логирование в релизе 1.1.0
Коллбеки при доставке и статистика так же доступны
statistics.interval.ms - значение настройки, отличное от 0 включает вывод статистики работы клиента и брокера. Расшифровка статистики https://github.com/confluentinc/librdkafka/blob/master/STATISTICS.md
debug - настройка вывода в лог информации:
broker,topic,msg - детальная информация о продюссере
consumer,cgrp,topic,fetch - консьюмер.
Полное описание доступно здесь https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md