csv_async/async_readers/ardr_futures.rs

1use futures::io;
2
3use crate::AsyncReaderBuilder;
4use crate::byte_record::{ByteRecord, Position};
5use crate::error::Result;
6use crate::string_record::StringRecord;
7use super::{
8    AsyncReaderImpl,
9    StringRecordsStream, StringRecordsIntoStream,
10    ByteRecordsStream, ByteRecordsIntoStream,
11};
12
13
14impl AsyncReaderBuilder {
15    /// Build a CSV reader from this configuration that reads data from `rdr`.
16    ///
17    /// Note that the CSV reader is buffered automatically, so you should not
18    /// wrap `rdr` in a buffered reader.
19    ///
20    /// # Example
21    ///
22    /// ```
23    /// use std::error::Error;
24    /// use futures::stream::StreamExt;
25    /// use csv_async::AsyncReaderBuilder;
26    ///
27    /// # fn main() { async_std::task::block_on(async {example().await.unwrap()}); }
28    /// async fn example() -> Result<(), Box<dyn Error>> {
29    ///     let data = "\
30    /// city,country,pop
31    /// Boston,United States,4628910
32    /// Concord,United States,42695
33    /// ";
34    ///     let mut rdr = AsyncReaderBuilder::new().create_reader(data.as_bytes());
35    ///     let mut records = rdr.into_records();
36    ///     while let Some(record) = records.next().await {
37    ///         println!("{:?}", record?);
38    ///     }
39    ///     Ok(())
40    /// }
41    /// ```
42    pub fn create_reader<R: io::AsyncRead + Unpin + Send>(&self, rdr: R) -> AsyncReader<R> {
43        AsyncReader::new(self, rdr)
44    }
45}
46
47/// A already configured CSV reader.
48///
49/// A CSV reader takes as input CSV data and transforms that into standard Rust
50/// values. The reader reads CSV data is as a sequence of records,
51/// where a record is a sequence of fields and each field is a string.
52///
53/// # Configuration
54///
55/// A CSV reader has convenient constructor method `from_reader`.
56/// However, if you want to configure the CSV reader to use
57/// a different delimiter or quote character (among many other things), then
58/// you should use a [`AsyncReaderBuilder`](struct.AsyncReaderBuilder.html) to construct
59/// a `AsyncReader`. For example, to change the field delimiter:
60///
61/// ```
62/// use std::error::Error;
63/// use futures::stream::StreamExt;
64/// use csv_async::AsyncReaderBuilder;
65///
66/// # fn main() { async_std::task::block_on(async {example().await.unwrap()}); }
67/// async fn example() -> Result<(), Box<dyn Error>> {
68///     let data = "\
69/// city;country;pop
70/// Boston;United States;4628910
71/// ";
72///     let mut rdr = AsyncReaderBuilder::new()
73///         .delimiter(b';')
74///         .create_reader(data.as_bytes());
75///
76///     let mut records = rdr.records();
77///     assert_eq!(records.next().await.unwrap()?, vec!["Boston", "United States", "4628910"]);
78///     Ok(())
79/// }
80/// ```
81///
82/// # Error handling
83///
84/// In general, CSV *parsing* does not ever return an error. That is, there is
85/// no such thing as malformed CSV data. Instead, this reader will prioritize
86/// finding a parse over rejecting CSV data that it does not understand. This
87/// choice was inspired by other popular CSV parsers, but also because it is
88/// pragmatic. CSV data varies wildly, so even if the CSV data is malformed,
89/// it might still be possible to work with the data. In the land of CSV, there
90/// is no "right" or "wrong," only "right" and "less right."
91///
92/// With that said, a number of errors can occur while reading CSV data:
93///
94/// * By default, all records in CSV data must have the same number of fields.
95///   If a record is found with a different number of fields than a prior
96///   record, then an error is returned. This behavior can be disabled by
97///   enabling flexible parsing via the `flexible` method on
98///   [`AsyncReaderBuilder`](struct.AsyncReaderBuilder.html).
99/// * When reading CSV data from a resource (like a file), it is possible for
100///   reading from the underlying resource to fail. This will return an error.
101///   For subsequent calls to the reader after encountering a such error
102///   (unless `seek` is used), it will behave as if end of file had been
103///   reached, in order to avoid running into infinite loops when still
104///   attempting to read the next record when one has errored.
105/// * When reading CSV data into `String` or `&str` fields (e.g., via a
106///   [`StringRecord`](struct.StringRecord.html)), UTF-8 is strictly
107///   enforced. If CSV data is invalid UTF-8, then an error is returned. If
108///   you want to read invalid UTF-8, then you should use the byte oriented
109///   APIs such as [`ByteRecord`](struct.ByteRecord.html). If you need explicit
110///   support for another encoding entirely, then you'll need to use another
111///   crate to transcode your CSV data to UTF-8 before parsing it.
112/// * When using Serde to deserialize CSV data into Rust types, it is possible
113///   for a number of additional errors to occur. For example, deserializing
114///   a field `xyz` into an `i32` field will result in an error.
115///
116/// For more details on the precise semantics of errors, see the
117/// [`Error`](enum.Error.html) type.
118#[derive(Debug)]
119pub struct AsyncReader<R>(AsyncReaderImpl<R>);
120
121impl<'r, R> AsyncReader<R>
122where
123    R: io::AsyncRead + Unpin + Send + 'r,
124{
125    /// Create a new CSV reader given a builder and a source of underlying
126    /// bytes.
127    fn new(builder: &AsyncReaderBuilder, rdr: R) -> AsyncReader<R> {
128        AsyncReader(AsyncReaderImpl::new(builder, rdr))
129    }
130
131    /// Create a new CSV parser with a default configuration for the given
132    /// reader.
133    ///
134    /// To customize CSV parsing, use a `ReaderBuilder`.
135    ///
136    /// # Example
137    ///
138    /// ```
139    /// use std::error::Error;
140    /// use futures::stream::StreamExt;
141    /// use csv_async::AsyncReader;
142    ///
143    /// # fn main() { async_std::task::block_on(async {example().await.unwrap()}); }
144    /// async fn example() -> Result<(), Box<dyn Error>> {
145    ///     let data = "\
146    /// city,country,pop
147    /// Boston,United States,4628910
148    /// Concord,United States,42695
149    /// ";
150    ///     let mut rdr = AsyncReader::from_reader(data.as_bytes());
151    ///     let mut records = rdr.into_records();
152    ///     while let Some(record) = records.next().await {
153    ///         println!("{:?}", record?);
154    ///     }
155    ///     Ok(())
156    /// }
157    /// ```
158    #[inline]
159    pub fn from_reader(rdr: R) -> AsyncReader<R> {
160        AsyncReaderBuilder::new().create_reader(rdr)
161    }
162
163    /// Returns a borrowed iterator over all records as strings.
164    ///
165    /// Each item yielded by this iterator is a `Result<StringRecord, Error>`.
166    /// Therefore, in order to access the record, callers must handle the
167    /// possibility of error (typically with `try!` or `?`).
168    ///
169    /// If `has_headers` was enabled via a `ReaderBuilder` (which is the
170    /// default), then this does not include the first record.
171    ///
172    /// # Example
173    ///
174    /// ```
175    /// use std::error::Error;
176    /// use futures::stream::StreamExt;
177    /// use csv_async::AsyncReader;
178    ///
179    /// # fn main() { async_std::task::block_on(async {example().await.unwrap()}); }
180    /// async fn example() -> Result<(), Box<dyn Error>> {
181    ///     let data = "\
182    /// city,country,pop
183    /// Boston,United States,4628910
184    /// ";
185    ///     let mut rdr = AsyncReader::from_reader(data.as_bytes());
186    ///     let mut records = rdr.records();
187    ///     while let Some(record) = records.next().await {
188    ///         println!("{:?}", record?);
189    ///     }
190    ///     Ok(())
191    /// }
192    /// ```
193    #[inline]
194    pub fn records(&mut self) -> StringRecordsStream<R> {
195        StringRecordsStream::new(&mut self.0)
196    }
197
198    /// Returns an owned iterator over all records as strings.
199    ///
200    /// Each item yielded by this iterator is a `Result<StringRecord, Error>`.
201    /// Therefore, in order to access the record, callers must handle the
202    /// possibility of error (typically with `try!` or `?`).
203    ///
204    /// This is mostly useful when you want to return a CSV iterator or store
205    /// it somewhere.
206    ///
207    /// If `has_headers` was enabled via a `ReaderBuilder` (which is the
208    /// default), then this does not include the first record.
209    ///
210    /// # Example
211    ///
212    /// ```
213    /// use std::error::Error;
214    /// use futures::stream::StreamExt;
215    /// use csv_async::AsyncReader;
216    ///
217    /// # fn main() { async_std::task::block_on(async {example().await.unwrap()}); }
218    /// async fn example() -> Result<(), Box<dyn Error>> {
219    ///     let data = "\
220    /// city,country,pop
221    /// Boston,United States,4628910
222    /// ";
223    ///     let rdr = AsyncReader::from_reader(data.as_bytes());
224    ///     let mut records = rdr.into_records();
225    ///     while let Some(record) = records.next().await {
226    ///         println!("{:?}", record?);
227    ///     }
228    ///     Ok(())
229    /// }
230    /// ```
231    #[inline]
232    pub fn into_records(self) -> StringRecordsIntoStream<'r, R> {
233        StringRecordsIntoStream::new(self.0)
234    }
235
236    /// Returns a borrowed iterator over all records as raw bytes.
237    ///
238    /// Each item yielded by this iterator is a `Result<ByteRecord, Error>`.
239    /// Therefore, in order to access the record, callers must handle the
240    /// possibility of error (typically with `try!` or `?`).
241    ///
242    /// If `has_headers` was enabled via a `ReaderBuilder` (which is the
243    /// default), then this does not include the first record.
244    ///
245    /// # Example
246    ///
247    /// ```
248    /// use std::error::Error;
249    /// use futures::stream::StreamExt;
250    /// use csv_async::AsyncReader;
251    ///
252    /// # fn main() { async_std::task::block_on(async {example().await.unwrap()}); }
253    /// async fn example() -> Result<(), Box<dyn Error>> {
254    ///     let data = "\
255    /// city,country,pop
256    /// Boston,United States,4628910
257    /// ";
258    ///     let mut rdr = AsyncReader::from_reader(data.as_bytes());
259    ///     let mut iter = rdr.byte_records();
260    ///     assert_eq!(iter.next().await.unwrap()?, vec!["Boston", "United States", "4628910"]);
261    ///     assert!(iter.next().await.is_none());
262    ///     Ok(())
263    /// }
264    /// ```
265    #[inline]
266    pub fn byte_records(&mut self) -> ByteRecordsStream<R> {
267        ByteRecordsStream::new(&mut self.0)
268    }
269
270    /// Returns an owned iterator over all records as raw bytes.
271    ///
272    /// Each item yielded by this iterator is a `Result<ByteRecord, Error>`.
273    /// Therefore, in order to access the record, callers must handle the
274    /// possibility of error (typically with `try!` or `?`).
275    ///
276    /// This is mostly useful when you want to return a CSV iterator or store
277    /// it somewhere.
278    ///
279    /// If `has_headers` was enabled via a `ReaderBuilder` (which is the
280    /// default), then this does not include the first record.
281    ///
282    /// # Example
283    ///
284    /// ```
285    /// use std::error::Error;
286    /// use futures::stream::StreamExt;
287    /// use csv_async::AsyncReader;
288    ///
289    /// # fn main() { async_std::task::block_on(async {example().await.unwrap()}); }
290    /// async fn example() -> Result<(), Box<dyn Error>> {
291    ///     let data = "\
292    /// city,country,pop
293    /// Boston,United States,4628910
294    /// ";
295    ///     let rdr = AsyncReader::from_reader(data.as_bytes());
296    ///     let mut iter = rdr.into_byte_records();
297    ///     assert_eq!(iter.next().await.unwrap()?, vec!["Boston", "United States", "4628910"]);
298    ///     assert!(iter.next().await.is_none());
299    ///     Ok(())
300    /// }
301    /// ```
302    #[inline]
303    pub fn into_byte_records(self) -> ByteRecordsIntoStream<'r, R> {
304        ByteRecordsIntoStream::new(self.0)
305    }
306
307    /// Returns a reference to the first row read by this parser.
308    ///
309    /// If no row has been read yet, then this will force parsing of the first
310    /// row.
311    ///
312    /// If there was a problem parsing the row or if it wasn't valid UTF-8,
313    /// then this returns an error.
314    ///
315    /// If the underlying reader emits EOF before any data, then this returns
316    /// an empty record.
317    ///
318    /// Note that this method may be used regardless of whether `has_headers`
319    /// was enabled (but it is enabled by default).
320    ///
321    /// # Example
322    ///
323    /// This example shows how to get the header row of CSV data. Notice that
324    /// the header row does not appear as a record in the iterator!
325    ///
326    /// ```
327    /// use std::error::Error;
328    /// use futures::stream::StreamExt;
329    /// use csv_async::AsyncReader;
330    ///
331    /// # fn main() { async_std::task::block_on(async {example().await.unwrap()}); }
332    /// async fn example() -> Result<(), Box<dyn Error>> {
333    ///     let data = "\
334    /// city,country,pop
335    /// Boston,United States,4628910
336    /// ";
337    ///     let mut rdr = AsyncReader::from_reader(data.as_bytes());
338    ///
339    ///     // We can read the headers before iterating.
340    ///     {
341    ///     // `headers` borrows from the reader, so we put this in its
342    ///     // own scope. That way, the borrow ends before we try iterating
343    ///     // below. Alternatively, we could clone the headers.
344    ///     let headers = rdr.headers().await?;
345    ///     assert_eq!(headers, vec!["city", "country", "pop"]);
346    ///     }
347    ///
348    ///     {
349    ///     let mut records = rdr.records();
350    ///     assert_eq!(records.next().await.unwrap()?, vec!["Boston", "United States", "4628910"]);
351    ///     assert!(records.next().await.is_none());
352    ///     }
353    ///
354    ///     // We can also read the headers after iterating.
355    ///     let headers = rdr.headers().await?;
356    ///     assert_eq!(headers, vec!["city", "country", "pop"]);
357    ///     Ok(())
358    /// }
359    /// ```
360    #[inline]
361    pub async fn headers(&mut self) -> Result<&StringRecord> {
362        self.0.headers().await
363    }
364
365    /// Returns a reference to the first row read by this parser as raw bytes.
366    ///
367    /// If no row has been read yet, then this will force parsing of the first
368    /// row.
369    ///
370    /// If there was a problem parsing the row then this returns an error.
371    ///
372    /// If the underlying reader emits EOF before any data, then this returns
373    /// an empty record.
374    ///
375    /// Note that this method may be used regardless of whether `has_headers`
376    /// was enabled (but it is enabled by default).
377    ///
378    /// # Example
379    ///
380    /// This example shows how to get the header row of CSV data. Notice that
381    /// the header row does not appear as a record in the iterator!
382    ///
383    /// ```
384    /// use std::error::Error;
385    /// use futures::stream::StreamExt;
386    /// use csv_async::AsyncReader;
387    ///
388    /// # fn main() { async_std::task::block_on(async {example().await.unwrap()}); }
389    /// async fn example() -> Result<(), Box<dyn Error>> {
390    ///     let data = "\
391    /// city,country,pop
392    /// Boston,United States,4628910
393    /// ";
394    ///     let mut rdr = AsyncReader::from_reader(data.as_bytes());
395    ///
396    ///     // We can read the headers before iterating.
397    ///     {
398    ///     // `headers` borrows from the reader, so we put this in its
399    ///     // own scope. That way, the borrow ends before we try iterating
400    ///     // below. Alternatively, we could clone the headers.
401    ///     let headers = rdr.byte_headers().await?;
402    ///     assert_eq!(headers, vec!["city", "country", "pop"]);
403    ///     }
404    ///
405    ///     {
406    ///     let mut records = rdr.byte_records();
407    ///     assert_eq!(records.next().await.unwrap()?, vec!["Boston", "United States", "4628910"]);
408    ///     assert!(records.next().await.is_none());
409    ///     }
410    ///
411    ///     // We can also read the headers after iterating.
412    ///     let headers = rdr.byte_headers().await?;
413    ///     assert_eq!(headers, vec!["city", "country", "pop"]);
414    ///     Ok(())
415    /// }
416    /// ```
417    #[inline]
418    pub async fn byte_headers(&mut self) -> Result<&ByteRecord> {
419        self.0.byte_headers().await
420    }
421
422    /// Set the headers of this CSV parser manually.
423    ///
424    /// This overrides any other setting (including `set_byte_headers`). Any
425    /// automatic detection of headers is disabled. This may be called at any
426    /// time.
427    ///
428    /// # Example
429    ///
430    /// ```
431    /// use std::error::Error;
432    /// use csv_async::{AsyncReader, StringRecord};
433    ///
434    /// # fn main() { async_std::task::block_on(async {example().await.unwrap()}); }
435    /// async fn example() -> Result<(), Box<dyn Error>> {
436    ///     let data = "\
437    /// city,country,pop
438    /// Boston,United States,4628910
439    /// ";
440    ///     let mut rdr = AsyncReader::from_reader(data.as_bytes());
441    ///
442    ///     assert_eq!(rdr.headers().await?, vec!["city", "country", "pop"]);
443    ///     rdr.set_headers(StringRecord::from(vec!["a", "b", "c"]));
444    ///     assert_eq!(rdr.headers().await?, vec!["a", "b", "c"]);
445    ///
446    ///     Ok(())
447    /// }
448    /// ```
449    #[inline]
450    pub fn set_headers(&mut self, headers: StringRecord) {
451        self.0.set_headers(headers);
452    }
453
454    /// Set the headers of this CSV parser manually as raw bytes.
455    ///
456    /// This overrides any other setting (including `set_headers`). Any
457    /// automatic detection of headers is disabled. This may be called at any
458    /// time.
459    ///
460    /// # Example
461    ///
462    /// ```
463    /// use std::error::Error;
464    /// use csv_async::{AsyncReader, ByteRecord};
465    ///
466    /// # fn main() { async_std::task::block_on(async {example().await.unwrap()}); }
467    /// async fn example() -> Result<(), Box<dyn Error>> {
468    ///     let data = "\
469    /// city,country,pop
470    /// Boston,United States,4628910
471    /// ";
472    ///     let mut rdr = AsyncReader::from_reader(data.as_bytes());
473    ///
474    ///     assert_eq!(rdr.byte_headers().await?, vec!["city", "country", "pop"]);
475    ///     rdr.set_byte_headers(ByteRecord::from(vec!["a", "b", "c"]));
476    ///     assert_eq!(rdr.byte_headers().await?, vec!["a", "b", "c"]);
477    ///
478    ///     Ok(())
479    /// }
480    /// ```
481    #[inline]
482    pub fn set_byte_headers(&mut self, headers: ByteRecord) {
483        self.0.set_byte_headers(headers);
484    }
485
486    /// Read a single row into the given record. Returns false when no more
487    /// records could be read.
488    ///
489    /// If `has_headers` was enabled via a `ReaderBuilder` (which is the
490    /// default), then this will treat initial row as headers and read the first data record.
491    ///
492    /// This method is useful when you want to read records as fast as
493    /// as possible. It's less ergonomic than an iterator, but it permits the
494    /// caller to reuse the `StringRecord` allocation, which usually results
495    /// in higher throughput.
496    ///
497    /// Records read via this method are guaranteed to have a position set
498    /// on them, even if the reader is at EOF or if an error is returned.
499    ///
500    /// # Example
501    ///
502    /// ```
503    /// use std::error::Error;
504    /// use csv_async::{AsyncReader, StringRecord};
505    ///
506    /// # fn main() { async_std::task::block_on(async {example().await.unwrap()}); }
507    /// async fn example() -> Result<(), Box<dyn Error>> {
508    ///     let data = "\
509    /// city,country,pop
510    /// Boston,United States,4628910
511    /// ";
512    ///     let mut rdr = AsyncReader::from_reader(data.as_bytes());
513    ///     let mut record = StringRecord::new();
514    ///
515    ///     if rdr.read_record(&mut record).await? {
516    ///         assert_eq!(record, vec!["Boston", "United States", "4628910"]);
517    ///         Ok(())
518    ///     } else {
519    ///         Err(From::from("expected at least one record but got none"))
520    ///     }
521    /// }
522    /// ```
523    #[inline]
524    pub async fn read_record(&mut self, record: &mut StringRecord) -> Result<bool> {
525        self.0.read_record(record).await
526    }
527
528    /// Read a single row into the given byte record. Returns false when no
529    /// more records could be read.
530    ///
531    /// If `has_headers` was enabled via a `ReaderBuilder` (which is the
532    /// default), then this will treat initial row as headers and read the first data record.
533    ///
534    /// This method is useful when you want to read records as fast as
535    /// as possible. It's less ergonomic than an iterator, but it permits the
536    /// caller to reuse the `ByteRecord` allocation, which usually results
537    /// in higher throughput.
538    ///
539    /// Records read via this method are guaranteed to have a position set
540    /// on them, even if the reader is at EOF or if an error is returned.
541    ///
542    /// # Example
543    ///
544    /// ```
545    /// use std::error::Error;
546    /// use csv_async::{ByteRecord, AsyncReader};
547    ///
548    /// # fn main() { async_std::task::block_on(async {example().await.unwrap()}); }
549    /// async fn example() -> Result<(), Box<dyn Error>> {
550    ///     let data = "\
551    /// city,country,pop
552    /// Boston,United States,4628910
553    /// ";
554    ///     let mut rdr = AsyncReader::from_reader(data.as_bytes());
555    ///     let mut record = ByteRecord::new();
556    ///
557    ///     if rdr.read_byte_record(&mut record).await? {
558    ///         assert_eq!(record, vec!["Boston", "United States", "4628910"]);
559    ///         Ok(())
560    ///     } else {
561    ///         Err(From::from("expected at least one record but got none"))
562    ///     }
563    /// }
564    /// ```
565    #[inline]
566    pub async fn read_byte_record(&mut self, record: &mut ByteRecord) -> Result<bool> {
567        self.0.read_byte_record(record).await
568    }
569
570    /// Return the current position of this CSV reader.
571    ///
572    /// The byte offset in the position returned can be used to `seek` this
573    /// reader. In particular, seeking to a position returned here on the same
574    /// data will result in parsing the same subsequent record.
575    ///
576    /// # Example: reading the position
577    ///
578    /// ```
579    /// use std::error::Error;
580    /// use futures::io;
581    /// use futures::stream::StreamExt;
582    /// use csv_async::{AsyncReader, Position};
583    ///
584    /// # fn main() { async_std::task::block_on(async {example().await.unwrap()}); }
585    /// async fn example() -> Result<(), Box<dyn Error>> {
586    ///     let data = "\
587    /// city,country,popcount
588    /// Boston,United States,4628910
589    /// Concord,United States,42695
590    /// ";
591    ///     let rdr = AsyncReader::from_reader(io::Cursor::new(data));
592    ///     let mut iter = rdr.into_records();
593    ///     let mut pos = Position::new();
594    ///     loop {
595    ///         let next = iter.next().await;
596    ///         if let Some(next) = next {
597    ///             pos = next?.position().expect("Cursor should be at some valid position").clone();
598    ///         } else {
599    ///             break;
600    ///         }
601    ///     }
602    ///
603    ///     // `pos` should now be the position immediately before the last
604    ///     // record.
605    ///     assert_eq!(pos.byte(), 51);
606    ///     assert_eq!(pos.line(), 3);
607    ///     assert_eq!(pos.record(), 2);
608    ///     Ok(())
609    /// }
610    /// ```
611    #[inline]
612    pub fn position(&self) -> &Position {
613        self.0.position()
614    }
615
616    /// Returns true if and only if this reader has been exhausted.
617    ///
618    /// When this returns true, no more records can be read from this reader
619    /// (unless it has been seeked to another position).
620    ///
621    /// # Example
622    ///
623    /// ```
624    /// use std::error::Error;
625    /// use futures::io;
626    /// use futures::stream::StreamExt;
627    /// use csv_async::{AsyncReader, Position};
628    ///
629    /// # fn main() { async_std::task::block_on(async {example().await.unwrap()}); }
630    /// async fn example() -> Result<(), Box<dyn Error>> {
631    ///     let data = "\
632    /// city,country,popcount
633    /// Boston,United States,4628910
634    /// Concord,United States,42695
635    /// ";
636    ///     let mut rdr = AsyncReader::from_reader(io::Cursor::new(data));
637    ///     assert!(!rdr.is_done());
638    ///     {
639    ///         let mut records = rdr.records();
640    ///         while let Some(record) = records.next().await {
641    ///             let _ = record?;
642    ///         }
643    ///     }
644    ///     assert!(rdr.is_done());
645    ///     Ok(())
646    /// }
647    /// ```
648    #[inline]
649    pub fn is_done(&self) -> bool {
650        self.0.is_done()
651    }
652
653    /// Returns true if and only if this reader has been configured to
654    /// interpret the first record as a header record.
655    #[inline]
656    pub fn has_headers(&self) -> bool {
657        self.0.has_headers()
658    }
659
660    /// Returns a reference to the underlying reader.
661    #[inline]
662    pub fn get_ref(&self) -> &R {
663        self.0.get_ref()
664    }
665
666    /// Returns a mutable reference to the underlying reader.
667    #[inline]
668    pub fn get_mut(&mut self) -> &mut R {
669        self.0.get_mut()
670    }
671
672    /// Unwraps this CSV reader, returning the underlying reader.
673    ///
674    /// Note that any leftover data inside this reader's internal buffer is
675    /// lost.
676    #[inline]
677    pub fn into_inner(self) -> R {
678        self.0.into_inner()
679    }
680}
681
682impl<R: io::AsyncRead + io::AsyncSeek + std::marker::Unpin> AsyncReader<R> {
683    /// Seeks the underlying reader to the position given.
684    ///
685    /// This comes with a few caveats:
686    ///
687    /// * Any internal buffer associated with this reader is cleared.
688    /// * If the given position does not correspond to a position immediately
689    ///   before the start of a record, then the behavior of this reader is
690    ///   unspecified.
691    /// * Any special logic that skips the first record in the CSV reader
692    ///   when reading or iterating over records is disabled.
693    ///
694    /// If the given position has a byte offset equivalent to the current
695    /// position, then no seeking is performed.
696    ///
697    /// If the header row has not already been read, then this will attempt
698    /// to read the header row before seeking. Therefore, it is possible that
699    /// this returns an error associated with reading CSV data.
700    ///
701    /// Note that seeking is performed based only on the byte offset in the
702    /// given position. Namely, the record or line numbers in the position may
703    /// be incorrect, but this will cause any future position generated by
704    /// this CSV reader to be similarly incorrect.
705    ///
706    /// # Example: seek to parse a record twice
707    ///
708    /// ```
709    /// use std::error::Error;
710    /// use futures::io;
711    /// use futures::stream::StreamExt;
712    /// use csv_async::{AsyncReader, Position};
713    ///
714    /// # fn main() { async_std::task::block_on(async {example().await.unwrap()}); }
715    /// async fn example() -> Result<(), Box<dyn Error>> {
716    ///     let data = "\
717    /// city,country,popcount
718    /// Boston,United States,4628910
719    /// Concord,United States,42695
720    /// ";
721    ///     let mut rdr = AsyncReader::from_reader(io::Cursor::new(data));
722    ///     let mut pos = Position::new();
723    ///     {
724    ///     let mut records = rdr.records();
725    ///     loop {
726    ///         let next = records.next().await;
727    ///         if let Some(next) = next {
728    ///             pos = next?.position().expect("Cursor should be at some valid position").clone();
729    ///         } else {
730    ///             break;
731    ///         }
732    ///     }
733    ///     }
734    ///
735    ///     {
736    ///     // Now seek the reader back to `pos`. This will let us read the
737    ///     // last record again.
738    ///     rdr.seek(pos).await?;
739    ///     let mut records = rdr.into_records();
740    ///     if let Some(result) = records.next().await {
741    ///         let record = result?;
742    ///         assert_eq!(record, vec!["Concord", "United States", "42695"]);
743    ///         Ok(())
744    ///     } else {
745    ///         Err(From::from("expected at least one record but got none"))
746    ///     }
747    ///     }
748    /// }
749    /// ```
750    #[inline]
751    pub async fn seek(&mut self, pos: Position) -> Result<()> {
752        self.0.seek(pos).await
753    }
754
755    /// This is like `seek`, but provides direct control over how the seeking
756    /// operation is performed via `io::SeekFrom`.
757    ///
758    /// The `pos` position given *should* correspond the position indicated
759    /// by `seek_from`, but there is no requirement. If the `pos` position
760    /// given is incorrect, then the position information returned by this
761    /// reader will be similarly incorrect.
762    ///
763    /// If the header row has not already been read, then this will attempt
764    /// to read the header row before seeking. Therefore, it is possible that
765    /// this returns an error associated with reading CSV data.
766    ///
767    /// Unlike `seek`, this will always cause an actual seek to be performed.
768    #[inline]
769    pub async fn seek_raw(
770        &mut self,
771        seek_from: io::SeekFrom,
772        pos: Position,
773    ) -> Result<()> {
774        self.0.seek_raw(seek_from, pos).await
775    }
776
777    /// Rewinds the underlying reader to first data record.
778    ///
779    /// Function is aware of header presence.
780    /// After `rewind` record iterators will return first data record (skipping header if present), while
781    /// after `seek(0)` they will return header row (even if `has_header` is set).
782    /// 
783    /// # Example: Reads the same data multiply times
784    ///
785    /// ```
786    /// use std::error::Error;
787    /// use futures::io;
788    /// use futures::stream::StreamExt;
789    /// use csv_async::AsyncReader;
790    ///
791    /// # fn main() { async_std::task::block_on(async {example().await.unwrap()}); }
792    /// async fn example() -> Result<(), Box<dyn Error>> {
793    ///     let data = "\
794    /// city,country,popcount
795    /// Boston,United States,4628910
796    /// Concord,United States,42695
797    /// ";
798    ///     let mut rdr = AsyncReader::from_reader(io::Cursor::new(data));
799    ///     let mut output = Vec::new();
800    ///     loop {
801    ///         let mut records = rdr.records();
802    ///         while let Some(rec) = records.next().await {
803    ///             output.push(rec?);
804    ///         }
805    ///         if output.len() >= 6 {
806    ///             break;
807    ///         } else {
808    ///             drop(records);
809    ///             rdr.rewind().await?;
810    ///         }
811    ///     }
812    ///     assert_eq!(output,
813    ///         vec![
814    ///             vec!["Boston", "United States", "4628910"],
815    ///             vec!["Concord", "United States", "42695"],
816    ///             vec!["Boston", "United States", "4628910"],
817    ///             vec!["Concord", "United States", "42695"],
818    ///             vec!["Boston", "United States", "4628910"],
819    ///             vec!["Concord", "United States", "42695"],
820    ///         ]);
821    ///     Ok(())
822    /// }
823    /// ```
824    #[inline]
825    pub async fn rewind(&mut self) -> Result<()> {
826        self.0.rewind().await
827    }
828}
829
#[cfg(test)]
mod tests {
    use std::pin::Pin;
    use std::task::{Context, Poll};

