Giới thiệu về Future – Ứng dụng của Future trong Scala

Đối với các devoloper java nói riêng thì việc phải xử lý các bài toán phức tạp liên quan đến luồng dữ liệu là một việc khiến nhiều developer phải mất ăn mất ngủ , vì để đảm bảo được performance cho sản phẩm thì việc đồng bộ cực kỳ quan trọng. Trong Scala là một ngôn ngữ thuần hướng đối tượng hơn java nên Scala có thể sử dụng được tất cả các thư viện của Java nên ta hoàn toàn có thể sử dụng các tính năng về lập trình song song của Java (Thread, Runnable…) để phục vụ cho việc xử lý bất đồng bộ khi code Scala. Tuy nhiên, bản thân Scala cũng có những tính năng phục vụ riêng cho việc lập trình bất đối xứng với API ở mức abstract hơn, đồng thời dễ dàng tích hợp với các API mà Scala cung cấp. Một tính năng có sẵn trong standard library của Scala là Future, ngoài ra còn có Async, thư viện cung cấp tính năng bổ trợ cho Future, và Akka, một thư viện rất mạnh phục vụ cho việc lập trình song song. Vì Akka khá phức tạp, xứng đáng có một bài viết riêng về nó, còn Async thì mới chỉ ở dưới dạng SIP (Scala Improvement Process) và chỉ ở dạng optional module, nên nội dùng bài viết này sẽ chỉ tập trung giới thiệu về Future
ud5zi

I. Giới thiệu Future
1. Future là gì ?
Future là gì? Một object Future là một object mà giá trị của nó sẽ được khỏi tại tại một thời điểm nào đó trong tương lại. Giá trị này thường là kết quả của một quá trình tính toán nào đó. Quá trình tính toàn này nếu trả về giá trị kết quả thì ta nói là object Future này đã hoàn thành với giá trị đó, còn ngược lại nếu một Exception được trả về thì object Future được gọi là đã thất bại với Exception kia. Một đặc tính của Future là khi một Future khi đã hoàn thành dù là thành công hay thất bại thì nó sẽ không thể thay đổi được nữa (trở nên immutable), điều này
Khởi tạo một khá đơn giản

val sum : Future[Int] = Future{
    15 * 10
}

Ở đây ta đã đóng gọi một phép nhân đơn giản vào trong một object Future với type Future[Int], dù trong thực tế không ai đóng gói một phép tính đơn giản như vậy mà thường là một quá trình tính toán mất thời gian như IO, network… Để sử dụng Future thì chúng ta cần có một ExecutionContext. Ở đây ExecutionContext.global đã được sử dụng, ExecutionContext này sử dụng java.util.concurrent.ForkJoinPool để quản lý một thread pool thực hiện những công việc tính toán được đóng gói trong Future. Với một số tinh chỉnh, ta có thể sử dụng ExecutionContext.global trong hầu hết các trường hợp. Một số trường hợp đặc biệt (như là long running blocking code trong Future) thì một Execution Context tuỳ biến (wrapper cho Java Executor) có thể được sử dụng.
2.Future in Action
Hãy cùng xem một ví dụ đơn giản về Future.

import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global

def factorial(n: BigInt): BigInt = {
  if (n <= 1) 1
  else n * factorial(n - 1)
}
val futures = (0 to 9) map {
  i => Future {
    println(s"Input: $i")
    val s = factorial(i)
    println(s"Result for $i: $s")
    s.toString()
  }
}
val f = Future.reduce(futures) { (s1, s2) => s1 + ',' + s2 }
val n = Await.result(f, Duration.Inf)
println(n)

Kết quả ra như sau:

Input: 0
Result for 0: 1
Result for 1: 1
Input: 2
Result for 2: 2
Input: 4
Input: 3
Result for 3: 6
Input: 5
Result for 4: 24
Result for 5: 120
Input: 6
Result for 6: 720
Input: 8
Result for 8: 40320
Input: 9
Result for 9: 362880
Input: 7
Result for 7: 5040
1,1,2,6,24,120,720,5040,40320,362880

