fix schedule repository
flow
This commit is contained in:
@@ -15,28 +15,25 @@ private const val LOG_TAG = "NETWORK_DEBUG"
|
|||||||
|
|
||||||
@AndroidEntryPoint
|
@AndroidEntryPoint
|
||||||
class MainActivity : ComponentActivity() {
|
class MainActivity : ComponentActivity() {
|
||||||
@Inject
|
|
||||||
lateinit var getScheduleUseCase: GetScheduleUseCase
|
@Inject lateinit var getScheduleUseCase: GetScheduleUseCase
|
||||||
|
|
||||||
override fun onCreate(savedInstanceState: Bundle?) {
|
override fun onCreate(savedInstanceState: Bundle?) {
|
||||||
super.onCreate(savedInstanceState)
|
super.onCreate(savedInstanceState)
|
||||||
|
|
||||||
lifecycleScope.launch {
|
lifecycleScope.launch {
|
||||||
getScheduleUseCase(
|
getScheduleUseCase(ScheduleType.Group("220631"))
|
||||||
ScheduleType.Group("220631") // пример группы
|
.collect { result ->
|
||||||
).collect { result ->
|
|
||||||
when (result) {
|
when (result) {
|
||||||
is DataResult.Data -> {
|
is DataResult.Data -> {
|
||||||
val src = if (result.refreshedFromNetwork) "NETWORK" else "CACHE"
|
val src = if (result.refreshedFromNetwork) "NETWORK" else "CACHE"
|
||||||
Log.d(LOG_TAG, "FROM $src: ${result.data}")
|
Log.d(LOG_TAG, "FROM $src: ${result.data}")
|
||||||
}
|
}
|
||||||
|
|
||||||
is DataResult.Error -> {
|
is DataResult.Error -> {
|
||||||
Log.e(LOG_TAG, "ERROR: ${result.throwable}")
|
Log.e(LOG_TAG, "ERROR", result.throwable)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -1,10 +1,9 @@
|
|||||||
package ru.fincode.tsudesk.feature.schedule.data
|
package ru.fincode.tsudesk.feature.schedule.data
|
||||||
|
|
||||||
import kotlinx.coroutines.channels.awaitClose
|
|
||||||
import kotlinx.coroutines.flow.Flow
|
import kotlinx.coroutines.flow.Flow
|
||||||
import kotlinx.coroutines.flow.channelFlow
|
import kotlinx.coroutines.flow.filterNotNull
|
||||||
import kotlinx.coroutines.flow.distinctUntilChanged
|
import kotlinx.coroutines.flow.first
|
||||||
import kotlinx.coroutines.launch
|
import kotlinx.coroutines.flow.flow
|
||||||
import ru.fincode.tsudesk.core.common.model.DataResult
|
import ru.fincode.tsudesk.core.common.model.DataResult
|
||||||
import ru.fincode.tsudesk.core.network.model.NetworkResult
|
import ru.fincode.tsudesk.core.network.model.NetworkResult
|
||||||
import ru.fincode.tsudesk.core.network.model.map
|
import ru.fincode.tsudesk.core.network.model.map
|
||||||
@@ -15,7 +14,6 @@ import ru.fincode.tsudesk.feature.schedule.data.mapper.ScheduleNetworkMapper
|
|||||||
import ru.fincode.tsudesk.feature.schedule.domain.model.ScheduleEntity
|
import ru.fincode.tsudesk.feature.schedule.domain.model.ScheduleEntity
|
||||||
import ru.fincode.tsudesk.feature.schedule.domain.model.ScheduleType
|
import ru.fincode.tsudesk.feature.schedule.domain.model.ScheduleType
|
||||||
import ru.fincode.tsudesk.feature.schedule.domain.repository.ScheduleRepository
|
import ru.fincode.tsudesk.feature.schedule.domain.repository.ScheduleRepository
|
||||||
import java.util.concurrent.atomic.AtomicBoolean
|
|
||||||
import javax.inject.Inject
|
import javax.inject.Inject
|
||||||
|
|
||||||
class ScheduleRepositoryImpl @Inject constructor(
|
class ScheduleRepositoryImpl @Inject constructor(
|
||||||
@@ -24,47 +22,43 @@ class ScheduleRepositoryImpl @Inject constructor(
|
|||||||
private val mapper: ScheduleNetworkMapper
|
private val mapper: ScheduleNetworkMapper
|
||||||
) : ScheduleRepository {
|
) : ScheduleRepository {
|
||||||
|
|
||||||
override fun observeSchedule(type: ScheduleType): Flow<DataResult<ScheduleEntity>> =
|
override fun observeSchedule(type: ScheduleType): Flow<DataResult<ScheduleEntity>> = flow {
|
||||||
channelFlow {
|
|
||||||
val key = when (type) {
|
val key = when (type) {
|
||||||
is ScheduleType.Group -> ScheduleCacheKey.group(type.number)
|
is ScheduleType.Group -> ScheduleCacheKey.group(type.number)
|
||||||
is ScheduleType.Teacher -> ScheduleCacheKey.teacher(type.name)
|
is ScheduleType.Teacher -> ScheduleCacheKey.teacher(type.name)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Маркер: "следующий DB-emit считать пришедшим после сети"
|
val cached: ScheduleEntity? = local.observeSchedule(key).first()
|
||||||
val markNextAsNetwork = AtomicBoolean(false)
|
|
||||||
|
|
||||||
val cacheJob = launch {
|
|
||||||
local.observeSchedule(key).collect { cached ->
|
|
||||||
if (cached != null) {
|
if (cached != null) {
|
||||||
val flag = markNextAsNetwork.getAndSet(false)
|
emit(DataResult.Data(cached, refreshedFromNetwork = false))
|
||||||
trySend(DataResult.Data(cached, refreshedFromNetwork = flag))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// refresh: сеть -> БД, без эмита сетевых данных
|
val networkResult: NetworkResult<ScheduleEntity> = when (type) {
|
||||||
launch {
|
is ScheduleType.Group ->
|
||||||
val net: NetworkResult<ScheduleEntity> = when (type) {
|
remote.loadScheduleByGroup(type.number).map(mapper::invoke)
|
||||||
is ScheduleType.Group -> remote.loadScheduleByGroup(type.number)
|
|
||||||
.map(mapper::invoke)
|
|
||||||
|
|
||||||
is ScheduleType.Teacher -> remote.loadScheduleByTeacher(type.name)
|
is ScheduleType.Teacher ->
|
||||||
.map(mapper::invoke)
|
remote.loadScheduleByTeacher(type.name).map(mapper::invoke)
|
||||||
}
|
}
|
||||||
|
|
||||||
when (net) {
|
when (networkResult) {
|
||||||
is NetworkResult.Success -> {
|
is NetworkResult.Success -> {
|
||||||
local.saveSchedule(key, net.data) // записали
|
// Важно: timestamp должен обновляться при записи из сети (либо приходить новым)
|
||||||
markNextAsNetwork.set(true) // следующий DB emit пометить
|
local.saveSchedule(key, networkResult.data)
|
||||||
}
|
}
|
||||||
|
|
||||||
is NetworkResult.Error -> {
|
is NetworkResult.Error -> {
|
||||||
trySend(DataResult.Error(Throwable(net.error.toString())))
|
emit(DataResult.Error(Throwable(networkResult.error.toString())))
|
||||||
}
|
return@flow
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
awaitClose { cacheJob.cancel() }
|
// 3) DB snapshot после записи — эмитим второй раз (NETWORK)
|
||||||
|
val updated: ScheduleEntity = local.observeSchedule(key).filterNotNull().first()
|
||||||
|
|
||||||
|
// Если не хочешь дублировать, когда данные те же — включи проверку:
|
||||||
|
// if (cached?.timestamp != updated.timestamp) { ... }
|
||||||
|
|
||||||
|
emit(DataResult.Data(updated, refreshedFromNetwork = true))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -3,6 +3,7 @@ package ru.fincode.tsudesk.feature.schedule.data.local
|
|||||||
import kotlinx.coroutines.flow.Flow
|
import kotlinx.coroutines.flow.Flow
|
||||||
import kotlinx.coroutines.flow.combine
|
import kotlinx.coroutines.flow.combine
|
||||||
import kotlinx.coroutines.flow.distinctUntilChanged
|
import kotlinx.coroutines.flow.distinctUntilChanged
|
||||||
|
import kotlinx.coroutines.flow.onStart
|
||||||
import ru.fincode.tsudesk.core.database.schedule.ScheduleDao
|
import ru.fincode.tsudesk.core.database.schedule.ScheduleDao
|
||||||
import ru.fincode.tsudesk.feature.schedule.data.datasource.ScheduleLocalDataSource
|
import ru.fincode.tsudesk.feature.schedule.data.datasource.ScheduleLocalDataSource
|
||||||
import ru.fincode.tsudesk.feature.schedule.data.mapper.toCache
|
import ru.fincode.tsudesk.feature.schedule.data.mapper.toCache
|
||||||
@@ -26,8 +27,11 @@ class ScheduleLocalDataSourceImpl @Inject constructor(
|
|||||||
}
|
}
|
||||||
|
|
||||||
override fun observeSchedule(key: String): Flow<ScheduleEntity?> {
|
override fun observeSchedule(key: String): Flow<ScheduleEntity?> {
|
||||||
return dao.observeSchedule(key)
|
val scheduleFlow = dao.observeSchedule(key) // Flow<ScheduleCacheEntity?>
|
||||||
.combine(dao.observeLessons(key)) { schedule, lessons ->
|
val lessonsFlow = dao.observeLessons(key)
|
||||||
|
.onStart { emit(emptyList()) }
|
||||||
|
return scheduleFlow
|
||||||
|
.combine(lessonsFlow) { schedule, lessons ->
|
||||||
schedule?.toDomain(lessons)
|
schedule?.toDomain(lessons)
|
||||||
}
|
}
|
||||||
.distinctUntilChanged()
|
.distinctUntilChanged()
|
||||||
|
|||||||
Reference in New Issue
Block a user