Kotlin

[Kotlin] MutableSharedFlow의 tryEmit()이 실행되지 않을 때

728x90

 

 

 SharedFlow는 코루틴에서 쉽게 사용할 수 있는 Hot Stream입니다. 안드로이드에서는 뷰모델에서 이벤트 처리할 때 자주 사용합니다. 이번에 MutableSharedFlow를 사용하며 tryEmit()이 실행되지 않는 이슈가 있었고, 이에 대한 내용을 공유해보고자 합니다.

 

 

 아래에 예시 코드는 뷰모델에서 특정 이벤트를 발생시켜 액티비티에서 이벤트를 처리해주는 코드입니다.

class MainViewModel @Inject constructor() : ViewModel() {

    private val _event = MutableSharedFlow<Event>()
    val event = _event.asSharedFlow()
    
    fun doSomething() {
        _event.tryEmit(ToastEvent("오류입니다."))
    }
}

class MainActivity : Activity() {

    private viewModel: MainViewModel by viewModels()
    
    fun onCreate() {
        lifecycleScope.launch {
            viewModel.event.collect {
                // doSomething
            }
        }
    }
}

 뷰모델의 doSomething() 함수가 실행되면 MutableSharedFlow에 이벤트를 tryEmit() 함수를 통해 담아주고 액티비티에서 소비해줍니다. 그러나 이 코드를 실행하면 아무런 일도 일어나지 않습니다. 왜 그럴까요?

 

 

 SharedFlow 문서의 Unbuffed shared flow 섹션에서는 아래와 같이 설명하고 있습니다.

A default implementation of a shared flow that is created with MutableSharedFlow() constructor function without parameters has no replay cache nor additional buffer. 
emit call to such a shared flow suspends until all subscribers receive the emitted value and returns immediately if there are no subscribers. Thus, tryEmit call succeeds and returns true only if there are no subscribers (in which case the emitted value is immediately lost).

 MutableSharedFlow를 생성할 때 매개 변수 없이 생성하게 되면 buffer가 없는 상태로 만들어지게 됩니다. MutableSharedFlow의 tryEmit() 함수는 Boolean 타입을 반환하는데, buffer가 없는 MutableSharedFlow는 구독자가 없는 경우에만 true를 반환하고 구독자가 있으면 false를 반환합니다. 

 

 

이에 대해서 Roman Elizarov님은 아래와 같이 설명합니다.

tryEmit (unlike emit) is not a suspending function, so it clearly cannot operate without a buffer where it can store emitted value for all the suspending subscribers to process. On the other hand, emit is suspending, so it does not need buffer space, as it can always suspend in case any of the subscribers are not ready yet.

 emit() 함수와 달리 tryEmit() 함수는 suspend function이 아니기 때문에 방출된 값을 저장할 수 있는 버퍼 없이는 작동할 수 없습니다.

 구독자가 있는 상태에서 tryEmit()로 값을 방출하기 위해선 MutableSharedFlow를 생성할 때 buffer가 있게끔 만들어줘야 합니다. MutableSharedFlow의 생성자로 3가지 옵션을 전달하여 SharedFlow의 동작을 정의해줄 수 있습니다.

  • replay : 이전에 내보낸 여러 값을 새 구독자에게 다시 보낼 수 있다.
  • extraBufferCapacity : 추가 버퍼를 생성하여 emit한 데이터가 버퍼에 유지되도록 한다.
  • onBufferOverflow : 버퍼가 가득찼을 때 처리 방법에 대해 정의한다.

 

 

 MutableSharedFlow의 buffer size는 relpay + extraBufferCapacity 값을 합친 값으로 정의됩니다. 따라서 buffer를 사용해주기 위해선 replay나 extraBufferCapacity 값을 1 이상으로 설정해줘야 합니다. 위의 예시 코드에서 extraBufferCapacity = 1로 설정해주면 정상적으로 동작하게 됩니다.

private val _event = MutableSharedFlow<Event>(extraBufferCapacity = 1)
val event = _event.asSharedFlow()

 한 가지 주의할 점은, replay 값은 새로운 구독자가 구독을 시작했을 때 기존에 캐시되어 있던 값들을 방출한다는 특징이 있습니다. 이러한 특징이 필요하다면 replay로, 그게 아니라면 extraBufferCapacity 변수로 정의해주면 좋습니다.

 

 

 또한 buffer에 쌓여 있는 값들이 가득찼을 때 tryEmit을 더 호출한다면 false를 반환할 수 있습니다.

@Test
fun test() = runTest {
    val flow = MutableSharedFlow<String>(extraBufferCapacity = 1)
    val job = launch {
        flow.collect {
            println("value: $it")
        }
    }
    delay(100)
    
    flow.tryEmit("first") // true
    flow.tryEmit("second") // false
    
    delay(100)
    job.cancel()
}

 extraBufferCapacity = 1로 설정해준 뒤 tryEmit을 연속으로 2번 호출해준다면 두번째의 tryEmit은 false를 반환합니다. tryEmit() 함수를 호출했는데 buffer가 가득 찬 상태라면 값을 저장할 수 없기 때문에 false를 반환하게 됩니다.

 이는 MutableSharedFlow에서 buffer가 가득찼을 때 처리해주는 방법이 기본값인 BufferOverflow.SUSPEND이기 때문입니다. tryEmit() 함수는 suspend function이 아니기 때문에 일시 중단을 할 수가 없습니다. 따라서 값이 정상적으로 반환되지 못하고 손실되게 됩니다.

 이를 해결하기 위해서는 buffer size를 더 높게 잡아주거나 onBufferOverflow 파라미터 값을 BufferOverflow.DROP_OLDEST나 BufferOverflow.DROP_LATEST로 설정해주면 됩니다. BufferOverflow.DROP_OLDEST는 buffer에 먼저 들어온 값을 drop 시켜버리고, BufferOverflow.DROP_LATEST는 나중에 들어온 값을 drop 시켜버립니다.

 

 

 

참고

https://blog.danlew.net/2021/03/23/do-or-do-not-there-is-no-tryemit/

 

Do or do not; there is no tryEmit()

In RxJava, PublishSubject [http://reactivex.io/RxJava/3.x/javadoc/io/reactivex/rxjava3/subjects/PublishSubject.html] (or PublishRelay [https://github.com/JakeWharton/RxRelay] if you’re a cool kid) is a gizmo for manually pumping data into streams like in

blog.danlew.net

 

728x90