Stream Processing with ZIO stream

ZIO

ZIO là một Functional Effect System, được cho là sự kết hợp của Reader, Either và Future:

ZIO[R, E, A] ≅ R => Either'[E, A]

R: Environment, đơn giản có thể hiểu để thực hiện ZIO effect thì cần 1 giá trị kiểu R (Reader), một hình thức của dependency injection.

E: Lỗi của effect

A: Giá trị của effect

Either’ mô tả giá trị khi thực hiện function đó đồng bộ hoặc bất đồng bộ.

ZStream[R, E, O] được xây dựng dựa trên ZIO, có thể hình dung ZStream giống như Iterator[O] với thêm nhiều tính năng.

Khi làm việc với ZIO, mọi effect đều nằm trong ZIO

ZStream & Akka Stream

Không như Akka stream gồm: Source, Flow, Sink; ZStream gồm 2 phần chính: ZStream, ZSink. Trong đó Source – ZStream mô tả nguồn (nơi sản xuất dữ liệu, tương tự vòi nước), Sink – ZSink mô tả chỗ hứng dữ liệu sau khi trải qua 1 loạt các phép biến đổi, effect (sink: bồn). ZStream tích hợp sẵn các phép biến đổi như: map, flatMap, mapMPar,… không chia ra 1 kiểu riêng là Flow như Akka stream. 1 vài khái niệm có vài nét tương tự nhất định trong reactive stream:

Source ~ Publisher ~ Observable

Sink ~ Subcriber ~ Observer

Ví dụ

Crawl data từ 1 api và load vào mongo

  1. API

    Có 3 api để lấy dữ liệu. Mục tiêu là lấy hết id từ api 1 và 2 để call api thứ 3.
    api 1: /content/all/{category}
    api 2: /content/all/{category}/{subcategory}
    api 3: /content/{id}
    Nếu api 1 trả về canRequestMore true thì call Api 2 với subcategory là nested.  Với contentType là Indivdual lấy package id, Bundle thì lấy hết id trong bundleItems
  2. Stream

Chương trình thực hiện lấy id từ /content/all api đồng thời tối đa 16 task được upsert vào db và chạy với Sink đếm số record thực hiện.

mapConcatM: với mỗi element của stream, tạo ra 1 effect mới trả về 1 Seq các phần tử, các Seq này sẽ được concat với nhau theo thứ tự. Eg: ZStream(1,2,3).mapConcatM(i => ZIO.succeed(Seq.fill(i)(i))) == ZStream(1,2,2,3,3,3)

mapMPar(n): Với mỗi element của stream, tạo ra 1 effect từ element đó, tối đa n effect được thực hiện cùng lúc.
M trong mapM, mapMPar, mapConcatM,…: phần tử nằm trong effect này sẽ sinh ra 1 effect mới (M: Monad)

Kết

ZIO và ZStream hỗ trợ nhiều phép biến đổi dễ dàng hơn so với Akka stream, nhưng có 1 số mà akka làm đc hiện chưa làm đc ở zio (có thể là do mình chưa tìm được)

Full example: https://github.com/qhquanghuy/zio-simple-api-crawler/blob/main/app/src/console/main.scala#L176

ZIO homepage: https://zio.dev/docs/getting_started.html

Add a Comment

Scroll Up