Ở đây ta đã dùng 10 Future để thực hiện tính factorial cho mười số từ 0 đến 9. Quá trình tính cho mỗi số đều được chạy bất đồng bộ, như đã thấy ở output ở trên. Mỗi lần chạy thứ tự các dòng Input và Result sẽ là khác nhau, chỉ có thứ tự in của kết quả cuối là không đổi do hàm reduce xét lần lượt thứ tự các phần tử của chuỗi futures theo thứ tự khởi tạo. Ngoài ra, ta còn sử dụng Await.result để block tiến trình chính cho đến khi Future f hoàn thành .
Tuy nhiên việc block tiến trình chính trong nhiều trường hợp có thể làm ảnh hưởng đến chương trình. Trong những trường hợp như vậy, ta có thể sử dùng callback
3.Sử dụng Callback với Future
Để có thể xử lý kết quả của Future một cách bất đồng bộ (không làm block tiến trình chính) ta có thể đăng ký callback với Future. Khi Future hoàn thành thì hàm callback này sẽ được gọi một cách bất đồng bộ. Cách đăng ký callback thông dụng nhất là cung cấp một function với type Try[T] => Unit cho method onComplete của Future.
Dưới đây là một ví dụ về calback với onComplete

import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success, Try}

def factorial(n: BigInt): BigInt = {
  if (n <= 1) 1
  else n * factorial(n - 1)
}

val futures = (0 to 9) map {
  i => Future { factorial(i) }
}

val fs = futures map {
  _ flatMap { x =>
    if (x <= 10000) Future.successful(x)
    else Future.failed(new RuntimeException(s"$x > 10000"))
  }
}

def doComplete: Function[Try[BigInt], Unit] = {
  case s @ Success(_) => println(s)
  case f @ Failure(_) => println(f)
}

fs map (_ onComplete doComplete)

Kết quả của đoạn code trên sẽ là như sau (mỗi lần thứ tự các dòng kết quá sẽ khác nhau do tính chất bất đồng bộ của Future)

Success(2)
Success(6)
Success(24)
Success(120)
Success(720)
Success(5040)
Failure(java.lang.RuntimeException: 40320 > 10000)
Failure(java.lang.RuntimeException: 362880 > 10000)
Success(1)

Ở đây ta sử dụng lại đoạn code tính factorial cho 10 số dùng Future, tuy nhiên có thêm phần check kết quả nếu như lớn hơn 10000 thì ta sẽ cho kết quả của Future là thất bại (đóng gói một Exception rồi trả về). Trong đoạn code ở trên, với mỗi Future trong chuỗi fs, chúng ta đều đăng ký một callback cho method onComplete là hàm doComplete . Ngoài ra, ta cũng thấy Future, tương tự như Option, Try, Either… hay các collection như List… là một cấu trúc dữ liệu mang tính chất Monad nên Future cung cấp những hàm như flatMap, filter…; một Future cũng thể được sử dụng với cấu trúc for comprehension của Scala.

Ngoài method onComplete sử dụng để đăng ký callback xử lý cả 2 trường hợp Future thành công lẫn thất bại, Future còn cung cấp 2 method onSuccess và onFailure để đăng ký callback xử lý riêng cho từng trường hợp. Sử dụng onSuccess và onFailure thì đoạn code ở trên sẽ trở thành như sau:

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

def factorial(n: BigInt): BigInt = {
  if (n <= 1) 1
  else n * factorial(n - 1)
}
val futures = (0 to 9) map {
  i => Future { factorial(i) }
}
val fs = futures map {
  _ flatMap { x =>
    if (x <= 10000) Future.successful(x)
    else Future.failed(new RuntimeException(s"$x > 10000"))
  }
}
fs map { f =>
  f.onSuccess { case res => println(res) }
  f.onFailure { case err => println("Error: " + err.getMessage) }
}

