Missing onError handler in subscribe() method call
Before opening, please confirm:
- I have searched for duplicate or closed issues and discussions.
Language and Async Model
Java
Amplify Categories
GraphQL API, DataStore
Gradle script dependencies
implementation 'com.amplifyframework:aws-api:2.14.11'
implementation 'com.amplifyframework:aws-datastore:2.14.11'
coreLibraryDesugaring 'com.android.tools:desugar_jdk_libs:1.1.5'
Environment information
------------------------------------------------------------
Gradle 8.0
------------------------------------------------------------
Build time: 2023-02-13 13:15:21 UTC
Revision: 62ab9b7c7f884426cf79fbedcf07658b2dbe9e97
Kotlin: 1.8.10
Groovy: 3.0.13
Ant: Apache Ant(TM) version 1.10.11 compiled on July 10 2021
JVM: 17.0.6 (JetBrains s.r.o. 17.0.6+0-b2043.56-10027231)
OS: Windows 10 10.0 amd64
Please include any relevant guides or documentation you're referencing
No response
Describe the bug
Since I included Amplify in our production Android app, I've been seeing a significant number of java.util.concurrent.TimeoutException crashes with the crash message: "The source did not signal an event for 5000 milliseconds and has been terminated."
Full error message: "The exception was not handled due to missing onError handler in the subscribe() method call. Further reading: https://github.com/ReactiveX/RxJava/wiki/Error-Handling | java.util.concurrent.TimeoutException: The source did not signal an event for 5000 milliseconds and has been terminated."
This crash has already occurred for ~2% of my users, which is quite significant since our crash rate is usually below 0.3%. Is there perhaps an issue in my implementation, or could this be an issue with the Amplify library? (It does mention "missing onError handler in the subscribe() method call".) I'm using the DataStore and GraphQL subscriptions; I'll provide a code snippet showing how I use Amplify below.
P.S. Interestingly, I saw a similar crash happening on this GitHub issue from 2020
Full stack trace (if helpful)
Fatal Exception: kf.e
The exception was not handled due to missing onError handler in the subscribe() method call. Further reading: https://github.com/ReactiveX/RxJava/wiki/Error-Handling | java.util.concurrent.TimeoutException: The source did not signal an event for 5000 milliseconds and has been terminated.
io.reactivex.rxjava3.internal.observers.CallbackCompletableObserver.accept (CallbackCompletableObserver.java:47)
io.reactivex.rxjava3.internal.observers.CallbackCompletableObserver.accept (CallbackCompletableObserver.java:26)
io.reactivex.rxjava3.internal.observers.CallbackCompletableObserver.onError (CallbackCompletableObserver.java:64)
io.reactivex.rxjava3.internal.operators.completable.CompletablePeek$CompletableObserverImplementation.onError (CompletablePeek.java:95)
io.reactivex.rxjava3.internal.operators.completable.CompletablePeek$CompletableObserverImplementation.onError (CompletablePeek.java:95)
io.reactivex.rxjava3.internal.operators.completable.CompletableSubscribeOn$SubscribeOnObserver.onError (CompletableSubscribeOn.java:74)
io.reactivex.rxjava3.internal.operators.completable.CompletableTimeout$DisposeTask.run (CompletableTimeout.java:109)
io.reactivex.rxjava3.internal.schedulers.ScheduledDirectTask.call (ScheduledDirectTask.java:41)
io.reactivex.rxjava3.internal.schedulers.ScheduledDirectTask.call (ScheduledDirectTask.java:28)
java.util.concurrent.FutureTask.run (FutureTask.java:264)
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run (ScheduledThreadPoolExecutor.java:307)
java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java:1145)
java.util.concurrent.ThreadPoolExecutor$Worker.run (ThreadPoolExecutor.java:644)
java.lang.Thread.run (Thread.java:1012)
Caused by java.util.concurrent.TimeoutException
The source did not signal an event for 5000 milliseconds and has been terminated.
io.reactivex.rxjava3.internal.operators.completable.CompletableTimeout$DisposeTask.run (CompletableTimeout.java:109)
io.reactivex.rxjava3.internal.schedulers.ScheduledDirectTask.call (ScheduledDirectTask.java:41)
io.reactivex.rxjava3.internal.schedulers.ScheduledDirectTask.call (ScheduledDirectTask.java:28)
java.util.concurrent.FutureTask.run (FutureTask.java:264)
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run (ScheduledThreadPoolExecutor.java:307)
java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java:1145)
java.util.concurrent.ThreadPoolExecutor$Worker.run (ThreadPoolExecutor.java:644)
java.lang.Thread.run (Thread.java:1012)
GraphQL schema
input AMPLIFY { globalAuthRule: AuthRule = { allow: public } } # FOR TESTING ONLY!
enum UnblockStatus {
  inProgress
  cancelled
  success
  error
}
type UnblockRequest @model @auth(rules: [{ allow: public }]) {
  id: ID!
  userEmail: String!
  barrier: String!
  barrierDifficulty: Int!
  timestamp: AWSTimestamp!
}
type UnblockResult @model @auth(rules: [{ allow: public }]) {
  id: ID!
  userEmail: String!
  timestamp: AWSTimestamp!
  unblockStatus: UnblockStatus!
}
Reproduction steps (if applicable)
No response
Code Snippet
// This is the relevant code where I send / update data
public void sendUnblockResult(Context context, UnblockStatus unblockStatus) {
    UnblockResult unblockResult = UnblockResult.builder()
            .userEmail(AppConfig.getInstance(context).getUser().getEmail())
            .timestamp(Temporal.Timestamp.now())
            .unblockStatus(unblockStatus)
            .build();
    Amplify.DataStore.save(unblockResult,
            success -> {
                Log.i(AMPLIFY_DEBUG_TAG, "Saved item: " + success.item());
                setLatestUnblockResultId(context, success.item().getId());
            },
            error -> Log.e(AMPLIFY_DEBUG_TAG, "Could not save item to DataStore", error)
    );
}
public void updateUnblockStatus(Context context, UnblockStatus unblockStatus) {
    String unblockId = getLatestUnblockId(context);
    Amplify.DataStore.query(UnblockResult.class, Where.matches(UnblockResult.ID.eq(unblockId)),
            matches -> {
                if (matches.hasNext()) {
                    UnblockResult unblockResult = matches.next();
                    UnblockResult updatedUnblockResult = unblockResult.copyOfBuilder()
                            .unblockStatus(unblockStatus)
                            .timestamp(Temporal.Timestamp.now())
                            .build();
                    Amplify.DataStore.save(updatedUnblockResult,
                            updated -> Log.i(AMPLIFY_DEBUG_TAG, "Updated unblock result: " + updatedUnblockResult),
                            failure -> Log.e(AMPLIFY_DEBUG_TAG, "Update failed.", failure)
                    );
                }
            },
            failure -> {
                Log.e(AMPLIFY_DEBUG_TAG, "Update failed.", failure);
            }
    );
}
// And this is how I subscribe to updates, from my MainActivity
Amplify.DataStore.observe(UnblockRequest.class,
        cancelable -> Log.i(AmplifyManager.AMPLIFY_DEBUG_TAG, "Observation began."),
        unblockRequestChanged -> {
            // Do some stuff
        },
        failure -> Log.e(AmplifyManager.AMPLIFY_DEBUG_TAG, "Observation failed", failure),
        () -> Log.i(AmplifyManager.AMPLIFY_DEBUG_TAG, "Observation complete")
);
Log output
// Put your logs below this line
amplifyconfiguration.json
No response
GraphQL Schema
// Put your schema below this line
Additional information and screenshots
No response
Thank you for the report @JornR94. We will need to take a look. Are you able to share the GraphQL schema you're using?
Thanks for your quick reply @mattcreaser -- I've added the GraphQL schema in the bug report description. I still need to update the global auth rule from allow: public to a production-ready authorization rule.
@mattcreaser Did you have any time to look into this already? It's still happening consistently for a handful of users, making the entire app crash. Happy to provide additional info if needed
Hi @JornR94 I have not had a chance to look into this yet. It's still on my radar, I'm hoping to get to it next week.
I don't see an obvious error in your implementation based on the code samples provided. The missing onError handler is within DataStore, so that's not on your end. Unfortunately the RxJava stack traces obscure the location of where the error might actually be occurring, so it's not entirely obvious where the problem lies.
I do have one suggestion to try. Can you move the call to save outside of the callback from query? (This could be as simple as running it on another handler or an executor.) There have been some previous issues with similar stacktraces that involved race conditions, so that is one possible thing that jumps out.
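As a rough illustration of the suggestion above, here is a minimal, self-contained sketch of handing the follow-up save off to an executor instead of calling it inline from the query callback. The `query` and `save` methods are hypothetical stand-ins for the callback-style DataStore calls (they are not Amplify APIs), and the "save-worker" thread name is an assumption made up for this example.

```java
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

public class SaveOffCallbackSketch {
    // Hypothetical stand-in for a callback-style query; invokes the callback on the caller's thread.
    static void query(Consumer<Iterator<String>> onResult) {
        onResult.accept(List.of("unblock-result-1").iterator());
    }

    // Hypothetical stand-in for a callback-style save; invokes the callback on the caller's thread.
    static void save(String item, Consumer<String> onSaved) {
        onSaved.accept(item);
    }

    // Runs the query, performs the save on a dedicated executor,
    // and returns the name of the thread on which the save ran.
    static String saveThreadName() throws Exception {
        ExecutorService saveExecutor =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "save-worker"));
        CompletableFuture<String> ranOn = new CompletableFuture<>();
        query(matches -> {
            if (matches.hasNext()) {
                String match = matches.next();
                // Hand the save off rather than calling it inside the query callback.
                saveExecutor.execute(() ->
                        save(match, saved -> ranOn.complete(Thread.currentThread().getName())));
            }
        });
        try {
            return ranOn.get(5, TimeUnit.SECONDS);
        } finally {
            saveExecutor.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("save ran on: " + saveThreadName());
    }
}
```

In an Android app the same hand-off could presumably be done with a Handler or an existing background executor; the point of the sketch is only that the save no longer runs on the thread that delivers the query result.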
Otherwise, any information you can provide is always useful, particularly if it helps to reproduce the crash. Does the crash appear on specific devices or OS versions?
Hi @mattcreaser thanks for your patience in my reply.
I could try moving the save call outside of the call to query, but the thing is that this code is live in production without being actively used yet. I've added it to production because we're expecting to release another product that interacts with our app soon (through Amplify), but for now this code is not being used by anyone other than myself and a half dozen testers. So I'm pretty confident the issue is not there (since the save call only gets called when that part of the code is being used).
What's interesting about this crash: we have thousands of users with this version in production, but only 280 users were affected by this crash in the past 30 days (which is still highly significant, since it accounts for about 80% of all our crashes). So it's far from everyone who experiences these crashes / doesn't seem very reproducible. Another interesting thing is the device distribution:
- 32% Motorola
- 27% Oppo
- 14% Samsung
- 7% Vivo
- 3% OnePlus
- 3% Huawei
There is 1% Google but as you can see, it's mostly specific OEMs from https://dontkillmyapp.com/ that are causing issues, likely due to some specific modifications in their OS? Might be helpful information to try and reproduce on your end, so wanted to share.
I'm also curious how / why these crashes don't happen for other developers using this part of the Amplify SDK, since I would expect more devs to have crashes like this on those types of devices when using this part of the Amplify SDK?
@JornR94 I'm slightly confused - the code snippets you posted are for code that is not being executed by users experiencing the crash? If so, could you clarify what parts of DataStore are being used by crashing users?
On my side I have spent some time looking into this issue. I think the most likely source for this crash is failing to configure datastore here as this subscribe does not have onError handling and matches the timeout value from the stacktrace.
This error essentially indicates that DataStore could not connect to the local sqlite database. It's not immediately clear why this may happen, why it is only affecting a subset of users, or why other customers have not been reporting the same issue. I'll continue to investigate but here are a few next steps:
- If you are able to add the RxDogTag library to your application that will "tag" the RxJava exception to make it more obvious where the exception is coming from.
- If you are able to collect logs from the crashing devices, you can add Amplify.addPlugin(AndroidLoggingPlugin(LogLevel.VERBOSE)) to your Amplify configuration code (before adding any other plugins), and that will give far more contextual information around the crash.
- We will see if we can find a potential cause to explain why the initialization failed and if there is any way to recover. Worst case, we can consume the exception and leave datastore in a "failed initialization" state. This will cause any future datastore operations to result in failures, but it won't crash the application.
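To make the "consume the exception" idea in the last bullet more concrete, here is a plain-JVM analogy (Java 9+ for `orTimeout`). It is not DataStore's actual code and does not use RxJava; `InitState`, `awaitInit`, and the 50 ms timeout are hypothetical names and values chosen for illustration. The sketch waits for an initialization step with a timeout and maps any failure, including the timeout, to a FAILED state instead of letting the exception escape unhandled.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class InitTimeoutSketch {
    // Hypothetical states for an initialization step.
    enum InitState { READY, FAILED }

    // Waits for `init` up to timeoutMs. Any failure, including a timeout,
    // is mapped to FAILED rather than being rethrown to the caller.
    static InitState awaitInit(CompletableFuture<Void> init, long timeoutMs) {
        return init
                .orTimeout(timeoutMs, TimeUnit.MILLISECONDS)
                .thenApply(ignored -> InitState.READY)
                .exceptionally(error -> InitState.FAILED)
                .join();
    }

    public static void main(String[] args) {
        // An initialization that never signals completion ends up FAILED.
        CompletableFuture<Void> neverCompletes = new CompletableFuture<>();
        System.out.println(awaitInit(neverCompletes, 50));

        // An initialization that has already completed ends up READY.
        System.out.println(awaitInit(CompletableFuture.completedFuture(null), 50));
    }
}
```

With a terminal error handler like this, later operations would have to check the state and fail gracefully, which matches the trade-off described above: operations fail, but the process does not crash.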
Hi @mattcreaser. A little extra context: the last part of the code snippet I shared (in MainActivity, the Amplify.DataStore.observe(UnblockRequest.class part) is being called for all users in production. In production those other methods that I shared should never be called (apologies for the confusion this caused!)
I was thinking: could there be an issue with configuring Amplify on the devices where I am seeing the crashes? What happens if Amplify's data store or the ApiPlugin don't configure properly, could this cause these exceptions to be thrown? Here's how Amplify is configured, in the top-level app class which extends Application:
try {
    Amplify.addPlugin(new AWSApiPlugin());
    Amplify.addPlugin(new AWSDataStorePlugin());
    Amplify.configure(this);
    Log.i(AmplifyManager.AMPLIFY_DEBUG_TAG, "Initialized Amplify");
} catch (AmplifyException error) {
    Log.e(AmplifyManager.AMPLIFY_DEBUG_TAG, "Could not initialize Amplify", error);
}