// csv_async/async_readers/ardr_futures.rs
1use futures::io;
2
3use crate::AsyncReaderBuilder;
4use crate::byte_record::{ByteRecord, Position};
5use crate::error::Result;
6use crate::string_record::StringRecord;
7use super::{
8 AsyncReaderImpl,
9 StringRecordsStream, StringRecordsIntoStream,
10 ByteRecordsStream, ByteRecordsIntoStream,
11};
12
13
14impl AsyncReaderBuilder {
15 /// Build a CSV reader from this configuration that reads data from `rdr`.
16 ///
17 /// Note that the CSV reader is buffered automatically, so you should not
18 /// wrap `rdr` in a buffered reader.
19 ///
20 /// # Example
21 ///
22 /// ```
23 /// use std::error::Error;
24 /// use futures::stream::StreamExt;
25 /// use csv_async::AsyncReaderBuilder;
26 ///
27 /// # fn main() { async_std::task::block_on(async {example().await.unwrap()}); }
28 /// async fn example() -> Result<(), Box<dyn Error>> {
29 /// let data = "\
30 /// city,country,pop
31 /// Boston,United States,4628910
32 /// Concord,United States,42695
33 /// ";
34 /// let mut rdr = AsyncReaderBuilder::new().create_reader(data.as_bytes());
35 /// let mut records = rdr.into_records();
36 /// while let Some(record) = records.next().await {
37 /// println!("{:?}", record?);
38 /// }
39 /// Ok(())
40 /// }
41 /// ```
42 pub fn create_reader<R: io::AsyncRead + Unpin + Send>(&self, rdr: R) -> AsyncReader<R> {
43 AsyncReader::new(self, rdr)
44 }
45}
46
47/// A already configured CSV reader.
48///
49/// A CSV reader takes as input CSV data and transforms that into standard Rust
50/// values. The reader reads CSV data is as a sequence of records,
51/// where a record is a sequence of fields and each field is a string.
52///
53/// # Configuration
54///
55/// A CSV reader has convenient constructor method `from_reader`.
56/// However, if you want to configure the CSV reader to use
57/// a different delimiter or quote character (among many other things), then
58/// you should use a [`AsyncReaderBuilder`](struct.AsyncReaderBuilder.html) to construct
59/// a `AsyncReader`. For example, to change the field delimiter:
60///
61/// ```
62/// use std::error::Error;
63/// use futures::stream::StreamExt;
64/// use csv_async::AsyncReaderBuilder;
65///
66/// # fn main() { async_std::task::block_on(async {example().await.unwrap()}); }
67/// async fn example() -> Result<(), Box<dyn Error>> {
68/// let data = "\
69/// city;country;pop
70/// Boston;United States;4628910
71/// ";
72/// let mut rdr = AsyncReaderBuilder::new()
73/// .delimiter(b';')
74/// .create_reader(data.as_bytes());
75///
76/// let mut records = rdr.records();
77/// assert_eq!(records.next().await.unwrap()?, vec!["Boston", "United States", "4628910"]);
78/// Ok(())
79/// }
80/// ```
81///
82/// # Error handling
83///
84/// In general, CSV *parsing* does not ever return an error. That is, there is
85/// no such thing as malformed CSV data. Instead, this reader will prioritize
86/// finding a parse over rejecting CSV data that it does not understand. This
87/// choice was inspired by other popular CSV parsers, but also because it is
88/// pragmatic. CSV data varies wildly, so even if the CSV data is malformed,
89/// it might still be possible to work with the data. In the land of CSV, there
90/// is no "right" or "wrong," only "right" and "less right."
91///
92/// With that said, a number of errors can occur while reading CSV data:
93///
94/// * By default, all records in CSV data must have the same number of fields.
95/// If a record is found with a different number of fields than a prior
96/// record, then an error is returned. This behavior can be disabled by
97/// enabling flexible parsing via the `flexible` method on
98/// [`AsyncReaderBuilder`](struct.AsyncReaderBuilder.html).
99/// * When reading CSV data from a resource (like a file), it is possible for
100/// reading from the underlying resource to fail. This will return an error.
101/// For subsequent calls to the reader after encountering a such error
102/// (unless `seek` is used), it will behave as if end of file had been
103/// reached, in order to avoid running into infinite loops when still
104/// attempting to read the next record when one has errored.
105/// * When reading CSV data into `String` or `&str` fields (e.g., via a
106/// [`StringRecord`](struct.StringRecord.html)), UTF-8 is strictly
107/// enforced. If CSV data is invalid UTF-8, then an error is returned. If
108/// you want to read invalid UTF-8, then you should use the byte oriented
109/// APIs such as [`ByteRecord`](struct.ByteRecord.html). If you need explicit
110/// support for another encoding entirely, then you'll need to use another
111/// crate to transcode your CSV data to UTF-8 before parsing it.
112/// * When using Serde to deserialize CSV data into Rust types, it is possible
113/// for a number of additional errors to occur. For example, deserializing
114/// a field `xyz` into an `i32` field will result in an error.
115///
116/// For more details on the precise semantics of errors, see the
117/// [`Error`](enum.Error.html) type.
118#[derive(Debug)]
119pub struct AsyncReader<R>(AsyncReaderImpl<R>);
120
121impl<'r, R> AsyncReader<R>
122where
123 R: io::AsyncRead + Unpin + Send + 'r,
124{
125 /// Create a new CSV reader given a builder and a source of underlying
126 /// bytes.
127 fn new(builder: &AsyncReaderBuilder, rdr: R) -> AsyncReader<R> {
128 AsyncReader(AsyncReaderImpl::new(builder, rdr))
129 }
130
131 /// Create a new CSV parser with a default configuration for the given
132 /// reader.
133 ///
134 /// To customize CSV parsing, use a `ReaderBuilder`.
135 ///
136 /// # Example
137 ///
138 /// ```
139 /// use std::error::Error;
140 /// use futures::stream::StreamExt;
141 /// use csv_async::AsyncReader;
142 ///
143 /// # fn main() { async_std::task::block_on(async {example().await.unwrap()}); }
144 /// async fn example() -> Result<(), Box<dyn Error>> {
145 /// let data = "\
146 /// city,country,pop
147 /// Boston,United States,4628910
148 /// Concord,United States,42695
149 /// ";
150 /// let mut rdr = AsyncReader::from_reader(data.as_bytes());
151 /// let mut records = rdr.into_records();
152 /// while let Some(record) = records.next().await {
153 /// println!("{:?}", record?);
154 /// }
155 /// Ok(())
156 /// }
157 /// ```
158 #[inline]
159 pub fn from_reader(rdr: R) -> AsyncReader<R> {
160 AsyncReaderBuilder::new().create_reader(rdr)
161 }
162
163 /// Returns a borrowed iterator over all records as strings.
164 ///
165 /// Each item yielded by this iterator is a `Result<StringRecord, Error>`.
166 /// Therefore, in order to access the record, callers must handle the
167 /// possibility of error (typically with `try!` or `?`).
168 ///
169 /// If `has_headers` was enabled via a `ReaderBuilder` (which is the
170 /// default), then this does not include the first record.
171 ///
172 /// # Example
173 ///
174 /// ```
175 /// use std::error::Error;
176 /// use futures::stream::StreamExt;
177 /// use csv_async::AsyncReader;
178 ///
179 /// # fn main() { async_std::task::block_on(async {example().await.unwrap()}); }
180 /// async fn example() -> Result<(), Box<dyn Error>> {
181 /// let data = "\
182 /// city,country,pop
183 /// Boston,United States,4628910
184 /// ";
185 /// let mut rdr = AsyncReader::from_reader(data.as_bytes());
186 /// let mut records = rdr.records();
187 /// while let Some(record) = records.next().await {
188 /// println!("{:?}", record?);
189 /// }
190 /// Ok(())
191 /// }
192 /// ```
193 #[inline]
194 pub fn records(&mut self) -> StringRecordsStream<R> {
195 StringRecordsStream::new(&mut self.0)
196 }
197
198 /// Returns an owned iterator over all records as strings.
199 ///
200 /// Each item yielded by this iterator is a `Result<StringRecord, Error>`.
201 /// Therefore, in order to access the record, callers must handle the
202 /// possibility of error (typically with `try!` or `?`).
203 ///
204 /// This is mostly useful when you want to return a CSV iterator or store
205 /// it somewhere.
206 ///
207 /// If `has_headers` was enabled via a `ReaderBuilder` (which is the
208 /// default), then this does not include the first record.
209 ///
210 /// # Example
211 ///
212 /// ```
213 /// use std::error::Error;
214 /// use futures::stream::StreamExt;
215 /// use csv_async::AsyncReader;
216 ///
217 /// # fn main() { async_std::task::block_on(async {example().await.unwrap()}); }
218 /// async fn example() -> Result<(), Box<dyn Error>> {
219 /// let data = "\
220 /// city,country,pop
221 /// Boston,United States,4628910
222 /// ";
223 /// let rdr = AsyncReader::from_reader(data.as_bytes());
224 /// let mut records = rdr.into_records();
225 /// while let Some(record) = records.next().await {
226 /// println!("{:?}", record?);
227 /// }
228 /// Ok(())
229 /// }
230 /// ```
231 #[inline]
232 pub fn into_records(self) -> StringRecordsIntoStream<'r, R> {
233 StringRecordsIntoStream::new(self.0)
234 }
235
236 /// Returns a borrowed iterator over all records as raw bytes.
237 ///
238 /// Each item yielded by this iterator is a `Result<ByteRecord, Error>`.
239 /// Therefore, in order to access the record, callers must handle the
240 /// possibility of error (typically with `try!` or `?`).
241 ///
242 /// If `has_headers` was enabled via a `ReaderBuilder` (which is the
243 /// default), then this does not include the first record.
244 ///
245 /// # Example
246 ///
247 /// ```
248 /// use std::error::Error;
249 /// use futures::stream::StreamExt;
250 /// use csv_async::AsyncReader;
251 ///
252 /// # fn main() { async_std::task::block_on(async {example().await.unwrap()}); }
253 /// async fn example() -> Result<(), Box<dyn Error>> {
254 /// let data = "\
255 /// city,country,pop
256 /// Boston,United States,4628910
257 /// ";
258 /// let mut rdr = AsyncReader::from_reader(data.as_bytes());
259 /// let mut iter = rdr.byte_records();
260 /// assert_eq!(iter.next().await.unwrap()?, vec!["Boston", "United States", "4628910"]);
261 /// assert!(iter.next().await.is_none());
262 /// Ok(())
263 /// }
264 /// ```
265 #[inline]
266 pub fn byte_records(&mut self) -> ByteRecordsStream<R> {
267 ByteRecordsStream::new(&mut self.0)
268 }
269
270 /// Returns an owned iterator over all records as raw bytes.
271 ///
272 /// Each item yielded by this iterator is a `Result<ByteRecord, Error>`.
273 /// Therefore, in order to access the record, callers must handle the
274 /// possibility of error (typically with `try!` or `?`).
275 ///
276 /// This is mostly useful when you want to return a CSV iterator or store
277 /// it somewhere.
278 ///
279 /// If `has_headers` was enabled via a `ReaderBuilder` (which is the
280 /// default), then this does not include the first record.
281 ///
282 /// # Example
283 ///
284 /// ```
285 /// use std::error::Error;
286 /// use futures::stream::StreamExt;
287 /// use csv_async::AsyncReader;
288 ///
289 /// # fn main() { async_std::task::block_on(async {example().await.unwrap()}); }
290 /// async fn example() -> Result<(), Box<dyn Error>> {
291 /// let data = "\
292 /// city,country,pop
293 /// Boston,United States,4628910
294 /// ";
295 /// let rdr = AsyncReader::from_reader(data.as_bytes());
296 /// let mut iter = rdr.into_byte_records();
297 /// assert_eq!(iter.next().await.unwrap()?, vec!["Boston", "United States", "4628910"]);
298 /// assert!(iter.next().await.is_none());
299 /// Ok(())
300 /// }
301 /// ```
302 #[inline]
303 pub fn into_byte_records(self) -> ByteRecordsIntoStream<'r, R> {
304 ByteRecordsIntoStream::new(self.0)
305 }
306
307 /// Returns a reference to the first row read by this parser.
308 ///
309 /// If no row has been read yet, then this will force parsing of the first
310 /// row.
311 ///
312 /// If there was a problem parsing the row or if it wasn't valid UTF-8,
313 /// then this returns an error.
314 ///
315 /// If the underlying reader emits EOF before any data, then this returns
316 /// an empty record.
317 ///
318 /// Note that this method may be used regardless of whether `has_headers`
319 /// was enabled (but it is enabled by default).
320 ///
321 /// # Example
322 ///
323 /// This example shows how to get the header row of CSV data. Notice that
324 /// the header row does not appear as a record in the iterator!
325 ///
326 /// ```
327 /// use std::error::Error;
328 /// use futures::stream::StreamExt;
329 /// use csv_async::AsyncReader;
330 ///
331 /// # fn main() { async_std::task::block_on(async {example().await.unwrap()}); }
332 /// async fn example() -> Result<(), Box<dyn Error>> {
333 /// let data = "\
334 /// city,country,pop
335 /// Boston,United States,4628910
336 /// ";
337 /// let mut rdr = AsyncReader::from_reader(data.as_bytes());
338 ///
339 /// // We can read the headers before iterating.
340 /// {
341 /// // `headers` borrows from the reader, so we put this in its
342 /// // own scope. That way, the borrow ends before we try iterating
343 /// // below. Alternatively, we could clone the headers.
344 /// let headers = rdr.headers().await?;
345 /// assert_eq!(headers, vec!["city", "country", "pop"]);
346 /// }
347 ///
348 /// {
349 /// let mut records = rdr.records();
350 /// assert_eq!(records.next().await.unwrap()?, vec!["Boston", "United States", "4628910"]);
351 /// assert!(records.next().await.is_none());
352 /// }
353 ///
354 /// // We can also read the headers after iterating.
355 /// let headers = rdr.headers().await?;
356 /// assert_eq!(headers, vec!["city", "country", "pop"]);
357 /// Ok(())
358 /// }
359 /// ```
360 #[inline]
361 pub async fn headers(&mut self) -> Result<&StringRecord> {
362 self.0.headers().await
363 }
364
365 /// Returns a reference to the first row read by this parser as raw bytes.
366 ///
367 /// If no row has been read yet, then this will force parsing of the first
368 /// row.
369 ///
370 /// If there was a problem parsing the row then this returns an error.
371 ///
372 /// If the underlying reader emits EOF before any data, then this returns
373 /// an empty record.
374 ///
375 /// Note that this method may be used regardless of whether `has_headers`
376 /// was enabled (but it is enabled by default).
377 ///
378 /// # Example
379 ///
380 /// This example shows how to get the header row of CSV data. Notice that
381 /// the header row does not appear as a record in the iterator!
382 ///
383 /// ```
384 /// use std::error::Error;
385 /// use futures::stream::StreamExt;
386 /// use csv_async::AsyncReader;
387 ///
388 /// # fn main() { async_std::task::block_on(async {example().await.unwrap()}); }
389 /// async fn example() -> Result<(), Box<dyn Error>> {
390 /// let data = "\
391 /// city,country,pop
392 /// Boston,United States,4628910
393 /// ";
394 /// let mut rdr = AsyncReader::from_reader(data.as_bytes());
395 ///
396 /// // We can read the headers before iterating.
397 /// {
398 /// // `headers` borrows from the reader, so we put this in its
399 /// // own scope. That way, the borrow ends before we try iterating
400 /// // below. Alternatively, we could clone the headers.
401 /// let headers = rdr.byte_headers().await?;
402 /// assert_eq!(headers, vec!["city", "country", "pop"]);
403 /// }
404 ///
405 /// {
406 /// let mut records = rdr.byte_records();
407 /// assert_eq!(records.next().await.unwrap()?, vec!["Boston", "United States", "4628910"]);
408 /// assert!(records.next().await.is_none());
409 /// }
410 ///
411 /// // We can also read the headers after iterating.
412 /// let headers = rdr.byte_headers().await?;
413 /// assert_eq!(headers, vec!["city", "country", "pop"]);
414 /// Ok(())
415 /// }
416 /// ```
417 #[inline]
418 pub async fn byte_headers(&mut self) -> Result<&ByteRecord> {
419 self.0.byte_headers().await
420 }
421
422 /// Set the headers of this CSV parser manually.
423 ///
424 /// This overrides any other setting (including `set_byte_headers`). Any
425 /// automatic detection of headers is disabled. This may be called at any
426 /// time.
427 ///
428 /// # Example
429 ///
430 /// ```
431 /// use std::error::Error;
432 /// use csv_async::{AsyncReader, StringRecord};
433 ///
434 /// # fn main() { async_std::task::block_on(async {example().await.unwrap()}); }
435 /// async fn example() -> Result<(), Box<dyn Error>> {
436 /// let data = "\
437 /// city,country,pop
438 /// Boston,United States,4628910
439 /// ";
440 /// let mut rdr = AsyncReader::from_reader(data.as_bytes());
441 ///
442 /// assert_eq!(rdr.headers().await?, vec!["city", "country", "pop"]);
443 /// rdr.set_headers(StringRecord::from(vec!["a", "b", "c"]));
444 /// assert_eq!(rdr.headers().await?, vec!["a", "b", "c"]);
445 ///
446 /// Ok(())
447 /// }
448 /// ```
449 #[inline]
450 pub fn set_headers(&mut self, headers: StringRecord) {
451 self.0.set_headers(headers);
452 }
453
454 /// Set the headers of this CSV parser manually as raw bytes.
455 ///
456 /// This overrides any other setting (including `set_headers`). Any
457 /// automatic detection of headers is disabled. This may be called at any
458 /// time.
459 ///
460 /// # Example
461 ///
462 /// ```
463 /// use std::error::Error;
464 /// use csv_async::{AsyncReader, ByteRecord};
465 ///
466 /// # fn main() { async_std::task::block_on(async {example().await.unwrap()}); }
467 /// async fn example() -> Result<(), Box<dyn Error>> {
468 /// let data = "\
469 /// city,country,pop
470 /// Boston,United States,4628910
471 /// ";
472 /// let mut rdr = AsyncReader::from_reader(data.as_bytes());
473 ///
474 /// assert_eq!(rdr.byte_headers().await?, vec!["city", "country", "pop"]);
475 /// rdr.set_byte_headers(ByteRecord::from(vec!["a", "b", "c"]));
476 /// assert_eq!(rdr.byte_headers().await?, vec!["a", "b", "c"]);
477 ///
478 /// Ok(())
479 /// }
480 /// ```
481 #[inline]
482 pub fn set_byte_headers(&mut self, headers: ByteRecord) {
483 self.0.set_byte_headers(headers);
484 }
485
486 /// Read a single row into the given record. Returns false when no more
487 /// records could be read.
488 ///
489 /// If `has_headers` was enabled via a `ReaderBuilder` (which is the
490 /// default), then this will treat initial row as headers and read the first data record.
491 ///
492 /// This method is useful when you want to read records as fast as
493 /// as possible. It's less ergonomic than an iterator, but it permits the
494 /// caller to reuse the `StringRecord` allocation, which usually results
495 /// in higher throughput.
496 ///
497 /// Records read via this method are guaranteed to have a position set
498 /// on them, even if the reader is at EOF or if an error is returned.
499 ///
500 /// # Example
501 ///
502 /// ```
503 /// use std::error::Error;
504 /// use csv_async::{AsyncReader, StringRecord};
505 ///
506 /// # fn main() { async_std::task::block_on(async {example().await.unwrap()}); }
507 /// async fn example() -> Result<(), Box<dyn Error>> {
508 /// let data = "\
509 /// city,country,pop
510 /// Boston,United States,4628910
511 /// ";
512 /// let mut rdr = AsyncReader::from_reader(data.as_bytes());
513 /// let mut record = StringRecord::new();
514 ///
515 /// if rdr.read_record(&mut record).await? {
516 /// assert_eq!(record, vec!["Boston", "United States", "4628910"]);
517 /// Ok(())
518 /// } else {
519 /// Err(From::from("expected at least one record but got none"))
520 /// }
521 /// }
522 /// ```
523 #[inline]
524 pub async fn read_record(&mut self, record: &mut StringRecord) -> Result<bool> {
525 self.0.read_record(record).await
526 }
527
528 /// Read a single row into the given byte record. Returns false when no
529 /// more records could be read.
530 ///
531 /// If `has_headers` was enabled via a `ReaderBuilder` (which is the
532 /// default), then this will treat initial row as headers and read the first data record.
533 ///
534 /// This method is useful when you want to read records as fast as
535 /// as possible. It's less ergonomic than an iterator, but it permits the
536 /// caller to reuse the `ByteRecord` allocation, which usually results
537 /// in higher throughput.
538 ///
539 /// Records read via this method are guaranteed to have a position set
540 /// on them, even if the reader is at EOF or if an error is returned.
541 ///
542 /// # Example
543 ///
544 /// ```
545 /// use std::error::Error;
546 /// use csv_async::{ByteRecord, AsyncReader};
547 ///
548 /// # fn main() { async_std::task::block_on(async {example().await.unwrap()}); }
549 /// async fn example() -> Result<(), Box<dyn Error>> {
550 /// let data = "\
551 /// city,country,pop
552 /// Boston,United States,4628910
553 /// ";
554 /// let mut rdr = AsyncReader::from_reader(data.as_bytes());
555 /// let mut record = ByteRecord::new();
556 ///
557 /// if rdr.read_byte_record(&mut record).await? {
558 /// assert_eq!(record, vec!["Boston", "United States", "4628910"]);
559 /// Ok(())
560 /// } else {
561 /// Err(From::from("expected at least one record but got none"))
562 /// }
563 /// }
564 /// ```
565 #[inline]
566 pub async fn read_byte_record(&mut self, record: &mut ByteRecord) -> Result<bool> {
567 self.0.read_byte_record(record).await
568 }
569
570 /// Return the current position of this CSV reader.
571 ///
572 /// The byte offset in the position returned can be used to `seek` this
573 /// reader. In particular, seeking to a position returned here on the same
574 /// data will result in parsing the same subsequent record.
575 ///
576 /// # Example: reading the position
577 ///
578 /// ```
579 /// use std::error::Error;
580 /// use futures::io;
581 /// use futures::stream::StreamExt;
582 /// use csv_async::{AsyncReader, Position};
583 ///
584 /// # fn main() { async_std::task::block_on(async {example().await.unwrap()}); }
585 /// async fn example() -> Result<(), Box<dyn Error>> {
586 /// let data = "\
587 /// city,country,popcount
588 /// Boston,United States,4628910
589 /// Concord,United States,42695
590 /// ";
591 /// let rdr = AsyncReader::from_reader(io::Cursor::new(data));
592 /// let mut iter = rdr.into_records();
593 /// let mut pos = Position::new();
594 /// loop {
595 /// let next = iter.next().await;
596 /// if let Some(next) = next {
597 /// pos = next?.position().expect("Cursor should be at some valid position").clone();
598 /// } else {
599 /// break;
600 /// }
601 /// }
602 ///
603 /// // `pos` should now be the position immediately before the last
604 /// // record.
605 /// assert_eq!(pos.byte(), 51);
606 /// assert_eq!(pos.line(), 3);
607 /// assert_eq!(pos.record(), 2);
608 /// Ok(())
609 /// }
610 /// ```
611 #[inline]
612 pub fn position(&self) -> &Position {
613 self.0.position()
614 }
615
616 /// Returns true if and only if this reader has been exhausted.
617 ///
618 /// When this returns true, no more records can be read from this reader
619 /// (unless it has been seeked to another position).
620 ///
621 /// # Example
622 ///
623 /// ```
624 /// use std::error::Error;
625 /// use futures::io;
626 /// use futures::stream::StreamExt;
627 /// use csv_async::{AsyncReader, Position};
628 ///
629 /// # fn main() { async_std::task::block_on(async {example().await.unwrap()}); }
630 /// async fn example() -> Result<(), Box<dyn Error>> {
631 /// let data = "\
632 /// city,country,popcount
633 /// Boston,United States,4628910
634 /// Concord,United States,42695
635 /// ";
636 /// let mut rdr = AsyncReader::from_reader(io::Cursor::new(data));
637 /// assert!(!rdr.is_done());
638 /// {
639 /// let mut records = rdr.records();
640 /// while let Some(record) = records.next().await {
641 /// let _ = record?;
642 /// }
643 /// }
644 /// assert!(rdr.is_done());
645 /// Ok(())
646 /// }
647 /// ```
648 #[inline]
649 pub fn is_done(&self) -> bool {
650 self.0.is_done()
651 }
652
653 /// Returns true if and only if this reader has been configured to
654 /// interpret the first record as a header record.
655 #[inline]
656 pub fn has_headers(&self) -> bool {
657 self.0.has_headers()
658 }
659
660 /// Returns a reference to the underlying reader.
661 #[inline]
662 pub fn get_ref(&self) -> &R {
663 self.0.get_ref()
664 }
665
666 /// Returns a mutable reference to the underlying reader.
667 #[inline]
668 pub fn get_mut(&mut self) -> &mut R {
669 self.0.get_mut()
670 }
671
672 /// Unwraps this CSV reader, returning the underlying reader.
673 ///
674 /// Note that any leftover data inside this reader's internal buffer is
675 /// lost.
676 #[inline]
677 pub fn into_inner(self) -> R {
678 self.0.into_inner()
679 }
680}
681
682impl<R: io::AsyncRead + io::AsyncSeek + std::marker::Unpin> AsyncReader<R> {
683 /// Seeks the underlying reader to the position given.
684 ///
685 /// This comes with a few caveats:
686 ///
687 /// * Any internal buffer associated with this reader is cleared.
688 /// * If the given position does not correspond to a position immediately
689 /// before the start of a record, then the behavior of this reader is
690 /// unspecified.
691 /// * Any special logic that skips the first record in the CSV reader
692 /// when reading or iterating over records is disabled.
693 ///
694 /// If the given position has a byte offset equivalent to the current
695 /// position, then no seeking is performed.
696 ///
697 /// If the header row has not already been read, then this will attempt
698 /// to read the header row before seeking. Therefore, it is possible that
699 /// this returns an error associated with reading CSV data.
700 ///
701 /// Note that seeking is performed based only on the byte offset in the
702 /// given position. Namely, the record or line numbers in the position may
703 /// be incorrect, but this will cause any future position generated by
704 /// this CSV reader to be similarly incorrect.
705 ///
706 /// # Example: seek to parse a record twice
707 ///
708 /// ```
709 /// use std::error::Error;
710 /// use futures::io;
711 /// use futures::stream::StreamExt;
712 /// use csv_async::{AsyncReader, Position};
713 ///
714 /// # fn main() { async_std::task::block_on(async {example().await.unwrap()}); }
715 /// async fn example() -> Result<(), Box<dyn Error>> {
716 /// let data = "\
717 /// city,country,popcount
718 /// Boston,United States,4628910
719 /// Concord,United States,42695
720 /// ";
721 /// let mut rdr = AsyncReader::from_reader(io::Cursor::new(data));
722 /// let mut pos = Position::new();
723 /// {
724 /// let mut records = rdr.records();
725 /// loop {
726 /// let next = records.next().await;
727 /// if let Some(next) = next {
728 /// pos = next?.position().expect("Cursor should be at some valid position").clone();
729 /// } else {
730 /// break;
731 /// }
732 /// }
733 /// }
734 ///
735 /// {
736 /// // Now seek the reader back to `pos`. This will let us read the
737 /// // last record again.
738 /// rdr.seek(pos).await?;
739 /// let mut records = rdr.into_records();
740 /// if let Some(result) = records.next().await {
741 /// let record = result?;
742 /// assert_eq!(record, vec!["Concord", "United States", "42695"]);
743 /// Ok(())
744 /// } else {
745 /// Err(From::from("expected at least one record but got none"))
746 /// }
747 /// }
748 /// }
749 /// ```
750 #[inline]
751 pub async fn seek(&mut self, pos: Position) -> Result<()> {
752 self.0.seek(pos).await
753 }
754
755 /// This is like `seek`, but provides direct control over how the seeking
756 /// operation is performed via `io::SeekFrom`.
757 ///
758 /// The `pos` position given *should* correspond the position indicated
759 /// by `seek_from`, but there is no requirement. If the `pos` position
760 /// given is incorrect, then the position information returned by this
761 /// reader will be similarly incorrect.
762 ///
763 /// If the header row has not already been read, then this will attempt
764 /// to read the header row before seeking. Therefore, it is possible that
765 /// this returns an error associated with reading CSV data.
766 ///
767 /// Unlike `seek`, this will always cause an actual seek to be performed.
768 #[inline]
769 pub async fn seek_raw(
770 &mut self,
771 seek_from: io::SeekFrom,
772 pos: Position,
773 ) -> Result<()> {
774 self.0.seek_raw(seek_from, pos).await
775 }
776
777 /// Rewinds the underlying reader to first data record.
778 ///
779 /// Function is aware of header presence.
780 /// After `rewind` record iterators will return first data record (skipping header if present), while
781 /// after `seek(0)` they will return header row (even if `has_header` is set).
782 ///
783 /// # Example: Reads the same data multiply times
784 ///
785 /// ```
786 /// use std::error::Error;
787 /// use futures::io;
788 /// use futures::stream::StreamExt;
789 /// use csv_async::AsyncReader;
790 ///
791 /// # fn main() { async_std::task::block_on(async {example().await.unwrap()}); }
792 /// async fn example() -> Result<(), Box<dyn Error>> {
793 /// let data = "\
794 /// city,country,popcount
795 /// Boston,United States,4628910
796 /// Concord,United States,42695
797 /// ";
798 /// let mut rdr = AsyncReader::from_reader(io::Cursor::new(data));
799 /// let mut output = Vec::new();
800 /// loop {
801 /// let mut records = rdr.records();
802 /// while let Some(rec) = records.next().await {
803 /// output.push(rec?);
804 /// }
805 /// if output.len() >= 6 {
806 /// break;
807 /// } else {
808 /// drop(records);
809 /// rdr.rewind().await?;
810 /// }
811 /// }
812 /// assert_eq!(output,
813 /// vec![
814 /// vec!["Boston", "United States", "4628910"],
815 /// vec!["Concord", "United States", "42695"],
816 /// vec!["Boston", "United States", "4628910"],
817 /// vec!["Concord", "United States", "42695"],
818 /// vec!["Boston", "United States", "4628910"],
819 /// vec!["Concord", "United States", "42695"],
820 /// ]);
821 /// Ok(())
822 /// }
823 /// ```
824 #[inline]
825 pub async fn rewind(&mut self) -> Result<()> {
826 self.0.rewind().await
827 }
828}
829
830#[cfg(test)]
831mod tests {
832 use std::pin::Pin;
833 use std::task::{Context, Poll};
834
835 use futures::io;
836 use futures::stream::StreamExt;
837 use async_std::task;
838
839 use crate::byte_record::ByteRecord;
840 use crate::error::ErrorKind;
841 use crate::string_record::StringRecord;
842 use crate::Trim;
843
844 use super::{Position, AsyncReaderBuilder, AsyncReader};
845
846 fn b(s: &str) -> &[u8] {
847 s.as_bytes()
848 }
849 fn s(b: &[u8]) -> &str {
850 ::std::str::from_utf8(b).unwrap()
851 }
852
853 fn newpos(byte: u64, line: u64, record: u64) -> Position {
854 let mut p = Position::new();
855 p.set_byte(byte).set_line(line).set_record(record);
856 p
857 }
858
859 async fn count(stream: impl StreamExt) -> usize {
860 stream.fold(0, |acc, _| async move { acc + 1 }).await
861 }
862
863 #[async_std::test]
864 async fn read_byte_record() {
865 let data = b("foo,\"b,ar\",baz\nabc,mno,xyz");
866 let mut rdr =
867 AsyncReaderBuilder::new().has_headers(false).create_reader(data);
868 let mut rec = ByteRecord::new();
869
870 assert!(rdr.read_byte_record(&mut rec).await.unwrap());
871 assert_eq!(3, rec.len());
872 assert_eq!("foo", s(&rec[0]));
873 assert_eq!("b,ar", s(&rec[1]));
874 assert_eq!("baz", s(&rec[2]));
875
876 assert!(rdr.read_byte_record(&mut rec).await.unwrap());
877 assert_eq!(3, rec.len());
878 assert_eq!("abc", s(&rec[0]));
879 assert_eq!("mno", s(&rec[1]));
880 assert_eq!("xyz", s(&rec[2]));
881
882 assert!(!rdr.read_byte_record(&mut rec).await.unwrap());
883 }
884
885 #[async_std::test]
886 async fn read_trimmed_records_and_headers() {
887 let data = b("foo, bar,\tbaz\n 1, 2, 3\n1\t,\t,3\t\t");
888 let mut rdr = AsyncReaderBuilder::new()
889 .has_headers(true)
890 .trim(Trim::All)
891 .create_reader(data);
892 let mut rec = ByteRecord::new();
893 assert!(rdr.read_byte_record(&mut rec).await.unwrap());
894 assert_eq!("1", s(&rec[0]));
895 assert_eq!("2", s(&rec[1]));
896 assert_eq!("3", s(&rec[2]));
897 let mut rec = StringRecord::new();
898 assert!(rdr.read_record(&mut rec).await.unwrap());
899 assert_eq!("1", &rec[0]);
900 assert_eq!("", &rec[1]);
901 assert_eq!("3", &rec[2]);
902 {
903 let headers = rdr.headers().await.unwrap();
904 assert_eq!(3, headers.len());
905 assert_eq!("foo", &headers[0]);
906 assert_eq!("bar", &headers[1]);
907 assert_eq!("baz", &headers[2]);
908 }
909 }
910
911 #[async_std::test]
912 async fn read_trimmed_header() {
913 let data = b("foo, bar,\tbaz\n 1, 2, 3\n1\t,\t,3\t\t");
914 let mut rdr = AsyncReaderBuilder::new()
915 .has_headers(true)
916 .trim(Trim::Headers)
917 .create_reader(data);
918 let mut rec = ByteRecord::new();
919 assert!(rdr.read_byte_record(&mut rec).await.unwrap());
920 assert_eq!(" 1", s(&rec[0]));
921 assert_eq!(" 2", s(&rec[1]));
922 assert_eq!(" 3", s(&rec[2]));
923 {
924 let headers = rdr.headers().await.unwrap();
925 assert_eq!(3, headers.len());
926 assert_eq!("foo", &headers[0]);
927 assert_eq!("bar", &headers[1]);
928 assert_eq!("baz", &headers[2]);
929 }
930 }
931
932 #[async_std::test]
933 async fn read_trimed_header_invalid_utf8() {
934 let data = &b"foo, b\xFFar,\tbaz\na,b,c\nd,e,f"[..];
935 let mut rdr = AsyncReaderBuilder::new()
936 .has_headers(true)
937 .trim(Trim::Headers)
938 .create_reader(data);
939 let mut rec = StringRecord::new();
940
941 // force the headers to be read
942 let _ = rdr.read_record(&mut rec).await;
943 // Check the byte headers are trimmed
944 {
945 let headers = rdr.byte_headers().await.unwrap();
946 assert_eq!(3, headers.len());
947 assert_eq!(b"foo", &headers[0]);
948 assert_eq!(b"b\xFFar", &headers[1]);
949 assert_eq!(b"baz", &headers[2]);
950 }
951 match *rdr.headers().await.unwrap_err().kind() {
952 ErrorKind::Utf8 { pos: Some(ref pos), ref err } => {
953 assert_eq!(pos, &newpos(0, 1, 0));
954 assert_eq!(err.field(), 1);
955 assert_eq!(err.valid_up_to(), 3);
956 }
957 ref err => panic!("match failed, got {:?}", err),
958 }
959 }
960
961 #[async_std::test]
962 async fn read_trimmed_records() {
963 let data = b("foo, bar,\tbaz\n 1, 2, 3\n1\t,\t,3\t\t");
964 let mut rdr = AsyncReaderBuilder::new()
965 .has_headers(true)
966 .trim(Trim::Fields)
967 .create_reader(data);
968 let mut rec = ByteRecord::new();
969 assert!(rdr.read_byte_record(&mut rec).await.unwrap());
970 assert_eq!("1", s(&rec[0]));
971 assert_eq!("2", s(&rec[1]));
972 assert_eq!("3", s(&rec[2]));
973 {
974 let headers = rdr.headers().await.unwrap();
975 assert_eq!(3, headers.len());
976 assert_eq!("foo", &headers[0]);
977 assert_eq!(" bar", &headers[1]);
978 assert_eq!("\tbaz", &headers[2]);
979 }
980 }
981
982 #[async_std::test]
983 async fn read_record_unequal_fails() {
984 let data = b("foo\nbar,baz");
985 let mut rdr =
986 AsyncReaderBuilder::new().has_headers(false).create_reader(data);
987 let mut rec = ByteRecord::new();
988
989 assert!(rdr.read_byte_record(&mut rec).await.unwrap());
990 assert_eq!(1, rec.len());
991 assert_eq!("foo", s(&rec[0]));
992
993 match rdr.read_byte_record(&mut rec).await {
994 Err(err) => match *err.kind() {
995 ErrorKind::UnequalLengths {
996 expected_len: 1,
997 ref pos,
998 len: 2,
999 } => {
1000 assert_eq!(pos, &Some(newpos(4, 2, 1)));
1001 }
1002 ref wrong => panic!("match failed, got {:?}", wrong),
1003 },
1004 wrong => panic!("match failed, got {:?}", wrong),
1005 }
1006 }
1007
1008 #[async_std::test]
1009 async fn read_record_unequal_ok() {
1010 let data = b("foo\nbar,baz");
1011 let mut rdr = AsyncReaderBuilder::new()
1012 .has_headers(false)
1013 .flexible(true)
1014 .create_reader(data);
1015 let mut rec = ByteRecord::new();
1016
1017 assert!(rdr.read_byte_record(&mut rec).await.unwrap());
1018 assert_eq!(1, rec.len());
1019 assert_eq!("foo", s(&rec[0]));
1020
1021 assert!(rdr.read_byte_record(&mut rec).await.unwrap());
1022 assert_eq!(2, rec.len());
1023 assert_eq!("bar", s(&rec[0]));
1024 assert_eq!("baz", s(&rec[1]));
1025
1026 assert!(!rdr.read_byte_record(&mut rec).await.unwrap());
1027 }
1028
1029 // This tests that even if we get a CSV error, we can continue reading
1030 // if we want.
1031 #[async_std::test]
1032 async fn read_record_unequal_continue() {
1033 let data = b("foo\nbar,baz\nquux");
1034 let mut rdr =
1035 AsyncReaderBuilder::new().has_headers(false).create_reader(data);
1036 let mut rec = ByteRecord::new();
1037
1038 assert!(rdr.read_byte_record(&mut rec).await.unwrap());
1039 assert_eq!(1, rec.len());
1040 assert_eq!("foo", s(&rec[0]));
1041
1042 match rdr.read_byte_record(&mut rec).await {
1043 Err(err) => match err.kind() {
1044 &ErrorKind::UnequalLengths {
1045 expected_len: 1,
1046 ref pos,
1047 len: 2,
1048 } => {
1049 assert_eq!(pos, &Some(newpos(4, 2, 1)));
1050 }
1051 wrong => panic!("match failed, got {:?}", wrong),
1052 },
1053 wrong => panic!("match failed, got {:?}", wrong),
1054 }
1055
1056 assert!(rdr.read_byte_record(&mut rec).await.unwrap());
1057 assert_eq!(1, rec.len());
1058 assert_eq!("quux", s(&rec[0]));
1059
1060 assert!(!rdr.read_byte_record(&mut rec).await.unwrap());
1061 }
1062
1063 #[async_std::test]
1064 async fn read_record_headers() {
1065 let data = b("foo,bar,baz\na,b,c\nd,e,f");
1066 let mut rdr = AsyncReaderBuilder::new().has_headers(true).create_reader(data);
1067 let mut rec = StringRecord::new();
1068
1069 assert!(rdr.read_record(&mut rec).await.unwrap());
1070 assert_eq!(3, rec.len());
1071 assert_eq!("a", &rec[0]);
1072
1073 assert!(rdr.read_record(&mut rec).await.unwrap());
1074 assert_eq!(3, rec.len());
1075 assert_eq!("d", &rec[0]);
1076
1077 assert!(!rdr.read_record(&mut rec).await.unwrap());
1078
1079 {
1080 let headers = rdr.byte_headers().await.unwrap();
1081 assert_eq!(3, headers.len());
1082 assert_eq!(b"foo", &headers[0]);
1083 assert_eq!(b"bar", &headers[1]);
1084 assert_eq!(b"baz", &headers[2]);
1085 }
1086 {
1087 let headers = rdr.headers().await.unwrap();
1088 assert_eq!(3, headers.len());
1089 assert_eq!("foo", &headers[0]);
1090 assert_eq!("bar", &headers[1]);
1091 assert_eq!("baz", &headers[2]);
1092 }
1093 }
1094
1095 #[async_std::test]
1096 async fn read_record_headers_invalid_utf8() {
1097 let data = &b"foo,b\xFFar,baz\na,b,c\nd,e,f"[..];
1098 let mut rdr = AsyncReaderBuilder::new().has_headers(true).create_reader(data);
1099 let mut rec = StringRecord::new();
1100
1101 assert!(rdr.read_record(&mut rec).await.unwrap());
1102 assert_eq!(3, rec.len());
1103 assert_eq!("a", &rec[0]);
1104
1105 assert!(rdr.read_record(&mut rec).await.unwrap());
1106 assert_eq!(3, rec.len());
1107 assert_eq!("d", &rec[0]);
1108
1109 assert!(!rdr.read_record(&mut rec).await.unwrap());
1110
1111 // Check that we can read the headers as raw bytes, but that
1112 // if we read them as strings, we get an appropriate UTF-8 error.
1113 {
1114 let headers = rdr.byte_headers().await.unwrap();
1115 assert_eq!(3, headers.len());
1116 assert_eq!(b"foo", &headers[0]);
1117 assert_eq!(b"b\xFFar", &headers[1]);
1118 assert_eq!(b"baz", &headers[2]);
1119 }
1120 match *rdr.headers().await.unwrap_err().kind() {
1121 ErrorKind::Utf8 { pos: Some(ref pos), ref err } => {
1122 assert_eq!(pos, &newpos(0, 1, 0));
1123 assert_eq!(err.field(), 1);
1124 assert_eq!(err.valid_up_to(), 1);
1125 }
1126 ref err => panic!("match failed, got {:?}", err),
1127 }
1128 }
1129
1130 #[async_std::test]
1131 async fn read_record_no_headers_before() {
1132 let data = b("foo,bar,baz\na,b,c\nd,e,f");
1133 let mut rdr =
1134 AsyncReaderBuilder::new().has_headers(false).create_reader(data);
1135 let mut rec = StringRecord::new();
1136
1137 {
1138 let headers = rdr.headers().await.unwrap();
1139 assert_eq!(3, headers.len());
1140 assert_eq!("foo", &headers[0]);
1141 assert_eq!("bar", &headers[1]);
1142 assert_eq!("baz", &headers[2]);
1143 }
1144
1145 assert!(rdr.read_record(&mut rec).await.unwrap());
1146 assert_eq!(3, rec.len());
1147 assert_eq!("foo", &rec[0]);
1148
1149 assert!(rdr.read_record(&mut rec).await.unwrap());
1150 assert_eq!(3, rec.len());
1151 assert_eq!("a", &rec[0]);
1152
1153 assert!(rdr.read_record(&mut rec).await.unwrap());
1154 assert_eq!(3, rec.len());
1155 assert_eq!("d", &rec[0]);
1156
1157 assert!(!rdr.read_record(&mut rec).await.unwrap());
1158 }
1159
1160 #[async_std::test]
1161 async fn read_record_no_headers_after() {
1162 let data = b("foo,bar,baz\na,b,c\nd,e,f");
1163 let mut rdr =
1164 AsyncReaderBuilder::new().has_headers(false).create_reader(data);
1165 let mut rec = StringRecord::new();
1166
1167 assert!(rdr.read_record(&mut rec).await.unwrap());
1168 assert_eq!(3, rec.len());
1169 assert_eq!("foo", &rec[0]);
1170
1171 assert!(rdr.read_record(&mut rec).await.unwrap());
1172 assert_eq!(3, rec.len());
1173 assert_eq!("a", &rec[0]);
1174
1175 assert!(rdr.read_record(&mut rec).await.unwrap());
1176 assert_eq!(3, rec.len());
1177 assert_eq!("d", &rec[0]);
1178
1179 assert!(!rdr.read_record(&mut rec).await.unwrap());
1180
1181 let headers = rdr.headers().await.unwrap();
1182 assert_eq!(3, headers.len());
1183 assert_eq!("foo", &headers[0]);
1184 assert_eq!("bar", &headers[1]);
1185 assert_eq!("baz", &headers[2]);
1186 }
1187
1188 #[async_std::test]
1189 async fn seek() {
1190 let data = b("foo,bar,baz\na,b,c\nd,e,f\ng,h,i");
1191 let mut rdr = AsyncReaderBuilder::new().create_reader(io::Cursor::new(data));
1192 rdr.seek(newpos(18, 3, 2)).await.unwrap();
1193
1194 let mut rec = StringRecord::new();
1195
1196 assert_eq!(18, rdr.position().byte());
1197 assert!(rdr.read_record(&mut rec).await.unwrap());
1198 assert_eq!(3, rec.len());
1199 assert_eq!("d", &rec[0]);
1200
1201 assert_eq!(24, rdr.position().byte());
1202 assert_eq!(4, rdr.position().line());
1203 assert_eq!(3, rdr.position().record());
1204 assert!(rdr.read_record(&mut rec).await.unwrap());
1205 assert_eq!(3, rec.len());
1206 assert_eq!("g", &rec[0]);
1207
1208 assert!(!rdr.read_record(&mut rec).await.unwrap());
1209 }
1210
1211 // Test that we can read headers after seeking even if the headers weren't
1212 // explicit read before seeking.
1213 #[async_std::test]
1214 async fn seek_headers_after() {
1215 let data = b("foo,bar,baz\na,b,c\nd,e,f\ng,h,i");
1216 let mut rdr = AsyncReaderBuilder::new().create_reader(io::Cursor::new(data));
1217 rdr.seek(newpos(18, 3, 2)).await.unwrap();
1218 assert_eq!(rdr.headers().await.unwrap(), vec!["foo", "bar", "baz"]);
1219 }
1220
1221 // Test that we can read headers after seeking if the headers were read
1222 // before seeking.
1223 #[async_std::test]
1224 async fn seek_headers_before_after() {
1225 let data = b("foo,bar,baz\na,b,c\nd,e,f\ng,h,i");
1226 let mut rdr = AsyncReaderBuilder::new().create_reader(io::Cursor::new(data));
1227 let headers = rdr.headers().await.unwrap().clone();
1228 rdr.seek(newpos(18, 3, 2)).await.unwrap();
1229 assert_eq!(&headers, rdr.headers().await.unwrap());
1230 }
1231
1232 // Test that even if we didn't read headers before seeking, if we seek to
1233 // the current byte offset, then no seeking is done and therefore we can
1234 // still read headers after seeking.
1235 #[async_std::test]
1236 async fn seek_headers_no_actual_seek() {
1237 let data = b("foo,bar,baz\na,b,c\nd,e,f\ng,h,i");
1238 let mut rdr = AsyncReaderBuilder::new().create_reader(io::Cursor::new(data));
1239 rdr.seek(Position::new()).await.unwrap();
1240 assert_eq!("foo", &rdr.headers().await.unwrap()[0]);
1241 }
1242
1243 #[async_std::test]
1244 async fn rewind() {
1245 let data = b("foo,bar,baz\na,b,c\nd,e,f\ng,h,i");
1246 let mut rdr = AsyncReaderBuilder::new().create_reader(io::Cursor::new(data));
1247 // rdr.seek(newpos(18, 3, 2)).await.unwrap();
1248
1249 let mut rec = StringRecord::new();
1250 assert!(rdr.read_record(&mut rec).await.unwrap());
1251 assert_eq!(3, rec.len());
1252 assert_eq!("a", &rec[0]);
1253
1254 // assert_eq!(18, rdr.position().byte());
1255 assert!(rdr.read_record(&mut rec).await.unwrap());
1256 assert_eq!(3, rec.len());
1257 assert_eq!("d", &rec[0]);
1258
1259 rdr.rewind().await.unwrap();
1260
1261 assert!(rdr.read_record(&mut rec).await.unwrap());
1262 assert_eq!(3, rec.len());
1263 assert_eq!("a", &rec[0]);
1264 }
1265
1266 // Test that position info is reported correctly in absence of headers.
1267 #[async_std::test]
1268 async fn positions_no_headers() {
1269 let mut rdr = AsyncReaderBuilder::new()
1270 .has_headers(false)
1271 .create_reader("a,b,c\nx,y,z".as_bytes())
1272 .into_records();
1273
1274 let pos = rdr.next().await.unwrap().unwrap().position().unwrap().clone();
1275 assert_eq!(pos.byte(), 0);
1276 assert_eq!(pos.line(), 1);
1277 assert_eq!(pos.record(), 0);
1278
1279 let pos = rdr.next().await.unwrap().unwrap().position().unwrap().clone();
1280 assert_eq!(pos.byte(), 6);
1281 assert_eq!(pos.line(), 2);
1282 assert_eq!(pos.record(), 1);
1283
1284 // Test that we are at end of stream, and properly signal this.
1285 assert!(rdr.next().await.is_none());
1286 // Testing that we are not panic, trying to pass over end of stream (Issue#22)
1287 assert!(rdr.next().await.is_none());
1288 }
1289
1290 // Test that position info is reported correctly with headers.
1291 #[async_std::test]
1292 async fn positions_headers() {
1293 let mut rdr = AsyncReaderBuilder::new()
1294 .has_headers(true)
1295 .create_reader("a,b,c\nx,y,z".as_bytes())
1296 .into_records();
1297
1298 let pos = rdr.next().await.unwrap().unwrap().position().unwrap().clone();
1299 assert_eq!(pos.byte(), 6);
1300 assert_eq!(pos.line(), 2);
1301 assert_eq!(pos.record(), 1);
1302 }
1303
1304 // Test that reading headers on empty data yields an empty record.
1305 #[async_std::test]
1306 async fn headers_on_empty_data() {
1307 let mut rdr = AsyncReaderBuilder::new().create_reader("".as_bytes());
1308 let r = rdr.byte_headers().await.unwrap();
1309 assert_eq!(r.len(), 0);
1310 }
1311
1312 // Test that reading the first record on empty data works.
1313 #[async_std::test]
1314 async fn no_headers_on_empty_data() {
1315 let mut rdr =
1316 AsyncReaderBuilder::new().has_headers(false).create_reader("".as_bytes());
1317 assert_eq!(count(rdr.records()).await, 0);
1318 }
1319
1320 // Test that reading the first record on empty data works, even if
1321 // we've tried to read headers before hand.
1322 #[async_std::test]
1323 async fn no_headers_on_empty_data_after_headers() {
1324 let mut rdr =
1325 AsyncReaderBuilder::new().has_headers(false).create_reader("".as_bytes());
1326 assert_eq!(rdr.headers().await.unwrap().len(), 0);
1327 assert_eq!(count(rdr.records()).await, 0);
1328 }
1329
1330 #[test]
1331 fn behavior_on_io_errors() {
1332 struct FailingRead;
1333 impl io::AsyncRead for FailingRead {
1334 fn poll_read(
1335 self: Pin<&mut Self>,
1336 _cx: &mut Context,
1337 _buf: &mut [u8]
1338 ) -> Poll<Result<usize, io::Error>> {
1339 Poll::Ready(Err(io::Error::new(io::ErrorKind::Other, "Broken reader")))
1340 }
1341 }
1342 impl std::marker::Unpin for FailingRead {}
1343
1344 task::block_on(async {
1345 let mut records = AsyncReader::from_reader(FailingRead).into_records();
1346 let first_record = records.next().await;
1347 assert!(
1348 matches!(&first_record, Some(Err(e)) if matches!(e.kind(), crate::ErrorKind::Io(_)))
1349 );
1350 assert!(records.next().await.is_none());
1351 });
1352
1353 task::block_on(async {
1354 let mut records = AsyncReaderBuilder::new()
1355 .end_on_io_error(false)
1356 .create_reader(FailingRead)
1357 .into_records();
1358 let first_record = records.next().await;
1359 assert!(
1360 matches!(&first_record, Some(Err(e)) if matches!(e.kind(), crate::ErrorKind::Io(_)))
1361 );
1362 let second_record = records.next().await;
1363 assert!(
1364 matches!(&second_record, Some(Err(e)) if matches!(e.kind(), crate::ErrorKind::Io(_)))
1365 );
1366 });
1367 }
1368}