Performance - Serialization and Validation
sidharthramesh opened this issue · 6 comments
Configuration information
- EHRbase version: 0.24-SNAPSHOT
- openEHR_SDK version: 0.24-SNAPSHOT
Steps to reproduce
I'm trying to directly load compositions into a Postgres database using the SDK.
The data is in the Simplified Flat format, and this needs to be validated and converted into the Database Native format.
The input data is a JSON array of multiple compositions (batches of 1000) that look like this:
[{
"composition": {
"clinikk.prescription_pad.v2/context/_health_care_facility|id": "19700ddf-6b3a-54ad-9e3b-c73f49149130",
"clinikk.prescription_pad.v2/context/_health_care_facility|id_namespace": "Clinikk",
"clinikk.prescription_pad.v2/context/_health_care_facility|id_scheme": "Clinikk",
"clinikk.prescription_pad.v2/context/_health_care_facility|name": "Clinikk",
"ctx/time": "2021-08-02T00:00:00",
"ctx/composer_name": "Migration Agent",
"ctx/language": "en",
"ctx/territory": "IN",
"clinikk.prescription_pad.v2/pulse_oximetry/spo|denominator": 100.0,
"clinikk.prescription_pad.v2/pulse_oximetry/spo|type": 3,
"clinikk.prescription_pad.v2/pulse_oximetry/spo": 0.97,
"clinikk.prescription_pad.v2/pulse_oximetry/spo|numerator": 97,
"clinikk.prescription_pad.v2/blood_pressure/systolic|magnitude": 140,
"clinikk.prescription_pad.v2/blood_pressure/systolic|unit": "mm[Hg]",
"clinikk.prescription_pad.v2/blood_pressure/diastolic|magnitude": 88,
"clinikk.prescription_pad.v2/blood_pressure/diastolic|unit": "mm[Hg]",
"clinikk.prescription_pad.v2/respiration/rate|magnitude": 30,
"clinikk.prescription_pad.v2/respiration/rate|unit": "/min",
"clinikk.prescription_pad.v2/height_length/height_length|magnitude": 164,
"clinikk.prescription_pad.v2/height_length/height_length|unit": "cm",
"clinikk.prescription_pad.v2/body_weight/any_event:0/weight|magnitude": 93,
"clinikk.prescription_pad.v2/body_weight/any_event:0/weight|unit": "kg",
"clinikk.prescription_pad.v2/body_mass_index/body_mass_index|magnitude": 34.58,
"clinikk.prescription_pad.v2/body_mass_index/body_mass_index|unit": "kg/m2",
"clinikk.prescription_pad.v2/body_temperature/temperature|magnitude": 97,
"clinikk.prescription_pad.v2/body_temperature/temperature|unit": "[degF]",
"clinikk.prescription_pad.v2/reason_for_encounter/presenting_problem:0": "k/co asthma \nwith acute exacerbation\n",
"clinikk.prescription_pad.v2/story_history/any_event:0/story:0": "patient came to opd with c/o wheeze and breathing difficulty since yesterday\nh\nl/c/o t2 dm with asthma\nno h/o cough/fever",
"clinikk.prescription_pad.v2/physical_examination_findings/description": "rs rhonchi b/l \nrest examination nad",
"clinikk.prescription_pad.v2/problem_diagnosis:0/problem_diagnosis_name": "asthma\n",
"clinikk.prescription_pad.v2/follow_up/current_activity:0/description": "avoid triggers",
"clinikk.prescription_pad.v2/follow_up/current_activity:0/service_due": "2021-08-05T18:30:00Z",
"clinikk.prescription_pad.v2/follow_up/current_activity:0/service_name": "Follow up",
"clinikk.prescription_pad.v2/ha_follow_up/current_activity:0/service_name": "HA Follow up",
"clinikk.prescription_pad.v2/ha_follow_up/current_activity:0/description": "wheeze"
},
"id": "19700ddf-6b3a-54ad-9e3b-c73f49149130",
"template": "clinikk.prescription_pad.v2",
"ehrId": "b7701e59-e345-5060-818a-0394bb0b8a90"
}]
A snippet from the script that does the conversion looks like:
// Read the batch of simplified flat compositions from the input stream.
byte[] bytes = IOUtils.toByteArray(inputStream);
JSONArray inputArray = new JSONArray(new String(bytes));

for (int i = 0; i < inputArray.length(); i++) {
    JSONObject input = inputArray.getJSONObject(i);
    JSONObject composition = input.getJSONObject("composition");
    String id = input.getString("id");
    String ehrId = input.getString("ehrId");

    // Unmarshal the simplified flat JSON into an RM Composition.
    Composition flatComposition = unflattener.unmarshal(composition.toString(), this.webTemplate);

    // Validate the composition against the operational template.
    List<ConstraintViolation> result = validator.validate(flatComposition, template);
    if (result.size() > 0) {
        throw new ProcessException(id + " - Validation failed: " + result);
    }

    // Marshal into the database-native ("raw") JSON representation.
    String rawJson = new RawJson().marshal(flatComposition);
    System.out.println("marshalling " + id);

    // Emit a call to the put_composition stored procedure for this composition.
    String sqlString = "call put_composition('" + ehrId + "', '" + id +
            "', '" + template.getTemplateId().getValue() + "', '" + rawJson +
            "','" + systemId + "', '" + id + "');\n";
    IOUtils.write(sqlString.getBytes(), outputStream);
}
The put_composition call is a stored procedure on Postgres that will do what's necessary to create a composition, contribution, party and entries in the database. This takes under ~30 ms per composition.
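For illustration, the same call could also be issued from Java with bound parameters instead of a hand-built SQL string, which sidesteps quoting and escaping issues in rawJson. This is only a sketch: it assumes a plain JDBC Connection and that put_composition accepts the six arguments in the order shown above.

// Sketch only (java.sql.Connection / CallableStatement from JDBC);
// the actual parameter types depend on how put_composition is declared.
try (CallableStatement call = connection.prepareCall("call put_composition(?, ?, ?, ?, ?, ?)")) {
    call.setString(1, ehrId);
    call.setString(2, id);
    call.setString(3, template.getTemplateId().getValue());
    call.setString(4, rawJson);
    call.setString(5, systemId);
    call.setString(6, id);
    call.execute();
}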
Actual result
Validating and transforming 3013 compositions took 661.1 seconds in total, running on an M1 MacBook Air (Java running natively, without Rosetta emulation).
The individual batches took:
- 1274 compositions - 297.6 s
- 1108 compositions - 221.8 s
- 631 compositions - 141.7 s
That averages out to about 219 ms per composition.
Expected result (Acceptance Criteria)
Running validation and transformation should be at least as fast as the database insert operation (~30 ms per composition), so that they do not become the bottleneck in ETL pipelines.
Any other suggestions or workarounds to speed up the process would also be much appreciated!
Definition of Done
- The defect is checked by an unit or an integration test (Robot)
- Merge Request approved
- Unit tests passed
- Build without errors
- Release notes prepared
- No additional runtime warnings
Replace
validator.validate(flatComposition, template);
with
validator.validate(flatComposition, this.webTemplate);
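In the context of the loop above, that means validating against the web template that was already parsed for unmarshalling, instead of handing the validator the raw operational template each time - presumably so the template does not have to be re-processed for every composition. A sketch, reusing the variables from the snippet above:

// Reuse the already-parsed WebTemplate for validation as well.
Composition flatComposition = unflattener.unmarshal(composition.toString(), this.webTemplate);
List<ConstraintViolation> result = validator.validate(flatComposition, this.webTemplate);
if (result.size() > 0) {
    throw new ProcessException(id + " - Validation failed: " + result);
}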
@sidharthramesh
Also you might want to take a look at https://github.com/ehrbase/performance-test/blob/main/src/main/java/org/ehrbase/webtester/service/LoaderServiceImp.java
cc: @vidi42
@stefanspiska thank you for the quick reply. With just that one change, it's already much faster:
- 1274 compositions - 87.4 s
- 1108 compositions - 56.7 s
- 631 compositions - 37.1 s
That's 181.2 s in total (about 3.6x faster), averaging around 60 ms per composition - so much better. But it's still technically the bottleneck xD.
I'll take a look at the performance-test loader and see if we can get it down to around 30 ms.
For context, we're building a NiFi processor that can ingest compositions in bulk after multiple other ETL pipelines.
I do not think what you are trying to do is a good idea.
It's one thing to insert data directly to generate a test set or an initial import, but EHRbase is built on the assumption that it has exclusive write access to the data. The DB structure can and will change. You might also hit concurrency and integrity issues. And in the end, in the best case, you just replicate our service layer.
If you need a batch that runs in one transaction, you can do that via the contribution endpoint (now supported in the SDK).
If you want throughput, your pipeline needs to send requests in parallel (ideally the contribution service would use parallel processing, but this is not yet implemented).
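A rough sketch of such a fan-out with the JDK's async HTTP client is below; the base URL, endpoint path and headers are assumptions to be adapted to the actual EHRbase setup, and a real pipeline would cap concurrency and inspect each response:

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

public class ParallelPost {
    public static void main(String[] args) {
        // Sketch only: POST compositions in parallel instead of one at a time.
        HttpClient client = HttpClient.newHttpClient();
        String base = "http://localhost:8080/ehrbase/rest/openehr/v1"; // assumed base URL
        String ehrId = "b7701e59-e345-5060-818a-0394bb0b8a90";
        List<String> compositionBodies = List.of(/* canonical JSON compositions */);

        List<CompletableFuture<HttpResponse<String>>> futures = compositionBodies.stream()
                .map(body -> HttpRequest.newBuilder()
                        .uri(URI.create(base + "/ehr/" + ehrId + "/composition"))
                        .header("Content-Type", "application/json")
                        .POST(HttpRequest.BodyPublishers.ofString(body))
                        .build())
                .map(request -> client.sendAsync(request, HttpResponse.BodyHandlers.ofString()))
                .collect(Collectors.toList());

        // Wait for all requests to complete; status codes should be checked in a real pipeline.
        futures.forEach(CompletableFuture::join);
    }
}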
And finally, if you do not want the REST overhead, other protocols could be added via a plugin, but plugins are a beta feature right now.
Hey @stefanspiska,
I understand my solution is hacky, and yes, I totally expect the database schema to change over time. The points you made about concurrency and integrity are also bothering me now, and it's probably best to seek a proper solution to this - it will come in handy for many clients.
There are 2 key requirements for doing ETL well - idempotency and batching:
- Idempotency: The ability to run the same process any number of times and NOT have the state of the target system change is essential for experimentation, making mistakes, and fixing those mistakes with live data by simply re-running the whole migration from the beginning. Cleaning out a DB between migrations is not a viable option in live systems. Some messaging systems also only offer an "at-least-once" guarantee - meaning a system might receive the same message twice and process both copies almost at the same time.
- Batching: For better performance and transactional behavior.
We tried using the EHRbase REST API first, but it didn't meet these requirements:
- Idempotent PUT operation on compositions with predefined IDs: EHRbase does not allow creating new compositions with a predefined ID (this might be an openEHR constraint too). In most cases, we need to define the ID of a composition ourselves - usually a UUID5 hash derived from a unique ID in another system - to sync reliably, including when creating it for the first time (see the sketch after this list).
- Idempotent PUT operation on EHRs with predefined IDs: Creation with a predefined ID is already supported by the REST API. However, a PUT on an existing EHR errors out instead of succeeding idempotently.
- Previous version ID should not be a requirement (optional requirement): Updating compositions via the official REST API also requires that a previous composition version ID be provided. This is problematic for ETL pipelines that just want to do an idempotent PUT and do not care about the previous version. Retrieving the previous version in the ETL flow adds significant overhead.
- Batching of multiple operations while keeping all of the above features: While I think contributions support batching compositions, I am not sure whether they also support creating EHRs, and whether they allow predefined IDs for creation as well as updates. Ideally this should also not require the previous version ID.
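To make the predefined-ID point concrete (as referenced in the first bullet above), here is a self-contained sketch of deriving a deterministic version-5 UUID from an external record ID, so that re-running a migration always produces the same composition ID. The namespace constant and the example name are made up for illustration:

import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.UUID;

public final class DeterministicIds {

    // Example namespace (the RFC 4122 DNS namespace); any fixed UUID agreed on by the pipeline works.
    private static final UUID NAMESPACE = UUID.fromString("6ba7b810-9dad-11d1-80b4-00c04fd430c8");

    // RFC 4122 version-5 (SHA-1, name-based) UUID: same namespace + name always yields the same UUID.
    public static UUID uuidV5(UUID namespace, String name) {
        try {
            MessageDigest sha1 = MessageDigest.getInstance("SHA-1");
            sha1.update(toBytes(namespace));
            sha1.update(name.getBytes(StandardCharsets.UTF_8));
            byte[] hash = sha1.digest();
            hash[6] = (byte) ((hash[6] & 0x0f) | 0x50); // version 5
            hash[8] = (byte) ((hash[8] & 0x3f) | 0x80); // RFC 4122 variant
            long msb = 0;
            long lsb = 0;
            for (int i = 0; i < 8; i++)  msb = (msb << 8) | (hash[i] & 0xff);
            for (int i = 8; i < 16; i++) lsb = (lsb << 8) | (hash[i] & 0xff);
            return new UUID(msb, lsb);
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("SHA-1 not available", e);
        }
    }

    private static byte[] toBytes(UUID uuid) {
        byte[] out = new byte[16];
        long msb = uuid.getMostSignificantBits();
        long lsb = uuid.getLeastSignificantBits();
        for (int i = 0; i < 8; i++) {
            out[i] = (byte) (msb >>> (8 * (7 - i)));
            out[i + 8] = (byte) (lsb >>> (8 * (7 - i)));
        }
        return out;
    }

    public static void main(String[] args) {
        // e.g. derive the composition id from a record id in the source system
        System.out.println(uuidV5(NAMESPACE, "source-system/record/42"));
    }
}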