1
2
3
4
5
6
7 package ca.uhn.cache.util;
8
9 import org.apache.commons.logging.Log;
10 import org.apache.commons.logging.LogFactory;
11
12 import EDU.oswego.cs.dl.util.concurrent.Executor;
13 import EDU.oswego.cs.dl.util.concurrent.ThreadedExecutor;
14
15 import ca.uhn.cache.IQuery;
16 import ca.uhn.cache.IQueryResult;
17 import ca.uhn.cache.ISemanticCache;
18 import ca.uhn.cache.exception.CacheException;
19
20 /***
21 * TODO: TESTS
22 *
23 * Ecapsulates the common caching algorithm of querying a cache, getting a
24 * remainder, querying the original data source, and combining the results.
25 *
26 * Objects of this class hold the state of a single query instance.
27 *
28 * It is left to the caller to provide results from the original data source.
29 *
30 * @author <a href="mailto:bryan.tripp@uhn.on.ca">Bryan Tripp</a>
31 * @version $Revision: 1.1 $ updated on $Date: 2005/01/24 22:51:49 $ by $Author: bryan_tripp $
32 */
33 public class QueryProcessor {
34
35 private static final Log ourLog = LogFactory.getLog(QueryProcessor.class);
36
37 private final IQuery myQuery;
38 private final ISemanticCache myCache;
39 private IQuery[] myRemainderQueries;
40 private IQueryResult myCombinedResult;
41 private CacheException myException;
42 private Executor myExecutor;
43
44 /***
45 * Constructor for STATIONARY DATA ONLY. See IUnstationaryDataSource for discussion.
46 *
47 * @param theQuery the query to process
48 * @param theCache the cache against which to process it
49 * @param theMaxGroups maximum number of remainder queries
50 * @param theExecutor handler of concurrent tasks (defaults to ThreadedExecutor if null)
51 *
52 * @throws CacheException if there is a problem finding the query remainder
53 */
54 public QueryProcessor(final IQuery theQuery, final ISemanticCache theCache,
55 int theMaxGroups, Executor theExecutor) throws CacheException {
56 myQuery = theQuery;
57 myCache = theCache;
58 myExecutor = theExecutor;
59
60 if (myExecutor == null) {
61 myExecutor = new ThreadedExecutor();
62 }
63
64 myRemainderQueries = theCache.remainder(theQuery, theMaxGroups);
65 init();
66 }
67
68 /***
69 * Performs startup tasks (called by constructor). The default is to start
70 * running the cache query. Subclasses can over-ride.
71 */
72 protected void init() {
73 final QueryProcessor processor = this;
74
75 Runnable getter = new Runnable() {
76 public void run() {
77 try {
78 processor.setCombinedResult(myCache.get(myQuery));
79 } catch (CacheException e) {
80 processor.declareException("Problem getting data from cache", e);
81 }
82 }
83 };
84
85 thread(getter);
86 }
87
88 /***
89 * @return queries defining those parts of the query associated with this processor
90 * that are not contained in the cache
91 */
92 public IQuery[] getRemainderQueries() {
93 return myRemainderQueries;
94 }
95
96 /***
97 * Provide an initial value for the combined result.
98 * @param theResult initial value
99 */
100 protected void setCombinedResult(IQueryResult theResult) {
101 myCombinedResult = theResult;
102 }
103
104 /***
105 * @param theMessage description
106 * @param theException an exception that has been encountered that may render query
107 * results incorrect (for example exception while querying cache or source data)
108 */
109 public void declareException(String theMessage, Exception theException) {
110 myException = new CacheException(theMessage, theException) {};
111 }
112
113 /***
114 * @param theQuery scope of the results (from a remainder query)
115 * @param theSourceResult corresponding results
116 */
117 public void setSourceResult(final IQuery theQuery, final IQueryResult theSourceResult) {
118
119 final QueryProcessor processor = this;
120 Runnable combiner = new Runnable() {
121 public void run() {
122 try {
123 synchronized (processor) {
124 IQueryResult current = processor.getCombinedResult();
125 IQueryResult combined = current.append(theSourceResult);
126 processor.setCombinedResult(combined);
127 }
128 } catch (CacheException e) {
129
130 ourLog.error("Can't get cache result with which to combine source data", e);
131 }
132 }
133 };
134 thread(combiner);
135
136 final ISemanticCache cache = myCache;
137 Runnable filler = new Runnable() {
138 public void run() {
139 try {
140 cache.put(theQuery, theSourceResult);
141 } catch (CacheException e) {
142 ourLog.error("Can't populate cache with queried data", e);
143 }
144 }
145 };
146 thread(filler);
147 }
148
149 /***
150 * Blocks until a result is available or an exception is encountered.
151 *
152 * @return results including both cached data and data from the source (as they are added)
153 * @throws CacheException if a problem has been encountered getting data from the cache
154 */
155 public IQueryResult getCombinedResult() throws CacheException {
156 while (myCombinedResult == null && myException == null) {
157 try {
158 Thread.sleep(1);
159 } catch (InterruptedException e) {
160
161 }
162 }
163
164 if (myException != null) {
165 throw myException;
166 }
167
168 return myCombinedResult;
169 }
170
171 /***
172 * Runs a task in its own thread.
173 *
174 * @param theTask the task to run
175 */
176 protected void thread(Runnable theTask) {
177 try {
178 myExecutor.execute(theTask);
179 } catch (InterruptedException e) {
180 ourLog.error("Task interrupted", e);
181 }
182 }
183
184 }