stable_ksort.h

00001 /***************************************************************************
00002  *  include/stxxl/bits/algo/stable_ksort.h
00003  *
00004  *  Part of the STXXL. See http://stxxl.sourceforge.net
00005  *
00006  *  Copyright (C) 2003 Roman Dementiev <dementiev@mpi-sb.mpg.de>
00007  *
00008  *  Distributed under the Boost Software License, Version 1.0.
00009  *  (See accompanying file LICENSE_1_0.txt or copy at
00010  *  http://www.boost.org/LICENSE_1_0.txt)
00011  **************************************************************************/
00012 
00013 #ifndef STXXL_STABLE_KSORT_HEADER
00014 #define STXXL_STABLE_KSORT_HEADER
00015 
00016 // This is a first attempt: a distribution sort without sampling.
00017 // I will rework stable_ksort when I have time.
00018 
00019 
00020 #include <cmath>
00021 
00022 #include <stxxl/bits/mng/mng.h>
00023 #include <stxxl/bits/mng/buf_istream.h>
00024 #include <stxxl/bits/mng/buf_ostream.h>
00025 #include <stxxl/bits/common/simple_vector.h>
00026 #include <stxxl/bits/algo/intksort.h>
00027 
00028 #ifndef STXXL_VERBOSE_STABLE_KSORT
00029 #define STXXL_VERBOSE_STABLE_KSORT STXXL_VERBOSE1
00030 #endif
00031 
00032 
00033 __STXXL_BEGIN_NAMESPACE
00034 
00037 
00040 namespace stable_ksort_local
00041 {
00042     template <class type_, class type_key>
00043     void classify_block(type_ * begin, type_ * end, type_key * & out, int_type * bucket, unsigned_type offset, unsigned shift)
00044     {
00045         for (type_ * p = begin; p < end; p++, out++) // count & create references
00046         {
00047             out->ptr = p;
00048             typename type_::key_type key = p->key();
00049             int_type ibucket = (key - offset) >> shift;
00050             out->key = key;
00051             bucket[ibucket]++;
00052         }
00053     }
00054 
//! Lightweight reference used while sorting a bucket in memory: the sort key
//! of an element together with a pointer to that element.
template <typename type>
struct type_key
{
    typedef typename type::key_type key_type;

    key_type key;   //!< sort key of *ptr
    type * ptr;     //!< referenced element

    //! Leaves both members uninitialized; large arrays of references are
    //! allocated first and filled in afterwards.
    type_key() { }

    type_key(key_type key_value, type * element)
        : key(key_value),
          ptr(element)
    { }
};

//! References compare by key only.
template <typename type>
bool operator < (const type_key<type> & lhs, const type_key<type> & rhs)
{
    const bool lhs_first = lhs.key < rhs.key;
    return lhs_first;
}

//! References compare by key only.
template <typename type>
bool operator > (const type_key<type> & lhs, const type_key<type> & rhs)
{
    const bool lhs_later = lhs.key > rhs.key;
    return lhs_later;
}
00078 
00079 
00080     template <typename BIDType_, typename AllocStrategy_>
00081     class bid_sequence
00082     {
00083     public:
00084         typedef BIDType_ bid_type;
00085         typedef bid_type & reference;
00086         typedef AllocStrategy_ alloc_strategy;
00087         typedef typename simple_vector<bid_type>::size_type size_type;
00088         typedef typename simple_vector<bid_type>::iterator iterator;
00089 
00090     protected:
00091         simple_vector<bid_type> * bids;
00092         alloc_strategy alloc_strategy_;
00093 
00094     public:
00095         bid_sequence() { }
00096         bid_sequence(size_type size_)
00097         {
00098             bids = new simple_vector<bid_type>(size_);
00099             block_manager * mng = block_manager::get_instance();
00100             mng->new_blocks(alloc_strategy_, bids->begin(), bids->end());
00101         }
00102         void init(size_type size_)
00103         {
00104             bids = new simple_vector<bid_type>(size_);
00105             block_manager * mng = block_manager::get_instance();
00106             mng->new_blocks(alloc_strategy_, bids->begin(), bids->end());
00107         }
00108         reference operator [] (size_type i)
00109         {
00110             size_type size_ = size();             // cache size in a register
00111             if (i < size_)
00112                 return *(bids->begin() + i);
00113 
00114             block_manager * mng = block_manager::get_instance();
00115             simple_vector<bid_type> * larger_bids = new simple_vector<bid_type>((i + 1) * 2);
00116             std::copy(bids->begin(), bids->end(), larger_bids->begin());
00117             mng->new_blocks(alloc_strategy_, larger_bids->begin() + size_, larger_bids->end());
00118             delete bids;
00119             bids = larger_bids;
00120             return *(larger_bids->begin() + i);
00121         }
00122         size_type size() { return bids->size(); }
00123         iterator begin() { return bids->begin(); }
00124         ~bid_sequence()
00125         {
00126             block_manager::get_instance()->delete_blocks(bids->begin(), bids->end());
00127             delete bids;
00128         }
00129     };
00130 
00131     template <typename ExtIterator_>
00132     void distribute(
00133         bid_sequence<typename ExtIterator_::vector_type::block_type::bid_type,
00134                      typename ExtIterator_::vector_type::alloc_strategy> * bucket_bids,
00135         int64 * bucket_sizes,
00136         const int_type nbuckets,
00137         const int_type lognbuckets,
00138         ExtIterator_ first,
00139         ExtIterator_ last,
00140         const int_type nread_buffers,
00141         const int_type nwrite_buffers)
00142     {
00143         typedef typename ExtIterator_::vector_type::value_type value_type;
00144         typedef typename value_type::key_type key_type;
00145         typedef typename ExtIterator_::block_type block_type;
00146         typedef typename block_type::bid_type bid_type;
00147         typedef buf_istream<typename ExtIterator_::block_type,
00148                             typename ExtIterator_::bids_container_iterator> buf_istream_type;
00149 
00150         int_type i = 0;
00151 
00152         buf_istream_type in(first.bid(), last.bid() + ((first.block_offset()) ? 1 : 0),
00153                             nread_buffers);
00154 
00155         buffered_writer<block_type> out(
00156             nbuckets + nwrite_buffers,
00157             nwrite_buffers);
00158 
00159         unsigned_type * bucket_block_offsets = new unsigned_type[nbuckets];
00160         unsigned_type * bucket_iblock = new unsigned_type[nbuckets];
00161         block_type ** bucket_blocks = new block_type *[nbuckets];
00162 
00163         std::fill(bucket_sizes, bucket_sizes + nbuckets, 0);
00164         std::fill(bucket_iblock, bucket_iblock + nbuckets, 0);
00165         std::fill(bucket_block_offsets, bucket_block_offsets + nbuckets, 0);
00166 
00167         for (i = 0; i < nbuckets; i++)
00168             bucket_blocks[i] = out.get_free_block();
00169 
00170 
00171         ExtIterator_ cur = first - first.block_offset();
00172 
00173         // skip part of the block before first untouched
00174         for ( ; cur != first; cur++)
00175             ++in;
00176 
00177 
00178         const int_type shift = sizeof(key_type) * 8 - lognbuckets;
00179         // search in the the range [_begin,_end)
00180         STXXL_VERBOSE_STABLE_KSORT("Shift by: " << shift << " bits, lognbuckets: " << lognbuckets);
00181         for ( ; cur != last; cur++)
00182         {
00183             key_type cur_key = in.current().key();
00184             int_type ibucket = cur_key >> shift;
00185 
00186             int_type block_offset = bucket_block_offsets[ibucket];
00187             in >> (bucket_blocks[ibucket]->elem[block_offset++]);
00188             if (block_offset == block_type::size)
00189             {
00190                 block_offset = 0;
00191                 int_type iblock = bucket_iblock[ibucket]++;
00192                 bucket_blocks[ibucket] = out.write(bucket_blocks[ibucket], bucket_bids[ibucket][iblock]);
00193             }
00194             bucket_block_offsets[ibucket] = block_offset;
00195         }
00196         for (i = 0; i < nbuckets; i++)
00197         {
00198             if (bucket_block_offsets[i])
00199             {
00200                 out.write(bucket_blocks[i], bucket_bids[i][bucket_iblock[i]]);
00201             }
00202             bucket_sizes[i] = int64(block_type::size) * bucket_iblock[i] +
00203                               bucket_block_offsets[i];
00204             STXXL_VERBOSE_STABLE_KSORT("Bucket " << i << " has size " << bucket_sizes[i] <<
00205                                        ", estimated size: " << ((last - first) / int64(nbuckets)));
00206         }
00207 
00208         delete[] bucket_blocks;
00209         delete[] bucket_block_offsets;
00210         delete[] bucket_iblock;
00211     }
00212 }
00213 
00214 template <typename ExtIterator_>
00215 void stable_ksort(ExtIterator_ first, ExtIterator_ last, unsigned_type M)
00216 {
00217     STXXL_MSG("Warning: stable_ksort is not yet fully implemented, it assumes that the keys are uniformly distributed between [0,(std::numeric_limits<key_type>::max)()]");
00218     typedef typename ExtIterator_::vector_type::value_type value_type;
00219     typedef typename value_type::key_type key_type;
00220     typedef typename ExtIterator_::block_type block_type;
00221     typedef typename block_type::bid_type bid_type;
00222     typedef typename ExtIterator_::vector_type::alloc_strategy alloc_strategy;
00223     typedef stable_ksort_local::bid_sequence<bid_type, alloc_strategy> bucket_bids_type;
00224     typedef stable_ksort_local::type_key<value_type> type_key_;
00225 
00226     first.flush();     // flush container
00227 
00228     double begin = timestamp();
00229 
00230     unsigned_type i = 0;
00231     config * cfg = config::get_instance();
00232     const unsigned_type m = M / block_type::raw_size;
00233     assert(2 * block_type::raw_size <= M);
00234     const unsigned_type write_buffers_multiple = 2;
00235     const unsigned_type read_buffers_multiple = 2;
00236     const unsigned_type ndisks = cfg->disks_number();
00237     const unsigned_type min_num_read_write_buffers = (write_buffers_multiple + read_buffers_multiple) * ndisks;
00238     const unsigned_type nmaxbuckets = m - min_num_read_write_buffers;
00239     const unsigned_type lognbuckets = static_cast<unsigned_type>(log2(double(nmaxbuckets)));
00240     const unsigned_type nbuckets = 1 << lognbuckets;
00241     const unsigned_type est_bucket_size = div_and_round_up((last - first) / int64(nbuckets),
00242                                                            int64(block_type::size)); //in blocks
00243 
00244     if (m < min_num_read_write_buffers + 2 || nbuckets < 2) {
00245         STXXL_ERRMSG("stxxl::stable_ksort: Not enough memory. Blocks available: " << m <<
00246                      ", required for r/w buffers: " << min_num_read_write_buffers <<
00247                      ", required for buckets: 2, nbuckets: " << nbuckets);
00248         abort();
00249     }
00250     STXXL_VERBOSE_STABLE_KSORT("Elements to sort: " << (last - first));
00251     STXXL_VERBOSE_STABLE_KSORT("Number of buckets has to be reduced from " << nmaxbuckets << " to " << nbuckets);
00252     const unsigned_type nread_buffers = (m - nbuckets) * read_buffers_multiple / (read_buffers_multiple + write_buffers_multiple);
00253     const unsigned_type nwrite_buffers = (m - nbuckets) * write_buffers_multiple / (read_buffers_multiple + write_buffers_multiple);
00254 
00255     STXXL_VERBOSE_STABLE_KSORT("Read buffers in distribution phase: " << nread_buffers);
00256     STXXL_VERBOSE_STABLE_KSORT("Write buffers in distribution phase: " << nwrite_buffers);
00257 
00258     bucket_bids_type * bucket_bids = new bucket_bids_type[nbuckets];
00259     for (i = 0; i < nbuckets; ++i)
00260         bucket_bids[i].init(est_bucket_size);
00261 
00262     int64 * bucket_sizes = new int64[nbuckets];
00263 
00264     disk_queues::get_instance()->set_priority_op(disk_queue::WRITE);
00265 
00266     stable_ksort_local::distribute(
00267         bucket_bids,
00268         bucket_sizes,
00269         nbuckets,
00270         lognbuckets,
00271         first,
00272         last,
00273         nread_buffers,
00274         nwrite_buffers);
00275 
00276     double dist_end = timestamp(), end;
00277     double io_wait_after_d = stats::get_instance()->get_io_wait_time();
00278 
00279     {
00280         // sort buckets
00281         unsigned_type write_buffers_multiple_bs = 2;
00282         unsigned_type max_bucket_size_bl = (m - write_buffers_multiple_bs * ndisks) / 2; // in number of blocks
00283         int64 max_bucket_size_rec = int64(max_bucket_size_bl) * block_type::size;        // in number of records
00284         int64 max_bucket_size_act = 0;                                                   // actual max bucket size
00285         // establish output stream
00286 
00287         for (i = 0; i < nbuckets; i++)
00288         {
00289             max_bucket_size_act = STXXL_MAX(bucket_sizes[i], max_bucket_size_act);
00290             if (bucket_sizes[i] > max_bucket_size_rec)
00291             {
00292                 STXXL_ERRMSG("Bucket " << i << " is too large: " << bucket_sizes[i] <<
00293                              " records, maximum: " << max_bucket_size_rec);
00294                 STXXL_ERRMSG("Recursion on buckets is not yet implemented, aborting.");
00295                 abort();
00296             }
00297         }
00298         // here we can increase write_buffers_multiple_b knowing max(bucket_sizes[i])
00299         // ... and decrease max_bucket_size_bl
00300         const int_type max_bucket_size_act_bl = div_and_round_up(max_bucket_size_act, block_type::size);
00301         STXXL_VERBOSE_STABLE_KSORT("Reducing required number of required blocks per bucket from " <<
00302                                    max_bucket_size_bl << " to " << max_bucket_size_act_bl);
00303         max_bucket_size_rec = max_bucket_size_act;
00304         max_bucket_size_bl = max_bucket_size_act_bl;
00305         const unsigned_type nwrite_buffers_bs = m - 2 * max_bucket_size_bl;
00306         STXXL_VERBOSE_STABLE_KSORT("Write buffers in bucket sorting phase: " << nwrite_buffers_bs);
00307 
00308         typedef buf_ostream<block_type, typename ExtIterator_::bids_container_iterator> buf_ostream_type;
00309         buf_ostream_type out(first.bid(), nwrite_buffers_bs);
00310 
00311         disk_queues::get_instance()->set_priority_op(disk_queue::READ);
00312 
00313         if (first.block_offset())
00314         {
00315             // has to skip part of the first block
00316             block_type * block = new block_type;
00317             request_ptr req;
00318             req = block->read(*first.bid());
00319             req->wait();
00320 
00321             for (i = 0; i < first.block_offset(); i++)
00322             {
00323                 out << block->elem[i];
00324             }
00325             delete block;
00326         }
00327         block_type * blocks1 = new block_type[max_bucket_size_bl];
00328         block_type * blocks2 = new block_type[max_bucket_size_bl];
00329         request_ptr * reqs1 = new request_ptr[max_bucket_size_bl];
00330         request_ptr * reqs2 = new request_ptr[max_bucket_size_bl];
00331         type_key_ * refs1 = new type_key_[max_bucket_size_rec];
00332         type_key_ * refs2 = new type_key_[max_bucket_size_rec];
00333 
00334         // submit reading first 2 buckets (Peter's scheme)
00335         unsigned_type nbucket_blocks = div_and_round_up(bucket_sizes[0], block_type::size);
00336         for (i = 0; i < nbucket_blocks; i++)
00337             reqs1[i] = blocks1[i].read(bucket_bids[0][i]);
00338 
00339 
00340         nbucket_blocks = div_and_round_up(bucket_sizes[1], block_type::size);
00341         for (i = 0; i < nbucket_blocks; i++)
00342             reqs2[i] = blocks2[i].read(bucket_bids[1][i]);
00343 
00344 
00345         key_type offset = 0;
00346         const unsigned log_k1 =
00347             (std::max)(static_cast<unsigned>(ceil(log2(double(
00348                                                            max_bucket_size_rec * sizeof(type_key_) / STXXL_L2_SIZE)))), 1U);
00349         unsigned_type k1 = 1 << log_k1;
00350         int_type * bucket1 = new int_type[k1];
00351 
00352         const unsigned shift = sizeof(key_type) * 8 - lognbuckets;
00353         const unsigned shift1 = shift - log_k1;
00354 
00355         STXXL_VERBOSE_STABLE_KSORT("Classifying " << nbuckets << " buckets, max size:" << max_bucket_size_rec <<
00356                                    " block size:" << block_type::size << " log_k1:" << log_k1);
00357 
00358         for (unsigned_type k = 0; k < nbuckets; k++)
00359         {
00360             nbucket_blocks = div_and_round_up(bucket_sizes[k], block_type::size);
00361             const unsigned log_k1_k =
00362                 (std::max)(static_cast<unsigned>(ceil(log2(
00363                                                           double(bucket_sizes[k] * sizeof(type_key_) / STXXL_L2_SIZE)))), 1U);
00364             assert(log_k1_k <= log_k1);
00365             k1 = 1 << log_k1_k;
00366             std::fill(bucket1, bucket1 + k1, 0);
00367 
00368             STXXL_VERBOSE_STABLE_KSORT("Classifying bucket " << k << " size:" << bucket_sizes[k] <<
00369                                        " blocks:" << nbucket_blocks << " log_k1:" << log_k1_k);
00370             // classify first nbucket_blocks-1 blocks, they are full
00371             type_key_ * ref_ptr = refs1;
00372             key_type offset1 = offset + (key_type(1) << key_type(shift)) * key_type(k);
00373             for (i = 0; i < nbucket_blocks - 1; i++)
00374             {
00375                 reqs1[i]->wait();
00376                 stable_ksort_local::classify_block(blocks1[i].begin(), blocks1[i].end(), ref_ptr, bucket1, offset1, shift1 /*,k1*/);
00377             }
00378             // last block might be non-full
00379             const unsigned_type last_block_size = bucket_sizes[k] - int64(nbucket_blocks - 1) * block_type::size;
00380             reqs1[i]->wait();
00381 
00382             //STXXL_MSG("block_type::size: "<<block_type::size<<" last_block_size:"<<last_block_size);
00383 
00384             classify_block(blocks1[i].begin(), blocks1[i].begin() + last_block_size, ref_ptr, bucket1, offset1, shift1);
00385 
00386             exclusive_prefix_sum(bucket1, k1);
00387             classify(refs1, refs1 + bucket_sizes[k], refs2, bucket1, offset1, shift1);
00388 
00389             type_key_ * c = refs2;
00390             type_key_ * d = refs1;
00391             for (i = 0; i < k1; i++)
00392             {
00393                 type_key_ * cEnd = refs2 + bucket1[i];
00394                 type_key_ * dEnd = refs1 + bucket1[i];
00395 
00396                 const unsigned log_k2 = static_cast<unsigned>(log2(double(bucket1[i]))) - 1;        // adaptive bucket size
00397                 const unsigned_type k2 = 1 << log_k2;
00398                 int_type * bucket2 = new int_type[k2];
00399                 const unsigned shift2 = shift1 - log_k2;
00400 
00401                 // STXXL_MSG("Sorting bucket "<<k<<":"<<i);
00402                 l1sort(c, cEnd, d, bucket2, k2,
00403                        offset1 + (key_type(1) << key_type(shift1)) * key_type(i),
00404                        shift2);
00405 
00406                 // write out all
00407                 for (type_key_ * p = d; p < dEnd; p++)
00408                     out << (*(p->ptr));
00409 
00410 
00411                 delete[] bucket2;
00412                 c = cEnd;
00413                 d = dEnd;
00414             }
00415             // submit next read
00416             const unsigned_type bucket2submit = k + 2;
00417             if (bucket2submit < nbuckets)
00418             {
00419                 nbucket_blocks = div_and_round_up(bucket_sizes[bucket2submit], block_type::size);
00420                 for (i = 0; i < nbucket_blocks; i++)
00421                     reqs1[i] = blocks1[i].read(bucket_bids[bucket2submit][i]);
00422             }
00423 
00424             std::swap(blocks1, blocks2);
00425             std::swap(reqs1, reqs2);
00426         }
00427 
00428         delete[] bucket1;
00429         delete[] refs1;
00430         delete[] refs2;
00431         delete[] blocks1;
00432         delete[] blocks2;
00433         delete[] reqs1;
00434         delete[] reqs2;
00435         delete[] bucket_bids;
00436         delete[] bucket_sizes;
00437 
00438         if (last.block_offset())
00439         {
00440             // has to skip part of the first block
00441             block_type * block = new block_type;
00442             request_ptr req = block->read(*last.bid());
00443             req->wait();
00444 
00445             for (i = last.block_offset(); i < block_type::size; i++)
00446             {
00447                 out << block->elem[i];
00448             }
00449             delete block;
00450         }
00451 
00452         end = timestamp();
00453     }
00454 
00455     STXXL_VERBOSE("Elapsed time        : " << end - begin << " s. Distribution time: " <<
00456                   dist_end - begin << " s");
00457     STXXL_VERBOSE("Time in I/O wait(ds): " << io_wait_after_d << " s");
00458     STXXL_VERBOSE(*stats::get_instance());
00459     UNUSED(begin + dist_end + io_wait_after_d);
00460 }
00461 
00463 
00464 __STXXL_END_NAMESPACE
00465 
00466 #endif // !STXXL_STABLE_KSORT_HEADER

Generated on Thu Jun 4 10:29:30 2009 for Stxxl by  doxygen 1.4.7