2021SC@SDUSC
Eventloop is an internal class for asynchronous programming. In the asynchronous programming model, blocking operations (such as I/O or long-running computations) must be avoided in the Eventloop thread; asynchronous versions of such operations should be used instead. Eventloop represents an infinite loop with only one blocking operation, selector.select(), which selects a set of keys whose corresponding channels are ready for I/O operations. Using these keys and the queues of tasks added to the Eventloop from outside, it executes asynchronously in a single thread, starting from the run() method, which is overridden because Eventloop is an implementation of Runnable. When the Eventloop has no selected keys and its task queues are empty, its work ends.
This article looks at how Eventloop implements connections and task submission.
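The lifecycle described above (run tasks until nothing is left, then stop) can be illustrated with a small sketch. This is not the ActiveJ implementation: `MiniLoop` is a hypothetical class that leaves out the selector entirely and keeps only a task queue, to show why the loop ends once its queue is empty.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for an event loop: no selector, only a task queue.
class MiniLoop implements Runnable {
    private final ArrayDeque<Runnable> tasks = new ArrayDeque<>();

    void post(Runnable task) {
        tasks.addLast(task);
    }

    @Override
    public void run() {
        // Keep going while there is work; once the queue is empty the loop ends.
        Runnable task;
        while ((task = tasks.pollFirst()) != null) {
            task.run();
        }
    }

    static List<String> demo() {
        List<String> log = new ArrayList<>();
        MiniLoop loop = new MiniLoop();
        loop.post(() -> {
            log.add("first");
            // A task may post follow-up work; the loop stays alive for it.
            loop.post(() -> log.add("follow-up"));
        });
        loop.post(() -> log.add("second"));
        loop.run();
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [first, second, follow-up]
    }
}
```

The real class additionally blocks in selector.select() between iterations, so that an idle loop does not spin.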
Connect asynchronously to the given socket address.
public void connect(SocketAddress address, @NotNull Callback<SocketChannel> cb) {
    connect(address, 0, cb);
}

public void connect(SocketAddress address, @Nullable Duration timeout, @NotNull Callback<SocketChannel> cb) {
    connect(address, timeout == null ? 0L : timeout.toMillis(), cb);
}
Asynchronously connects to the given socket address with the specified timeout value. A timeout of zero is interpreted as the default system timeout.
address is the socket address to connect to;
timeout is the timeout value to use, in milliseconds; 0 means the default system connection timeout.
public void connect(@NotNull SocketAddress address, long timeout, @NotNull Callback<SocketChannel> cb) {
    if (CHECK) checkState(inEventloopThread(), "Not in eventloop thread");
    SocketChannel channel;
    try {
        channel = SocketChannel.open();
    } catch (IOException e) {
        // Opening the channel failed: report the error through the callback
        try {
            cb.accept(null, e);
        } catch (Throwable e1) {
            handleError(eventloopFatalErrorHandler, e1, cb);
        }
        return;
    }
    try {
        // Start a non-blocking connect; completion is signalled via OP_CONNECT
        channel.configureBlocking(false);
        channel.connect(address);

        if (timeout == 0) {
            // No explicit timeout: the callback is attached to the selection key
            channel.register(ensureSelector(), SelectionKey.OP_CONNECT, cb);
        } else {
            // Schedule a timeout task that closes the channel and fails the callback
            ScheduledRunnable scheduledTimeout = delay(timeout, () -> {
                closeChannel(channel, null);
                cb.accept(null, new AsyncTimeoutException("Connection timed out"));
            });

            // Wrap the callback so that the timeout is cancelled once the connect completes
            channel.register(ensureSelector(), SelectionKey.OP_CONNECT,
                    (Callback<SocketChannel>) (result, e) -> {
                        scheduledTimeout.cancel();
                        cb.accept(result, e);
                    });
        }

        if (selector != null) {
            selector.wakeup();
        }
    } catch (IOException e) {
        closeChannel(channel, null);
        try {
            cb.accept(null, e);
        } catch (Throwable e1) {
            handleError(eventloopFatalErrorHandler, e1, cb);
        }
    }
}

public long tick() {
    if (CHECK) checkState(inEventloopThread(), "Not in eventloop thread");
    return (long) loop << 32 | tick;
}
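To see the same pattern without the ActiveJ classes, here is a minimal sketch using only java.nio. `ConnectSketch` and its `connect` helper are hypothetical names; the callback is modelled as a plain `BiConsumer`, and a single `select(timeout)` call stands in for both the loop and the scheduled timeout task. Unlike the method above, the sketch also checks the return value of `SocketChannel.connect`, which may report an immediately established local connection.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.function.BiConsumer;

class ConnectSketch {
    // Hypothetical helper: connects without blocking and reports the outcome via cb.
    // A timeout of 0 makes select() wait until the OS resolves the connection attempt.
    static void connect(SocketAddress address, long timeoutMillis,
                        BiConsumer<SocketChannel, Exception> cb) throws IOException {
        try (Selector selector = Selector.open()) {
            SocketChannel channel = SocketChannel.open();
            channel.configureBlocking(false);
            if (channel.connect(address)) {
                // Local connections may complete immediately; nothing to select on.
                cb.accept(channel, null);
                return;
            }
            // The callback travels with the selection key as its attachment.
            channel.register(selector, SelectionKey.OP_CONNECT, cb);

            // The only blocking call: wait until the connect finishes or time runs out.
            if (selector.select(timeoutMillis) == 0) {
                channel.close();
                cb.accept(null, new IOException("Connection timed out"));
                return;
            }
            for (SelectionKey key : selector.selectedKeys()) {
                @SuppressWarnings("unchecked")
                BiConsumer<SocketChannel, Exception> attached =
                        (BiConsumer<SocketChannel, Exception>) key.attachment();
                try {
                    channel.finishConnect(); // throws if the connection attempt failed
                    attached.accept(channel, null);
                } catch (IOException e) {
                    channel.close();
                    attached.accept(null, e);
                }
            }
        }
    }

    static boolean demo() {
        // A throwaway server on an ephemeral loopback port gives us something to connect to.
        try (ServerSocketChannel server = ServerSocketChannel.open()) {
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            boolean[] connected = {false};
            connect(server.getLocalAddress(), 2_000, (ch, e) -> {
                connected[0] = e == null && ch != null && ch.isConnected();
                try {
                    if (ch != null) ch.close();
                } catch (IOException ignored) {
                    // closing a demo channel; nothing to recover
                }
            });
            return connected[0];
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("connected: " + demo());
    }
}
```

In the Eventloop itself the selector is long-lived and shared by all channels, which is why the timeout has to be a separate scheduled task rather than an argument to select().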
Posts a new task to the head of localTasks. This method is recommended because the task will be executed as soon as possible without invalidating the CPU cache.
runnable is the task to run.
public void post(@NotNull @Async.Schedule Runnable runnable) {
    if (CHECK) checkState(inEventloopThread(), "Not in eventloop thread");
    localTasks.addFirst(runnable);
}
Posts a new task to the end of localTasks.
public void postLast(@NotNull @Async.Schedule Runnable runnable) {
    if (CHECK) checkState(inEventloopThread(), "Not in eventloop thread");
    localTasks.addLast(runnable);
}

public void postNext(@NotNull @Async.Schedule Runnable runnable) {
    if (CHECK) checkState(inEventloopThread(), "Not in eventloop thread");
    nextTasks.add(runnable);
}
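The difference between post and postLast comes down to which end of a deque the task is added to. The sketch below uses a plain `ArrayDeque` and assumes the loop drains localTasks from the head; `LocalTasksSketch` is a hypothetical name, and the labels only mark which method would have added each task.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

class LocalTasksSketch {
    static List<String> demo() {
        ArrayDeque<Runnable> localTasks = new ArrayDeque<>();
        List<String> order = new ArrayList<>();

        localTasks.addLast(() -> order.add("A (postLast)"));
        localTasks.addLast(() -> order.add("B (postLast)"));
        localTasks.addFirst(() -> order.add("C (post)")); // jumps ahead of A and B

        // Assumption: tasks are taken from the head of the deque.
        Runnable task;
        while ((task = localTasks.pollFirst()) != null) {
            task.run();
        }
        return order;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [C (post), A (postLast), B (postLast)]
    }
}
```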
Posts a new task from another thread. This is the preferred way to communicate with the eventloop from other threads.
@Override
public void execute(@NotNull @Async.Schedule Runnable runnable) {
    concurrentTasks.offer(runnable);
    if (selector != null) {
        selector.wakeup();
    }
}
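The offer-then-wakeup pair matters because the loop thread may be blocked inside select() when the task arrives. The following sketch reproduces that handoff with a bare `Selector` and a `ConcurrentLinkedQueue`; `CrossThreadSketch` is a hypothetical name and the loop is reduced to the minimum needed to show the wakeup.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Selector;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class CrossThreadSketch {
    static boolean demo() {
        try (Selector selector = Selector.open()) {
            ConcurrentLinkedQueue<Runnable> concurrentTasks = new ConcurrentLinkedQueue<>();
            CountDownLatch ran = new CountDownLatch(1);

            Thread loopThread = new Thread(() -> {
                try {
                    while (ran.getCount() > 0) {
                        selector.select(); // blocks until wakeup() is called (no channels here)
                        Runnable task;
                        while ((task = concurrentTasks.poll()) != null) {
                            task.run();
                        }
                    }
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
            loopThread.setDaemon(true);
            loopThread.start();

            // Producer side: enqueue first, then wake the selector.
            // If wakeup() lands before select(), the next select() returns immediately.
            concurrentTasks.offer(ran::countDown);
            selector.wakeup();

            boolean executed = ran.await(2, TimeUnit.SECONDS);
            loopThread.join(2_000);
            return executed && !loopThread.isAlive();
        } catch (IOException | InterruptedException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("task ran on loop thread: " + demo());
    }
}
```

Without the wakeup() call the task would sit in the queue until some unrelated I/O event happened to unblock the selector.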
Schedules a new task. Returns a ScheduledRunnable that wraps this runnable.
@Override
public @NotNull ScheduledRunnable schedule(long timestamp, @NotNull @Async.Schedule Runnable runnable) {
    if (CHECK) checkState(inEventloopThread(), "Not in eventloop thread");
    return addScheduledTask(timestamp, runnable, false);
}
Schedules a new background task. Returns a ScheduledRunnable that wraps this runnable. If the eventloop contains only background tasks, it will be closed.
@Override
public @NotNull ScheduledRunnable scheduleBackground(long timestamp, @NotNull @Async.Schedule Runnable runnable) {
    if (CHECK) checkState(inEventloopThread(), "Not in eventloop thread");
    return addScheduledTask(timestamp, runnable, true);
}

private @NotNull ScheduledRunnable addScheduledTask(long timestamp, Runnable runnable, boolean background) {
    ScheduledRunnable scheduledTask = ScheduledRunnable.create(timestamp, runnable);
    PriorityQueue<ScheduledRunnable> taskQueue = background ? backgroundTasks : scheduledTasks;
    taskQueue.offer(scheduledTask);
    return scheduledTask;
}
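A priority queue ordered by timestamp is what lets the loop find the next due task cheaply. The sketch below is an illustration under assumptions, not the ActiveJ code: `Scheduled` is a hypothetical stand-in for ScheduledRunnable, `runDue` is an invented helper, and cancellation is modelled as a simple flag that is checked when the task comes due.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

class ScheduleSketch {
    // Hypothetical stand-in for ScheduledRunnable: a timestamp, a task and a cancel flag.
    static final class Scheduled {
        final long timestamp;
        final Runnable task;
        boolean cancelled;

        Scheduled(long timestamp, Runnable task) {
            this.timestamp = timestamp;
            this.task = task;
        }

        void cancel() {
            cancelled = true;
        }
    }

    private final PriorityQueue<Scheduled> scheduledTasks =
            new PriorityQueue<>(Comparator.comparingLong((Scheduled s) -> s.timestamp));

    Scheduled schedule(long timestamp, Runnable task) {
        Scheduled scheduled = new Scheduled(timestamp, task);
        scheduledTasks.offer(scheduled);
        return scheduled;
    }

    // Runs every non-cancelled task whose timestamp is <= now, earliest first.
    int runDue(long now) {
        int executed = 0;
        while (!scheduledTasks.isEmpty() && scheduledTasks.peek().timestamp <= now) {
            Scheduled next = scheduledTasks.poll();
            if (next.cancelled) continue;
            next.task.run();
            executed++;
        }
        return executed;
    }

    static List<String> demo() {
        ScheduleSketch loop = new ScheduleSketch();
        List<String> log = new ArrayList<>();
        loop.schedule(300, () -> log.add("t=300"));
        loop.schedule(100, () -> log.add("t=100"));
        Scheduled timeout = loop.schedule(200, () -> log.add("t=200 (timeout)"));
        timeout.cancel();  // e.g. a connection succeeded before its timeout fired
        loop.runDue(250);  // runs t=100, skips the cancelled t=200, leaves t=300 queued
        log.add("--");
        loop.runDue(400);  // now t=300 is due
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [t=100, --, t=300]
    }
}
```

This is the same cancel-on-success idea that connect relies on when it calls scheduledTimeout.cancel() from the wrapped callback.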
Notifies the event loop about concurrent operations in other threads. Eventloop will not exit until all external tasks are completed.
public void startExternalTask() {
    externalTasksCount.incrementAndGet();
}
Notifies the event loop of the completion of the corresponding operation in other threads. Not calling this method will prevent the event loop from exiting.
public void completeExternalTask() {
    externalTasksCount.decrementAndGet();
}

public long refreshTimestampAndGet() {
    refreshTimestamp();
    return timestamp;
}

private void refreshTimestamp() {
    timestamp = timeProvider.currentTimeMillis();
}
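The external task counter exists so that the loop does not exit while another thread still intends to hand work back to it. A minimal sketch of that idea follows; `ExternalTaskSketch` and its `isKeepAlive` rule are hypothetical simplifications, and the loop busy-waits with `Thread.yield()` where a real loop would block in select().

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

class ExternalTaskSketch {
    final ConcurrentLinkedQueue<Runnable> concurrentTasks = new ConcurrentLinkedQueue<>();
    final AtomicInteger externalTasksCount = new AtomicInteger();

    void startExternalTask() {
        externalTasksCount.incrementAndGet();
    }

    void completeExternalTask() {
        externalTasksCount.decrementAndGet();
    }

    // Hypothetical keep-alive rule. The counter is read before the queue: once the
    // counter is seen as 0, any task offered before the decrement is already visible.
    boolean isKeepAlive() {
        return externalTasksCount.get() > 0 || !concurrentTasks.isEmpty();
    }

    void run() {
        while (isKeepAlive()) {
            Runnable task = concurrentTasks.poll();
            if (task != null) {
                task.run();
            } else {
                Thread.yield(); // a real loop would block in select() instead of spinning
            }
        }
    }

    static String demo() {
        ExternalTaskSketch loop = new ExternalTaskSketch();
        StringBuilder result = new StringBuilder();

        loop.startExternalTask(); // announce outside work before the loop starts
        Thread worker = new Thread(() -> {
            try {
                Thread.sleep(50); // simulate a blocking operation off the loop thread
            } catch (InterruptedException ignored) {
                Thread.currentThread().interrupt();
            }
            loop.concurrentTasks.offer(() -> result.append("delivered"));
            loop.completeExternalTask(); // offer first, then release the loop
        });
        worker.start();

        loop.run(); // would return immediately if the counter had not been incremented
        return result.toString();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints delivered
    }
}
```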
Returns the current time of this event loop, that is, the cached timestamp that refreshTimestamp() last read from timeProvider.
@Override
public long currentTimeMillis() {
    return timestamp;
}

@Override
public @NotNull Eventloop getEventloop() {
    return this;
}
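Because currentTimeMillis() returns a stored field, its value only changes when the timestamp is refreshed. The sketch below shows that behaviour with a fake clock; `CachedClockSketch` is a hypothetical class and `LongSupplier` stands in for the time provider.

```java
import java.util.function.LongSupplier;

class CachedClockSketch {
    private final LongSupplier timeProvider; // stand-in for the real time provider
    private long timestamp;

    CachedClockSketch(LongSupplier timeProvider) {
        this.timeProvider = timeProvider;
    }

    long refreshTimestampAndGet() {
        timestamp = timeProvider.getAsLong();
        return timestamp;
    }

    long currentTimeMillis() {
        return timestamp; // cached value, no call to the underlying clock
    }

    static long[] demo() {
        long[] fakeNow = {1_000};
        CachedClockSketch clock = new CachedClockSketch(() -> fakeNow[0]);
        clock.refreshTimestampAndGet();
        fakeNow[0] = 1_005;                            // the underlying clock moves on
        long cached = clock.currentTimeMillis();       // still 1000
        long fresh = clock.refreshTimestampAndGet();   // 1005 after the refresh
        return new long[]{cached, fresh};
    }

    public static void main(String[] args) {
        long[] values = demo();
        System.out.println(values[0] + " " + values[1]); // prints 1000 1005
    }
}
```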
Submits a runnable to the eventloop for execution. The runnable is executed in the eventloop thread.
computation is the computation to be performed;
returns a CompletableFuture that completes when the computation is finished.
@Override
public @NotNull CompletableFuture<Void> submit(@NotNull RunnableEx computation) {
    CompletableFuture<Void> future = new CompletableFuture<>();
    execute(() -> {
        try {
            computation.run();
        } catch (Exception ex) {
            handleError(eventloopFatalErrorHandler, ex, computation);
            future.completeExceptionally(ex);
            return;
        }
        future.complete(null);
    });
    return future;
}

@Override
public <T> @NotNull CompletableFuture<T> submit(AsyncComputation<? extends T> computation) {
    CompletableFuture<T> future = new CompletableFuture<>();
    execute(() -> {
        try {
            computation.run((result, e) -> {
                if (e == null) {
                    future.complete(result);
                } else {
                    future.completeExceptionally(e);
                }
            });
        } catch (Exception ex) {
            handleError(eventloopFatalErrorHandler, ex, computation);
            future.completeExceptionally(ex);
        }
    });
    return future;
}
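The second submit overload bridges a callback-style computation to a CompletableFuture. A rough equivalent using only java.util.concurrent is sketched below. The names are assumptions made for illustration: `SubmitSketch.submit` is hypothetical, a single-thread executor plays the role of the eventloop thread, and the async computation is modelled as a `Consumer` that receives a (result, error) callback.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

class SubmitSketch {
    // Hypothetical counterpart of submit(AsyncComputation): the computation reports
    // its outcome through a (result, error) callback instead of returning it.
    static <T> CompletableFuture<T> submit(Executor loop,
                                           Consumer<BiConsumer<T, Exception>> computation) {
        CompletableFuture<T> future = new CompletableFuture<>();
        loop.execute(() -> {
            try {
                computation.accept((result, e) -> {
                    if (e == null) {
                        future.complete(result);
                    } else {
                        future.completeExceptionally(e);
                    }
                });
            } catch (RuntimeException ex) {
                // The computation itself threw before invoking the callback.
                future.completeExceptionally(ex);
            }
        });
        return future;
    }

    static String demo() {
        ExecutorService loop = Executors.newSingleThreadExecutor(); // plays the loop thread
        try {
            CompletableFuture<Integer> ok =
                    SubmitSketch.<Integer>submit(loop, cb -> cb.accept(6 * 7, null));
            CompletableFuture<Integer> failed =
                    SubmitSketch.<Integer>submit(loop, cb -> cb.accept(null, new IllegalStateException("boom")));

            int value = ok.get(2, TimeUnit.SECONDS);
            String error;
            try {
                failed.get(2, TimeUnit.SECONDS);
                error = "none";
            } catch (ExecutionException e) {
                error = e.getCause().getMessage();
            }
            return value + "/" + error;
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            return "unexpected: " + e;
        } finally {
            loop.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints 42/boom
    }
}
```

The caller blocks on the future from its own thread, so the loop thread is never blocked waiting for a result.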