ElasticMock doesn't support cluster and _transport methods
AxelSirota opened this issue · 3 comments
When trying to mock Elasticsearch bulk api:
Error
Traceback (most recent call last):
  File "/home/axel/Mulesoft/arm-performance/integration_tests/functional/ElasticStatsDeployerTest.py", line 38, in test_push_stats
    elastic_instance=self.elastic_instance
  File "/home/axel/Mulesoft/arm-performance/src/modules/stats/online/stats_processing/ElasticStatsDeployer.py", line 56, in post_process_stats
    self.push_stats_to_elastic_search(elastic_instance, actions_list)
  File "/home/axel/Mulesoft/arm-performance/src/modules/stats/online/stats_processing/ElasticStatsDeployer.py", line 60, in push_stats_to_elastic_search
    helpers.bulk(elastic_instance, actions_list, chunk_size=ElasticStatsDeployer.max_chunk_size, max_chunk_bytes=ElasticStatsDeployer.max_bytes_size)
  File "/home/axel/Mulesoft/virtualenvs/anypoint3.5/lib/python3.5/site-packages/elasticsearch/helpers/__init__.py", line 195, in bulk
    for ok, item in streaming_bulk(client, actions, **kwargs):
  File "/home/axel/Mulesoft/virtualenvs/anypoint3.5/lib/python3.5/site-packages/elasticsearch/helpers/__init__.py", line 162, in streaming_bulk
    for bulk_actions in _chunk_actions(actions, chunk_size, max_chunk_bytes, client.transport.serializer):
AttributeError: 'FakeElasticsearch' object has no attribute 'transport'
Also, when trying to use the cluster.health method:
Error
Traceback (most recent call last):
  File "/home/axel/Mulesoft/arm-performance/integration_tests/functional/ElasticStatsDeployerTest.py", line 38, in test_push_stats
    elastic_instance=self.elastic_instance
  File "/home/axel/Mulesoft/arm-performance/src/modules/stats/online/stats_processing/ElasticStatsDeployer.py", line 38, in post_process_stats
    if not elastic_instance.cluster.health(wait_for_status='green', request_timeout=10):
AttributeError: 'FakeElasticsearch' object has no attribute 'cluster'
I can help add this to the FakeElasticsearch class, but I don't know how to approach it :)
Same issue here. Example of a workaround with MagicMock:
from unittest.mock import MagicMock
es = ...
es.transport = MagicMock()
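The same idea can probably be extended to cover the cluster.health call from the second traceback. This is only a rough sketch: `FakeClient` and `patch_missing_attributes` are hypothetical names made up for illustration, and `FakeClient` stands in for whatever mocked client you already have. The canned `{"status": "green"}` response is an assumption too; it only has to be truthy for the `if not elastic_instance.cluster.health(...)` check to pass. Note that this just gets past the AttributeError, it does not make helpers.bulk store anything.

```python
from unittest.mock import MagicMock


class FakeClient:
    """Hypothetical stand-in for the mocked client; it has no `transport` or `cluster`."""


def patch_missing_attributes(es):
    # helpers.bulk reads client.transport.serializer, so attaching a MagicMock
    # lets that attribute lookup succeed instead of raising AttributeError.
    es.transport = MagicMock()
    # The code under test calls es.cluster.health(...); return a canned,
    # truthy response (assumed shape, not a real cluster reply).
    es.cluster = MagicMock()
    es.cluster.health.return_value = {"status": "green"}
    return es


es = patch_missing_attributes(FakeClient())
health = es.cluster.health(wait_for_status="green", request_timeout=10)
```

Because `es.cluster.health` is itself a mock, a test can also check how it was called, e.g. with `es.cluster.health.assert_called_once_with(wait_for_status="green", request_timeout=10)`.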
Bulk API was recently mocked in this PR: #30
I'll provide a cluster.health mock soon
Will be done in the next release (probably 1.5.0)