Sử dụng Akka Persistence trong Event Sourcing

Introduction

Trong bài viết trước đây của mình “Tổng quan về kiến trúc CQRS”, mình có nhắc tới khái niệm là Event Sourcing. Về ý tưởng cơ bản, Event Sourcing là phương pháp để lưu trữ các state (trạng thái) của hệ thống hoặc đối tượng. Khi lập trình Scala, mọi người có thể không xa lạ gì với bộ thư viện Akka. Lần này, mình xin giới thiệu các tính năng cơ bản của Akka persistence (Persistent Actor) để áp dụng trong Event Sourcing. Akka persistence cho phép các Actor thực hiện lưu trữ trạng thái nội tại để nó có thể có khả năng khôi phục khi một Actor start hay restart sau khi JVM crash hoặc có yêu cầu thực thi từ supervisor, hay migrate dữ liệu của 1 cluster.
Nguyên lý chính của Akka persistence là duy nhất các thay đổi state của Actor là tồn tại mà không thực hiện thay đổi trực tiếp đến state của actor (ngoại trừ lúc snapshot). Tất cả các thay đổi về state sẽ được lưu trữ ở storage, dữ liệu này là bất biến (immutable). Do đó, state của các actor có thể được khôi phục bằng cách thực hiện lại (replaying) các thay đổi được lưu trong store từ trạng thái ban đầu.

Setup

Dependencies

"com.typesafe.akka" %% "akka-actor" % "2.5.3",
"com.typesafe.akka" %% "akka-persistence" % "2.5.3"

