প্রজেক্ট রিয়েক্টর, দ্বিতীয় পর্ব।
জাভা স্ট্রিমস এবং রিএক্টিভ স্ট্রিমস এর মধ্যে অনেকটা মিল রয়েছে আবার কিছু পার্থক্য রয়েছে। আসুন আমরা এখন জাভা স্ট্রিমস এবং রিএকটিভ স্ট্রিমস এর মধ্যে পার্থক্য দেখব।
ধরুন একটা উপাদানের লিস্ট ইটারেট করে আমাদের কিছু ভ্যালু প্রিন্ট করতে হবে সে ক্ষেত্রে আমরা স্ট্রিম এপিআই ব্যবহার করতে পারি, খেয়াল করুন আমরা যে লিস্ট ইটারেট করতে যাচ্ছি সেটাতে ফিনিট সেট অফ ডেটা আছে। অলরেডি লিস্ট এর সাইজ ফিক্সট, তার মানে বোঝা গেল আমরা নির্দিষ্ট সংখ্যক ডেটার উপর ইটারেট করতেছি।
সেহেতু লিস্ট থেকে একটি একটি করে উপাদান আসবে এবং উপাদানগুলোকে মডিফাই করব সুতরাং বলা যায় জাভা স্ট্রিমস বাই ডিফল্ট সিনক্রোনাস।
রিএক্টিভ স্ট্রিমস এর বিহেভিয়ার কিছুটা আলাদা, এর মাধ্যমে ইনফিনিট ডেটা সেট অ্যাসিনক্রোনাসলি প্রসেস করা যায়, রিয়েল টাইম ডেটা যখনই স্ট্রিমসে এভেইলেবল হয় তখন ব্যাক প্রেসার এর মাধ্যমে সাবস্ক্রাইবারকে জানিয়ে দেয়।
জাভা ৮ এর প্রধান ফিচার গুলোর মধ্যে দুটি ফিচার হচ্ছে স্ট্রিমস এপিআই এবং অপশনাল।
Streams<Integer> API: আমরা স্ট্রিমস পাইপ লাইন ব্যবহার করে ০ থেকে N সংখ্যক উপাদান বিভিন্ন ধরনের ফিল্টারের মডিফাই করে আবার টার্মিনাল অপারেশনের মাধ্যমে কোন একটা লিস্ট অথবা ম্যাপে জমা করতে পারি। তারমানে একের অধিক উপাদান প্রসেস করা সম্ভব।
চলুন একটু উদাহরণ দেখে আসি, একটি ইন্টিজার স্ট্রিমস থেকে ফিল্টার করে একটি লিস্ট তৈরি করব।
public static void main(String[] args) { var integerStreams = Stream.of(10, 20, 25, 30, 35, 40); var filteredIntegerList = integerStreams.filter(n -> n % 2 == 0).collect(Collectors.toList()); filteredIntegerList.forEach(System.out::println); }
যেহেতু ইন্টিজার স্ট্রিমসকে ব্যবহার করে filteredIntegerList তৈরি করে ফেলেছি, এখন যদি দ্বিতীয়বার ব্যবহার করতে চাই সেক্ষেত্রে IllegalStateException: খাবে।
public static void main(String[] args) { var integerStreams = Stream.of(10, 20, 25, 30, 35, 40); var filteredIntegerList = integerStreams.filter(n -> n % 2 == 0).collect(Collectors.toList()); filteredIntegerList.forEach(System.out::println); integerStreams.filter(n -> n % 2 == 1).forEach(System.out::println); }
আউটপুটঃ
public static void main(String[] args) { Optional<Integer> optionalInteger = Stream.of(10, 20, 40).findFirst(); Optional<Integer> optionalEmptyInteger = Stream.of(10, 20, 40).filter(n -> n % 2 == 1).findFirst(); optionalInteger.ifPresent(System.out::println); optionalEmptyInteger.ifPresent(System.out::println); }
optionalInteger এ স্ট্রিমস এর প্রথম উপাদান ১০ পাওয়া যাবে, অন্যদিকে optionalEmptyInteger এ স্ট্রিমস ইটারেট করে কোন বিজোড় সংখ্যা না পাওয়ায় খালি থেকে যাবে।
Mono<Integer> অনেকটা Optional<Integer> এর মত বাট যখন ডেটা সোর্স থেকে একটি ডেটা ইমিট করবে তখন ডেটা রিটার্ন করবে এবং onComplete() মেথড কল করে বের হয়ে আসবে যদি ডেটা সোর্স এরর আসে তখন onError() মেথড কল করে জানিয়ে দিবে ডেটা সোর্স থেকে ডেটা পাওয়া যায়নি ডেটার পরিবর্তে এরর পাওয়া গেছে।
মনো হচ্ছে রিয়াক্টিভ স্ট্রিমস থেকে পাওয়া সিঙ্গেল ইউনিট।
মনো কয়েক ভাবে তৈরি করা যায়ঃ
6. Mono.fromFuture(supplier)
Mono.just() এর মাধ্যমে আমরা মনোর একটি সিঙ্গেল ইউনিট তৈরি করতে পারি, যখন সাবস্ক্রাইবার সাবস্ক্রাইব করবে তখন মনোর ভিতরের ইন্টিজার ভ্যালুটি প্রিন্ট করবে। Mono.just() হচ্ছে এগার পাবলিশার, কেউ সাবস্কারাইব করুক আর না করুক স্ট্রিমসে ভ্যালু পাবলিশ করে বসে থাকে।
public static void main(String[] args) { Mono<Integer> integerMono = Mono.just(2021).log(); integerMono.subscribe(System.out::println); }
আউটপুটঃ
@Test public void defer(){ Supplier<Mono<String>> supplierMono = () -> Mono.just("Hello"); Mono<String> deferMono = Mono.defer(supplierMono).log(); deferMono.subscribe(System.out::println); }
Mono.create() মনো তৈরি করার সবচেয়ে এডভান্স মেথড, এটির মাধ্যমে অন্য কোন ডেটা সোর্স থেকে ঈমিট করা ভ্যালুকে পুর্ন কন্ট্রোল করার সুবিধা দেয়। প্রথম সোর্স থেকে পাওয়া ডেটার উপর ভিত্তি করে MonoSink.success(value), MonoSink.error(throwable) মেথডের মাধ্যমে সাকসেস অথবা এরর মনো তৈরি করতে পারি।
@Test public void create() { String sendMessage = "Привет мир"; Consumer<MonoSink<String>> callback = (MonoSink<String> sink) -> { if (isValidEncoded(sendMessage)) { sink.success(sendMessage); } else { sink.error(new RuntimeException("Encoding failed")); } }; Mono<String> createMono = Mono.create(callback); createMono.subscribe(System.out::println); } private boolean isValidEncoded(String message){ byte[] bytes = message.getBytes(StandardCharsets.UTF_8); String asciiEncodedString = new String(bytes, StandardCharsets.US_ASCII); return message.equals(asciiEncodedString); }
Mono.fromCallable(() -> “Hello”) মাধ্যমে কোন Callable থ্রেডের রেসপন্স থেকে মনো তৈরি করতে পারি।
@Test public void fromCallable() { Callable<String> stringCallable = () -> "Hello"; Mono<String> callableMono = Mono.fromCallable(stringCallable); callableMono.subscribe(System.out::println); }
Mono.fromSupplier(() -> “Hello”) মেথড কল করে কোন Supplier থেকে মনো তৈরি করতে পারি।
@Test public void fromSupplier() { Supplier<String> supplier = () -> "Hello"; Mono<String> supplierMono = Mono.fromSupplier(supplier); supplierMono.subscribe(System.out::println); }
Mono.fromFuture(supplier) মেথড Supplier গ্রহণ করে, Supplier এর রিটার্ন টাইপ অবশ্যই CompletableFuture টাইপের হতে হবে।
@Test public void fromFuture1() { Supplier<CompletableFuture<String>> supplier = () -> CompletableFuture.completedFuture("hello"); Mono.fromFuture(supplier).subscribe(System.out::println); }
Mono.fromFuture(completableFuture) মেথড সরাসরি CompletableFuture এর ইন্সটান্স ও পাঠানো যায় মনো তৈরি করার জন্য।
@Test public void fromFuture2() { CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync( () -> "Hello"); Mono.fromFuture(completableFuture).subscribe(System.out::println); }
ফ্লাক্স হচ্ছে রিয়াক্টিভ স্ট্রিমস থেকে পাওয়া মাল্টিপল ইউনিট। ফ্লাক্স শূন্য থেকে n সংখ্যক উপাদান ইমিট করতে পারে, ফ্লাক্স পাইপলাইন সম্পূর্ণ এসিনক্রোনাসলি ডেটা সার্ভ করে, অন্যদিকে জাভা স্ট্রিম পাইপলাইন সিনক্রোনাস।ফ্লাক্স হচ্ছে রিয়াক্টিভ স্ট্রিমস পাইপলাইন থেকে 50 থেকে 100 উপাদান(ফিনিট সংখ্যক) ইমিট করার পরিবর্তে এই পাইপলাইন থেকে যেই উপাদানগুলো পাওয়া যায় সেগুলো হচ্ছে রিয়াল টাইম ডেটা(ক্রিকেটে খেলার স্কোর)।
ফ্লাক্স কয়েক ভাবে তৈরি করা যায়।
@Test public void fluxJust() { Flux<String> justFlux = Flux.just("Time ", "for ", "fun"); justFlux.subscribe( data -> System.out.println("Data " + data), error -> System.out.println("Error found " + error), () -> System.out.println("End")); }
@Test public void 1luxFromArray() { Flux<String> fluxFromArray = Flux.fromArray(new String[]{"Time ", "for ", "fun"}); fluxFromArray.subscribe(System.out::println); }
@Test public void fluxFromStream() { var stringStream = Stream.of("Time ", "for ", "fun"); Flux<String> fluxFromStream = Flux.fromStream(stringStream); fluxFromStream.subscribe(System.out::println); }
4. Flux.create((FluxSink<Integer> fluxSink) ফ্লাক্স স্ট্রিমস তৈরি করার সবচেয়ে এডভান্স মেথড, এটির মাধ্যমে অন্য কোন ডেটা সোর্স থেকে ইমিট করা ভ্যালু পুর্ন কন্ট্রোল করার সুবিধা দেয়। প্রথম সোর্স থেকে পাওয়া ডেটার উপর বিভিন্ন ক্যালকুলেশন করে মেথডের মাধ্যমে সাকসেস অথবা এরর রিটার্ন করতে পারি। যখন কোন সাবস্ক্রাইবার এই স্ট্রিমসকে সাবস্ক্রাইব করবে প্রত্যেকে একটি করে FluxSink ইন্সটান্স পাবে যার মাধ্যমে ০ থেকে n সংখ্যক ভ্যালু পেতে পারে। সাবস্ক্রাইবার(ডাউন স্ট্রিমস) সিদ্ধান্ত নিবে কতগুলি ডেটা সোর্স থেকে পেতে চাচ্ছে। যদি কোন কারণে সোর্স থেকে ইমিটেড ডেটা কঞ্জিউম করতে ব্যার্থ হয় তা হলে রিয়েক্টর নতুন ডেটা পাবলিশ না করে সাবস্ক্রাইবারের রেসপন্স এর জন্য অপেক্ষা করতে থাকে।
@Test public void fluxCreate() { Consumer<FluxSink<Integer>> emitter = (FluxSink<Integer> fluxSink) -> { var random = new Random(); IntStream.range(1, 4) .map(n -> { int rand = random.nextInt(10); System.out.println("emitting number : " + rand); return rand; }) .forEach(value -> { if (value != 10) { fluxSink.next(value); } else { fluxSink.error(new RuntimeException("we cannot proceed value no 10")); } }); }; Flux<Integer> flux = Flux.create(emitter);//.log(); flux.delayElements(Duration.ofMillis(0)) .subscribe(value -> System.out.println("Subscriber 1 " + value)); flux.delayElements(Duration.ofMillis(1)) .subscribe(value -> System.out.println("Subscriber 2 " + value)); flux.delayElements(Duration.ofMillis(2)) .subscribe(value -> System.out.println("Subscriber 3 " + value)); }
আউটপুটঃ
@Test public void fluxDefer() { //call network /** * key company name * value share quantity * Map<String, Integer> stocks = new HashMap<>(); * stocks.put("GOOG",5); * stocks.put("CSCO",8); * stocks.put("FB",6); */ Supplier<Flux<Double>> supplier = () -> Flux.just( YahooFinance.getPrice("GOOG"), YahooFinance.getPrice("CSCO"), YahooFinance.getPrice("FB")); Flux<Double> flux = Flux.defer(supplier); flux.subscribe(company -> System.out.println("stock price of 2017-05-01 " + company)); }
আউটপুটঃ
@Test public void fluxDefer() { //call network /** * key company name * value share quantity * Map<String, Integer> stocks = new HashMap<>(); * stocks.put("GOOG",5); * stocks.put("CSCO",8); * stocks.put("FB",6); */ Supplier<Flux<Double>> supplier = () -> Flux.just( YahooFinance.getPrice("GOOG"), YahooFinance.getPrice("CSCO"), YahooFinance.getPrice("FB")); Flux<Double> flux = Flux.defer(supplier); //flux.subscribe(company -> System.out.println("stock price of 2017-05-01 " + company)); }
আউটপুটঃ
@Test public void fluxZip1() { Flux<String> nameFlux = Flux.just("Russia", "Ukraine", "Bangladesh"); Flux<String> capitalFlux = Flux.just("Moscow", "Kyiv", "Dhaka"); Flux<Country> countryFlux = Flux.zip(nameFlux, capitalFlux) .flatMap(dFlux -> Flux.just(new Country(dFlux.getT1(), dFlux.getT2()))); countryFlux.subscribe(System.out::println); }
আউটপুটঃ
country info Country{name=’Bangladesh’, capital=’Dhaka’}
একটি বিষয় লক্ষণীয় যদি কোন ডেটা সোর্স অন্য কমপ্লিট ইভেন্ট ট্রিগার করে, অন্যান্য সোর্স ডেটা থাকলে ও পাওয়া যাবে না।
উদাহরণ-২
@Test public void fluxZip2() { Flux<String> nameFlux = Flux.just("Bangladesh","Russia", "Ukraine"); Flux<String> capitalFlux = Flux.just("Dhaka"); Flux<Country> countryFlux = Flux.zip(nameFlux, capitalFlux) .flatMap(dFlux -> Flux.just(new Country(dFlux.getT1(), dFlux.getT2()))) //.log() ; countryFlux.subscribe(country -> System.out.println("country info "+country)); }
আউটপুটঃ
country info Country{name=’Bangladesh’, capital=’Dhaka’}
Volunteer at Java User Group Bangladesh (JUGBD)
cmabdullah21@gmail.com @cmabdullah21