    use futures::io;
    use futures::stream::StreamExt;

    use crate::byte_record::ByteRecord;
    use crate::error::ErrorKind;
    use crate::string_record::StringRecord;
    use crate::Trim;

    use super::{Position, AsyncReaderBuilder, AsyncReader};

    /// Views a string literal as raw CSV input bytes.
    fn b(s: &str) -> &[u8] {
        s.as_bytes()
    }

    /// Decodes a field as UTF-8, panicking on invalid data (fine in tests).
    fn s(b: &[u8]) -> &str {
        ::std::str::from_utf8(b).unwrap()
    }

    /// Builds a `Position` from explicit byte, line and record coordinates.
    fn newpos(byte: u64, line: u64, record: u64) -> Position {
        let mut p = Position::new();
        p.set_byte(byte).set_line(line).set_record(record);
        p
    }

    /// Drives `stream` to completion and returns how many items it yielded.
    async fn count(stream: impl StreamExt) -> usize {
        stream.fold(0, |acc, _| async move { acc + 1 }).await
    }

    #[async_std::test]
    async fn read_byte_record() {
        let data = b("foo,\"b,ar\",baz\nabc,mno,xyz");
        let mut rdr =
            AsyncReaderBuilder::new().has_headers(false).create_reader(data);
        let mut rec = ByteRecord::new();

        assert!(rdr.read_byte_record(&mut rec).await.unwrap());
        assert_eq!(3, rec.len());
        assert_eq!("foo", s(&rec[0]));
        assert_eq!("b,ar", s(&rec[1]));
        assert_eq!("baz", s(&rec[2]));

        assert!(rdr.read_byte_record(&mut rec).await.unwrap());
        assert_eq!(3, rec.len());
        assert_eq!("abc", s(&rec[0]));
        assert_eq!("mno", s(&rec[1]));
        assert_eq!("xyz", s(&rec[2]));

        assert!(!rdr.read_byte_record(&mut rec).await.unwrap());
    }

