DagsHub/streaming-client

Adding large amounts of metadata does not work

Opened this issue · 3 comments

As a stress test, I tried to add metadata to a datasource in a repo containing 542,247 images. I ran the following code from a Jupyter notebook:

# Reproduction script: attach caption/score metadata to every image listed in
# labels.tsv, queuing all rows inside a single metadata_context().

# Set up DagsHub
import os
# Point the client at the test server. This is set before importing dagshub;
# presumably the host is read when the client is configured -- TODO confirm.
os.environ["DAGSHUB_CLIENT_HOST"] = "https://test.dagshub.com"

from dagshub.data_engine.model import datasources

repo = "yonomitt/LAION-Aesthetics-V2-6.5plus"
image_root = "data"
# Fetch the "images" datasource; if that raises (presumably because it does not
# exist yet), create it from the repo's data/ folder instead.
# NOTE(review): the bare `except:` also catches KeyboardInterrupt and unrelated
# errors (auth, network); `except Exception:` would be narrower.
try:
    ds = datasources.get_datasource(repo=repo, name="images")
except:
    ds = datasources.create_from_repo(repo=repo, name="images", path=image_root)


# Imports
from tqdm import tqdm


# Add metadata
annotations_file = 'labels.tsv'

# Every row is queued on the same context; the queued entries are uploaded
# together when the `with` block exits, i.e. all 542,247 rows in one request.
with ds.metadata_context() as ctx, open(annotations_file) as f:
    for row in tqdm(f.readlines()):
        # NOTE(review): the trailing newline is not stripped, so `score` keeps
        # a '\n' when a row has exactly three tab-separated columns.
        image, caption, score = row.split('\t')[:3]
        ctx.update_metadata(image, {'caption': caption, 'score': score})

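The first time I ran this, it never returned (I waited several hours). The second time, it failed with a 502 Bad Gateway error. Note that the loop itself finishes in about a second (see the progress bar). The failure happens afterwards, when the context manager exits and uploads all 542,247 entries in a single GraphQL request: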

100%|█████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 542247/542247 [00:01<00:00, 342020.81it/s]
---------------------------------------------------------------------------
JSONDecodeError                           Traceback (most recent call last)
File ~/.miniforge3/envs/dagstest/lib/python3.10/site-packages/requests/models.py:971, in Response.json(self, **kwargs)
    970 try:
--> 971     return complexjson.loads(self.text, **kwargs)
    972 except JSONDecodeError as e:
    973     # Catch JSON-related errors and raise as requests.JSONDecodeError
    974     # This aliases json.JSONDecodeError and simplejson.JSONDecodeError

File ~/.miniforge3/envs/dagstest/lib/python3.10/json/__init__.py:346, in loads(s, cls, object_hook, parse_float, parse_int, parse_constant, object_pairs_hook, **kw)
    343 if (cls is None and object_hook is None and
    344         parse_int is None and parse_float is None and
    345         parse_constant is None and object_pairs_hook is None and not kw):
--> 346     return _default_decoder.decode(s)
    347 if cls is None:

File ~/.miniforge3/envs/dagstest/lib/python3.10/json/decoder.py:337, in JSONDecoder.decode(self, s, _w)
    333 """Return the Python representation of ``s`` (a ``str`` instance
    334 containing a JSON document).
    335 
    336 """
--> 337 obj, end = self.raw_decode(s, idx=_w(s, 0).end())
    338 end = _w(s, end).end()

File ~/.miniforge3/envs/dagstest/lib/python3.10/json/decoder.py:355, in JSONDecoder.raw_decode(self, s, idx)
    354 except StopIteration as err:
--> 355     raise JSONDecodeError("Expecting value", s, err.value) from None
    356 return obj, end

JSONDecodeError: Expecting value: line 1 column 1 (char 0)

During handling of the above exception, another exception occurred:

JSONDecodeError                           Traceback (most recent call last)
File ~/.miniforge3/envs/dagstest/lib/python3.10/site-packages/gql/transport/requests.py:243, in RequestsHTTPTransport.execute(self, document, variable_values, operation_name, timeout, extra_args, upload_files)
    242 try:
--> 243     result = response.json()
    245     if log.isEnabledFor(logging.INFO):

File ~/.miniforge3/envs/dagstest/lib/python3.10/site-packages/requests/models.py:975, in Response.json(self, **kwargs)
    972 except JSONDecodeError as e:
    973     # Catch JSON-related errors and raise as requests.JSONDecodeError
    974     # This aliases json.JSONDecodeError and simplejson.JSONDecodeError
--> 975     raise RequestsJSONDecodeError(e.msg, e.doc, e.pos)

JSONDecodeError: Expecting value: line 1 column 1 (char 0)

During handling of the above exception, another exception occurred:

HTTPError                                 Traceback (most recent call last)
File ~/.miniforge3/envs/dagstest/lib/python3.10/site-packages/gql/transport/requests.py:231, in RequestsHTTPTransport.execute.<locals>.raise_response_error(resp, reason)
    229 try:
    230     # Raise a HTTPError if response status is 400 or higher
--> 231     resp.raise_for_status()
    232 except requests.HTTPError as e:

File ~/.miniforge3/envs/dagstest/lib/python3.10/site-packages/requests/models.py:1021, in Response.raise_for_status(self)
   1020 if http_error_msg:
-> 1021     raise HTTPError(http_error_msg, response=self)

HTTPError: 502 Server Error: Bad Gateway for url: https://test.dagshub.com/api/v1/repos/yonomitt/LAION-Aesthetics-V2-6.5plus/data-engine/graphql

The above exception was the direct cause of the following exception:

TransportServerError                      Traceback (most recent call last)
Cell In[10], line 3
      1 annotations_file = 'labels.tsv'
----> 3 with ds.metadata_context() as ctx, open(annotations_file) as f:
      4     for row in tqdm(f.readlines()):
      5         image, caption, score = row.split('\t')[:3]

File ~/.miniforge3/envs/dagstest/lib/python3.10/contextlib.py:142, in _GeneratorContextManager.__exit__(self, typ, value, traceback)
    140 if typ is None:
    141     try:
--> 142         next(self.gen)
    143     except StopIteration:
    144         return False

File ~/.miniforge3/envs/dagstest/lib/python3.10/site-packages/dagshub/data_engine/model/datasource.py:118, in Datasource.metadata_context(self)
    116 ctx = MetadataContextManager(self)
    117 yield ctx
--> 118 self._upload_metadata(ctx.get_metadata_entries())

File ~/.miniforge3/envs/dagstest/lib/python3.10/site-packages/dagshub/data_engine/model/datasource.py:183, in Datasource._upload_metadata(self, metadata_entries)
    182 def _upload_metadata(self, metadata_entries: List[DatapointMetadataUpdateEntry]):
--> 183     self.source.client.update_metadata(self, metadata_entries)

File ~/.miniforge3/envs/dagstest/lib/python3.10/site-packages/dagshub/data_engine/client/data_client.py:109, in DataClient.update_metadata(self, datasource, entries)
    102 assert len(entries) > 0
    104 params = GqlMutations.update_metadata_params(
    105     datasource_id=datasource.source.id,
    106     datapoints=[e.to_dict() for e in entries]
    107 )
--> 109 return self._exec(q, params)

File ~/.miniforge3/envs/dagstest/lib/python3.10/site-packages/dagshub/data_engine/client/data_client.py:82, in DataClient._exec(self, query, params)
     80     logger.debug(f"Params: {params}")
     81 q = gql.gql(query)
---> 82 resp = self.client.execute(q, variable_values=params)
     83 return resp

File ~/.miniforge3/envs/dagstest/lib/python3.10/site-packages/gql/client.py:403, in Client.execute(self, document, variable_values, operation_name, serialize_variables, parse_result, get_execution_result, **kwargs)
    400     return data
    402 else:  # Sync transports
--> 403     return self.execute_sync(
    404         document,
    405         variable_values=variable_values,
    406         operation_name=operation_name,
    407         serialize_variables=serialize_variables,
    408         parse_result=parse_result,
    409         get_execution_result=get_execution_result,
    410         **kwargs,
    411     )

File ~/.miniforge3/envs/dagstest/lib/python3.10/site-packages/gql/client.py:221, in Client.execute_sync(self, document, variable_values, operation_name, serialize_variables, parse_result, get_execution_result, **kwargs)
    219 """:meta private:"""
    220 with self as session:
--> 221     return session.execute(
    222         document,
    223         variable_values=variable_values,
    224         operation_name=operation_name,
    225         serialize_variables=serialize_variables,
    226         parse_result=parse_result,
    227         get_execution_result=get_execution_result,
    228         **kwargs,
    229     )

File ~/.miniforge3/envs/dagstest/lib/python3.10/site-packages/gql/client.py:849, in SyncClientSession.execute(self, document, variable_values, operation_name, serialize_variables, parse_result, get_execution_result, **kwargs)
    829 """Execute the provided document AST synchronously using
    830 the sync transport.
    831 
   (...)
    845 
    846 The extra arguments are passed to the transport execute method."""
    848 # Validate and execute on the transport
--> 849 result = self._execute(
    850     document,
    851     variable_values=variable_values,
    852     operation_name=operation_name,
    853     serialize_variables=serialize_variables,
    854     parse_result=parse_result,
    855     **kwargs,
    856 )
    858 # Raise an error if an error is returned in the ExecutionResult object
    859 if result.errors:

File ~/.miniforge3/envs/dagstest/lib/python3.10/site-packages/gql/client.py:758, in SyncClientSession._execute(self, document, variable_values, operation_name, serialize_variables, parse_result, **kwargs)
    748         if serialize_variables or (
    749             serialize_variables is None and self.client.serialize_variables
    750         ):
    751             variable_values = serialize_variable_values(
    752                 self.client.schema,
    753                 document,
    754                 variable_values,
    755                 operation_name=operation_name,
    756             )
--> 758 result = self.transport.execute(
    759     document,
    760     variable_values=variable_values,
    761     operation_name=operation_name,
    762     **kwargs,
    763 )
    765 # Unserialize the result if requested
    766 if self.client.schema:

File ~/.miniforge3/envs/dagstest/lib/python3.10/site-packages/gql/transport/requests.py:249, in RequestsHTTPTransport.execute(self, document, variable_values, operation_name, timeout, extra_args, upload_files)
    246         log.info("<<< %s", response.text)
    248 except Exception:
--> 249     raise_response_error(response, "Not a JSON answer")
    251 if "errors" not in result and "data" not in result:
    252     raise_response_error(response, 'No "data" or "errors" keys in answer')

File ~/.miniforge3/envs/dagstest/lib/python3.10/site-packages/gql/transport/requests.py:233, in RequestsHTTPTransport.execute.<locals>.raise_response_error(resp, reason)
    231     resp.raise_for_status()
    232 except requests.HTTPError as e:
--> 233     raise TransportServerError(str(e), e.response.status_code) from e
    235 result_text = resp.text
    236 raise TransportProtocolError(
    237     f"Server did not return a GraphQL result: "
    238     f"{reason}: "
    239     f"{result_text}"
    240 )

TransportServerError: 502 Server Error: Bad Gateway for url: https://test.dagshub.com/api/v1/repos/yonomitt/LAION-Aesthetics-V2-6.5plus/data-engine/graphql

The `labels.tsv` file, which has 542,247 rows, can be found here: https://dagshub.com/DagsHub-Datasets/LAION-Aesthetics-V2-6.5plus/src/main/data/labels.tsv

The workaround was to batch the metadata uploads, using a separate `metadata_context()` for every 1,000 datapoints:

annotations_file = 'labels.tsv'

# Parse every row up front into (image, metadata) pairs.
all_metadata = []
with open(annotations_file) as f:
    for row in tqdm(f.readlines()):
        # Strip the line ending first; otherwise `score` keeps a trailing
        # newline when a row has exactly three tab-separated columns.
        row = row.rstrip('\r\n')
        if not row:
            # Skip blank lines (e.g. a trailing empty line) instead of failing
            # to unpack them.
            continue
        image, caption, score = row.split('\t')[:3]
        # Must be inside the loop: appending after it would keep only the
        # last row. Captions are cut to 255 characters, as in the original
        # workaround (presumably a server-side limit -- TODO confirm).
        all_metadata.append((image, {'caption': caption[:255], 'score': score}))

total = len(all_metadata)

# Each metadata_context() uploads its queued entries when it exits, so using
# one context per batch keeps every request small.
batch = 1000
for start in tqdm(range(0, total, batch)):
    data = all_metadata[start:start + batch]

    with ds.metadata_context() as ctx:
        for image, metadata in data:
            ctx.update_metadata(image, metadata)

I've added the same batching to the client's metadata upload, which now sends datapoints in batches of 5,000 at a time.
Hopefully that's good enough and we won't need any backend changes.