001// License: GPL. For details, see Readme.txt file. 002package org.openstreetmap.gui.jmapviewer; 003 004import java.util.concurrent.BlockingDeque; 005import java.util.concurrent.LinkedBlockingDeque; 006import java.util.concurrent.TimeUnit; 007 008import org.openstreetmap.gui.jmapviewer.interfaces.TileJob; 009 010/** 011 * A generic class that processes a list of {@link Runnable} one-by-one using 012 * one or more {@link Thread}-instances. The number of instances varies between 013 * 1 and {@link #workerThreadMaxCount} (default: 8). If an instance is idle 014 * more than {@link #workerThreadTimeout} seconds (default: 30), the instance 015 * ends itself. 016 * 017 * @author Jan Peter Stotz 018 */ 019public class JobDispatcher { 020 021 private static final JobDispatcher instance = new JobDispatcher(); 022 023 /** 024 * @return the singelton instance of the {@link JobDispatcher} 025 */ 026 public static JobDispatcher getInstance() { 027 return instance; 028 } 029 030 private JobDispatcher() { 031 addWorkerThread().firstThread = true; 032 } 033 034 protected BlockingDeque<TileJob> jobQueue = new LinkedBlockingDeque<>(); 035 036 protected static int workerThreadMaxCount = 8; 037 038 /** 039 * Specifies the time span in seconds that a worker thread waits for new 040 * jobs to perform. If the time span has elapsed the worker thread 041 * terminates itself. Only the first worker thread works differently, it 042 * ignores the timeout and will never terminate itself. 043 */ 044 protected static int workerThreadTimeout = 30; 045 046 /** 047 * Type of queue, FIFO if <code>false</code>, LIFO if <code>true</code> 048 */ 049 protected boolean modeLIFO = false; 050 051 /** 052 * Total number of worker threads currently idle or active 053 */ 054 protected int workerThreadCount = 0; 055 056 /** 057 * Number of worker threads currently idle 058 */ 059 protected int workerThreadIdleCount = 0; 060 061 /** 062 * Just an id for identifying an worker thread instance 063 */ 064 protected int workerThreadId = 0; 065 066 /** 067 * Removes all jobs from the queue that are currently not being processed. 068 */ 069 public void cancelOutstandingJobs() { 070 jobQueue.clear(); 071 } 072 073 /** 074 * Function to set the maximum number of workers for tile loading. 075 */ 076 static public void setMaxWorkers(int workers) { 077 workerThreadMaxCount = workers; 078 } 079 080 /** 081 * Function to set the LIFO/FIFO mode for tile loading job. 082 * 083 * @param lifo <code>true</code> for LIFO mode, <code>false</code> for FIFO mode 084 */ 085 public void setLIFO(boolean lifo) { 086 modeLIFO = lifo; 087 } 088 089 /** 090 * Adds a job to the queue. 091 * Jobs for tiles already contained in the are ignored (using a <code>null</code> tile 092 * prevents skipping). 093 * 094 * @param job the the job to be added 095 */ 096 public void addJob(TileJob job) { 097 try { 098 if(job.getTile() != null) { 099 for(TileJob oldJob : jobQueue) { 100 if(oldJob.getTile() == job.getTile()) { 101 return; 102 } 103 } 104 } 105 jobQueue.put(job); 106 if (workerThreadIdleCount == 0 && workerThreadCount < workerThreadMaxCount) 107 addWorkerThread(); 108 } catch (InterruptedException e) { 109 } 110 } 111 112 protected JobThread addWorkerThread() { 113 JobThread jobThread = new JobThread(++workerThreadId); 114 synchronized (this) { 115 workerThreadCount++; 116 } 117 jobThread.start(); 118 return jobThread; 119 } 120 121 public class JobThread extends Thread { 122 123 Runnable job; 124 boolean firstThread = false; 125 126 public JobThread(int threadId) { 127 super("OSMJobThread " + threadId); 128 setDaemon(true); 129 job = null; 130 } 131 132 @Override 133 public void run() { 134 executeJobs(); 135 synchronized (instance) { 136 workerThreadCount--; 137 } 138 } 139 140 protected void executeJobs() { 141 while (!isInterrupted()) { 142 try { 143 synchronized (instance) { 144 workerThreadIdleCount++; 145 } 146 if(modeLIFO) { 147 if (firstThread) 148 job = jobQueue.takeLast(); 149 else 150 job = jobQueue.pollLast(workerThreadTimeout, TimeUnit.SECONDS); 151 } else { 152 if (firstThread) 153 job = jobQueue.take(); 154 else 155 job = jobQueue.poll(workerThreadTimeout, TimeUnit.SECONDS); 156 } 157 } catch (InterruptedException e1) { 158 return; 159 } finally { 160 synchronized (instance) { 161 workerThreadIdleCount--; 162 } 163 } 164 if (job == null) 165 return; 166 try { 167 job.run(); 168 job = null; 169 } catch (Exception e) { 170 e.printStackTrace(); 171 } 172 } 173 } 174 } 175 176}