    #[async_std::test]
    async fn read_trimmed_records_and_headers() {
        let data = b("foo,  bar,\tbaz\n  1,  2,  3\n1\t,\t,3\t\t");
        let mut rdr = AsyncReaderBuilder::new()
            .has_headers(true)
            .trim(Trim::All)
            .create_reader(data);
        let mut rec = ByteRecord::new();
        assert!(rdr.read_byte_record(&mut rec).await.unwrap());
        assert_eq!("1", s(&rec[0]));
        assert_eq!("2", s(&rec[1]));
        assert_eq!("3", s(&rec[2]));
        let mut rec = StringRecord::new();
        assert!(rdr.read_record(&mut rec).await.unwrap());
        assert_eq!("1", &rec[0]);
        assert_eq!("", &rec[1]);
        assert_eq!("3", &rec[2]);
        {
            let headers = rdr.headers().await.unwrap();
            assert_eq!(3, headers.len());
            assert_eq!("foo", &headers[0]);
            assert_eq!("bar", &headers[1]);
            assert_eq!("baz", &headers[2]);
        }
    }

    #[async_std::test]
    async fn read_trimmed_header() {
        let data = b("foo,  bar,\tbaz\n  1,  2,  3\n1\t,\t,3\t\t");
        let mut rdr = AsyncReaderBuilder::new()
            .has_headers(true)
            .trim(Trim::Headers)
            .create_reader(data);
        let mut rec = ByteRecord::new();
        assert!(rdr.read_byte_record(&mut rec).await.unwrap());
        assert_eq!("  1", s(&rec[0]));
        assert_eq!("  2", s(&rec[1]));
        assert_eq!("  3", s(&rec[2]));
        {
            let headers = rdr.headers().await.unwrap();
            assert_eq!(3, headers.len());
            assert_eq!("foo", &headers[0]);
            assert_eq!("bar", &headers[1]);
            assert_eq!("baz", &headers[2]);
        }
    }

