13 #ifndef STXXL_BUFFERED_WRITER_HEADER
14 #define STXXL_BUFFERED_WRITER_HEADER
18 #include <stxxl/bits/mng/mng.h>
21 __STXXL_BEGIN_NAMESPACE
// Template header; the declaration it introduces is not visible in this
// excerpt (presumably the writer class named by the include guard -- confirm).
34 template <
typename block_type>
// Block identifier type, taken from the block type itself.
40 typedef typename block_type::bid_type bid_type;
// Number of write buffers in the pool; const, so fixed at construction.
42 const unsigned_type nwriteblocks;
// Array of nwriteblocks buffers; pointers into it are handed out to callers.
43 block_type * write_buffers;
// write_bids[i] holds the destination BID recorded for write_buffers[i].
44 bid_type * write_bids;
// Once this many blocks are queued in the batch, the batch is issued.
46 const unsigned_type writebatchsize;
// Indices of buffers that can be handed out immediately.
48 std::vector<int_type> free_write_blocks;
// Indices of buffers for which a write request has been issued.
49 std::vector<int_type> busy_write_blocks;
// Constructs a batch entry: `o` initializes the member `offset` and `b`
// initializes the member `ibuffer` (member declarations are not visible in
// this excerpt).
56 batch_entry(stxxl::int64 o,
int b) : offset(o), ibuffer(b) { }
// Comparison functor for batch entries: returns true when `a` has the larger
// offset. Used as the comparator of a std::priority_queue, this ordering
// makes top() the entry with the smallest offset.
58 struct batch_entry_cmp
60 bool operator () (
const batch_entry & a,
const batch_entry & b)
const
62 return (a.offset > b.offset);
// Priority queue of batch entries ordered by batch_entry_cmp.
66 typedef std::priority_queue<batch_entry, std::vector<batch_entry>, batch_entry_cmp> batch_type;
// Blocks that were handed in for writing but whose write request has not
// been issued yet.
67 batch_type batch_write_blocks;
// Constructor initializer list and body; the signature line is not visible
// in this excerpt.
// A write_buf_size of 2 or less is replaced by 2, so there are always at
// least two buffers.
75 nwriteblocks((write_buf_size > 2) ? write_buf_size : 2),
// A write_batch_size of zero is replaced by 1.
76 writebatchsize(write_batch_size ? write_batch_size : 1)
// Allocate the buffer pool ...
78 write_buffers =
new block_type[nwriteblocks];
// ... and one BID slot per buffer.
80 write_bids =
new bid_type[nwriteblocks];
// Initially every buffer index is on the free list.
82 for (unsigned_type i = 0; i < nwriteblocks; i++)
83 free_write_blocks.push_back(i);
// Select WRITE as the priority operation on the object returned by
// disk_queues::get_instance().
86 disk_queues::get_instance()->set_priority_op(disk_queue::WRITE);
// Body of a member function that returns a pointer to a buffer from
// write_buffers. The signature and the declaration of `ibuffer` are not
// visible in this excerpt.
// Step 1: poll the outstanding request of each busy buffer.
93 for (std::vector<int_type>::iterator it = busy_write_blocks.begin();
94 it != busy_write_blocks.end(); ++it)
96 if (write_reqs[ibuffer = (*it)]->
poll())
// A buffer whose request polled true moves from the busy list to the free list.
// NOTE(review): erase() invalidates `it`; no exit from the loop is visible
// in this excerpt -- confirm the loop is left right after these two lines.
98 busy_write_blocks.erase(it);
99 free_write_blocks.push_back(ibuffer);
// Step 2: if there is still no free buffer, wait on the busy requests.
104 if (UNLIKELY(free_write_blocks.empty()))
106 int_type size = busy_write_blocks.size();
// Copy the requests of all busy buffers, in busy_write_blocks order
// (`reqs` and `i` are declared on lines not visible here).
109 for ( ; i < size; ++i)
111 reqs[i] = write_reqs[busy_write_blocks[i]];
// The value returned by wait_any is used as a position in busy_write_blocks.
113 int_type completed =
wait_any(reqs, size);
114 int_type completed_global = busy_write_blocks[completed];
// The completed buffer leaves the busy list and is returned directly,
// without being placed on the free list.
116 busy_write_blocks.erase(busy_write_blocks.begin() + completed);
118 return (write_buffers + completed_global);
// Step 3: otherwise take the index at the back of the free list.
120 ibuffer = free_write_blocks.back();
121 free_write_blocks.pop_back();
123 return (write_buffers + ibuffer);
// Queues `filled_block` for writing to `bid`.
// filled_block must point into write_buffers: its buffer index is computed
// by pointer subtraction from write_buffers.
// NOTE(review): the return statement is not visible in this excerpt.
130 block_type *
write(block_type * filled_block,
const bid_type & bid)
// When the pending batch has reached writebatchsize, issue every queued
// block first.
132 if (batch_write_blocks.size() >= writebatchsize)
135 while (!batch_write_blocks.empty())
// top() is the queued entry with the smallest offset (see batch_entry_cmp).
137 int_type ibuffer = batch_write_blocks.top().ibuffer;
138 batch_write_blocks.pop();
// If this slot still holds a valid request, wait for it before the slot is
// overwritten with the new request.
140 if (write_reqs[ibuffer].valid())
141 write_reqs[ibuffer]->
wait();
// Start the write of this buffer to its recorded BID and remember the request.
143 write_reqs[ibuffer] = write_buffers[ibuffer].write(write_bids[ibuffer]);
145 busy_write_blocks.push_back(ibuffer);
// Record the destination BID for the filled block and add it to the batch,
// keyed by the BID's offset.
150 int_type ibuffer = filled_block - write_buffers;
151 write_bids[ibuffer] = bid;
152 batch_write_blocks.push(batch_entry(bid.offset, ibuffer));
// Body of a member function (signature and the declaration of `ibuffer` are
// not visible in this excerpt) that issues all batched writes, waits on the
// busy requests, and then marks every buffer as free again.
// Issue every block still queued in the batch, smallest offset first.
160 while (!batch_write_blocks.empty())
162 ibuffer = batch_write_blocks.top().ibuffer;
163 batch_write_blocks.pop();
// Wait for any valid request still held in this slot before replacing it.
165 if (write_reqs[ibuffer].valid())
166 write_reqs[ibuffer]->
wait();
168 write_reqs[ibuffer] = write_buffers[ibuffer].write(write_bids[ibuffer]);
170 busy_write_blocks.push_back(ibuffer);
// Wait on the requests of the busy buffers.
// NOTE(review): the line assigning `ibuffer` from `*it` is not visible in
// this excerpt -- confirm it exists, otherwise the same request is waited on
// in every iteration.
172 for (std::vector<int_type>::const_iterator it =
173 busy_write_blocks.begin();
174 it != busy_write_blocks.end(); it++)
177 write_reqs[ibuffer]->
wait();
// Reset the bookkeeping: nothing queued, nothing busy, all indices free.
180 assert(batch_write_blocks.empty());
181 free_write_blocks.clear();
182 busy_write_blocks.clear();
184 for (unsigned_type i = 0; i < nwriteblocks; i++)
185 free_write_blocks.push_back(i);
// Teardown body (presumably the destructor -- the signature and the
// declaration of `ibuffer` are not visible in this excerpt): issues all
// batched writes, waits on the busy requests, then releases the buffer pool.
// Issue every block still queued in the batch, smallest offset first.
192 while (!batch_write_blocks.empty())
194 ibuffer = batch_write_blocks.top().ibuffer;
195 batch_write_blocks.pop();
// Wait for any valid request still held in this slot before replacing it.
197 if (write_reqs[ibuffer].valid())
198 write_reqs[ibuffer]->
wait();
200 write_reqs[ibuffer] = write_buffers[ibuffer].write(write_bids[ibuffer]);
202 busy_write_blocks.push_back(ibuffer);
// Wait on the requests of the busy buffers.
// NOTE(review): the line assigning `ibuffer` from `*it` is not visible in
// this excerpt -- confirm it exists.
204 for (std::vector<int_type>::const_iterator it =
205 busy_write_blocks.begin();
206 it != busy_write_blocks.end(); it++)
209 write_reqs[ibuffer]->
wait();
// Release the buffer array allocated with new[].
// NOTE(review): only write_buffers is released in the visible lines; confirm
// that write_bids (also allocated with new[]) and write_reqs are released on
// lines not shown here.
213 delete[] write_buffers;
220 __STXXL_END_NAMESPACE
222 #endif // !STXXL_BUFFERED_WRITER_HEADER