Java 9 újdonságai – 4. rész

Reactive Streams API – reaktív programozás

“A reaktív folyam egy kezdeményezés egy szabvány létrehozására, ami nem blokkoló háttérnyomás esetén is aszinkron folyam feldolgozást tesz lehetővé.” Mi van? – kérdezheted magadban. Pedig ez a lényeg. Bontsuk le kicsit, hogy honnan is származik ez az egész reaktív programozás és mi az értelme! Egy fontos koncepcióról van szó.

A reaktív programozás előzményei

Napjaink alkalmazásainak nagyon más követelményeknek kell megfelelniük, mint néhány évvel ezelőtt. Régebben elég volt, ha a program sok másodperccel később reagált a felhasználói inputra. Az se volt gond, ha néha pár órára leállították karbantartás miatt az egész rendszert. A legnagyobb rendszerek is csak néhány gigabájtnyi adatot kezeltek és az egész néhány szerveren futott.

Ezzel szemben ma a felhasználók inkább milliszekundumban mérhető válaszidőt várnak még a mobiljukról elért webes alkalmazásoktól is. Sokszor már petabájtnyi adatot kezelnek (a gigabájt egymilliószorosa) és felhő alapú klasztereken futnak, ahol több száz vagy ezer magos processzor szolgálja ki a számítási igényüket. A rendelkezésreállás 99,9999% vagy még több kilences a végén, attól függ, hogy milyen rendszerről van szó.

Egyszerűen túl nagy az elvárások közti különbség, és a régen bevált módszerek már nem elégítik ki ezeket az új igényeket. Szükség van tehát valami újra. Ez az új megközelítés a reaktív programozás.

Reaktív programozás

Nagyvállalatok egymástól függetlenül fejlesztették le az IT-s rendszereiket, amik között rengeteg hasonlóság volt, bár mindegyik másképp lett implementálva. Az igények már jól körvonalazódtak külön-külön. Milyen alkalmazásra van manapság szükség? Olyanra, ami

  • jól reagál (reactive): A program a vele folytatott kommunikáció során gyorsan ad választ, ezáltal a problémák is gyorsan felfedezhetők és orvosolhatók benne. Az ilyen rendszerek gyors és konzisztens módon adnak választ a kérésekre, megbízható felső korlátot specifikálnak válaszidőnek.
  • öngyógyító (resilient): A program hiba esetén is jól reagáló marad. A hibákat adott komponensen belül kell kezelni és a különböző komponenseket izolálni kell egymástól, ezáltal lehetővé téve, hogy a rendszer részei meghibásodhassanak és helyreállhassanak az egész rendszer kompromittálása nélkül. A komponensek helyreállítását egy másik külső komponensnek való feladatdelegálással lehet megoldani, a magas elérhetőséget pedig replikációval, amikor szükséges. Az adott komponenst használó klienseket nem terheljük meghibásodás kezelésével.
  • alkalmazkodó (elastic): Az alkalmazás akkor is jól reagál, amikor nagy terhelésnek van kitéve (pl. Neptun :-)). A reaktív rendszerek képesek detektálni a terhelés mértékét és ennek megfelelően allokálni erőforrásokat a programhoz. Ennek persze vonzata, hogy globális bottleneckektől mentes architektúrájú legyen a szoftver. Az ilyen rendszerek prediktív és reaktív skálázó algoritmusokat is támogatnak és az alkalmazkodókészségüket költséghatékony módon, mindenki által elérhető hardveren és szoftveren valósítják meg. (Vagyis nem kell hozzá szuperszámítógép.)
  • üzenet vezérelt (message driven): A program aszinkron üzenetátadáson alapul, amivel jól elhatárolhatók a komponensek egymástól és azok laza csatolását eredményezi. Ez az izoláció azt is lehetővé teszi, hogy a hibákat, mint üzeneteket delegáljuk. Az explicit üzenetátadással képessé válik a terheltség kezelésére, illetve alkalmazkodás érhető el azzal, hogy az üzenetsorok hosszát megfigyeli és formálja az igényeknek megfelelően.

