001/* 002 * This file is part of the Jikes RVM project (http://jikesrvm.org). 003 * 004 * This file is licensed to You under the Eclipse Public License (EPL); 005 * You may not use this file except in compliance with the License. You 006 * may obtain a copy of the License at 007 * 008 * http://www.opensource.org/licenses/eclipse-1.0.php 009 * 010 * See the COPYRIGHT.txt file distributed with this work for information 011 * regarding copyright ownership. 012 */ 013package org.mmtk.utility.deque; 014 015import static org.mmtk.utility.Constants.*; 016 017import org.mmtk.vm.VM; 018 019import org.vmmagic.pragma.*; 020import org.vmmagic.unboxed.*; 021 022/** 023 * This class implements a local (<i>unsynchronized</i>) queue. 024 * A queue is strictly FIFO.<p> 025 * 026 * Each instance stores word-sized values into a local buffer. When 027 * the buffer is full, or if the <code>flushLocal()</code> method is 028 * called, the buffer enqueued at the tail of a 029 * <code>SharedDeque</code>. 030 * 031 * The implementation is intended to be as efficient as possible, in 032 * time and space, and is the basis for the <code>TraceBuffer</code> used by 033 * heap trace generation. Each instance adds a single field to those inherited 034 * from the SSB: a bump pointer. 035 * 036 * Preconditions: Buffers are always aligned on buffer-size address 037 * boundaries.<p> 038 * 039 * Invariants: Buffers are filled such that tuples (of the specified 040 * arity) are packed to the low end of the buffer. Thus buffer 041 * underflows will always arise when then cursor is buffer-size aligned. 042 */ 043@Uninterruptible class LocalQueue extends LocalSSB { 044 045 /** 046 * Constructor 047 * 048 * @param queue The shared queue to which this local ssb will append 049 * its buffers (when full or flushed). 050 */ 051 LocalQueue(SharedDeque queue) { 052 super(queue); 053 } 054 055 /**************************************************************************** 056 * 057 * Protected instance methods and fields 058 */ 059 060 /** the start of the buffer */ 061 @Entrypoint 062 protected Address head; 063 064 @Override 065 public void resetLocal() { 066 super.resetLocal(); 067 head = Deque.HEAD_INITIAL_VALUE; 068 } 069 070 /** 071 * Check whether there are values in the buffer for a pending dequeue. 072 * If there is not data, grab the first buffer on the shared queue 073 * (after freeing the buffer). 074 * 075 * @param arity The arity of the values stored in this queue: the 076 * buffer must contain enough space for this many words. 077 * @return whether there are values available for a dequeue 078 */ 079 @Inline 080 protected final boolean checkDequeue(int arity) { 081 if (bufferOffset(head).isZero()) { 082 return dequeueUnderflow(arity); 083 } else { 084 if (VM.VERIFY_ASSERTIONS) VM.assertions._assert(bufferOffset(head).sGE(Word.fromIntZeroExtend(arity).lsh(LOG_BYTES_IN_ADDRESS).toOffset())); 085 return true; 086 } 087 } 088 089 /** 090 * Dequeue a value from the buffer. This is <i>unchecked</i>. The 091 * caller must first call <code>checkDequeue()</code> to ensure the 092 * buffer has and entry to be removed. 093 * 094 * @return The first entry on the queue. 095 */ 096 @Inline 097 protected final Address uncheckedDequeue() { 098 if (VM.VERIFY_ASSERTIONS) VM.assertions._assert(bufferOffset(head).sGE(Offset.fromIntZeroExtend(BYTES_IN_ADDRESS))); 099 head = head.minus(BYTES_IN_ADDRESS); 100 return head.loadAddress(); 101 } 102 103 /** 104 * The head is empty (or null), and the shared queue has no buffers 105 * available. If the tail has sufficient entries, consume the tail. 106 * Otherwise try wait on the global queue until either all other 107 * clients of the queue reach exhaustion or a buffer becomes 108 * available. 109 * 110 * @param arity The arity of this buffer 111 * @return True if the consumer has eaten all the entries 112 */ 113 protected final boolean headStarved(int arity) { 114 if (VM.VERIFY_ASSERTIONS) VM.assertions._assert(arity == queue.getArity()); 115 116 // If the tail has entries... 117 if (tail.NE(tailBufferEnd)) { 118 head = normalizeTail(arity).plus(BYTES_IN_ADDRESS); 119 tail = Deque.TAIL_INITIAL_VALUE; 120 tailBufferEnd = Deque.TAIL_INITIAL_VALUE; 121 // Return that we acquired more entries 122 return false; 123 } 124 // Wait for another entry to materialize... 125 head = queue.dequeueAndWait(arity); 126 // return true if a) there is a head buffer, and b) it is non-empty 127 return (head.EQ(Deque.HEAD_INITIAL_VALUE) || bufferOffset(head).isZero()); 128 } 129 130 /**************************************************************************** 131 * 132 * Private instance methods 133 */ 134 135 /** 136 * There are not sufficient entries in the head buffer for a pending 137 * dequeue. Acquire a new head buffer. If the shared queue has no 138 * buffers available, consume the tail if necessary. Return false 139 * if entries cannot be acquired. 140 * 141 * @param arity The arity of this buffer (used for sanity test only). 142 * @return True if there the head buffer has been successfully 143 * replenished. 144 */ 145 @NoInline 146 private boolean dequeueUnderflow(int arity) { 147 if (VM.VERIFY_ASSERTIONS) VM.assertions._assert(arity == queue.getArity()); 148 do { 149 if (head.NE(Deque.HEAD_INITIAL_VALUE)) 150 queue.free(head); 151 head = queue.dequeue(arity); 152 } while (head.NE(Deque.HEAD_INITIAL_VALUE) && bufferOffset(head).isZero()); 153 154 if (head.EQ(Deque.HEAD_INITIAL_VALUE)) 155 return !headStarved(arity); 156 157 return true; 158 } 159}