    #[async_std::test]
    async fn read_trimmed_header_invalid_utf8() {
        let data = &b"foo,  b\xFFar,\tbaz\na,b,c\nd,e,f"[..];
        let mut rdr = AsyncReaderBuilder::new()
            .has_headers(true)
            .trim(Trim::Headers)
            .create_reader(data);
        let mut rec = StringRecord::new();

        // force the headers to be read
        let _ = rdr.read_record(&mut rec).await;
        // Check the byte headers are trimmed
        {
            let headers = rdr.byte_headers().await.unwrap();
            assert_eq!(3, headers.len());
            assert_eq!(b"foo", &headers[0]);
            assert_eq!(b"b\xFFar", &headers[1]);
            assert_eq!(b"baz", &headers[2]);
        }
        match *rdr.headers().await.unwrap_err().kind() {
            ErrorKind::Utf8 { pos: Some(ref pos), ref err } => {
                assert_eq!(pos, &newpos(0, 1, 0));
                assert_eq!(err.field(), 1);
                assert_eq!(err.valid_up_to(), 3);
            }
            ref err => panic!("match failed, got {:?}", err),
        }
    }

    #[async_std::test]
    async fn read_trimmed_records() {
        let data = b("foo,  bar,\tbaz\n  1,  2,  3\n1\t,\t,3\t\t");
        let mut rdr = AsyncReaderBuilder::new()
            .has_headers(true)
            .trim(Trim::Fields)
            .create_reader(data);
        let mut rec = ByteRecord::new();
        assert!(rdr.read_byte_record(&mut rec).await.unwrap());
        assert_eq!("1", s(&rec[0]));
        assert_eq!("2", s(&rec[1]));
        assert_eq!("3", s(&rec[2]));
        {
            let headers = rdr.headers().await.unwrap();
            assert_eq!(3, headers.len());
            assert_eq!("foo", &headers[0]);
            assert_eq!("  bar", &headers[1]);
            assert_eq!("\tbaz", &headers[2]);
        }
    }