Durván 3 évvel ezelőtt (2014. szeptember 16-án) tették közzé a második verzióját a “Reactive Manifesto”-nak, ami ezeket az igényeket fogalmazta meg.

Új programozási paradigma

A reaktív programozás tehát nem más, mint egy új programozási paradigma, ahol adatelemek aszinkron folyamát dolgozzuk fel, és ahol az alkalmazások az új adatelemekre azok megjelenésekor reagálnak.

Az adatok folyama nem más, mint időben egymás után levő adatelemek. Ezzel a módszerrel sok memóriát spórolhatunk meg és így hatékonyabbak lehetünk, mert az adatokat folyamként dolgozzuk fel, és nem kell egy memóriában összegyűlt adathalmazon végigiterálni.

Reaktív folyamok

Amikor a Reactive Manifesto alapján a reaktív folyam megközelítés útjára indult, akkor az volt a cél, hogy egy szabványos megoldást adjanak az aszinkron folyamfeldolgozásra, nem blokkoló háttérnyomás esetén. (Erre a háttérnyomásos dologra rögtön kitérünk részletesebben.) A fő kihívás nem az volt, hogy megoldást találjanak erre a problémára – hisz ekkor már több implementáció is létezett – hanem az, hogy a különböző megoldásokat összeolvasszák és megtalálják azt a minimális halmazát az interfészeknek, metódusoknak és protokolloknak, ami leírja a szükséges műveleteket és entitásokat, hogy elérhessük ezt a célt. Ha még mindig olvasod és idáig eljutottál, akkor megérdemelnél egy ingyen csokit!

Háttérnyomás

A háttérnyomás a kulcs koncepció itt, vagyis angolul a “back pressure”. Képzeld el a termelő (producer) és fogyasztó (consumer) feleket a rendszerben! A termelő adja és a fogyasztó kapja az üzeneteket. A fogyasztó feliratkozik a termelő adatforrására, ha szeretne tőle üzeneteket kapni. Ezek után, amikor egy új üzenet elérhetővé válik, a termelő a fogyasztónak egy callback metódusát meghívva feldolgozásra átadja az üzenetet.

Ha a termelő az üzeneteket nagyobb ütemben adja ki magából, mint ahogy a fogyasztó képes azt feldolgozni, akkor a fogyasztó rá lesz kényszerítve arra, hogy egyre több és több erőforrást ragadjon magához, amely jó eséllyel crash-eli a fogyasztót. Ennek megakadályozására szükség van egy mechanizmusra, hogy a fogyasztó értesíteni tudja a termelőt, hogy az üzenetküldési sebességet csökkentse. A termelő ekkor néhány különböző módszer közül választhat, amivel ezt a helyzetet kezelheti. Ezt a mechanizmust hívjuk háttérnyomásnak, vagyis back pressure-nek.

Blokkoló háttérnyomás

Ezt a legegyszerűbb elérni. Ha a termelő és a fogyasztó azonos szálon futnak, akkor az egyik végrehajtása blokkolja a másik végrehajtását. Probléma megoldva. Persze sok esetben ez nem eszközölhető. Gondolj csak arra, hogy a termelőnek nem csak ez az egy fogyasztója van, hanem több is, és mindegyik más sebességgel képes az üzeneteket feldolgozni. Legtöbbször nem is megoldható, hogy azonos szálon fussanak, mert különböző környezetekben vannak. Ilyen esetekben szükséges egy nem blokkoló háttérnyomási mechanizmus.

Nem blokkoló háttérnyomás

Ezt úgy érhetjük el, hogy a push stratégiánkat lecseréljük pull stratégiára. Vagyis nem akkor történik küldés, amikor a termelő készen áll az új üzenetekkel, hanem akkor, amikor a fogyasztó szól, hogy készen áll x üzenet fogadására. Ekkor a termelő pontosan ennyi üzenetet küld és vár, mielőtt továbbiakat küldene.

