sort.h

00001 /***************************************************************************
00002  *  include/stxxl/bits/algo/sort.h
00003  *
00004  *  Part of the STXXL. See http://stxxl.sourceforge.net
00005  *
00006  *  Copyright (C) 2002-2003 Roman Dementiev <dementiev@mpi-sb.mpg.de>
00007  *  Copyright (C) 2006 Johannes Singler <singler@ira.uka.de>
00008  *  Copyright (C) 2008 Andreas Beckmann <beckmann@cs.uni-frankfurt.de>
00009  *
00010  *  Distributed under the Boost Software License, Version 1.0.
00011  *  (See accompanying file LICENSE_1_0.txt or copy at
00012  *  http://www.boost.org/LICENSE_1_0.txt)
00013  **************************************************************************/
00014 
00015 #ifndef STXXL_SORT_HEADER
00016 #define STXXL_SORT_HEADER
00017 
00018 #include <list>
00019 #include <functional>
00020 
00021 #include <stxxl/bits/mng/mng.h>
00022 #include <stxxl/bits/common/rand.h>
00023 #include <stxxl/bits/mng/adaptor.h>
00024 #include <stxxl/bits/common/simple_vector.h>
00025 #include <stxxl/bits/common/switch.h>
00026 #include <stxxl/bits/common/settings.h>
00027 #include <stxxl/bits/mng/block_alloc_interleaved.h>
00028 #include <stxxl/bits/algo/intksort.h>
00029 #include <stxxl/bits/algo/adaptor.h>
00030 #include <stxxl/bits/algo/async_schedule.h>
00031 #include <stxxl/bits/mng/block_prefetcher.h>
00032 #include <stxxl/bits/mng/buf_writer.h>
00033 #include <stxxl/bits/algo/run_cursor.h>
00034 #include <stxxl/bits/algo/losertree.h>
00035 #include <stxxl/bits/algo/inmemsort.h>
00036 #include <stxxl/bits/parallel.h>
00037 
00038 
00039 //#define SORT_OPTIMAL_PREFETCHING
00040 //#define INTERLEAVED_ALLOC
00041 //#define STXXL_CHECK_ORDER_IN_SORTS
00042 
00043 __STXXL_BEGIN_NAMESPACE
00044 
00047 
00048 
00051 namespace sort_local
00052 {
00053     template <typename BIDTp_, typename ValTp_>
00054     struct trigger_entry
00055     {
00056         typedef BIDTp_ bid_type;
00057         typedef ValTp_ value_type;
00058 
00059         bid_type bid;
00060         value_type value;
00061 
00062         operator bid_type ()
00063         {
00064             return bid;
00065         }
00066     };
00067 
00068     template <typename BIDTp_, typename ValTp_, typename ValueCmp_>
00069     struct trigger_entry_cmp : public std::binary_function<trigger_entry<BIDTp_, ValTp_>, trigger_entry<BIDTp_, ValTp_>, bool>
00070     {
00071         typedef trigger_entry<BIDTp_, ValTp_> trigger_entry_type;
00072         ValueCmp_ cmp;
00073         trigger_entry_cmp(ValueCmp_ c) : cmp(c) { }
00074         trigger_entry_cmp(const trigger_entry_cmp & a) : cmp(a.cmp) { }
00075         bool operator () (const trigger_entry_type & a, const trigger_entry_type & b) const
00076         {
00077             return cmp(a.value, b.value);
00078         }
00079     };
00080 
00081     template <typename block_type,
00082               typename prefetcher_type,
00083               typename value_cmp>
00084     struct run_cursor2_cmp
00085     {
00086         typedef run_cursor2<block_type, prefetcher_type> cursor_type;
00087         value_cmp cmp;
00088 
00089         run_cursor2_cmp(value_cmp c) : cmp(c) { }
00090         run_cursor2_cmp(const run_cursor2_cmp & a) : cmp(a.cmp) { }
00091         inline bool operator () (const cursor_type & a, const cursor_type & b)
00092         {
00093             if (UNLIKELY(b.empty()))
00094                 return true;
00095             // sentinel emulation
00096             if (UNLIKELY(a.empty()))
00097                 return false;
00098             //sentinel emulation
00099 
00100             return (cmp(a.current(), b.current()));
00101         }
00102     };
00103 
00104     template <typename block_type, typename bid_type>
00105     struct read_next_after_write_completed
00106     {
00107         block_type * block;
00108         bid_type bid;
00109         request_ptr * req;
00110         void operator () (request * /*completed_req*/)
00111         {
00112             * req = block->read(bid);
00113         }
00114     };
00115 
00116 
00117     template <
00118         typename block_type,
00119         typename run_type,
00120         typename input_bid_iterator,
00121         typename value_cmp>
00122     void
00123     create_runs(
00124         input_bid_iterator it,
00125         run_type ** runs,
00126         int_type nruns,
00127         int_type _m,
00128         value_cmp cmp)
00129     {
00130         typedef typename block_type::value_type type;
00131         typedef typename block_type::bid_type bid_type;
00132         STXXL_VERBOSE1("stxxl::create_runs nruns=" << nruns << " m=" << _m);
00133 
00134         int_type m2 = _m / 2;
00135         block_manager * bm = block_manager::get_instance();
00136         block_type * Blocks1 = new block_type[m2];
00137         block_type * Blocks2 = new block_type[m2];
00138         bid_type * bids1 = new bid_type[m2];
00139         bid_type * bids2 = new bid_type[m2];
00140         request_ptr * read_reqs1 = new request_ptr[m2];
00141         request_ptr * read_reqs2 = new request_ptr[m2];
00142         request_ptr * write_reqs = new request_ptr[m2];
00143         read_next_after_write_completed<block_type, bid_type> * next_run_reads =
00144             new read_next_after_write_completed<block_type, bid_type>[m2];
00145 
00146         disk_queues::get_instance()->set_priority_op(disk_queue::WRITE);
00147 
00148         int_type i;
00149         int_type run_size = 0, next_run_size = 0;
00150 
00151         assert(nruns >= 2);
00152 
00153         run_size = runs[0]->size();
00154         assert(run_size == m2);
00155 
00156         for (i = 0; i < run_size; ++i)
00157         {
00158             STXXL_VERBOSE1("stxxl::create_runs posting read " << long(Blocks1[i].elem));
00159             bids1[i] = *(it++);
00160             read_reqs1[i] = Blocks1[i].read(bids1[i]);
00161         }
00162 
00163         run_size = runs[1]->size();
00164 
00165         for (i = 0; i < run_size; ++i)
00166         {
00167             STXXL_VERBOSE1("stxxl::create_runs posting read " << long(Blocks2[i].elem));
00168             bids2[i] = *(it++);
00169             read_reqs2[i] = Blocks2[i].read(bids2[i]);
00170         }
00171 
00172         for (int_type k = 0; k < nruns - 1; ++k)
00173         {
00174             run_type * run = runs[k];
00175             run_size = run->size();
00176             assert(run_size == m2);
00177             next_run_size = runs[k + 1]->size();
00178             assert((next_run_size == m2) || (next_run_size <= m2 && k == nruns - 2));
00179 
00180             STXXL_VERBOSE1("stxxl::create_runs start waiting read_reqs1");
00181             wait_all(read_reqs1, run_size);
00182             STXXL_VERBOSE1("stxxl::create_runs finish waiting read_reqs1");
00183             for (i = 0; i < run_size; ++i)
00184                 bm->delete_block(bids1[i]);
00185 
00186             if (block_type::has_filler)
00187                 std::sort(
00188                     ArrayOfSequencesIterator<
00189                         block_type, typename block_type::value_type, block_type::size
00190                         >(Blocks1, 0),
00191                     ArrayOfSequencesIterator<
00192                         block_type, typename block_type::value_type, block_type::size
00193                         >(Blocks1, run_size * block_type::size),
00194                     cmp);
00195             else
00196                 std::sort(Blocks1[0].elem, Blocks1[run_size].elem, cmp);
00197 
00198 
00199             STXXL_VERBOSE1("stxxl::create_runs start waiting write_reqs");
00200             if (k > 0)
00201                 wait_all(write_reqs, m2);
00202             STXXL_VERBOSE1("stxxl::create_runs finish waiting write_reqs");
00203 
00204             int_type runplus2size = (k < nruns - 2) ? runs[k + 2]->size() : 0;
00205             for (i = 0; i < m2; ++i)
00206             {
00207                 STXXL_VERBOSE1("stxxl::create_runs posting write " << long(Blocks1[i].elem));
00208                 (*run)[i].value = Blocks1[i][0];
00209                 if (i >= runplus2size) {
00210                     write_reqs[i] = Blocks1[i].write((*run)[i].bid);
00211                 }
00212                 else
00213                 {
00214                     next_run_reads[i].block = Blocks1 + i;
00215                     next_run_reads[i].req = read_reqs1 + i;
00216                     bids1[i] = next_run_reads[i].bid = *(it++);
00217                     write_reqs[i] = Blocks1[i].write((*run)[i].bid, next_run_reads[i]);
00218                 }
00219             }
00220             std::swap(Blocks1, Blocks2);
00221             std::swap(bids1, bids2);
00222             std::swap(read_reqs1, read_reqs2);
00223         }
00224 
00225         run_type * run = runs[nruns - 1];
00226         run_size = run->size();
00227         STXXL_VERBOSE1("stxxl::create_runs start waiting read_reqs1");
00228         wait_all(read_reqs1, run_size);
00229         STXXL_VERBOSE1("stxxl::create_runs finish waiting read_reqs1");
00230         for (i = 0; i < run_size; ++i)
00231             bm->delete_block(bids1[i]);
00232 
00233         if (block_type::has_filler) {
00234             std::sort(
00235                 ArrayOfSequencesIterator<
00236                     block_type, typename block_type::value_type, block_type::size
00237                     >(Blocks1, 0),
00238                 ArrayOfSequencesIterator<
00239                     block_type, typename block_type::value_type, block_type::size
00240                     >(Blocks1, run_size * block_type::size),
00241                 cmp);
00242         } else {
00243             std::sort(Blocks1[0].elem, Blocks1[run_size].elem, cmp);
00244         }
00245 
00246         STXXL_VERBOSE1("stxxl::create_runs start waiting write_reqs");
00247         wait_all(write_reqs, m2);
00248         STXXL_VERBOSE1("stxxl::create_runs finish waiting write_reqs");
00249 
00250         for (i = 0; i < run_size; ++i)
00251         {
00252             STXXL_VERBOSE1("stxxl::create_runs posting write " << long(Blocks1[i].elem));
00253             (*run)[i].value = Blocks1[i][0];
00254             write_reqs[i] = Blocks1[i].write((*run)[i].bid);
00255         }
00256 
00257         STXXL_VERBOSE1("stxxl::create_runs start waiting write_reqs");
00258         wait_all(write_reqs, run_size);
00259         STXXL_VERBOSE1("stxxl::create_runs finish waiting write_reqs");
00260 
00261         delete[] Blocks1;
00262         delete[] Blocks2;
00263         delete[] bids1;
00264         delete[] bids2;
00265         delete[] read_reqs1;
00266         delete[] read_reqs2;
00267         delete[] write_reqs;
00268         delete[] next_run_reads;
00269     }
00270 
00271 
00272     template <typename block_type, typename run_type, typename value_cmp>
00273     bool check_sorted_runs(run_type ** runs,
00274                            unsigned_type nruns,
00275                            unsigned_type m,
00276                            value_cmp cmp)
00277     {
00278         typedef typename block_type::value_type value_type;
00279 
00280         //STXXL_VERBOSE1("check_sorted_runs  Runs: "<<nruns);
00281         STXXL_MSG("check_sorted_runs  Runs: " << nruns);
00282         unsigned_type irun = 0;
00283         for (irun = 0; irun < nruns; ++irun)
00284         {
00285             const unsigned_type nblocks_per_run = runs[irun]->size();
00286             unsigned_type blocks_left = nblocks_per_run;
00287             block_type * blocks = new block_type[m];
00288             request_ptr * reqs = new request_ptr[m];
00289             value_type last = cmp.min_value();
00290 
00291             for (unsigned_type off = 0; off < nblocks_per_run; off += m)
00292             {
00293                 const unsigned_type nblocks = STXXL_MIN(blocks_left, m);
00294                 const unsigned_type nelements = nblocks * block_type::size;
00295                 blocks_left -= nblocks;
00296 
00297                 for (unsigned_type j = 0; j < nblocks; ++j)
00298                 {
00299                     reqs[j] = blocks[j].read((*runs[irun])[j + off].bid);
00300                 }
00301                 wait_all(reqs, reqs + nblocks);
00302 
00303                 if (off && cmp(blocks[0][0], last))
00304                 {
00305                     STXXL_MSG("check_sorted_runs  wrong first value in the run " << irun);
00306                     STXXL_MSG(" first value: " << blocks[0][0]);
00307                     STXXL_MSG(" last  value: " << last);
00308                     for (unsigned_type k = 0; k < block_type::size; ++k)
00309                         STXXL_MSG("Element " << k << " in the block is :" << blocks[0][k]);
00310 
00311                     return false;
00312                 }
00313 
00314                 for (unsigned_type j = 0; j < nblocks; ++j)
00315                 {
00316                     if (!(blocks[j][0] == (*runs[irun])[j + off].value))
00317                     {
00318                         STXXL_MSG("check_sorted_runs  wrong trigger in the run " << irun << " block " << (j + off));
00319                         STXXL_MSG("                   trigger value: " << (*runs[irun])[j + off].value);
00320                         STXXL_MSG("Data in the block:");
00321                         for (unsigned_type k = 0; k < block_type::size; ++k)
00322                             STXXL_MSG("Element " << k << " in the block is :" << blocks[j][k]);
00323 
00324                         STXXL_MSG("BIDS:");
00325                         for (unsigned_type k = 0; k < nblocks; ++k)
00326                         {
00327                             if (k == j)
00328                                 STXXL_MSG("Bad one comes next.");
00329                             STXXL_MSG("BID " << (k + off) << " is: " << ((*runs[irun])[k + off].bid));
00330                         }
00331 
00332                         return false;
00333                     }
00334                 }
00335                 if (!stxxl::is_sorted(
00336                         ArrayOfSequencesIterator<
00337                             block_type, typename block_type::value_type, block_type::size
00338                             >(blocks, 0),
00339                         ArrayOfSequencesIterator<
00340                             block_type, typename block_type::value_type, block_type::size
00341                             >(blocks, nelements),
00342                         cmp))
00343                 {
00344                     STXXL_MSG("check_sorted_runs  wrong order in the run " << irun);
00345                     STXXL_MSG("Data in blocks:");
00346                     for (unsigned_type j = 0; j < nblocks; ++j)
00347                     {
00348                         for (unsigned_type k = 0; k < block_type::size; ++k)
00349                             STXXL_MSG("     Element " << k << " in block " << (j + off) << " is :" << blocks[j][k]);
00350                     }
00351                     STXXL_MSG("BIDS:");
00352                     for (unsigned_type k = 0; k < nblocks; ++k)
00353                     {
00354                         STXXL_MSG("BID " << (k + off) << " is: " << ((*runs[irun])[k + off].bid));
00355                     }
00356 
00357                     return false;
00358                 }
00359 
00360                 last = blocks[nblocks - 1][block_type::size - 1];
00361             }
00362 
00363             assert(blocks_left == 0);
00364             delete[] reqs;
00365             delete[] blocks;
00366         }
00367 
00368         return true;
00369     }
00370 
00371 
00372     template <typename block_type, typename run_type, typename value_cmp>
00373     void merge_runs(run_type ** in_runs, int_type nruns, run_type * out_run, unsigned_type _m, value_cmp cmp
00374                     )
00375     {
00376         typedef typename block_type::bid_type bid_type;
00377         typedef typename block_type::value_type value_type;
00378         typedef block_prefetcher<block_type, typename run_type::iterator> prefetcher_type;
00379         typedef run_cursor2<block_type, prefetcher_type> run_cursor_type;
00380         typedef run_cursor2_cmp<block_type, prefetcher_type, value_cmp> run_cursor2_cmp_type;
00381 
00382         int_type i;
00383         run_type consume_seq(out_run->size());
00384 
00385         int_type * prefetch_seq = new int_type[out_run->size()];
00386 
00387         typename run_type::iterator copy_start = consume_seq.begin();
00388         for (i = 0; i < nruns; i++)
00389         {
00390             // TODO: try to avoid copy
00391             copy_start = std::copy(
00392                 in_runs[i]->begin(),
00393                 in_runs[i]->end(),
00394                 copy_start);
00395         }
00396 
00397         std::stable_sort(consume_seq.begin(), consume_seq.end(),
00398                          trigger_entry_cmp<bid_type, value_type, value_cmp>(cmp));
00399 
00400         int_type disks_number = config::get_instance()->disks_number();
00401 
00402 #ifdef PLAY_WITH_OPT_PREF
00403         const int_type n_write_buffers = 4 * disks_number;
00404 #else
00405         const int_type n_prefetch_buffers = STXXL_MAX(2 * disks_number, (3 * (int_type(_m) - nruns) / 4));
00406         const int_type n_write_buffers = STXXL_MAX(2 * disks_number, int_type(_m) - nruns - n_prefetch_buffers);
00407  #ifdef SORT_OPTIMAL_PREFETCHING
00408         // heuristic
00409         const int_type n_opt_prefetch_buffers = 2 * disks_number + (3 * (n_prefetch_buffers - 2 * disks_number)) / 10;
00410  #endif
00411 #endif
00412 
00413 #ifdef SORT_OPTIMAL_PREFETCHING
00414         compute_prefetch_schedule(
00415             consume_seq,
00416             prefetch_seq,
00417             n_opt_prefetch_buffers,
00418             disks_number);
00419 #else
00420         for (i = 0; i < out_run->size(); i++)
00421             prefetch_seq[i] = i;
00422 
00423 #endif
00424 
00425         prefetcher_type prefetcher(consume_seq.begin(),
00426                                    consume_seq.end(),
00427                                    prefetch_seq,
00428                                    nruns + n_prefetch_buffers);
00429 
00430         buffered_writer<block_type> writer(n_write_buffers, n_write_buffers / 2);
00431 
00432         int_type out_run_size = out_run->size();
00433 
00434         block_type * out_buffer = writer.get_free_block();
00435 
00436 //If parallelism is activated, one can still fall back to the
00437 //native merge routine by setting stxxl::SETTINGS::native_merge= true, //otherwise, it is used anyway.
00438 
00439         if (do_parallel_merge())
00440         {
00441 #if STXXL_PARALLEL_MULTIWAY_MERGE
00442 
00443 // begin of STL-style merging
00444 
00445             typedef stxxl::int64 diff_type;
00446             typedef std::pair<typename block_type::iterator, typename block_type::iterator> sequence;
00447             typedef typename std::vector<sequence>::size_type seqs_size_type;
00448             std::vector<sequence> seqs(nruns);
00449             std::vector<block_type *> buffers(nruns);
00450 
00451             for (int_type i = 0; i < nruns; i++)                //initialize sequences
00452             {
00453                 buffers[i] = prefetcher.pull_block();           //get first block of each run
00454                 seqs[i] = std::make_pair(buffers[i]->begin(), buffers[i]->end());
00455                 //this memory location stays the same, only the data is exchanged
00456             }
00457 
00458  #ifdef STXXL_CHECK_ORDER_IN_SORTS
00459             value_type last_elem = cmp.min_value();
00460  #endif
00461 
00462             for (int_type j = 0; j < out_run_size; ++j)                 //for the whole output run, out_run_size is in blocks
00463             {
00464                 diff_type rest = block_type::size;                      //elements still to merge for this output block
00465 
00466                 STXXL_VERBOSE1("output block " << j);
00467                 do {
00468                     value_type * min_last_element = NULL;               //no element found yet
00469                     diff_type total_size = 0;
00470 
00471                     for (seqs_size_type i = 0; i < seqs.size(); i++)
00472                     {
00473                         if (seqs[i].first == seqs[i].second)
00474                             continue; //run empty
00475 
00476                         if (min_last_element == NULL)
00477                             min_last_element = &(*(seqs[i].second - 1));
00478                         else
00479                             min_last_element = cmp(*min_last_element, *(seqs[i].second - 1)) ? min_last_element : &(*(seqs[i].second - 1));
00480 
00481                         total_size += seqs[i].second - seqs[i].first;
00482                         STXXL_VERBOSE1("last " << *(seqs[i].second - 1) << " block size " << (seqs[i].second - seqs[i].first));
00483                     }
00484 
00485                     assert(min_last_element != NULL);           //there must be some element
00486 
00487                     STXXL_VERBOSE1("min_last_element " << min_last_element << " total size " << total_size + (block_type::size - rest));
00488 
00489                     diff_type less_equal_than_min_last = 0;
00490                     //locate this element in all sequences
00491                     for (seqs_size_type i = 0; i < seqs.size(); i++)
00492                     {
00493                         if (seqs[i].first == seqs[i].second)
00494                             continue; //empty subsequence
00495 
00496                         typename block_type::iterator position = std::upper_bound(seqs[i].first, seqs[i].second, *min_last_element, cmp);
00497                         STXXL_VERBOSE1("greater equal than " << position - seqs[i].first);
00498                         less_equal_than_min_last += position - seqs[i].first;
00499                     }
00500 
00501                     STXXL_VERBOSE1("finished loop");
00502 
00503                     ptrdiff_t output_size = (std::min)(less_equal_than_min_last, rest);         //at most rest elements
00504 
00505                     STXXL_VERBOSE1("before merge" << output_size);
00506 
00507                     stxxl::parallel::multiway_merge(seqs.begin(), seqs.end(), out_buffer->end() - rest, cmp, output_size);
00508                     //sequence iterators are progressed appropriately
00509 
00510                     STXXL_VERBOSE1("after merge");
00511 
00512                     (*out_run)[j].value = (*out_buffer)[0];                     //save smallest value
00513 
00514                     rest -= output_size;
00515 
00516                     STXXL_VERBOSE1("so long");
00517 
00518                     for (seqs_size_type i = 0; i < seqs.size(); i++)
00519                     {
00520                         if (seqs[i].first == seqs[i].second)                                    //run empty
00521                         {
00522                             if (prefetcher.block_consumed(buffers[i]))
00523                             {
00524                                 seqs[i].first = buffers[i]->begin();                            //reset iterator
00525                                 seqs[i].second = buffers[i]->end();
00526                                 STXXL_VERBOSE1("block ran empty " << i);
00527                             }
00528                             else
00529                             {
00530                                 seqs.erase(seqs.begin() + i);                                   //remove this sequence
00531                                 buffers.erase(buffers.begin() + i);
00532                                 STXXL_VERBOSE1("seq removed " << i);
00533                             }
00534                         }
00535                     }
00536                 } while (rest > 0 && seqs.size() > 0);
00537 
00538  #ifdef STXXL_CHECK_ORDER_IN_SORTS
00539                 if (!stxxl::is_sorted(out_buffer->begin(), out_buffer->end(), cmp))
00540                 {
00541                     for (value_type * i = out_buffer->begin() + 1; i != out_buffer->end(); i++)
00542                         if (cmp(*i, *(i - 1)))
00543                         {
00544                             STXXL_VERBOSE1("Error at position " << (i - out_buffer->begin()));
00545                         }
00546                     assert(false);
00547                 }
00548 
00549                 if (j > 0) //do not check in first iteration
00550                     assert(cmp((*out_buffer)[0], last_elem) == false);
00551 
00552                 last_elem = (*out_buffer)[block_type::size - 1];
00553  #endif
00554 
00555 
00556                 out_buffer = writer.write(out_buffer, (*out_run)[j].bid);
00557             }
00558 
00559 // end of STL-style merging
00560 
00561 #else
00562             assert(false);
00563 #endif
00564         }
00565         else
00566         {
00567 // begin of native merging procedure
00568 
00569             loser_tree<run_cursor_type, run_cursor2_cmp_type, block_type::size>
00570             losers(&prefetcher, nruns, run_cursor2_cmp_type(cmp));
00571 
00572 #ifdef STXXL_CHECK_ORDER_IN_SORTS
00573             value_type last_elem = cmp.min_value();
00574 #endif
00575 
00576             for (i = 0; i < out_run_size; ++i)
00577             {
00578                 losers.multi_merge(out_buffer->elem);
00579                 (*out_run)[i].value = *(out_buffer->elem);
00580 
00581 #ifdef STXXL_CHECK_ORDER_IN_SORTS
00582                 assert(stxxl::is_sorted(
00583                            out_buffer->begin(),
00584                            out_buffer->end(),
00585                            cmp));
00586 
00587                 if (i)
00588                     assert(cmp(*(out_buffer->elem), last_elem) == false);
00589 
00590                 last_elem = (*out_buffer).elem[block_type::size - 1];
00591 #endif
00592 
00593                 out_buffer = writer.write(out_buffer, (*out_run)[i].bid);
00594             }
00595 
00596 // end of native merging procedure
00597         }
00598 
00599         delete[] prefetch_seq;
00600 
00601         block_manager * bm = block_manager::get_instance();
00602         for (i = 0; i < nruns; ++i)
00603         {
00604             unsigned_type sz = in_runs[i]->size();
00605             for (unsigned_type j = 0; j < sz; ++j)
00606                 bm->delete_block((*in_runs[i])[j].bid);
00607 
00608 
00609             delete in_runs[i];
00610         }
00611     }
00612 
00613 
00614     template <typename block_type,
00615               typename alloc_strategy,
00616               typename input_bid_iterator,
00617               typename value_cmp>
00618     simple_vector<trigger_entry<typename block_type::bid_type, typename block_type::value_type> > *
00619     sort_blocks(input_bid_iterator input_bids,
00620                 unsigned_type _n,
00621                 unsigned_type _m,
00622                 value_cmp cmp
00623                 )
00624     {
00625         typedef typename block_type::value_type type;
00626         typedef typename block_type::bid_type bid_type;
00627         typedef trigger_entry<bid_type, type> trigger_entry_type;
00628         typedef simple_vector<trigger_entry_type> run_type;
00629         typedef typename interleaved_alloc_traits<alloc_strategy>::strategy interleaved_alloc_strategy;
00630 
00631         unsigned_type m2 = _m / 2;
00632         unsigned_type full_runs = _n / m2;
00633         unsigned_type partial_runs = ((_n % m2) ? 1 : 0);
00634         unsigned_type nruns = full_runs + partial_runs;
00635         unsigned_type i;
00636 
00637         config * cfg = config::get_instance();
00638         block_manager * mng = block_manager::get_instance();
00639         const unsigned_type ndisks = cfg->disks_number();
00640 
00641         //STXXL_VERBOSE ("n=" << _n << " nruns=" << nruns << "=" << full_runs << "+" << partial_runs);
00642 
00643         double begin = timestamp(), after_runs_creation, end;
00644 
00645         run_type ** runs = new run_type *[nruns];
00646 
00647         for (i = 0; i < full_runs; i++)
00648             runs[i] = new run_type(m2);
00649 
00650 
00651         if (partial_runs)
00652             runs[i] = new run_type(_n - full_runs * m2);
00653 
00654 
00655         for (i = 0; i < nruns; ++i)
00656         {
00657             // FIXME: why has an alloc_strategy to take two arguments disk_index.begin(), disk_index.end() ???
00658             mng->new_blocks(alloc_strategy(0, ndisks),
00659                             trigger_entry_iterator<typename run_type::iterator, block_type::raw_size>(runs[i]->begin()),
00660                             trigger_entry_iterator<typename run_type::iterator, block_type::raw_size>(runs[i]->end()));
00661         }
00662 
00663         sort_local::create_runs<block_type,
00664                                 run_type,
00665                                 input_bid_iterator,
00666                                 value_cmp>(input_bids, runs, nruns, _m, cmp);
00667 
00668         after_runs_creation = timestamp();
00669 
00670         double io_wait_after_rf = stats::get_instance()->get_io_wait_time();
00671 
00672         disk_queues::get_instance()->set_priority_op(disk_queue::WRITE);
00673 
00674         // Optimal merging: merge r = pow(nruns,1/ceil(log(nruns)/log(m))) runs at once
00675 
00676         const int_type merge_factor = static_cast<int_type>(ceil(pow(nruns, 1. / ceil(log(double(nruns)) /
00677                                                                                       log(double(_m))))));
00678         run_type ** new_runs;
00679 
00680         while (nruns > 1)
00681         {
00682             int_type new_nruns = div_and_round_up(nruns, merge_factor);
00683             STXXL_VERBOSE("Starting new merge phase: nruns: " << nruns <<
00684                           " opt_merge_factor: " << merge_factor << " m:" << _m << " new_nruns: " << new_nruns);
00685 
00686             new_runs = new run_type *[new_nruns];
00687 
00688             int_type runs_left = nruns;
00689             int_type cur_out_run = 0;
00690             int_type blocks_in_new_run = 0;
00691 
00692             while (runs_left > 0)
00693             {
00694                 int_type runs2merge = STXXL_MIN(runs_left, merge_factor);
00695                 blocks_in_new_run = 0;
00696                 for (unsigned_type i = nruns - runs_left; i < (nruns - runs_left + runs2merge); i++)
00697                     blocks_in_new_run += runs[i]->size();
00698 
00699                 // allocate run
00700                 new_runs[cur_out_run++] = new run_type(blocks_in_new_run);
00701                 runs_left -= runs2merge;
00702             }
00703             // allocate blocks for the new runs
00704             if (cur_out_run == 1 && blocks_in_new_run == int_type(_n) && (input_bids->storage->get_id() == -1))
00705             {
00706                 // if we sort a file we can reuse the input bids for the output
00707                 input_bid_iterator cur = input_bids;
00708                 for (int_type i = 0; cur != (input_bids + _n); ++cur)
00709                 {
00710                     (*new_runs[0])[i++].bid = *cur;
00711                 }
00712 
00713                 bid_type & firstBID = (*new_runs[0])[0].bid;
00714                 if (firstBID.storage->get_id() != -1)
00715                 {
00716                     // the first block does not belong to the file
00717                     // need to reallocate it
00718                     mng->new_blocks(FR(), &firstBID, (&firstBID) + 1);
00719                 }
00720                 bid_type & lastBID = (*new_runs[0])[_n - 1].bid;
00721                 if (lastBID.storage->get_id() != -1)
00722                 {
00723                     // the first block does not belong to the file
00724                     // need to reallocate it
00725                     mng->new_blocks(FR(), &lastBID, (&lastBID) + 1);
00726                 }
00727             }
00728             else
00729             {
00730                 mng->new_blocks(interleaved_alloc_strategy(new_nruns, 0, ndisks),
00731                                 RunsToBIDArrayAdaptor2<block_type::raw_size, run_type>(new_runs, 0, new_nruns, blocks_in_new_run),
00732                                 RunsToBIDArrayAdaptor2<block_type::raw_size, run_type>(new_runs, _n, new_nruns, blocks_in_new_run));
00733             }
00734             // merge all
00735             runs_left = nruns;
00736             cur_out_run = 0;
00737             while (runs_left > 0)
00738             {
00739                 int_type runs2merge = STXXL_MIN(runs_left, merge_factor);
00740 #ifdef STXXL_CHECK_ORDER_IN_SORTS
00741                 assert((check_sorted_runs<block_type, run_type, value_cmp>(runs + nruns - runs_left, runs2merge, m2, cmp)));
00742 #endif
00743                 STXXL_VERBOSE("Merging " << runs2merge << " runs");
00744                 merge_runs<block_type, run_type>(runs + nruns - runs_left,
00745                                                  runs2merge, *(new_runs + (cur_out_run++)), _m, cmp
00746                                                  );
00747                 runs_left -= runs2merge;
00748             }
00749 
00750             nruns = new_nruns;
00751             delete[] runs;
00752             runs = new_runs;
00753         }
00754 
00755         run_type * result = *runs;
00756         delete[] runs;
00757 
00758         end = timestamp();
00759 
00760         STXXL_VERBOSE("Elapsed time        : " << end - begin << " s. Run creation time: " <<
00761                       after_runs_creation - begin << " s");
00762         STXXL_VERBOSE("Time in I/O wait(rf): " << io_wait_after_rf << " s");
00763         STXXL_VERBOSE(*stats::get_instance());
00764         UNUSED(begin + io_wait_after_rf);
00765 
00766         return result;
00767     }
00768 }
00769 
00770 
00778 template <typename ExtIterator_, typename StrictWeakOrdering_>
00779 void sort(ExtIterator_ first, ExtIterator_ last, StrictWeakOrdering_ cmp, unsigned_type M)
00780 {
00781     typedef simple_vector<sort_local::trigger_entry<typename ExtIterator_::bid_type,
00782                                                     typename ExtIterator_::vector_type::value_type> > run_type;
00783 
00784     typedef typename ExtIterator_::vector_type::value_type value_type;
00785     typedef typename ExtIterator_::block_type block_type;
00786 
00787     // verify strict weak ordering of the sentinels
00788     assert(!cmp(cmp.min_value(), cmp.min_value()));
00789     assert(cmp(cmp.min_value(), cmp.max_value()));
00790     assert(!cmp(cmp.max_value(), cmp.max_value()));
00791 
00792     unsigned_type n = 0;
00793 
00794     block_manager * mng = block_manager::get_instance();
00795 
00796     first.flush();
00797 
00798     if ((last - first) * sizeof(value_type) * sort_memory_usage_factor() < M)
00799     {
00800         stl_in_memory_sort(first, last, cmp);
00801     }
00802     else
00803     {
00804         assert(2 * block_type::raw_size * sort_memory_usage_factor() <= M);
00805 
00806         if (first.block_offset())
00807         {
00808             if (last.block_offset())              // first and last element are
00809             // not the first elements of their block
00810             {
00811                 typename ExtIterator_::block_type * first_block = new typename ExtIterator_::block_type;
00812                 typename ExtIterator_::block_type * last_block = new typename ExtIterator_::block_type;
00813                 typename ExtIterator_::bid_type first_bid, last_bid;
00814                 request_ptr req;
00815 
00816                 req = first_block->read(*first.bid());
00817                 mng->new_blocks(FR(), &first_bid, (&first_bid) + 1);                // try to overlap
00818                 mng->new_blocks(FR(), &last_bid, (&last_bid) + 1);
00819                 req->wait();
00820 
00821 
00822                 req = last_block->read(*last.bid());
00823 
00824                 unsigned_type i = 0;
00825                 for ( ; i < first.block_offset(); ++i)
00826                 {
00827                     first_block->elem[i] = cmp.min_value();
00828                 }
00829 
00830                 req->wait();
00831 
00832 
00833                 req = first_block->write(first_bid);
00834                 for (i = last.block_offset(); i < block_type::size; ++i)
00835                 {
00836                     last_block->elem[i] = cmp.max_value();
00837                 }
00838 
00839                 req->wait();
00840 
00841 
00842                 req = last_block->write(last_bid);
00843 
00844                 n = last.bid() - first.bid() + 1;
00845 
00846                 std::swap(first_bid, *first.bid());
00847                 std::swap(last_bid, *last.bid());
00848 
00849                 req->wait();
00850 
00851 
00852                 delete first_block;
00853                 delete last_block;
00854 
00855                 run_type * out =
00856                     sort_local::sort_blocks<
00857                         typename ExtIterator_::block_type,
00858                         typename ExtIterator_::vector_type::alloc_strategy,
00859                         typename ExtIterator_::bids_container_iterator>
00860                                         (first.bid(), n, M / sort_memory_usage_factor() / block_type::raw_size, cmp);
00861 
00862 
00863                 first_block = new typename ExtIterator_::block_type;
00864                 last_block = new typename ExtIterator_::block_type;
00865                 typename ExtIterator_::block_type * sorted_first_block = new typename ExtIterator_::block_type;
00866                 typename ExtIterator_::block_type * sorted_last_block = new typename ExtIterator_::block_type;
00867                 request_ptr * reqs = new request_ptr[2];
00868 
00869                 reqs[0] = first_block->read(first_bid);
00870                 reqs[1] = sorted_first_block->read((*(out->begin())).bid);
00871 
00872                 reqs[0]->wait();
00873                 reqs[1]->wait();
00874 
00875                 reqs[0] = last_block->read(last_bid);
00876                 reqs[1] = sorted_last_block->read(((*out)[out->size() - 1]).bid);
00877 
00878                 for (i = first.block_offset(); i < block_type::size; i++)
00879                 {
00880                     first_block->elem[i] = sorted_first_block->elem[i];
00881                 }
00882 
00883                 reqs[0]->wait();
00884                 reqs[1]->wait();
00885 
00886                 req = first_block->write(first_bid);
00887 
00888                 for (i = 0; i < last.block_offset(); ++i)
00889                 {
00890                     last_block->elem[i] = sorted_last_block->elem[i];
00891                 }
00892 
00893                 req->wait();
00894 
00895                 req = last_block->write(last_bid);
00896 
00897                 mng->delete_block(out->begin()->bid);
00898                 mng->delete_block((*out)[out->size() - 1].bid);
00899 
00900                 *first.bid() = first_bid;
00901                 *last.bid() = last_bid;
00902 
00903                 typename run_type::iterator it = out->begin();
00904                 ++it;
00905                 typename ExtIterator_::bids_container_iterator cur_bid = first.bid();
00906                 ++cur_bid;
00907 
00908                 for ( ; cur_bid != last.bid(); ++cur_bid, ++it)
00909                 {
00910                     *cur_bid = (*it).bid;
00911                 }
00912 
00913                 delete first_block;
00914                 delete sorted_first_block;
00915                 delete sorted_last_block;
00916                 delete[] reqs;
00917                 delete out;
00918 
00919                 req->wait();
00920 
00921 
00922                 delete last_block;
00923             }
00924             else
00925             {
00926                 // first element is
00927                 // not the first element of its block
00928                 typename ExtIterator_::block_type * first_block = new typename ExtIterator_::block_type;
00929                 typename ExtIterator_::bid_type first_bid;
00930                 request_ptr req;
00931 
00932                 req = first_block->read(*first.bid());
00933                 mng->new_blocks(FR(), &first_bid, (&first_bid) + 1);                // try to overlap
00934                 req->wait();
00935 
00936 
00937                 unsigned_type i = 0;
00938                 for ( ; i < first.block_offset(); ++i)
00939                 {
00940                     first_block->elem[i] = cmp.min_value();
00941                 }
00942 
00943                 req = first_block->write(first_bid);
00944 
00945                 n = last.bid() - first.bid();
00946 
00947                 std::swap(first_bid, *first.bid());
00948 
00949                 req->wait();
00950 
00951 
00952                 delete first_block;
00953 
00954                 run_type * out =
00955                     sort_local::sort_blocks<
00956                         typename ExtIterator_::block_type,
00957                         typename ExtIterator_::vector_type::alloc_strategy,
00958                         typename ExtIterator_::bids_container_iterator>
00959                                         (first.bid(), n, M / sort_memory_usage_factor() / block_type::raw_size, cmp);
00960 
00961 
00962                 first_block = new typename ExtIterator_::block_type;
00963 
00964                 typename ExtIterator_::block_type * sorted_first_block = new typename ExtIterator_::block_type;
00965 
00966                 request_ptr * reqs = new request_ptr[2];
00967 
00968                 reqs[0] = first_block->read(first_bid);
00969                 reqs[1] = sorted_first_block->read((*(out->begin())).bid);
00970 
00971                 reqs[0]->wait();
00972                 reqs[1]->wait();
00973 
00974                 for (i = first.block_offset(); i < block_type::size; ++i)
00975                 {
00976                     first_block->elem[i] = sorted_first_block->elem[i];
00977                 }
00978 
00979                 req = first_block->write(first_bid);
00980 
00981                 mng->delete_block(out->begin()->bid);
00982 
00983                 *first.bid() = first_bid;
00984 
00985                 typename run_type::iterator it = out->begin();
00986                 ++it;
00987                 typename ExtIterator_::bids_container_iterator cur_bid = first.bid();
00988                 ++cur_bid;
00989 
00990                 for ( ; cur_bid != last.bid(); ++cur_bid, ++it)
00991                 {
00992                     *cur_bid = (*it).bid;
00993                 }
00994 
00995                 *cur_bid = (*it).bid;
00996 
00997                 delete sorted_first_block;
00998                 delete[] reqs;
00999                 delete out;
01000 
01001                 req->wait();
01002 
01003                 delete first_block;
01004             }
01005         }
01006         else
01007         {
01008             if (last.block_offset())            // last is
01009             // not the first element of its block
01010             {
01011                 typename ExtIterator_::block_type * last_block = new typename ExtIterator_::block_type;
01012                 typename ExtIterator_::bid_type last_bid;
01013                 request_ptr req;
01014                 unsigned_type i;
01015 
01016                 req = last_block->read(*last.bid());
01017                 mng->new_blocks(FR(), &last_bid, (&last_bid) + 1);
01018                 req->wait();
01019 
01020 
01021                 for (i = last.block_offset(); i < block_type::size; ++i)
01022                 {
01023                     last_block->elem[i] = cmp.max_value();
01024                 }
01025 
01026                 req = last_block->write(last_bid);
01027 
01028                 n = last.bid() - first.bid() + 1;
01029 
01030                 std::swap(last_bid, *last.bid());
01031 
01032                 req->wait();
01033 
01034 
01035                 delete last_block;
01036 
01037                 run_type * out =
01038                     sort_local::sort_blocks<
01039                         typename ExtIterator_::block_type,
01040                         typename ExtIterator_::vector_type::alloc_strategy,
01041                         typename ExtIterator_::bids_container_iterator>
01042                                         (first.bid(), n, M / sort_memory_usage_factor() / block_type::raw_size, cmp);
01043 
01044 
01045                 last_block = new typename ExtIterator_::block_type;
01046                 typename ExtIterator_::block_type * sorted_last_block = new typename ExtIterator_::block_type;
01047                 request_ptr * reqs = new request_ptr[2];
01048 
01049                 reqs[0] = last_block->read(last_bid);
01050                 reqs[1] = sorted_last_block->read(((*out)[out->size() - 1]).bid);
01051 
01052                 reqs[0]->wait();
01053                 reqs[1]->wait();
01054 
01055                 for (i = 0; i < last.block_offset(); ++i)
01056                 {
01057                     last_block->elem[i] = sorted_last_block->elem[i];
01058                 }
01059 
01060                 req = last_block->write(last_bid);
01061 
01062                 mng->delete_block((*out)[out->size() - 1].bid);
01063 
01064                 *last.bid() = last_bid;
01065 
01066                 typename run_type::iterator it = out->begin();
01067                 typename ExtIterator_::bids_container_iterator cur_bid = first.bid();
01068 
01069                 for ( ; cur_bid != last.bid(); ++cur_bid, ++it)
01070                 {
01071                     *cur_bid = (*it).bid;
01072                 }
01073 
01074                 delete sorted_last_block;
01075                 delete[] reqs;
01076                 delete out;
01077 
01078                 req->wait();
01079 
01080                 delete last_block;
01081             }
01082             else
01083             {
01084                 // first and last element are first elements of their of blocks
01085                 n = last.bid() - first.bid();
01086 
01087                 run_type * out =
01088                     sort_local::sort_blocks<typename ExtIterator_::block_type,
01089                                             typename ExtIterator_::vector_type::alloc_strategy,
01090                                             typename ExtIterator_::bids_container_iterator>
01091                                         (first.bid(), n, M / sort_memory_usage_factor() / block_type::raw_size, cmp);
01092 
01093                 typename run_type::iterator it = out->begin();
01094                 typename ExtIterator_::bids_container_iterator cur_bid = first.bid();
01095 
01096                 for ( ; cur_bid != last.bid(); ++cur_bid, ++it)
01097                 {
01098                     *cur_bid = (*it).bid;
01099                 }
01100 
01101                 delete out;
01102             }
01103         }
01104     }
01105 
01106 #ifdef STXXL_CHECK_ORDER_IN_SORTS
01107     assert(stxxl::is_sorted(first, last, cmp));
01108 #endif
01109 }
01110 
01112 
01113 __STXXL_END_NAMESPACE
01114 
01115 #endif // !STXXL_SORT_HEADER
01116 // vim: et:ts=4:sw=4

Generated on Thu Jun 4 10:29:30 2009 for Stxxl by  doxygen 1.4.7