    #[async_std::test]
    async fn read_record_unequal_fails() {
        let data = b("foo\nbar,baz");
        let mut rdr =
            AsyncReaderBuilder::new().has_headers(false).create_reader(data);
        let mut rec = ByteRecord::new();

        assert!(rdr.read_byte_record(&mut rec).await.unwrap());
        assert_eq!(1, rec.len());
        assert_eq!("foo", s(&rec[0]));

        match rdr.read_byte_record(&mut rec).await {
            Err(err) => match *err.kind() {
                ErrorKind::UnequalLengths {
                    expected_len: 1,
                    ref pos,
                    len: 2,
                } => {
                    assert_eq!(pos, &Some(newpos(4, 2, 1)));
                }
                ref wrong => panic!("match failed, got {:?}", wrong),
            },
            wrong => panic!("match failed, got {:?}", wrong),
        }
    }

    #[async_std::test]
    async fn read_record_unequal_ok() {
        let data = b("foo\nbar,baz");
        let mut rdr = AsyncReaderBuilder::new()
            .has_headers(false)
            .flexible(true)
            .create_reader(data);
        let mut rec = ByteRecord::new();

        assert!(rdr.read_byte_record(&mut rec).await.unwrap());
        assert_eq!(1, rec.len());
        assert_eq!("foo", s(&rec[0]));

        assert!(rdr.read_byte_record(&mut rec).await.unwrap());
        assert_eq!(2, rec.len());
        assert_eq!("bar", s(&rec[0]));
        assert_eq!("baz", s(&rec[1]));

        assert!(!rdr.read_byte_record(&mut rec).await.unwrap());
    }

