Reactive Systems এর মূল লক্ষ্য কী?
স্বল্প রিসোর্স নিয়ে বেশি পরিমাণ কাজ করা, রিএক্টিভ প্রসেসিং এর মাধ্যমে অধিক পরিমাণে কনকারেন্ট রিকোয়েস্ট স্বল্প রিসোর্স (মাইক্রসার্ভিস/অ্যাপ্লিকেশন) ইন্সটান্স দিয়ে সার্ভ করা।
রিএক্টিভ প্রোগ্রামিং এর প্রধান বৈশিষ্ট্য সমূহ নিম্নে দেওয়া হল।
1. এসিনক্রোনাস এবং নন ব্লকিং।
5. ডেটা সোর্স থেকে ডেটা ব্যাক প্রেসার এর মাধ্যমে রিটার্ন করা হয়।
নন ব্লকিং অ্যাসিনক্রোনাস কমিউনিকেশন এর একটা উদাহরণ।
ধরুন আপনি হাসপাতালে গেলেন কভিড -১৯ এর ভ্যাক্সিন নিতে সেখানে গিয়ে ভ্যাক্সিন নেয়ার জন্য আপনাকে সিরিয়াল দিতে হবে, যে ভদ্রলোক সিরিয়াল নিচ্ছে সে সিরিয়াল নিয়েই যাচ্ছে একজন একজন করে, এই সিরিয়াল নাম্বার গুলি ২/৩ টি জোনে ভাগ করে পাঠানো হচ্ছে এবং সবাইকে জোন থেকে ভ্যাক্সিন দেওয়া হচ্ছে । ব্যাপারটা এরকম না যে “সিরিয়াল নেয়ার পরে ওই ভদ্রলোক ভ্যাক্সিন পেল তারপরে দ্বিতীয়জনের সিরিয়াল নেয়া হলো যদি এরকম হত তাহলে সেটা হতো সিনক্রোনাস প্রসেসিং” বরং সিরিয়াল দেয়ার সাথে ভ্যাক্সিন প্রদান ব্লকিং অবস্থায় নেই সিরিয়াল ম্যান স্বাধীনভাবে সিরিয়াল নিচ্ছে ভ্যাক্সিন প্রদান টিমে যারা আছেন তারা একজন একজন করে ভ্যাক্সিন দিয়ে যাচ্ছেন এই কাজটাই হচ্ছে এসিনক্রোনাস প্রসেসিং।
এধরনের নন ব্লকিং এবং এসিনক্রোনাস কমিউনিকেশন কে বলা হয় ইভেন্ট-ড্রিভেন স্ট্রিমিং কমিউনিকেশন।
এখন আমরা দেখব ইমপারেটিভ পদ্ধতিতে কিভাবে ডেটাবেজ থেকে একটা ব্লকিং কল এর মাধ্যমে ডেটা পেতে পারি।
ডেটা সোর্স থেকে যখন একটা রিকোয়েস্ট এর বিপরীতে সব ডেটা আসা কমপ্লিট হয়ে যায় তখন onComplete() এর মাধ্যমে জানিয়ে দেয়া হয় অথবা যদি কোন এরর পায় সেটা হচ্ছে onError() ইভেন্ট এর মাধ্যমে জানিয়ে দেয়।
এখন আমরা দেখব ইভেন্ট-ড্রিভেন স্ট্রিমিং পদ্ধতিতে কিভাবে ডেটাবেজ থেকে একটা নন ব্লকিং কল এর মাধ্যমে ডেটা পেতে পারি
উপরের ছবিতে আমরা দেখতে পাচ্ছি ডেটাবেজ সার্ভার থেকে n সংখ্যক ডেটা নিয়ে আসার জন্য আমাদের অ্যাপ্লিকেশন সার্ভার থেকে একটা রিকোয়েস্ট ইভেন্ট ডেটাবেজ সার্ভারে পাবলিশ করে বের হয়ে চলে এসেছে। যখন ডেটাবেজ সার্ভারের ডেটা রেডি করা কমপ্লিট হবে তখন একটা একটা করে ডেটা রিটার্ন করা শুরু করবে তখন n সংখ্যক ডাটা onNext() মেথডের মাধ্যমে স্ট্রিমিং করে একটি একটি করে ডেটা রিটার্ন করবে যখন ডেটা রিটার্ন করা শেষ হয়ে যাবে তখন onComplete() মেথড এর মাধ্যমে জানিয়ে দিবে যেই পরিমাণে ডেটা নেয়ার জন্য রিকোয়েস্ট করেছিলে সবগুলো ডেটা পাঠানো শেষ।
এখন আমরা একটি এরর সিনারিও দেখব
উপরের ছবিতে আমরা দেখতে পাচ্ছি ডেটাবেজ থেকে n সংখ্যক ডেটা নিয়ে আসার জন্য আমাদের অ্যাপ্লিকেশন সার্ভার থেকে একটা রিকোয়েস্ট ইভেন্ট ডেটাবেজ সার্ভারে পাবলিশ করে বের হয়ে এসেছে। যখন ডেটাবেজ সার্ভারের ডেটা তৈরি করা কমপ্লিট হবে তখন একটি একটি করে ডেটা রিটার্ন করা শুরু করবে, n সংখ্যক ডেটা onNext() মেথডের মাধ্যমে স্ট্রিমিং করে রিটার্ন করবে যখন ডেটা রিটার্ন করা শেষ হয়ে যাবে তখন onComplete() মেথড এর মাধ্যমে জানিয়ে দিবে যেই পরিমাণে ডেটা নেয়ার জন্য রিকোয়েস্ট করেছিলে সবগুলো ডেটা পাঠানো শেষ।
ধরুন ডেটাবেজ সার্ভার ডেটা তৈরী করতে গেল, ডেটা তৈরি করতে পারল না বরং এরর রিটার্ন করল, যখন onNext() মেথডের মাধ্যমে স্ট্রিমিং করে ডেটা রিসিভ করতে যাব তখন এক্সেপশন পাবো, এই ক্ষেত্রে স্টিমের ভিতরে যেহেতু এরর পাওয়া গেছে তাই পরবর্তী রিকোয়েস্টগুলো প্রসেস করবে না বরং onError() মেথড এর মাধ্যমে জানিয়ে দিবে যে ভাই এরর পেয়েছি আমি এক্সেপশন থ্র করে দিলাম।
এমন একটা কেস তৈরি হইল যেখানে অ্যাপ্লিকেশন সার্ভার থেকে ডেটা শুধু ডেটাবেজে সেইভ হবে কিন্তু ডেটাবেজ থেকে কিছু রিটার্ন করবে না সে ক্ষেত্রে অ্যাপ্লিকেশন সার্ভার ডেটা সেইভ করার জন্য রিকোয়েস্ট নিয়ে গেল ডেটাবেজ সার্ভার ইমিডিয়েটলি রিকোয়েস্ট টা রিসিভ করে রিপ্লাইয়ে অ্যাপ্লিকেশন সার্ভার কে বলে দিবে ডকুমেন্টস প্রসেস শেষে আমি জানাব এবং আস্তে আস্তে ডেটাগুলোকে ইভেন্টের মাধ্যমে রিড করে সেইভ করতে শুরু করবে যখন ডাটা গুলো সেইভ করা শেষ হয়ে যাবে তখন onComplete() ইভেন্টের মাধ্যমে অ্যাপ্লিকেশনকে জানিয়ে দিবে যে ডেটা সেইভ করা শেষ এখন আমি একটু রেস্ট নেই।
রিএক্টিভ স্ট্রিমের একটা ফিচার হচ্ছে ব্যাক প্রেসার, দেখা গেল যে ডেটাবেজ থেকে প্রচুর পরিমানে ডেটা আসতেছে কিন্তু অ্যাপ্লিকেশন সার্ভারের এত হিউজ পরিমান ডেটা স্বল্প সময়ে প্রসেস করার চ্যালেঞ্জিং হয়ে যাচ্ছে এই ক্ষেত্রে অ্যাপ্লিকেশন সার্ভার ব্যাক প্রেসার এর মাধ্যমে তাকে জানিয়ে দিতে পারে যে আস্তে আস্তে ডাটাগুলো স্ট্রিমে পাঠাও যাতে আমার ডেটা প্রসেস করতে সমস্যা না হয়।
ধরুন ডেটা সোর্স এর কাছে একটি রিকোয়েস্টের এগেইনষ্টে n সংখ্যক ডেটা আছে, এখন অন নেক্সট মেথড n সংখ্যক বার কল করার মাধ্যমে স্ট্রিম আকারের একটি একটি করে ডেটা রিটার্ন করবে এবং সর্বশেষ অন কমপ্লিট মেথড কল করে ডাটা ট্রান্সফার সাকসেসফুল জানিয়ে দেয়।
এমন একটা ক্ষেত্র তৈরি হলো যে পাবলিশারের কাছে একটি রিকোয়েস্টের এগেইনষ্টে ২০ টি ডেটা আছে কিন্তু ১৫ তম ডেটা পাওয়ার পরে আর কোন ডেটা প্রসেস করার দরকার নেই সেক্ষেত্রে সাবস্ক্রিপশন ইন্টারফেসের cancel() মেথড কল করার মাধ্যমে আমরা সাবস্ক্রিপশন ক্যন্সেল করে দিতে পারি, তাহলে পাবলিশার নতুন কোন ইভেন্টে সাবস্ক্রাইবারের কাছে পাঠাবে না।প্রজেক্ট রিয়েক্টরের চারটি প্রধান ইন্টারফেস হচ্ছে
এই চারটি ইন্টারফেসই স্ট্রিম এর ফ্লো কন্ট্রোল করার জন্য নিজেদের মধ্যে নিজেরা যোগাযোগ রক্ষা করে।
Publisher
পাবলিশার হচ্ছে ডেটা সোর্স যেমন ডেটাবেজ, নেটওয়ার্ক অথবা অন্য কোন ফাইল। সাবস্ক্রাইবার ডেটা সোর্স থেকে ডেটা রিড করে।
package org.reactivestreams; public interface Publisher<T> { public void subscribe(Subscriber<? super T> s); }
Subscriber
সাবস্ক্রাইবার ইন্টারফেসের চারটি মেথড হচ্ছে
package org.reactivestreams; public interface Subscriber<T> { public void onSubscribe(Subscription s); public void onNext(T t); public void onError(Throwable t); public void onComplete(); }
Subscription
package org.reactivestreams; public interface Subscription { public void request(long n); public void cancel(); }
Processor
প্রসেসর ইন্টারফেস পাবলিশার এবং সাবস্ক্রাইবার দুইটি ইন্টারফেসকে এক্সটেন্ড করে। প্রসেসরের নিজস্ব মেথড নেই। প্রসেসর নিয়ে অন্য পর্বে বিস্তারিত আলোচনা করব। চলুন একটা উদাহরণ দেখে আসি।
package com.abdullah.reactive; import org.junit.jupiter.api.Test; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; import reactor.core.publisher.Flux; public class FluxTest { @Test public void pubSub() { Flux<String> flux = Flux.just("red", "white", "blue"); flux // .log() // .map(String::toUpperCase) .subscribe(new Subscriber<String>() { @Override public void onSubscribe(Subscription s) { s.request(Long.MAX_VALUE); } @Override public void onNext(String t) { System.out.println(t); } @Override public void onError(Throwable t) { } @Override public void onComplete() { System.out.println("completed"); } }); } }
এখানে flux হচ্ছে রিএক্টিভ স্ট্রিমস পাবলিশার, যেখানে ডেটা সোর্স থেকে ডেটা আসে যতক্ষণ পর্যন্ত সাবস্ক্রাইব মেথড কল না হচ্ছে ততক্ষণ পর্যন্ত “Flux<String> flux = Flux.just(“red”, “white”, “blue”);” এই লাইন রান করবে না। এখানে ডেটা যেকোনো সোর্স থেকে আসতে পারে। flux কে সাবস্ক্রাইবার সাবস্ক্রাইব করতেছে, onNext(String t) মেথড কলের মাধ্যমে সোর্স থেকে একটি একটি ভ্যালু রিড করতেছে এবং যখন সবগুলি ভ্যালু রিড করা শেষ তখন onComplete() ইভেন্ট ট্রিগার হচ্ছে।
Volunteer at Java User Group Bangladesh (JUGBD)
cmabdullah21@gmail.com @cmabdullah21