涡轮机是Kotlinx Flow小型测试库。
flowOf( " one " , " two " ).test {
assertEquals( " one " , awaitItem())
assertEquals( " two " , awaitItem())
awaitComplete()
}涡轮机是一种旋转机械装置,可从流体流中提取能量并将其转换为有用的工作。
- 维基百科
repositories {
mavenCentral()
}
dependencies {
testImplementation( " app.cash.turbine:turbine:1.2.0 " )
}repositories {
maven {
url = uri( " https://oss.sonatype.org/content/repositories/snapshots/ " )
}
}
dependencies {
testImplementation( " app.cash.turbine:turbine:1.3.0-SNAPSHOT " )
}虽然涡轮机自己的API稳定,但目前我们被迫依靠Kotlinx的不稳定API。Coroutinestest Artifact: UnconfinedTestDispatcher 。如果没有这种涡轮机对runTest的使用,将会破裂。未来的Coroutine库更新可能会更改此库的行为。我们将尽一切努力确保行为稳定性,直到该API依赖性稳定为止(跟踪问题#132)。
Turbine是一条薄的包装器,其Channel上的API设计用于测试。
您可以致电awaitItem()暂停并等待将物品发送到Turbine :
assertEquals( " one " , turbine.awaitItem()) ... awaitComplete()暂停,直到Turbine完成没有例外:
turbine.awaitComplete() ...或awaitError()暂停,直到Turbine以Throwable为止。
assertEquals( " broken! " , turbine.awaitError().message)如果await* ,什么也没发生, Turbine将超时和失败而不是悬挂。
使用Turbine完成后,可以通过调用cancel()来清理以终止任何备用旋风。最后,您可以断言所有事件都是通过致电ensureAllEventsConsumed()来消耗的。
创建和运行Turbine最简单方法是从Flow中产生一种。要测试单个Flow ,请调用test扩展名:
someFlow.test {
// Validation code here!
} test启动了一个新的Coroutine,称为someFlow.collect ,并将结果馈送到Turbine中。然后,它调用验证块,以接收者的身份传递仅读取的ReceiveTurbine接口:
flowOf( " one " ).test {
assertEquals( " one " , awaitItem())
awaitComplete()
}验证块完成后, test取消coroutine并调用ensureAllEventsConsumed() 。
要测试多个流量,请通过调用testIn来分配每个Turbine val
runTest {
turbineScope {
val turbine1 = flowOf( 1 ).testIn(backgroundScope)
val turbine2 = flowOf( 2 ).testIn(backgroundScope)
assertEquals( 1 , turbine1.awaitItem())
assertEquals( 2 , turbine2.awaitItem())
turbine1.awaitComplete()
turbine2.awaitComplete()
}
}像test一样, testIn会产生ReceiveTurbine 。呼叫Coroutine完成后,将调用ensureAllEventsConsumed() 。
testIn无法自动清理其Coroutine,因此可以确保运行流终止。使用runTest的backgroundScope ,它将自动处理。否则,请确保在范围结束之前调用以下方法之一:
cancel()awaitComplete()awaitError()否则,您的测试将悬挂。
在基于流Turbine的验证块结束之前,未能消耗所有事件,这将使您的测试失败:
flowOf( " one " , " two " ).test {
assertEquals( " one " , awaitItem())
} Exception in thread "main" AssertionError:
Unconsumed events found:
- Item(two)
- Complete
testIn也是如此,但在通话的结尾处:
runTest {
turbineScope {
val turbine = flowOf( " one " , " two " ).testIn(backgroundScope)
turbine.assertEquals( " one " , awaitItem())
}
} Exception in thread "main" AssertionError:
Unconsumed events found:
- Item(two)
- Complete
但是,可以明确忽略收到的事件。
flowOf( " one " , " two " ).test {
assertEquals( " one " , awaitItem())
cancelAndIgnoreRemainingEvents()
}此外,我们可以收到最新的发射项目并忽略以前的项目。
flowOf( " one " , " two " , " three " )
.map {
delay( 100 )
it
}
.test {
// 0 - 100ms -> no emission yet
// 100ms - 200ms -> "one" is emitted
// 200ms - 300ms -> "two" is emitted
// 300ms - 400ms -> "three" is emitted
delay( 250 )
assertEquals( " two " , expectMostRecentItem())
cancelAndIgnoreRemainingEvents()
}流程终止事件(例外和完成)被视为必须消耗的事件才能验证。因此,例如,在您的flow中抛出一个RuntimeException会不会在测试中引发例外。它将产生涡轮错误事件:
flow { throw RuntimeException ( " broken! " ) }.test {
assertEquals( " broken! " , awaitError().message)
}如果不消耗错误将导致与上述相同的未消耗事件异常,但是除例外是原因,以便可以使用完整的stacktrace。
flow< Nothing > { throw RuntimeException ( " broken! " ) }.test { } app.cash.turbine.TurbineAssertionError: Unconsumed events found:
- Error(RuntimeException)
at app//app.cash.turbine.ChannelTurbine.ensureAllEventsConsumed(Turbine.kt:215)
... 80 more
Caused by: java.lang.RuntimeException: broken!
at example.MainKt$main$1.invokeSuspend(FlowTest.kt:652)
... 105 more
除了从流量创建的应ReceiveTurbine器外,独立的Turbine机还可用于与流程外的测试代码进行通信。到处使用它们,您可能再也不需要runCurrent()了。这是如何在假货中使用Turbine()的示例:
class FakeNavigator : Navigator {
val goTos = Turbine < Screen >()
override fun goTo ( screen : Screen ) {
goTos.add(screen)
}
}runTest {
val navigator = FakeNavigator ()
val events : Flow < UiEvent > =
MutableSharedFlow < UiEvent >(extraBufferCapacity = 50 )
val models : Flow < UiModel > =
makePresenter(navigator).present(events)
models.test {
assertEquals( UiModel (title = " Hi there " ), awaitItem())
events.emit( UiEvent . Close )
assertEquals( Screens . Back , navigator.goTos.awaitItem())
}
}为了支持COROUTINES和非核心代码的混合代码库,独立的Turbine包括非悬浮的兼容兼式API。所有await方法都有相当于无悬的方法take方法:
val navigator = FakeNavigator ()
val events : PublishRelay < UiEvent > = PublishRelay .create()
val models : Observable < UiModel > =
makePresenter(navigator).present(events)
val testObserver = models.test()
testObserver.assertValue( UiModel (title = " Hi there " ))
events.accept( UiEvent . Close )
assertEquals( Screens . Back , navigator.goTos.takeItem())使用takeItem()和朋友, Turbine行为就像简单的队列;使用awaitItem()和朋友,它是Turbine 。
这些方法只能在非悬而未决的上下文中使用。在JVM平台上,它们将在悬浮上下文中使用。
默认情况下,流是异步的。您的流量是通过涡轮在测试代码旁边同时收集的。
处理这种异步性与涡轮机的作用与生产Coroutines代码相同:而不是使用runCurrent()之类的工具来“按”沿着异步流的“推动”,而是使用涡轮机( Turbine )的awaitItem() ,等待complete(),而awaitComplete()和awaitError() ”将它们拉开,然后停车,直到准备好了。
channelFlow {
withContext( IO ) {
Thread .sleep( 100 )
send( " item " )
}
}.test {
assertEquals( " item " , awaitItem())
awaitComplete()
}您的验证代码可能与测试的流量同时运行,但涡轮机将其尽可能地放在驾驶员座椅上: test将在执行验证块时结束,隐含地取消测试的流量。
channelFlow {
withContext( IO ) {
repeat( 10 ) {
Thread .sleep( 200 )
send( " item $it " )
}
}
}.test {
assertEquals( " item 0 " , awaitItem())
assertEquals( " item 1 " , awaitItem())
assertEquals( " item 2 " , awaitItem())
}在任何时候也可以明确取消流量。
channelFlow {
withContext( IO ) {
repeat( 10 ) {
Thread .sleep( 200 )
send( " item $it " )
}
}
}.test {
Thread .sleep( 700 )
cancel()
assertEquals( " item 0 " , awaitItem())
assertEquals( " item 1 " , awaitItem())
assertEquals( " item 2 " , awaitItem())
}可以命名涡轮机以改善错误反馈。将name传递到test , testIn或Turbine() ,并将其包含在被丢弃的任何错误中:
runTest {
turbineScope {
val turbine1 = flowOf( 1 ).testIn(backgroundScope, name = " turbine 1 " )
val turbine2 = flowOf( 2 ).testIn(backgroundScope, name = " turbine 2 " )
turbine1.awaitComplete()
turbine2.awaitComplete()
}
} Expected complete for turbine 1 but found Item(1)
app.cash.turbine.TurbineAssertionError: Expected complete for turbine 1 but found Item(1)
at app//app.cash.turbine.ChannelKt.unexpectedEvent(channel.kt:258)
at app//app.cash.turbine.ChannelKt.awaitComplete(channel.kt:226)
at app//app.cash.turbine.ChannelKt$awaitComplete$1.invokeSuspend(channel.kt)
at app//kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
...
共享的流对执行顺序很敏感。在调用collect之前致电emit将删除发射值:
val mutableSharedFlow = MutableSharedFlow < Int >(replay = 0 )
mutableSharedFlow.emit( 1 )
mutableSharedFlow.test {
assertEquals(awaitItem(), 1 )
} No value produced in 1s
java.lang.AssertionError: No value produced in 1s
at app.cash.turbine.ChannelKt.awaitEvent(channel.kt:90)
at app.cash.turbine.ChannelKt$awaitEvent$1.invokeSuspend(channel.kt)
(Coroutine boundary)
at kotlinx.coroutines.test.TestBuildersKt__TestBuildersKt$runTestCoroutine$2.invokeSuspend(TestBuilders.kt:212)
涡轮机的test和testIn方法可以确保在进行前测试的流量将升至第一个悬架点。因此,在排放之前对共享流进行调用test不会下降:
val mutableSharedFlow = MutableSharedFlow < Int >(replay = 0 )
mutableSharedFlow.test {
mutableSharedFlow.emit( 1 )
assertEquals(awaitItem(), 1 )
}如果您的代码在共享流程中收集,请确保它会迅速获得可爱的体验。
Kotlin当前提供的共享流类型类型是:
MutableStateFlowStateFlowMutableSharedFlowSharedFlow每当等待事件时,涡轮机都会应用超时。这是一个壁时钟时间超时,忽略了runTest的虚拟时钟时间。
默认超时长度为三秒钟。可以通过将超时持续时间进行test来覆盖:
flowOf( " one " , " two " ).test(timeout = 10 .milliseconds) {
.. .
}此超时将用于验证块内的所有与涡轮机相关的调用。
您还可以覆盖用testIn和Turbine()创建的涡轮机的超时:
val standalone = Turbine < String >(timeout = 10 .milliseconds)
val flow = flowOf( " one " ).testIn(
scope = backgroundScope,
timeout = 10 .milliseconds,
)这些超时仅适用于施用它们的Turbine 。
最后,您还可以使用withTurbineTimeout out更改整个代码的超时:
withTurbineTimeout( 10 .milliseconds) {
.. .
}涡轮机的大部分API都作为Channel上的扩展实现。 Turbine的API表面更有限,通常是可取的,但是如果您需要,这些扩展也可以作为公共API提供。
Copyright 2018 Square, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.