• Skip to content
  • Skip to link menu
  • KDE API Reference
  • kdelibs-4.10.5 API Reference
  • KDE Home
  • Contact Us
 

ThreadWeaver

  • threadweaver
  • Weaver
WeaverImpl.cpp
Go to the documentation of this file.
1 /* -*- C++ -*-
2 
3 This file implements the WeaverImpl class.
4 
5 
6 $ Author: Mirko Boehm $
7 $ Copyright: (C) 2005, 2006 Mirko Boehm $
8 $ Contact: mirko@kde.org
9 http://www.kde.org
10 http://www.hackerbuero.org $
11 
12  This library is free software; you can redistribute it and/or
13  modify it under the terms of the GNU Library General Public
14  License as published by the Free Software Foundation; either
15  version 2 of the License, or (at your option) any later version.
16 
17  This library is distributed in the hope that it will be useful,
18  but WITHOUT ANY WARRANTY; without even the implied warranty of
19  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
20  Library General Public License for more details.
21 
22  You should have received a copy of the GNU Library General Public License
23  along with this library; see the file COPYING.LIB. If not, write to
24  the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
25  Boston, MA 02110-1301, USA.
26 
27 $Id: WeaverImpl.cpp 30 2005-08-16 16:16:04Z mirko $
28 
29 */
30 
31 #include "WeaverImpl.h"
32 
33 #include <QtCore/QObject>
34 #include <QtCore/QMutex>
35 #include <QtCore/QDebug>
36 
37 #include "Job.h"
38 #include "State.h"
39 #include "Thread.h"
40 #include "ThreadWeaver.h"
41 #include "DebuggingAids.h"
42 #include "WeaverObserver.h"
43 #include "SuspendedState.h"
44 #include "SuspendingState.h"
45 #include "DestructedState.h"
46 #include "WorkingHardState.h"
47 #include "ShuttingDownState.h"
48 #include "InConstructionState.h"
49 
50 using namespace ThreadWeaver;
51 
52 WeaverImpl::WeaverImpl( QObject* parent )
53  : WeaverInterface(parent)
54  , m_active(0)
55  , m_inventoryMax( 4 )
56  , m_mutex ( new QMutex( QMutex::Recursive ) )
57  , m_finishMutex( new QMutex )
58  , m_jobAvailableMutex ( new QMutex )
59  , m_state (0)
60 {
61  // initialize state objects:
62  m_states[InConstruction] = new InConstructionState( this );
63  setState ( InConstruction );
64  m_states[WorkingHard] = new WorkingHardState( this );
65  m_states[Suspending] = new SuspendingState( this );
66  m_states[Suspended] = new SuspendedState( this );
67  m_states[ShuttingDown] = new ShuttingDownState( this );
68  m_states[Destructed] = new DestructedState( this );
69 
70  // FIXME (0.7) this is supposedly unnecessary
71  connect ( this, SIGNAL (asyncThreadSuspended(ThreadWeaver::Thread*)),
72  SIGNAL (threadSuspended(ThreadWeaver::Thread*)),
73  Qt::QueuedConnection );
74  setState( WorkingHard );
75 }
76 
77 WeaverImpl::~WeaverImpl()
78 { // the constructor may only be called from the thread that owns this
79  // object (everything else would be what we professionals call "insane")
80  REQUIRE( QThread::currentThread() == thread() );
81  debug ( 3, "WeaverImpl dtor: destroying inventory.\n" );
82  setState ( ShuttingDown );
83 
84  m_jobAvailable.wakeAll();
85 
86  // problem: Some threads might not be asleep yet, just finding
87  // out if a job is available. Those threads will suspend
88  // waiting for their next job (a rare case, but not impossible).
89  // Therefore, if we encounter a thread that has not exited, we
90  // have to wake it again (which we do in the following for
91  // loop).
92 
93  while (!m_inventory.isEmpty())
94  {
95  Thread* th=m_inventory.takeFirst();
96  if ( !th->isFinished() )
97  {
98  for ( ;; )
99  {
100  m_jobAvailable.wakeAll();
101  if ( th->wait( 100 ) ) break;
102  debug ( 1, "WeaverImpl::~WeaverImpl: thread %i did not exit as expected, "
103  "retrying.\n", th->id() );
104  }
105  }
106  emit ( threadExited ( th ) );
107  delete th;
108  }
109 
110  m_inventory.clear();
111  delete m_mutex;
112  delete m_finishMutex;
113  delete m_jobAvailableMutex;
114  debug ( 3, "WeaverImpl dtor: done\n" );
115  setState ( Destructed ); // m_state = Halted;
116  // FIXME: delete state objects. what sense does DestructedState make then?
117  // FIXME: make state objects static, since they are
118 }
119 
120 void WeaverImpl::setState ( StateId id )
121 {
122  if ( m_state==0 || m_state->stateId() != id )
123  {
124  m_state = m_states[id];
125  debug ( 2, "WeaverImpl::setState: state changed to \"%s\".\n",
126  m_state->stateName().toLatin1().constData() );
127  if ( id == Suspended )
128  {
129  emit ( suspended() );
130  }
131 
132  m_state->activated();
133 
134  emit ( stateChanged ( m_state ) );
135  }
136 }
137 
138 const State& WeaverImpl::state() const
139 {
140  return *m_state;
141 }
142 
143 void WeaverImpl::setMaximumNumberOfThreads( int cap )
144 {
145  Q_ASSERT_X ( cap > 0, "Weaver Impl", "Thread inventory size has to be larger than zero." );
146  QMutexLocker l (m_mutex);
147  m_inventoryMax = cap;
148 }
149 
150 int WeaverImpl::maximumNumberOfThreads() const
151 {
152  QMutexLocker l (m_mutex);
153  return m_inventoryMax;
154 }
155 
156 int WeaverImpl::currentNumberOfThreads () const
157 {
158  QMutexLocker l (m_mutex);
159  return m_inventory.count ();
160 }
161 
162 void WeaverImpl::registerObserver ( WeaverObserver *ext )
163 {
164  connect ( this, SIGNAL (stateChanged(ThreadWeaver::State*)),
165  ext, SIGNAL (weaverStateChanged(ThreadWeaver::State*)) );
166  connect ( this, SIGNAL (threadStarted(ThreadWeaver::Thread*)),
167  ext, SIGNAL (threadStarted(ThreadWeaver::Thread*)) );
168  connect ( this, SIGNAL (threadBusy(ThreadWeaver::Thread*,ThreadWeaver::Job*)),
169  ext, SIGNAL (threadBusy(ThreadWeaver::Thread*,ThreadWeaver::Job*)) );
170  connect ( this, SIGNAL (threadSuspended(ThreadWeaver::Thread*)),
171  ext, SIGNAL (threadSuspended(ThreadWeaver::Thread*)) );
172  connect ( this, SIGNAL (threadExited(ThreadWeaver::Thread*)) ,
173  ext, SIGNAL (threadExited(ThreadWeaver::Thread*)) );
174 }
175 
176 void WeaverImpl::enqueue(Job* job)
177 {
178  adjustInventory ( 1 );
179  if (job)
180  {
181  debug ( 3, "WeaverImpl::enqueue: queueing job %p of type %s.\n",
182  (void*)job, job->metaObject()->className() );
183  QMutexLocker l (m_mutex);
184  job->aboutToBeQueued ( this );
185  // find positiEon for insertion:;
186  // FIXME (after 0.6) optimize: factor out queue management into own class,
187  // and use binary search for insertion (not done yet because
188  // refactoring already planned):
189  int i = m_assignments.size();
190  if (i > 0)
191  {
192  while ( i > 0 && m_assignments.at(i - 1)->priority() < job->priority() ) --i;
193  m_assignments.insert( i, (job) );
194  } else {
195  m_assignments.append (job);
196  }
197  assignJobs();
198  }
199 }
200 
201 void WeaverImpl::adjustInventory ( int numberOfNewJobs )
202 {
203  QMutexLocker l (m_mutex);
204 
205  // no of threads that can be created:
206  const int reserve = m_inventoryMax - m_inventory.count();
207 
208  if ( reserve > 0 )
209  {
210  for ( int i = 0; i < qMin ( reserve, numberOfNewJobs ); ++i )
211  {
212  Thread *th = createThread();
213  th->moveToThread( th ); // be sane from the start
214  m_inventory.append(th);
215  connect ( th, SIGNAL (jobStarted(ThreadWeaver::Thread*,ThreadWeaver::Job*)),
216  SIGNAL (threadBusy(ThreadWeaver::Thread*,ThreadWeaver::Job*)) );
217  connect ( th, SIGNAL (jobDone(ThreadWeaver::Job*)),
218  SIGNAL (jobDone(ThreadWeaver::Job*)) );
219  connect ( th, SIGNAL (started(ThreadWeaver::Thread*)),
220  SIGNAL (threadStarted(ThreadWeaver::Thread*)) );
221 
222  th->start ();
223  debug ( 2, "WeaverImpl::adjustInventory: thread created, "
224  "%i threads in inventory.\n", currentNumberOfThreads() );
225  }
226  }
227 }
228 
229 Thread* WeaverImpl::createThread()
230 {
231  return new Thread( this );
232 }
233 
234 bool WeaverImpl::dequeue ( Job* job )
235 {
236  bool result;
237  {
238  QMutexLocker l (m_mutex);
239 
240  int i = m_assignments.indexOf ( job );
241  if ( i != -1 )
242  {
243  job->aboutToBeDequeued( this );
244 
245  m_assignments.removeAt( i );
246  result = true;
247  debug( 3, "WeaverImpl::dequeue: job %p dequeued, %i jobs left.\n",
248  (void*)job, m_assignments.size() );
249  } else {
250  debug( 3, "WeaverImpl::dequeue: job %p not found in queue.\n", (void*)job );
251  result = false;
252  }
253  }
254 
255  // from the queues point of view, a job is just as finished if
256  // it gets dequeued:
257  m_jobFinished.wakeOne();
258  return result;
259 }
260 
261 void WeaverImpl::dequeue ()
262 {
263  debug( 3, "WeaverImpl::dequeue: dequeueing all jobs.\n" );
264  QMutexLocker l (m_mutex);
265  for ( int index = 0; index < m_assignments.size(); ++index )
266  {
267  m_assignments.at( index )->aboutToBeDequeued( this );
268  }
269  m_assignments.clear();
270 
271  ENSURE ( m_assignments.isEmpty() );
272 }
273 
274 void WeaverImpl::suspend ()
275 {
276  m_state->suspend();
277 }
278 
279 void WeaverImpl::resume ( )
280 {
281  m_state->resume();
282 }
283 
284 void WeaverImpl::assignJobs()
285 {
286  m_jobAvailable.wakeAll();
287 }
288 
289 bool WeaverImpl::isEmpty() const
290 {
291  QMutexLocker l (m_mutex);
292  return m_assignments.isEmpty();
293 }
294 
295 
296 void WeaverImpl::incActiveThreadCount()
297 {
298  adjustActiveThreadCount ( 1 );
299 }
300 
301 void WeaverImpl::decActiveThreadCount()
302 {
303  adjustActiveThreadCount ( -1 );
304  // the done job could have freed another set of jobs, and we do not know how
305  // many - therefore we need to wake all threads:
306  m_jobFinished.wakeAll();
307 }
308 
309 void WeaverImpl::adjustActiveThreadCount( int diff )
310 {
311  QMutexLocker l (m_mutex);
312  m_active += diff;
313  debug ( 4, "WeaverImpl::adjustActiveThreadCount: %i active threads (%i jobs"
314  " in queue).\n", m_active, queueLength() );
315 
316  if ( m_assignments.isEmpty() && m_active == 0)
317  {
318  P_ASSERT ( diff < 0 ); // cannot reach Zero otherwise
319  emit ( finished() );
320  }
321 }
322 
323 int WeaverImpl::activeThreadCount()
324 {
325  QMutexLocker l (m_mutex);
326  return m_active;
327 }
328 
329 Job* WeaverImpl::takeFirstAvailableJob()
330 {
331  QMutexLocker l (m_mutex);
332  Job *next = 0;
333  for (int index = 0; index < m_assignments.size(); ++index)
334  {
335  if ( m_assignments.at(index)->canBeExecuted() )
336  {
337  next = m_assignments.at(index);
338  m_assignments.removeAt (index);
339  break;
340  }
341  }
342  return next;
343 }
344 
345 Job* WeaverImpl::applyForWork(Thread *th, Job* previous)
346 {
347  if (previous)
348  { // cleanup and send events:
349  decActiveThreadCount();
350  }
351  return m_state->applyForWork ( th, 0 );
352 }
353 
354 void WeaverImpl::waitForAvailableJob(Thread* th)
355 {
356  m_state->waitForAvailableJob ( th );
357 }
358 
359 void WeaverImpl::blockThreadUntilJobsAreBeingAssigned ( Thread *th )
360 { // th is the thread that calls this method:
361  Q_UNUSED ( th );
362  debug ( 4, "WeaverImpl::blockThread...: thread %i blocked.\n", th->id());
363  emit asyncThreadSuspended ( th );
364  QMutexLocker l( m_jobAvailableMutex );
365  m_jobAvailable.wait( m_jobAvailableMutex );
366  debug ( 4, "WeaverImpl::blockThread...: thread %i resumed.\n", th->id());
367 }
368 
369 int WeaverImpl::queueLength() const
370 {
371  QMutexLocker l (m_mutex);
372  return m_assignments.count();
373 }
374 
375 bool WeaverImpl::isIdle () const
376 {
377  QMutexLocker l (m_mutex);
378  return isEmpty() && m_active == 0;
379 }
380 
381 void WeaverImpl::finish()
382 {
383 #ifdef QT_NO_DEBUG
384  const int MaxWaitMilliSeconds = 200;
385 #else
386  const int MaxWaitMilliSeconds = 2000;
387 #endif
388 
389  while ( !isIdle() )
390  {
391  debug (2, "WeaverImpl::finish: not done, waiting.\n" );
392  QMutexLocker l( m_finishMutex );
393  if ( m_jobFinished.wait( m_finishMutex, MaxWaitMilliSeconds ) == false )
394  {
395  debug ( 2, "WeaverImpl::finish: wait timed out, %i jobs left, waking threads.\n",
396  queueLength() );
397  m_jobAvailable.wakeAll();
398  }
399  }
400  debug (2, "WeaverImpl::finish: done.\n\n\n" );
401 }
402 
403 void WeaverImpl::requestAbort()
404 {
405  QMutexLocker l (m_mutex);
406  for ( int i = 0; i<m_inventory.size(); ++i )
407  {
408  m_inventory[i]->requestAbort();
409  }
410 }
411 
412 void WeaverImpl::dumpJobs()
413 {
414  QMutexLocker l (m_mutex);
415  debug( 0, "WeaverImpl::dumpJobs: current jobs:\n" );
416  for ( int index = 0; index < m_assignments.size(); ++index )
417  {
418  debug( 0, "--> %4i: %p %s (priority %i)\n", index, (void*)m_assignments.at( index ),
419  m_assignments.at( index )->metaObject()->className(),
420  m_assignments.at(index)->priority() );
421  }
422 }
423 
424 #include "WeaverImpl.moc"
This file is part of the KDE documentation.
Documentation copyright © 1996-2013 The KDE developers.
Generated on Tue Jul 16 2013 11:45:26 by doxygen 1.8.1.1 written by Dimitri van Heesch, © 1997-2006

