Programowanie reaktywne z wykorzystaniem Project Reactor

Cel zastosowania

Dzisiejsze systemy często muszą stawiać czoła wyzwaniu konieczności szybkiego przetwarzania dużej ilości danych i odpowiedniemu reagowaniu na napływające informacje. Podejście reaktywne to odpowiedź na wymagania co do dostępności tych systemów oraz ich responsywności.

Założenia określa dokument The Reactive Manifesto.

Czym jest programowanie reaktywne?

Programowanie reaktywne odchodzi od liniowego modelu żądania i odpowiedzi obsługiwanych w jednym wątku proponując w zamian wielowątkową, nieblokującą, obsługę żądań bazującą na zdarzeniach (ang. event-driven). Wykorzystanie asynchroniczności oraz strumieni danych pozwala na poradzenie sobie z dużą liczbą konkurencyjnych żądań pochodzących z różnych źródeł. Dodatkowo – maksymalne wykorzystanie zasobów sprzętowych przyczynia się do wzrostu wydajności aplikacji w porównaniu z tradycyjnym podejściem. Uzyskane w ten sposób określone, szybkie i zbliżone czasy reakcji zapewniają użyteczność systemu oraz stałą jakość dostarczanych przez niego usług.

Zwrot „reaktywny” pochodzi od reagowania na pojawiające się dane, w przeciwieństwie do oczekiwania na nie – przechodzimy więc z modelu pull na push.

Mechanizm

Tradycyjne podejście MVC łączy się z blokowaniem przepływu – żądanie przesłane do serwera implikuje utworzenie wątku servletu, który deleguje do wątków roboczych wykonanie operacji odczytu/zapisu danych np. z bazy. W tym czasie główny wątek jest zblokowany.

Blocking request processing 1024x341 1

Źródło: https://howtodoinjava.com/spring-webflux/spring-webflux-tutorial/

W programowaniu reaktywnym wątek główny powiązany jest z funkcją obsługi oraz funkcją zwrotną. Deleguje on żądanie do puli wątków. Wątek z tej puli wywołuje funkcję obsługi, np. odczytania danych z bazy. Kiedy dane te są gotowe do odebrania – informacja o tym zostaje wysłana jako zdarzenie, wolny wątek z puli je odbiera, a następnie przekazuje za pomocą funkcji zwrotnej do wątku głównego.

Non blocking request processing 1024x374 1

źródło: https://howtodoinjava.com/spring-webflux/spring-webflux-tutorial/

Stąd wzięło się określenie aplikacji reaktywnych jako nieblokujących, asynchronicznych i bazujących na zdarzeniach.

Całość opiera się na wzorcu projektowym Obserwator (ang. observer). Pojawiają się więc też takie pojęcia jak:

  • publisher (wydawca) – dostarcza dane – potencjalnie nieograniczoną liczbę elementów
  • subscriber (subskrybent) – oczekuje na dane nasłuchując wydawcy

Warto również wspomnieć w tym miejscu o zjawisku backpressure. Jest to sytuacja, w której napływ danych od wydawcy jest większy niż możliwości ich przetworzenia przez subskrybenta. Istnieje kilka strategii radzenia sobie z problemem – buforowanie, przetwarzanie paczkami, pomijanie elementów np. najstarszych lub najnowszych itp.

Reactive Streams API

Propozycją mającą na celu określenie standardu dla rozwiązań reaktywnego podejścia do tworzenia systemów jest specyfikacja Reactive Streams. Stała się ona częścią Java 9. Definiuje interfejsy:

  • publisher – Znany nam już wydawca, który emituje sekwencję zdarzeń do subskrybentów[code lang=”java”]
    public interface Publisher<T>
    {
    public void subscribe(Subscriber<? super T> s);
    }
    [/code]
  • subscriber – Subskrybent[code lang=”java”]
    public interface Subscriber
    {
    public void onSubscribe(Subscription s);
    public void onNext(T t);
    public void onError(Throwable t);
    public void onComplete();
    }
    [/code]
  • subscription – Pozwala na określenie relacji pomiędzy wydawcą a subskrybentem, który deklaruje za pomocą metod tego interfejsu zainteresowanie danymi bądź anulowanie nasłuchiwania[code lang=”java”]
    public interface Subscription
    {
    public void request(long n);
    public void cancel();
    }[/code]
  • processor – Określa stan wymiany informacji oraz zapewnia zachowanie kontraktu pomiędzy subskrybentem a wydawcą[code lang=”java”]
    public interface Processor<T, R> extends Subscriber<T>, Publisher<R>
    {
    }
    [/code]

