001/*
002 * Copyright 2011-2018 Ping Identity Corporation
003 * All Rights Reserved.
004 */
005/*
006 * Copyright (C) 2011-2018 Ping Identity Corporation
007 *
008 * This program is free software; you can redistribute it and/or modify
009 * it under the terms of the GNU General Public License (GPLv2 only)
010 * or the terms of the GNU Lesser General Public License (LGPLv2.1 only)
011 * as published by the Free Software Foundation.
012 *
013 * This program is distributed in the hope that it will be useful,
014 * but WITHOUT ANY WARRANTY; without even the implied warranty of
015 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
016 * GNU General Public License for more details.
017 *
018 * You should have received a copy of the GNU General Public License
019 * along with this program; if not, see <http://www.gnu.org/licenses>.
020 */
021package com.unboundid.util;
022
023
024
025import java.io.ByteArrayInputStream;
026import java.io.File;
027import java.io.FileInputStream;
028import java.io.InputStream;
029import java.io.IOException;
030import java.util.ArrayList;
031import java.util.Collection;
032import java.util.Iterator;
033
034import static com.unboundid.util.UtilityMessages.*;
035
036
037
038/**
039 * This class provides an input stream implementation that can aggregate
040 * multiple input streams.  When reading data from this input stream, it will
041 * read from the first input stream until the end of it is reached, at point it
042 * will close it and start reading from the next one, and so on until all input
043 * streams have been exhausted.  Closing the aggregate input stream will cause
044 * all remaining input streams to be closed.
045 */
046@ThreadSafety(level=ThreadSafetyLevel.NOT_THREADSAFE)
047public final class AggregateInputStream
048       extends InputStream
049{
050  // The currently-active input stream.
051  private volatile InputStream activeInputStream;
052
053  // The iterator that will be used to access the input streams.
054  private final Iterator<InputStream> streamIterator;
055
056
057
058  /**
059   * Creates a new aggregate input stream that will use the provided set of
060   * input streams.
061   *
062   * @param  inputStreams  The input streams to be used by this aggregate input
063   *                       stream.  It must not be {@code null}.
064   */
065  public AggregateInputStream(final InputStream... inputStreams)
066  {
067    this(StaticUtils.toList(inputStreams));
068  }
069
070
071
072  /**
073   * Creates a new aggregate input stream that will use the provided set of
074   * input streams.
075   *
076   * @param  inputStreams  The input streams to be used by this aggregate input
077   *                       stream.  It must not be {@code null}.
078   */
079  public AggregateInputStream(
080              final Collection<? extends InputStream> inputStreams)
081  {
082    Validator.ensureNotNull(inputStreams);
083
084    final ArrayList<InputStream> streamList =
085         new ArrayList<InputStream>(inputStreams);
086    streamIterator = streamList.iterator();
087    activeInputStream = null;
088  }
089
090
091
092  /**
093   * Creates a new aggregate input stream that will read data from the specified
094   * files.
095   *
096   * @param  files  The set of files to be read by this aggregate input stream.
097   *                It must not be {@code null}.
098   *
099   * @throws  IOException  If a problem is encountered while attempting to
100   *                       create input streams for the provided files.
101   */
102  public AggregateInputStream(final File... files)
103         throws IOException
104  {
105    this(false, files);
106  }
107
108
109
110  /**
111   * Creates a new aggregate input stream that will read data from the specified
112   * files.
113   *
114   * @param  ensureBlankLinesBetweenFiles  Indicates whether to ensure that
115   *                                       there is at least one completely
116   *                                       blank line between files.  This may
117   *                                       be useful when blank lines are
118   *                                       used as delimiters (for example, when
119   *                                       reading LDIF data), there is a chance
120   *                                       that the files may not end with blank
121   *                                       lines, and the inclusion of extra
122   *                                       blank lines between files will not
123   *                                       cause any harm.
124   * @param  files                         The set of files to be read by this
125   *                                       aggregate input stream.  It must not
126   *                                       be {@code null}.
127   *
128   * @throws  IOException  If a problem is encountered while attempting to
129   *                       create input streams for the provided files.
130   */
131  public AggregateInputStream(final boolean ensureBlankLinesBetweenFiles,
132                              final File... files)
133         throws IOException
134  {
135    Validator.ensureNotNull(files);
136
137    final ArrayList<InputStream> streamList = new ArrayList<>(2 * files.length);
138
139    IOException ioException = null;
140    for (final File f : files)
141    {
142      if (ensureBlankLinesBetweenFiles && (! streamList.isEmpty()))
143      {
144        final ByteStringBuffer buffer = new ByteStringBuffer(4);
145        buffer.append(StaticUtils.EOL_BYTES);
146        buffer.append(StaticUtils.EOL_BYTES);
147        streamList.add(new ByteArrayInputStream(buffer.toByteArray()));
148      }
149
150      try
151      {
152        streamList.add(new FileInputStream(f));
153      }
154      catch (final IOException ioe)
155      {
156        Debug.debugException(ioe);
157        ioException = ioe;
158        break;
159      }
160    }
161
162    if (ioException != null)
163    {
164      for (final InputStream s : streamList)
165      {
166        if (s != null)
167        {
168          try
169          {
170            s.close();
171          }
172          catch (final Exception e)
173          {
174            Debug.debugException(e);
175          }
176        }
177      }
178
179      throw ioException;
180    }
181
182    streamIterator = streamList.iterator();
183    activeInputStream = null;
184  }
185
186
187
188  /**
189   * Reads the next byte of data from the current active input stream, switching
190   * to the next input stream in the set if appropriate.
191   *
192   * @return  The next byte of data that was read, or -1 if all streams have
193   *          been exhausted.
194   *
195   * @throws  IOException  If a problem is encountered while attempting to read
196   *                       data from an input stream.
197   */
198  @Override()
199  public int read()
200         throws IOException
201  {
202    while (true)
203    {
204      if (activeInputStream == null)
205      {
206        if (streamIterator.hasNext())
207        {
208          activeInputStream = streamIterator.next();
209          continue;
210        }
211        else
212        {
213          return -1;
214        }
215      }
216
217      final int byteRead = activeInputStream.read();
218      if (byteRead < 0)
219      {
220        activeInputStream.close();
221        activeInputStream = null;
222      }
223      else
224      {
225        return byteRead;
226      }
227    }
228  }
229
230
231
232  /**
233   * Reads data from the current active input stream into the provided array,
234   * switching to the next input stream in the set if appropriate.
235   *
236   * @param  b  The array into which the data read should be placed, starting
237   *            with an index of zero.  It must not be {@code null}.
238   *
239   * @return  The number of bytes read into the array, or -1 if all streams have
240   *          been exhausted.
241   *
242   * @throws  IOException  If a problem is encountered while attempting to read
243   *                       data from an input stream.
244   */
245  @Override()
246  public int read(final byte[] b)
247         throws IOException
248  {
249    return read(b, 0, b.length);
250  }
251
252
253
254  /**
255   * Reads data from the current active input stream into the provided array,
256   * switching to the next input stream in the set if appropriate.
257   *
258   * @param  b    The array into which the data read should be placed.  It must
259   *              not be {@code null}.
260   * @param  off  The position in the array at which to start writing data.
261   * @param  len  The maximum number of bytes that may be read.
262   *
263   * @return  The number of bytes read into the array, or -1 if all streams have
264   *          been exhausted.
265   *
266   * @throws  IOException  If a problem is encountered while attempting to read
267   *                       data from an input stream.
268   */
269  @Override()
270  public int read(final byte[] b, final int off, final int len)
271         throws IOException
272  {
273    while (true)
274    {
275      if (activeInputStream == null)
276      {
277        if (streamIterator.hasNext())
278        {
279          activeInputStream = streamIterator.next();
280          continue;
281        }
282        else
283        {
284          return -1;
285        }
286      }
287
288      final int bytesRead = activeInputStream.read(b, off, len);
289      if (bytesRead < 0)
290      {
291        activeInputStream.close();
292        activeInputStream = null;
293      }
294      else
295      {
296        return bytesRead;
297      }
298    }
299  }
300
301
302
303  /**
304   * Attempts to skip and discard up to the specified number of bytes from the
305   * input stream.
306   *
307   * @param  n  The number of bytes to attempt to skip.
308   *
309   * @return  The number of bytes actually skipped.
310   *
311   * @throws  IOException  If a problem is encountered while attempting to skip
312   *                       data from the input stream.
313   */
314  @Override()
315  public long skip(final long n)
316         throws IOException
317  {
318    if (activeInputStream == null)
319    {
320      if (streamIterator.hasNext())
321      {
322        activeInputStream = streamIterator.next();
323        return activeInputStream.skip(n);
324      }
325      else
326      {
327        return 0L;
328      }
329    }
330    else
331    {
332      return activeInputStream.skip(n);
333    }
334  }
335
336
337
338  /**
339   * Retrieves an estimate of the number of bytes that can be read without
340   * blocking.
341   *
342   * @return  An estimate of the number of bytes that can be read without
343   *          blocking.
344   *
345   * @throws  IOException  If a problem is encountered while attempting to make
346   *                       the determination.
347   */
348  @Override()
349  public int available()
350         throws IOException
351  {
352    if (activeInputStream == null)
353    {
354      if (streamIterator.hasNext())
355      {
356        activeInputStream = streamIterator.next();
357        return activeInputStream.available();
358      }
359      else
360      {
361        return 0;
362      }
363    }
364    else
365    {
366      return activeInputStream.available();
367    }
368  }
369
370
371
372  /**
373   * Indicates whether this input stream supports the use of the {@code mark}
374   * and {@code reset} methods.  This implementation does not support that
375   * capability.
376   *
377   * @return  {@code false} to indicate that this input stream implementation
378   *          does not support the use of {@code mark} and {@code reset}.
379   */
380  @Override()
381  public boolean markSupported()
382  {
383    return false;
384  }
385
386
387
388  /**
389   * Marks the current position in the input stream.  This input stream does not
390   * support this functionality, so no action will be taken.
391   *
392   * @param  readLimit  The maximum number of bytes that the caller may wish to
393   *                    read before being able to reset the stream.
394   */
395  @Override()
396  public void mark(final int readLimit)
397  {
398    // No implementation is required.
399  }
400
401
402
403  /**
404   * Attempts to reset the position of this input stream to the mark location.
405   * This implementation does not support {@code mark} and {@code reset}
406   * functionality, so this method will always throw an exception.
407   *
408   * @throws  IOException  To indicate that reset is not supported.
409   */
410  @Override()
411  public void reset()
412         throws IOException
413  {
414    throw new IOException(ERR_AGGREGATE_INPUT_STREAM_MARK_NOT_SUPPORTED.get());
415  }
416
417
418
419  /**
420   * Closes this input stream.  All associated input streams will be closed.
421   *
422   * @throws  IOException  If an exception was encountered while attempting to
423   *                       close any of the associated streams.  Note that even
424   *                       if an exception is encountered, an attempt will be
425   *                       made to close all streams.
426   */
427  @Override()
428  public void close()
429         throws IOException
430  {
431    IOException firstException = null;
432
433    if (activeInputStream != null)
434    {
435      try
436      {
437        activeInputStream.close();
438      }
439      catch (final IOException ioe)
440      {
441        Debug.debugException(ioe);
442        firstException = ioe;
443      }
444      activeInputStream = null;
445    }
446
447    while (streamIterator.hasNext())
448    {
449      final InputStream s = streamIterator.next();
450      try
451      {
452        s.close();
453      }
454      catch (final IOException ioe)
455      {
456        Debug.debugException(ioe);
457        if (firstException == null)
458        {
459          firstException = ioe;
460        }
461      }
462    }
463
464    if (firstException != null)
465    {
466      throw firstException;
467    }
468  }
469}