Погрузитесь в мир потоков Kotlin с помощью этого подробного сравнения SharedFlow и StateFlow. Вот обзор обоих типов потоков и вариантов их использования:

SharedFlow и StateFlow являются частями библиотеки Kotlin kotlinx.coroutines, специально разработанной для обработки асинхронных потоков данных. Оба построены поверх Flow и предназначены для разных целей.

  1. Общий поток:
  • SharedFlow — это горячий поток, который может иметь несколько коллекторов. Он может выдавать значения независимо от сборщиков, и несколько сборщиков могут собирать одни и те же значения из потока.
  • Это полезно, когда вам нужно передать значение нескольким сборщикам или когда вы хотите иметь несколько подписчиков на один и тот же поток данных.
  • У него нет начального значения, и вы можете настроить его кэш воспроизведения для хранения определенного количества ранее выпущенных значений для новых сборщиков.

Пример использования:

val sharedFlow = MutableSharedFlow<Int>()

// Collect values from sharedFlow
launch {
    sharedFlow.collect { value ->
        println("Collector 1 received: $value")
    }
}

// Collect values from sharedFlow
launch {
    sharedFlow.collect { value ->
        println("Collector 2 received: $value")
    }
}

// Emit values to sharedFlow
launch {
    repeat(3) { i ->
        sharedFlow.emit(i)
    }
}
  1. СтатусФлоу:
  • StateFlow — это горячий поток, представляющий состояние, в каждый момент времени удерживающий одно значение. Это также объединенный поток, означающий, что при передаче нового значения самое последнее значение сохраняется и немедленно передается новым сборщикам.
  • Это полезно, когда вам нужно поддерживать единый источник достоверности для состояния и автоматически обновлять все сборщики последним состоянием.
  • Он всегда имеет начальное значение и хранит только последнее сгенерированное значение.

Пример использования:

val mutableStateFlow = MutableStateFlow(0)
val stateFlow: StateFlow<Int> = mutableStateFlow

// Collect values from stateFlow
launch {
    stateFlow.collect { value ->
        println("Collector 1 received: $value")
    }
}

// Collect values from stateFlow
launch {
    stateFlow.collect { value ->
        println("Collector 2 received: $value")
    }
}

// Update the state
launch {
    repeat(3) { i ->
        mutableStateFlow.value = i
    }
}

Рекомендации

Вот несколько рекомендаций по использованию SharedFlow и StateFlow в Kotlin:

  1. Выберите правильный поток:
  • Используйте SharedFlow, когда вам нужно передать значения нескольким коллекторам или когда вы хотите иметь несколько подписчиков на один и тот же поток данных.
  • Используйте StateFlow, когда вам нужно поддерживать и совместно использовать единый источник достоверной информации для состояния и автоматически обновлять все сборщики последним состоянием.

2. Инкапсулировать изменяемые потоки:

Предоставьте доступную только для чтения версию вашего изменяемого потока, чтобы предотвратить внешние изменения. Этого можно добиться, используя интерфейс SharedFlow для MutableSharedFlow и интерфейс StateFlow для MutableStateFlow.

class ExampleViewModel {
    private val _mutableSharedFlow = MutableSharedFlow<Int>()
    // Represents this mutable shared flow as a read-only shared flow.
    val sharedFlow = _mutableSharedFlow.asSharedFlow()

    private val _mutableStateFlow = MutableStateFlow(0)
    // Represents this mutable state flow as a read-only state flow.
    val stateFlow = _mutableStateFlow.asStateFlow()
}

3. Правильно управляйте ресурсами:

При использовании SharedFlow или StateFlow убедитесь, что вы правильно управляете ресурсами, отменяя сопрограммы или сборщики, когда они больше не нужны.

val scope = CoroutineScope(Dispatchers.Main)

val sharedFlow = MutableSharedFlow<Int>()

val job = scope.launch {
    sharedFlow.collect { value ->
        println("Received: $value")
    }
}

// Later, when the collector is no longer needed
job.cancel()

4. Разумно используйте конфигурации буфера и воспроизведения:

  • Для SharedFlow вы можете установить емкость буфера и емкость воспроизведения. Выберите подходящую емкость буфера, чтобы избежать проблем с обратным давлением, и установите емкость воспроизведения в соответствии с требованиями вашего варианта использования.
  • Для StateFlow имейте в виду, что у него всегда есть кеш воспроизведения размером 1, что означает, что он сохраняет самое последнее значение для новых сборщиков.
