From f064d1f0c3e5e03599b17d8e8969efca8cb8fcc7 Mon Sep 17 00:00:00 2001 From: Gene Gleyzer Date: Sat, 21 Dec 2024 10:48:18 -0500 Subject: [PATCH] Optimize FiberQueue for large number of fibers --- .../src/main/java/org/xvm/runtime/Fiber.java | 29 ++++++++++- .../main/java/org/xvm/runtime/FiberQueue.java | 51 +++++++++++++++---- .../java/org/xvm/runtime/ServiceContext.java | 2 +- 3 files changed, 70 insertions(+), 12 deletions(-) diff --git a/javatools/src/main/java/org/xvm/runtime/Fiber.java b/javatools/src/main/java/org/xvm/runtime/Fiber.java index d66335a7d8..34cea7b2bd 100644 --- a/javatools/src/main/java/org/xvm/runtime/Fiber.java +++ b/javatools/src/main/java/org/xvm/runtime/Fiber.java @@ -208,6 +208,10 @@ public ObjectHandle getAsyncSection() */ public void setStatus(FiberStatus status, long cOps) { + if (m_status == FiberStatus.Waiting) + { + f_context.f_queueSuspended.exitWait(); + } switch (m_status = status) { default: @@ -220,6 +224,8 @@ public void setStatus(FiberStatus status, long cOps) break; case Waiting: + f_context.f_queueSuspended.enterWait(); + // fall through case Paused: long cNanos = f_context.f_container.nanoTime() - m_nanoStarted; m_nanoStarted = 0; @@ -541,10 +547,26 @@ public Frame getBlocker() /** * Set or clear the frame that blocks this fiber's execution. + * + * @param frameBlocker null if no blocker exists or the blocker fiber itself + * @param lStamp if no blocker exists, the "wait exit stamp", otherwise the "wait enter + * stamp" of the fiber queue when this check occurred */ - protected void setBlocker(Frame frameBlocker) + protected void setBlocker(Frame frameBlocker, long lStamp) { m_frameBlocker = frameBlocker; + m_lLastStamp = lStamp; + } + + /** + * @return true iff there were no wait status changes on the FiberQueue that would change + * the blocker computation result for the fiber + */ + protected boolean noChange(long lLastEnterStamp, long lLastExitStamp) + { + return m_frameBlocker == null + ? lLastEnterStamp == m_lLastStamp + : lLastExitStamp == m_lLastStamp; } @@ -785,6 +807,11 @@ public int proceed(Frame frameCaller) */ private Frame m_frameBlocker; + /** + * THe FiberQueue stamp of the last time it looked for blockers for this fiber. + */ + private long m_lLastStamp; + /** * The counter used to create fibers ids. */ diff --git a/javatools/src/main/java/org/xvm/runtime/FiberQueue.java b/javatools/src/main/java/org/xvm/runtime/FiberQueue.java index 2d630b040d..284e242583 100644 --- a/javatools/src/main/java/org/xvm/runtime/FiberQueue.java +++ b/javatools/src/main/java/org/xvm/runtime/FiberQueue.java @@ -19,6 +19,12 @@ public class FiberQueue private int m_ixTail = 0; // past the tail - insertion point private int m_cSize = 0; + // this value increases every time any fiber that belong to this queue enters the "Waiting" state + private long m_lWaitEnterStamp = 0; + + // this value increases every time any fiber that belong to this queue exits the "Waiting" state + private long m_lWaitExitStamp = 0; + public FiberQueue(ServiceContext ctx) { f_context = ctx; @@ -129,6 +135,22 @@ public Frame getAny() return null; } + /** + * Called when any fiber changes its state to {@link FiberStatus#Waiting} + */ + protected void enterWait() + { + m_lWaitEnterStamp++; + } + + /** + * Called when any fiber changes its state from {@link FiberStatus#Waiting} to any other. + */ + protected void exitWait() + { + m_lWaitExitStamp++; + } + /** * Report on the status of the fiber queue. Temporary: for debugging only. */ @@ -254,8 +276,19 @@ private boolean canResume(Fiber fiber) * * @return true iff there are any non-concurrent waiting frames */ +static int HIT = 0; +static int ALL = 0; private boolean isAnyNonConcurrentWaiting(Fiber fiberCandidate) { +//if (++ALL % 100_000 == 0) +// { +// System.err.println("*** hits=" + HIT + " out of " + ALL); +// } + if (fiberCandidate.noChange(m_lWaitEnterStamp, m_lWaitExitStamp)) + { +//++HIT; + return fiberCandidate.getBlocker() != null; + } Frame[] aFrame = m_aFrame; Fiber fiberCaller = fiberCandidate.getCaller(); @@ -267,21 +300,19 @@ private boolean isAnyNonConcurrentWaiting(Fiber fiberCandidate) } Fiber fiber = frame.f_fiber; - if (fiber != fiberCandidate) + if (fiber != fiberCandidate && + fiber.getStatus() == FiberStatus.Waiting) { - if (fiber.getStatus() == FiberStatus.Waiting) + if (frame.isSafeStack() || + fiberCaller != null && fiberCaller.isContinuationOf(fiber)) { - if (frame.isSafeStack() || - fiberCaller != null && fiberCaller.isContinuationOf(fiber)) - { - continue; - } - fiberCandidate.setBlocker(frame); - return true; + continue; } + fiberCandidate.setBlocker(frame, m_lWaitExitStamp); + return true; } } - fiberCandidate.setBlocker(null); + fiberCandidate.setBlocker(null, m_lWaitEnterStamp); return false; } diff --git a/javatools/src/main/java/org/xvm/runtime/ServiceContext.java b/javatools/src/main/java/org/xvm/runtime/ServiceContext.java index 4e5e832699..dc257f5902 100644 --- a/javatools/src/main/java/org/xvm/runtime/ServiceContext.java +++ b/javatools/src/main/java/org/xvm/runtime/ServiceContext.java @@ -2405,7 +2405,7 @@ public interface TypeSupplier /** * The queue of suspended fibers. */ - private final FiberQueue f_queueSuspended = new FiberQueue(this); + protected final FiberQueue f_queueSuspended = new FiberQueue(this); /** * The reentrancy policy. Must be the same names as in natural Service.Synchronicity.