Tính năng persistence của Akka đi kèm với một vài persistence plugins. Trong ví dụ này, chúng ta sử dụng LevelDB (key-value storage library được viết bởi Google: https://github.com/google/leveldb)

"org.iq80.leveldb"            % "leveldb"          % "0.7",
"org.fusesource.leveldbjni"   % "leveldbjni-all"   % "1.8"

Configuration

akka.persistence.journal.plugin = "akka.persistence.journal.leveldb"
akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local"
akka.persistence.journal.leveldb.dir = "target/example/journal"
akka.persistence.snapshot-store.local.dir = "target/example/snapshots"
akka.persistence.journal.leveldb.native = false

PersistentActor
Để tạo một Persistent Actor chúng ta cần tạo ra một Actor mà extends từ trait PersistentActor của Akka
Ví dụ:

class ExamplePersistentActor extends PersistentActor {
  override def persistenceId = "sample-id-1"

  var state = ExampleState()

  def updateState(event: Evt): Unit =
    state = state.updated(event)

  def numEvents = state.size

  val receiveRecover: Receive = {
    case evt: Evt                                => updateState(evt)
    case SnapshotOffer(_, snapshot: ExampleState) => state = snapshot
  }

  val receiveCommand: Receive = {
    case Cmd(data) =>
      persist(Evt(s"${data}-${numEvents}"))(updateState)
      persist(Evt(s"${data}-${numEvents + 1}")) { event =>
        updateState(event)
        context.system.eventStream.publish(event)
      }
    case "snap"  => saveSnapshot(state)
    case "print" => println(state)
  }
}

Ngoài ra, chúng ta phải cần implement 2 abstract method là receiveRecover và receiveCommand

  • Phương thức receiveCommand sẽ được thực thi khi nhận các Command request. Ví dụ chúng ta tạo ra các command sau:
 
object PersistentActorExample extends App {

  val system = ActorSystem("example")
  val persistentActor = system.actorOf(Props[ExamplePersistentActor], "sample-id-1")

  persistentActor ! Cmd("foo")
  persistentActor ! Cmd("baz")
  persistentActor ! Cmd("bar")
  persistentActor ! Cmd("buzz")
  persistentActor ! "print"

  Thread.sleep(10000)
  system.terminate()
}
//kết quả lần chạy đầu tiên sẽ là
List(foo-0, foo-1, baz-2, baz-3, bar-4, bar-5, buzz-6, buzz-7)
//kết quả lần chạy thứ hai sẽ là
List(foo-0, foo-1, baz-2, baz-3, bar-4, bar-5, buzz-6, buzz-7, foo-8, foo-9, baz-10, baz-11, bar-12, bar-13, buzz-14, buzz-15)
  • Phương thức receiveRecover được define cách mà state được update trong quá trình recovery bởi việc xử lý event Evt và SnapshotOffer.

Persistent Id
Có 1 rule mà phải tuân thủ là khi sử dụng Persistent Actors thì Actor bắt buộc phải có ID, và không được thay đổi. Trong ví dụ trên, mình defined như sau:

 override def persistenceId = "sample-id-1"

Bởi vì persistenceId là duy nhất để lưu trữ các sự kiện của 1 entity trong journal (database table/keyspace). Khi thực hiện tìm kiếm các event được lưu theo persistenceId, điều đó có nghĩa là sẽ có lỗi xảy ra nếu có 2 entity khác nhau mà có cùng 1 persistenceId.

Failed/Recovery
Ở chế độ mặc định, một Persistent Actor tự động được recovery lúc start và restart bằng việc thực hiện thực thi lại các event đã lưu từ thời điểm trạng thái ban đầu (replaying journaled messages). Trong lúc khôi phục trạng thái bằng việc replaying message, nếu có 1 message đến thì sẽ không ảnh hưởng quá trình khôi phục, message này sẽ được lưu lại bởi persistent actor sau khi kết thúc quá trình recovery.

Số lượng các recovery có thể được thực hiện đồng thời nên được giới hạn để không làm quá tải hệ thống. Có thể cấu hình bằng tham số

akka.persistence.max-concurrent-recoveries = 50

Có một lưu ý là nếu bạn hay sử dụng PoisonPill để shutdown một Actor thì không nên áp dụng đối với Persistent Actor, bởi vì việc này có thể gây ảnh hưởng gián đoạn quá trình journaling. Đối với Persistent Actor, Akka khuyên bạn nên sử dụng một commands shutdown riêng biệt, để bạn có thể kiểm soát được nó.

Snapshots
Snapshots có thể làm giảm đáng kể thời gian recovery của Persistent Actors. Persistent Actor có thể lưu snapshots của internal state bằng việc gọi method saveSnapshot. Nếu việc lưu trữ 1 snapshot thành công, persistent actors sẽ nhận được 1 SaveSnapshotSuccess message, ngược lại sẽ là SaveSnapshotFailure

val receiveCommand: Receive = {
  case Cmd(data) =>
    persist(Evt(s"${data}-${numEvents}"))(updateState)
  case "snap"  => saveSnapshot(state)
  case SaveSnapshotSuccess(metadata) =>
    println(s"SaveSnapshotSuccess(metadata) :  metadata=$metadata")
  case SaveSnapshotFailure(metadata, reason) =>
    println(s"SaveSnapshotFailure(metadata, reason) : metadata=$metadata, reason=$reason")
}

Persisting
Phương thức persit thực hiện việc lưu trữ các event bất đồng bộ và event handler sẽ thực hiên việc xử lý event. Ngoài ra phương thức này cũng đảm bảo rằng không có các commands mới sẽ được tiếp nhận bởi Persistent Actor trong lúc gọi method persist và thực thi handler của nó. Trong hàm xử lý handler thường các ứng dụng sẽ update trạng thái của Persistent Actor sử dụng persistent event data, thông báo cho các listerners và phản hồi command cho các sender.

def persist[A](event: A, handler: Procedure[A]): Unit =
  persist(event)(event =< handler(event)

Ngoài ra, Akka cũng cung cấp phương thức persistAsync cho phép cài đặt đối với các Persistent Actors cần high-throughput. Phương thức này sẽ không block các Commands được gửi tới, trong lúc đó quá trình thực thi event handler vẫn tiếp tục hoạt động và sau đó sẽ thực hiện callback đối với các Commands này.
Ví dụ dưới đây so sách cách thực hiện của persist và persistAsync
Có 3 Commands sau:

val future1 = persistentActor ? Cmd("foo")
future1.onSuccess({
  case x: String => println("Response: " + x)
})

val future2 = persistentActor ? Cmd("bazz")
future2.onSuccess({
  case x: String => println("Response: " + x)
})

val future3 = persistentActor ? Cmd("bar")
future3.onSuccess({
  case x: String => println("Response: " + x)
})

Đối với phương thức persit:

val receiveCommand: Receive = {
  case Cmd(data) =>
    println(data)
    persist(Evt(s"${data}-${numEvents + 1}")) { event =>
      Thread.sleep(1000l)
      sender() ! data
    }
}

Cho kết quả là:

Receive request foo
Response: foo
Receive request bazz
Response: bazz
Receive request bar
Response: bar

Nếu chúng ta sử dụng persistAsync thay cho persist thì kết quả là:

Receive request foo
Receive request bazz
Receive request bar
Response: foo
Response: bazz
Response: bar

Where is the code
Akka cung cấp rất nhiều example do đó bạn có thể tìm code demo về Persistence Actor ở đây https://github.com/akka/akka-samples/tree/2.5/akka-sample-persistence-scala

References

Add a Comment

Scroll Up