Istnieje kilka implementacji specyfikacji Reactive Streams dla JVM. Dwie najbardziej popularne to RxJava oraz Project Reactor

Project Reactor

Jest to biblioteką wykorzystywana przez moduł Spring WebFlux, który jest częścią Spring 5 i wprowadza wsparcie dla reaktywnego programowania w tworzeniu aplikacji webowych – łączne z dostarczeniem mechanizmów radzenia sobie ze zjawiskiem backpressure.

Podstawą aplikacji reaktywnych są strumienie danych pochodzące od wydawców. Project Reactor wykorzystuje 2 typy takich strumieni:

  • Flux – produkuje od 0 do N elementów, a więc może to być strumień nieograniczony[code lang=”java”]Flux<String> dataStream = Flux.just(“test1”, “test2”, “test3”);[/code]
  • Mono – produkuje od 0 do 1 elementów, jest używany przez metody zwracające pojedynczy element[code lang=”java”]Mono<String> just = Mono.just(“test”);[/code]

Oba te typy są implementacją interfejsu Pubisher.

Przykład wydawcy – serwer SSE

Wyobraźmy sobie aplikację, która powinna przetwarzać pojawiające się dane i automatycznie wyświetlać informacje o ich aktualizacji dla użytkownika. Niech będzie to aplikacja finansowa obsługująca bieżące operacje – wpływy na konto. Zasymulujmy sobie źródło takich danych.

Dodajmy zależności Mavena do pliku konfiguracji pom.xml – Spring WebFlux oraz biblioitekę Lombok, która pozwoli nam na pozbycie się nadmiarowego kodu

[code lang=”xml”]
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
<version>2.4.2</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.18</version>
</dependency>
[/code]

utwórzmy klasę reprezentującą transakcję finansową

[code lang=”java”]
@Data
@AllArgsConstructor
public class Transfer {

private LocalDateTime date;
private String title;
private BigDecimal amount;

public static Transfer generate() {
return new Transfer(LocalDateTime.now(), „Wypłata z ASC ;)”, randomAmount());
}

private static BigDecimal randomAmount() {
BigDecimal randomDecimal = BigDecimal.valueOf(Math.random())
.multiply(BigDecimal.valueOf(15000));
return randomDecimal.setScale(2, BigDecimal.ROUND_DOWN);
}
}
[/code]

a następnie przykładowy kontroler…

[code lang=”java”]
@RestController
public class TransferController {

@GetMapping(value = „/transfers”, produces = MediaType.TEXT_EVENT_STREAM_VALUE)
Publisher<Transfer> transfers() {
Flux<Transfer> transfersFlux = Flux.fromStream(Stream.generate(Transfer::generate));
Flux<Long> durationFlux = Flux.interval(Duration.ofSeconds(2));
return Flux.zip(transfersFlux, durationFlux).map(Tuple2::getT1);
}
}
[/code]

Po uruchomieniu aplikacji i otworzeniu URLa http://localhost:8080/transfers w przeglądarce pokażą się wyniki – nowy przelew co dwie sekundy:

