1 package org.apache.turbine.services.schedule;
2
3 /*
4 * Licensed to the Apache Software Foundation (ASF) under one
5 * or more contributor license agreements. See the NOTICE file
6 * distributed with this work for additional information
7 * regarding copyright ownership. The ASF licenses this file
8 * to you under the Apache License, Version 2.0 (the
9 * "License"); you may not use this file except in compliance
10 * with the License. You may obtain a copy of the License at
11 *
12 * http://www.apache.org/licenses/LICENSE-2.0
13 *
14 * Unless required by applicable law or agreed to in writing,
15 * software distributed under the License is distributed on an
16 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17 * KIND, either express or implied. See the License for the
18 * specific language governing permissions and limitations
19 * under the License.
20 */
21
22 import java.util.Iterator;
23 import java.util.List;
24
25 import javax.servlet.ServletConfig;
26
27 import org.apache.commons.logging.Log;
28 import org.apache.commons.logging.LogFactory;
29 import org.apache.torque.TorqueException;
30 import org.apache.torque.util.Criteria;
31 import org.apache.turbine.services.InitializationException;
32 import org.apache.turbine.services.TurbineBaseService;
33 import org.apache.turbine.util.TurbineException;
34
35 /**
36 * Service for a cron like scheduler.
37 *
38 * @author <a href="mailto:mbryson@mont.mindspring.com">Dave Bryson</a>
39 * @author <a href="mailto:quintonm@bellsouth.net">Quinton McCombs</a>
40 * @version $Id: TurbineSchedulerService.java 534527 2007-05-02 16:10:59Z tv $
41 */
42 public class TurbineSchedulerService
43 extends TurbineBaseService
44 implements ScheduleService
45 {
46 /** Logging */
47 protected static Log log = LogFactory.getLog(ScheduleService.LOGGER_NAME);
48
49 /** The queue */
50 protected JobQueue scheduleQueue = null;
51
52 /** Current status of the scheduler */
53 protected boolean enabled = false;
54
55 /** The main loop for starting jobs. */
56 protected MainLoop mainLoop;
57
58 /** The thread used to process commands. */
59 protected Thread thread;
60
61 /**
62 * Creates a new instance.
63 */
64 public TurbineSchedulerService()
65 {
66 mainLoop = null;
67 thread = null;
68 }
69
70 /**
71 * Initializes the SchedulerService.
72 *
73 * @throws InitializationException Something went wrong in the init
74 * stage
75 */
76 @Override
77 public void init()
78 throws InitializationException
79 {
80 try
81 {
82 setEnabled(getConfiguration().getBoolean("enabled", true));
83 scheduleQueue = new JobQueue();
84 mainLoop = new MainLoop();
85
86 // Load all from cold storage.
87 List<JobEntry> jobs = JobEntryPeer.doSelect(new Criteria());
88
89 if (jobs != null && jobs.size() > 0)
90 {
91 Iterator<JobEntry> it = jobs.iterator();
92 while (it.hasNext())
93 {
94 it.next().calcRunTime();
95 }
96 scheduleQueue.batchLoad(jobs);
97
98 restart();
99 }
100
101 setInit(true);
102 }
103 catch (Exception e)
104 {
105 String errorMessage = "Could not initialize the scheduler service";
106 log.error(errorMessage, e);
107 throw new InitializationException(errorMessage, e);
108 }
109 }
110
111 /**
112 * Called the first time the Service is used.<br>
113 *
114 * Load all the jobs from cold storage. Add jobs to the queue
115 * (sorted in ascending order by runtime) and start the scheduler
116 * thread.
117 *
118 * @param config A ServletConfig.
119 * @deprecated use init() instead.
120 */
121 @Deprecated
122 public void init(ServletConfig config) throws InitializationException
123 {
124 init();
125 }
126
127 /**
128 * Shutdowns the service.
129 *
130 * This methods interrupts the housekeeping thread.
131 */
132 @Override
133 public void shutdown()
134 {
135 if (getThread() != null)
136 {
137 getThread().interrupt();
138 }
139 }
140
141 /**
142 * Get a specific Job from Storage.
143 *
144 * @param oid The int id for the job.
145 * @return A JobEntry.
146 * @exception TurbineException job could not be retreived.
147 */
148 public JobEntry getJob(int oid)
149 throws TurbineException
150 {
151 try
152 {
153 JobEntry je = JobEntryPeer.retrieveByPK(oid);
154 return scheduleQueue.getJob(je);
155 }
156 catch (TorqueException e)
157 {
158 String errorMessage = "Error retrieving job from persistent storage.";
159 log.error(errorMessage, e);
160 throw new TurbineException(errorMessage, e);
161 }
162 }
163
164 /**
165 * Add a new job to the queue.
166 *
167 * @param je A JobEntry with the job to add.
168 * @throws TurbineException job could not be added
169 */
170 public void addJob(JobEntry je)
171 throws TurbineException
172 {
173 updateJob(je);
174 }
175
176 /**
177 * Remove a job from the queue.
178 *
179 * @param je A JobEntry with the job to remove.
180 * @exception TurbineException job could not be removed
181 */
182 public void removeJob(JobEntry je)
183 throws TurbineException
184 {
185 try
186 {
187 // First remove from DB.
188 Criteria c = new Criteria().add(JobEntryPeer.JOB_ID, je.getPrimaryKey());
189 JobEntryPeer.doDelete(c);
190
191 // Remove from the queue.
192 scheduleQueue.remove(je);
193
194 // restart the scheduler
195 restart();
196 }
197 catch (Exception e)
198 {
199 String errorMessage = "Problem removing Scheduled Job: " + je.getTask();
200 log.error(errorMessage, e);
201 throw new TurbineException(errorMessage, e);
202 }
203 }
204
205 /**
206 * Add or update a job.
207 *
208 * @param je A JobEntry with the job to modify
209 * @throws TurbineException job could not be updated
210 */
211 public void updateJob(JobEntry je)
212 throws TurbineException
213 {
214 try
215 {
216 je.calcRunTime();
217
218 // Update the queue.
219 if (je.isNew())
220 {
221 scheduleQueue.add(je);
222 }
223 else
224 {
225 scheduleQueue.modify(je);
226 }
227
228 je.save();
229
230 restart();
231 }
232 catch (Exception e)
233 {
234 String errorMessage = "Problem updating Scheduled Job: " + je.getTask();
235 log.error(errorMessage, e);
236 throw new TurbineException(errorMessage, e);
237 }
238 }
239
240 /**
241 * List jobs in the queue. This is used by the scheduler UI.
242 *
243 * @return A List of jobs.
244 */
245 public List<JobEntry> listJobs()
246 {
247 return scheduleQueue.list();
248 }
249
250 /**
251 * Sets the enabled status of the scheduler
252 *
253 * @param enabled
254 *
255 */
256 protected void setEnabled(boolean enabled)
257 {
258 this.enabled = enabled;
259 }
260
261 /**
262 * Determines if the scheduler service is currently enabled.
263 *
264 * @return Status of the scheduler service.
265 */
266 public boolean isEnabled()
267 {
268 return enabled;
269 }
270
271 /**
272 * Starts or restarts the scheduler if not already running.
273 */
274 public synchronized void startScheduler()
275 {
276 setEnabled(true);
277 restart();
278 }
279
280 /**
281 * Stops the scheduler if it is currently running.
282 */
283 public synchronized void stopScheduler()
284 {
285 log.info("Stopping job scheduler");
286 Thread thread = getThread();
287 if (thread != null)
288 {
289 thread.interrupt();
290 }
291 enabled = false;
292 }
293
294 /**
295 * Return the thread being used to process commands, or null if
296 * there is no such thread. You can use this to invoke any
297 * special methods on the thread, for example, to interrupt it.
298 *
299 * @return A Thread.
300 */
301 public synchronized Thread getThread()
302 {
303 return thread;
304 }
305
306 /**
307 * Set thread to null to indicate termination.
308 */
309 protected synchronized void clearThread()
310 {
311 thread = null;
312 }
313
314 /**
315 * Start (or restart) a thread to process commands, or wake up an
316 * existing thread if one is already running. This method can be
317 * invoked if the background thread crashed due to an
318 * unrecoverable exception in an executed command.
319 */
320 public synchronized void restart()
321 {
322 if (enabled)
323 {
324 log.info("Starting job scheduler");
325 if (thread == null)
326 {
327 // Create the the housekeeping thread of the scheduler. It will wait
328 // for the time when the next task needs to be started, and then
329 // launch a worker thread to execute the task.
330 thread = new Thread(mainLoop, ScheduleService.SERVICE_NAME);
331 // Indicate that this is a system thread. JVM will quit only when there
332 // are no more enabled user threads. Settings threads spawned internally
333 // by Turbine as daemons allows commandline applications using Turbine
334 // to terminate in an orderly manner.
335 thread.setDaemon(true);
336 thread.start();
337 }
338 else
339 {
340 notify();
341 }
342 }
343 }
344
345 /**
346 * Return the next Job to execute, or null if thread is
347 * interrupted.
348 *
349 * @return A JobEntry.
350 * @exception TurbineException a generic exception.
351 */
352 protected synchronized JobEntry nextJob()
353 throws TurbineException
354 {
355 try
356 {
357 while (!Thread.interrupted())
358 {
359 // Grab the next job off the queue.
360 JobEntry je = scheduleQueue.getNext();
361
362 if (je == null)
363 {
364 // Queue must be empty. Wait on it.
365 wait();
366 }
367 else
368 {
369 long now = System.currentTimeMillis();
370 long when = je.getNextRuntime();
371
372 if (when > now)
373 {
374 // Wait till next runtime.
375 wait(when - now);
376 }
377 else
378 {
379 // Update the next runtime for the job.
380 scheduleQueue.updateQueue(je);
381 // Return the job to run it.
382 return je;
383 }
384 }
385 }
386 }
387 catch (InterruptedException ex)
388 {
389 // ignore
390 }
391
392 // On interrupt.
393 return null;
394 }
395
396 /**
397 * Inner class. This is isolated in its own Runnable class just
398 * so that the main class need not implement Runnable, which would
399 * allow others to directly invoke run, which is not supported.
400 */
401 protected class MainLoop
402 implements Runnable
403 {
404 /**
405 * Method to run the class.
406 */
407 public void run()
408 {
409 String taskName = null;
410 try
411 {
412 while (enabled)
413 {
414 JobEntry je = nextJob();
415 if (je != null)
416 {
417 taskName = je.getTask();
418
419 // Start the thread to run the job.
420 Runnable wt = new WorkerThread(je);
421 Thread helper = new Thread(wt);
422 helper.start();
423 }
424 else
425 {
426 break;
427 }
428 }
429 }
430 catch (Exception e)
431 {
432 log.error("Error running a Scheduled Job: " + taskName, e);
433 enabled = false;
434 }
435 finally
436 {
437 clearThread();
438 }
439 }
440 }
441 }