    // This tests that even if we get a CSV error, we can continue reading
    // if we want.
    #[async_std::test]
    async fn read_record_unequal_continue() {
        let data = b("foo\nbar,baz\nquux");
        let mut rdr =
            AsyncReaderBuilder::new().has_headers(false).create_reader(data);
        let mut rec = ByteRecord::new();

        assert!(rdr.read_byte_record(&mut rec).await.unwrap());
        assert_eq!(1, rec.len());
        assert_eq!("foo", s(&rec[0]));

        match rdr.read_byte_record(&mut rec).await {
            Err(err) => match *err.kind() {
                ErrorKind::UnequalLengths {
                    expected_len: 1,
                    ref pos,
                    len: 2,
                } => {
                    assert_eq!(pos, &Some(newpos(4, 2, 1)));
                }
                ref wrong => panic!("match failed, got {:?}", wrong),
            },
            wrong => panic!("match failed, got {:?}", wrong),
        }

        assert!(rdr.read_byte_record(&mut rec).await.unwrap());
        assert_eq!(1, rec.len());
        assert_eq!("quux", s(&rec[0]));

        assert!(!rdr.read_byte_record(&mut rec).await.unwrap());
    }

    #[async_std::test]
    async fn read_record_headers() {
        let data = b("foo,bar,baz\na,b,c\nd,e,f");
        let mut rdr = AsyncReaderBuilder::new().has_headers(true).create_reader(data);
        let mut rec = StringRecord::new();

        assert!(rdr.read_record(&mut rec).await.unwrap());
        assert_eq!(3, rec.len());
        assert_eq!("a", &rec[0]);

        assert!(rdr.read_record(&mut rec).await.unwrap());
        assert_eq!(3, rec.len());
        assert_eq!("d", &rec[0]);

        assert!(!rdr.read_record(&mut rec).await.unwrap());

        {
            let headers = rdr.byte_headers().await.unwrap();
            assert_eq!(3, headers.len());
            assert_eq!(b"foo", &headers[0]);
            assert_eq!(b"bar", &headers[1]);
            assert_eq!(b"baz", &headers[2]);
        }
        {
            let headers = rdr.headers().await.unwrap();
            assert_eq!(3, headers.len());
            assert_eq!("foo", &headers[0]);
            assert_eq!("bar", &headers[1]);
            assert_eq!("baz", &headers[2]);
        }
    }