KDE's Doxygen guidelines are available online.

ThreadWeaver

Skip menu "ThreadWeaver"
  • Main Page
  • Namespace List
  • Namespace Members
  • Alphabetical List
  • Class List
  • Class Hierarchy
  • Class Members
  • File List
  • File Members
  • Related Pages

kdelibs-4.10.5 API Reference

Skip menu "kdelibs-4.10.5 API Reference"
  • DNSSD
  • Interfaces
  •   KHexEdit
  •   KMediaPlayer
  •   KSpeech
  •   KTextEditor
  • kconf_update
  • KDE3Support
  •   KUnitTest
  • KDECore
  • KDED
  • KDEsu
  • KDEUI
  • KDEWebKit
  • KDocTools
  • KFile
  • KHTML
  • KImgIO
  • KInit
  • kio
  • KIOSlave
  • KJS
  •   KJS-API
  •   WTF
  • kjsembed
  • KNewStuff
  • KParts
  • KPty
  • Kross
  • KUnitConversion
  • KUtils
  • Nepomuk
  • Plasma
  • Solid
  • Sonnet
  • ThreadWeaver
Report problems with this website to our bug tracking system.
Contact the specific authors with questions and comments about the page contents.

KDE® and the K Desktop Environment® logo are registered trademarks of KDE e.V. | Legal