val sharedFlow = MutableSharedFlow<Int>(
    replay = 2, // Replay the last 2 emitted values to new collectors
    extraBufferCapacity = 8 // Extra buffer capacity to avoid backpressure
)

5. Используйте combine, map, filter и другие операторы:

Воспользуйтесь преимуществами операторов потока Kotlin для преобразования, объединения или фильтрации данных по мере необходимости. Это помогает создавать более выразительный и эффективный код.

val flow1 = MutableStateFlow(1)
val flow2 = MutableStateFlow(2)

val combinedFlow = flow1.combine(flow2) { value1, value2 ->
    value1 + value2
}

// Collect and print the sum of flow1 and flow2
launch {
    combinedFlow.collect { sum ->
        println("Sum: $sum")
    }
}

6. Правильно обрабатывайте ошибки:

При использовании потоков убедитесь, что вы правильно обрабатываете исключения. Используйте оператор catch для обработки исключений в конвейере потока и оператор onCompletion для выполнения операций очистки или реагирования на завершение потока.

val flow = flow {
    emit(1)
    throw RuntimeException("Error occurred")
    emit(2)
}.catch { e ->
    // Handle the exception and emit a default value
    emit(-1)
}

launch {
    flow.collect { value ->
        println("Received: $value")
    }
}

7. Используйте lifecycleScope, repeatOnLifecycle и другие операторы, учитывающие жизненный цикл:

При работе с Android или другими платформами с жизненным циклом используйте операторов, поддерживающих жизненный цикл, для автоматического управления жизненным циклом потока.

class MyFragment : Fragment() {
    private val viewModel: MyViewModel by viewModels()

    override fun onViewCreated(view: View, savedInstanceState: Bundle?) {
        super.onViewCreated(view, savedInstanceState)

        viewLifecycleOwner.lifecycleScope.launch {
            // Suspend the coroutine until the lifecycle is DESTROYED.
            // repeatOnLifecycle launches the block in a new coroutine every time the
            // lifecycle is in the STARTED state (or above) and cancels it when it's STOPPED.
            repeatOnLifecycle(Lifecycle.State.STARTED) {
                // Safely collect from locations when the lifecycle is STARTED
                // and stop collecting when the lifecycle is STOPPED
                viewModel.stateFlow.collect { value ->
                    // Update UI with the value
                }
            }
            // Note: at this point, the lifecycle is DESTROYED!
        }
    }
}

Реальные примеры

Сценарий использования StateFlow: оперативные данные

Предположим, у вас есть приложение, которое отображает оперативные данные, такие как курсы акций, информацию о погоде или сообщения чата. StateFlow можно использовать для сохранения последних данных и автоматического обновления пользовательского интерфейса.

// StockViewModel.kt
class StockViewModel {
    // Create a private MutableStateFlow property to hold the stock price
    private val _stockPrice = MutableStateFlow(0.0)
    // Create a public StateFlow property that exposes the stock price as an immutable value
    val stockPrice = _stockPrice.asStateFlow()

    init {
        // Launch a coroutine in the viewModelScope to update the stock price
        viewModelScope.launch {
            updateStockPrice()
        }
    }

    private suspend fun updateStockPrice() {
        while (true) {
            delay(1000) // Update every second
            val newPrice = fetchNewPrice()
            // Update the stock price using the MutableStateFlow's value property
            _stockPrice.value = newPrice
        }
    }

    // Private function to fetch the new stock price from an API or data source
    private suspend fun fetchNewPrice(): Double {
        // TODO: Fetch the new stock price from an API or data source
        return 0.0
    }
}

// MainActivity.kt
class MainActivity : AppCompatActivity() {
    private val stockViewModel = StockViewModel()

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)

        // Launch a coroutine in the lifecycleScope to observe changes to the stock price
        lifecycleScope.launch {
            // Use the repeatOnLifecycle function to ensure the coroutine 
            // is active only when the activity is started
            repeatOnLifecycle(Lifecycle.State.STARTED) {
                // Observe changes to the stock price using the collect operator
                stockViewModel.stockPrice.collect { price ->
                    // Update the UI with the new stock price
                    stockPriceTextView.text = "Stock Price: $price"
                }
            }
        }
    }
}

