Làm quen với Akka Streams

Nếu bạn đã và đang xài ngôn ngữ Scala thì ắt hẳn bạn sẽ biết đến thư viện Akka. Akka là một thư viện rất mạnh trong việc xử lý tính toán phân tán và tính toán song song.

Hôm nay chúng ta sẽ bàn về Akka Streams – một phần trong thư viện Akka. Akka Streams là một implement của Reactive Streams, được sử dụng để xử lý và chuyển đổi luồng dữ liệu bằng cách sử dụng không gian buffer giới hạn (bounded buffer space). Tính chất này được biết đến là tính bị chặn (boundessnesss) – tính năng định nghĩa của Akka Streams.  Có thể hiểu đơn giản rằng nó diễn giải một chuỗi việc thực thi các thực thể. Mỗi thực thể đều được thực thi độc lập (hoặc song song) với các thực thể khác trong khi được buffer một lượng phần tử (element) giới hạn ở mỗi thời điểm.

Để sử dụng Akka Streams, cần thêm dependency bên dưới vào trong build.sbt:

libraryDependencies += "com.typesafe.akka" %% "akka-stream" % "2.5.25"

Trước khi tìm hiểu về các thành phần cơ bản của Akka Streams, chúng ta sẽ cùng xem qua một vài thuật ngữ cơ bản sẽ được sử dụng.

Stream: luồng di chuyển và chuyển đổi dữ liệu.

Element: đơn vị xử lý của Stream. Tất cả các phép sẽ thực thi việc di chuyển và chuyển đổi các element từ upstream xuống downstream.

Graph: mô tả cấu trúc xử lý stream, định nghĩa các luồng mà các element sẽ đi theo khi stream được chạy.

Akka Streams gồm những thành phần cơ bản sau đây.

Source

Được sử dụng để tạo ra một output duy nhất. Các element được emit (sinh ra) bởi Source.

Có rất nhiều cách để implement Source:

//Tạo một source rỗng
val emptySource = Source.empty

//Source tạo ra các element từ Range
val sourceUsingRange: Source[Int, NotUsed] = Source(1 to 100)

//Source tạo ra các element từ List 
val sourceUsingList: Source[Int, NotUsed] = Source(List(1, 2, 3))

//Source tạo ra các element từ Future 
val sourceFromFuture: Source[Int, NotUsed] = Source.fromFuture(Future(2))

//Source tạo ra một element đơn
val singleSource: Source[String, NotUsed] = Source.single("1")

//Source tạo ra các element repeat
val repeatedSource: Source[Int, NotUsed] = Source.repeat(5)

Trong các ví dụ trên, Source được tham số hoá với 2 tham số. Tham số đầu chỉ định kiểu dữ liệu Source sinh ra. Tham số còn lại  xác định các thông tin phụ mà Source tạo ra khi nó bắt đầu được chạy.

Ví dụ Source[Int, NotUsed] sẽ tạo ra dữ liệu kiểu Int và NotUsed được sử dụng khi không cần cung cấp thêm thông tin về nguồn dữ liệu.

Flow

Flow nhận một đầu vào là input stream và một đầu ra là output stream. Nó lấy dữ liệu từ Source và xử lý, chuyển đổi trên các element rồi trả về các element đã được xử lý cho Sink. Dữ liệu được tạo ra bởi Source có thể được chọn lọc, được chuyển đổi như ví dụ dưới đây.

// Chọn lọc các element lớn hơn 0
val flowWithFilter = Flow[Int].filter(_ > 0)

// Chuyển đổi các element sang kiểu string
val flowWithMap = Flow[Int].map(_.toString)

Sink

Sink có đúng một đầu vào để tiếp nhận xử lý các element từ Flow. Sink cũng được tham số hoá với 2 tham số. Tham số đầu chỉ định kiểu dữ liệu mà Sink sẽ tiếp nhận và tham số còn lại định nghĩa kiểu của thông tin phụ trợ. Sink có nhiều hình thức implement khác nhau như bên dưới.

//Sink thêm tất cả các element của stream bằng cách sử dụng phương thức fold
val sinkAddingElements: Sink[Int, Future[Int]] = Sink.fold[Int, Int](0)(_ + _)
 
// Sink trả về một Future như giá trị cụ thể, chứa element đầu tiên của stream
val sinkReturningFirstElement: Sink[Int, Future[Int]] = Sink.head
 
// Sink tiếp nhận stream mà không làm gì với các element
val sinkIgnoringElements: Sink[Int, Future[Done]] = Sink.ignore
 
// Sink thực thi side-effecting call cho tất cả các element của stream
val sinkPrintingElements: Sink[String, Future[Done]] = Sink.foreach[String](println(_))

Runnable Graph

Một Flow được “đính” vào một Source và Sink tương ứng, và sẵn sàng để chạy. Có thể đính một Flow với một Source cho ra kết quả một Source tổng hợp, cũng có thể thêm một Flow vào một Sink để có được một Sink mới. Một stream được giới hạn chính xác bởi một Source và một Sink, nó sẽ là biểu diễn của kiểu Runnable Graph (như hình dưới).

Runnable Graph cũng có thể được tạo mà không cần Flow, nghĩa là Source và Sink là các thành phần cần thiết được sử dụng để chạy một Stream.

Sau khi Runnable Graph được tạo xong, nó sẵn sàng để chạy.

implicit val system: ActorSystem = ActorSystem("akka-streams")
implicit val materializer: ActorMaterializer = ActorMaterializer()

val sourceProducingElements: Source[Int, NotUsed] = Source(1 to 100)
val flowTransformingData: Flow[Int, Int, NotUsed] = Flow[Int].filter(_ % 2 == 0).map(_ * 2)
val sinkPrintingElements: Sink[Int, Future[Done]] = Sink.foreach[Int](println(_))

val runnableGraph = source.via(flowTransformingData).toMat(sinkPrintingElements)(Keep.right)
runnableGraph.run()

Trong ví dụ trên, Source tạo ra các số Interger từ 1 đến 100. Flow được tạo để lọc các số chẵn và  nhân đôi nó lên. Sau đó sẽ có một Sink (được đặt tên là sinkPrintingElements) thực hiện công việc in các element ra console. Việc kết nối giữa Source, Flow và Sink bằng cách sử dụng RunnableGraph[Int, Future[Done]]. Để thực thi Stream, phương thức run() được gọi thông qua runnableGraph.

Chú ý rằng, stream yêu cầu sử dụng một materializer để thực thi. materializer có nhiệm vụ phân bổ các tài nguyên như là Actor để chạy Stream. ActorMaterializer có thể được cung cấp bởi một biến implicit trong scope hoặc biến rõ ràng khi chạy stream.

Chạy stream sử dụng Runnable Graph là một cách để thực thi stream. Có một cách khác là sử dụng function có sẵn như phương thức runWith() – phương thức nhận Sink như một tham số đầu vào.  Trong ví dụ bên dưới, phương thức runForEach() được sử dụng để gọi phương thức runWith() với implement của nó.

source
.filter(_ % 2 == 0)
.map(_* 2)
.runForeach(println)

 

Nguồn:

https://doc.akka.io/docs/akka/current/stream/stream-flows-and-basics.html

https://blog.knoldus.com/introduction-to-akka-streams-part-1

 

Tags:,

Add a Comment

Scroll Up