    #[async_std::test]
    async fn read_record_headers_invalid_utf8() {
        let data = &b"foo,b\xFFar,baz\na,b,c\nd,e,f"[..];
        let mut rdr = AsyncReaderBuilder::new().has_headers(true).create_reader(data);
        let mut rec = StringRecord::new();

        assert!(rdr.read_record(&mut rec).await.unwrap());
        assert_eq!(3, rec.len());
        assert_eq!("a", &rec[0]);

        assert!(rdr.read_record(&mut rec).await.unwrap());
        assert_eq!(3, rec.len());
        assert_eq!("d", &rec[0]);

        assert!(!rdr.read_record(&mut rec).await.unwrap());

        // Check that we can read the headers as raw bytes, but that
        // if we read them as strings, we get an appropriate UTF-8 error.
        {
            let headers = rdr.byte_headers().await.unwrap();
            assert_eq!(3, headers.len());
            assert_eq!(b"foo", &headers[0]);
            assert_eq!(b"b\xFFar", &headers[1]);
            assert_eq!(b"baz", &headers[2]);
        }
        match *rdr.headers().await.unwrap_err().kind() {
            ErrorKind::Utf8 { pos: Some(ref pos), ref err } => {
                assert_eq!(pos, &newpos(0, 1, 0));
                assert_eq!(err.field(), 1);
                assert_eq!(err.valid_up_to(), 1);
            }
            ref err => panic!("match failed, got {:?}", err),
        }
    }

    #[async_std::test]
    async fn read_record_no_headers_before() {
        let data = b("foo,bar,baz\na,b,c\nd,e,f");
        let mut rdr =
            AsyncReaderBuilder::new().has_headers(false).create_reader(data);
        let mut rec = StringRecord::new();

        {
            let headers = rdr.headers().await.unwrap();
            assert_eq!(3, headers.len());
            assert_eq!("foo", &headers[0]);
            assert_eq!("bar", &headers[1]);
            assert_eq!("baz", &headers[2]);
        }

        assert!(rdr.read_record(&mut rec).await.unwrap());
        assert_eq!(3, rec.len());
        assert_eq!("foo", &rec[0]);

        assert!(rdr.read_record(&mut rec).await.unwrap());
        assert_eq!(3, rec.len());
        assert_eq!("a", &rec[0]);

        assert!(rdr.read_record(&mut rec).await.unwrap());
        assert_eq!(3, rec.len());
        assert_eq!("d", &rec[0]);

        assert!(!rdr.read_record(&mut rec).await.unwrap());
    }

    #[async_std::test]
    async fn read_record_no_headers_after() {
        let data = b("foo,bar,baz\na,b,c\nd,e,f");
        let mut rdr =
            AsyncReaderBuilder::new().has_headers(false).create_reader(data);
        let mut rec = StringRecord::new();

        assert!(rdr.read_record(&mut rec).await.unwrap());
        assert_eq!(3, rec.len());
        assert_eq!("foo", &rec[0]);

        assert!(rdr.read_record(&mut rec).await.unwrap());
        assert_eq!(3, rec.len());
        assert_eq!("a", &rec[0]);

        assert!(rdr.read_record(&mut rec).await.unwrap());
        assert_eq!(3, rec.len());
        assert_eq!("d", &rec[0]);

        assert!(!rdr.read_record(&mut rec).await.unwrap());

        let headers = rdr.headers().await.unwrap();
        assert_eq!(3, headers.len());
        assert_eq!("foo", &headers[0]);
        assert_eq!("bar", &headers[1]);
        assert_eq!("baz", &headers[2]);
    }

    #[async_std::test]
    async fn seek() {
        let data = b("foo,bar,baz\na,b,c\nd,e,f\ng,h,i");
        let mut rdr = AsyncReaderBuilder::new().create_reader(io::Cursor::new(data));
        rdr.seek(newpos(18, 3, 2)).await.unwrap();

        let mut rec = StringRecord::new();

        assert_eq!(18, rdr.position().byte());
        assert!(rdr.read_record(&mut rec).await.unwrap());
        assert_eq!(3, rec.len());
        assert_eq!("d", &rec[0]);

        assert_eq!(24, rdr.position().byte());
        assert_eq!(4, rdr.position().line());
        assert_eq!(3, rdr.position().record());
        assert!(rdr.read_record(&mut rec).await.unwrap());
        assert_eq!(3, rec.len());
        assert_eq!("g", &rec[0]);

        assert!(!rdr.read_record(&mut rec).await.unwrap());
    }

    // Test that we can read headers after seeking even if the headers weren't
    // explicitly read before seeking.
    #[async_std::test]
    async fn seek_headers_after() {
        let data = b("foo,bar,baz\na,b,c\nd,e,f\ng,h,i");
        let mut rdr = AsyncReaderBuilder::new().create_reader(io::Cursor::new(data));
        rdr.seek(newpos(18, 3, 2)).await.unwrap();
        assert_eq!(rdr.headers().await.unwrap(), vec!["foo", "bar", "baz"]);
    }