Сценарий использования SharedFlow 1: приложение для обмена сообщениями в чате

Предположим, мы хотим создать приложение для чата в реальном времени, используя SharedFlow и лучшие практики. У нас будет один ChatRepository, который имитирует получение сообщений чата, один ChatViewModel, который обрабатывает поток сообщений, и действие Android для отображения сообщений.

  1. ChatRepository.kt: имитация отправки и получения сообщений для нескольких пользователей.
class ChatRepository {
    private val coroutineScope = CoroutineScope(Dispatchers.IO + SupervisorJob())

    // Create a private MutableSharedFlow property to hold incoming chat messages
    private val _incomingMessages = MutableSharedFlow<ChatMessage>(extraBufferCapacity = 64)
    // Create a public SharedFlow property that exposes incoming chat messages as an immutable value
    val incomingMessages: _incomingMessages.asSharedFlow()

    init {
        // Call the simulateIncomingMessages function to simulate incoming chat messages
        simulateIncomingMessages()
    }

    /**
     * Sends a outgoing chat message to the server, and emit it to the flow
     * to be displayed on the UI
     */
    fun sendMessage(username: String, content: String) {
        // Launch a coroutine in the IO scope to emit a chat message to the _incomingMessages flow
        coroutineScope.launch {
            _incomingMessages.emit(ChatMessage(username, content))
        }
    }

    private fun simulateIncomingMessages() {
        coroutineScope.launch {
            while (true) {
                // Wait for a random amount of time between 500 
                // and 2000 milliseconds to simulate random message delays
                // Simulate random message delays
                delay(Random.nextLong(500, 2000)) 
                // Create a new chat message with a random sender and content
                val message = ChatMessage("User ${Random.nextInt(1, 6)}", "Hello, world!")
                // Emit the chat message to the _incomingMessages flow
                _incomingMessages.emit(message)
            }
        }
    }

    /**
     * Cancels all coroutines launched by the [ChatRepository] instance.
     */
    fun cancel() {
        coroutineScope.cancel()
    }
}

data class ChatMessage(val sender: String, val content: String)

2. ChatViewModel.kt: создайте ChatViewModel, которая позволяет отправлять сообщения и предоставляет доступ к входящим сообщениям.

class ChatViewModel(private val chatRepository: ChatRepository) : ViewModel() {

    /**
     * A [SharedFlow] of incoming chat messages.
     *
     * The flow emits incoming chat messages as they arrive, 
     * and can be observed by clients to display the chat history.
     */
    val incomingMessages: SharedFlow<ChatMessage> = chatRepository.incomingMessages

    fun sendMessage(username: String, content: String) {
        chatRepository.sendMessage(username, content)
    }
}

3. ChatActivity.kt

class ChatActivity : AppCompatActivity() {
    private val viewModel: ChatViewModel by viewModels()
    private val chatAdapter = ChatAdapter()

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        // Setup UI components and adapter initialization
        setContentView(R.layout.activity_chat)

        val recyclerView = findViewById<RecyclerView>(R.id.recyclerView)
        recyclerView.layoutManager = LinearLayoutManager(this)
        recyclerView.adapter = chatAdapter

        // Observer incoming messages as they arrive and display it on the list
        lifecycleScope.launch {
            viewModel.incomingMessages.collect { message ->
                conversationAdapter.addMessage(message)
                binding.recyclerView.scrollToPosition(conversationAdapter.itemCount - 1)
            }
        }
    }
}

// RecyclerView to display incoming messages on the list
class ChatAdapter : RecyclerView.Adapter<ChatAdapter.ViewHolder>() {
    private val messages = mutableListOf<ChatMessage>()

    fun addMessage(message: ChatMessage) {
        messages.add(message)
        notifyItemInserted(messages.size - 1)
    }

    override fun onCreateViewHolder(parent: ViewGroup, viewType: Int): ViewHolder {
        val view = LayoutInflater.from(parent.context).inflate(R.layout.item_chat_message, parent, false)
        return ViewHolder(view)
    }

    override fun onBindViewHolder(holder: ViewHolder, position: Int) {
        holder.bind(messages[position])
    }

