001/* 002 * This file is part of the Jikes RVM project (http://jikesrvm.org). 003 * 004 * This file is licensed to You under the Eclipse Public License (EPL); 005 * You may not use this file except in compliance with the License. You 006 * may obtain a copy of the License at 007 * 008 * http://www.opensource.org/licenses/eclipse-1.0.php 009 * 010 * See the COPYRIGHT.txt file distributed with this work for information 011 * regarding copyright ownership. 012 */ 013package org.mmtk.utility.deque; 014 015import static org.mmtk.utility.Constants.*; 016 017import org.mmtk.policy.RawPageSpace; 018import org.mmtk.policy.Space; 019import org.mmtk.utility.Log; 020import org.mmtk.vm.Lock; 021import org.mmtk.vm.VM; 022import org.vmmagic.pragma.Entrypoint; 023import org.vmmagic.pragma.Inline; 024import org.vmmagic.pragma.Uninterruptible; 025import org.vmmagic.unboxed.Address; 026import org.vmmagic.unboxed.Offset; 027 028/** 029 * This supports <i>unsynchronized</i> enqueuing and dequeuing of buffers 030 * for shared use. The data can be added to and removed from either end 031 * of the deque. 032 */ 033@Uninterruptible 034public class SharedDeque extends Deque { 035 private static final boolean DISABLE_WAITING = true; 036 private static final Offset NEXT_OFFSET = Offset.zero(); 037 private static final Offset PREV_OFFSET = Offset.fromIntSignExtend(BYTES_IN_ADDRESS); 038 039 private static final boolean TRACE = false; 040 private static final boolean TRACE_DETAIL = false; 041 private static final boolean TRACE_BLOCKERS = false; 042 043 /**************************************************************************** 044 * 045 * Public instance methods 046 */ 047 048 /** 049 * @param name the queue's human-readable name 050 * @param rps the space to get pages from 051 * @param arity the arity (number of words per entry) of this queue 052 */ 053 public SharedDeque(String name, RawPageSpace rps, int arity) { 054 this.rps = rps; 055 this.arity = arity; 056 this.name = name; 057 lock = VM.newLock("SharedDeque"); 058 clearCompletionFlag(); 059 head = HEAD_INITIAL_VALUE; 060 tail = TAIL_INITIAL_VALUE; 061 } 062 063 /** @return the arity (words per entry) of this queue */ 064 @Inline 065 final int getArity() { 066 return arity; 067 } 068 069 /** 070 * Enqueue a block on the head or tail of the shared queue 071 * 072 * @param buf the block to enqueue 073 * @param arity the arity of this queue 074 * @param toTail whether to enqueue to the tail of the shared queue 075 */ 076 final void enqueue(Address buf, int arity, boolean toTail) { 077 if (VM.VERIFY_ASSERTIONS) VM.assertions._assert(arity == this.arity); 078 lock(); 079 if (toTail) { 080 // Add to the tail of the queue 081 setNext(buf, Address.zero()); 082 if (tail.EQ(TAIL_INITIAL_VALUE)) 083 head = buf; 084 else 085 setNext(tail, buf); 086 setPrev(buf, tail); 087 tail = buf; 088 } else { 089 // Add to the head of the queue 090 setPrev(buf, Address.zero()); 091 if (head.EQ(HEAD_INITIAL_VALUE)) 092 tail = buf; 093 else 094 setPrev(head, buf); 095 setNext(buf, head); 096 head = buf; 097 } 098 bufsenqueued++; 099 if (VM.VERIFY_ASSERTIONS) VM.assertions._assert(checkDequeLength(bufsenqueued)); 100 unlock(); 101 } 102 103 public final void clearDeque(int arity) { 104 Address buf = dequeue(arity); 105 while (!buf.isZero()) { 106 free(bufferStart(buf)); 107 buf = dequeue(arity); 108 } 109 setCompletionFlag(); 110 } 111 112 @Inline 113 final Address dequeue(int arity) { 114 return dequeue(arity, false); 115 } 116 117 final Address dequeue(int arity, boolean fromTail) { 118 if (VM.VERIFY_ASSERTIONS) VM.assertions._assert(arity == this.arity); 119 return dequeue(false, fromTail); 120 } 121 122 @Inline 123 final Address dequeueAndWait(int arity) { 124 return dequeueAndWait(arity, false); 125 } 126 127 final Address dequeueAndWait(int arity, boolean fromTail) { 128 if (VM.VERIFY_ASSERTIONS) VM.assertions._assert(arity == this.arity); 129 Address buf = dequeue(false, fromTail); 130 if (buf.isZero() && (!complete())) { 131 buf = dequeue(true, fromTail); // Wait inside dequeue 132 } 133 return buf; 134 } 135 136 /** 137 * Prepare for parallel processing. All active GC threads will 138 * participate, and pop operations will block until all work 139 * is complete. 140 */ 141 public final void prepare() { 142 if (DISABLE_WAITING) { 143 prepareNonBlocking(); 144 } else { 145 /* This should be the normal mode of operation once performance is fixed */ 146 prepare(VM.activePlan.collector().parallelWorkerCount()); 147 } 148 } 149 150 /** 151 * Prepare for processing where pop operations on the deques 152 * will never block. 153 */ 154 public final void prepareNonBlocking() { 155 prepare(1); 156 } 157 158 /** 159 * Prepare for parallel processing where a specific number 160 * of threads take part. 161 * 162 * @param consumers # threads taking part. 163 */ 164 private void prepare(int consumers) { 165 if (VM.VERIFY_ASSERTIONS) VM.assertions._assert(numConsumersWaiting == 0); 166 setNumConsumers(consumers); 167 clearCompletionFlag(); 168 } 169 170 public final void reset() { 171 if (VM.VERIFY_ASSERTIONS) VM.assertions._assert(numConsumersWaiting == 0); 172 clearCompletionFlag(); 173 setNumConsumersWaiting(0); 174 assertExhausted(); 175 } 176 177 public final void assertExhausted() { 178 if (VM.VERIFY_ASSERTIONS) VM.assertions._assert(head.isZero() && tail.isZero()); 179 } 180 181 @Inline 182 final Address alloc() { 183 Address rtn = rps.acquire(PAGES_PER_BUFFER); 184 if (rtn.isZero()) { 185 Space.printUsageMB(); 186 VM.assertions.fail("Failed to allocate space for queue. Is metadata virtual memory exhausted?"); 187 } 188 if (VM.VERIFY_ASSERTIONS) VM.assertions._assert(rtn.EQ(bufferStart(rtn))); 189 return rtn; 190 } 191 192 @Inline 193 final void free(Address buf) { 194 if (VM.VERIFY_ASSERTIONS) VM.assertions._assert(buf.EQ(bufferStart(buf)) && !buf.isZero()); 195 rps.release(buf); 196 } 197 198 @Inline 199 public final int enqueuedPages() { 200 return bufsenqueued * PAGES_PER_BUFFER; 201 } 202 203 /**************************************************************************** 204 * 205 * Private instance methods and fields 206 */ 207 208 /** The name of this shared deque - for diagnostics */ 209 private final String name; 210 211 /** Raw page space from which to allocate */ 212 private final RawPageSpace rps; 213 214 /** Number of words per entry */ 215 private final int arity; 216 217 /** Completion flag - set when all consumers have arrived at the barrier */ 218 @Entrypoint 219 private volatile int completionFlag; 220 221 /** # active threads - processing is complete when # waiting == this */ 222 @Entrypoint 223 private volatile int numConsumers; 224 225 /** # threads waiting */ 226 @Entrypoint 227 private volatile int numConsumersWaiting; 228 229 /** Head of the shared deque */ 230 @Entrypoint 231 protected volatile Address head; 232 233 /** Tail of the shared deque */ 234 @Entrypoint 235 protected volatile Address tail; 236 @Entrypoint 237 private volatile int bufsenqueued; 238 private final Lock lock; 239 240 private static final long WARN_PERIOD = (long)(2 * 1E9); 241 private static final long TIMEOUT_PERIOD = 10 * WARN_PERIOD; 242 243 /** 244 * Dequeue a block from the shared pool. If 'waiting' is true, and the 245 * queue is empty, wait for either a new block to show up or all the 246 * other consumers to join us. 247 * 248 * @param waiting whether to wait to dequeue a block if none is present 249 * @param fromTail whether to dequeue from the tail 250 * @return the Address of the block 251 */ 252 private Address dequeue(boolean waiting, boolean fromTail) { 253 lock(); 254 Address rtn = ((fromTail) ? tail : head); 255 if (rtn.isZero()) { 256 if (VM.VERIFY_ASSERTIONS) VM.assertions._assert(tail.isZero() && head.isZero()); 257 // no buffers available 258 if (waiting) { 259 int ordinal = TRACE ? 0 : VM.activePlan.collector().getId(); 260 setNumConsumersWaiting(numConsumersWaiting + 1); 261 while (rtn.isZero()) { 262 if (numConsumersWaiting == numConsumers) 263 setCompletionFlag(); 264 if (TRACE) { 265 Log.write("-- ("); Log.write(ordinal); 266 Log.write(") joining wait queue of SharedDeque("); 267 Log.write(name); Log.write(") "); 268 Log.write(numConsumersWaiting); Log.write("/"); 269 Log.write(numConsumers); 270 Log.write(" consumers waiting"); 271 if (complete()) Log.write(" WAIT COMPLETE"); 272 Log.writeln(); 273 if (TRACE_BLOCKERS) 274 VM.assertions.dumpStack(); 275 } 276 unlock(); 277 // Spin and wait 278 spinWait(fromTail); 279 280 if (complete()) { 281 if (TRACE) { 282 Log.write("-- ("); Log.write(ordinal); Log.writeln(") EXITING"); 283 } 284 lock(); 285 setNumConsumersWaiting(numConsumersWaiting - 1); 286 unlock(); 287 return Address.zero(); 288 } 289 lock(); 290 // Re-get the list head/tail while holding the lock 291 rtn = ((fromTail) ? tail : head); 292 } 293 setNumConsumersWaiting(numConsumersWaiting - 1); 294 if (TRACE) { 295 Log.write("-- ("); Log.write(ordinal); Log.write(") resuming work "); 296 Log.write(" n="); Log.writeln(numConsumersWaiting); 297 } 298 } else { 299 unlock(); 300 return Address.zero(); 301 } 302 } 303 if (fromTail) { 304 // dequeue the tail buffer 305 setTail(getPrev(tail)); 306 if (head.EQ(rtn)) { 307 setHead(Address.zero()); 308 if (VM.VERIFY_ASSERTIONS) VM.assertions._assert(tail.isZero()); 309 } else { 310 setNext(tail, Address.zero()); 311 } 312 } else { 313 // dequeue the head buffer 314 setHead(getNext(head)); 315 if (tail.EQ(rtn)) { 316 setTail(Address.zero()); 317 if (VM.VERIFY_ASSERTIONS) VM.assertions._assert(head.isZero()); 318 } else { 319 setPrev(head, Address.zero()); 320 } 321 } 322 bufsenqueued--; 323 unlock(); 324 return rtn; 325 } 326 327 /** 328 * Spinwait for GC work to arrive 329 * 330 * @param fromTail Check the head or the tail ? 331 */ 332 private void spinWait(boolean fromTail) { 333 long startNano = 0; 334 long lastElapsedNano = 0; 335 while (true) { 336 long startCycles = VM.statistics.cycles(); 337 long endCycles = startCycles + ((long) 1e9); // a few hundred milliseconds more or less. 338 long nowCycles; 339 do { 340 VM.memory.isync(); 341 Address rtn = ((fromTail) ? tail : head); 342 if (!rtn.isZero() || complete()) return; 343 nowCycles = VM.statistics.cycles(); 344 } while (startCycles < nowCycles && nowCycles < endCycles); /* check against both ends to guard against CPU migration */ 345 346 /* 347 * According to the cycle counter, we've been spinning for a while. 348 * Time to check nanoTime and see if we should print a warning and/or fail. 349 * We lock the deque while doing this to avoid interleaved messages from multiple threads. 350 */ 351 lock(); 352 if (startNano == 0) { 353 startNano = VM.statistics.nanoTime(); 354 } else { 355 long nowNano = VM.statistics.nanoTime(); 356 long elapsedNano = nowNano - startNano; 357 if (elapsedNano - lastElapsedNano > WARN_PERIOD) { 358 Log.write("GC Warning: SharedDeque("); Log.write(name); 359 Log.write(") wait has reached "); Log.write(VM.statistics.nanosToSecs(elapsedNano)); 360 Log.write(", "); Log.write(numConsumersWaiting); Log.write("/"); 361 Log.write(numConsumers); Log.writeln(" threads waiting"); 362 lastElapsedNano = elapsedNano; 363 } 364 if (elapsedNano > TIMEOUT_PERIOD) { 365 unlock(); // To allow other GC threads to die in turn 366 VM.assertions.fail("GC Error: SharedDeque Timeout"); 367 } 368 } 369 unlock(); 370 } 371 } 372 373 /** 374 * Set the "next" pointer in a buffer forming the linked buffer chain. 375 * 376 * @param buf The buffer whose next field is to be set. 377 * @param next The reference to which next should point. 378 */ 379 private static void setNext(Address buf, Address next) { 380 buf.store(next, NEXT_OFFSET); 381 } 382 383 /** 384 * Get the "next" pointer in a buffer forming the linked buffer chain. 385 * 386 * @param buf The buffer whose next field is to be returned. 387 * @return The next field for this buffer. 388 */ 389 protected final Address getNext(Address buf) { 390 return buf.loadAddress(NEXT_OFFSET); 391 } 392 393 /** 394 * Set the "prev" pointer in a buffer forming the linked buffer chain. 395 * 396 * @param buf The buffer whose next field is to be set. 397 * @param prev The reference to which prev should point. 398 */ 399 private void setPrev(Address buf, Address prev) { 400 buf.store(prev, PREV_OFFSET); 401 } 402 403 /** 404 * Get the "next" pointer in a buffer forming the linked buffer chain. 405 * 406 * @param buf The buffer whose next field is to be returned. 407 * @return The next field for this buffer. 408 */ 409 protected final Address getPrev(Address buf) { 410 return buf.loadAddress(PREV_OFFSET); 411 } 412 413 /** 414 * Check the number of buffers in the work queue (for debugging 415 * purposes). 416 * 417 * @param length The number of buffers believed to be in the queue. 418 * @return True if the length of the queue matches length. 419 */ 420 private boolean checkDequeLength(int length) { 421 Address top = head; 422 int l = 0; 423 while (!top.isZero() && l <= length) { 424 top = getNext(top); 425 l++; 426 } 427 return l == length; 428 } 429 430 /** 431 * Lock this shared queue. We use one simple low-level lock to 432 * synchronize access to the shared queue of buffers. 433 */ 434 private void lock() { 435 lock.acquire(); 436 } 437 438 /** 439 * Release the lock. We use one simple low-level lock to synchronize 440 * access to the shared queue of buffers. 441 */ 442 private void unlock() { 443 lock.release(); 444 } 445 446 /** 447 * @return whether the current round of processing is complete 448 */ 449 private boolean complete() { 450 return completionFlag == 1; 451 } 452 453 /** 454 * Set the completion flag. 455 */ 456 @Inline 457 private void setCompletionFlag() { 458 if (TRACE_DETAIL) { 459 Log.writeln("# setCompletionFlag: "); 460 } 461 completionFlag = 1; 462 } 463 464 /** 465 * Clear the completion flag. 466 */ 467 @Inline 468 private void clearCompletionFlag() { 469 if (TRACE_DETAIL) { 470 Log.writeln("# clearCompletionFlag: "); 471 } 472 completionFlag = 0; 473 } 474 475 @Inline 476 private void setNumConsumers(int newNumConsumers) { 477 if (TRACE_DETAIL) { 478 Log.write("# Num consumers "); Log.writeln(newNumConsumers); 479 } 480 numConsumers = newNumConsumers; 481 } 482 483 @Inline 484 private void setNumConsumersWaiting(int newNCW) { 485 if (TRACE_DETAIL) { 486 Log.write("# Num consumers waiting "); Log.writeln(newNCW); 487 } 488 numConsumersWaiting = newNCW; 489 } 490 491 @Inline 492 private void setHead(Address newHead) { 493 head = newHead; 494 VM.memory.sync(); 495 } 496 497 @Inline 498 private void setTail(Address newTail) { 499 tail = newTail; 500 VM.memory.sync(); 501 } 502}