csv_async/async_readers/
ades_futures.rs

1use futures::io;
2use serde::de::DeserializeOwned;
3
4use crate::AsyncReaderBuilder;
5use crate::byte_record::{ByteRecord, Position};
6use crate::error::Result;
7use crate::string_record::StringRecord;
8use super::{
9    AsyncReaderImpl,
10    DeserializeRecordsStream, DeserializeRecordsIntoStream,
11    DeserializeRecordsStreamPos, DeserializeRecordsIntoStreamPos,
12};
13
14
15impl AsyncReaderBuilder {
16    /// Build a CSV `serde` deserializer from this configuration that reads data from `rdr`.
17    ///
18    /// Note that the CSV reader is buffered automatically, so you should not
19    /// wrap `rdr` in a buffered reader like `io::BufReader`.
20    ///
21    /// # Example
22    ///
23    /// ```
24    /// use std::error::Error;
25    /// use futures::stream::StreamExt;
26    /// use serde::Deserialize;
27    /// use csv_async::AsyncReaderBuilder;
28    /// 
29    /// #[derive(Debug, Deserialize, Eq, PartialEq)]
30    /// struct Row {
31    ///     city: String,
32    ///     country: String,
33    ///     pop: u64,
34    /// }
35    ///
36    /// # fn main() { async_std::task::block_on(async {example().await.unwrap()}); }
37    /// async fn example() -> Result<(), Box<dyn Error>> {
38    ///     let data = "\
39    /// city,country,pop
40    /// Boston,United States,4628910
41    /// Concord,United States,42695
42    /// ";
43    ///     let mut rdr = AsyncReaderBuilder::new().create_deserializer(data.as_bytes());
44    ///     let mut records = rdr.into_deserialize::<Row>();
45    ///     while let Some(record) = records.next().await {
46    ///         println!("{:?}", record?);
47    ///     }
48    ///     Ok(())
49    /// }
50    /// ```
51    pub fn create_deserializer<R: io::AsyncRead + Unpin + Send>(&self, rdr: R) -> AsyncDeserializer<R> {
52        AsyncDeserializer::new(self, rdr)
53    }
54}
55
56/// Configured CSV `serde` deserializer.
57///
58/// A CSV deserializer takes as input CSV data and transforms that into standard Rust
59/// values. The reader reads CSV data is as a sequence of records,
60/// where a record is either a sequence of string fields or structure with derived
61/// `serde::Deserialize` interface.
62///
63/// # Configuration
64///
65/// A CSV deserializer has convenient constructor method `from_reader`.
66/// However, if you want to configure the CSV deserializer to use
67/// a different delimiter or quote character (among many other things), then
68/// you should use a [`AsyncReaderBuilder`](struct.AsyncReaderBuilder.html) to construct
69/// a `AsyncDeserializer`. For example, to change the field delimiter:
70///
71/// ```
72/// use std::error::Error;
73/// use futures::stream::StreamExt;
74/// use serde::Deserialize;
75/// use csv_async::AsyncReaderBuilder;
76/// 
77/// #[derive(Debug, Deserialize, Eq, PartialEq)]
78/// struct Row {
79///     city: String,
80///     country: String,
81///     pop: u64,
82/// }
83///
84/// # fn main() { async_std::task::block_on(async {example().await.unwrap()}); }
85/// async fn example() -> Result<(), Box<dyn Error>> {
86///     let data = indoc::indoc! {"
87///         city;country;pop
88///         Boston;United States;4628910
89///     "};
90///     let mut rdr = AsyncReaderBuilder::new()
91///         .delimiter(b';')
92///         .create_deserializer(data.as_bytes());
93///
94///     let mut records = rdr.deserialize::<Row>();
95///     assert_eq!(records.next().await.unwrap()?,
96///                Row {city: "Boston".to_string(),
97///                     country: "United States".to_string(),
98///                     pop: 4628910 });
99///     Ok(())
100/// }
101/// ```
102///
103/// # Error handling
104///
105/// In general, CSV *parsing* does not ever return an error. That is, there is
106/// no such thing as malformed CSV data. Instead, this reader will prioritize
107/// finding a parse over rejecting CSV data that it does not understand. This
108/// choice was inspired by other popular CSV parsers, but also because it is
109/// pragmatic. CSV data varies wildly, so even if the CSV data is malformed,
110/// it might still be possible to work with the data. In the land of CSV, there
111/// is no "right" or "wrong," only "right" and "less right."
112///
113/// With that said, a number of errors can occur while reading CSV data:
114///
115/// * By default, all records in CSV data must have the same number of fields.
116///   If a record is found with a different number of fields than a prior
117///   record, then an error is returned. This behavior can be disabled by
118///   enabling flexible parsing via the `flexible` method on
119///   [`AsyncReaderBuilder`](struct.AsyncReaderBuilder.html).
120/// * When reading CSV data from a resource (like a file), it is possible for
121///   reading from the underlying resource to fail. This will return an error.
122///   For subsequent calls to the reader after encountering a such error
123///   (unless `seek` is used), it will behave as if end of file had been
124///   reached, in order to avoid running into infinite loops when still
125///   attempting to read the next record when one has errored.
126/// * When reading CSV data into `String` or `&str` fields (e.g., via a
127///   [`StringRecord`](struct.StringRecord.html)), UTF-8 is strictly
128///   enforced. If CSV data is invalid UTF-8, then an error is returned. If
129///   you want to read invalid UTF-8, then you should use the byte oriented
130///   APIs such as [`ByteRecord`](struct.ByteRecord.html). If you need explicit
131///   support for another encoding entirely, then you'll need to use another
132///   crate to transcode your CSV data to UTF-8 before parsing it.
133/// * When using Serde to deserialize CSV data into Rust types, it is possible
134///   for a number of additional errors to occur. For example, deserializing
135///   a field `xyz` into an `i32` field will result in an error.
136///
137/// For more details on the precise semantics of errors, see the
138/// [`Error`](enum.Error.html) type.
139#[derive(Debug)]
140pub struct AsyncDeserializer<R>(AsyncReaderImpl<R>);
141
142impl<'r, R> AsyncDeserializer<R>
143where
144    R: io::AsyncRead + Unpin + Send + 'r,
145{
146    /// Create a new CSV reader given a builder and a source of underlying
147    /// bytes.
148    fn new(builder: &AsyncReaderBuilder, rdr: R) -> AsyncDeserializer<R> {
149        AsyncDeserializer(AsyncReaderImpl::new(builder, rdr))
150    }
151
152    /// Create a new CSV parser with a default configuration for the given
153    /// reader.
154    ///
155    /// To customize CSV parsing, use a `ReaderBuilder`.
156    ///
157    /// # Example
158    ///
159    /// ```
160    /// use std::error::Error;
161    /// use futures::stream::StreamExt;
162    /// use serde::Deserialize;
163    /// use csv_async::AsyncDeserializer;
164    /// 
165    /// #[derive(Debug, Deserialize, Eq, PartialEq)]
166    /// struct Row {
167    ///     city: String,
168    ///     country: String,
169    ///     pop: u64,
170    /// }
171    ///
172    /// # fn main() { async_std::task::block_on(async {example().await.unwrap()}); }
173    /// async fn example() -> Result<(), Box<dyn Error>> {
174    ///     let data = "\
175    /// city,country,pop
176    /// Boston,United States,4628910
177    /// Concord,United States,42695
178    /// ";
179    ///     let mut rdr = AsyncDeserializer::from_reader(data.as_bytes());
180    ///     let mut records = rdr.into_deserialize::<Row>();
181    ///     while let Some(record) = records.next().await {
182    ///         println!("{:?}", record?);
183    ///     }
184    ///     Ok(())
185    /// }
186    /// ```
187    #[inline]
188    pub fn from_reader(rdr: R) -> AsyncDeserializer<R> {
189        AsyncReaderBuilder::new().create_deserializer(rdr)
190    }
191
192    /// Returns a borrowed stream over deserialized records.
193    ///
194    /// Each item yielded by this stream is a `Result<D, Error>`.
195    /// Therefore, in order to access the record, callers must handle the
196    /// possibility of error (typically with `?`).
197    ///
198    /// If `has_headers` was enabled via a `ReaderBuilder` (which is the
199    /// default), then this does not include the first record. Additionally,
200    /// if `has_headers` is enabled, then deserializing into a struct will
201    /// automatically align the values in each row to the fields of a struct
202    /// based on the header row.
203    /// 
204    /// Frequently turbo-fish notation is needed while calling this function:
205    /// `rdr.deserialize::<RecordTyme>();`
206    ///
207    /// # Example
208    ///
209    /// This shows how to deserialize CSV data into normal Rust structures. The
210    /// fields of the header row are used to match up the values in each row
211    /// to the fields of the struct.
212    ///
213    /// ```
214    /// use std::error::Error;
215    ///
216    /// use futures::stream::StreamExt;
217    /// use serde::Deserialize;
218    /// use csv_async::AsyncDeserializer;
219    ///
220    /// #[derive(Debug, Deserialize, Eq, PartialEq)]
221    /// struct Row {
222    ///     city: String,
223    ///     country: String,
224    ///     #[serde(rename = "popcount")]
225    ///     population: u64,
226    /// }
227    ///
228    /// # fn main() { async_std::task::block_on(async {example().await.unwrap()}); }
229    /// async fn example() -> Result<(), Box<dyn Error>> {
230    ///     let data = "\
231    /// city,country,popcount
232    /// Boston,United States,4628910
233    /// ";
234    ///     let mut rdr = AsyncDeserializer::from_reader(data.as_bytes());
235    ///     let mut iter = rdr.deserialize();
236    ///
237    ///     if let Some(result) = iter.next().await {
238    ///         let record: Row = result?;
239    ///         assert_eq!(record, Row {
240    ///             city: "Boston".to_string(),
241    ///             country: "United States".to_string(),
242    ///             population: 4628910,
243    ///         });
244    ///         Ok(())
245    ///     } else {
246    ///         Err(From::from("expected at least one record but got none"))
247    ///     }
248    /// }
249    /// ```
250    ///
251    /// # Rules
252    ///
253    /// For the most part, any Rust type that maps straight-forwardly to a CSV
254    /// record is supported. This includes maps, structs, tuples and tuple
255    /// structs. Other Rust types, such as `Vec`s, arrays, and enums have
256    /// a more complicated story. In general, when working with CSV data, one
257    /// should avoid *nested sequences* as much as possible.
258    ///
259    /// Maps, structs, tuples and tuple structs map to CSV records in a simple
260    /// way. Tuples and tuple structs decode their fields in the order that
261    /// they are defined. Structs will do the same only if `has_headers` has
262    /// been disabled using [`ReaderBuilder`](struct.ReaderBuilder.html),
263    /// otherwise, structs and maps are deserialized based on the fields
264    /// defined in the header row. (If there is no header row, then
265    /// deserializing into a map will result in an error.)
266    ///
267    /// Nested sequences are supported in a limited capacity. Namely, they
268    /// are flattened. As a result, it's often useful to use a `Vec` to capture
269    /// a "tail" of fields in a record:
270    ///
271    /// ```
272    /// use std::error::Error;
273    ///
274    /// use futures::stream::StreamExt;
275    /// use serde::Deserialize;
276    /// use csv_async::AsyncReaderBuilder;
277    ///
278    /// #[derive(Debug, Deserialize, Eq, PartialEq)]
279    /// struct Row {
280    ///     label: String,
281    ///     values: Vec<i32>,
282    /// }
283    ///
284    /// # fn main() { async_std::task::block_on(async {example().await.unwrap()}); }
285    /// async fn example() -> Result<(), Box<dyn Error>> {
286    ///     let data = "foo,1,2,3";
287    ///     let mut rdr = AsyncReaderBuilder::new()
288    ///         .has_headers(false)
289    ///         .create_deserializer(data.as_bytes());
290    ///     let mut iter = rdr.deserialize();
291    ///
292    ///     if let Some(result) = iter.next().await {
293    ///         let record: Row = result?;
294    ///         assert_eq!(record, Row {
295    ///             label: "foo".to_string(),
296    ///             values: vec![1, 2, 3],
297    ///         });
298    ///         Ok(())
299    ///     } else {
300    ///         Err(From::from("expected at least one record but got none"))
301    ///     }
302    /// }
303    /// ```
304    ///
305    /// In the above example, adding another field to the `Row` struct after
306    /// the `values` field will result in a deserialization error. This is
307    /// because the deserializer doesn't know when to stop reading fields
308    /// into the `values` vector, so it will consume the rest of the fields in
309    /// the record leaving none left over for the additional field.
310    ///
311    /// Finally, simple enums in Rust can be deserialized as well. Namely,
312    /// enums must either be variants with no arguments or variants with a
313    /// single argument. Variants with no arguments are deserialized based on
314    /// which variant name the field matches. Variants with a single argument
315    /// are deserialized based on which variant can store the data. The latter
316    /// is only supported when using "untagged" enum deserialization. The
317    /// following example shows both forms in action:
318    ///
319    /// ```
320    /// use std::error::Error;
321    ///
322    /// use futures::stream::StreamExt;
323    /// use serde::Deserialize;
324    /// use csv_async::AsyncDeserializer;
325    ///
326    /// #[derive(Debug, Deserialize, PartialEq)]
327    /// struct Row {
328    ///     label: Label,
329    ///     value: Number,
330    /// }
331    ///
332    /// #[derive(Debug, Deserialize, PartialEq)]
333    /// #[serde(rename_all = "lowercase")]
334    /// enum Label {
335    ///     Celsius,
336    ///     Fahrenheit,
337    /// }
338    ///
339    /// #[derive(Debug, Deserialize, PartialEq)]
340    /// #[serde(untagged)]
341    /// enum Number {
342    ///     Integer(i64),
343    ///     Float(f64),
344    /// }
345    ///
346    /// # fn main() { async_std::task::block_on(async {example().await.unwrap()}); }
347    /// async fn example() -> Result<(), Box<dyn Error>> {
348    ///     let data = "\
349    /// label,value
350    /// celsius,22.2222
351    /// fahrenheit,72
352    /// ";
353    ///     let mut rdr = AsyncDeserializer::from_reader(data.as_bytes());
354    ///     let mut iter = rdr.deserialize();
355    ///
356    ///     // Read the first record.
357    ///     if let Some(result) = iter.next().await {
358    ///         let record: Row = result?;
359    ///         assert_eq!(record, Row {
360    ///             label: Label::Celsius,
361    ///             value: Number::Float(22.2222),
362    ///         });
363    ///     } else {
364    ///         return Err(From::from(
365    ///             "expected at least two records but got none"));
366    ///     }
367    ///
368    ///     // Read the second record.
369    ///     if let Some(result) = iter.next().await {
370    ///         let record: Row = result?;
371    ///         assert_eq!(record, Row {
372    ///             label: Label::Fahrenheit,
373    ///             value: Number::Integer(72),
374    ///         });
375    ///         Ok(())
376    ///     } else {
377    ///         Err(From::from(
378    ///             "expected at least two records but got only one"))
379    ///     }
380    /// }
381    /// ```
382    #[inline]
383    pub fn deserialize<D:'r>(&'r mut self) -> DeserializeRecordsStream<'r, R, D>
384    where
385        D: DeserializeOwned,
386    {
387        DeserializeRecordsStream::new(& mut self.0)
388    }
389
390    /// Returns a borrowed stream over pairs of deserialized record and position
391    /// in reader stream before record read.
392    ///
393    /// Each item yielded by this stream is a `(Result<D, Error>, Position)`.
394    /// Therefore, in order to access the record, callers must handle the
395    /// possibility of error (typically with `?`).
396    /// 
397    /// Frequently turbo-fish notation is needed while calling this function:
398    /// `rdr.deserialize_with_pos::<RecordTyme>();`
399    ///
400    /// # Example
401    ///
402    /// This shows how to deserialize CSV data into normal Rust structures. The
403    /// fields of the header row are used to match up the values in each row
404    /// to the fields of the struct.
405    ///
406    /// ```
407    /// use std::error::Error;
408    ///
409    /// use futures::stream::StreamExt;
410    /// use serde::Deserialize;
411    /// use csv_async::AsyncDeserializer;
412    ///
413    /// #[derive(Debug, Deserialize, Eq, PartialEq)]
414    /// struct Row {
415    ///     city: String,
416    ///     country: String,
417    ///     population: u64,
418    /// }
419    ///
420    /// # fn main() { async_std::task::block_on(async {example().await.unwrap()}); }
421    /// async fn example() -> Result<(), Box<dyn Error>> {
422    ///     let data = "\
423    /// city,country,population
424    /// Boston,United States,4628910
425    /// Concord,United States,42695
426    /// ";
427    ///     let mut rdr = AsyncDeserializer::from_reader(data.as_bytes());
428    ///     let mut iter = rdr.deserialize_with_pos();
429    ///
430    ///     if let Some((result, pos)) = iter.next().await {
431    ///         let record: Row = result?;
432    ///         assert_eq!(record, Row {
433    ///             city: "Boston".to_string(),
434    ///             country: "United States".to_string(),
435    ///             population: 4628910,
436    ///         });
437    ///         assert_eq!(pos.byte(), 24);
438    ///         assert_eq!(pos.line(), 2);
439    ///         assert_eq!(pos.record(), 1);
440    ///     } else {
441    ///         return Err(From::from("expected at least one record but got none"));
442    ///     }
443    ///     if let Some((result, pos)) = iter.next().await {
444    ///         let record: Row = result?;
445    ///         assert_eq!(record, Row {
446    ///             city: "Concord".to_string(),
447    ///             country: "United States".to_string(),
448    ///             population: 42695,
449    ///         });
450    ///         assert_eq!(pos.byte(), 53);
451    ///         assert_eq!(pos.line(), 3);
452    ///         assert_eq!(pos.record(), 2);
453    ///     } else {
454    ///         return Err(From::from("expected at least two records but got one only"));
455    ///     }
456    ///     assert!(iter.next().await.is_none());
457    ///     Ok(())
458    /// }
459    /// ```
460    #[inline]
461    pub fn deserialize_with_pos<D:'r>(&'r mut self) -> DeserializeRecordsStreamPos<'r, R, D>
462    where
463        D: DeserializeOwned,
464    {
465        DeserializeRecordsStreamPos::new(& mut self.0)
466    }
467
468    /// Returns a owned stream over deserialized records.
469    ///
470    /// Each item yielded by this stream is a `Result<D, Error>`.
471    /// Therefore, in order to access the record, callers must handle the
472    /// possibility of error (typically with `?`).
473    ///
474    /// If `has_headers` was enabled via a `ReaderBuilder` (which is the
475    /// default), then this does not include the first record. Additionally,
476    /// if `has_headers` is enabled, then deserializing into a struct will
477    /// automatically align the values in each row to the fields of a struct
478    /// based on the header row.
479    /// 
480    /// Frequently turbo-fish notation is needed while calling this function:
481    /// `rdr.into_deserialize::<RecordTyme>();`
482    ///
483    /// # Example
484    ///
485    /// This shows how to deserialize CSV data into normal Rust structs. The
486    /// fields of the header row are used to match up the values in each row
487    /// to the fields of the struct.
488    ///
489    /// ```
490    /// use std::error::Error;
491    ///
492    /// use futures::stream::StreamExt;
493    /// use serde::Deserialize;
494    /// use csv_async::AsyncDeserializer;
495    ///
496    /// #[derive(Debug, Deserialize, Eq, PartialEq)]
497    /// struct Row {
498    ///     city: String,
499    ///     country: String,
500    ///     #[serde(rename = "popcount")]
501    ///     population: u64,
502    /// }
503    ///
504    /// # fn main() { async_std::task::block_on(async {example().await.unwrap()}); }
505    /// async fn example() -> Result<(), Box<dyn Error>> {
506    ///     let data = "\
507    /// city,country,popcount
508    /// Boston,United States,4628910
509    /// ";
510    ///     let mut rdr = AsyncDeserializer::from_reader(data.as_bytes());
511    ///     let mut iter = rdr.into_deserialize();
512    ///
513    ///     if let Some(result) = iter.next().await {
514    ///         let record: Row = result?;
515    ///         assert_eq!(record, Row {
516    ///             city: "Boston".to_string(),
517    ///             country: "United States".to_string(),
518    ///             population: 4628910,
519    ///         });
520    ///         Ok(())
521    ///     } else {
522    ///         Err(From::from("expected at least one record but got none"))
523    ///     }
524    /// }
525    /// ```
526    #[inline]
527    pub fn into_deserialize<D:'r>(self) -> DeserializeRecordsIntoStream<'r, R, D>
528    where
529        D: DeserializeOwned,
530    {
531        DeserializeRecordsIntoStream::new(self.0)
532    }
533
534    /// Returns a owned stream over pairs of deserialized record and position
535    /// in reader stream before record read.
536    ///
537    #[inline]
538    pub fn into_deserialize_with_pos<D:'r>(self) -> DeserializeRecordsIntoStreamPos<'r, R, D>
539    where
540        D: DeserializeOwned,
541    {
542        DeserializeRecordsIntoStreamPos::new(self.0)
543    }
544
545    /// Returns a reference to the first row read by this parser.
546    ///
547    /// If no row has been read yet, then this will force parsing of the first
548    /// row.
549    ///
550    /// If there was a problem parsing the row or if it wasn't valid UTF-8,
551    /// then this returns an error.
552    ///
553    /// If the underlying reader emits EOF before any data, then this returns
554    /// an empty record.
555    ///
556    /// Note that this method may be used regardless of whether `has_headers`
557    /// was enabled (but it is enabled by default).
558    ///
559    /// # Example
560    ///
561    /// This example shows how to get the header row of CSV data. Notice that
562    /// the header row does not appear as a record in the iterator!
563    ///
564    /// ```
565    /// use std::error::Error;
566    /// use futures::stream::StreamExt;
567    /// use serde::Deserialize;
568    /// use csv_async::AsyncDeserializer;
569    /// 
570    /// #[derive(Debug, Deserialize, Eq, PartialEq)]
571    /// struct Row {
572    ///     city: String,
573    ///     country: String,
574    ///     pop: u64,
575    /// }
576    ///
577    /// # fn main() { async_std::task::block_on(async {example().await.unwrap()}); }
578    /// async fn example() -> Result<(), Box<dyn Error>> {
579    ///     let data = "\
580    /// city,country,pop
581    /// Boston,United States,4628910
582    /// ";
583    ///     let mut rdr = AsyncDeserializer::from_reader(data.as_bytes());
584    ///
585    ///     // We can read the headers before iterating.
586    ///     {
587    ///     // `headers` borrows from the reader, so we put this in its
588    ///     // own scope. That way, the borrow ends before we try iterating
589    ///     // below. Alternatively, we could clone the headers.
590    ///     let headers = rdr.headers().await?;
591    ///     assert_eq!(headers, vec!["city", "country", "pop"]);
592    ///     }
593    ///
594    ///     {
595    ///     let mut records = rdr.deserialize::<Row>();
596    ///     assert_eq!(records.next().await.unwrap()?,
597    ///                Row {city: "Boston".to_string(),
598    ///                     country: "United States".to_string(),
599    ///                     pop: 4628910 });
600    ///     assert!(records.next().await.is_none());
601    ///     }
602    ///
603    ///     // We can also read the headers after iterating.
604    ///     let headers = rdr.headers().await?;
605    ///     assert_eq!(headers, vec!["city", "country", "pop"]);
606    ///     Ok(())
607    /// }
608    /// ```
609    #[inline]
610    pub async fn headers(&mut self) -> Result<&StringRecord> {
611        self.0.headers().await
612    }
613
614    /// Returns a reference to the first row read by this parser as raw bytes.
615    ///
616    /// If no row has been read yet, then this will force parsing of the first
617    /// row.
618    ///
619    /// If there was a problem parsing the row then this returns an error.
620    ///
621    /// If the underlying reader emits EOF before any data, then this returns
622    /// an empty record.
623    ///
624    /// Note that this method may be used regardless of whether `has_headers`
625    /// was enabled (but it is enabled by default).
626    ///
627    /// # Example
628    ///
629    /// This example shows how to get the header row of CSV data. Notice that
630    /// the header row does not appear as a record in the iterator!
631    ///
632    /// ```
633    /// use std::error::Error;
634    /// use futures::stream::StreamExt;
635    /// use serde::Deserialize;
636    /// use csv_async::AsyncDeserializer;
637    ///
638    /// #[derive(Debug, Deserialize, Eq, PartialEq)]
639    /// struct Row {
640    ///     city: String,
641    ///     country: String,
642    ///     #[serde(rename = "pop")]
643    ///     population: u64,
644    /// }
645    /// 
646    /// # fn main() { async_std::task::block_on(async {example().await.unwrap()}); }
647    /// async fn example() -> Result<(), Box<dyn Error>> {
648    ///     let data = indoc::indoc! {"
649    ///         city,country,pop
650    ///         Boston,United States,4628910
651    ///     "};
652    ///     let mut rdr = AsyncDeserializer::from_reader(data.as_bytes());
653    ///
654    ///     // We can read the headers before iterating.
655    ///     {
656    ///     // `headers` borrows from the reader, so we put this in its
657    ///     // own scope. That way, the borrow ends before we try iterating
658    ///     // below. Alternatively, we could clone the headers.
659    ///     let headers = rdr.byte_headers().await?;
660    ///     assert_eq!(headers, vec!["city", "country", "pop"]);
661    ///     }
662    ///
663    ///     {
664    ///     let mut records = rdr.deserialize::<Row>();
665    ///     assert_eq!(records.next().await.unwrap()?,
666    ///                Row {city: "Boston".to_string(),
667    ///                     country: "United States".to_string(),
668    ///                     population: 4628910 });
669    ///     assert!(records.next().await.is_none());
670    ///     }
671    ///
672    ///     // We can also read the headers after iterating.
673    ///     let headers = rdr.byte_headers().await?;
674    ///     assert_eq!(headers, vec!["city", "country", "pop"]);
675    ///     Ok(())
676    /// }
677    /// ```
678    #[inline]
679    pub async fn byte_headers(&mut self) -> Result<&ByteRecord> {
680        self.0.byte_headers().await
681    }
682
683    /// Set the headers of this CSV parser manually.
684    ///
685    /// This overrides any other setting (including `set_byte_headers`). Any
686    /// automatic detection of headers is disabled. This may be called at any
687    /// time.
688    ///
689    /// # Example
690    ///
691    /// ```
692    /// use std::error::Error;
693    /// use csv_async::{AsyncDeserializer, StringRecord};
694    ///
695    /// # fn main() { async_std::task::block_on(async {example().await.unwrap()}); }
696    /// async fn example() -> Result<(), Box<dyn Error>> {
697    ///     let data = "\
698    /// city,country,pop
699    /// Boston,United States,4628910
700    /// ";
701    ///     let mut rdr = AsyncDeserializer::from_reader(data.as_bytes());
702    ///
703    ///     assert_eq!(rdr.headers().await?, vec!["city", "country", "pop"]);
704    ///     rdr.set_headers(StringRecord::from(vec!["a", "b", "c"]));
705    ///     assert_eq!(rdr.headers().await?, vec!["a", "b", "c"]);
706    ///
707    ///     Ok(())
708    /// }
709    /// ```
710    #[inline]
711    pub fn set_headers(&mut self, headers: StringRecord) {
712        self.0.set_headers(headers);
713    }
714
715    /// Set the headers of this CSV parser manually as raw bytes.
716    ///
717    /// This overrides any other setting (including `set_headers`). Any
718    /// automatic detection of headers is disabled. This may be called at any
719    /// time.
720    ///
721    /// # Example
722    ///
723    /// ```
724    /// use std::error::Error;
725    /// use csv_async::{AsyncDeserializer, ByteRecord};
726    ///
727    /// # fn main() { async_std::task::block_on(async {example().await.unwrap()}); }
728    /// async fn example() -> Result<(), Box<dyn Error>> {
729    ///     let data = "\
730    /// city,country,pop
731    /// Boston,United States,4628910
732    /// ";
733    ///     let mut rdr = AsyncDeserializer::from_reader(data.as_bytes());
734    ///
735    ///     assert_eq!(rdr.byte_headers().await?, vec!["city", "country", "pop"]);
736    ///     rdr.set_byte_headers(ByteRecord::from(vec!["a", "b", "c"]));
737    ///     assert_eq!(rdr.byte_headers().await?, vec!["a", "b", "c"]);
738    ///
739    ///     Ok(())
740    /// }
741    /// ```
742    #[inline]
743    pub fn set_byte_headers(&mut self, headers: ByteRecord) {
744        self.0.set_byte_headers(headers);
745    }
746
747    /// Read a single row into the given record. Returns false when no more
748    /// records could be read.
749    ///
750    /// If `has_headers` was enabled via a `ReaderBuilder` (which is the
751    /// default), then this will never read the first record.
752    ///
753    /// This method is useful when you want to read records as fast as
754    /// as possible. It's less ergonomic than an iterator, but it permits the
755    /// caller to reuse the `StringRecord` allocation, which usually results
756    /// in higher throughput.
757    ///
758    /// Records read via this method are guaranteed to have a position set
759    /// on them, even if the reader is at EOF or if an error is returned.
760    ///
761    /// # Example
762    ///
763    /// ```
764    /// use std::error::Error;
765    /// use csv_async::{AsyncDeserializer, StringRecord};
766    ///
767    /// # fn main() { async_std::task::block_on(async {example().await.unwrap()}); }
768    /// async fn example() -> Result<(), Box<dyn Error>> {
769    ///     let data = "\
770    /// city,country,pop
771    /// Boston,United States,4628910
772    /// ";
773    ///     let mut rdr = AsyncDeserializer::from_reader(data.as_bytes());
774    ///     let mut record = StringRecord::new();
775    ///
776    ///     if rdr.read_record(&mut record).await? {
777    ///         assert_eq!(record, vec!["Boston", "United States", "4628910"]);
778    ///         Ok(())
779    ///     } else {
780    ///         Err(From::from("expected at least one record but got none"))
781    ///     }
782    /// }
783    /// ```
784    #[inline]
785    pub async fn read_record(&mut self, record: &mut StringRecord) -> Result<bool> {
786        self.0.read_record(record).await
787    }
788
789    /// Read a single row into the given byte record. Returns false when no
790    /// more records could be read.
791    ///
792    /// If `has_headers` was enabled via a `ReaderBuilder` (which is the
793    /// default), then this will never read the first record.
794    ///
795    /// This method is useful when you want to read records as fast as
796    /// as possible. It's less ergonomic than an iterator, but it permits the
797    /// caller to reuse the `ByteRecord` allocation, which usually results
798    /// in higher throughput.
799    ///
800    /// Records read via this method are guaranteed to have a position set
801    /// on them, even if the reader is at EOF or if an error is returned.
802    ///
803    /// # Example
804    ///
805    /// ```
806    /// use std::error::Error;
807    /// use csv_async::{ByteRecord, AsyncDeserializer};
808    ///
809    /// # fn main() { async_std::task::block_on(async {example().await.unwrap()}); }
810    /// async fn example() -> Result<(), Box<dyn Error>> {
811    ///     let data = "\
812    /// city,country,pop
813    /// Boston,United States,4628910
814    /// ";
815    ///     let mut rdr = AsyncDeserializer::from_reader(data.as_bytes());
816    ///     let mut record = ByteRecord::new();
817    ///
818    ///     if rdr.read_byte_record(&mut record).await? {
819    ///         assert_eq!(record, vec!["Boston", "United States", "4628910"]);
820    ///         Ok(())
821    ///     } else {
822    ///         Err(From::from("expected at least one record but got none"))
823    ///     }
824    /// }
825    /// ```
826    #[inline]
827    pub async fn read_byte_record(&mut self, record: &mut ByteRecord) -> Result<bool> {
828        self.0.read_byte_record(record).await
829    }
830
831    /// Return the current position of this CSV deserializer.
832    /// 
833    /// Because of borrowing rules this function can only be used when there is no
834    /// alive deserializer (which borrows mutable reader).
835    /// To know position during deserialization, `deserialize_with_pos` should be
836    /// used as shown in below example.
837    ///
838    /// The byte offset in the position returned can be used to `seek` this
839    /// deserializer. In particular, seeking to a position returned here on the same
840    /// data will result in parsing the same subsequent record.
841    ///
842    /// # Example: reading the position
843    ///
844    /// ```
845    /// use std::error::Error;
846    /// use futures::io;
847    /// use futures::stream::StreamExt;
848    /// use serde::Deserialize;
849    /// use csv_async::{AsyncDeserializer, Position};
850    ///
851    /// #[derive(Debug, Deserialize, Eq, PartialEq)]
852    /// struct Row {
853    ///     city: String,
854    ///     country: String,
855    ///     popcount: u64,
856    /// }
857    ///
858    /// # fn main() { async_std::task::block_on(async {example().await.unwrap()}); }
859    /// async fn example() -> Result<(), Box<dyn Error>> {
860    ///     let data = "\
861    /// city,country,popcount
862    /// Boston,United States,4628910
863    /// Concord,United States,42695
864    /// ";
865    ///     let mut rdr = AsyncDeserializer::from_reader(io::Cursor::new(data));
866    ///     let mut iter = rdr.deserialize_with_pos::<Row>();
867    ///     let mut pos_at_boston = Position::new();
868    ///     while let Some((rec, pos)) = iter.next().await {
869    ///         if rec?.city == "Boston" {
870    ///             pos_at_boston = pos;
871    ///         }
872    ///     }
873    ///     drop(iter); // releases rdr borrow by iter
874    ///     let pos_at_end = rdr.position();
875    /// 
876    ///     assert_eq!(pos_at_boston.byte(),  22);
877    ///     assert_eq!(pos_at_boston.line(),   2);
878    ///     assert_eq!(pos_at_boston.record(), 1);
879    ///     assert_eq!(pos_at_end.byte(),  79);
880    ///     assert_eq!(pos_at_end.line(),   4);
881    ///     assert_eq!(pos_at_end.record(), 3);
882    ///     Ok(())
883    /// }
884    /// ```
885    #[inline]
886    pub fn position(&self) -> &Position {
887        self.0.position()
888    }
889
890    /// Returns true if and only if this reader has been exhausted.
891    ///
892    /// When this returns true, no more records can be read from this reader
893    /// (unless it has been seek to another position).
894    ///
895    /// # Example
896    ///
897    /// ```
898    /// use std::error::Error;
899    /// use futures::io;
900    /// use futures::stream::StreamExt;
901    /// use serde::Deserialize;
902    /// use csv_async::{AsyncDeserializer, Position};
903    ///
904    /// #[derive(Debug, Deserialize, Eq, PartialEq)]
905    /// struct Row {
906    ///     city: String,
907    ///     country: String,
908    ///     popcount: u64,
909    /// }
910    ///
911    /// # fn main() { async_std::task::block_on(async {example().await.unwrap()}); }
912    /// async fn example() -> Result<(), Box<dyn Error>> {
913    ///     let data = "\
914    /// city,country,popcount
915    /// Boston,United States,4628910
916    /// Concord,United States,42695
917    /// ";
918    ///     let mut rdr = AsyncDeserializer::from_reader(io::Cursor::new(data));
919    ///     assert!(!rdr.is_done());
920    ///     {
921    ///         let mut records = rdr.deserialize::<Row>();
922    ///         while let Some(record) = records.next().await {
923    ///             let _ = record?;
924    ///         }
925    ///     }
926    ///     assert!(rdr.is_done());
927    ///     Ok(())
928    /// }
929    /// ```
930    #[inline]
931    pub fn is_done(&self) -> bool {
932        self.0.is_done()
933    }
934
935    /// Returns true if and only if this reader has been configured to
936    /// interpret the first record as a header record.
937    #[inline]
938    pub fn has_headers(&self) -> bool {
939        self.0.has_headers()
940    }
941
942    /// Returns a reference to the underlying reader.
943    #[inline]
944    pub fn get_ref(&self) -> &R {
945        self.0.get_ref()
946    }
947
948    /// Returns a mutable reference to the underlying reader.
949    #[inline]
950    pub fn get_mut(&mut self) -> &mut R {
951        self.0.get_mut()
952    }
953
954    /// Unwraps this CSV reader, returning the underlying reader.
955    ///
956    /// Note that any leftover data inside this reader's internal buffer is
957    /// lost.
958    #[inline]
959    pub fn into_inner(self) -> R {
960        self.0.into_inner()
961    }
962}
963
964impl<R: io::AsyncRead + io::AsyncSeek + std::marker::Unpin> AsyncDeserializer<R> {
965    /// Seeks the underlying reader to the position given.
966    ///
967    /// This comes with a few caveats:
968    ///
969    /// * Any internal buffer associated with this reader is cleared.
970    /// * If the given position does not correspond to a position immediately
971    ///   before the start of a record, then the behavior of this reader is
972    ///   unspecified.
973    /// * Any special logic that skips the first record in the CSV reader
974    ///   when reading or iterating over records is disabled.
975    ///
976    /// If the given position has a byte offset equivalent to the current
977    /// position, then no seeking is performed.
978    ///
979    /// If the header row has not already been read, then this will attempt
980    /// to read the header row before seeking. Therefore, it is possible that
981    /// this returns an error associated with reading CSV data.
982    ///
983    /// Note that seeking is performed based only on the byte offset in the
984    /// given position. Namely, the record or line numbers in the position may
985    /// be incorrect, but this will cause any future position generated by
986    /// this CSV reader to be similarly incorrect.
987    ///
988    /// # Example: seek to parse a record twice
989    ///
990    /// ```
991    /// use std::error::Error;
992    /// use futures::io;
993    /// use futures::stream::StreamExt;
994    /// use serde::Deserialize;
995    /// use csv_async::{AsyncDeserializer, Position};
996    ///
997    /// #[derive(Debug, Deserialize, Eq, PartialEq)]
998    /// struct Row {
999    ///     city: String,
1000    ///     country: String,
1001    ///     popcount: u64,
1002    /// }
1003    ///
1004    /// # fn main() { async_std::task::block_on(async {example().await.unwrap()}); }
1005    /// async fn example() -> Result<(), Box<dyn Error>> {
1006    ///     let data = "\
1007    /// city,country,popcount
1008    /// Boston,United States,4628910
1009    /// Concord,United States,42695
1010    /// ";
1011    ///     let mut rdr = AsyncDeserializer::from_reader(io::Cursor::new(data));
1012    ///     let mut records = rdr.deserialize_with_pos::<Row>();
1013    ///     let mut pos_at_boston = Position::new();
1014    ///     while let Some((rec, pos)) = records.next().await {
1015    ///         if rec?.city == "Boston" {
1016    ///             pos_at_boston = pos;
1017    ///             // no break here - we are reading data to end
1018    ///         }
1019    ///     }
1020    ///     drop(records); // releases rdr borrowed by records
1021    ///
1022    ///     // Now seek the reader back to `pos_at_boston`. This will let us read the
1023    ///     // Boston's record again.
1024    ///     rdr.seek(pos_at_boston).await?;
1025    ///     let mut records = rdr.into_deserialize::<Row>();
1026    ///     if let Some(rec) = records.next().await {
1027    ///         assert_eq!(rec?.city, "Boston");
1028    ///         Ok(())
1029    ///     } else {
1030    ///         Err(From::from("After seek we should be before last record, but was at end of stream"))
1031    ///     }
1032    /// }
1033    /// ```
1034    #[inline]
1035    pub async fn seek(&mut self, pos: Position) -> Result<()> {
1036        self.0.seek(pos).await
1037    }
1038
1039    /// This is like `seek`, but provides direct control over how the seeking
1040    /// operation is performed via `io::SeekFrom`.
1041    ///
1042    /// The `pos` position given *should* correspond the position indicated
1043    /// by `seek_from`, but there is no requirement. If the `pos` position
1044    /// given is incorrect, then the position information returned by this
1045    /// reader will be similarly incorrect.
1046    ///
1047    /// If the header row has not already been read, then this will attempt
1048    /// to read the header row before seeking. Therefore, it is possible that
1049    /// this returns an error associated with reading CSV data.
1050    ///
1051    /// Unlike `seek`, this will always cause an actual seek to be performed.
1052    #[inline]
1053    pub async fn seek_raw(
1054        &mut self,
1055        seek_from: io::SeekFrom,
1056        pos: Position,
1057    ) -> Result<()> {
1058        self.0.seek_raw(seek_from, pos).await
1059    }
1060}
1061
1062
1063#[cfg(test)]
1064mod tests {
1065    use std::pin::Pin;
1066    use std::task::{Context, Poll};
1067
1068    use futures::io;
1069    use futures::stream::StreamExt;
1070    use serde::Deserialize;
1071    use async_std::task;
1072
1073    use crate::byte_record::ByteRecord;
1074    use crate::error::ErrorKind;
1075    use crate::string_record::StringRecord;
1076    use crate::Trim;
1077
1078    use super::{Position, AsyncReaderBuilder, AsyncDeserializer};
1079
1080    fn b(s: &str) -> &[u8] {
1081        s.as_bytes()
1082    }
1083    fn s(b: &[u8]) -> &str {
1084        ::std::str::from_utf8(b).unwrap()
1085    }
1086
1087    fn newpos(byte: u64, line: u64, record: u64) -> Position {
1088        let mut p = Position::new();
1089        p.set_byte(byte).set_line(line).set_record(record);
1090        p
1091    }
1092
1093    async fn count(stream: impl StreamExt) -> usize {
1094        stream.fold(0, |acc, _| async move { acc + 1 }).await
1095    }
1096
1097    #[async_std::test]
1098    async fn read_byte_record() {
1099        let data = b("foo,\"b,ar\",baz\nabc,mno,xyz");
1100        let mut rdr =
1101            AsyncReaderBuilder::new().has_headers(false).create_deserializer(data);
1102        let mut rec = ByteRecord::new();
1103
1104        assert!(rdr.read_byte_record(&mut rec).await.unwrap());
1105        assert_eq!(3, rec.len());
1106        assert_eq!("foo", s(&rec[0]));
1107        assert_eq!("b,ar", s(&rec[1]));
1108        assert_eq!("baz", s(&rec[2]));
1109
1110        assert!(rdr.read_byte_record(&mut rec).await.unwrap());
1111        assert_eq!(3, rec.len());
1112        assert_eq!("abc", s(&rec[0]));
1113        assert_eq!("mno", s(&rec[1]));
1114        assert_eq!("xyz", s(&rec[2]));
1115
1116        assert!(!rdr.read_byte_record(&mut rec).await.unwrap());
1117    }
1118
1119    #[async_std::test]
1120    async fn read_trimmed_records_and_headers() {
1121        let data = b("foo,  bar,\tbaz\n  1,  2,  3\n1\t,\t,3\t\t");
1122        let mut rdr = AsyncReaderBuilder::new()
1123            .has_headers(true)
1124            .trim(Trim::All)
1125            .create_deserializer(data);
1126        let mut rec = ByteRecord::new();
1127        assert!(rdr.read_byte_record(&mut rec).await.unwrap());
1128        assert_eq!("1", s(&rec[0]));
1129        assert_eq!("2", s(&rec[1]));
1130        assert_eq!("3", s(&rec[2]));
1131        let mut rec = StringRecord::new();
1132        assert!(rdr.read_record(&mut rec).await.unwrap());
1133        assert_eq!("1", &rec[0]);
1134        assert_eq!("", &rec[1]);
1135        assert_eq!("3", &rec[2]);
1136        {
1137            let headers = rdr.headers().await.unwrap();
1138            assert_eq!(3, headers.len());
1139            assert_eq!("foo", &headers[0]);
1140            assert_eq!("bar", &headers[1]);
1141            assert_eq!("baz", &headers[2]);
1142        }
1143    }
1144
1145    #[async_std::test]
1146    async fn read_trimmed_header() {
1147        let data = b("foo,  bar,\tbaz\n  1,  2,  3\n1\t,\t,3\t\t");
1148        let mut rdr = AsyncReaderBuilder::new()
1149            .has_headers(true)
1150            .trim(Trim::Headers)
1151            .create_deserializer(data);
1152        let mut rec = ByteRecord::new();
1153        assert!(rdr.read_byte_record(&mut rec).await.unwrap());
1154        assert_eq!("  1", s(&rec[0]));
1155        assert_eq!("  2", s(&rec[1]));
1156        assert_eq!("  3", s(&rec[2]));
1157        {
1158            let headers = rdr.headers().await.unwrap();
1159            assert_eq!(3, headers.len());
1160            assert_eq!("foo", &headers[0]);
1161            assert_eq!("bar", &headers[1]);
1162            assert_eq!("baz", &headers[2]);
1163        }
1164    }
1165
1166    #[async_std::test]
1167    async fn read_trimed_header_invalid_utf8() {
1168        let data = &b"foo,  b\xFFar,\tbaz\na,b,c\nd,e,f"[..];
1169        let mut rdr = AsyncReaderBuilder::new()
1170            .has_headers(true)
1171            .trim(Trim::Headers)
1172            .create_deserializer(data);
1173        let mut rec = StringRecord::new();
1174
1175        // force the headers to be read
1176        let _ = rdr.read_record(&mut rec).await;
1177        // Check the byte headers are trimmed
1178        {
1179            let headers = rdr.byte_headers().await.unwrap();
1180            assert_eq!(3, headers.len());
1181            assert_eq!(b"foo", &headers[0]);
1182            assert_eq!(b"b\xFFar", &headers[1]);
1183            assert_eq!(b"baz", &headers[2]);
1184        }
1185        match *rdr.headers().await.unwrap_err().kind() {
1186            ErrorKind::Utf8 { pos: Some(ref pos), ref err } => {
1187                assert_eq!(pos, &newpos(0, 1, 0));
1188                assert_eq!(err.field(), 1);
1189                assert_eq!(err.valid_up_to(), 3);
1190            }
1191            ref err => panic!("match failed, got {:?}", err),
1192        }
1193    }
1194
1195    #[async_std::test]
1196    async fn read_trimmed_records() {
1197        let data = b("foo,  bar,\tbaz\n  1,  2,  3\n1\t,\t,3\t\t");
1198        let mut rdr = AsyncReaderBuilder::new()
1199            .has_headers(true)
1200            .trim(Trim::Fields)
1201            .create_deserializer(data);
1202        let mut rec = ByteRecord::new();
1203        assert!(rdr.read_byte_record(&mut rec).await.unwrap());
1204        assert_eq!("1", s(&rec[0]));
1205        assert_eq!("2", s(&rec[1]));
1206        assert_eq!("3", s(&rec[2]));
1207        {
1208            let headers = rdr.headers().await.unwrap();
1209            assert_eq!(3, headers.len());
1210            assert_eq!("foo", &headers[0]);
1211            assert_eq!("  bar", &headers[1]);
1212            assert_eq!("\tbaz", &headers[2]);
1213        }
1214    }
1215
1216    #[async_std::test]
1217    async fn read_record_unequal_fails() {
1218        let data = b("foo\nbar,baz");
1219        let mut rdr =
1220            AsyncReaderBuilder::new().has_headers(false).create_deserializer(data);
1221        let mut rec = ByteRecord::new();
1222
1223        assert!(rdr.read_byte_record(&mut rec).await.unwrap());
1224        assert_eq!(1, rec.len());
1225        assert_eq!("foo", s(&rec[0]));
1226
1227        match rdr.read_byte_record(&mut rec).await {
1228            Err(err) => match *err.kind() {
1229                ErrorKind::UnequalLengths {
1230                    expected_len: 1,
1231                    ref pos,
1232                    len: 2,
1233                } => {
1234                    assert_eq!(pos, &Some(newpos(4, 2, 1)));
1235                }
1236                ref wrong => panic!("match failed, got {:?}", wrong),
1237            },
1238            wrong => panic!("match failed, got {:?}", wrong),
1239        }
1240    }
1241
1242    #[async_std::test]
1243    async fn read_record_unequal_ok() {
1244        let data = b("foo\nbar,baz");
1245        let mut rdr = AsyncReaderBuilder::new()
1246            .has_headers(false)
1247            .flexible(true)
1248            .create_deserializer(data);
1249        let mut rec = ByteRecord::new();
1250
1251        assert!(rdr.read_byte_record(&mut rec).await.unwrap());
1252        assert_eq!(1, rec.len());
1253        assert_eq!("foo", s(&rec[0]));
1254
1255        assert!(rdr.read_byte_record(&mut rec).await.unwrap());
1256        assert_eq!(2, rec.len());
1257        assert_eq!("bar", s(&rec[0]));
1258        assert_eq!("baz", s(&rec[1]));
1259
1260        assert!(!rdr.read_byte_record(&mut rec).await.unwrap());
1261    }
1262
1263    // This tests that even if we get a CSV error, we can continue reading
1264    // if we want.
1265    #[async_std::test]
1266    async fn read_record_unequal_continue() {
1267        let data = b("foo\nbar,baz\nquux");
1268        let mut rdr =
1269            AsyncReaderBuilder::new().has_headers(false).create_deserializer(data);
1270        let mut rec = ByteRecord::new();
1271
1272        assert!(rdr.read_byte_record(&mut rec).await.unwrap());
1273        assert_eq!(1, rec.len());
1274        assert_eq!("foo", s(&rec[0]));
1275
1276        match rdr.read_byte_record(&mut rec).await {
1277            Err(err) => match err.kind() {
1278                &ErrorKind::UnequalLengths {
1279                    expected_len: 1,
1280                    ref pos,
1281                    len: 2,
1282                } => {
1283                    assert_eq!(pos, &Some(newpos(4, 2, 1)));
1284                }
1285                wrong => panic!("match failed, got {:?}", wrong),
1286            },
1287            wrong => panic!("match failed, got {:?}", wrong),
1288        }
1289
1290        assert!(rdr.read_byte_record(&mut rec).await.unwrap());
1291        assert_eq!(1, rec.len());
1292        assert_eq!("quux", s(&rec[0]));
1293
1294        assert!(!rdr.read_byte_record(&mut rec).await.unwrap());
1295    }
1296
1297    #[async_std::test]
1298    async fn read_record_headers() {
1299        let data = b("foo,bar,baz\na,b,c\nd,e,f");
1300        let mut rdr = AsyncReaderBuilder::new().has_headers(true).create_deserializer(data);
1301        let mut rec = StringRecord::new();
1302
1303        assert!(rdr.read_record(&mut rec).await.unwrap());
1304        assert_eq!(3, rec.len());
1305        assert_eq!("a", &rec[0]);
1306
1307        assert!(rdr.read_record(&mut rec).await.unwrap());
1308        assert_eq!(3, rec.len());
1309        assert_eq!("d", &rec[0]);
1310
1311        assert!(!rdr.read_record(&mut rec).await.unwrap());
1312
1313        {
1314            let headers = rdr.byte_headers().await.unwrap();
1315            assert_eq!(3, headers.len());
1316            assert_eq!(b"foo", &headers[0]);
1317            assert_eq!(b"bar", &headers[1]);
1318            assert_eq!(b"baz", &headers[2]);
1319        }
1320        {
1321            let headers = rdr.headers().await.unwrap();
1322            assert_eq!(3, headers.len());
1323            assert_eq!("foo", &headers[0]);
1324            assert_eq!("bar", &headers[1]);
1325            assert_eq!("baz", &headers[2]);
1326        }
1327    }
1328
1329    #[async_std::test]
1330    async fn read_record_headers_invalid_utf8() {
1331        let data = &b"foo,b\xFFar,baz\na,b,c\nd,e,f"[..];
1332        let mut rdr = AsyncReaderBuilder::new().has_headers(true).create_deserializer(data);
1333        let mut rec = StringRecord::new();
1334
1335        assert!(rdr.read_record(&mut rec).await.unwrap());
1336        assert_eq!(3, rec.len());
1337        assert_eq!("a", &rec[0]);
1338
1339        assert!(rdr.read_record(&mut rec).await.unwrap());
1340        assert_eq!(3, rec.len());
1341        assert_eq!("d", &rec[0]);
1342
1343        assert!(!rdr.read_record(&mut rec).await.unwrap());
1344
1345        // Check that we can read the headers as raw bytes, but that
1346        // if we read them as strings, we get an appropriate UTF-8 error.
1347        {
1348            let headers = rdr.byte_headers().await.unwrap();
1349            assert_eq!(3, headers.len());
1350            assert_eq!(b"foo", &headers[0]);
1351            assert_eq!(b"b\xFFar", &headers[1]);
1352            assert_eq!(b"baz", &headers[2]);
1353        }
1354        match *rdr.headers().await.unwrap_err().kind() {
1355            ErrorKind::Utf8 { pos: Some(ref pos), ref err } => {
1356                assert_eq!(pos, &newpos(0, 1, 0));
1357                assert_eq!(err.field(), 1);
1358                assert_eq!(err.valid_up_to(), 1);
1359            }
1360            ref err => panic!("match failed, got {:?}", err),
1361        }
1362    }
1363
1364    #[async_std::test]
1365    async fn read_record_no_headers_before() {
1366        let data = b("foo,bar,baz\na,b,c\nd,e,f");
1367        let mut rdr =
1368            AsyncReaderBuilder::new().has_headers(false).create_deserializer(data);
1369        let mut rec = StringRecord::new();
1370
1371        {
1372            let headers = rdr.headers().await.unwrap();
1373            assert_eq!(3, headers.len());
1374            assert_eq!("foo", &headers[0]);
1375            assert_eq!("bar", &headers[1]);
1376            assert_eq!("baz", &headers[2]);
1377        }
1378
1379        assert!(rdr.read_record(&mut rec).await.unwrap());
1380        assert_eq!(3, rec.len());
1381        assert_eq!("foo", &rec[0]);
1382
1383        assert!(rdr.read_record(&mut rec).await.unwrap());
1384        assert_eq!(3, rec.len());
1385        assert_eq!("a", &rec[0]);
1386
1387        assert!(rdr.read_record(&mut rec).await.unwrap());
1388        assert_eq!(3, rec.len());
1389        assert_eq!("d", &rec[0]);
1390
1391        assert!(!rdr.read_record(&mut rec).await.unwrap());
1392    }
1393
1394    #[async_std::test]
1395    async fn read_record_no_headers_after() {
1396        let data = b("foo,bar,baz\na,b,c\nd,e,f");
1397        let mut rdr =
1398            AsyncReaderBuilder::new().has_headers(false).create_deserializer(data);
1399        let mut rec = StringRecord::new();
1400
1401        assert!(rdr.read_record(&mut rec).await.unwrap());
1402        assert_eq!(3, rec.len());
1403        assert_eq!("foo", &rec[0]);
1404
1405        assert!(rdr.read_record(&mut rec).await.unwrap());
1406        assert_eq!(3, rec.len());
1407        assert_eq!("a", &rec[0]);
1408
1409        assert!(rdr.read_record(&mut rec).await.unwrap());
1410        assert_eq!(3, rec.len());
1411        assert_eq!("d", &rec[0]);
1412
1413        assert!(!rdr.read_record(&mut rec).await.unwrap());
1414
1415        let headers = rdr.headers().await.unwrap();
1416        assert_eq!(3, headers.len());
1417        assert_eq!("foo", &headers[0]);
1418        assert_eq!("bar", &headers[1]);
1419        assert_eq!("baz", &headers[2]);
1420    }
1421
1422    #[async_std::test]
1423    async fn seek() {
1424        let data = b("foo,bar,baz\na,b,c\nd,e,f\ng,h,i");
1425        let mut rdr = AsyncReaderBuilder::new().create_deserializer(io::Cursor::new(data));
1426        rdr.seek(newpos(18, 3, 2)).await.unwrap();
1427
1428        let mut rec = StringRecord::new();
1429
1430        assert_eq!(18, rdr.position().byte());
1431        assert!(rdr.read_record(&mut rec).await.unwrap());
1432        assert_eq!(3, rec.len());
1433        assert_eq!("d", &rec[0]);
1434
1435        assert_eq!(24, rdr.position().byte());
1436        assert_eq!(4, rdr.position().line());
1437        assert_eq!(3, rdr.position().record());
1438        assert!(rdr.read_record(&mut rec).await.unwrap());
1439        assert_eq!(3, rec.len());
1440        assert_eq!("g", &rec[0]);
1441
1442        assert!(!rdr.read_record(&mut rec).await.unwrap());
1443    }
1444
1445    // Test that we can read headers after seeking even if the headers weren't
1446    // explicit read before seeking.
1447    #[async_std::test]
1448    async fn seek_headers_after() {
1449        let data = b("foo,bar,baz\na,b,c\nd,e,f\ng,h,i");
1450        let mut rdr = AsyncReaderBuilder::new().create_deserializer(io::Cursor::new(data));
1451        rdr.seek(newpos(18, 3, 2)).await.unwrap();
1452        assert_eq!(rdr.headers().await.unwrap(), vec!["foo", "bar", "baz"]);
1453    }
1454
1455    // Test that we can read headers after seeking if the headers were read
1456    // before seeking.
1457    #[async_std::test]
1458    async fn seek_headers_before_after() {
1459        let data = b("foo,bar,baz\na,b,c\nd,e,f\ng,h,i");
1460        let mut rdr = AsyncReaderBuilder::new().create_deserializer(io::Cursor::new(data));
1461        let headers = rdr.headers().await.unwrap().clone();
1462        rdr.seek(newpos(18, 3, 2)).await.unwrap();
1463        assert_eq!(&headers, rdr.headers().await.unwrap());
1464    }
1465
1466    // Test that even if we didn't read headers before seeking, if we seek to
1467    // the current byte offset, then no seeking is done and therefore we can
1468    // still read headers after seeking.
1469    #[async_std::test]
1470    async fn seek_headers_no_actual_seek() {
1471        let data = b("foo,bar,baz\na,b,c\nd,e,f\ng,h,i");
1472        let mut rdr = AsyncReaderBuilder::new().create_deserializer(io::Cursor::new(data));
1473        rdr.seek(Position::new()).await.unwrap();
1474        assert_eq!("foo", &rdr.headers().await.unwrap()[0]);
1475    }
1476
1477    #[derive(Debug, Deserialize, Eq, PartialEq)]
1478    struct Row1([String; 3]);
1479
1480    // Test that position info is reported correctly in absence of headers.
1481    #[async_std::test]
1482    async fn positions_no_headers() {
1483        let mut rdr = AsyncReaderBuilder::new()
1484            .has_headers(false)
1485            .create_deserializer("a,b,c\nx,y,z".as_bytes())
1486            .into_deserialize_with_pos::<Row1>();
1487
1488        let (_, pos) = rdr.next().await.unwrap();
1489        assert_eq!(pos.byte(), 0);
1490        assert_eq!(pos.line(), 1);
1491        assert_eq!(pos.record(), 0);
1492
1493        let (_, pos) = rdr.next().await.unwrap();
1494        assert_eq!(pos.byte(), 6);
1495        assert_eq!(pos.line(), 2);
1496        assert_eq!(pos.record(), 1);
1497
1498        // Test that we are at end of stream, and properly signal this.
1499        assert!(rdr.next().await.is_none());
1500        // Testing that we are not panic, trying to pass over end of stream (Issue#22)
1501        assert!(rdr.next().await.is_none());
1502    }
1503
1504    // Test that position info is reported correctly with headers.
1505    #[async_std::test]
1506    async fn positions_headers() {
1507        let mut rdr = AsyncReaderBuilder::new()
1508            .has_headers(true)
1509            .create_deserializer("a,b,c\nx,y,z".as_bytes())
1510            .into_deserialize_with_pos::<Row1>();
1511
1512        let (_, pos) = rdr.next().await.unwrap();
1513        assert_eq!(pos.byte(), 6);
1514        assert_eq!(pos.line(), 2);
1515        // We could not count header as record, but we keep compatibility with 'csv' crate.
1516        assert_eq!(pos.record(), 1);
1517    }
1518
1519    // Test that reading headers on empty data yields an empty record.
1520    #[async_std::test]
1521    async fn headers_on_empty_data() {
1522        let mut rdr = AsyncReaderBuilder::new().create_deserializer("".as_bytes());
1523        let r = rdr.byte_headers().await.unwrap();
1524        assert_eq!(r.len(), 0);
1525    }
1526
1527    // Test that reading the first record on empty data works.
1528    #[async_std::test]
1529    async fn no_headers_on_empty_data() {
1530        let mut rdr =
1531        AsyncReaderBuilder::new().has_headers(false).create_deserializer("".as_bytes());
1532        assert_eq!(count(rdr.deserialize::<Row1>()).await, 0);
1533    }
1534
1535    // Test that reading the first record on empty data works, even if
1536    // we've tried to read headers before hand.
1537    #[async_std::test]
1538    async fn no_headers_on_empty_data_after_headers() {
1539        let mut rdr =
1540            AsyncReaderBuilder::new().has_headers(false).create_deserializer("".as_bytes());
1541        assert_eq!(rdr.headers().await.unwrap().len(), 0);
1542        assert_eq!(count(rdr.deserialize::<Row1>()).await, 0);
1543    }
1544
1545    #[test]
1546    fn behavior_on_io_errors() {
1547        struct FailingRead;
1548        impl io::AsyncRead for FailingRead {
1549            fn poll_read(
1550                self: Pin<&mut Self>,
1551                _cx: &mut Context,
1552                _buf: &mut [u8]
1553            ) -> Poll<Result<usize, io::Error>> {
1554                Poll::Ready(Err(io::Error::new(io::ErrorKind::Other, "Broken reader")))
1555            }
1556        }
1557        impl std::marker::Unpin for FailingRead {}
1558
1559        #[derive(Deserialize)]
1560        struct Fake;
1561    
1562        task::block_on(async {
1563            let mut records = AsyncDeserializer::from_reader(FailingRead).into_deserialize::<Fake>();
1564            let first_record = records.next().await;
1565            assert!(
1566                matches!(&first_record, Some(Err(e)) if matches!(e.kind(), crate::ErrorKind::Io(_)))
1567            );
1568            assert!(records.next().await.is_none());
1569        });
1570    
1571        task::block_on(async {
1572            let mut records = AsyncReaderBuilder::new()
1573                .end_on_io_error(false)
1574                .create_deserializer(FailingRead)
1575                .into_deserialize::<Fake>();
1576            let first_record = records.next().await;
1577            assert!(
1578                matches!(&first_record, Some(Err(e)) if matches!(e.kind(), crate::ErrorKind::Io(_)))
1579            );
1580            let second_record = records.next().await;
1581            assert!(
1582                matches!(&second_record, Some(Err(e)) if matches!(e.kind(), crate::ErrorKind::Io(_)))
1583            );
1584        });
1585    }
1586}