sort_stream.h

00001 /***************************************************************************
00002  *  include/stxxl/bits/stream/sort_stream.h
00003  *
00004  *  Part of the STXXL. See http://stxxl.sourceforge.net
00005  *
00006  *  Copyright (C) 2002-2005 Roman Dementiev <dementiev@mpi-sb.mpg.de>
00007  *  Copyright (C) 2006 Johannes Singler <singler@ira.uka.de>
00008  *
00009  *  Distributed under the Boost Software License, Version 1.0.
00010  *  (See accompanying file LICENSE_1_0.txt or copy at
00011  *  http://www.boost.org/LICENSE_1_0.txt)
00012  **************************************************************************/
00013 
00014 #ifndef STXXL_SORT_STREAM_HEADER
00015 #define STXXL_SORT_STREAM_HEADER
00016 
00017 #ifdef STXXL_BOOST_CONFIG
00018  #include <boost/config.hpp>
00019 #endif
00020 
00021 #include <stxxl/bits/stream/stream.h>
00022 #include <stxxl/sort>
00023 
00024 
00025 __STXXL_BEGIN_NAMESPACE
00026 
00027 namespace stream
00028 {
00031 
00032     template <class ValueType, class TriggerEntryType>
00033     struct sorted_runs
00034     {
00035         typedef TriggerEntryType trigger_entry_type;
00036         typedef ValueType value_type;
00037         typedef typename trigger_entry_type::bid_type bid_type;
00038         typedef stxxl::int64 size_type;
00039         typedef std::vector<trigger_entry_type> run_type;
00040         typedef typed_block<bid_type::size, value_type> block_type;
00041         size_type elements;
00042         std::vector<run_type> runs;
00043         std::vector<unsigned_type> runs_sizes;
00044 
00045         // Optimization:
00046         // if the input is small such that its total size is
00047         // less than B (block_type::size)
00048         // then input is sorted internally
00049         // and kept in the array "small"
00050         std::vector<ValueType> small_;
00051 
00052         sorted_runs() : elements(0) { }
00053 
00058         void deallocate_blocks()
00059         {
00060             block_manager * bm = block_manager::get_instance();
00061             for (unsigned_type i = 0; i < runs.size(); ++i)
00062                 bm->delete_blocks(
00063                     trigger_entry_iterator<typename run_type::iterator, block_type::raw_size>(runs[i].begin()),
00064                     trigger_entry_iterator<typename run_type::iterator, block_type::raw_size>(runs[i].end()));
00065 
00066 
00067             runs.clear();
00068         }
00069     };
00070 
00078     template <
00079         class Input_,
00080         class Cmp_,
00081         unsigned BlockSize_ = STXXL_DEFAULT_BLOCK_SIZE(typename Input_::value_type),
00082         class AllocStr_ = STXXL_DEFAULT_ALLOC_STRATEGY>
00083     class runs_creator : private noncopyable
00084     {
00085         Input_ & input;
00086         Cmp_ cmp;
00087 
00088     public:
00089         typedef Cmp_ cmp_type;
00090         typedef typename Input_::value_type value_type;
00091         typedef BID<BlockSize_> bid_type;
00092         typedef typed_block<BlockSize_, value_type> block_type;
00093         typedef sort_local::trigger_entry<bid_type, value_type> trigger_entry_type;
00094         typedef sorted_runs<value_type, trigger_entry_type> sorted_runs_type;
00095 
00096     private:
00097         typedef typename sorted_runs_type::run_type run_type;
00098         sorted_runs_type result_; // stores the result (sorted runs)
00099         unsigned_type m_;         // memory for internal use in blocks
00100         bool result_computed;     // true result is already computed (used in 'result' method)
00101 
00102         void compute_result();
00103         void sort_run(block_type * run, unsigned_type elements)
00104         {
00105             if (block_type::has_filler)
00106                 std::sort(
00107 #if 1
00108                     ArrayOfSequencesIterator<
00109                         block_type, typename block_type::value_type, block_type::size
00110                         >(run, 0),
00111                     ArrayOfSequencesIterator<
00112                         block_type, typename block_type::value_type, block_type::size
00113                         >(run, elements),
00114 #else
00115                     TwoToOneDimArrayRowAdaptor<
00116                         block_type, value_type, block_type::size
00117                         >(run, 0),
00118                     TwoToOneDimArrayRowAdaptor<
00119                         block_type, value_type, block_type::size
00120                         >(run, elements),
00121 #endif
00122                     cmp);
00123 
00124             else
00125                 std::sort(run[0].elem, run[0].elem + elements, cmp);
00126         }
00127 
00128     public:
00133         runs_creator(Input_ & i, Cmp_ c, unsigned_type memory_to_use) :
00134             input(i), cmp(c), m_(memory_to_use / BlockSize_ / sort_memory_usage_factor()), result_computed(false)
00135         {
00136             assert(2 * BlockSize_ * sort_memory_usage_factor() <= memory_to_use);
00137         }
00138 
00142         const sorted_runs_type & result()
00143         {
00144             if (!result_computed)
00145             {
00146                 compute_result();
00147                 result_computed = true;
00148 #ifdef STXXL_PRINT_STAT_AFTER_RF
00149                 STXXL_MSG(*stats::get_instance());
00150 #endif
00151             }
00152             return result_;
00153         }
00154     };
00155 
00156 
00157     template <class Input_, class Cmp_, unsigned BlockSize_, class AllocStr_>
00158     void runs_creator<Input_, Cmp_, BlockSize_, AllocStr_>::compute_result()
00159     {
00160         unsigned_type i = 0;
00161         unsigned_type m2 = m_ / 2;
00162         const unsigned_type el_in_run = m2 * block_type::size; // # el in a run
00163         STXXL_VERBOSE1("runs_creator::compute_result m2=" << m2);
00164         unsigned_type pos = 0;
00165 
00166 #ifndef STXXL_SMALL_INPUT_PSORT_OPT
00167         block_type * Blocks1 = new block_type[m2 * 2];
00168 #else
00169 #if 0
00170         block_type * Blocks1 = new block_type[1];          // allocate only one block first
00171                                                            // if needed reallocate
00172         while (!input.empty() && pos != block_type::size)
00173         {
00174             Blocks1[pos / block_type::size][pos % block_type::size] = *input;
00175             ++input;
00176             ++pos;
00177         }
00178 #endif
00179 
00180         while (!input.empty() && pos != block_type::size)
00181         {
00182             result_.small_.push_back(*input);
00183             ++input;
00184             ++pos;
00185         }
00186 
00187         block_type * Blocks1;
00188 
00189         if (pos == block_type::size)
00190         {   // enlarge/reallocate Blocks1 array
00191             block_type * NewBlocks = new block_type[m2 * 2];
00192             std::copy(result_.small_.begin(), result_.small_.end(), NewBlocks[0].begin());
00193             result_.small_.clear();
00194             //delete [] Blocks1;
00195             Blocks1 = NewBlocks;
00196         }
00197         else
00198         {
00199             STXXL_VERBOSE1("runs_creator: Small input optimization, input length: " << pos);
00200             result_.elements = pos;
00201             std::sort(result_.small_.begin(), result_.small_.end(), cmp);
00202             return;
00203         }
00204 #endif
00205 
00206         while (!input.empty() && pos != el_in_run)
00207         {
00208             Blocks1[pos / block_type::size][pos % block_type::size] = *input;
00209             ++input;
00210             ++pos;
00211         }
00212 
00213         // sort first run
00214         sort_run(Blocks1, pos);
00215         result_.elements = pos;
00216         if (pos < block_type::size && input.empty()) // small input, do not flush it on the disk(s)
00217         {
00218             STXXL_VERBOSE1("runs_creator: Small input optimization, input length: " << pos);
00219             result_.small_.resize(pos);
00220             std::copy(Blocks1[0].begin(), Blocks1[0].begin() + pos, result_.small_.begin());
00221             delete[] Blocks1;
00222             return;
00223         }
00224 
00225 
00226         block_type * Blocks2 = Blocks1 + m2;
00227         block_manager * bm = block_manager::get_instance();
00228         request_ptr * write_reqs = new request_ptr[m2];
00229         run_type run;
00230 
00231 
00232         unsigned_type cur_run_size = div_and_round_up(pos, block_type::size); // in blocks
00233         run.resize(cur_run_size);
00234         bm->new_blocks(AllocStr_(),
00235                        trigger_entry_iterator<typename run_type::iterator, block_type::raw_size>(run.begin()),
00236                        trigger_entry_iterator<typename run_type::iterator, block_type::raw_size>(run.end())
00237                        );
00238 
00239         disk_queues::get_instance()->set_priority_op(disk_queue::WRITE);
00240 
00241         result_.runs_sizes.push_back(pos);
00242 
00243         // fill the rest of the last block with max values
00244         for ( ; pos != el_in_run; ++pos)
00245             Blocks1[pos / block_type::size][pos % block_type::size] = cmp.max_value();
00246 
00247 
00248         for (i = 0; i < cur_run_size; ++i)
00249         {
00250             run[i].value = Blocks1[i][0];
00251             write_reqs[i] = Blocks1[i].write(run[i].bid);
00252             //STXXL_MSG("BID: "<<run[i].bid<<" val: "<<run[i].value);
00253         }
00254         result_.runs.push_back(run); // #
00255 
00256         if (input.empty())
00257         {
00258             // return
00259             wait_all(write_reqs, write_reqs + cur_run_size);
00260             delete[] write_reqs;
00261             delete[] Blocks1;
00262             return;
00263         }
00264 
00265         STXXL_VERBOSE1("Filling the second part of the allocated blocks");
00266         pos = 0;
00267         while (!input.empty() && pos != el_in_run)
00268         {
00269             Blocks2[pos / block_type::size][pos % block_type::size] = *input;
00270             ++input;
00271             ++pos;
00272         }
00273         result_.elements += pos;
00274 
00275         if (input.empty())
00276         {
00277             // (re)sort internally and return
00278             pos += el_in_run;
00279             sort_run(Blocks1, pos); // sort first an second run together
00280             wait_all(write_reqs, write_reqs + cur_run_size);
00281             bm->delete_blocks(trigger_entry_iterator<typename run_type::iterator, block_type::raw_size>(run.begin()),
00282                               trigger_entry_iterator<typename run_type::iterator, block_type::raw_size>(run.end()));
00283 
00284             cur_run_size = div_and_round_up(pos, block_type::size);
00285             run.resize(cur_run_size);
00286             bm->new_blocks(AllocStr_(),
00287                            trigger_entry_iterator<typename run_type::iterator, block_type::raw_size>(run.begin()),
00288                            trigger_entry_iterator<typename run_type::iterator, block_type::raw_size>(run.end())
00289                            );
00290 
00291             result_.runs_sizes[0] = pos;
00292             // fill the rest of the last block with max values
00293             for ( ; pos != 2 * el_in_run; ++pos)
00294                 Blocks1[pos / block_type::size][pos % block_type::size] = cmp.max_value();
00295 
00296 
00297             assert(cur_run_size > m2);
00298 
00299             for (i = 0; i < m2; ++i)
00300             {
00301                 run[i].value = Blocks1[i][0];
00302                 write_reqs[i]->wait();
00303                 write_reqs[i] = Blocks1[i].write(run[i].bid);
00304             }
00305 
00306             request_ptr * write_reqs1 = new request_ptr[cur_run_size - m2];
00307 
00308             for ( ; i < cur_run_size; ++i)
00309             {
00310                 run[i].value = Blocks1[i][0];
00311                 write_reqs1[i - m2] = Blocks1[i].write(run[i].bid);
00312             }
00313 
00314             result_.runs[0] = run;
00315 
00316             wait_all(write_reqs, write_reqs + m2);
00317             delete[] write_reqs;
00318             wait_all(write_reqs1, write_reqs1 + cur_run_size - m2);
00319             delete[] write_reqs1;
00320 
00321             delete[] Blocks1;
00322 
00323             return;
00324         }
00325 
00326         sort_run(Blocks2, pos);
00327 
00328         cur_run_size = div_and_round_up(pos, block_type::size); // in blocks
00329         run.resize(cur_run_size);
00330         bm->new_blocks(AllocStr_(),
00331                        trigger_entry_iterator<typename run_type::iterator, block_type::raw_size>(run.begin()),
00332                        trigger_entry_iterator<typename run_type::iterator, block_type::raw_size>(run.end())
00333                        );
00334 
00335         for (i = 0; i < cur_run_size; ++i)
00336         {
00337             run[i].value = Blocks2[i][0];
00338             write_reqs[i]->wait();
00339             write_reqs[i] = Blocks2[i].write(run[i].bid);
00340         }
00341         assert((pos % el_in_run) == 0);
00342 
00343         result_.runs.push_back(run);
00344         result_.runs_sizes.push_back(pos);
00345 
00346         while (!input.empty())
00347         {
00348             pos = 0;
00349             while (!input.empty() && pos != el_in_run)
00350             {
00351                 Blocks1[pos / block_type::size][pos % block_type::size] = *input;
00352                 ++input;
00353                 ++pos;
00354             }
00355             result_.elements += pos;
00356             sort_run(Blocks1, pos);
00357             cur_run_size = div_and_round_up(pos, block_type::size); // in blocks
00358             run.resize(cur_run_size);
00359             bm->new_blocks(AllocStr_(),
00360                            trigger_entry_iterator<typename run_type::iterator, block_type::raw_size>(run.begin()),
00361                            trigger_entry_iterator<typename run_type::iterator, block_type::raw_size>(run.end())
00362                            );
00363 
00364             result_.runs_sizes.push_back(pos);
00365 
00366             // fill the rest of the last block with max values (occurs only on the last run)
00367             for ( ; pos != el_in_run; ++pos)
00368                 Blocks1[pos / block_type::size][pos % block_type::size] = cmp.max_value();
00369 
00370 
00371             for (i = 0; i < cur_run_size; ++i)
00372             {
00373                 run[i].value = Blocks1[i][0];
00374                 write_reqs[i]->wait();
00375                 write_reqs[i] = Blocks1[i].write(run[i].bid);
00376             }
00377             result_.runs.push_back(run); // #
00378 
00379             std::swap(Blocks1, Blocks2);
00380         }
00381 
00382         wait_all(write_reqs, write_reqs + m2);
00383         delete[] write_reqs;
00384         delete[] ((Blocks1 < Blocks2) ? Blocks1 : Blocks2);
00385     }
00386 
00387 
00395     template <class ValueType_>
00396     struct use_push
00397     {
00398         typedef ValueType_ value_type;
00399     };
00400 
00412     template <
00413         class ValueType_,
00414         class Cmp_,
00415         unsigned BlockSize_,
00416         class AllocStr_>
00417     class runs_creator<
00418         use_push<ValueType_>,
00419         Cmp_,
00420         BlockSize_,
00421         AllocStr_>
00422     {
00423         Cmp_ cmp;
00424 
00425     public:
00426         typedef Cmp_ cmp_type;
00427         typedef ValueType_ value_type;
00428         typedef BID<BlockSize_> bid_type;
00429         typedef typed_block<BlockSize_, value_type> block_type;
00430         typedef sort_local::trigger_entry<bid_type, value_type> trigger_entry_type;
00431         typedef sorted_runs<value_type, trigger_entry_type> sorted_runs_type;
00432 
00433     private:
00434         typedef typename sorted_runs_type::run_type run_type;
00435         sorted_runs_type result_; // stores the result (sorted runs)
00436         unsigned_type m_;         // memory for internal use in blocks
00437 
00438         bool output_requested;    // true after the result() method was called for the first time
00439 
00440         const unsigned_type m2;
00441         const unsigned_type el_in_run;
00442         unsigned_type cur_el;
00443         block_type * Blocks1;
00444         block_type * Blocks2;
00445         request_ptr * write_reqs;
00446         run_type run;
00447 
00448         void sort_run(block_type * run, unsigned_type elements)
00449         {
00450             if (block_type::has_filler)
00451                 std::sort(
00452 #if 1
00453                     ArrayOfSequencesIterator<
00454                         block_type, typename block_type::value_type, block_type::size
00455                         >(run, 0),
00456                     ArrayOfSequencesIterator<
00457                         block_type, typename block_type::value_type, block_type::size
00458                         >(run, elements),
00459 #else
00460                     TwoToOneDimArrayRowAdaptor<
00461                         block_type, value_type, block_type::size
00462                         >(run, 0),
00463                     TwoToOneDimArrayRowAdaptor<
00464                         block_type, value_type, block_type::size
00465                         >(run, elements),
00466 #endif
00467                     cmp);
00468 
00469             else
00470                 std::sort(run[0].elem, run[0].elem + elements, cmp);
00471         }
00472         void finish_result()
00473         {
00474             if (cur_el == 0)
00475                 return;
00476 
00477 
00478             unsigned_type cur_el_reg = cur_el;
00479             sort_run(Blocks1, cur_el_reg);
00480             result_.elements += cur_el_reg;
00481             if (cur_el_reg < unsigned_type(block_type::size) &&
00482                 unsigned_type(result_.elements) == cur_el_reg)         // small input, do not flush it on the disk(s)
00483             {
00484                 STXXL_VERBOSE1("runs_creator(use_push): Small input optimization, input length: " << cur_el_reg);
00485                 result_.small_.resize(cur_el_reg);
00486                 std::copy(Blocks1[0].begin(), Blocks1[0].begin() + cur_el_reg, result_.small_.begin());
00487                 return;
00488             }
00489 
00490             const unsigned_type cur_run_size = div_and_round_up(cur_el_reg, block_type::size);     // in blocks
00491             run.resize(cur_run_size);
00492             block_manager * bm = block_manager::get_instance();
00493             bm->new_blocks(AllocStr_(),
00494                            trigger_entry_iterator<typename run_type::iterator, block_type::raw_size>(run.begin()),
00495                            trigger_entry_iterator<typename run_type::iterator, block_type::raw_size>(run.end())
00496                            );
00497 
00498             disk_queues::get_instance()->set_priority_op(disk_queue::WRITE);
00499 
00500             result_.runs_sizes.push_back(cur_el_reg);
00501 
00502             // fill the rest of the last block with max values
00503             for ( ; cur_el_reg != el_in_run; ++cur_el_reg)
00504                 Blocks1[cur_el_reg / block_type::size][cur_el_reg % block_type::size] = cmp.max_value();
00505 
00506 
00507             unsigned_type i = 0;
00508             for ( ; i < cur_run_size; ++i)
00509             {
00510                 run[i].value = Blocks1[i][0];
00511                 if (write_reqs[i].get())
00512                     write_reqs[i]->wait();
00513 
00514                 write_reqs[i] = Blocks1[i].write(run[i].bid);
00515             }
00516             result_.runs.push_back(run);
00517 
00518             for (i = 0; i < m2; ++i)
00519                 if (write_reqs[i].get())
00520                     write_reqs[i]->wait();
00521         }
00522         void cleanup()
00523         {
00524             delete[] write_reqs;
00525             delete[] ((Blocks1 < Blocks2) ? Blocks1 : Blocks2);
00526             write_reqs = NULL;
00527             Blocks1 = Blocks2 = NULL;
00528         }
00529 
00530     public:
00534         runs_creator(Cmp_ c, unsigned_type memory_to_use) :
00535             cmp(c), m_(memory_to_use / BlockSize_ / sort_memory_usage_factor()), output_requested(false),
00536             m2(m_ / 2),
00537             el_in_run(m2 * block_type::size),
00538             cur_el(0),
00539             Blocks1(new block_type[m2 * 2]),
00540             Blocks2(Blocks1 + m2),
00541             write_reqs(new request_ptr[m2])
00542         {
00543             assert(2 * BlockSize_ * sort_memory_usage_factor() <= memory_to_use);
00544         }
00545 
00546         ~runs_creator()
00547         {
00548             if (!output_requested)
00549                 cleanup();
00550         }
00551 
00554         void push(const value_type & val)
00555         {
00556             assert(output_requested == false);
00557             unsigned_type cur_el_reg = cur_el;
00558             if (cur_el_reg < el_in_run)
00559             {
00560                 Blocks1[cur_el_reg / block_type::size][cur_el_reg % block_type::size] = val;
00561                 ++cur_el;
00562                 return;
00563             }
00564 
00565             assert(el_in_run == cur_el);
00566             cur_el = 0;
00567 
00568             //sort and store Blocks1
00569             sort_run(Blocks1, el_in_run);
00570             result_.elements += el_in_run;
00571 
00572             const unsigned_type cur_run_size = div_and_round_up(el_in_run, block_type::size);    // in blocks
00573             run.resize(cur_run_size);
00574             block_manager * bm = block_manager::get_instance();
00575             bm->new_blocks(AllocStr_(),
00576                            trigger_entry_iterator<typename run_type::iterator, block_type::raw_size>(run.begin()),
00577                            trigger_entry_iterator<typename run_type::iterator, block_type::raw_size>(run.end())
00578                            );
00579 
00580             disk_queues::get_instance()->set_priority_op(disk_queue::WRITE);
00581 
00582             result_.runs_sizes.push_back(el_in_run);
00583 
00584             for (unsigned_type i = 0; i < cur_run_size; ++i)
00585             {
00586                 run[i].value = Blocks1[i][0];
00587                 if (write_reqs[i].get())
00588                     write_reqs[i]->wait();
00589 
00590                 write_reqs[i] = Blocks1[i].write(run[i].bid);
00591             }
00592 
00593             result_.runs.push_back(run);
00594 
00595             std::swap(Blocks1, Blocks2);
00596 
00597             push(val);
00598         }
00599 
00603         const sorted_runs_type & result()
00604         {
00605             if (!output_requested)
00606             {
00607                 finish_result();
00608                 output_requested = true;
00609                 cleanup();
00610 #ifdef STXXL_PRINT_STAT_AFTER_RF
00611                 STXXL_MSG(*stats::get_instance());
00612 #endif
00613             }
00614             return result_;
00615         }
00616     };
00617 
00618 
00625     template <class ValueType_>
00626     struct from_sorted_sequences
00627     {
00628         typedef ValueType_ value_type;
00629     };
00630 
00642     template <
00643         class ValueType_,
00644         class Cmp_,
00645         unsigned BlockSize_,
00646         class AllocStr_>
00647     class runs_creator<
00648         from_sorted_sequences<ValueType_>,
00649         Cmp_,
00650         BlockSize_,
00651         AllocStr_>
00652     {
00653         typedef ValueType_ value_type;
00654         typedef BID<BlockSize_> bid_type;
00655         typedef typed_block<BlockSize_, value_type> block_type;
00656         typedef sort_local::trigger_entry<bid_type, value_type> trigger_entry_type;
00657         typedef AllocStr_ alloc_strategy_type;
00658         Cmp_ cmp;
00659 
00660     public:
00661         typedef Cmp_ cmp_type;
00662         typedef sorted_runs<value_type, trigger_entry_type> sorted_runs_type;
00663 
00664     private:
00665         typedef typename sorted_runs_type::run_type run_type;
00666         sorted_runs_type result_; // stores the result (sorted runs)
00667         unsigned_type m_;         // memory for internal use in blocks
00668         buffered_writer<block_type> writer;
00669         block_type * cur_block;
00670         unsigned_type offset;
00671         unsigned_type iblock;
00672         unsigned_type irun;
00673         alloc_strategy_type alloc_strategy;
00674 
00675     public:
00680         runs_creator(Cmp_ c, unsigned_type memory_to_use) :
00681             cmp(c),
00682             m_(memory_to_use / BlockSize_ / sort_memory_usage_factor()),
00683             writer(m_, m_ / 2),
00684             cur_block(writer.get_free_block()),
00685             offset(0),
00686             iblock(0),
00687             irun(0)
00688         {
00689             assert(2 * BlockSize_ * sort_memory_usage_factor() <= memory_to_use);
00690         }
00691 
00694         void push(const value_type & val)
00695         {
00696             assert(offset < block_type::size);
00697 
00698             (*cur_block)[offset] = val;
00699             ++offset;
00700 
00701             if (offset == block_type::size)
00702             {
00703                 // write current block
00704 
00705                 block_manager * bm = block_manager::get_instance();
00706                 // allocate space for the block
00707                 result_.runs.resize(irun + 1);
00708                 result_.runs[irun].resize(iblock + 1);
00709                 bm->new_blocks(
00710                     offset_allocator<alloc_strategy_type>(iblock, alloc_strategy),
00711                     trigger_entry_iterator<typename run_type::iterator, block_type::raw_size>(
00712                         result_.runs[irun].begin() + iblock),
00713                     trigger_entry_iterator<typename run_type::iterator, block_type::raw_size>(
00714                         result_.runs[irun].end())
00715                     );
00716 
00717                 result_.runs[irun][iblock].value = (*cur_block)[0];         // init trigger
00718                 cur_block = writer.write(cur_block, result_.runs[irun][iblock].bid);
00719                 ++iblock;
00720 
00721                 offset = 0;
00722             }
00723 
00724             ++result_.elements;
00725         }
00726 
00728         void finish()
00729         {
00730             if (offset == 0 && iblock == 0) // current run is empty
00731                 return;
00732 
00733 
00734             result_.runs_sizes.resize(irun + 1);
00735             result_.runs_sizes.back() = iblock * block_type::size + offset;
00736 
00737             if (offset)    // if current block is partially filled
00738             {
00739                 while (offset != block_type::size)
00740                 {
00741                     (*cur_block)[offset] = cmp.max_value();
00742                     ++offset;
00743                 }
00744                 offset = 0;
00745 
00746                 block_manager * bm = block_manager::get_instance();
00747                 // allocate space for the block
00748                 result_.runs.resize(irun + 1);
00749                 result_.runs[irun].resize(iblock + 1);
00750                 bm->new_blocks(
00751                     offset_allocator<alloc_strategy_type>(iblock, alloc_strategy),
00752                     trigger_entry_iterator<typename run_type::iterator, block_type::raw_size>(
00753                         result_.runs[irun].begin() + iblock),
00754                     trigger_entry_iterator<typename run_type::iterator, block_type::raw_size>(
00755                         result_.runs[irun].end())
00756                     );
00757 
00758                 result_.runs[irun][iblock].value = (*cur_block)[0];         // init trigger
00759                 cur_block = writer.write(cur_block, result_.runs[irun][iblock].bid);
00760             }
00761             else
00762             { }
00763 
00764             iblock = 0;
00765             ++irun;
00766         }
00767 
00771         const sorted_runs_type & result()
00772         {
00773             finish();
00774             writer.flush();
00775 
00776             return result_;
00777         }
00778     };
00779 
00780 
00785     template <class RunsType_, class Cmp_>
00786     bool check_sorted_runs(RunsType_ & sruns, Cmp_ cmp)
00787     {
00788         typedef typename RunsType_::block_type block_type;
00789         typedef typename block_type::value_type value_type;
00790         STXXL_VERBOSE2("Elements: " << sruns.elements);
00791         unsigned_type nruns = sruns.runs.size();
00792         STXXL_VERBOSE2("Runs: " << nruns);
00793         unsigned_type irun = 0;
00794         for (irun = 0; irun < nruns; ++irun)
00795         {
00796             const unsigned_type nblocks = sruns.runs[irun].size();
00797             block_type * blocks = new block_type[nblocks];
00798             request_ptr * reqs = new request_ptr[nblocks];
00799             for (unsigned_type j = 0; j < nblocks; ++j)
00800             {
00801                 reqs[j] = blocks[j].read(sruns.runs[irun][j].bid);
00802             }
00803             wait_all(reqs, reqs + nblocks);
00804             for (unsigned_type j = 0; j < nblocks; ++j)
00805             {
00806                 if (cmp(blocks[j][0], sruns.runs[irun][j].value) ||
00807                     cmp(sruns.runs[irun][j].value, blocks[j][0])) 
00808                 {
00809                     STXXL_ERRMSG("check_sorted_runs  wrong trigger in the run");
00810                     return false;
00811                 }
00812             }
00813             if (!stxxl::is_sorted(
00814 #if 1
00815                     ArrayOfSequencesIterator<
00816                         block_type, typename block_type::value_type, block_type::size
00817                         >(blocks, 0),
00818                     ArrayOfSequencesIterator<
00819                         block_type, typename block_type::value_type, block_type::size
00820                         >(blocks, sruns.runs_sizes[irun]),
00821 #else
00822                     TwoToOneDimArrayRowAdaptor<
00823                         block_type, value_type, block_type::size
00824                         >(blocks, 0),
00825                     TwoToOneDimArrayRowAdaptor<
00826                         block_type, value_type, block_type::size
00827                         >(blocks,
00828                           //nblocks*block_type::size
00829                           //(irun<nruns-1)?(nblocks*block_type::size): (sruns.elements%(nblocks*block_type::size))
00830                           sruns.runs_sizes[irun]
00831                           ),
00832 #endif
00833                     cmp))
00834             {
00835                 STXXL_ERRMSG("check_sorted_runs  wrong order in the run");
00836                 return false;
00837             }
00838 
00839             delete[] reqs;
00840             delete[] blocks;
00841         }
00842 
00843         STXXL_MSG("Checking runs finished successfully");
00844 
00845         return true;
00846     }
00847 
00848 
00856     template <class RunsType_,
00857               class Cmp_,
00858               class AllocStr_ = STXXL_DEFAULT_ALLOC_STRATEGY>
00859     class runs_merger : private noncopyable
00860     {
00861         typedef RunsType_ sorted_runs_type;
00862         typedef AllocStr_ alloc_strategy;
00863         typedef typename sorted_runs_type::size_type size_type;
00864         typedef Cmp_ value_cmp;
00865         typedef typename sorted_runs_type::run_type run_type;
00866         typedef typename sorted_runs_type::block_type block_type;
00867         typedef typename block_type::bid_type bid_type;
00868         typedef block_prefetcher<block_type, typename run_type::iterator> prefetcher_type;
00869         typedef run_cursor2<block_type, prefetcher_type> run_cursor_type;
00870         typedef sort_local::run_cursor2_cmp<block_type, prefetcher_type, value_cmp> run_cursor2_cmp_type;
00871         typedef loser_tree<run_cursor_type, run_cursor2_cmp_type, block_type::size> loser_tree_type;
00872 
00873         typedef stxxl::int64 diff_type;
00874         typedef std::pair<typename block_type::iterator, typename block_type::iterator> sequence;
00875         typedef typename std::vector<sequence>::size_type seqs_size_type;
00876         std::vector<sequence> * seqs;
00877         std::vector<block_type *> * buffers;
00878 
00879 
00880         sorted_runs_type sruns;
00881         unsigned_type m_; //  blocks to use - 1
00882         value_cmp cmp;
00883         size_type elements_remaining;
00884         unsigned_type buffer_pos;
00885         block_type * current_block;
00886         run_type consume_seq;
00887         prefetcher_type * prefetcher;
00888         loser_tree_type * losers;
00889         int_type * prefetch_seq;
00890         unsigned_type nruns;
00891 #ifdef STXXL_CHECK_ORDER_IN_SORTS
00892         typename block_type::value_type last_element;
00893 #endif
00894 
00895         void merge_recursively();
00896 
00897         void deallocate_prefetcher()
00898         {
00899             if (prefetcher)
00900             {
00901                 delete losers;
00902                 delete seqs;
00903                 delete buffers;
00904                 delete prefetcher;
00905                 delete[] prefetch_seq;
00906                 prefetcher = NULL;
00907             }
00908             // free blocks in runs , (or the user should do it?)
00909             sruns.deallocate_blocks();
00910         }
00911 
00912         void initialize_current_block()
00913         {
00914             if (do_parallel_merge())
00915             {
00916 #if STXXL_PARALLEL_MULTIWAY_MERGE
00917 // begin of STL-style merging
00918                 seqs = new std::vector<sequence>(nruns);
00919                 buffers = new std::vector<block_type *>(nruns);
00920 
00921                 for (unsigned_type i = 0; i < nruns; i++)                                       //initialize sequences
00922                 {
00923                     (*buffers)[i] = prefetcher->pull_block();                                   //get first block of each run
00924                     (*seqs)[i] = std::make_pair((*buffers)[i]->begin(), (*buffers)[i]->end());  //this memory location stays the same, only the data is exchanged
00925                 }
00926 
00927 // end of STL-style merging
00928 #else
00929                 assert(false);
00930 #endif
00931             }
00932             else
00933             {
00934 // begin of native merging procedure
00935 
00936                 losers = new loser_tree_type(prefetcher, nruns, run_cursor2_cmp_type(cmp));
00937 
00938 // end of native merging procedure
00939             }
00940         }
00941 
00942         void fill_current_block()
00943         {
00944             if (do_parallel_merge())
00945             {
00946 #if STXXL_PARALLEL_MULTIWAY_MERGE
00947 // begin of STL-style merging
00948                 diff_type rest = block_type::size;              // elements still to merge for this output block
00949 
00950                 do                                              // while rest > 0 and still elements available
00951                 {
00952                     value_type * min_last_element = NULL;       // no element found yet
00953                     diff_type total_size = 0;
00954 
00955                     for (seqs_size_type i = 0; i < (*seqs).size(); i++)
00956                     {
00957                         if ((*seqs)[i].first == (*seqs)[i].second)
00958                             continue; //run empty
00959 
00960                         if (min_last_element == NULL)
00961                             min_last_element = &(*((*seqs)[i].second - 1));
00962                         else
00963                             min_last_element = cmp(*min_last_element, *((*seqs)[i].second - 1)) ? min_last_element : &(*((*seqs)[i].second - 1));
00964 
00965                         total_size += (*seqs)[i].second - (*seqs)[i].first;
00966                         STXXL_VERBOSE1("last " << *((*seqs)[i].second - 1) << " block size " << ((*seqs)[i].second - (*seqs)[i].first));
00967                     }
00968 
00969                     assert(min_last_element != NULL);           //there must be some element
00970 
00971                     STXXL_VERBOSE1("min_last_element " << min_last_element << " total size " << total_size + (block_type::size - rest));
00972 
00973                     diff_type less_equal_than_min_last = 0;
00974                     //locate this element in all sequences
00975                     for (seqs_size_type i = 0; i < (*seqs).size(); i++)
00976                     {
00977                         if ((*seqs)[i].first == (*seqs)[i].second)
00978                             continue; //empty subsequence
00979 
00980                         typename block_type::iterator position = std::upper_bound((*seqs)[i].first, (*seqs)[i].second, *min_last_element, cmp);
00981                         STXXL_VERBOSE1("greater equal than " << position - (*seqs)[i].first);
00982                         less_equal_than_min_last += position - (*seqs)[i].first;
00983                     }
00984 
00985                     STXXL_VERBOSE1("finished loop");
00986 
00987                     ptrdiff_t output_size = (std::min)(less_equal_than_min_last, rest);   // at most rest elements
00988 
00989                     STXXL_VERBOSE1("before merge" << output_size);
00990 
00991                     stxxl::parallel::multiway_merge((*seqs).begin(), (*seqs).end(), current_block->end() - rest, cmp, output_size);
00992                     //sequence iterators are progressed appropriately
00993 
00994                     STXXL_VERBOSE1("after merge");
00995 
00996                     rest -= output_size;
00997 
00998                     STXXL_VERBOSE1("so long");
00999 
01000                     for (seqs_size_type i = 0; i < (*seqs).size(); i++)
01001                     {
01002                         if ((*seqs)[i].first == (*seqs)[i].second)                        // run empty
01003                         {
01004                             if (prefetcher->block_consumed((*buffers)[i]))
01005                             {
01006                                 (*seqs)[i].first = (*buffers)[i]->begin();                // reset iterator
01007                                 (*seqs)[i].second = (*buffers)[i]->end();
01008                                 STXXL_VERBOSE1("block ran empty " << i);
01009                             }
01010                             else
01011                             {
01012                                 (*seqs).erase((*seqs).begin() + i);                       // remove this sequence
01013                                 (*buffers).erase((*buffers).begin() + i);
01014                                 STXXL_VERBOSE1("seq removed " << i);
01015                             }
01016                         }
01017                     }
01018                 } while (rest > 0 && (*seqs).size() > 0);
01019 
01020  #ifdef STXXL_CHECK_ORDER_IN_SORTS
01021                 if (!stxxl::is_sorted(current_block->begin(), current_block->end(), cmp))
01022                 {
01023                     for (value_type * i = current_block->begin() + 1; i != current_block->end(); i++)
01024                         if (cmp(*i, *(i - 1)))
01025                         {
01026                             STXXL_VERBOSE1("Error at position " << (i - current_block->begin()));
01027                         }
01028                     assert(false);
01029                 }
01030  #endif
01031 
01032 // end of STL-style merging
01033 #else
01034                 assert(false);
01035 #endif
01036             }
01037             else
01038             {
01039 // begin of native merging procedure
01040 
01041                 losers->multi_merge(current_block->elem);
01042 
01043 // end of native merging procedure
01044             }
01045         }
01046 
01047     public:
01049         typedef typename sorted_runs_type::value_type value_type;
01050 
01055         runs_merger(const sorted_runs_type & r, value_cmp c, unsigned_type memory_to_use) :
01056             sruns(r), m_(memory_to_use / block_type::raw_size / sort_memory_usage_factor() /* - 1 */), cmp(c),
01057             elements_remaining(r.elements),
01058             current_block(NULL),
01059             prefetcher(NULL)
01060 #ifdef STXXL_CHECK_ORDER_IN_SORTS
01061             , last_element(cmp.min_value())
01062 #endif
01063         {
01064             if (empty())
01065                 return;
01066 
01067 
01068             if (!sruns.small_.empty()) // we have a small input < B,
01069             // that is kept in the main memory
01070             {
01071                 STXXL_VERBOSE1("runs_merger: small input optimization, input length: " << elements_remaining);
01072                 assert(elements_remaining == size_type(sruns.small_.size()));
01073                 current_block = new block_type;
01074                 std::copy(sruns.small_.begin(), sruns.small_.end(), current_block->begin());
01075                 current_value = current_block->elem[0];
01076                 buffer_pos = 1;
01077 
01078                 return;
01079             }
01080 
01081 #ifdef STXXL_CHECK_ORDER_IN_SORTS
01082             assert(check_sorted_runs(r, cmp));
01083 #endif
01084 
01085             current_block = new block_type;
01086 
01087             disk_queues::get_instance()->set_priority_op(disk_queue::WRITE);
01088 
01089             nruns = sruns.runs.size();
01090 
01091             if (m_ < nruns)
01092             {
01093                 // can not merge runs in one pass
01094                 // merge recursively:
01095                 STXXL_ERRMSG("The implementation of sort requires more than one merge pass, therefore for a better");
01096                 STXXL_ERRMSG("efficiency decrease block size of run storage (a parameter of the run_creator)");
01097                 STXXL_ERRMSG("or increase the amount memory dedicated to the merger.");
01098                 STXXL_ERRMSG("m = " << m_ << " nruns=" << nruns);
01099 
01100                 // insufficient memory, can not merge at all
01101                 if (m_ < 2) {
01102                     STXXL_ERRMSG("The merger requires memory to store at least two blocks internally. Aborting.");
01103                     abort();
01104                 }
01105 
01106                 merge_recursively();
01107 
01108                 nruns = sruns.runs.size();
01109             }
01110 
01111             assert(nruns <= m_);
01112 
01113             unsigned_type i;
01114             /*
01115                const unsigned_type out_run_size =
01116                   div_and_round_up(elements_remaining,size_type(block_type::size));
01117              */
01118             unsigned_type prefetch_seq_size = 0;
01119             for (i = 0; i < nruns; i++)
01120             {
01121                 prefetch_seq_size += sruns.runs[i].size();
01122             }
01123 
01124             consume_seq.resize(prefetch_seq_size);
01125 
01126             prefetch_seq = new int_type[prefetch_seq_size];
01127 
01128             typename run_type::iterator copy_start = consume_seq.begin();
01129             for (i = 0; i < nruns; i++)
01130             {
01131                 copy_start = std::copy(
01132                     sruns.runs[i].begin(),
01133                     sruns.runs[i].end(),
01134                     copy_start);
01135             }
01136 
01137             std::stable_sort(consume_seq.begin(), consume_seq.end(),
01138                              sort_local::trigger_entry_cmp<bid_type, value_type, value_cmp>(cmp));
01139 
01140             int_type disks_number = config::get_instance()->disks_number();
01141 
01142             const int_type n_prefetch_buffers = STXXL_MAX(2 * disks_number, (int_type(m_) - int_type(nruns)));
01143 
01144 
01145 #ifdef SORT_OPTIMAL_PREFETCHING
01146             // heuristic
01147             const int_type n_opt_prefetch_buffers = 2 * disks_number + (3 * (n_prefetch_buffers - 2 * disks_number)) / 10;
01148 
01149             compute_prefetch_schedule(
01150                 consume_seq,
01151                 prefetch_seq,
01152                 n_opt_prefetch_buffers,
01153                 disks_number);
01154 #else
01155             for (i = 0; i < prefetch_seq_size; ++i)
01156                 prefetch_seq[i] = i;
01157 
01158 #endif
01159 
01160 
01161             prefetcher = new prefetcher_type(
01162                 consume_seq.begin(),
01163                 consume_seq.end(),
01164                 prefetch_seq,
01165                 nruns + n_prefetch_buffers);
01166 
01167             losers = NULL;
01168             seqs = NULL;
01169             buffers = NULL;
01170 
01171             initialize_current_block();
01172             fill_current_block();
01173 
01174 
01175             current_value = current_block->elem[0];
01176             buffer_pos = 1;
01177 
01178             if (elements_remaining <= block_type::size)
01179                 deallocate_prefetcher();
01180         }
01181 
01183         bool empty() const
01184         {
01185             return elements_remaining == 0;
01186         }
01188         runs_merger & operator ++ () // preincrement operator
01189         {
01190             assert(!empty());
01191 
01192             --elements_remaining;
01193 
01194             if (buffer_pos != block_type::size)
01195             {
01196                 current_value = current_block->elem[buffer_pos];
01197                 ++buffer_pos;
01198             }
01199             else
01200             {
01201                 if (!empty())
01202                 {
01203                     fill_current_block();
01204 
01205 #ifdef STXXL_CHECK_ORDER_IN_SORTS
01206                     assert(stxxl::is_sorted(current_block->elem, current_block->elem + current_block->size, cmp));
01207                     assert(!cmp(current_block->elem[0], current_value));
01208 #endif
01209                     current_value = current_block->elem[0];
01210                     buffer_pos = 1;
01211                 }
01212                 if (elements_remaining <= block_type::size)
01213                     deallocate_prefetcher();
01214             }
01215 
01216 
01217 #ifdef STXXL_CHECK_ORDER_IN_SORTS
01218             if (!empty())
01219             {
01220                 assert(!cmp(current_value, last_element));
01221                 last_element = current_value;
01222             }
01223 #endif
01224 
01225             return *this;
01226         }
01228         const value_type & operator * () const
01229         {
01230             assert(!empty());
01231             return current_value;
01232         }
01233 
01235         const value_type * operator -> () const
01236         {
01237             assert(!empty());
01238             return &current_value;
01239         }
01240 
01241 
01244         virtual ~runs_merger()
01245         {
01246             deallocate_prefetcher();
01247 
01248             if (current_block)
01249                 delete current_block;
01250 
01251             // free blocks in runs , (or the user should do it?)
01252             sruns.deallocate_blocks();
01253         }
01254 
01255     private:
01256         // cache for the current value
01257         value_type current_value;
01258     };
01259 
01260 
01261     template <class RunsType_, class Cmp_, class AllocStr_>
01262     void runs_merger<RunsType_, Cmp_, AllocStr_>::merge_recursively()
01263     {
01264         block_manager * bm = block_manager::get_instance();
01265         unsigned_type ndisks = config::get_instance()->disks_number();
01266         unsigned_type nwrite_buffers = 2 * ndisks;
01267 
01268         unsigned_type nruns = sruns.runs.size();
01269         const unsigned_type merge_factor =
01270             static_cast<unsigned_type>(ceil(pow(nruns, 1. / ceil(log(double(nruns)) / log(double(m_))))));
01271         assert(merge_factor <= m_);
01272         while (nruns > m_)
01273         {
01274             unsigned_type new_nruns = div_and_round_up(nruns, merge_factor);
01275             STXXL_VERBOSE("Starting new merge phase: nruns: " << nruns <<
01276                           " opt_merge_factor: " << merge_factor << " m:" << m_ << " new_nruns: " << new_nruns);
01277 
01278             sorted_runs_type new_runs;
01279             new_runs.runs.resize(new_nruns);
01280             new_runs.runs_sizes.resize(new_nruns);
01281             new_runs.elements = sruns.elements;
01282 
01283             unsigned_type runs_left = nruns;
01284             unsigned_type cur_out_run = 0;
01285             unsigned_type elements_in_new_run = 0;
01286             //unsigned_type blocks_in_new_run = 0;
01287 
01288 
01289             while (runs_left > 0)
01290             {
01291                 int_type runs2merge = STXXL_MIN(runs_left, merge_factor);
01292                 //blocks_in_new_run = 0 ;
01293                 elements_in_new_run = 0;
01294                 for (unsigned_type i = nruns - runs_left; i < (nruns - runs_left + runs2merge); ++i)
01295                 {
01296                     elements_in_new_run += sruns.runs_sizes[i];
01297                     //blocks_in_new_run += sruns.runs[i].size();
01298                 }
01299                 const unsigned_type blocks_in_new_run1 = div_and_round_up(elements_in_new_run, block_type::size);
01300                 //assert(blocks_in_new_run1 == blocks_in_new_run);
01301 
01302                 new_runs.runs_sizes[cur_out_run] = elements_in_new_run;
01303                 // allocate run
01304                 new_runs.runs[cur_out_run++].resize(blocks_in_new_run1);
01305                 runs_left -= runs2merge;
01306             }
01307 
01308             // allocate blocks for the new runs
01309             for (unsigned_type i = 0; i < new_runs.runs.size(); ++i)
01310                 bm->new_blocks(alloc_strategy(),
01311                                trigger_entry_iterator<typename run_type::iterator, block_type::raw_size>(new_runs.runs[i].begin()),
01312                                trigger_entry_iterator<typename run_type::iterator, block_type::raw_size>(new_runs.runs[i].end()));
01313 
01314 
01315             // merge all
01316             runs_left = nruns;
01317             cur_out_run = 0;
01318             size_type elements_left = sruns.elements;
01319 
01320             while (runs_left > 0)
01321             {
01322                 unsigned_type runs2merge = STXXL_MIN(runs_left, merge_factor);
01323                 STXXL_VERBOSE("Merging " << runs2merge << " runs");
01324 
01325                 sorted_runs_type cur_runs;
01326                 cur_runs.runs.resize(runs2merge);
01327                 cur_runs.runs_sizes.resize(runs2merge);
01328 
01329                 std::copy(sruns.runs.begin() + nruns - runs_left,
01330                           sruns.runs.begin() + nruns - runs_left + runs2merge,
01331                           cur_runs.runs.begin());
01332                 std::copy(sruns.runs_sizes.begin() + nruns - runs_left,
01333                           sruns.runs_sizes.begin() + nruns - runs_left + runs2merge,
01334                           cur_runs.runs_sizes.begin());
01335 
01336                 runs_left -= runs2merge;
01337                 /*
01338                    cur_runs.elements = (runs_left)?
01339                    (new_runs.runs[cur_out_run].size()*block_type::size):
01340                    (elements_left);
01341                  */
01342                 cur_runs.elements = new_runs.runs_sizes[cur_out_run];
01343                 elements_left -= cur_runs.elements;
01344 
01345                 if (runs2merge > 1)
01346                 {
01347                     runs_merger<RunsType_, Cmp_, AllocStr_> merger(cur_runs, cmp, m_ * block_type::raw_size * sort_memory_usage_factor());
01348 
01349                     {   // make sure everything is being destroyed in right time
01350                         buf_ostream<block_type, typename run_type::iterator> out(
01351                             new_runs.runs[cur_out_run].begin(),
01352                             nwrite_buffers);
01353 
01354                         size_type cnt = 0;
01355                         const size_type cnt_max = cur_runs.elements;
01356 
01357                         while (cnt != cnt_max)
01358                         {
01359                             *out = *merger;
01360                             if ((cnt % block_type::size) == 0) // have to write the trigger value
01361                                 new_runs.runs[cur_out_run][cnt / size_type(block_type::size)].value = *merger;
01362 
01363 
01364                             ++cnt;
01365                             ++out;
01366                             ++merger;
01367                         }
01368                         assert(merger.empty());
01369 
01370                         while (cnt % block_type::size)
01371                         {
01372                             *out = cmp.max_value();
01373                             ++out;
01374                             ++cnt;
01375                         }
01376                     }
01377                 }
01378                 else
01379                 {
01380                     bm->delete_blocks(
01381                         trigger_entry_iterator<typename run_type::iterator, block_type::raw_size>(
01382                             new_runs.runs.back().begin()),
01383                         trigger_entry_iterator<typename run_type::iterator, block_type::raw_size>(
01384                             new_runs.runs.back().end()));
01385 
01386                     assert(cur_runs.runs.size() == 1);
01387 
01388                     std::copy(cur_runs.runs.front().begin(),
01389                               cur_runs.runs.front().end(),
01390                               new_runs.runs.back().begin());
01391                     new_runs.runs_sizes.back() = cur_runs.runs_sizes.back();
01392                 }
01393 
01394                 ++cur_out_run;
01395             }
01396             assert(elements_left == 0);
01397 
01398             nruns = new_nruns;
01399             sruns = new_runs;
01400         }
01401     }
01402 
01403 
01412     template <class Input_,
01413               class Cmp_,
01414               unsigned BlockSize_ = STXXL_DEFAULT_BLOCK_SIZE(typename Input_::value_type),
01415               class AllocStr_ = STXXL_DEFAULT_ALLOC_STRATEGY>
01416     class sort : public noncopyable
01417     {
01418         typedef runs_creator<Input_, Cmp_, BlockSize_, AllocStr_> runs_creator_type;
01419         typedef typename runs_creator_type::sorted_runs_type sorted_runs_type;
01420         typedef runs_merger<sorted_runs_type, Cmp_, AllocStr_> runs_merger_type;
01421 
01422         runs_creator_type creator;
01423         runs_merger_type merger;
01424 
01425     public:
01427         typedef typename Input_::value_type value_type;
01428 
01433         sort(Input_ & in, Cmp_ c, unsigned_type memory_to_use) :
01434             creator(in, c, memory_to_use),
01435             merger(creator.result(), c, memory_to_use)
01436         { }
01437 
01443         sort(Input_ & in, Cmp_ c, unsigned_type memory_to_use_rc, unsigned_type memory_to_use_m) :
01444             creator(in, c, memory_to_use_rc),
01445             merger(creator.result(), c, memory_to_use_m)
01446         { }
01447 
01448 
01450         const value_type & operator * () const
01451         {
01452             assert(!empty());
01453             return *merger;
01454         }
01455 
01456         const value_type * operator -> () const
01457         {
01458             assert(!empty());
01459             return merger.operator -> ();
01460         }
01461 
01463         sort & operator ++ ()
01464         {
01465             ++merger;
01466             return *this;
01467         }
01468 
01470         bool empty() const
01471         {
01472             return merger.empty();
01473         }
01474     };
01475 
01481     template <
01482         class ValueType_,
01483         unsigned BlockSize_>
01484     class compute_sorted_runs_type
01485     {
01486         typedef ValueType_ value_type;
01487         typedef BID<BlockSize_> bid_type;
01488         typedef sort_local::trigger_entry<bid_type, value_type> trigger_entry_type;
01489 
01490     public:
01491         typedef sorted_runs<value_type, trigger_entry_type> result;
01492     };
01493 
01495 }
01496 
01499 
01501 
01510 template <unsigned BlockSize,
01511           class RandomAccessIterator,
01512           class CmpType,
01513           class AllocStr>
01514 void sort(RandomAccessIterator begin,
01515           RandomAccessIterator end,
01516           CmpType cmp,
01517           unsigned_type MemSize,
01518           AllocStr AS)
01519 {
01520     UNUSED(AS);
01521 #ifdef BOOST_MSVC
01522     typedef typename streamify_traits<RandomAccessIterator>::stream_type InputType;
01523 #else
01524     typedef __typeof__(stream::streamify(begin, end)) InputType;
01525 #endif
01526     InputType Input(begin, end);
01527     typedef stream::sort<InputType, CmpType, BlockSize, AllocStr> sorter_type;
01528     sorter_type Sort(Input, cmp, MemSize);
01529     stream::materialize(Sort, begin);
01530 }
01531 
01533 
01534 __STXXL_END_NAMESPACE
01535 
01536 #endif // !STXXL_SORT_STREAM_HEADER
01537 // vim: et:ts=4:sw=4

Generated on Thu Jun 4 10:29:30 2009 for Stxxl by  doxygen 1.4.7