3838import java .nio .channels .InterruptedByTimeoutException ;
3939import java .util .ArrayList ;
4040import java .util .List ;
41+ import java .util .Map ;
4142import java .util .concurrent .BrokenBarrierException ;
4243import java .util .concurrent .CountDownLatch ;
4344import java .util .concurrent .CyclicBarrier ;
5051import org .junit .After ;
5152import org .junit .Assert ;
5253import org .junit .Before ;
54+ import org .junit .Rule ;
5355import org .junit .Test ;
56+ import org .junit .rules .TestRule ;
57+ import org .junit .runners .model .Statement ;
5458
5559@ SuppressWarnings ("serial" )
5660public class ConcurrentExportTests {
@@ -61,29 +65,84 @@ private static Matcher<Throwable> throwsInterruptedByTimeout() {
6165 return Matchers .instanceOf (InterruptedByTimeoutException .class );
6266 }
6367
68+ // --- Debugging helpers ---
69+
70+ private static final long testStartTime = System .currentTimeMillis ();
71+
72+ private static String timestamp () {
73+ return String .format ("[t=%5dms]" , System .currentTimeMillis () - testStartTime );
74+ }
75+
76+ private static void log (String msg ) {
77+ System .err .println (timestamp () + " [" + Thread .currentThread ().getName () + "] " + msg );
78+ }
79+
80+ private static void dumpAllThreads () {
81+ Map <Thread , StackTraceElement []> traces = Thread .getAllStackTraces ();
82+ traces .forEach ((thread , stack ) -> {
83+ System .err .println (" Thread \" " + thread .getName () + "\" [" + thread .getState () + "]"
84+ + (thread .isInterrupted () ? " INTERRUPTED" : "" ));
85+ for (int i = 0 ; i < Math .min (stack .length , 8 ); i ++) {
86+ System .err .println (" at " + stack [i ]);
87+ }
88+ });
89+ }
90+
91+ /** Dumps all threads at 75% of TEST_TIMEOUT so the dump appears before JUnit kills the test. */
92+ @ Rule
93+ public final TestRule watchdog = (base , description ) -> new Statement () {
94+ @ Override
95+ public void evaluate () throws Throwable {
96+ Thread watchdogThread = new Thread (() -> {
97+ try {
98+ Thread .sleep (TEST_TIMEOUT * 3 / 4 );
99+ System .err .println (timestamp () + " *** WATCHDOG: \" " + description .getMethodName ()
100+ + "\" appears stuck — thread dump:" );
101+ dumpAllThreads ();
102+ } catch (InterruptedException e ) {
103+ // test finished in time
104+ }
105+ }, "watchdog" );
106+ watchdogThread .setDaemon (true );
107+ watchdogThread .start ();
108+ try {
109+ base .evaluate ();
110+ } finally {
111+ watchdogThread .interrupt ();
112+ }
113+ }
114+ };
115+
116+ // --- Inner writer with per-download ID and lifecycle logging ---
117+
64118 private class ConcurrentStreamResourceWriter
65119 extends ConfigurableConcurrentStreamResourceWriter {
66120
121+ private final String id ;
67122 private boolean interruptedByTimeout ;
68123 private boolean accepted ;
69124 private boolean finished ;
70125
71- public ConcurrentStreamResourceWriter (StreamResourceWriter delegate ) {
126+ public ConcurrentStreamResourceWriter (String id , StreamResourceWriter delegate ) {
72127 super (delegate );
128+ this .id = id ;
73129 }
74130
75131 @ Override
76132 protected void onTimeout () {
133+ log ("download-" + id + ": onTimeout() — semaphore not acquired within timeout" );
77134 interruptedByTimeout = true ;
78135 }
79136
80137 @ Override
81138 protected void onAccept () {
139+ log ("download-" + id + ": onAccept()" );
82140 accepted = true ;
83141 }
84142
85143 @ Override
86144 protected void onFinish () {
145+ log ("download-" + id + ": onFinish()" );
87146 finished = true ;
88147 }
89148
@@ -139,42 +198,55 @@ private interface MockDownload {
139198 boolean isAccepted ();
140199 }
141200
142- private Thread newThread (Runnable target ) {
143- Thread thread = new Thread (target );
201+ private Thread newThread (Runnable target , String name ) {
202+ Thread thread = new Thread (target , name );
144203 threads .add (thread );
145204 return thread ;
146205 }
147206
148- private MockDownload newDownload () {
207+ private MockDownload newDownload (String id ) {
149208
150209 CyclicBarrier barrier = this .barrier ;
151210 CountDownLatch latch = new CountDownLatch (1 );
152211
153212 ConcurrentStreamResourceWriter writer =
154- new ConcurrentStreamResourceWriter ((stream , session ) -> {
213+ new ConcurrentStreamResourceWriter (id , (stream , session ) -> {
214+ log ("download-" + id + ": delegate running — counting down latch" );
155215 latch .countDown ();
216+ log ("download-" + id + ": delegate entering barrier (waiting=" + barrier .getNumberWaiting () + ")" );
156217 await (barrier );
218+ log ("download-" + id + ": delegate exited barrier" );
157219 });
158220
159221 writer .setUi (new UI ());
160222 Exchanger <Throwable > exchanger = new Exchanger <>();
161223
162224 Thread thread = newThread (() -> {
163225
226+ log ("download-" + id + ": thread started (interrupted=" + Thread .currentThread ().isInterrupted () + ")" );
164227 Throwable throwable = null ;
165228 try {
166229 writer .accept (NULL_OUTPUT_STREAM , createSession ());
167230 } catch (Throwable t ) {
231+ log ("download-" + id + ": writer.accept() threw " + t .getClass ().getSimpleName ());
168232 throwable = t ;
169233 }
170234
171- latch .countDown ();
235+ if (latch .getCount () != 0 ) {
236+ log ("download-" + id + ": counting down latch (fallback) and entering exchanger" );
237+ latch .countDown ();
238+ } else {
239+ log ("download-" + id + ": entering exchanger" );
240+ }
241+
172242 try {
173243 exchanger .exchange (throwable );
244+ log ("download-" + id + ": exchanger completed" );
174245 } catch (InterruptedException e ) {
246+ log ("download-" + id + ": exchanger interrupted — thread will exit without exchanging!" );
175247 return ;
176248 }
177- });
249+ }, "download-" + id );
178250
179251 return new MockDownload () {
180252 @ Override
@@ -244,9 +316,12 @@ public boolean isFinished() {
244316 }
245317
246318 private static void await (CyclicBarrier barrier ) {
319+ log ("await(barrier): entering (waiting=" + barrier .getNumberWaiting () + ", parties=" + barrier .getParties () + ")" );
247320 try {
248321 barrier .await ();
322+ log ("await(barrier): returned" );
249323 } catch (Exception e ) {
324+ log ("await(barrier): threw " + e .getClass ().getSimpleName ());
250325 sneakyThrow (e );
251326 }
252327 }
@@ -309,8 +384,8 @@ public void testUnlimitedDownloads()
309384 ConcurrentStreamResourceWriter .setLimit (Float .POSITIVE_INFINITY );
310385 initializeCyclicBarrier (2 );
311386
312- var q1 = newDownload ().await ();
313- var q2 = newDownload ().await ();
387+ var q1 = newDownload ("q1" ).await ();
388+ var q2 = newDownload ("q2" ).await ();
314389
315390 assertThat (q1 .get (), nullValue ());
316391 assertThat (q2 .get (), nullValue ());
@@ -322,8 +397,8 @@ public void testConcurrentSuccess()
322397 ConcurrentStreamResourceWriter .setLimit (2 );
323398 initializeCyclicBarrier (2 );
324399
325- var q1 = newDownload ().await ();
326- var q2 = newDownload ().await ();
400+ var q1 = newDownload ("q1" ).await ();
401+ var q2 = newDownload ("q2" ).await ();
327402
328403 assertThat (q1 .get (), nullValue ());
329404 assertThat (q2 .get (), nullValue ());
@@ -335,8 +410,8 @@ public void testInterruptedByTimeout1()
335410 ConcurrentStreamResourceWriter .setLimit (1 );
336411 initializeCyclicBarrier (2 );
337412
338- var q1 = newDownload ().await ();
339- var q2 = newDownload ().start ();
413+ var q1 = newDownload ("q1" ).await ();
414+ var q2 = newDownload ("q2" ).start ();
340415 assertThat (barrier .getNumberWaiting (), equalTo (1 ));
341416 await (barrier );
342417
@@ -352,9 +427,9 @@ public void testInterruptedByTimeout2()
352427 ConcurrentStreamResourceWriter .setLimit (2 );
353428 initializeCyclicBarrier (3 );
354429
355- var q1 = newDownload ().await ();
356- var q2 = newDownload ().await ();
357- var q3 = newDownload ().withCost (2 ).start ();
430+ var q1 = newDownload ("q1" ).await ();
431+ var q2 = newDownload ("q2" ).await ();
432+ var q3 = newDownload ("q3" ).withCost (2 ).start ();
358433 assertThat (barrier .getNumberWaiting (), equalTo (2 ));
359434 await (barrier );
360435
@@ -369,9 +444,9 @@ public void testInterruptedByTimeout3()
369444 ConcurrentStreamResourceWriter .setLimit (2 );
370445 initializeCyclicBarrier (2 );
371446
372- var q1 = newDownload ().withCost (2 ).await ();
373- var q2 = newDownload ().start ();
374- var q3 = newDownload ().start ();
447+ var q1 = newDownload ("q1" ).withCost (2 ).await ();
448+ var q2 = newDownload ("q2" ).start ();
449+ var q3 = newDownload ("q3" ).start ();
375450 assertThat (barrier .getNumberWaiting (), equalTo (1 ));
376451 await (barrier );
377452
@@ -385,10 +460,10 @@ public void testInterruptedByTimeout3()
385460 public void testAcceptFinish () throws InterruptedException {
386461 ConcurrentStreamResourceWriter .setLimit (2 );
387462 initializeCyclicBarrier (2 );
388- var q1 = newDownload ().await ();
463+ var q1 = newDownload ("q1" ).await ();
389464 assertTrue ("Download has not been accepted" , q1 .isAccepted ());
390465 assertFalse ("Download has finished too early" , q1 .isFinished ());
391- var q2 = newDownload ().await ();
466+ var q2 = newDownload ("q2" ).await ();
392467 assertTrue ("Download has not been accepted" , q2 .isAccepted ());
393468 assertThat (q1 .get (), nullValue ());
394469 assertThat (q2 .get (), nullValue ());
@@ -403,12 +478,12 @@ public void testFailOnUiClose() throws InterruptedException, BrokenBarrierExcept
403478
404479 initializeCyclicBarrier (2 );
405480 CyclicBarrier b1 = barrier ;
406- var q1 = newDownload ().await ();
481+ var q1 = newDownload ("q1" ).await ();
407482 assertTrue ("Download has not been accepted" , q1 .isAccepted ());
408483 assertFalse ("Download has finished too early" , q1 .isFinished ());
409484
410485 initializeCyclicBarrier (2 );
411- var q2 = newDownload ().withTimeout (TEST_TIMEOUT ).start ();
486+ var q2 = newDownload ("q2" ).withTimeout (TEST_TIMEOUT ).start ();
412487 assertTrue ("Download has not been accepted" , q1 .isAccepted ());
413488 assertFalse ("Download has finished too early" , q1 .isFinished ());
414489
0 commit comments