Và do onSuccess và onFailure khác với onComplete sẽ lấy trực tiếp các kết quả và exception được đóng gói trong Success(..) và Failure(…) nên ta sẽ có kết quả như sau:

2
6
24
120
720
5040
Error: 40320 > 10000
Error: 362880 > 10000
1

II. Một số ứng dụng của Future trong Scala
Future chạy song song với Main Thread
Tạo một Future về cơ bản là ném xử lý cho một thread khác sẽ chạy song song với Main Thread

import scala.concurrent._
import ExecutionContext.Implicits.global

val f = Future(10)

Tạo một Future độc lập (ở luôn trạng thái đã thực thi xong) không cần Execution Context

import scala.concurrent._

val g = Future.successful(10)
val h = Future.failed{ new Exception("Error in The Future") }

Nhiều Future chạy song song
Future.sequence

import scala.concurrent._
import ExecutionContext.Implicits.global

val futures: List[Future[Int]] = (1 to 10).toList.map(i => Future(i))
val f: Future[List[Int]] = Future.sequence(futures)

Ở đây mình đã biến List[Future] thành Future[List]. Khi thực thi f thì 10 Future bên trong sẽ chạy song song bằng các thread độc lập với Main Thread. Kết quả trả về là List[Int]
Future.traverse

import scala.concurrent._
import ExecutionContext.Implicits.global

val futures: Future[List[Int]] = Future.traverse((1 to 10).toList)(i => Future(i))

Ở đây kết quả sẽ là kiểu Future[List] sau khi gọi Future.traverse
Future.firstCompletedOf

Cho nhiều Future chạy song song và trả về thằng hoàn thành trước nhất

import scala.concurrent._
import ExecutionContext.Implicits.global

val futures: List[Future[Int]] = (1 to 10).toList.map(i => Future(i))
val f: Future[Int] = Future.firstCompletedOf(futures)

Future.find

Cho nhiều Future chạy song song và trả về thằng hoàn thành trước nhất thỏa mãn điều kiện

import scala.concurrent._
import ExecutionContext.Implicits.global

val futures: List[Future[Int]] = (1 to 10).toList.map(i => Future(i))
val f: Future[Option[Int]] = Future.find(futures){ _ % 3 == 1 } // Ket qua chia het cho 3

Vì find có thể không tìm thấy, nên giá trị trả về ở đây sẽ là Option[Int]
Future.reduce và Future.fold

Kết hợp kết quả của nhiều Future chạy song song. Có thể dùng reduce hoặc fold

import scala.concurrent._
import ExecutionContext.Implicits.global

val futures: List[Future[Int]] = (1 to 10).toList.map(i => Future(i))
val f: Future[Int] = Future.reduce(futures){ (r,v) => r+v }
val g: Future[Int] = Future.fold(futures)(-5){ (r,v) => r+v }

Ở đây mình cộng dồn với kết quả trả về từ các Future. Hàm reduce chỉ đơn giản trả về tổng, còn hàm fold sẽ lấy giá trị đầu tiên -5 rồi mới thực hiện cộng dồn.
Kết Luận:
Với API dễ sử dụng, Future là một tính năng rất hữu dụng khi lập trình song song, bất đồng bộ trong Scala. Tuy nhiên do thiếu những công cụ để quản lý các tiến trình bất đồng bộ, xử lý Exception một cách hiệu quả… nên Future không thích hợp với những hệ thống song song lớn. Với những hệ thống này, Akka là một lựa chọn tốt hơn.

Bài viết đã giới thiệu các tính năng cơ bản về Future, tuy nhiên còn một số tính năng chưa được đề cập đến như sử dụng Promise với Future, optimize global Execution Context hay blocking code trong Future… Bạn đọc có thể tìm hiểu thêm ở các đường link reference đi kèm với bài viết này.

Reference:
1.http://docs.scala-lang.org/overviews/core/futures.html
2.https://www.amazon.com/Programming-Scala-Scalability-Functional-Objects/dp/1491949856/ref=dp_ob_title_bk

Add a Comment

Scroll Up