Adding large amounts of metadata does not work
Opened this issue · 3 comments
yonomitt commented
As a stress test, I have a repo with 542,247 images and wanted to add metadata to a datasource. I ran the following code from a Jupyter notebook:
# Set up DagsHub
import os
os.environ["DAGSHUB_CLIENT_HOST"] = "https://test.dagshub.com"

from dagshub.data_engine.model import datasources

repo = "yonomitt/LAION-Aesthetics-V2-6.5plus"
image_root = "data"

# Get the datasource if it already exists, otherwise create it
try:
    ds = datasources.get_datasource(repo=repo, name="images")
except:
    ds = datasources.create_from_repo(repo=repo, name="images", path=image_root)

# Imports
from tqdm import tqdm

# Add metadata
annotations_file = 'labels.tsv'

with ds.metadata_context() as ctx, open(annotations_file) as f:
    for row in tqdm(f.readlines()):
        image, caption, score = row.split('\t')[:3]
        ctx.update_metadata(image, {'caption': caption, 'score': score})
The first time I ran this, it never returned (I waited several hours). The second time, I got a 502:
100%|██████████| 542247/542247 [00:01<00:00, 342020.81it/s]
---------------------------------------------------------------------------
JSONDecodeError Traceback (most recent call last)
File ~/.miniforge3/envs/dagstest/lib/python3.10/site-packages/requests/models.py:971, in Response.json(self, **kwargs)
970 try:
--> 971 return complexjson.loads(self.text, **kwargs)
972 except JSONDecodeError as e:
973 # Catch JSON-related errors and raise as requests.JSONDecodeError
974 # This aliases json.JSONDecodeError and simplejson.JSONDecodeError
File ~/.miniforge3/envs/dagstest/lib/python3.10/json/__init__.py:346, in loads(s, cls, object_hook, parse_float, parse_int, parse_constant, object_pairs_hook, **kw)
343 if (cls is None and object_hook is None and
344 parse_int is None and parse_float is None and
345 parse_constant is None and object_pairs_hook is None and not kw):
--> 346 return _default_decoder.decode(s)
347 if cls is None:
File ~/.miniforge3/envs/dagstest/lib/python3.10/json/decoder.py:337, in JSONDecoder.decode(self, s, _w)
333 """Return the Python representation of ``s`` (a ``str`` instance
334 containing a JSON document).
335
336 """
--> 337 obj, end = self.raw_decode(s, idx=_w(s, 0).end())
338 end = _w(s, end).end()
File ~/.miniforge3/envs/dagstest/lib/python3.10/json/decoder.py:355, in JSONDecoder.raw_decode(self, s, idx)
354 except StopIteration as err:
--> 355 raise JSONDecodeError("Expecting value", s, err.value) from None
356 return obj, end
JSONDecodeError: Expecting value: line 1 column 1 (char 0)
During handling of the above exception, another exception occurred:
JSONDecodeError Traceback (most recent call last)
File ~/.miniforge3/envs/dagstest/lib/python3.10/site-packages/gql/transport/requests.py:243, in RequestsHTTPTransport.execute(self, document, variable_values, operation_name, timeout, extra_args, upload_files)
242 try:
--> 243 result = response.json()
245 if log.isEnabledFor(logging.INFO):
File ~/.miniforge3/envs/dagstest/lib/python3.10/site-packages/requests/models.py:975, in Response.json(self, **kwargs)
972 except JSONDecodeError as e:
973 # Catch JSON-related errors and raise as requests.JSONDecodeError
974 # This aliases json.JSONDecodeError and simplejson.JSONDecodeError
--> 975 raise RequestsJSONDecodeError(e.msg, e.doc, e.pos)
JSONDecodeError: Expecting value: line 1 column 1 (char 0)
During handling of the above exception, another exception occurred:
HTTPError Traceback (most recent call last)
File ~/.miniforge3/envs/dagstest/lib/python3.10/site-packages/gql/transport/requests.py:231, in RequestsHTTPTransport.execute.<locals>.raise_response_error(resp, reason)
229 try:
230 # Raise a HTTPError if response status is 400 or higher
--> 231 resp.raise_for_status()
232 except requests.HTTPError as e:
File ~/.miniforge3/envs/dagstest/lib/python3.10/site-packages/requests/models.py:1021, in Response.raise_for_status(self)
1020 if http_error_msg:
-> 1021 raise HTTPError(http_error_msg, response=self)
HTTPError: 502 Server Error: Bad Gateway for url: https://test.dagshub.com/api/v1/repos/yonomitt/LAION-Aesthetics-V2-6.5plus/data-engine/graphql
The above exception was the direct cause of the following exception:
TransportServerError Traceback (most recent call last)
Cell In[10], line 3
1 annotations_file = 'labels.tsv'
----> 3 with ds.metadata_context() as ctx, open(annotations_file) as f:
4 for row in tqdm(f.readlines()):
5 image, caption, score = row.split('\t')[:3]
File ~/.miniforge3/envs/dagstest/lib/python3.10/contextlib.py:142, in _GeneratorContextManager.__exit__(self, typ, value, traceback)
140 if typ is None:
141 try:
--> 142 next(self.gen)
143 except StopIteration:
144 return False
File ~/.miniforge3/envs/dagstest/lib/python3.10/site-packages/dagshub/data_engine/model/datasource.py:118, in Datasource.metadata_context(self)
116 ctx = MetadataContextManager(self)
117 yield ctx
--> 118 self._upload_metadata(ctx.get_metadata_entries())
File ~/.miniforge3/envs/dagstest/lib/python3.10/site-packages/dagshub/data_engine/model/datasource.py:183, in Datasource._upload_metadata(self, metadata_entries)
182 def _upload_metadata(self, metadata_entries: List[DatapointMetadataUpdateEntry]):
--> 183 self.source.client.update_metadata(self, metadata_entries)
File ~/.miniforge3/envs/dagstest/lib/python3.10/site-packages/dagshub/data_engine/client/data_client.py:109, in DataClient.update_metadata(self, datasource, entries)
102 assert len(entries) > 0
104 params = GqlMutations.update_metadata_params(
105 datasource_id=datasource.source.id,
106 datapoints=[e.to_dict() for e in entries]
107 )
--> 109 return self._exec(q, params)
File ~/.miniforge3/envs/dagstest/lib/python3.10/site-packages/dagshub/data_engine/client/data_client.py:82, in DataClient._exec(self, query, params)
80 logger.debug(f"Params: {params}")
81 q = gql.gql(query)
---> 82 resp = self.client.execute(q, variable_values=params)
83 return resp
File ~/.miniforge3/envs/dagstest/lib/python3.10/site-packages/gql/client.py:403, in Client.execute(self, document, variable_values, operation_name, serialize_variables, parse_result, get_execution_result, **kwargs)
400 return data
402 else: # Sync transports
--> 403 return self.execute_sync(
404 document,
405 variable_values=variable_values,
406 operation_name=operation_name,
407 serialize_variables=serialize_variables,
408 parse_result=parse_result,
409 get_execution_result=get_execution_result,
410 **kwargs,
411 )
File ~/.miniforge3/envs/dagstest/lib/python3.10/site-packages/gql/client.py:221, in Client.execute_sync(self, document, variable_values, operation_name, serialize_variables, parse_result, get_execution_result, **kwargs)
219 """:meta private:"""
220 with self as session:
--> 221 return session.execute(
222 document,
223 variable_values=variable_values,
224 operation_name=operation_name,
225 serialize_variables=serialize_variables,
226 parse_result=parse_result,
227 get_execution_result=get_execution_result,
228 **kwargs,
229 )
File ~/.miniforge3/envs/dagstest/lib/python3.10/site-packages/gql/client.py:849, in SyncClientSession.execute(self, document, variable_values, operation_name, serialize_variables, parse_result, get_execution_result, **kwargs)
829 """Execute the provided document AST synchronously using
830 the sync transport.
831
(...)
845
846 The extra arguments are passed to the transport execute method."""
848 # Validate and execute on the transport
--> 849 result = self._execute(
850 document,
851 variable_values=variable_values,
852 operation_name=operation_name,
853 serialize_variables=serialize_variables,
854 parse_result=parse_result,
855 **kwargs,
856 )
858 # Raise an error if an error is returned in the ExecutionResult object
859 if result.errors:
File ~/.miniforge3/envs/dagstest/lib/python3.10/site-packages/gql/client.py:758, in SyncClientSession._execute(self, document, variable_values, operation_name, serialize_variables, parse_result, **kwargs)
748 if serialize_variables or (
749 serialize_variables is None and self.client.serialize_variables
750 ):
751 variable_values = serialize_variable_values(
752 self.client.schema,
753 document,
754 variable_values,
755 operation_name=operation_name,
756 )
--> 758 result = self.transport.execute(
759 document,
760 variable_values=variable_values,
761 operation_name=operation_name,
762 **kwargs,
763 )
765 # Unserialize the result if requested
766 if self.client.schema:
File ~/.miniforge3/envs/dagstest/lib/python3.10/site-packages/gql/transport/requests.py:249, in RequestsHTTPTransport.execute(self, document, variable_values, operation_name, timeout, extra_args, upload_files)
246 log.info("<<< %s", response.text)
248 except Exception:
--> 249 raise_response_error(response, "Not a JSON answer")
251 if "errors" not in result and "data" not in result:
252 raise_response_error(response, 'No "data" or "errors" keys in answer')
File ~/.miniforge3/envs/dagstest/lib/python3.10/site-packages/gql/transport/requests.py:233, in RequestsHTTPTransport.execute.<locals>.raise_response_error(resp, reason)
231 resp.raise_for_status()
232 except requests.HTTPError as e:
--> 233 raise TransportServerError(str(e), e.response.status_code) from e
235 result_text = resp.text
236 raise TransportProtocolError(
237 f"Server did not return a GraphQL result: "
238 f"{reason}: "
239 f"{result_text}"
240 )
TransportServerError: 502 Server Error: Bad Gateway for url: https://test.dagshub.com/api/v1/repos/yonomitt/LAION-Aesthetics-V2-6.5plus/data-engine/graphql
yonomitt commented
The labels.tsv file can be found here: https://dagshub.com/DagsHub-Datasets/LAION-Aesthetics-V2-6.5plus/src/main/data/labels.tsv
And has 542,247 rows.
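For anyone reproducing this: a quick sanity check (assuming the file has been downloaded locally) confirms the row count:
with open('labels.tsv') as f:
    row_count = sum(1 for _ in f)
print(row_count)  # expected: 542247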
yonomitt commented
The workaround was to batch the metadata uploads:
annotations_file = 'labels.tsv'

# Read all the metadata into memory first
all_metadata = []
with open(annotations_file) as f:
    for row in tqdm(f.readlines()):
        image, caption, score = row.split('\t')[:3]
        all_metadata.append((image, {'caption': caption[:255], 'score': score}))

# Then upload it in batches of 1000 datapoints
total = len(all_metadata)
batch = 1000

for start in tqdm(range(0, total, batch)):
    data = all_metadata[start:start + batch]
    with ds.metadata_context() as ctx:
        for image, metadata in data:
            ctx.update_metadata(image, metadata)
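The same idea as a reusable helper, for anyone hitting this with a different dataset. Only metadata_context() and update_metadata() are real client API here; the function name, signature, and default batch size are mine:
def upload_in_batches(ds, entries, batch_size=1000):
    # One metadata_context per chunk, so each underlying GraphQL
    # mutation stays small instead of carrying all 500k+ datapoints
    for start in range(0, len(entries), batch_size):
        with ds.metadata_context() as ctx:
            for image, metadata in entries[start:start + batch_size]:
                ctx.update_metadata(image, metadata)

upload_in_batches(ds, all_metadata)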
kbolashev commented
I've moved the batching into the metadata upload itself, so it now uploads in batches of 5k points at a time.
Hope that's good enough and we don't need any backend changes
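For reference, a minimal sketch of what that could look like client-side — the names Datasource._upload_metadata and self.source.client.update_metadata are taken from the traceback above, and the 5k figure from the comment; the actual fix may differ:
BATCH_SIZE = 5000  # assumed constant; 5k points per upload as described above

def _upload_metadata(self, metadata_entries):
    # Split one large update into fixed-size chunks so no single
    # GraphQL mutation has to carry the entire 542k-row payload
    for start in range(0, len(metadata_entries), BATCH_SIZE):
        chunk = metadata_entries[start:start + BATCH_SIZE]
        self.source.client.update_metadata(self, chunk)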