touchlab/Stately

java.util.ConcurrentModificationException:, even when using stately's concurrent classes 👀

Shabinder opened this issue · 24 comments

Exception java.util.ConcurrentModificationException:
  at java.util.ArrayList$Itr.next (ArrayList.java:860)
  at co.touchlab.stately.collections.ConcurrentMutableIterator$next$1.invoke (ConcurrentMutableIterator.java:11)
  at co.touchlab.stately.collections.ConcurrentMutableIterator.next (ConcurrentMutableIterator.java:16)
  at arrow.core.IterableKt.flatten (Iterable.kt)
  at in.shabinder.shared.provider.query.DefaultQueryResolver$getRelatedSongs$2$invokeSuspend$$inlined$buildParMergedIorNelSafely$1$1.invokeSuspend (DefaultQueryResolver.java:165)
  at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith (BaseContinuationImpl.java:12)
  at kotlinx.coroutines.DispatchedTask.run
  at kotlinx.coroutines.internal.LimitedDispatcher$Worker.run (LimitedDispatcher.java:4)
  at kotlinx.coroutines.scheduling.TaskImpl.run
  at kotlinx.coroutines.scheduling.CoroutineScheduler.runSafely (CoroutineScheduler.java:1)
  at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.executeTask (CoroutineScheduler.java:63)
  at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.runWorker (CoroutineScheduler.java:63)
  at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.run (CoroutineScheduler.java:63)
Fatal Exception: java.util.ConcurrentModificationException:
       at java.util.ArrayList$Itr.next(ArrayList.java:860)
       at co.touchlab.stately.collections.ConcurrentMutableIterator$next$1.invoke(ConcurrentMutableIterator.java:11)
       at co.touchlab.stately.collections.ConcurrentMutableIterator.next(ConcurrentMutableIterator.java:16)
       at kotlinx.collections.immutable.implementations.immutableList.PersistentVectorBuilder.copyToBuffer(PersistentVectorBuilder.java:12)
       at kotlinx.collections.immutable.implementations.immutableList.PersistentVectorBuilder.addAll(PersistentVectorBuilder.java:11)
       at kotlinx.collections.immutable.implementations.immutableList.SmallPersistentVector.addAll(SmallPersistentVector.java:17)
       at kotlinx.collections.immutable.ExtensionsKt.plus(Extensions.kt:5)
       at kotlinx.collections.immutable.ExtensionsKt.toPersistentList(Extensions.kt:4)
       at kotlinx.collections.immutable.ExtensionsKt.toImmutableList(Extensions.kt:2)
       at in.shabinder.shared.screens.home.discover.search.store.SoundBoundSearchStoreFactory$ExecutorImpl$search$processEmission$1.invokeSuspend(SoundBoundSearchStoreFactory.java:69)
       at in.shabinder.shared.screens.home.discover.search.store.SoundBoundSearchStoreFactory$ExecutorImpl$search$processEmission$1.invoke(SoundBoundSearchStoreFactory.java:21)
       at in.shabinder.shared.screens.home.discover.search.store.SoundBoundSearchStoreFactory$ExecutorImpl$search$processEmission$1.invoke(SoundBoundSearchStoreFactory.java:21)
       

Any more context here? Version, what it's doing, etc?

Version:
statelyVersion = "2.0.0-rc1"
statelyIsoVersion = "2.0.0-rc1"

ConcurrentMutableList -> ImmutableList

fun <T> Iterable<T>.toImmutableList(): ImmutableList<T> =
        this as? ImmutableList
        ?: this.toPersistentList()

from kotlinx.collections.immutable

Let me know if anything more is needed from my end.

It shouldn't make a difference to the issue, but I would bump up the Stately version when you get a chance. 2.0.5 is the latest.

Yeah, will do, but as you said, I don't believe it will resolve this issue.
How is it possible that ConcurrentMutableList's synchronization isn't holding up as one would expect?

Note: all traces come from ConcurrentMutableIterator.next().

Is it because the synchronization lock is always taken on a new instance of the ArrayList iterator?

Shouldn't we use the list itself as the lock instead?

If this is the case, let me know; I'm happy to open an MR. Strange that no one has ever encountered this, though 👀

OK, poking around for a bit now. See how it goes...

Just ran this:

class MiscTest {
    @Test
    fun testImmutableConversion(){
        val l = ConcurrentMutableList<SomeData>()
        repeat(20){l.add(SomeData("arst $it"))}
        val il = l.toImmutableList()
        println(il)
    }
}

data class SomeData(val s: String)

That works. Will dig through your comments in more detail to see if I can find a good repro.

The snippet above is not concurrent, right? 👀 In my case the list is being accessed and modified across multiple threads 👀

I actually thought about it after sending this. The problem isn't complicated. Rather obvious looking at it now. I'll have to look at the iterator contract here and think through this a bit.

This fails, no concurrency needed. Just changing the underlying list.

    fun testImmutableConversion(){
        val l = ConcurrentMutableList<SomeData>()
        repeat(20){l.add(SomeData("arst $it"))}
        val iter = l.iterator()
        iter.next()
        l.add(SomeData("Hello"))
        iter.next()
        val il = l.toImmutableList()
        println(il)
    }
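
For comparison, the same behaviour can be reproduced without Stately at all. The sketch below uses only a plain java.util.ArrayList on the JVM; the function name is made up for illustration. It shows that the iterator is fail-fast: any structural change to the list after the iterator is created makes the next call to next() throw, even on a single thread.

```kotlin
// Plain JVM ArrayList, single thread, no Stately involved.
// `failsAfterStructuralChange` is an illustrative name, not part of any library.
fun failsAfterStructuralChange(): Boolean {
    val list = ArrayList<String>()
    repeat(3) { list.add("item $it") }

    val iter = list.iterator()
    iter.next()          // fine: the list has not changed since iterator creation
    list.add("Hello")    // structural modification made behind the iterator's back

    return try {
        iter.next()      // the fail-fast check detects the modification here
        false
    } catch (e: java.util.ConcurrentModificationException) {
        true
    }
}
```

Calling failsAfterStructuralChange() returns true on the JVM, which matches the stack traces above: the exception originates in ArrayList's own iterator, and the wrapper around it only forwards the call.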

The "simple" answer would be to copy the list and return an iterator over the copy. Obviously, that means making a whole new list, which isn't great. Also, the returned iterator should be a MutableIterator, and changes made through it wouldn't affect the original list, which is bad. In theory, the "proper" implementation would handle changes to the underlying list as well as changes applied through the iterator.
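
A minimal sketch of that copy-based approach, assuming a hypothetical wrapper class (SnapshotList is invented here and is not Stately's implementation). It illustrates both trade-offs named above: every iterator() call costs a full copy, and the returned MutableIterator only mutates the copy.

```kotlin
// Hypothetical wrapper for illustration only; not Stately's ConcurrentMutableList.
class SnapshotList<T> {
    private val lock = Any()
    private val delegate = ArrayList<T>()

    fun add(element: T): Boolean = synchronized(lock) { delegate.add(element) }

    val size: Int get() = synchronized(lock) { delegate.size }

    // Copies the contents under the lock, so later changes to `delegate`
    // cannot invalidate the iterator. Cost: an O(n) copy per call, and
    // remove() on the returned iterator affects the copy only.
    fun iterator(): MutableIterator<T> =
        synchronized(lock) { ArrayList(delegate) }.iterator()
}

fun snapshotSurvivesChange(): List<String> {
    val list = SnapshotList<String>()
    repeat(3) { list.add("item $it") }

    val iter = list.iterator()
    val seen = mutableListOf(iter.next())
    list.add("Hello")                      // not visible to the snapshot
    while (iter.hasNext()) seen.add(iter.next())
    return seen                            // the three original items
}
```

The iteration completes without an exception, but the element added mid-iteration is never seen, and for a large list that is iterated often the repeated copying is the cost to weigh.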

The function above works for iOS, but not JVM (in a single-threaded context only). The plot thickens.

> copy the list and return an iterator to that.

In some of my use cases, that is a deal-breaker: the list is very large and copying it will hurt performance, since the same operation runs multiple times in some scenarios.

Putting the synchronized lock on the list rather than on the iterator, in the method shown below, would be the simple solution, I believe. I'll have to test it, but I'm not at my computer right now.
#105 (comment)

> Putting the synchronized lock on the list rather than on the iterator, in the method shown below, would be the simple solution, I believe. I'll have to test it, but I'm not at my computer right now.

Not sure what you mean about that. Example code would be good. The issue in my sample code is that any change to the underlying list will "break" the iterator, even in the same thread, so the synchronization doesn't matter. That's JVM-only. On iOS, changes to the underlying list don't break the iterator. For your case, the "simple" solution would be to provide a synchronized method that takes a lambda block, which would allow the creation of the immutable list to happen atomically. Creating and holding onto an iterator, where the delegate may change after the iterator is created, is where this goes off the rails (again, JVM only, although I'm not sure how the native implementations interpret changes to the underlying collection).
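
The "synchronized method that takes a lambda block" idea could look roughly like the sketch below. LockedList and withLock are hypothetical names chosen for illustration, not a confirmed Stately API. The point is that the whole read (here, copying to a new list) runs inside one critical section, so no writer going through the wrapper can change the delegate partway through.

```kotlin
// Hypothetical wrapper; `LockedList` and `withLock` are illustrative names only.
class LockedList<T> {
    private val lock = Any()
    private val delegate = ArrayList<T>()

    fun add(element: T): Boolean = synchronized(lock) { delegate.add(element) }

    // Runs `block` while holding the same lock that guards every write,
    // so the delegate cannot change while the block is executing.
    fun <R> withLock(block: (List<T>) -> R): R =
        synchronized(lock) { block(delegate) }
}

fun copyWhileWriting(): Int {
    val list = LockedList<Int>()
    val writer = Thread { repeat(1_000) { list.add(it) } }
    writer.start()

    // Copy repeatedly while the writer thread is still adding elements.
    while (writer.isAlive) {
        list.withLock { it.toList() }   // each copy is taken atomically
    }
    writer.join()
    return list.withLock { it.size }
}
```

The caveat is that the block must not let the delegate or an iterator over it escape the lambda; anything held past the end of the block is unprotected again.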

One well-known library uses Stately under the hood, and it confirms that the issue happens on JVM only: I can reproduce it on Android and Desktop, but not on iOS.

Kamel-Media/Kamel#75

Stacktrace

java.util.ConcurrentModificationException
    at java.util.LinkedHashMap$LinkedHashIterator.nextNode(LinkedHashMap.java:760)
    at java.util.LinkedHashMap$LinkedValueIterator.next(LinkedHashMap.java:787)
    at co.touchlab.stately.collections.ConcurrentMutableIterator$next$1.invoke(ConcurrentMutableCollection.kt:54)
    at co.touchlab.stately.collections.ConcurrentMutableIterator.next(ConcurrentMutableCollection.kt:85)
    at io.kamel.core.cache.disk.DiskLruCache.removeOldestEntry(DiskLruCache.kt:615)
    at io.kamel.core.cache.disk.DiskLruCache.trimToSize(DiskLruCache.kt:608)
    at io.kamel.core.cache.disk.DiskLruCache.access$trimToSize(DiskLruCache.kt:93)
    at io.kamel.core.cache.disk.DiskLruCache$launchCleanup$1$1.invokeSuspend(DiskLruCache.kt:654)
    at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
    at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:108)
    at kotlinx.coroutines.internal.LimitedDispatcher$Worker.run(LimitedDispatcher.kt:115)
    at kotlinx.coroutines.scheduling.TaskImpl.run(Tasks.kt:103)
    at kotlinx.coroutines.scheduling.CoroutineScheduler.runSafely(CoroutineScheduler.kt:584)
    at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.executeTask(CoroutineScheduler.kt:793)
    at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.runWorker(CoroutineScheduler.kt:697)
    at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.run(CoroutineScheduler.kt:684)
    Suppressed: kotlinx.coroutines.internal.DiagnosticCoroutineContextException: [StandaloneCoroutine{Cancelling}@14ec0e7, Dispatchers.IO]

@kpgalligan yeah I recently tried fixing some concurrent modification exceptions some people were experiencing with kamel by moving from LinkedHashMap to stately's ConcurrentMutableMap. https://github.com/Kamel-Media/Kamel/pull/76/files

As @FunkyMuse pointed out some people are still experiencing ConcurrentModificationException. Is there more to using ConcurrentMutableMap correctly than just swapping out use of LinkedHashMap?

We fixed this issue by using the JDK ConcurrentHashMap on JVM; however, a fix would be great either way.
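
For context on why that workaround helps on the JVM: ConcurrentHashMap iterators are weakly consistent and do not throw ConcurrentModificationException when the map changes during iteration. The sketch below is an illustration with a made-up function name, not Kamel's actual code.

```kotlin
// Illustration only; `iterateWhileMutating` is a made-up name.
fun iterateWhileMutating(): Int {
    val map = java.util.concurrent.ConcurrentHashMap<String, Int>()
    repeat(3) { map["key $it"] = it }

    val iter = map.values.iterator()
    iter.next()
    map["Hello"] = 42    // the same change would invalidate a LinkedHashMap iterator

    var visited = 1
    while (iter.hasNext()) {
        iter.next()      // no ConcurrentModificationException is thrown
        visited++
    }
    // The entry added mid-iteration may or may not be visited.
    return visited
}
```

One difference to keep in mind is that ConcurrentHashMap does not preserve insertion order the way LinkedHashMap does, so code that depends on iteration order needs a separate check before swapping.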

#112 is also related

> We fixed this issue by using the JDK ConcurrentHashMap on JVM; however, a fix would be great either way.

Yeah, a fix would definitely be needed for other platforms.