    override fun getItemCount() = messages.size

    class ViewHolder(itemView: View) : RecyclerView.ViewHolder(itemView) {
        private val senderTextView: TextView = itemView.findViewById(R.id.senderTextView)
        private val contentTextView: TextView = itemView.findViewById(R.id.contentTextView)

        fun bind(chatMessage: ChatMessage) {
            senderTextView.text = chatMessage.sender
            contentTextView.text = chatMessage.content
        }
    }
}

Проверьте ChatRepository.kt в действии:



Сценарий использования SharedFlow 2: шина событий

Вот более сложный пример использования общего потока. Предположим, мы хотим создать шину событий, которая транслирует события нескольким слушателям с помощью SharedFlow.

  1. EventBus.kt: определите общий класс EventBus с общим потоком.
/**
 * An event bus implementation that uses a shared flow to 
 * broadcast events to multiple listeners.
 */
class EventBus<T> {
    // Create a private MutableSharedFlow property to hold events
    private val _events = MutableSharedFlow<T>(replay = 0, extraBufferCapacity = 64)
    // Create a public SharedFlow property that exposes events as an immutable value    
    val events: Flow<T> = _events.asSharedFlow()

    /**
     * Sends an event to the shared flow.
     */
    suspend fun sendEvent(event: T) {
        _events.emit(event)
    }
}

2. Event.kt: определение различных типов событий.

sealed class Event {
    object EventA : Event()
    object EventB : Event()
    data class EventC(val value: Int) : Event()
}

3. EventListener.kt: создайте класс EventListener, отвечающий за подписку на определенные типы событий.

class EventListener(
    private val eventBus: EventBus<Event>,
    private val scope: CoroutineScope
) {
    init {
        // Subscribe to the events flow using the onEach operator
        eventBus.events
            .onEach { event ->
                // Use a when expression to handle different types of events
                when (event) {
                    is Event.EventA -> handleEventA()
                    is Event.EventB -> handleEventB()
                    is Event.EventC -> handleEventC(event.value)
                }
            }
            // Launch the event listener in the given coroutine scope
            // It can cancel the subscription when scope is not present any more
            .launchIn(scope)
    }

    // Private functions to handle specific types of events
    private fun handleEventA() {
        println("EventA received")
    }

    private fun handleEventB() {
        println("EventB received")
    }

    private fun handleEventC(value: Int) {
        println("EventC received with value: $value")
    }
}

4. Main.kt: создайте экземпляры EventBus и EventListener, затем отправьте события с помощью EventBus.

fun main() = runBlocking {
    val eventBus = EventBus<Event>()
    // Instantiate EventListener to start listening for events from the eventBus    
    val eventListener = EventListener(eventBus, this)

    // Send events using the eventBus in a separate coroutine
    launch(Dispatchers.Default) {
        delay(1000)
        eventBus.sendEvent(Event.EventA)

        delay(1000)
        eventBus.sendEvent(Event.EventB)

        delay(1000)
        eventBus.sendEvent(Event.EventC(42))
    }

    // Keep the main coroutine scope active to let the listener process the events
    delay(5000)
}

Обратите внимание, что переменная eventListener по-прежнему явно не используется в функции main. Однако его создание запускает блок init внутри класса EventListener, который подписывается на события из класса eventBus.

Переменная eventListener создается для того, чтобы экземпляр EventListener хранился в памяти, а не собирался мусор, что помешало бы ему получать события. Создав переменную eventListener и назначив ей экземпляр EventListener, слушатель останется активным, а события, отправленные eventBus, обрабатываются должным образом.

В этом примере демонстрируется более расширенное использование SharedFlow, где создается универсальный класс EventBus для трансляции событий нескольким прослушивателям. Класс EventListener прослушивает определенные типы событий и обрабатывает их соответствующим образом. Код следует рекомендациям, таким как инкапсуляция изменяемых потоков, управление ресурсами и эффективная обработка различных типов событий.

В этом посте я постарался осветить все, что вам нужно о SharedFlow и StateFlow. Я надеюсь, что эта статья поможет вам найти лучший способ использовать их в своем приложении.

Посетите репозиторий чата, чтобы узнать, как использовать SharedFlow и StateFlow на практике, и, пожалуйста, дайте мне знать ваше мнение об этой статье.