akonadi
resourcebase.cpp
00001 /* 00002 Copyright (c) 2006 Till Adam <adam@kde.org> 00003 Copyright (c) 2007 Volker Krause <vkrause@kde.org> 00004 00005 This library is free software; you can redistribute it and/or modify it 00006 under the terms of the GNU Library General Public License as published by 00007 the Free Software Foundation; either version 2 of the License, or (at your 00008 option) any later version. 00009 00010 This library is distributed in the hope that it will be useful, but WITHOUT 00011 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 00012 FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public 00013 License for more details. 00014 00015 You should have received a copy of the GNU Library General Public License 00016 along with this library; see the file COPYING.LIB. If not, write to the 00017 Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 00018 02110-1301, USA. 00019 */ 00020 00021 #include "resourcebase.h" 00022 #include "agentbase_p.h" 00023 00024 #include "resourceadaptor.h" 00025 #include "collectiondeletejob.h" 00026 #include "collectionsync_p.h" 00027 #include "dbusconnectionpool.h" 00028 #include "itemsync.h" 00029 #include "resourcescheduler_p.h" 00030 #include "tracerinterface.h" 00031 #include "xdgbasedirs_p.h" 00032 00033 #include "changerecorder.h" 00034 #include "collectionfetchjob.h" 00035 #include "collectionfetchscope.h" 00036 #include "collectionmodifyjob.h" 00037 #include "itemfetchjob.h" 00038 #include "itemfetchscope.h" 00039 #include "itemmodifyjob.h" 00040 #include "itemmodifyjob_p.h" 00041 #include "session.h" 00042 #include "resourceselectjob_p.h" 00043 #include "monitor_p.h" 00044 #include "servermanager_p.h" 00045 00046 #include <kaboutdata.h> 00047 #include <kcmdlineargs.h> 00048 #include <kdebug.h> 00049 #include <klocale.h> 00050 00051 #include <QtCore/QDebug> 00052 #include <QtCore/QDir> 00053 #include <QtCore/QHash> 00054 #include <QtCore/QSettings> 00055 #include <QtCore/QTimer> 00056 #include <QtGui/QApplication> 00057 #include <QtDBus/QtDBus> 00058 00059 using namespace Akonadi; 00060 00061 class Akonadi::ResourceBasePrivate : public AgentBasePrivate 00062 { 00063 Q_OBJECT 00064 Q_CLASSINFO( "D-Bus Interface", "org.kde.dfaure" ) 00065 00066 public: 00067 ResourceBasePrivate( ResourceBase *parent ) 00068 : AgentBasePrivate( parent ), 00069 scheduler( 0 ), 00070 mItemSyncer( 0 ), 00071 mItemSyncFetchScope( 0 ), 00072 mItemTransactionMode( ItemSync::SingleTransaction ), 00073 mCollectionSyncer( 0 ), 00074 mHierarchicalRid( false ), 00075 mUnemittedProgress( 0 ) 00076 { 00077 Internal::setClientType( Internal::Resource ); 00078 mStatusMessage = defaultReadyMessage(); 00079 mProgressEmissionCompressor.setInterval( 1000 ); 00080 mProgressEmissionCompressor.setSingleShot( true ); 00081 } 00082 00083 ~ResourceBasePrivate() 00084 { 00085 delete mItemSyncFetchScope; 00086 } 00087 00088 Q_DECLARE_PUBLIC( ResourceBase ) 00089 00090 void delayedInit() 00091 { 00092 if ( !DBusConnectionPool::threadConnection().registerService( QLatin1String( "org.freedesktop.Akonadi.Resource." ) + mId ) ) { 00093 QString reason = DBusConnectionPool::threadConnection().lastError().message(); 00094 if ( reason.isEmpty() ) { 00095 reason = QString::fromLatin1( "this service is probably running already." ); 00096 } 00097 kError() << "Unable to register service at D-Bus: " << reason; 00098 00099 if ( QThread::currentThread() == QCoreApplication::instance()->thread() ) 00100 QCoreApplication::instance()->exit(1); 00101 00102 } else { 00103 AgentBasePrivate::delayedInit(); 00104 } 00105 } 00106 00107 virtual void changeProcessed() 00108 { 00109 mChangeRecorder->changeProcessed(); 00110 if ( !mChangeRecorder->isEmpty() ) 00111 scheduler->scheduleChangeReplay(); 00112 scheduler->taskDone(); 00113 } 00114 00115 void slotAbortRequested(); 00116 00117 void slotDeliveryDone( KJob* job ); 00118 void slotCollectionSyncDone( KJob *job ); 00119 void slotLocalListDone( KJob *job ); 00120 void slotSynchronizeCollection( const Collection &col ); 00121 void slotCollectionListDone( KJob *job ); 00122 void slotSynchronizeCollectionAttributes( const Collection &col ); 00123 void slotCollectionListForAttributesDone( KJob *job ); 00124 void slotCollectionAttributesSyncDone( KJob *job ); 00125 00126 void slotItemSyncDone( KJob *job ); 00127 00128 void slotPercent( KJob* job, unsigned long percent ); 00129 void slotDelayedEmitProgress(); 00130 void slotDeleteResourceCollection(); 00131 void slotDeleteResourceCollectionDone( KJob *job ); 00132 void slotCollectionDeletionDone( KJob *job ); 00133 00134 void slotPrepareItemRetrieval( const Akonadi::Item &item ); 00135 void slotPrepareItemRetrievalResult( KJob* job ); 00136 00137 void changeCommittedResult( KJob* job ); 00138 00139 void slotSessionReconnected() 00140 { 00141 Q_Q( ResourceBase ); 00142 00143 new ResourceSelectJob( q->identifier() ); 00144 } 00145 00146 void createItemSyncInstanceIfMissing() 00147 { 00148 Q_Q( ResourceBase ); 00149 Q_ASSERT_X( scheduler->currentTask().type == ResourceScheduler::SyncCollection, 00150 "createItemSyncInstance", "Calling items retrieval methods although no item retrieval is in progress" ); 00151 if ( !mItemSyncer ) { 00152 mItemSyncer = new ItemSync( q->currentCollection() ); 00153 mItemSyncer->setTransactionMode( mItemTransactionMode ); 00154 if ( mItemSyncFetchScope ) 00155 mItemSyncer->setFetchScope( *mItemSyncFetchScope ); 00156 mItemSyncer->setProperty( "collection", QVariant::fromValue( q->currentCollection() ) ); 00157 connect( mItemSyncer, SIGNAL( percent( KJob*, unsigned long ) ), q, SLOT( slotPercent( KJob*, unsigned long ) ) ); 00158 connect( mItemSyncer, SIGNAL( result( KJob* ) ), q, SLOT( slotItemSyncDone( KJob* ) ) ); 00159 } 00160 Q_ASSERT( mItemSyncer ); 00161 } 00162 00163 public Q_SLOTS: 00164 Q_SCRIPTABLE void dump() 00165 { 00166 scheduler->dump(); 00167 } 00168 00169 Q_SCRIPTABLE void clear() 00170 { 00171 scheduler->clear(); 00172 } 00173 00174 protected Q_SLOTS: 00175 // reimplementations from AgentbBasePrivate, containing sanity checks that only apply to resources 00176 // such as making sure that RIDs are present as well as translations of cross-resource moves 00177 // TODO: we could possibly add recovery code for no-RID notifications by re-enquing those to the change recorder 00178 // as the corresponding Add notifications, although that contains a risk of endless fail/retry loops 00179 00180 void itemAdded(const Akonadi::Item& item, const Akonadi::Collection& collection) 00181 { 00182 if ( collection.remoteId().isEmpty() ) { 00183 changeProcessed(); 00184 return; 00185 } 00186 AgentBasePrivate::itemAdded( item, collection ); 00187 } 00188 00189 void itemChanged(const Akonadi::Item& item, const QSet< QByteArray >& partIdentifiers) 00190 { 00191 if ( item.remoteId().isEmpty() ) { 00192 changeProcessed(); 00193 return; 00194 } 00195 AgentBasePrivate::itemChanged( item, partIdentifiers ); 00196 } 00197 00198 // TODO move the move translation code from AgebtBasePrivate here, it's wrong for agents 00199 void itemMoved(const Akonadi::Item &item, const Akonadi::Collection &source, const Akonadi::Collection &destination) 00200 { 00201 if ( item.remoteId().isEmpty() || destination.remoteId().isEmpty() || destination == source ) { 00202 changeProcessed(); 00203 return; 00204 } 00205 AgentBasePrivate::itemMoved( item, source, destination ); 00206 } 00207 00208 void itemRemoved(const Akonadi::Item& item) 00209 { 00210 if ( item.remoteId().isEmpty() ) { 00211 changeProcessed(); 00212 return; 00213 } 00214 AgentBasePrivate::itemRemoved( item ); 00215 } 00216 00217 void collectionAdded(const Akonadi::Collection& collection, const Akonadi::Collection& parent) 00218 { 00219 if ( parent.remoteId().isEmpty() ) { 00220 changeProcessed(); 00221 return; 00222 } 00223 AgentBasePrivate::collectionAdded( collection, parent ); 00224 } 00225 00226 void collectionChanged(const Akonadi::Collection& collection) 00227 { 00228 if ( collection.remoteId().isEmpty() ) { 00229 changeProcessed(); 00230 return; 00231 } 00232 AgentBasePrivate::collectionChanged( collection ); 00233 } 00234 00235 void collectionChanged(const Akonadi::Collection& collection, const QSet< QByteArray >& partIdentifiers) 00236 { 00237 if ( collection.remoteId().isEmpty() ) { 00238 changeProcessed(); 00239 return; 00240 } 00241 AgentBasePrivate::collectionChanged( collection, partIdentifiers ); 00242 } 00243 00244 // TODO move the move translation code from AgebtBasePrivate here, it's wrong for agents 00245 void collectionMoved(const Akonadi::Collection& collection, const Akonadi::Collection& source, const Akonadi::Collection& destination) 00246 { 00247 if ( collection.remoteId().isEmpty() || destination.remoteId().isEmpty() || source == destination ) { 00248 changeProcessed(); 00249 return; 00250 } 00251 AgentBasePrivate::collectionMoved( collection, source, destination ); 00252 } 00253 00254 void collectionRemoved(const Akonadi::Collection& collection) 00255 { 00256 if ( collection.remoteId().isEmpty() ) { 00257 changeProcessed(); 00258 return; 00259 } 00260 AgentBasePrivate::collectionRemoved( collection ); 00261 } 00262 00263 public: 00264 // synchronize states 00265 Collection currentCollection; 00266 00267 ResourceScheduler *scheduler; 00268 ItemSync *mItemSyncer; 00269 ItemFetchScope *mItemSyncFetchScope; 00270 ItemSync::TransactionMode mItemTransactionMode; 00271 CollectionSync *mCollectionSyncer; 00272 bool mHierarchicalRid; 00273 QTimer mProgressEmissionCompressor; 00274 int mUnemittedProgress; 00275 QMap<Akonadi::Collection::Id, QVariantMap> mUnemittedAdvancedStatus; 00276 }; 00277 00278 ResourceBase::ResourceBase( const QString & id ) 00279 : AgentBase( new ResourceBasePrivate( this ), id ) 00280 { 00281 Q_D( ResourceBase ); 00282 00283 new Akonadi__ResourceAdaptor( this ); 00284 00285 d->scheduler = new ResourceScheduler( this ); 00286 00287 d->mChangeRecorder->setChangeRecordingEnabled( true ); 00288 connect( d->mChangeRecorder, SIGNAL( changesAdded() ), 00289 d->scheduler, SLOT( scheduleChangeReplay() ) ); 00290 00291 d->mChangeRecorder->setResourceMonitored( d->mId.toLatin1() ); 00292 d->mChangeRecorder->fetchCollection( true ); 00293 00294 connect( d->scheduler, SIGNAL( executeFullSync() ), 00295 SLOT( retrieveCollections() ) ); 00296 connect( d->scheduler, SIGNAL( executeCollectionTreeSync() ), 00297 SLOT( retrieveCollections() ) ); 00298 connect( d->scheduler, SIGNAL( executeCollectionSync( const Akonadi::Collection& ) ), 00299 SLOT( slotSynchronizeCollection( const Akonadi::Collection& ) ) ); 00300 connect( d->scheduler, SIGNAL( executeCollectionAttributesSync( const Akonadi::Collection& ) ), 00301 SLOT( slotSynchronizeCollectionAttributes(Akonadi::Collection)) ); 00302 connect( d->scheduler, SIGNAL( executeItemFetch( const Akonadi::Item&, const QSet<QByteArray>& ) ), 00303 SLOT( slotPrepareItemRetrieval(Akonadi::Item)) ); 00304 connect( d->scheduler, SIGNAL( executeResourceCollectionDeletion() ), 00305 SLOT( slotDeleteResourceCollection() ) ); 00306 connect( d->scheduler, SIGNAL( status( int, const QString& ) ), 00307 SIGNAL( status( int, const QString& ) ) ); 00308 connect( d->scheduler, SIGNAL( executeChangeReplay() ), 00309 d->mChangeRecorder, SLOT( replayNext() ) ); 00310 connect( d->scheduler, SIGNAL( fullSyncComplete() ), SIGNAL( synchronized() ) ); 00311 connect( d->mChangeRecorder, SIGNAL( nothingToReplay() ), d->scheduler, SLOT( taskDone() ) ); 00312 connect( d->mChangeRecorder, SIGNAL( collectionRemoved( const Akonadi::Collection& ) ), 00313 d->scheduler, SLOT( collectionRemoved( const Akonadi::Collection& ) ) ); 00314 connect( this, SIGNAL( abortRequested() ), this, SLOT( slotAbortRequested() ) ); 00315 connect( this, SIGNAL( synchronized() ), d->scheduler, SLOT( taskDone() ) ); 00316 connect( this, SIGNAL( agentNameChanged( const QString& ) ), 00317 this, SIGNAL( nameChanged( const QString& ) ) ); 00318 00319 connect( &d->mProgressEmissionCompressor, SIGNAL( timeout() ), 00320 this, SLOT( slotDelayedEmitProgress() ) ); 00321 00322 d->scheduler->setOnline( d->mOnline ); 00323 if ( !d->mChangeRecorder->isEmpty() ) 00324 d->scheduler->scheduleChangeReplay(); 00325 00326 DBusConnectionPool::threadConnection().registerObject( QLatin1String( "/Debug" ), d, QDBusConnection::ExportScriptableSlots ); 00327 00328 new ResourceSelectJob( identifier() ); 00329 00330 connect( d->mChangeRecorder->session(), SIGNAL( reconnected() ), SLOT( slotSessionReconnected() ) ); 00331 } 00332 00333 ResourceBase::~ResourceBase() 00334 { 00335 } 00336 00337 void ResourceBase::synchronize() 00338 { 00339 d_func()->scheduler->scheduleFullSync(); 00340 } 00341 00342 void ResourceBase::setName( const QString &name ) 00343 { 00344 AgentBase::setAgentName( name ); 00345 } 00346 00347 QString ResourceBase::name() const 00348 { 00349 return AgentBase::agentName(); 00350 } 00351 00352 QString ResourceBase::parseArguments( int argc, char **argv ) 00353 { 00354 QString identifier; 00355 if ( argc < 3 ) { 00356 kDebug() << "Not enough arguments passed..."; 00357 exit( 1 ); 00358 } 00359 00360 for ( int i = 1; i < argc - 1; ++i ) { 00361 if ( QLatin1String( argv[ i ] ) == QLatin1String( "--identifier" ) ) 00362 identifier = QLatin1String( argv[ i + 1 ] ); 00363 } 00364 00365 if ( identifier.isEmpty() ) { 00366 kDebug() << "Identifier argument missing"; 00367 exit( 1 ); 00368 } 00369 00370 const QFileInfo fi( QString::fromLocal8Bit( argv[0] ) ); 00371 // strip off full path and possible .exe suffix 00372 const QByteArray catalog = fi.baseName().toLatin1(); 00373 00374 KCmdLineArgs::init( argc, argv, identifier.toLatin1(), catalog, 00375 ki18nc( "@title application name", "Akonadi Resource" ), "0.1", 00376 ki18nc( "@title application description", "Akonadi Resource" ) ); 00377 00378 KCmdLineOptions options; 00379 options.add( "identifier <argument>", 00380 ki18nc( "@label commandline option", "Resource identifier" ) ); 00381 KCmdLineArgs::addCmdLineOptions( options ); 00382 00383 return identifier; 00384 } 00385 00386 int ResourceBase::init( ResourceBase *r ) 00387 { 00388 QApplication::setQuitOnLastWindowClosed( false ); 00389 KGlobal::locale()->insertCatalog( QLatin1String( "libakonadi" ) ); 00390 int rv = kapp->exec(); 00391 delete r; 00392 return rv; 00393 } 00394 00395 void ResourceBasePrivate::slotAbortRequested() 00396 { 00397 Q_Q( ResourceBase ); 00398 00399 scheduler->cancelQueues(); 00400 QMetaObject::invokeMethod( q, "abortActivity" ); 00401 } 00402 00403 void ResourceBase::itemRetrieved( const Item &item ) 00404 { 00405 Q_D( ResourceBase ); 00406 Q_ASSERT( d->scheduler->currentTask().type == ResourceScheduler::FetchItem ); 00407 if ( !item.isValid() ) { 00408 d->scheduler->currentTask().sendDBusReplies( false ); 00409 d->scheduler->taskDone(); 00410 return; 00411 } 00412 00413 Item i( item ); 00414 QSet<QByteArray> requestedParts = d->scheduler->currentTask().itemParts; 00415 foreach ( const QByteArray &part, requestedParts ) { 00416 if ( !item.loadedPayloadParts().contains( part ) ) { 00417 kWarning() << "Item does not provide part" << part; 00418 } 00419 } 00420 00421 ItemModifyJob *job = new ItemModifyJob( i ); 00422 // FIXME: remove once the item with which we call retrieveItem() has a revision number 00423 job->disableRevisionCheck(); 00424 connect( job, SIGNAL( result( KJob* ) ), SLOT( slotDeliveryDone( KJob* ) ) ); 00425 } 00426 00427 void ResourceBasePrivate::slotDeliveryDone(KJob * job) 00428 { 00429 Q_Q( ResourceBase ); 00430 Q_ASSERT( scheduler->currentTask().type == ResourceScheduler::FetchItem ); 00431 if ( job->error() ) { 00432 emit q->error( QLatin1String( "Error while creating item: " ) + job->errorString() ); 00433 } 00434 scheduler->currentTask().sendDBusReplies( !job->error() ); 00435 scheduler->taskDone(); 00436 } 00437 00438 void ResourceBase::collectionAttributesRetrieved( const Collection &collection ) 00439 { 00440 Q_D( ResourceBase ); 00441 Q_ASSERT( d->scheduler->currentTask().type == ResourceScheduler::SyncCollectionAttributes ); 00442 if ( !collection.isValid() ) { 00443 emit attributesSynchronized( d->scheduler->currentTask().collection.id() ); 00444 d->scheduler->taskDone(); 00445 return; 00446 } 00447 00448 CollectionModifyJob *job = new CollectionModifyJob( collection ); 00449 connect( job, SIGNAL( result( KJob* ) ), SLOT( slotCollectionAttributesSyncDone( KJob* ) ) ); 00450 } 00451 00452 void ResourceBasePrivate::slotCollectionAttributesSyncDone(KJob * job) 00453 { 00454 Q_Q( ResourceBase ); 00455 Q_ASSERT( scheduler->currentTask().type == ResourceScheduler::SyncCollectionAttributes ); 00456 if ( job->error() ) { 00457 emit q->error( QLatin1String( "Error while updating collection: " ) + job->errorString() ); 00458 } 00459 emit q->attributesSynchronized( scheduler->currentTask().collection.id() ); 00460 scheduler->taskDone(); 00461 } 00462 00463 void ResourceBasePrivate::slotDeleteResourceCollection() 00464 { 00465 Q_Q( ResourceBase ); 00466 00467 CollectionFetchJob *job = new CollectionFetchJob( Collection::root(), CollectionFetchJob::FirstLevel ); 00468 job->fetchScope().setResource( q->identifier() ); 00469 connect( job, SIGNAL( result( KJob* ) ), q, SLOT( slotDeleteResourceCollectionDone( KJob* ) ) ); 00470 } 00471 00472 void ResourceBasePrivate::slotDeleteResourceCollectionDone( KJob *job ) 00473 { 00474 Q_Q( ResourceBase ); 00475 if ( job->error() ) { 00476 emit q->error( job->errorString() ); 00477 scheduler->taskDone(); 00478 } else { 00479 const CollectionFetchJob *fetchJob = static_cast<const CollectionFetchJob*>( job ); 00480 00481 if ( !fetchJob->collections().isEmpty() ) { 00482 CollectionDeleteJob *job = new CollectionDeleteJob( fetchJob->collections().first() ); 00483 connect( job, SIGNAL( result( KJob* ) ), q, SLOT( slotCollectionDeletionDone( KJob* ) ) ); 00484 } else { 00485 // there is no resource collection, so just ignore the request 00486 scheduler->taskDone(); 00487 } 00488 } 00489 } 00490 00491 void ResourceBasePrivate::slotCollectionDeletionDone( KJob *job ) 00492 { 00493 Q_Q( ResourceBase ); 00494 if ( job->error() ) { 00495 emit q->error( job->errorString() ); 00496 } 00497 00498 scheduler->taskDone(); 00499 } 00500 00501 void ResourceBase::changeCommitted( const Item& item ) 00502 { 00503 Q_D( ResourceBase ); 00504 ItemModifyJob *job = new ItemModifyJob( item ); 00505 job->d_func()->setClean(); 00506 job->disableRevisionCheck(); // TODO: remove, but where/how do we handle the error? 00507 job->setIgnorePayload( true ); // we only want to reset the dirty flag and update the remote id 00508 d->changeProcessed(); 00509 } 00510 00511 void ResourceBase::changeCommitted( const Collection &collection ) 00512 { 00513 CollectionModifyJob *job = new CollectionModifyJob( collection ); 00514 connect( job, SIGNAL( result( KJob* ) ), SLOT( changeCommittedResult( KJob* ) ) ); 00515 } 00516 00517 void ResourceBasePrivate::changeCommittedResult( KJob *job ) 00518 { 00519 Q_Q( ResourceBase ); 00520 if ( job->error() ) 00521 emit q->error( i18nc( "@info", "Updating local collection failed: %1.", job->errorText() ) ); 00522 mChangeRecorder->d_ptr->invalidateCache( static_cast<CollectionModifyJob*>( job )->collection() ); 00523 changeProcessed(); 00524 } 00525 00526 bool ResourceBase::requestItemDelivery( qint64 uid, const QString & remoteId, 00527 const QString &mimeType, const QStringList &_parts ) 00528 { 00529 Q_D( ResourceBase ); 00530 if ( !isOnline() ) { 00531 emit error( i18nc( "@info", "Cannot fetch item in offline mode." ) ); 00532 return false; 00533 } 00534 00535 setDelayedReply( true ); 00536 // FIXME: we need at least the revision number too 00537 Item item( uid ); 00538 item.setMimeType( mimeType ); 00539 item.setRemoteId( remoteId ); 00540 00541 QSet<QByteArray> parts; 00542 Q_FOREACH( const QString &str, _parts ) 00543 parts.insert( str.toLatin1() ); 00544 00545 d->scheduler->scheduleItemFetch( item, parts, message().createReply() ); 00546 00547 return true; 00548 } 00549 00550 void ResourceBase::collectionsRetrieved( const Collection::List & collections ) 00551 { 00552 Q_D( ResourceBase ); 00553 Q_ASSERT_X( d->scheduler->currentTask().type == ResourceScheduler::SyncCollectionTree || 00554 d->scheduler->currentTask().type == ResourceScheduler::SyncAll, 00555 "ResourceBase::collectionsRetrieved()", 00556 "Calling collectionsRetrieved() although no collection retrieval is in progress" ); 00557 if ( !d->mCollectionSyncer ) { 00558 d->mCollectionSyncer = new CollectionSync( identifier() ); 00559 d->mCollectionSyncer->setHierarchicalRemoteIds( d->mHierarchicalRid ); 00560 connect( d->mCollectionSyncer, SIGNAL( percent( KJob*, unsigned long ) ), SLOT( slotPercent( KJob*, unsigned long ) ) ); 00561 connect( d->mCollectionSyncer, SIGNAL( result( KJob* ) ), SLOT( slotCollectionSyncDone( KJob* ) ) ); 00562 } 00563 d->mCollectionSyncer->setRemoteCollections( collections ); 00564 } 00565 00566 void ResourceBase::collectionsRetrievedIncremental( const Collection::List & changedCollections, 00567 const Collection::List & removedCollections ) 00568 { 00569 Q_D( ResourceBase ); 00570 Q_ASSERT_X( d->scheduler->currentTask().type == ResourceScheduler::SyncCollectionTree || 00571 d->scheduler->currentTask().type == ResourceScheduler::SyncAll, 00572 "ResourceBase::collectionsRetrievedIncremental()", 00573 "Calling collectionsRetrievedIncremental() although no collection retrieval is in progress" ); 00574 if ( !d->mCollectionSyncer ) { 00575 d->mCollectionSyncer = new CollectionSync( identifier() ); 00576 d->mCollectionSyncer->setHierarchicalRemoteIds( d->mHierarchicalRid ); 00577 connect( d->mCollectionSyncer, SIGNAL( percent( KJob*, unsigned long ) ), SLOT( slotPercent( KJob*, unsigned long ) ) ); 00578 connect( d->mCollectionSyncer, SIGNAL( result( KJob* ) ), SLOT( slotCollectionSyncDone( KJob* ) ) ); 00579 } 00580 d->mCollectionSyncer->setRemoteCollections( changedCollections, removedCollections ); 00581 } 00582 00583 void ResourceBase::setCollectionStreamingEnabled( bool enable ) 00584 { 00585 Q_D( ResourceBase ); 00586 Q_ASSERT_X( d->scheduler->currentTask().type == ResourceScheduler::SyncCollectionTree || 00587 d->scheduler->currentTask().type == ResourceScheduler::SyncAll, 00588 "ResourceBase::setCollectionStreamingEnabled()", 00589 "Calling setCollectionStreamingEnabled() although no collection retrieval is in progress" ); 00590 if ( !d->mCollectionSyncer ) { 00591 d->mCollectionSyncer = new CollectionSync( identifier() ); 00592 d->mCollectionSyncer->setHierarchicalRemoteIds( d->mHierarchicalRid ); 00593 connect( d->mCollectionSyncer, SIGNAL( percent( KJob*, unsigned long ) ), SLOT( slotPercent( KJob*, unsigned long ) ) ); 00594 connect( d->mCollectionSyncer, SIGNAL( result( KJob* ) ), SLOT( slotCollectionSyncDone( KJob* ) ) ); 00595 } 00596 d->mCollectionSyncer->setStreamingEnabled( enable ); 00597 } 00598 00599 void ResourceBase::collectionsRetrievalDone() 00600 { 00601 Q_D( ResourceBase ); 00602 Q_ASSERT_X( d->scheduler->currentTask().type == ResourceScheduler::SyncCollectionTree || 00603 d->scheduler->currentTask().type == ResourceScheduler::SyncAll, 00604 "ResourceBase::collectionsRetrievalDone()", 00605 "Calling collectionsRetrievalDone() although no collection retrieval is in progress" ); 00606 // streaming enabled, so finalize the sync 00607 if ( d->mCollectionSyncer ) { 00608 d->mCollectionSyncer->retrievalDone(); 00609 } 00610 // user did the sync himself, we are done now 00611 else { 00612 // FIXME: we need the same special case for SyncAll as in slotCollectionSyncDone here! 00613 d->scheduler->taskDone(); 00614 } 00615 } 00616 00617 void ResourceBasePrivate::slotCollectionSyncDone( KJob * job ) 00618 { 00619 Q_Q( ResourceBase ); 00620 mCollectionSyncer = 0; 00621 if ( job->error() ) { 00622 if ( job->error() != Job::UserCanceled ) 00623 emit q->error( job->errorString() ); 00624 } else { 00625 if ( scheduler->currentTask().type == ResourceScheduler::SyncAll ) { 00626 CollectionFetchJob *list = new CollectionFetchJob( Collection::root(), CollectionFetchJob::Recursive ); 00627 list->setFetchScope( q->changeRecorder()->collectionFetchScope() ); 00628 list->fetchScope().setResource( mId ); 00629 q->connect( list, SIGNAL( result( KJob* ) ), q, SLOT( slotLocalListDone( KJob* ) ) ); 00630 return; 00631 } 00632 } 00633 scheduler->taskDone(); 00634 } 00635 00636 void ResourceBasePrivate::slotLocalListDone( KJob * job ) 00637 { 00638 Q_Q( ResourceBase ); 00639 if ( job->error() ) { 00640 emit q->error( job->errorString() ); 00641 } else { 00642 Collection::List cols = static_cast<CollectionFetchJob*>( job )->collections(); 00643 foreach ( const Collection &col, cols ) { 00644 scheduler->scheduleSync( col ); 00645 } 00646 scheduler->scheduleFullSyncCompletion(); 00647 } 00648 scheduler->taskDone(); 00649 } 00650 00651 void ResourceBasePrivate::slotSynchronizeCollection( const Collection &col ) 00652 { 00653 Q_Q( ResourceBase ); 00654 currentCollection = col; 00655 // check if this collection actually can contain anything 00656 QStringList contentTypes = currentCollection.contentMimeTypes(); 00657 contentTypes.removeAll( Collection::mimeType() ); 00658 if ( !contentTypes.isEmpty() || (col.rights() & (Collection::CanLinkItem)) ) { // HACK to check for virtual collections 00659 emit q->status( AgentBase::Running, i18nc( "@info:status", "Syncing collection '%1'", currentCollection.name() ) ); 00660 q->retrieveItems( currentCollection ); 00661 return; 00662 } 00663 scheduler->taskDone(); 00664 } 00665 00666 void ResourceBasePrivate::slotSynchronizeCollectionAttributes( const Collection &col ) 00667 { 00668 Q_Q( ResourceBase ); 00669 QMetaObject::invokeMethod( q, "retrieveCollectionAttributes", Q_ARG( Akonadi::Collection, col ) ); 00670 } 00671 00672 void ResourceBasePrivate::slotPrepareItemRetrieval( const Akonadi::Item &item ) 00673 { 00674 Q_Q( ResourceBase ); 00675 ItemFetchJob *fetch = new ItemFetchJob( item, this ); 00676 fetch->fetchScope().setAncestorRetrieval( q->changeRecorder()->itemFetchScope().ancestorRetrieval() ); 00677 fetch->fetchScope().setCacheOnly( true ); 00678 00679 // copy list of attributes to fetch 00680 const QSet<QByteArray> attributes = q->changeRecorder()->itemFetchScope().attributes(); 00681 foreach ( const QByteArray &attribute, attributes ) 00682 fetch->fetchScope().fetchAttribute( attribute ); 00683 00684 q->connect( fetch, SIGNAL( result( KJob* ) ), SLOT( slotPrepareItemRetrievalResult( KJob* ) ) ); 00685 } 00686 00687 void ResourceBasePrivate::slotPrepareItemRetrievalResult( KJob* job ) 00688 { 00689 Q_Q( ResourceBase ); 00690 Q_ASSERT_X( scheduler->currentTask().type == ResourceScheduler::FetchItem, 00691 "ResourceBasePrivate::slotPrepareItemRetrievalResult()", 00692 "Preparing item retrieval although no item retrieval is in progress" ); 00693 if ( job->error() ) { 00694 q->cancelTask( job->errorText() ); 00695 return; 00696 } 00697 ItemFetchJob *fetch = qobject_cast<ItemFetchJob*>( job ); 00698 if ( fetch->items().count() != 1 ) { 00699 q->cancelTask( i18n( "The requested item no longer exists" ) ); 00700 return; 00701 } 00702 const Item item = fetch->items().first(); 00703 const QSet<QByteArray> parts = scheduler->currentTask().itemParts; 00704 if ( !q->retrieveItem( item, parts ) ) 00705 q->cancelTask(); 00706 } 00707 00708 void ResourceBase::itemsRetrievalDone() 00709 { 00710 Q_D( ResourceBase ); 00711 // streaming enabled, so finalize the sync 00712 if ( d->mItemSyncer ) { 00713 d->mItemSyncer->deliveryDone(); 00714 } 00715 // user did the sync himself, we are done now 00716 else { 00717 d->scheduler->taskDone(); 00718 } 00719 } 00720 00721 void ResourceBase::clearCache() 00722 { 00723 Q_D( ResourceBase ); 00724 d->scheduler->scheduleResourceCollectionDeletion(); 00725 } 00726 00727 Collection ResourceBase::currentCollection() const 00728 { 00729 Q_D( const ResourceBase ); 00730 Q_ASSERT_X( d->scheduler->currentTask().type == ResourceScheduler::SyncCollection , 00731 "ResourceBase::currentCollection()", 00732 "Trying to access current collection although no item retrieval is in progress" ); 00733 return d->currentCollection; 00734 } 00735 00736 Item ResourceBase::currentItem() const 00737 { 00738 Q_D( const ResourceBase ); 00739 Q_ASSERT_X( d->scheduler->currentTask().type == ResourceScheduler::FetchItem , 00740 "ResourceBase::currentItem()", 00741 "Trying to access current item although no item retrieval is in progress" ); 00742 return d->scheduler->currentTask().item; 00743 } 00744 00745 void ResourceBase::synchronizeCollectionTree() 00746 { 00747 d_func()->scheduler->scheduleCollectionTreeSync(); 00748 } 00749 00750 void ResourceBase::cancelTask() 00751 { 00752 Q_D( ResourceBase ); 00753 switch ( d->scheduler->currentTask().type ) { 00754 case ResourceScheduler::FetchItem: 00755 itemRetrieved( Item() ); // sends the error reply and 00756 break; 00757 case ResourceScheduler::ChangeReplay: 00758 d->changeProcessed(); 00759 break; 00760 case ResourceScheduler::SyncCollectionTree: 00761 case ResourceScheduler::SyncAll: 00762 if ( d->mCollectionSyncer ) 00763 d->mCollectionSyncer->rollback(); 00764 else 00765 d->scheduler->taskDone(); 00766 break; 00767 case ResourceScheduler::SyncCollection: 00768 if ( d->mItemSyncer ) 00769 d->mItemSyncer->rollback(); 00770 else 00771 d->scheduler->taskDone(); 00772 break; 00773 default: 00774 d->scheduler->taskDone(); 00775 } 00776 } 00777 00778 void ResourceBase::cancelTask( const QString &msg ) 00779 { 00780 cancelTask(); 00781 00782 emit error( msg ); 00783 } 00784 00785 void ResourceBase::deferTask() 00786 { 00787 Q_D( ResourceBase ); 00788 d->scheduler->deferTask(); 00789 } 00790 00791 void ResourceBase::doSetOnline( bool state ) 00792 { 00793 d_func()->scheduler->setOnline( state ); 00794 } 00795 00796 void ResourceBase::synchronizeCollection( qint64 collectionId ) 00797 { 00798 synchronizeCollection( collectionId, false ); 00799 } 00800 00801 void ResourceBase::synchronizeCollection( qint64 collectionId, bool recursive ) 00802 { 00803 CollectionFetchJob* job = new CollectionFetchJob( Collection( collectionId ), recursive ? CollectionFetchJob::Recursive : CollectionFetchJob::Base ); 00804 job->setFetchScope( changeRecorder()->collectionFetchScope() ); 00805 job->fetchScope().setResource( identifier() ); 00806 job->setProperty( "recursive", recursive ); 00807 connect( job, SIGNAL( result( KJob* ) ), SLOT( slotCollectionListDone( KJob* ) ) ); 00808 } 00809 00810 void ResourceBasePrivate::slotCollectionListDone( KJob *job ) 00811 { 00812 if ( !job->error() ) { 00813 Collection::List list = static_cast<CollectionFetchJob*>( job )->collections(); 00814 if ( !list.isEmpty() ) { 00815 if ( job->property( "recursive" ).toBool() ) { 00816 Q_FOREACH ( const Collection &collection, list ) { 00817 scheduler->scheduleSync( collection ); 00818 } 00819 } else { 00820 scheduler->scheduleSync( list.first() ); 00821 } 00822 } 00823 } 00824 // TODO: error handling 00825 } 00826 00827 void ResourceBase::synchronizeCollectionAttributes( qint64 collectionId ) 00828 { 00829 CollectionFetchJob* job = new CollectionFetchJob( Collection( collectionId ), CollectionFetchJob::Base ); 00830 job->setFetchScope( changeRecorder()->collectionFetchScope() ); 00831 job->fetchScope().setResource( identifier() ); 00832 connect( job, SIGNAL( result( KJob* ) ), SLOT( slotCollectionListForAttributesDone( KJob* ) ) ); 00833 } 00834 00835 void ResourceBasePrivate::slotCollectionListForAttributesDone( KJob *job ) 00836 { 00837 if ( !job->error() ) { 00838 Collection::List list = static_cast<CollectionFetchJob*>( job )->collections(); 00839 if ( !list.isEmpty() ) { 00840 Collection col = list.first(); 00841 scheduler->scheduleAttributesSync( col ); 00842 } 00843 } 00844 // TODO: error handling 00845 } 00846 00847 void ResourceBase::setTotalItems( int amount ) 00848 { 00849 kDebug() << amount; 00850 Q_D( ResourceBase ); 00851 setItemStreamingEnabled( true ); 00852 d->mItemSyncer->setTotalItems( amount ); 00853 } 00854 00855 void ResourceBase::setItemStreamingEnabled( bool enable ) 00856 { 00857 Q_D( ResourceBase ); 00858 d->createItemSyncInstanceIfMissing(); 00859 d->mItemSyncer->setStreamingEnabled( enable ); 00860 } 00861 00862 void ResourceBase::itemsRetrieved( const Item::List &items ) 00863 { 00864 Q_D( ResourceBase ); 00865 d->createItemSyncInstanceIfMissing(); 00866 d->mItemSyncer->setFullSyncItems( items ); 00867 } 00868 00869 void ResourceBase::itemsRetrievedIncremental( const Item::List &changedItems, const Item::List &removedItems ) 00870 { 00871 Q_D( ResourceBase ); 00872 d->createItemSyncInstanceIfMissing(); 00873 d->mItemSyncer->setIncrementalSyncItems( changedItems, removedItems ); 00874 } 00875 00876 void ResourceBasePrivate::slotItemSyncDone( KJob *job ) 00877 { 00878 mItemSyncer = 0; 00879 Q_Q( ResourceBase ); 00880 if ( job->error() && job->error() != Job::UserCanceled ) { 00881 emit q->error( job->errorString() ); 00882 } 00883 scheduler->taskDone(); 00884 } 00885 00886 00887 void ResourceBasePrivate::slotDelayedEmitProgress() 00888 { 00889 Q_Q( ResourceBase ); 00890 emit q->percent( mUnemittedProgress ); 00891 00892 Q_FOREACH( const QVariantMap &statusMap, mUnemittedAdvancedStatus ) { 00893 emit q->advancedStatus( statusMap ); 00894 } 00895 mUnemittedProgress = 0; 00896 mUnemittedAdvancedStatus.clear(); 00897 } 00898 00899 void ResourceBasePrivate::slotPercent( KJob *job, unsigned long percent ) 00900 { 00901 Q_Q( ResourceBase ); 00902 00903 mUnemittedProgress = percent; 00904 00905 const Collection collection = job->property( "collection" ).value<Collection>(); 00906 if ( collection.isValid() ) { 00907 QVariantMap statusMap; 00908 statusMap.insert( QLatin1String( "key" ), QString::fromLatin1( "collectionSyncProgress" ) ); 00909 statusMap.insert( QLatin1String( "collectionId" ), collection.id() ); 00910 statusMap.insert( QLatin1String( "percent" ), static_cast<unsigned int>( percent ) ); 00911 00912 mUnemittedAdvancedStatus[collection.id()] = statusMap; 00913 } 00914 // deliver completion right away, intermediate progress at 1s intervals 00915 if ( percent == 100 ) { 00916 mProgressEmissionCompressor.stop(); 00917 slotDelayedEmitProgress(); 00918 } else if ( !mProgressEmissionCompressor.isActive() ) { 00919 mProgressEmissionCompressor.start(); 00920 } 00921 } 00922 00923 void ResourceBase::setHierarchicalRemoteIdentifiersEnabled( bool enable ) 00924 { 00925 Q_D( ResourceBase ); 00926 d->mHierarchicalRid = enable; 00927 } 00928 00929 void ResourceBase::scheduleCustomTask( QObject *receiver, const char *method, const QVariant &argument, SchedulePriority priority ) 00930 { 00931 Q_D( ResourceBase ); 00932 d->scheduler->scheduleCustomTask( receiver, method, argument, priority ); 00933 } 00934 00935 void ResourceBase::taskDone() 00936 { 00937 Q_D( ResourceBase ); 00938 d->scheduler->taskDone(); 00939 } 00940 00941 void ResourceBase::retrieveCollectionAttributes( const Collection &collection ) 00942 { 00943 collectionAttributesRetrieved( collection ); 00944 } 00945 00946 void Akonadi::ResourceBase::abortActivity() 00947 { 00948 00949 } 00950 00951 void ResourceBase::setItemTransactionMode(ItemSync::TransactionMode mode) 00952 { 00953 Q_D( ResourceBase ); 00954 d->mItemTransactionMode = mode; 00955 } 00956 00957 void ResourceBase::setItemSynchronizationFetchScope(const ItemFetchScope& fetchScope) 00958 { 00959 Q_D( ResourceBase ); 00960 if ( !d->mItemSyncFetchScope ) 00961 d->mItemSyncFetchScope = new ItemFetchScope; 00962 *(d->mItemSyncFetchScope) = fetchScope; 00963 } 00964 00965 #include "resourcebase.moc" 00966 #include "moc_resourcebase.cpp"