JDK 9 Flow API

A Java 9-ben megalkotott, a fentieket minimális méretű interfészekkel támogató API-ja a JEP 226: More Concurrency Updates keretein belül valósult meg. A következő 4 interfészt tartalmazza:

@FunctionalInterface
public static interface Flow.Publisher<T> {
    void subscribe(Flow.Subscriber<? super T> subscriber);
}

public static interface Flow.Subscriber<T> {
    void onSubscribe(Flow.Subscription subscription);
    void onNext(T item);
    void onError(Throwable throwable);
    void onComplete();
}

public static interface Flow.Subscription  {
    void request(long n);
    void cancel() ;
}

public static interface Flow.Processor<T,R>extends Flow.Subscriber<T>, Flow.Publisher<R> {  
}

Java reaktív programozás Publisher és Subscriber szerepeket szemléltető ábra

Nomenklatúra

Korábbi elnevezéseink alapján (termelő, fogyasztó) a Subscriber a fogyasztó, de a Java 9 Flow API nomenklatúrája alapján a feliratkozó, a Publisher pedig a termelő, vagyis a publikáló (kiadó). Ezeket az elnevezéseket magyarul nem tartom szerencsésnek, így ezekre az angol nevükkel, vagy a magyarban meghonosult általános nevükkel fogok hivatkozni a továbbiakban.

Subscriber

A Subscriber feliratkozik a Publisher-hez annak subscribe metódusának meghívásával. Az adatelemek nem kerülnek kiküldésre a Subscriberhez, amíg azokat nem kéri explicite. A Subscriber Subscription-ön hívott metódusai szigorúan rendezettek. Az alkalmazás a következő callback-ekre tud reagálni, amik a Subscriberen vannak definiálva:

CallbackLeírás
onSubscribeEz a metódus kerül meghívásra minden egyéb Subscriber metódus előtt az adott Subscription esetén.
onNextEz a metódus kerül meghívásra egy Subscription következő eleménél.
onErrorEz a metódus kerül meghívásra egy helyreállíthatatlan hiba esetén, ami a Publisher-ben vagy Subscription-ben történt, ami után más Subscriber metódus már nem lesz hívva a Subscription által.

Ha egy Publisher hibába ütközik, ami nem teszi lehetővé adatelemek küldését a Subscriber-nek, akkor ezen a Subscriber-en az onError metódus kerül meghívásra és további üzeneteket nem fog kapni.

onCompleteEz a metódus akkor hívódik meg, amikor már ismertté válik, hogy további Subscriber metódushívások nem lesznek egy olyan Subscription-höz, ami még nem zárult le hibával.

Subscriber példa

import java.util.concurrent.Flow.*;
...

public class MySubscriber<T> implements Subscriber<T> {

    private Subscription subscription;

    @Override
    public void onSubscribe(Subscription subscription) {
        this.subscription = subscription;
        subscription.request(1);
    }

    @Override
    public void onNext(T item) {
        System.out.println("Megérkezett elem: " + item);
        subscription.request(1);
    }

    @Override
    public void onError(Throwable t) {
        t.printStackTrace();
    }

    @Override
    public void onComplete() {
        System.out.println("Kész");
    }
    
}

Publisher

A Publisher az adatelemek folyamát továbbítja a feliratkozóknak. Ezt aszinkron módon teszi, általános esetben egy Executor segítségével. A Publisher-ek biztosítják, hogy a Subscriber metódushívások minden Subscription esetén szigorúan rendezett sorrendben történnek. Ha még mindig itt vagy velem, akkor az adatfolyamon érkezik neked egy újabb csoki reaktív módon.

Publisher példa a JDK SubmissionPublisher osztályával

