Погрузитесь в мир потоков Kotlin с помощью этого подробного сравнения SharedFlow и StateFlow. Вот обзор обоих типов потоков и вариантов их использования:
SharedFlow
и StateFlow
являются частями библиотеки Kotlin kotlinx.coroutines
, специально разработанной для обработки асинхронных потоков данных. Оба построены поверх Flow
и предназначены для разных целей.
- Общий поток:
SharedFlow
— это горячий поток, который может иметь несколько коллекторов. Он может выдавать значения независимо от сборщиков, и несколько сборщиков могут собирать одни и те же значения из потока.- Это полезно, когда вам нужно передать значение нескольким сборщикам или когда вы хотите иметь несколько подписчиков на один и тот же поток данных.
- У него нет начального значения, и вы можете настроить его кэш воспроизведения для хранения определенного количества ранее выпущенных значений для новых сборщиков.
Пример использования:
val sharedFlow = MutableSharedFlow<Int>() // Collect values from sharedFlow launch { sharedFlow.collect { value -> println("Collector 1 received: $value") } } // Collect values from sharedFlow launch { sharedFlow.collect { value -> println("Collector 2 received: $value") } } // Emit values to sharedFlow launch { repeat(3) { i -> sharedFlow.emit(i) } }
- СтатусФлоу:
StateFlow
— это горячий поток, представляющий состояние, в каждый момент времени удерживающий одно значение. Это также объединенный поток, означающий, что при передаче нового значения самое последнее значение сохраняется и немедленно передается новым сборщикам.- Это полезно, когда вам нужно поддерживать единый источник достоверности для состояния и автоматически обновлять все сборщики последним состоянием.
- Он всегда имеет начальное значение и хранит только последнее сгенерированное значение.
Пример использования:
val mutableStateFlow = MutableStateFlow(0) val stateFlow: StateFlow<Int> = mutableStateFlow // Collect values from stateFlow launch { stateFlow.collect { value -> println("Collector 1 received: $value") } } // Collect values from stateFlow launch { stateFlow.collect { value -> println("Collector 2 received: $value") } } // Update the state launch { repeat(3) { i -> mutableStateFlow.value = i } }
Рекомендации
Вот несколько рекомендаций по использованию SharedFlow
и StateFlow
в Kotlin:
- Выберите правильный поток:
- Используйте
SharedFlow
, когда вам нужно передать значения нескольким коллекторам или когда вы хотите иметь несколько подписчиков на один и тот же поток данных. - Используйте
StateFlow
, когда вам нужно поддерживать и совместно использовать единый источник достоверной информации для состояния и автоматически обновлять все сборщики последним состоянием.
2. Инкапсулировать изменяемые потоки:
Предоставьте доступную только для чтения версию вашего изменяемого потока, чтобы предотвратить внешние изменения. Этого можно добиться, используя интерфейс SharedFlow
для MutableSharedFlow
и интерфейс StateFlow
для MutableStateFlow
.
class ExampleViewModel { private val _mutableSharedFlow = MutableSharedFlow<Int>() // Represents this mutable shared flow as a read-only shared flow. val sharedFlow = _mutableSharedFlow.asSharedFlow() private val _mutableStateFlow = MutableStateFlow(0) // Represents this mutable state flow as a read-only state flow. val stateFlow = _mutableStateFlow.asStateFlow() }
3. Правильно управляйте ресурсами:
При использовании SharedFlow
или StateFlow
убедитесь, что вы правильно управляете ресурсами, отменяя сопрограммы или сборщики, когда они больше не нужны.
val scope = CoroutineScope(Dispatchers.Main) val sharedFlow = MutableSharedFlow<Int>() val job = scope.launch { sharedFlow.collect { value -> println("Received: $value") } } // Later, when the collector is no longer needed job.cancel()
4. Разумно используйте конфигурации буфера и воспроизведения:
- Для
SharedFlow
вы можете установить емкость буфера и емкость воспроизведения. Выберите подходящую емкость буфера, чтобы избежать проблем с обратным давлением, и установите емкость воспроизведения в соответствии с требованиями вашего варианта использования. - Для
StateFlow
имейте в виду, что у него всегда есть кеш воспроизведения размером 1, что означает, что он сохраняет самое последнее значение для новых сборщиков.
val sharedFlow = MutableSharedFlow<Int>( replay = 2, // Replay the last 2 emitted values to new collectors extraBufferCapacity = 8 // Extra buffer capacity to avoid backpressure )
5. Используйте combine
, map
, filter
и другие операторы:
Воспользуйтесь преимуществами операторов потока Kotlin для преобразования, объединения или фильтрации данных по мере необходимости. Это помогает создавать более выразительный и эффективный код.
val flow1 = MutableStateFlow(1) val flow2 = MutableStateFlow(2) val combinedFlow = flow1.combine(flow2) { value1, value2 -> value1 + value2 } // Collect and print the sum of flow1 and flow2 launch { combinedFlow.collect { sum -> println("Sum: $sum") } }
6. Правильно обрабатывайте ошибки:
При использовании потоков убедитесь, что вы правильно обрабатываете исключения. Используйте оператор catch
для обработки исключений в конвейере потока и оператор onCompletion
для выполнения операций очистки или реагирования на завершение потока.
val flow = flow { emit(1) throw RuntimeException("Error occurred") emit(2) }.catch { e -> // Handle the exception and emit a default value emit(-1) } launch { flow.collect { value -> println("Received: $value") } }
7. Используйте lifecycleScope
, repeatOnLifecycle
и другие операторы, учитывающие жизненный цикл:
При работе с Android или другими платформами с жизненным циклом используйте операторов, поддерживающих жизненный цикл, для автоматического управления жизненным циклом потока.
class MyFragment : Fragment() { private val viewModel: MyViewModel by viewModels() override fun onViewCreated(view: View, savedInstanceState: Bundle?) { super.onViewCreated(view, savedInstanceState) viewLifecycleOwner.lifecycleScope.launch { // Suspend the coroutine until the lifecycle is DESTROYED. // repeatOnLifecycle launches the block in a new coroutine every time the // lifecycle is in the STARTED state (or above) and cancels it when it's STOPPED. repeatOnLifecycle(Lifecycle.State.STARTED) { // Safely collect from locations when the lifecycle is STARTED // and stop collecting when the lifecycle is STOPPED viewModel.stateFlow.collect { value -> // Update UI with the value } } // Note: at this point, the lifecycle is DESTROYED! } } }
Реальные примеры
Сценарий использования StateFlow: оперативные данные
Предположим, у вас есть приложение, которое отображает оперативные данные, такие как курсы акций, информацию о погоде или сообщения чата. StateFlow можно использовать для сохранения последних данных и автоматического обновления пользовательского интерфейса.
// StockViewModel.kt class StockViewModel { // Create a private MutableStateFlow property to hold the stock price private val _stockPrice = MutableStateFlow(0.0) // Create a public StateFlow property that exposes the stock price as an immutable value val stockPrice = _stockPrice.asStateFlow() init { // Launch a coroutine in the viewModelScope to update the stock price viewModelScope.launch { updateStockPrice() } } private suspend fun updateStockPrice() { while (true) { delay(1000) // Update every second val newPrice = fetchNewPrice() // Update the stock price using the MutableStateFlow's value property _stockPrice.value = newPrice } } // Private function to fetch the new stock price from an API or data source private suspend fun fetchNewPrice(): Double { // TODO: Fetch the new stock price from an API or data source return 0.0 } } // MainActivity.kt class MainActivity : AppCompatActivity() { private val stockViewModel = StockViewModel() override fun onCreate(savedInstanceState: Bundle?) { super.onCreate(savedInstanceState) // Launch a coroutine in the lifecycleScope to observe changes to the stock price lifecycleScope.launch { // Use the repeatOnLifecycle function to ensure the coroutine // is active only when the activity is started repeatOnLifecycle(Lifecycle.State.STARTED) { // Observe changes to the stock price using the collect operator stockViewModel.stockPrice.collect { price -> // Update the UI with the new stock price stockPriceTextView.text = "Stock Price: $price" } } } } }
Сценарий использования SharedFlow 1: приложение для обмена сообщениями в чате
Предположим, мы хотим создать приложение для чата в реальном времени, используя SharedFlow и лучшие практики. У нас будет один ChatRepository
, который имитирует получение сообщений чата, один ChatViewModel
, который обрабатывает поток сообщений, и действие Android для отображения сообщений.
- ChatRepository.kt: имитация отправки и получения сообщений для нескольких пользователей.
class ChatRepository { private val coroutineScope = CoroutineScope(Dispatchers.IO + SupervisorJob()) // Create a private MutableSharedFlow property to hold incoming chat messages private val _incomingMessages = MutableSharedFlow<ChatMessage>(extraBufferCapacity = 64) // Create a public SharedFlow property that exposes incoming chat messages as an immutable value val incomingMessages: _incomingMessages.asSharedFlow() init { // Call the simulateIncomingMessages function to simulate incoming chat messages simulateIncomingMessages() } /** * Sends a outgoing chat message to the server, and emit it to the flow * to be displayed on the UI */ fun sendMessage(username: String, content: String) { // Launch a coroutine in the IO scope to emit a chat message to the _incomingMessages flow coroutineScope.launch { _incomingMessages.emit(ChatMessage(username, content)) } } private fun simulateIncomingMessages() { coroutineScope.launch { while (true) { // Wait for a random amount of time between 500 // and 2000 milliseconds to simulate random message delays // Simulate random message delays delay(Random.nextLong(500, 2000)) // Create a new chat message with a random sender and content val message = ChatMessage("User ${Random.nextInt(1, 6)}", "Hello, world!") // Emit the chat message to the _incomingMessages flow _incomingMessages.emit(message) } } } /** * Cancels all coroutines launched by the [ChatRepository] instance. */ fun cancel() { coroutineScope.cancel() } } data class ChatMessage(val sender: String, val content: String)
2. ChatViewModel.kt: создайте ChatViewModel, которая позволяет отправлять сообщения и предоставляет доступ к входящим сообщениям.
class ChatViewModel(private val chatRepository: ChatRepository) : ViewModel() { /** * A [SharedFlow] of incoming chat messages. * * The flow emits incoming chat messages as they arrive, * and can be observed by clients to display the chat history. */ val incomingMessages: SharedFlow<ChatMessage> = chatRepository.incomingMessages fun sendMessage(username: String, content: String) { chatRepository.sendMessage(username, content) } }
3. ChatActivity.kt
class ChatActivity : AppCompatActivity() { private val viewModel: ChatViewModel by viewModels() private val chatAdapter = ChatAdapter() override fun onCreate(savedInstanceState: Bundle?) { super.onCreate(savedInstanceState) // Setup UI components and adapter initialization setContentView(R.layout.activity_chat) val recyclerView = findViewById<RecyclerView>(R.id.recyclerView) recyclerView.layoutManager = LinearLayoutManager(this) recyclerView.adapter = chatAdapter // Observer incoming messages as they arrive and display it on the list lifecycleScope.launch { viewModel.incomingMessages.collect { message -> conversationAdapter.addMessage(message) binding.recyclerView.scrollToPosition(conversationAdapter.itemCount - 1) } } } } // RecyclerView to display incoming messages on the list class ChatAdapter : RecyclerView.Adapter<ChatAdapter.ViewHolder>() { private val messages = mutableListOf<ChatMessage>() fun addMessage(message: ChatMessage) { messages.add(message) notifyItemInserted(messages.size - 1) } override fun onCreateViewHolder(parent: ViewGroup, viewType: Int): ViewHolder { val view = LayoutInflater.from(parent.context).inflate(R.layout.item_chat_message, parent, false) return ViewHolder(view) } override fun onBindViewHolder(holder: ViewHolder, position: Int) { holder.bind(messages[position]) } override fun getItemCount() = messages.size class ViewHolder(itemView: View) : RecyclerView.ViewHolder(itemView) { private val senderTextView: TextView = itemView.findViewById(R.id.senderTextView) private val contentTextView: TextView = itemView.findViewById(R.id.contentTextView) fun bind(chatMessage: ChatMessage) { senderTextView.text = chatMessage.sender contentTextView.text = chatMessage.content } } }
Проверьте ChatRepository.kt в действии:
Сценарий использования SharedFlow 2: шина событий
Вот более сложный пример использования общего потока. Предположим, мы хотим создать шину событий, которая транслирует события нескольким слушателям с помощью SharedFlow.
- EventBus.kt: определите общий класс EventBus с общим потоком.
/** * An event bus implementation that uses a shared flow to * broadcast events to multiple listeners. */ class EventBus<T> { // Create a private MutableSharedFlow property to hold events private val _events = MutableSharedFlow<T>(replay = 0, extraBufferCapacity = 64) // Create a public SharedFlow property that exposes events as an immutable value val events: Flow<T> = _events.asSharedFlow() /** * Sends an event to the shared flow. */ suspend fun sendEvent(event: T) { _events.emit(event) } }
2. Event.kt: определение различных типов событий.
sealed class Event { object EventA : Event() object EventB : Event() data class EventC(val value: Int) : Event() }
3. EventListener.kt: создайте класс EventListener, отвечающий за подписку на определенные типы событий.
class EventListener( private val eventBus: EventBus<Event>, private val scope: CoroutineScope ) { init { // Subscribe to the events flow using the onEach operator eventBus.events .onEach { event -> // Use a when expression to handle different types of events when (event) { is Event.EventA -> handleEventA() is Event.EventB -> handleEventB() is Event.EventC -> handleEventC(event.value) } } // Launch the event listener in the given coroutine scope // It can cancel the subscription when scope is not present any more .launchIn(scope) } // Private functions to handle specific types of events private fun handleEventA() { println("EventA received") } private fun handleEventB() { println("EventB received") } private fun handleEventC(value: Int) { println("EventC received with value: $value") } }
4. Main.kt: создайте экземпляры EventBus и EventListener, затем отправьте события с помощью EventBus.
fun main() = runBlocking { val eventBus = EventBus<Event>() // Instantiate EventListener to start listening for events from the eventBus val eventListener = EventListener(eventBus, this) // Send events using the eventBus in a separate coroutine launch(Dispatchers.Default) { delay(1000) eventBus.sendEvent(Event.EventA) delay(1000) eventBus.sendEvent(Event.EventB) delay(1000) eventBus.sendEvent(Event.EventC(42)) } // Keep the main coroutine scope active to let the listener process the events delay(5000) }
Обратите внимание, что переменная eventListener
по-прежнему явно не используется в функции main
. Однако его создание запускает блок init
внутри класса EventListener
, который подписывается на события из класса eventBus
.
Переменная eventListener
создается для того, чтобы экземпляр EventListener
хранился в памяти, а не собирался мусор, что помешало бы ему получать события. Создав переменную eventListener
и назначив ей экземпляр EventListener
, слушатель останется активным, а события, отправленные eventBus
, обрабатываются должным образом.
В этом примере демонстрируется более расширенное использование SharedFlow, где создается универсальный класс EventBus для трансляции событий нескольким прослушивателям. Класс EventListener прослушивает определенные типы событий и обрабатывает их соответствующим образом. Код следует рекомендациям, таким как инкапсуляция изменяемых потоков, управление ресурсами и эффективная обработка различных типов событий.
В этом посте я постарался осветить все, что вам нужно о SharedFlow
и StateFlow
. Я надеюсь, что эта статья поможет вам найти лучший способ использовать их в своем приложении.
Посетите репозиторий чата, чтобы узнать, как использовать SharedFlow и StateFlow на практике, и, пожалуйста, дайте мне знать ваше мнение об этой статье.