코루틴은 race condition이 발생하지 않는 것일까?
C, JAVA의 멀티 쓰레드에서 비동기 처리에서는 공유변수에 대한 접근제어를 해왔었다.(세마포어, 뮤텍스, synchronized , etc.) 동기화 제어를 제대로 하지 않으면 원자성에 위배되어, 변경된 데이터의 손실이 일어날 수 있다.
그런데 코틀린 코루틴에 대해 배우면서 아직까지 race condition에 대한 언급은 한 번도 못봤던 것 같다. 서로 다른 coroutineScope에서 공유 변수에 대한 update가 동시에 발생할 때 문제가 생길 가능성은 전혀 없는 것일까?
결론부터 말하자면 멀티스레드와 마찬가지로 동기화 제어를 해주어야 한다.
각 쓰레드는 JVM의 Stack 영역을 차지하고, 코루틴은 각각의 작업이 Object로 할당되어 JVM의 Heap에 적재된다.
하지만 Heap영역이라고 해서 동기화 제어의 필요성에 차이가 있는 것은 아니다.
따라서 내가 알아본 coroutine의 동기화 제어 방법은 아래의 4가지다.
1. Mutex
2. Actor
3. synchronized
4. 쓰레드 가두기(SingleThreadContext)
우선 아래의 코드를 먼저 보자
var counter = 0
suspend fun concurrencyTest(action: suspend () -> Unit) {
val n = 100
val k = 1000
val jobs = List(n) {
CoroutineScope(Dispatchers.Default).launch {
repeat(k) {
action.invoke()
}
}
}
jobs.forEach { it.join() }
}
fun main() = runBlocking {
println("let's go")
concurrencyTest { //no control
counter++
}
println("Counter = $counter")
}
위의 코드를 간략히 설명하자면
concurrencyTest 함수는 100개의 코루틴 스코프를 생성하고 각 코루틴은 1000번의 action을 실행한다. 이때 action은 main함수에서 명시한 counter++이다. 따라서 100*1000의 counter++를 기대한다면 100,000를 출력해야한다.
하지만 실행 결과는?
100,000보다 작은 수가 출력된다. 동기화 제어가 제대로 되지 않았기 때문이다. 멀티 쓰레드의 race condition과 같으니 추가 설명은 생략하겠다.
아래의 4가지 방법은 이러한 동시성 문제를 제어하기 위한 방법들이다.
1. Mutex
main함수 코드를 다음과 같이 수정한다.
val mutex = Mutex()
fun main() = runBlocking {
println("let's go")
concurrencyTest { //mutex
mutex.withLock {
counter++
}
}
println("Counter = $counter")
}
상호 배제(Mutual exclusion)를 이용한 Block으로 race condition을 해결할 수 있다. 코루틴에서 제공하는 Mutex로 쉽게 임계구역을 생성할 수 있다.
2. Actor
Actor는 코루틴의 복합체로 구성되어 있으며, Actor 클래스 내부의 channel을 통해 다른 coroutines들과 통신한다.
또한 동기화 이슈가 있는 자원을 Actor 내에서 관리하게 한다. 따라서 Actor는 어떤 context에서 자신을 수행하는지와 상관없이 동기화 문제에서 자유로울 수 있다.
아래 코드의 IncCounter는 counter값을 증가시키고, getCounter는 counter값을 반환한다.
completableDeffered는 한 개의 값을 반환하면서 연산이 완료되어 complete를 호출할 때까지 코드가 block된다. 100,000의 counter++연산이 끝난 후에 getCounter로 send를 하여 값이 반환된다.
sealed class CounterMsg
object IncCounter : CounterMsg()
class GetCounter(val response: CompletableDeferred<Int>) : CounterMsg()
@ObsoleteCoroutinesApi
fun CoroutineScope.counterActor() = actor<CounterMsg> {
var counter = 0 // actor state
for (msg in channel) {
when (msg) {
is IncCounter -> counter++
is GetCounter -> msg.response.complete(counter)
}
}
}
@ObsoleteCoroutinesApi
fun main() = runBlocking {
println("let's go")
val counterActor = counterActor() //Actor
concurrencyTest {
counterActor.send(IncCounter)
}
val response = CompletableDeferred<Int>()
counterActor.send(GetCounter(response))
val cnt = response.await()
counterActor.close() // shutdown the actor
println("Counter = $cnt")
}
sealed는 Java에 없던 키워드인데, abstract class이면서 enum의 확장 버전이다. 여기서 sealed클래스인 counterMsg는 IncCounter, GetCounter의 부모 클래스이다.
3. Synchronized
synchronized 키워드를 통해 같은 결과를 얻을 수 있었다.
fun main() = runBlocking {
println("let's go")
concurrencyTest { //synchronized
synchronized(this) {
counter++
}
}
println("Counter = $counter")
}
4. 쓰레드 가두기(SingleThreadContext)
아래와 같이 newSingleThreadContext 객체를 생성하고 withContext()를 이용하여 코루틴이 실행되는 coroutineContext를 하나로 통일시키는 방법이다.
@ObsoleteCoroutinesApi
val counterContext = newSingleThreadContext("counterContext")
@ObsoleteCoroutinesApi
fun main() = runBlocking {
println("let's go")
concurrencyTest { //newSingleThreadContext
withContext(counterContext){
counter++
}
}
println("Counter = $counter")
}
newSingleThreadContext()의 매개변수는 어떤 context인지 명시하는 것인데, 쓰레드에 이름을 붙인다고 생각하면 될 것 같다.
추가 실험(속도 비교)
동기화 제어를 했을 때 Synchronized > Mutex > Actor > SingleThreadContext 순으로 속도가 빨랐다.
근데 Synchronized가 동시성 제어를 안한것보다 빠르게 나오는건 뭐지..싶어서
n과 k를 각각 10배씩 하여 다시 측정해보았다.
연산량이 많아지니 no control이 가장 빨랐고, 그 다음으로 Synchronized, Mutex, Actor, SingleThreadContext 순서로 오래걸렸다.
SingleThreadContext는 10초나 걸렸다.
도움된 사이트
https://kotlinlang.org/docs/shared-mutable-state-and-concurrency.html
https://parksb.github.io/article/16.html
https://lwndnjs93.tistory.com/92
https://aaronryu.github.io/2019/05/27/coroutine-and-thread/
https://velog.io/@jshme/kotlin-coroutines-basic