Bài toán tối ưu performance và memory cùng với Akka (Phần I)
Đây là bài toán thực tế trong project của mình
Có lẽ cũng nhiều bạn cũng đã gặp trường hợp tương tự
Có khi chúng ta thường tặc lưỡi bỏ qua, hay đổ tội cho “con server”,.. but NOT TODAY
Problem
(đã được lược giản cho phù hợp)
1 con batch chạy hàng ngày để crawl dữ liệu rồi insert vào Database
Đây là data lược giản khi crawl
Code hiện tại của batch
accounts foreach(account => { // something cool ... campaigns.foreach(campaign => { // something cool ... println("Insert campaign") keywords.foreach(keyword => { // something cool ... println("Insert keyword") // }) }) })
Điểm qua các vấn để của code hiện tại:
- Blocking code
- Chỉ sử dụng 1 Thread
- Không có cơ chế khi Failure
=> Performance thấp, không tận dụng được tài nguyên của server, độ ổn định không cao
Và quan trọng nhất là số lượng accounts sắp tới sẽ tăng lên gấp nhiều lần…
Solution
Akka — Chiếc Ferrari 812 động cơ V12
Nhưng tại sao….?
- Akka dựa trên Actor model cung cấp cơ chế xử lý trên Multi-thread với bộ sậu Actor, Dispatcher, Routing, Mailboxes,… mà không cần quan tâm đến các vấn đề thường gặp khi xử lý Multi-thread
- A clustered, high-availability architecture
Đây là cách Akka hoạt động:
First design
Let The Code Speak For Itself
package com.lightbend.akka.sample import akka.actor.{Actor, ActorLogging, ActorSystem, Props} import akka.routing.RoundRobinPool object SampleData { case class Account(id: Int, name: String) case class Campaign(id: Int, accountId: Int, name: String) case class Keyword(id: Int, campaignId: Int, name: String) val accounts: List[Account] = List( Account(1, "Maria"), Account(2, "Jon"), Account(3, "Snow") ) val campaigns: List[Campaign] = List( Campaign(1, 1, "Maria-campaign-1"), Campaign(2, 1, "Maria-campaign-2"), Campaign(3, 2, "Jon-campaign-1"), Campaign(4, 3, "Snow-campaign-1"), ) val keywords: List[Keyword] = List( Keyword(1, 1, "A"), Keyword(2, 1, "B"), Keyword(3, 1, "C"), Keyword(4, 1, "D"), Keyword(5, 1, "E"), Keyword(6, 2, "F"), Keyword(7, 2, "G"), Keyword(8, 2, "H"), Keyword(9, 3, "I"), ) } object Test extends App { import com.lightbend.akka.sample.Test.AccountActor._ import SampleData._ object AccountActor { def props = Props[AccountActor] case class ProcessAccount(account: Account) } class AccountActor extends Actor with ActorLogging { import AccountActor._ import CampaignActor._ val campaignActor = context.actorOf(CampaignActor.props, "adGroup") override def receive: Receive = { case ProcessAccount(account) => val campaignIn = for (campaign campaignActor ! ProcessCampaign(campaign)) } } object CampaignActor { def props: Props = Props[CampaignActor] case class ProcessCampaign(campaign: Campaign) } class CampaignActor extends Actor with ActorLogging { import CampaignActor._ import KeywordActor._ // create route pool with 5 val keywordActor = context.actorOf(RoundRobinPool(5).props(KeywordActor.props), "keyword") override def receive: Receive = { case ProcessCampaign(campaign) => log.info(s"Insert campaign ${campaign.name}") val keywordsIn = for (keyword keywordActor ! ProcessKeyword(keyword)) } } object KeywordActor { def props = Props[KeywordActor] case class ProcessKeyword(keyword: Keyword) } class KeywordActor extends Actor with ActorLogging { import KeywordActor._ override def receive: Receive = { case ProcessKeyword(keyword) => log.info(s"Insert keyword ${keyword.name}") } } val system: ActorSystem = ActorSystem("BatchAkka") val accountActor = system.actorOf(AccountActor.props, "account") accounts.foreach(accountActor ! ProcessAccount(_)) }
Kết quả
- Tất cả đều là non-blocking code, xử lý liên tục mà không cần đoạn trên hoàn thành
- Tận dụng tối đa tài nguyên server, Dispatcher sẽ phân các actor vào từng thread, càng thêm CPU càng nhanh
- Dễ dàng điều chỉnh số lượng actor
- Code decoupling, mỗi Actor gói gọn 1 business
Nhược điểm
Tuy nhiên, với design này lộ ra 1 nhược điểm: out of memory
Tưởng tượng cứ như việc tắc đường ở hiện tượng thắt cổ chai:
Lý do bởi vì Actor xử lý message thông của Mailbox của chính nó một cách tuần tự.
Account process xử lý rất nhanh do không có IO process, nó đổ messages liên tục vào Mailbox của Campaign Actor, tượng tự với Campaign actor đổ messages vào mailbox của Keyword actor. Mà không cần quan tâm process sau mình đang xử lý ở tốc độ nào.
Với việc Mailbox tăng nhanh, mà actor ở process đấy có hạn(đang busy xử lý) => memory dùng cho Mailboxs tăng cao một cách không cần thiết.
Cũng như con đường 5 làn oto đổ vào con đường chỉ chịu được 2 làn oto! Cuối con đường thì chỉ có từng ấy oto ra được mà thôi!
Vậy thì sao nhỉ?
Đây là bài toán cân băng về performance và memory giữa Publisher và Subscriber
- bao nhiêu subscriber là đủ?
- Publisher push data với rate cao hơn Subscriber có thể process thì sao?
- Publisher push data với rate thấp hơn Subscriber có thể process thì sao?
Có 2 cách xử lý trong trường hợp này
- Work pulling pattern
- Akka streams
Trong phần tiếp theo mình sẽ phân tích 2 cách làm trên và cách project mình implement như nào!!!