data:{"date":"2021-02-19T10:11:22.065","title":"Wypłata z ASC ;)","amount":12992.87}
data:{"date":"2021-02-19T10:11:23.128","title":"Wypłata z ASC ;)","amount":5775.30}
data:{"date":"2021-02-19T10:11:24.084","title":"Wypłata z ASC ;)","amount":7840.29}
data:{"date":"2021-02-19T10:11:25.08","title":"Wypłata z ASC ;)","amount":13100.37}
data:{"date":"2021-02-19T10:11:26.081","title":"Wypłata z ASC ;)","amount":8928.89}
data:{"date":"2021-02-19T10:11:27.078","title":"Wypłata z ASC ;)","amount":13824.04}
data:{"date":"2021-02-19T10:11:30.083","title":"Wypłata z ASC ;)","amount":10135.54}
data:{"date":"2021-02-19T10:11:31.081","title":"Wypłata z ASC ;)","amount":7831.80}
data:{"date":"2021-02-19T10:11:32.075","title":"Wypłata z ASC ;)","amount":13472.26}

Przykład subskrybenta

Załóżmy potrzebę implementacji aplikacji, która będzie wyświetlała dane użytkowników. Możliwe będzie odpytanie o użytkownika po jego identyfikatorze. Bazę danych zasymulujemy w postaci mapy, a czas dostępu jako 2 sekundy.

[code lang=”java”]
@RestController
public class UserController {

private final Map<Integer, String> users = new HashMap<Integer, String>() {{
put(1, „Test User 1”);
put(2, „Test User 2”);
put(3, „Test User 3”);
put(4, „Test User 4”);
put(5, „Test User 5”);
}};

@GetMapping(„/person/{id}”)
public String getPerson(@PathVariable int id) throws InterruptedException {
Thread.sleep(2000L);
return users.get(id);
}
}
[/code]

Odpytajmy o użytkowników kolejno w klasyczny sposób – z użyciem RestTemplate…

[code lang=”java”]
@Slf4j
public class UserRestTemplateExample {

private static final RestTemplate restTemplate = new RestTemplate();
static {
restTemplate.setUriTemplateHandler(
new DefaultUriBuilderFactory(„http://localhost:8080”)
);
}

public static void main(String[] args) {
Instant start = Instant.now();
for (int i = 1; i <= 5; i++) {
restTemplate.getForObject(„/person/{id}”, String.class, i);
}
log.info(„Czas wykonania: ” + Duration.between(start, Instant.now()));
}
}
[/code]

Jaki będzie rezultat?

[main] INFO reactor.web.consumer.UserRestTemplateExample - Czas wykonania: PT10.283S

Czas oczekiwania na wynik powyżej 10 sekund – raczej nieakceptowalny w systemach, gdzie reakcja na zmieniające się dane powinna odbywać się jak najszybciej.

Spróbujmy więc do naszego zadania użyć WebClient będącego częścią Spring WebFlux. Spodziewamy się pojedynczego wyniku zapytania – używamy więc typu Mono:

[code lang=”java”]
@Slf4j
public class WebClientExample {

private static final WebClient client = WebClient.create(„http://localhost:8080”);

public static void main(String[] args) {
Instant start = Instant.now();
List<Mono<String>> list = Stream.of(1, 2, 3, 4, 5)
.map(i -> client.get().uri(„/person/{id}”, i).retrieve().bodyToMono(String.class))
.collect(Collectors.toList());

Mono.when(list).block();
log.info(„Czas wykonania: ” + Duration.between(start, Instant.now()));
}
}
[/code]

Tyle, że zamiast czekać na każdy wynik z osobna – czekamy na całą listę. Efekt?

[main] INFO reactor.web.consumer.WebClientExample - Czas wykonania: PT3.88S

3.88 sekundy – i to przy dwusekundowym opóźnieniu na odpytaniu o pojedynczy wynik.

Podsumowanie

Podejście reaktywne – nieblokujące, asynchroniczne – nie sprawia, że pobieranie danych na elementarnym poziomie trwa krócej. Pozwala za to na lepsze skalowanie aplikacji poprzez użycie niewielkiej, stałej liczby wątków, a co za tym idzie – na mniejsze zużycie pamięci. Dzięki temu możliwe jest płynne, bardziej przewidywalne działanie aplikacji pomimo pojawiających się problemów wynikających z obciążenia i dużego ruchu.

Warto zerknąć

Autor: Paweł Talacha – Wiodący Programista Java