Glimpse: Handle query cancellation
This is pretty ugle, the uriFlow works just fine but our signal to stop a currently running query is a _new_ event from an Uri, and .map{} processes each new value independently. Change-Id: I84d61d76090fb9857739ae21021ed46444c00cf1
This commit is contained in:
parent
ed52fd7c8a
commit
70e83a696b
|
@ -14,11 +14,13 @@ import android.os.Handler
|
|||
import android.os.Looper
|
||||
import android.provider.MediaStore
|
||||
import androidx.activity.result.IntentSenderRequest
|
||||
import kotlinx.coroutines.Dispatchers
|
||||
import kotlinx.coroutines.channels.awaitClose
|
||||
import kotlinx.coroutines.flow.callbackFlow
|
||||
import kotlinx.coroutines.flow.conflate
|
||||
import kotlinx.coroutines.flow.map
|
||||
import kotlinx.coroutines.isActive
|
||||
import kotlinx.coroutines.launch
|
||||
import kotlinx.coroutines.sync.Mutex
|
||||
import kotlinx.coroutines.sync.withLock
|
||||
|
||||
fun ContentResolver.createDeleteRequest(vararg uris: Uri) = IntentSenderRequest.Builder(
|
||||
MediaStore.createDeleteRequest(this, uris.toCollection(ArrayList()))
|
||||
|
@ -39,30 +41,47 @@ fun ContentResolver.createWriteRequest(vararg uris: Uri) =
|
|||
MediaStore.createWriteRequest(this, uris.toCollection(ArrayList()))
|
||||
).build()
|
||||
|
||||
fun ContentResolver.uriFlow(uri: Uri) = callbackFlow {
|
||||
val observer = object : ContentObserver(Handler(Looper.getMainLooper())) {
|
||||
override fun onChange(selfChange: Boolean) {
|
||||
if (isActive) {
|
||||
trySend(Unit)
|
||||
}
|
||||
}
|
||||
}
|
||||
registerContentObserver(uri, true, observer)
|
||||
|
||||
trySend(Unit)
|
||||
|
||||
awaitClose {
|
||||
unregisterContentObserver(observer)
|
||||
}
|
||||
}
|
||||
|
||||
fun ContentResolver.queryFlow(
|
||||
uri: Uri,
|
||||
projection: Array<String>? = null,
|
||||
queryArgs: Bundle? = Bundle(),
|
||||
cancellationSignal: CancellationSignal? = null
|
||||
) = uriFlow(uri).map {
|
||||
query(
|
||||
uri, projection, queryArgs, cancellationSignal
|
||||
)
|
||||
) = callbackFlow {
|
||||
// Each query will have its own cancellationSignal.
|
||||
// Before running any new query the old cancellationSignal must be cancelled
|
||||
// to ensure the currently running query gets interrupted so that we don't
|
||||
// send data across the channel if we know we received a newer set of data.
|
||||
var cancellationSignal = CancellationSignal()
|
||||
// ContentObserver.onChange can be called concurrently so make sure
|
||||
// access to the cancellationSignal is synchronized.
|
||||
val mutex = Mutex()
|
||||
|
||||
val observer = object : ContentObserver(Handler(Looper.getMainLooper())) {
|
||||
override fun onChange(selfChange: Boolean) {
|
||||
launch(Dispatchers.IO) {
|
||||
mutex.withLock {
|
||||
cancellationSignal.cancel()
|
||||
cancellationSignal = CancellationSignal()
|
||||
}
|
||||
runCatching {
|
||||
trySend(query(uri, projection, queryArgs, cancellationSignal))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
registerContentObserver(uri, true, observer)
|
||||
|
||||
// The first set of values must always be generated and cannot (shouldn't) be cancelled.
|
||||
launch(Dispatchers.IO) {
|
||||
trySend(
|
||||
query(uri, projection, queryArgs, null)
|
||||
)
|
||||
}
|
||||
|
||||
awaitClose {
|
||||
// Stop receiving content changes.
|
||||
unregisterContentObserver(observer)
|
||||
// Cancel any possibly running query.
|
||||
cancellationSignal.cancel()
|
||||
}
|
||||
}.conflate()
|
||||
|
|
|
@ -63,7 +63,6 @@ class MediaFlow(private val context: Context, private val bucketId: Int) : Query
|
|||
uri,
|
||||
projection,
|
||||
queryArgs,
|
||||
null,
|
||||
)
|
||||
}
|
||||
|
||||
|
|
|
@ -7,16 +7,14 @@ package org.lineageos.glimpse.viewmodels
|
|||
|
||||
import android.app.Application
|
||||
import androidx.lifecycle.viewModelScope
|
||||
import kotlinx.coroutines.Dispatchers
|
||||
import kotlinx.coroutines.flow.SharingStarted
|
||||
import kotlinx.coroutines.flow.flowOn
|
||||
import kotlinx.coroutines.flow.shareIn
|
||||
import org.lineageos.glimpse.repository.MediaRepository
|
||||
|
||||
open class AlbumsViewModel(
|
||||
application: Application,
|
||||
) : GlimpseViewModel(application) {
|
||||
val albums = MediaRepository.albums(context).flowOn(Dispatchers.IO).shareIn(
|
||||
val albums = MediaRepository.albums(context).shareIn(
|
||||
viewModelScope,
|
||||
replay = 1,
|
||||
started = SharingStarted.WhileSubscribed()
|
||||
|
|
|
@ -9,9 +9,7 @@ import android.app.Application
|
|||
import androidx.lifecycle.viewModelScope
|
||||
import androidx.lifecycle.viewmodel.initializer
|
||||
import androidx.lifecycle.viewmodel.viewModelFactory
|
||||
import kotlinx.coroutines.Dispatchers
|
||||
import kotlinx.coroutines.flow.SharingStarted
|
||||
import kotlinx.coroutines.flow.flowOn
|
||||
import kotlinx.coroutines.flow.shareIn
|
||||
import org.lineageos.glimpse.repository.MediaRepository
|
||||
import org.lineageos.glimpse.utils.MediaStoreBuckets
|
||||
|
@ -20,7 +18,7 @@ open class MediaViewModel(
|
|||
application: Application,
|
||||
private val bucketId: Int
|
||||
) : GlimpseViewModel(application) {
|
||||
val media = MediaRepository.media(context, bucketId).flowOn(Dispatchers.IO).shareIn(
|
||||
val media = MediaRepository.media(context, bucketId).shareIn(
|
||||
viewModelScope,
|
||||
replay = 1,
|
||||
started = SharingStarted.WhileSubscribed()
|
||||
|
|
Loading…
Reference in New Issue