    // Test that we can read headers after seeking if the headers were read
    // before seeking.
    #[async_std::test]
    async fn seek_headers_before_after() {
        let data = b("foo,bar,baz\na,b,c\nd,e,f\ng,h,i");
        let mut rdr = AsyncReaderBuilder::new().create_reader(io::Cursor::new(data));
        let headers = rdr.headers().await.unwrap().clone();
        rdr.seek(newpos(18, 3, 2)).await.unwrap();
        assert_eq!(&headers, rdr.headers().await.unwrap());
    }

    // Test that even if we didn't read headers before seeking, if we seek to
    // the current byte offset, then no seeking is done and therefore we can
    // still read headers after seeking.
    #[async_std::test]
    async fn seek_headers_no_actual_seek() {
        let data = b("foo,bar,baz\na,b,c\nd,e,f\ng,h,i");
        let mut rdr = AsyncReaderBuilder::new().create_reader(io::Cursor::new(data));
        rdr.seek(Position::new()).await.unwrap();
        assert_eq!("foo", &rdr.headers().await.unwrap()[0]);
    }

    // Test that `rewind` restarts at the first data record (skipping the
    // header row), both mid-stream and after the end of data was reached.
    #[async_std::test]
    async fn rewind() {
        let data = b("foo,bar,baz\na,b,c\nd,e,f\ng,h,i");
        let mut rdr = AsyncReaderBuilder::new().create_reader(io::Cursor::new(data));
        let mut rec = StringRecord::new();

        assert!(rdr.read_record(&mut rec).await.unwrap());
        assert_eq!(3, rec.len());
        assert_eq!("a", &rec[0]);

        assert!(rdr.read_record(&mut rec).await.unwrap());
        assert_eq!(3, rec.len());
        assert_eq!("d", &rec[0]);

        // Rewind mid-stream: reading restarts at the first data record and
        // continues through the whole data set.
        rdr.rewind().await.unwrap();

        assert!(rdr.read_record(&mut rec).await.unwrap());
        assert_eq!(3, rec.len());
        assert_eq!("a", &rec[0]);

        assert!(rdr.read_record(&mut rec).await.unwrap());
        assert_eq!("d", &rec[0]);

        assert!(rdr.read_record(&mut rec).await.unwrap());
        assert_eq!("g", &rec[0]);

        assert!(!rdr.read_record(&mut rec).await.unwrap());

        // Rewind after reaching the end of data works as well.
        rdr.rewind().await.unwrap();

        assert!(rdr.read_record(&mut rec).await.unwrap());
        assert_eq!(3, rec.len());
        assert_eq!("a", &rec[0]);
    }

    // Test that position info is reported correctly in absence of headers.
    #[async_std::test]
    async fn positions_no_headers() {
        let mut rdr = AsyncReaderBuilder::new()
            .has_headers(false)
            .create_reader("a,b,c\nx,y,z".as_bytes())
            .into_records();

        let pos = rdr.next().await.unwrap().unwrap().position().unwrap().clone();
        assert_eq!(pos.byte(), 0);
        assert_eq!(pos.line(), 1);
        assert_eq!(pos.record(), 0);

        let pos = rdr.next().await.unwrap().unwrap().position().unwrap().clone();
        assert_eq!(pos.byte(), 6);
        assert_eq!(pos.line(), 2);
        assert_eq!(pos.record(), 1);

        // Test that we are at end of stream, and properly signal this.
        assert!(rdr.next().await.is_none());
        // Testing that we are not panic, trying to pass over end of stream (Issue#22)
        assert!(rdr.next().await.is_none());
    }

    // Test that position info is reported correctly with headers.
    #[async_std::test]
    async fn positions_headers() {
        let mut rdr = AsyncReaderBuilder::new()
            .has_headers(true)
            .create_reader("a,b,c\nx,y,z".as_bytes())
            .into_records();

        let pos = rdr.next().await.unwrap().unwrap().position().unwrap().clone();
        assert_eq!(pos.byte(), 6);
        assert_eq!(pos.line(), 2);
        assert_eq!(pos.record(), 1);

        // The header row is not yielded, so only one data record exists.
        assert!(rdr.next().await.is_none());
    }

    // Test that reading headers on empty data yields an empty record.
    #[async_std::test]
    async fn headers_on_empty_data() {
        let mut rdr = AsyncReaderBuilder::new().create_reader("".as_bytes());
        let r = rdr.byte_headers().await.unwrap();
        assert_eq!(r.len(), 0);
    }

    // Test that reading the first record on empty data works.
    #[async_std::test]
    async fn no_headers_on_empty_data() {
        let mut rdr =
            AsyncReaderBuilder::new().has_headers(false).create_reader("".as_bytes());
        assert_eq!(count(rdr.records()).await, 0);
    }

    // Test that reading the first record on empty data works, even if
    // we've tried to read headers before hand.
    #[async_std::test]
    async fn no_headers_on_empty_data_after_headers() {
        let mut rdr =
            AsyncReaderBuilder::new().has_headers(false).create_reader("".as_bytes());
        assert_eq!(rdr.headers().await.unwrap().len(), 0);
        assert_eq!(count(rdr.records()).await, 0);
    }

    // Test that I/O errors are surfaced as `ErrorKind::Io`, and that
    // `end_on_io_error` controls whether the stream ends after the first one.
    #[async_std::test]
    async fn behavior_on_io_errors() {
        // A reader whose every read fails. Being a field-less struct, it is
        // automatically `Unpin` and `Send`, so no manual impls are needed.
        struct FailingRead;
        impl io::AsyncRead for FailingRead {
            fn poll_read(
                self: Pin<&mut Self>,
                _cx: &mut Context<'_>,
                _buf: &mut [u8],
            ) -> Poll<Result<usize, io::Error>> {
                Poll::Ready(Err(io::Error::new(io::ErrorKind::Other, "Broken reader")))
            }
        }

        // Default configuration: the error is reported once, then the stream ends.
        {
            let mut records = AsyncReader::from_reader(FailingRead).into_records();
            let first_record = records.next().await;
            assert!(
                matches!(&first_record, Some(Err(e)) if matches!(e.kind(), ErrorKind::Io(_)))
            );
            assert!(records.next().await.is_none());
        }

        // With `end_on_io_error(false)` the stream keeps reporting the error.
        {
            let mut records = AsyncReaderBuilder::new()
                .end_on_io_error(false)
                .create_reader(FailingRead)
                .into_records();
            let first_record = records.next().await;
            assert!(
                matches!(&first_record, Some(Err(e)) if matches!(e.kind(), ErrorKind::Io(_)))
            );
            let second_record = records.next().await;
            assert!(
                matches!(&second_record, Some(Err(e)) if matches!(e.kind(), ErrorKind::Io(_)))
            );
        }
    }
}