ActiveJ framework learning -- Datastream of Async I/O

2021SC@SDUSC

This article continues with the code of Datastream. Before that, let's review what Datastream is.

Datastream is an extremely fast implementation of reactive streams. It is useful for intra and inter-server communication and asynchronous data processing.

Datastream is a very fast implementation of reactive flow. It is very useful for communication and asynchronous data processing within and between servers

  • A modern implementation of asynchronous reaction flow (different from flow in Java 8 and traditional thread based blocking flow).
  • It has the asynchrony of extremely effective back pressure control to deal with the natural imbalance of data source speed
  • Composable stream operations (mapper, restorer, filter, classifier, merge / splitter, compression, serialization).
  • Stream based network and file I/O over Eventloop module
  • Compatible with CSP module

Next, the AbstractStreamSupplier class. This class is an abstract class of StreamSupplier, which helps it deal with state transitions and helps to realize basic behavior.

The main methods are as follows:

	protected void onInit() {
	}

	public final void send(T item) {
		dataAcceptorBuffered.accept(item);
	}

	public final Promise<Void> sendEndOfStream() {
		if (CHECK) checkState(eventloop.inEventloopThread(), "Not in eventloop thread");
		if (endOfStreamRequest) return flushPromise;
		if (flushAsync > 0) {
			asyncEnd();
		}
		endOfStreamRequest = true;
		//noinspection unchecked
		this.dataAcceptorBuffered = (StreamDataAcceptor<T>) NO_ACCEPTOR;
		flush();
		return getFlushPromise();
	}

	public final @NotNull Promise<Void> getFlushPromise() {
		if (isEndOfStream()) {
			return endOfStream;
		} else if (flushPromise != null) {
			return flushPromise;
		} else if (dataAcceptor != null) {
			return Promise.complete();
		} else {
			flushPromise = new SettablePromise<>();
			return flushPromise;
		}
	}

	private void ensureInitialized() {
		if (!initialized) {
			initialized = true;
			onInit();
		}
	}

	protected void onResumed() {
	}

	protected void onSuspended() {
	}

	public final @Nullable StreamDataAcceptor<T> getDataAcceptor() {
		return dataAcceptor;
	}

	public final boolean isReady() {
		return dataAcceptor != null;
	}

	protected void onError(Exception e) {
	}

	protected void onCleanup() {
	}
  • onInit(): this method will be called only once: in the next eventloop after the vendor is created, or before onStart() or onError (Exception) is called
  • send(T item): send the given item through this supplier. If the vendor is suspended, this method stores the item in an internal buffer and cannot be called when the vendor reaches sendEndOfStream(). 
  • sendEndOfStream(): put the supplier in the closed state without error. This operation is final and cannot be undone. Only the first call will have any impact.
  • getFlushPromise(): returns the promise that will be completed when all data items are propagated to the actual data recipient
  • ensureInitialized(): initialize this vendor by calling onInit() only if it has not been initialized. 
  • onResumed(): called when the vendor changes from a suspended state to a normal state.
  • onSuspended(): called when the vendor changes the normal state to the suspended state.
  • getDataAcceptor(): when the supplier is in the suspended state, return the current data acceptor (the last data acceptor set using the updateDataAcceptor() method) or null
  • isReady(): returns true when the supplier is in normal status, and false when it is suspended or closed.
  • onError(Exception e): this method will be called when the vendor changes to the closed state by mistake.
  • onCleanup(): this method will be called asynchronously after the vendor is changed to off state, regardless of the error.

In addition, there is flush(). Causes this vendor to attempt to provide its buffer and update the current status accordingly.

	private void flush() {
		if (CHECK) checkState(eventloop.inEventloopThread(), "Not in eventloop thread");
		flushRequest = true;
		if (flushRunning || flushAsync > 0) return; // recursive call
		if (endOfStream.isComplete()) return;
		if (!isStarted()) return;

		flushRunning = true;
		while (flushRequest) {
			flushRequest = false;
			while (isReady() && !buffer.isEmpty()) {
				T item = buffer.pollFirst();
				this.dataAcceptor.accept(item);
			}
			if (isReady() && !isEndOfStream()) {
				onResumed();
			}
		}
		flushRunning = false;

		if (flushAsync > 0) return;
		if (!buffer.isEmpty()) return;
		if (endOfStream.isComplete()) return;

		if (!endOfStreamRequest) {
			if (this.flushPromise != null) {
				SettablePromise<Void> flushPromise = this.flushPromise;
				this.flushPromise = null;
				flushPromise.set(null);
			}
			return;
		}

		dataAcceptor = null;
		if (flushPromise != null) {
			flushPromise.set(null);
		}
		endOfStream.set(null);
	}

Keywords: Java

Added by kafmil on Mon, 27 Dec 2021 13:15:35 +0200