import java.util.concurrent.SubmissionPublisher;
...

    // Publisher példányosítása
    SubmissionPublisher<String> publisher = new SubmissionPublisher<>();

    // Subscriber regisztrálása
    MySubscriber<String> subscriber = new MySubscriber<>();
    publisher.subscribe(subscriber);

    // Elemek küldése
    System.out.println("Elemek küldése...");
    String[] items = {"1", "alma", "2", "körte", "3", "banán"};
    Arrays.asList(items).stream().forEach(i -> publisher.submit(i));

Subscription

Egy Publisher-t és egy Subscriber-t köt össze. A Subscriber-ek csak akkor kapnak adatelemeket, amikor explicit kérik azokat, és bármikor visszavonhatják a Subscription segítségével.

MetódusLeírás
requestHozzáadja az adott n számú elemet az aktuálisan be nem teljesített kérelmekhez az adott Subscription-nél.
cancelA Subscriber-t leállítja, ami ezek után nem kap több üzenetet. A visszavonási kérelem nem azonnal történik.

Processor

A Processor egy olyan komponens, ami Subscriber-ként és Publisher-ként is viselkedik. A Publisher és Subscriber között helyezkedik el és átalakítja az egyik folyamot egy másikra. Több Processor-t is egymás után lehet kötni, és az utolsó Processor eredményét dolgozza fel a Subscriber. A JDK nem tartalmaz konkrét Processor implementációt, így ezt az API-t felhasználó programozónak kell megvalósítania.

Processor megvalósítására egy példa, ami String-ből Integer-t készít

import java.util.concurrent.Flow.*;
import java.util.concurrent.SubmissionPublisher;
...

public class MyTransformProcessor<T,R> extends SubmissionPublisher<R> implements Processor<T, R> {

    private Function function;
    private Subscription subscription;
    
    public MyTransformProcessor(Function<? super T, ? extends R> function) {
        super();
        this.function = function;
    }

    @Override
    public void onSubscribe(Subscription subscription) {
        this.subscription = subscription;
        subscription.request(1);
    }

    @Override
    public void onNext(T item) {
        submit((R) function.apply(item));
        subscription.request(1);
    }

    @Override
    public void onError(Throwable t) {
        t.printStackTrace();
    }

    @Override
    public void onComplete() {
        close();
    }
  
}

Példa a Processor-ral történő folyam átalakításhoz

import java.util.concurrent.SubmissionPublisher;
...

    // Publisher példányosítása
    SubmissionPublisher<String> publisher = new SubmissionPublisher<>();

    // Processor és Subscriber példányosítása
    MyFilterProcessor<String, String> filterProcessor = new MyFilterProcessor<>(s -> s.equals("x"));

    MyTransformProcessor<String, Integer> transformProcessor = new MyTransformProcessor<>(s -> Integer.parseInt(s));

    MySubscriber<Integer> subscriber = new MySubscriber<>();

    // Processor és Subscriber egymás után kötése
    publisher.subscribe(filterProcessor);
    filterProcessor.subscribe(transformProcessor);
    transformProcessor.subscribe(subscriber);

    System.out.println("Elemek küldése...");
    String[] items = {"1", "alma", "2", "körte", "3", "csoki"};
    Arrays.asList(items).stream().forEach(i -> publisher.submit(i));
    publisher.close();

Konklúzió

A reaktív programozás megvalósulásához a Java 9 Flow API-ja egy jó kezdet. Megadja a fejlesztőknek azt a minimális interfész csomagot, amivel reaktív programok készíthetők, de időre lesz szükség, mire ennek az ökoszisztémája kialakul. Ha más termékekben is elérhetővé válik majd a reaktív programozás API-ja – mint például adatbáziskezelő rendszerekben – akkor megvalósulhatnak majd azok a modern alkalmazások, amik a Reactive Manifesto-ban leírt feltételeknek megfelelnek.
Ha érdekel a Java 9 további nyelvi újításai, akkor olvasd el a korábbi blog posztokat is a Java 9 témában!

Forrás:
https://community.oracle.com/docs/DOC-1006738
https://aboullaite.me/java-9-new-features-reactive-streams/

Ha tetszett, oszd meg!

Szólj hozzá!