csv_async/async_readers/ades_futures.rs
use futures::io;
use serde::de::DeserializeOwned;

use crate::AsyncReaderBuilder;
use crate::byte_record::{ByteRecord, Position};
use crate::error::Result;
use crate::string_record::StringRecord;
use super::{
    AsyncReaderImpl,
    DeserializeRecordsStream, DeserializeRecordsIntoStream,
    DeserializeRecordsStreamPos, DeserializeRecordsIntoStreamPos,
};


impl AsyncReaderBuilder {
    /// Build a CSV `serde` deserializer from this configuration that reads data from `rdr`.
    ///
    /// Note that the CSV reader is buffered automatically, so you should not
    /// wrap `rdr` in a buffered reader like `io::BufReader`.
    ///
    /// # Example
    ///
    /// ```
    /// use std::error::Error;
    /// use futures::stream::StreamExt;
    /// use serde::Deserialize;
    /// use csv_async::AsyncReaderBuilder;
    ///
    /// #[derive(Debug, Deserialize, Eq, PartialEq)]
    /// struct Row {
    ///     city: String,
    ///     country: String,
    ///     pop: u64,
    /// }
    ///
    /// # fn main() { async_std::task::block_on(async {example().await.unwrap()}); }
    /// async fn example() -> Result<(), Box<dyn Error>> {
    ///     let data = "\
    /// city,country,pop
    /// Boston,United States,4628910
    /// Concord,United States,42695
    /// ";
    ///     let mut rdr = AsyncReaderBuilder::new().create_deserializer(data.as_bytes());
    ///     let mut records = rdr.into_deserialize::<Row>();
    ///     while let Some(record) = records.next().await {
    ///         println!("{:?}", record?);
    ///     }
    ///     Ok(())
    /// }
    /// ```
    pub fn create_deserializer<R: io::AsyncRead + Unpin + Send>(&self, rdr: R) -> AsyncDeserializer<R> {
        AsyncDeserializer::new(self, rdr)
    }
}

/// Configured CSV `serde` deserializer.
///
/// A CSV deserializer takes CSV data as input and transforms it into standard Rust
/// values. The reader reads CSV data as a sequence of records,
/// where a record is either a sequence of string fields or a structure with a derived
/// `serde::Deserialize` implementation.
///
/// # Configuration
///
/// A CSV deserializer has a convenient constructor method, `from_reader`.
/// However, if you want to configure the CSV deserializer to use
/// a different delimiter or quote character (among many other things), then
/// you should use an [`AsyncReaderBuilder`](struct.AsyncReaderBuilder.html) to construct
/// an `AsyncDeserializer`. For example, to change the field delimiter:
///
/// ```
/// use std::error::Error;
/// use futures::stream::StreamExt;
/// use serde::Deserialize;
/// use csv_async::AsyncReaderBuilder;
///
/// #[derive(Debug, Deserialize, Eq, PartialEq)]
/// struct Row {
///     city: String,
///     country: String,
///     pop: u64,
/// }
///
/// # fn main() { async_std::task::block_on(async {example().await.unwrap()}); }
/// async fn example() -> Result<(), Box<dyn Error>> {
///     let data = indoc::indoc! {"
/// city;country;pop
/// Boston;United States;4628910
/// "};
///     let mut rdr = AsyncReaderBuilder::new()
///         .delimiter(b';')
///         .create_deserializer(data.as_bytes());
///
///     let mut records = rdr.deserialize::<Row>();
///     assert_eq!(records.next().await.unwrap()?,
///         Row {city: "Boston".to_string(),
///              country: "United States".to_string(),
///              pop: 4628910 });
///     Ok(())
/// }
/// ```
///
/// # Error handling
///
/// In general, CSV *parsing* does not ever return an error. That is, there is
/// no such thing as malformed CSV data. Instead, this reader will prioritize
/// finding a parse over rejecting CSV data that it does not understand. This
/// choice was inspired by other popular CSV parsers, but also because it is
/// pragmatic. CSV data varies wildly, so even if the CSV data is malformed,
/// it might still be possible to work with the data. In the land of CSV, there
/// is no "right" or "wrong," only "right" and "less right."
///
/// With that said, a number of errors can occur while reading CSV data:
///
/// * By default, all records in CSV data must have the same number of fields.
///   If a record is found with a different number of fields than a prior
///   record, then an error is returned. This behavior can be disabled by
///   enabling flexible parsing via the `flexible` method on
///   [`AsyncReaderBuilder`](struct.AsyncReaderBuilder.html).
/// * When reading CSV data from a resource (like a file), it is possible for
///   reading from the underlying resource to fail. This will return an error.
///   After encountering such an error (unless `seek` is used), subsequent
///   calls to the reader will behave as if end of file had been reached, in
///   order to avoid running into an infinite loop by repeatedly attempting to
///   read a record that has already errored.
/// * When reading CSV data into `String` or `&str` fields (e.g., via a
///   [`StringRecord`](struct.StringRecord.html)), UTF-8 is strictly
///   enforced. If CSV data is invalid UTF-8, then an error is returned. If
///   you want to read invalid UTF-8, then you should use the byte oriented
///   APIs such as [`ByteRecord`](struct.ByteRecord.html). If you need explicit
///   support for another encoding entirely, then you'll need to use another
///   crate to transcode your CSV data to UTF-8 before parsing it.
/// * When using Serde to deserialize CSV data into Rust types, it is possible
///   for a number of additional errors to occur. For example, deserializing
///   a field `xyz` into an `i32` field will result in an error.
///
/// For more details on the precise semantics of errors, see the
/// [`Error`](enum.Error.html) type.
#[derive(Debug)]
pub struct AsyncDeserializer<R>(AsyncReaderImpl<R>);

impl<'r, R> AsyncDeserializer<R>
where
    R: io::AsyncRead + Unpin + Send + 'r,
{
    /// Create a new CSV reader given a builder and a source of underlying
    /// bytes.
    fn new(builder: &AsyncReaderBuilder, rdr: R) -> AsyncDeserializer<R> {
        AsyncDeserializer(AsyncReaderImpl::new(builder, rdr))
    }

    /// Create a new CSV parser with a default configuration for the given
    /// reader.
    ///
    /// To customize CSV parsing, use an `AsyncReaderBuilder`.
    ///
    /// # Example
    ///
    /// ```
    /// use std::error::Error;
    /// use futures::stream::StreamExt;
    /// use serde::Deserialize;
    /// use csv_async::AsyncDeserializer;
    ///
    /// #[derive(Debug, Deserialize, Eq, PartialEq)]
    /// struct Row {
    ///     city: String,
    ///     country: String,
    ///     pop: u64,
    /// }
    ///
    /// # fn main() { async_std::task::block_on(async {example().await.unwrap()}); }
    /// async fn example() -> Result<(), Box<dyn Error>> {
    ///     let data = "\
    /// city,country,pop
    /// Boston,United States,4628910
    /// Concord,United States,42695
    /// ";
    ///     let mut rdr = AsyncDeserializer::from_reader(data.as_bytes());
    ///     let mut records = rdr.into_deserialize::<Row>();
    ///     while let Some(record) = records.next().await {
    ///         println!("{:?}", record?);
    ///     }
    ///     Ok(())
    /// }
    /// ```
    #[inline]
    pub fn from_reader(rdr: R) -> AsyncDeserializer<R> {
        AsyncReaderBuilder::new().create_deserializer(rdr)
    }

    /// Returns a borrowed stream over deserialized records.
    ///
    /// Each item yielded by this stream is a `Result<D, Error>`.
    /// Therefore, in order to access the record, callers must handle the
    /// possibility of error (typically with `?`).
    ///
    /// If `has_headers` was enabled via an `AsyncReaderBuilder` (which is the
    /// default), then this does not include the first record. Additionally,
    /// if `has_headers` is enabled, then deserializing into a struct will
    /// automatically align the values in each row to the fields of a struct
    /// based on the header row.
    ///
    /// Turbofish notation is frequently needed when calling this function:
    /// `rdr.deserialize::<RecordType>();`
    ///
    /// # Example
    ///
    /// This shows how to deserialize CSV data into normal Rust structures. The
    /// fields of the header row are used to match up the values in each row
    /// to the fields of the struct.
    ///
    /// ```
    /// use std::error::Error;
    ///
    /// use futures::stream::StreamExt;
    /// use serde::Deserialize;
    /// use csv_async::AsyncDeserializer;
    ///
    /// #[derive(Debug, Deserialize, Eq, PartialEq)]
    /// struct Row {
    ///     city: String,
    ///     country: String,
    ///     #[serde(rename = "popcount")]
    ///     population: u64,
    /// }
    ///
    /// # fn main() { async_std::task::block_on(async {example().await.unwrap()}); }
    /// async fn example() -> Result<(), Box<dyn Error>> {
    ///     let data = "\
    /// city,country,popcount
    /// Boston,United States,4628910
    /// ";
    ///     let mut rdr = AsyncDeserializer::from_reader(data.as_bytes());
    ///     let mut iter = rdr.deserialize();
    ///
    ///     if let Some(result) = iter.next().await {
    ///         let record: Row = result?;
    ///         assert_eq!(record, Row {
    ///             city: "Boston".to_string(),
    ///             country: "United States".to_string(),
    ///             population: 4628910,
    ///         });
    ///         Ok(())
    ///     } else {
    ///         Err(From::from("expected at least one record but got none"))
    ///     }
    /// }
    /// ```
    ///
    /// # Rules
    ///
    /// For the most part, any Rust type that maps straightforwardly to a CSV
    /// record is supported. This includes maps, structs, tuples and tuple
    /// structs. Other Rust types, such as `Vec`s, arrays, and enums have
    /// a more complicated story. In general, when working with CSV data, one
    /// should avoid *nested sequences* as much as possible.
    ///
    /// Maps, structs, tuples and tuple structs map to CSV records in a simple
    /// way. Tuples and tuple structs decode their fields in the order that
    /// they are defined. Structs will do the same only if `has_headers` has
    /// been disabled using [`AsyncReaderBuilder`](struct.AsyncReaderBuilder.html);
    /// otherwise, structs and maps are deserialized based on the fields
    /// defined in the header row. (If there is no header row, then
    /// deserializing into a map will result in an error.)
    ///
    /// Nested sequences are supported in a limited capacity. Namely, they
    /// are flattened. As a result, it's often useful to use a `Vec` to capture
    /// a "tail" of fields in a record:
    ///
    /// ```
    /// use std::error::Error;
    ///
    /// use futures::stream::StreamExt;
    /// use serde::Deserialize;
    /// use csv_async::AsyncReaderBuilder;
    ///
    /// #[derive(Debug, Deserialize, Eq, PartialEq)]
    /// struct Row {
    ///     label: String,
    ///     values: Vec<i32>,
    /// }
    ///
    /// # fn main() { async_std::task::block_on(async {example().await.unwrap()}); }
    /// async fn example() -> Result<(), Box<dyn Error>> {
    ///     let data = "foo,1,2,3";
    ///     let mut rdr = AsyncReaderBuilder::new()
    ///         .has_headers(false)
    ///         .create_deserializer(data.as_bytes());
    ///     let mut iter = rdr.deserialize();
    ///
    ///     if let Some(result) = iter.next().await {
    ///         let record: Row = result?;
    ///         assert_eq!(record, Row {
    ///             label: "foo".to_string(),
    ///             values: vec![1, 2, 3],
    ///         });
    ///         Ok(())
    ///     } else {
    ///         Err(From::from("expected at least one record but got none"))
    ///     }
    /// }
    /// ```
    ///
    /// In the above example, adding another field to the `Row` struct after
    /// the `values` field will result in a deserialization error. This is
    /// because the deserializer doesn't know when to stop reading fields
    /// into the `values` vector, so it will consume the rest of the fields in
    /// the record, leaving none left over for the additional field.
    ///
    /// Finally, simple enums in Rust can be deserialized as well. Namely,
    /// each enum variant must have either no arguments or a single argument.
    /// Variants with no arguments are deserialized based on
    /// which variant name the field matches. Variants with a single argument
    /// are deserialized based on which variant can store the data. The latter
    /// is only supported when using "untagged" enum deserialization. The
    /// following example shows both forms in action:
    ///
    /// ```
    /// use std::error::Error;
    ///
    /// use futures::stream::StreamExt;
    /// use serde::Deserialize;
    /// use csv_async::AsyncDeserializer;
    ///
    /// #[derive(Debug, Deserialize, PartialEq)]
    /// struct Row {
    ///     label: Label,
    ///     value: Number,
    /// }
    ///
    /// #[derive(Debug, Deserialize, PartialEq)]
    /// #[serde(rename_all = "lowercase")]
    /// enum Label {
    ///     Celsius,
    ///     Fahrenheit,
    /// }
    ///
    /// #[derive(Debug, Deserialize, PartialEq)]
    /// #[serde(untagged)]
    /// enum Number {
    ///     Integer(i64),
    ///     Float(f64),
    /// }
    ///
    /// # fn main() { async_std::task::block_on(async {example().await.unwrap()}); }
    /// async fn example() -> Result<(), Box<dyn Error>> {
    ///     let data = "\
    /// label,value
    /// celsius,22.2222
    /// fahrenheit,72
    /// ";
    ///     let mut rdr = AsyncDeserializer::from_reader(data.as_bytes());
    ///     let mut iter = rdr.deserialize();
    ///
    ///     // Read the first record.
    ///     if let Some(result) = iter.next().await {
    ///         let record: Row = result?;
    ///         assert_eq!(record, Row {
    ///             label: Label::Celsius,
    ///             value: Number::Float(22.2222),
    ///         });
    ///     } else {
    ///         return Err(From::from(
    ///             "expected at least two records but got none"));
    ///     }
    ///
    ///     // Read the second record.
    ///     if let Some(result) = iter.next().await {
    ///         let record: Row = result?;
    ///         assert_eq!(record, Row {
    ///             label: Label::Fahrenheit,
    ///             value: Number::Integer(72),
    ///         });
    ///         Ok(())
    ///     } else {
    ///         Err(From::from(
    ///             "expected at least two records but got only one"))
    ///     }
    /// }
    /// ```
    #[inline]
    pub fn deserialize<D: 'r>(&'r mut self) -> DeserializeRecordsStream<'r, R, D>
    where
        D: DeserializeOwned,
    {
        DeserializeRecordsStream::new(&mut self.0)
    }

    /// Returns a borrowed stream over pairs of deserialized records and the
    /// position in the reader's stream before each record was read.
    ///
    /// Each item yielded by this stream is a `(Result<D, Error>, Position)`.
    /// Therefore, in order to access the record, callers must handle the
    /// possibility of error (typically with `?`).
    ///
    /// Turbofish notation is frequently needed when calling this function:
    /// `rdr.deserialize_with_pos::<RecordType>();`
    ///
    /// # Example
    ///
    /// This shows how to deserialize CSV data into normal Rust structures. The
    /// fields of the header row are used to match up the values in each row
    /// to the fields of the struct.
    ///
    /// ```
    /// use std::error::Error;
    ///
    /// use futures::stream::StreamExt;
    /// use serde::Deserialize;
    /// use csv_async::AsyncDeserializer;
    ///
    /// #[derive(Debug, Deserialize, Eq, PartialEq)]
    /// struct Row {
    ///     city: String,
    ///     country: String,
    ///     population: u64,
    /// }
    ///
    /// # fn main() { async_std::task::block_on(async {example().await.unwrap()}); }
    /// async fn example() -> Result<(), Box<dyn Error>> {
    ///     let data = "\
    /// city,country,population
    /// Boston,United States,4628910
    /// Concord,United States,42695
    /// ";
    ///     let mut rdr = AsyncDeserializer::from_reader(data.as_bytes());
    ///     let mut iter = rdr.deserialize_with_pos();
    ///
    ///     if let Some((result, pos)) = iter.next().await {
    ///         let record: Row = result?;
    ///         assert_eq!(record, Row {
    ///             city: "Boston".to_string(),
    ///             country: "United States".to_string(),
    ///             population: 4628910,
    ///         });
    ///         assert_eq!(pos.byte(), 24);
    ///         assert_eq!(pos.line(), 2);
    ///         assert_eq!(pos.record(), 1);
    ///     } else {
    ///         return Err(From::from("expected at least one record but got none"));
    ///     }
    ///     if let Some((result, pos)) = iter.next().await {
    ///         let record: Row = result?;
    ///         assert_eq!(record, Row {
    ///             city: "Concord".to_string(),
    ///             country: "United States".to_string(),
    ///             population: 42695,
    ///         });
    ///         assert_eq!(pos.byte(), 53);
    ///         assert_eq!(pos.line(), 3);
    ///         assert_eq!(pos.record(), 2);
    ///     } else {
    ///         return Err(From::from("expected at least two records but got only one"));
    ///     }
    ///     assert!(iter.next().await.is_none());
    ///     Ok(())
    /// }
    /// ```
    #[inline]
    pub fn deserialize_with_pos<D: 'r>(&'r mut self) -> DeserializeRecordsStreamPos<'r, R, D>
    where
        D: DeserializeOwned,
    {
        DeserializeRecordsStreamPos::new(&mut self.0)
    }

    /// Returns an owned stream over deserialized records.
    ///
    /// Each item yielded by this stream is a `Result<D, Error>`.
    /// Therefore, in order to access the record, callers must handle the
    /// possibility of error (typically with `?`).
    ///
    /// If `has_headers` was enabled via an `AsyncReaderBuilder` (which is the
    /// default), then this does not include the first record. Additionally,
    /// if `has_headers` is enabled, then deserializing into a struct will
    /// automatically align the values in each row to the fields of a struct
    /// based on the header row.
    ///
    /// Turbofish notation is frequently needed when calling this function:
    /// `rdr.into_deserialize::<RecordType>();`
    ///
    /// # Example
    ///
    /// This shows how to deserialize CSV data into normal Rust structs. The
    /// fields of the header row are used to match up the values in each row
    /// to the fields of the struct.
    ///
    /// ```
    /// use std::error::Error;
    ///
    /// use futures::stream::StreamExt;
    /// use serde::Deserialize;
    /// use csv_async::AsyncDeserializer;
    ///
    /// #[derive(Debug, Deserialize, Eq, PartialEq)]
    /// struct Row {
    ///     city: String,
    ///     country: String,
    ///     #[serde(rename = "popcount")]
    ///     population: u64,
    /// }
    ///
    /// # fn main() { async_std::task::block_on(async {example().await.unwrap()}); }
    /// async fn example() -> Result<(), Box<dyn Error>> {
    ///     let data = "\
    /// city,country,popcount
    /// Boston,United States,4628910
    /// ";
    ///     let mut rdr = AsyncDeserializer::from_reader(data.as_bytes());
    ///     let mut iter = rdr.into_deserialize();
    ///
    ///     if let Some(result) = iter.next().await {
    ///         let record: Row = result?;
    ///         assert_eq!(record, Row {
    ///             city: "Boston".to_string(),
    ///             country: "United States".to_string(),
    ///             population: 4628910,
    ///         });
    ///         Ok(())
    ///     } else {
    ///         Err(From::from("expected at least one record but got none"))
    ///     }
    /// }
    /// ```
    #[inline]
    pub fn into_deserialize<D: 'r>(self) -> DeserializeRecordsIntoStream<'r, R, D>
    where
        D: DeserializeOwned,
    {
        DeserializeRecordsIntoStream::new(self.0)
    }

    /// Returns an owned stream over pairs of deserialized records and the
    /// position in the reader's stream before each record was read.
    ///
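    /// # Example
    ///
    /// A minimal sketch of consuming the owned positional stream; it follows
    /// the same pattern as `deserialize_with_pos` above:
    ///
    /// ```
    /// use std::error::Error;
    /// use futures::stream::StreamExt;
    /// use serde::Deserialize;
    /// use csv_async::AsyncDeserializer;
    ///
    /// #[derive(Debug, Deserialize, Eq, PartialEq)]
    /// struct Row {
    ///     city: String,
    ///     country: String,
    ///     pop: u64,
    /// }
    ///
    /// # fn main() { async_std::task::block_on(async {example().await.unwrap()}); }
    /// async fn example() -> Result<(), Box<dyn Error>> {
    ///     let data = "\
    /// city,country,pop
    /// Boston,United States,4628910
    /// ";
    ///     let rdr = AsyncDeserializer::from_reader(data.as_bytes());
    ///     // The stream owns the deserializer, so `rdr` can no longer be used here.
    ///     let mut iter = rdr.into_deserialize_with_pos::<Row>();
    ///     while let Some((record, pos)) = iter.next().await {
    ///         // The position refers to the start of the record that was just read.
    ///         println!("line {}: {:?}", pos.line(), record?);
    ///     }
    ///     Ok(())
    /// }
    /// ```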
    #[inline]
    pub fn into_deserialize_with_pos<D: 'r>(self) -> DeserializeRecordsIntoStreamPos<'r, R, D>
    where
        D: DeserializeOwned,
    {
        DeserializeRecordsIntoStreamPos::new(self.0)
    }

    /// Returns a reference to the first row read by this parser.
    ///
    /// If no row has been read yet, then this will force parsing of the first
    /// row.
    ///
    /// If there was a problem parsing the row or if it wasn't valid UTF-8,
    /// then this returns an error.
    ///
    /// If the underlying reader emits EOF before any data, then this returns
    /// an empty record.
    ///
    /// Note that this method may be used regardless of whether `has_headers`
    /// was enabled (but it is enabled by default).
    ///
    /// # Example
    ///
    /// This example shows how to get the header row of CSV data. Notice that
    /// the header row does not appear as a record in the iterator!
    ///
    /// ```
    /// use std::error::Error;
    /// use futures::stream::StreamExt;
    /// use serde::Deserialize;
    /// use csv_async::AsyncDeserializer;
    ///
    /// #[derive(Debug, Deserialize, Eq, PartialEq)]
    /// struct Row {
    ///     city: String,
    ///     country: String,
    ///     pop: u64,
    /// }
    ///
    /// # fn main() { async_std::task::block_on(async {example().await.unwrap()}); }
    /// async fn example() -> Result<(), Box<dyn Error>> {
    ///     let data = "\
    /// city,country,pop
    /// Boston,United States,4628910
    /// ";
    ///     let mut rdr = AsyncDeserializer::from_reader(data.as_bytes());
    ///
    ///     // We can read the headers before iterating.
    ///     {
    ///         // `headers` borrows from the reader, so we put this in its
    ///         // own scope. That way, the borrow ends before we try iterating
    ///         // below. Alternatively, we could clone the headers.
    ///         let headers = rdr.headers().await?;
    ///         assert_eq!(headers, vec!["city", "country", "pop"]);
    ///     }
    ///
    ///     {
    ///         let mut records = rdr.deserialize::<Row>();
    ///         assert_eq!(records.next().await.unwrap()?,
    ///             Row {city: "Boston".to_string(),
    ///                  country: "United States".to_string(),
    ///                  pop: 4628910 });
    ///         assert!(records.next().await.is_none());
    ///     }
    ///
    ///     // We can also read the headers after iterating.
    ///     let headers = rdr.headers().await?;
    ///     assert_eq!(headers, vec!["city", "country", "pop"]);
    ///     Ok(())
    /// }
    /// ```
    #[inline]
    pub async fn headers(&mut self) -> Result<&StringRecord> {
        self.0.headers().await
    }

    /// Returns a reference to the first row read by this parser as raw bytes.
    ///
    /// If no row has been read yet, then this will force parsing of the first
    /// row.
    ///
    /// If there was a problem parsing the row then this returns an error.
    ///
    /// If the underlying reader emits EOF before any data, then this returns
    /// an empty record.
    ///
    /// Note that this method may be used regardless of whether `has_headers`
    /// was enabled (but it is enabled by default).
    ///
    /// # Example
    ///
    /// This example shows how to get the header row of CSV data. Notice that
    /// the header row does not appear as a record in the iterator!
    ///
    /// ```
    /// use std::error::Error;
    /// use futures::stream::StreamExt;
    /// use serde::Deserialize;
    /// use csv_async::AsyncDeserializer;
    ///
    /// #[derive(Debug, Deserialize, Eq, PartialEq)]
    /// struct Row {
    ///     city: String,
    ///     country: String,
    ///     #[serde(rename = "pop")]
    ///     population: u64,
    /// }
    ///
    /// # fn main() { async_std::task::block_on(async {example().await.unwrap()}); }
    /// async fn example() -> Result<(), Box<dyn Error>> {
    ///     let data = indoc::indoc! {"
    /// city,country,pop
    /// Boston,United States,4628910
    /// "};
    ///     let mut rdr = AsyncDeserializer::from_reader(data.as_bytes());
    ///
    ///     // We can read the headers before iterating.
    ///     {
    ///         // `headers` borrows from the reader, so we put this in its
    ///         // own scope. That way, the borrow ends before we try iterating
    ///         // below. Alternatively, we could clone the headers.
    ///         let headers = rdr.byte_headers().await?;
    ///         assert_eq!(headers, vec!["city", "country", "pop"]);
    ///     }
    ///
    ///     {
    ///         let mut records = rdr.deserialize::<Row>();
    ///         assert_eq!(records.next().await.unwrap()?,
    ///             Row {city: "Boston".to_string(),
    ///                  country: "United States".to_string(),
    ///                  population: 4628910 });
    ///         assert!(records.next().await.is_none());
    ///     }
    ///
    ///     // We can also read the headers after iterating.
    ///     let headers = rdr.byte_headers().await?;
    ///     assert_eq!(headers, vec!["city", "country", "pop"]);
    ///     Ok(())
    /// }
    /// ```
    #[inline]
    pub async fn byte_headers(&mut self) -> Result<&ByteRecord> {
        self.0.byte_headers().await
    }

    /// Set the headers of this CSV parser manually.
    ///
    /// This overrides any other setting (including `set_byte_headers`). Any
    /// automatic detection of headers is disabled. This may be called at any
    /// time.
    ///
    /// # Example
    ///
    /// ```
    /// use std::error::Error;
    /// use csv_async::{AsyncDeserializer, StringRecord};
    ///
    /// # fn main() { async_std::task::block_on(async {example().await.unwrap()}); }
    /// async fn example() -> Result<(), Box<dyn Error>> {
    ///     let data = "\
    /// city,country,pop
    /// Boston,United States,4628910
    /// ";
    ///     let mut rdr = AsyncDeserializer::from_reader(data.as_bytes());
    ///
    ///     assert_eq!(rdr.headers().await?, vec!["city", "country", "pop"]);
    ///     rdr.set_headers(StringRecord::from(vec!["a", "b", "c"]));
    ///     assert_eq!(rdr.headers().await?, vec!["a", "b", "c"]);
    ///
    ///     Ok(())
    /// }
    /// ```
    #[inline]
    pub fn set_headers(&mut self, headers: StringRecord) {
        self.0.set_headers(headers);
    }

    /// Set the headers of this CSV parser manually as raw bytes.
    ///
    /// This overrides any other setting (including `set_headers`). Any
    /// automatic detection of headers is disabled. This may be called at any
    /// time.
    ///
    /// # Example
    ///
    /// ```
    /// use std::error::Error;
    /// use csv_async::{AsyncDeserializer, ByteRecord};
    ///
    /// # fn main() { async_std::task::block_on(async {example().await.unwrap()}); }
    /// async fn example() -> Result<(), Box<dyn Error>> {
    ///     let data = "\
    /// city,country,pop
    /// Boston,United States,4628910
    /// ";
    ///     let mut rdr = AsyncDeserializer::from_reader(data.as_bytes());
    ///
    ///     assert_eq!(rdr.byte_headers().await?, vec!["city", "country", "pop"]);
    ///     rdr.set_byte_headers(ByteRecord::from(vec!["a", "b", "c"]));
    ///     assert_eq!(rdr.byte_headers().await?, vec!["a", "b", "c"]);
    ///
    ///     Ok(())
    /// }
    /// ```
    #[inline]
    pub fn set_byte_headers(&mut self, headers: ByteRecord) {
        self.0.set_byte_headers(headers);
    }

    /// Read a single row into the given record. Returns false when no more
    /// records could be read.
    ///
    /// If `has_headers` was enabled via an `AsyncReaderBuilder` (which is the
    /// default), then this will never read the first record.
    ///
    /// This method is useful when you want to read records as fast as
    /// possible. It's less ergonomic than an iterator, but it permits the
    /// caller to reuse the `StringRecord` allocation, which usually results
    /// in higher throughput.
    ///
    /// Records read via this method are guaranteed to have a position set
    /// on them, even if the reader is at EOF or if an error is returned.
    ///
    /// # Example
    ///
    /// ```
    /// use std::error::Error;
    /// use csv_async::{AsyncDeserializer, StringRecord};
    ///
    /// # fn main() { async_std::task::block_on(async {example().await.unwrap()}); }
    /// async fn example() -> Result<(), Box<dyn Error>> {
    ///     let data = "\
    /// city,country,pop
    /// Boston,United States,4628910
    /// ";
    ///     let mut rdr = AsyncDeserializer::from_reader(data.as_bytes());
    ///     let mut record = StringRecord::new();
    ///
    ///     if rdr.read_record(&mut record).await? {
    ///         assert_eq!(record, vec!["Boston", "United States", "4628910"]);
    ///         Ok(())
    ///     } else {
    ///         Err(From::from("expected at least one record but got none"))
    ///     }
    /// }
    /// ```
    #[inline]
    pub async fn read_record(&mut self, record: &mut StringRecord) -> Result<bool> {
        self.0.read_record(record).await
    }

    /// Read a single row into the given byte record. Returns false when no
    /// more records could be read.
    ///
    /// If `has_headers` was enabled via an `AsyncReaderBuilder` (which is the
    /// default), then this will never read the first record.
    ///
    /// This method is useful when you want to read records as fast as
    /// possible. It's less ergonomic than an iterator, but it permits the
    /// caller to reuse the `ByteRecord` allocation, which usually results
    /// in higher throughput.
    ///
    /// Records read via this method are guaranteed to have a position set
    /// on them, even if the reader is at EOF or if an error is returned.
    ///
    /// # Example
    ///
    /// ```
    /// use std::error::Error;
    /// use csv_async::{ByteRecord, AsyncDeserializer};
    ///
    /// # fn main() { async_std::task::block_on(async {example().await.unwrap()}); }
    /// async fn example() -> Result<(), Box<dyn Error>> {
    ///     let data = "\
    /// city,country,pop
    /// Boston,United States,4628910
    /// ";
    ///     let mut rdr = AsyncDeserializer::from_reader(data.as_bytes());
    ///     let mut record = ByteRecord::new();
    ///
    ///     if rdr.read_byte_record(&mut record).await? {
    ///         assert_eq!(record, vec!["Boston", "United States", "4628910"]);
    ///         Ok(())
    ///     } else {
    ///         Err(From::from("expected at least one record but got none"))
    ///     }
    /// }
    /// ```
    #[inline]
    pub async fn read_byte_record(&mut self, record: &mut ByteRecord) -> Result<bool> {
        self.0.read_byte_record(record).await
    }

    /// Return the current position of this CSV deserializer.
    ///
    /// Because of borrowing rules, this function can only be used when there
    /// is no active deserializer stream (which mutably borrows the reader).
    /// To obtain positions during deserialization, use `deserialize_with_pos`
    /// as shown in the example below.
    ///
    /// The byte offset in the position returned can be used to `seek` this
    /// deserializer. In particular, seeking to a position returned here on the same
    /// data will result in parsing the same subsequent record.
    ///
    /// # Example: reading the position
    ///
    /// ```
    /// use std::error::Error;
    /// use futures::io;
    /// use futures::stream::StreamExt;
    /// use serde::Deserialize;
    /// use csv_async::{AsyncDeserializer, Position};
    ///
    /// #[derive(Debug, Deserialize, Eq, PartialEq)]
    /// struct Row {
    ///     city: String,
    ///     country: String,
    ///     popcount: u64,
    /// }
    ///
    /// # fn main() { async_std::task::block_on(async {example().await.unwrap()}); }
    /// async fn example() -> Result<(), Box<dyn Error>> {
    ///     let data = "\
    /// city,country,popcount
    /// Boston,United States,4628910
    /// Concord,United States,42695
    /// ";
    ///     let mut rdr = AsyncDeserializer::from_reader(io::Cursor::new(data));
    ///     let mut iter = rdr.deserialize_with_pos::<Row>();
    ///     let mut pos_at_boston = Position::new();
    ///     while let Some((rec, pos)) = iter.next().await {
    ///         if rec?.city == "Boston" {
    ///             pos_at_boston = pos;
    ///         }
    ///     }
    ///     drop(iter); // release the borrow of rdr held by iter
    ///     let pos_at_end = rdr.position();
    ///
    ///     assert_eq!(pos_at_boston.byte(), 22);
    ///     assert_eq!(pos_at_boston.line(), 2);
    ///     assert_eq!(pos_at_boston.record(), 1);
    ///     assert_eq!(pos_at_end.byte(), 79);
    ///     assert_eq!(pos_at_end.line(), 4);
    ///     assert_eq!(pos_at_end.record(), 3);
    ///     Ok(())
    /// }
    /// ```
    #[inline]
    pub fn position(&self) -> &Position {
        self.0.position()
    }

    /// Returns true if and only if this reader has been exhausted.
    ///
    /// When this returns true, no more records can be read from this reader
    /// (unless it has been seeked to another position).
    ///
    /// # Example
    ///
    /// ```
    /// use std::error::Error;
    /// use futures::io;
    /// use futures::stream::StreamExt;
    /// use serde::Deserialize;
    /// use csv_async::{AsyncDeserializer, Position};
    ///
    /// #[derive(Debug, Deserialize, Eq, PartialEq)]
    /// struct Row {
    ///     city: String,
    ///     country: String,
    ///     popcount: u64,
    /// }
    ///
    /// # fn main() { async_std::task::block_on(async {example().await.unwrap()}); }
    /// async fn example() -> Result<(), Box<dyn Error>> {
    ///     let data = "\
    /// city,country,popcount
    /// Boston,United States,4628910
    /// Concord,United States,42695
    /// ";
    ///     let mut rdr = AsyncDeserializer::from_reader(io::Cursor::new(data));
    ///     assert!(!rdr.is_done());
    ///     {
    ///         let mut records = rdr.deserialize::<Row>();
    ///         while let Some(record) = records.next().await {
    ///             let _ = record?;
    ///         }
    ///     }
    ///     assert!(rdr.is_done());
    ///     Ok(())
    /// }
    /// ```
    #[inline]
    pub fn is_done(&self) -> bool {
        self.0.is_done()
    }

    /// Returns true if and only if this reader has been configured to
    /// interpret the first record as a header record.
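    ///
    /// # Example
    ///
    /// A minimal sketch contrasting the default configuration with
    /// `has_headers(false)`:
    ///
    /// ```
    /// use csv_async::{AsyncDeserializer, AsyncReaderBuilder};
    ///
    /// // The default configuration treats the first record as headers.
    /// let rdr = AsyncDeserializer::from_reader("a,b,c".as_bytes());
    /// assert!(rdr.has_headers());
    ///
    /// // Header handling can be disabled through the builder.
    /// let rdr = AsyncReaderBuilder::new()
    ///     .has_headers(false)
    ///     .create_deserializer("a,b,c".as_bytes());
    /// assert!(!rdr.has_headers());
    /// ```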
    #[inline]
    pub fn has_headers(&self) -> bool {
        self.0.has_headers()
    }

    /// Returns a reference to the underlying reader.
    #[inline]
    pub fn get_ref(&self) -> &R {
        self.0.get_ref()
    }

    /// Returns a mutable reference to the underlying reader.
    #[inline]
    pub fn get_mut(&mut self) -> &mut R {
        self.0.get_mut()
    }

    /// Unwraps this CSV reader, returning the underlying reader.
    ///
    /// Note that any leftover data inside this reader's internal buffer is
    /// lost.
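    ///
    /// # Example
    ///
    /// A minimal sketch showing that the original reader is handed back:
    ///
    /// ```
    /// use csv_async::AsyncDeserializer;
    /// use futures::io;
    ///
    /// let rdr = AsyncDeserializer::from_reader(io::Cursor::new("a,b,c\n1,2,3\n"));
    /// // Nothing has been read yet, so the cursor still holds all of the data.
    /// let cursor: io::Cursor<&str> = rdr.into_inner();
    /// assert_eq!(cursor.get_ref(), &"a,b,c\n1,2,3\n");
    /// ```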
    #[inline]
    pub fn into_inner(self) -> R {
        self.0.into_inner()
    }
}

impl<R: io::AsyncRead + io::AsyncSeek + std::marker::Unpin> AsyncDeserializer<R> {
    /// Seeks the underlying reader to the position given.
    ///
    /// This comes with a few caveats:
    ///
    /// * Any internal buffer associated with this reader is cleared.
    /// * If the given position does not correspond to a position immediately
    ///   before the start of a record, then the behavior of this reader is
    ///   unspecified.
    /// * Any special logic that skips the first record in the CSV reader
    ///   when reading or iterating over records is disabled.
    ///
    /// If the given position has a byte offset equivalent to the current
    /// position, then no seeking is performed.
    ///
    /// If the header row has not already been read, then this will attempt
    /// to read the header row before seeking. Therefore, it is possible that
    /// this returns an error associated with reading CSV data.
    ///
    /// Note that seeking is performed based only on the byte offset in the
    /// given position. Namely, the record or line numbers in the position may
    /// be incorrect, but this will cause any future position generated by
    /// this CSV reader to be similarly incorrect.
    ///
    /// # Example: seek to parse a record twice
    ///
    /// ```
    /// use std::error::Error;
    /// use futures::io;
    /// use futures::stream::StreamExt;
    /// use serde::Deserialize;
    /// use csv_async::{AsyncDeserializer, Position};
    ///
    /// #[derive(Debug, Deserialize, Eq, PartialEq)]
    /// struct Row {
    ///     city: String,
    ///     country: String,
    ///     popcount: u64,
    /// }
    ///
    /// # fn main() { async_std::task::block_on(async {example().await.unwrap()}); }
    /// async fn example() -> Result<(), Box<dyn Error>> {
    ///     let data = "\
    /// city,country,popcount
    /// Boston,United States,4628910
    /// Concord,United States,42695
    /// ";
    ///     let mut rdr = AsyncDeserializer::from_reader(io::Cursor::new(data));
    ///     let mut records = rdr.deserialize_with_pos::<Row>();
    ///     let mut pos_at_boston = Position::new();
    ///     while let Some((rec, pos)) = records.next().await {
    ///         if rec?.city == "Boston" {
    ///             pos_at_boston = pos;
    ///             // No break here - we read the data to the end.
    ///         }
    ///     }
    ///     drop(records); // release rdr, which was borrowed by records
    ///
    ///     // Now seek the reader back to `pos_at_boston`. This will let us read the
    ///     // Boston record again.
    ///     rdr.seek(pos_at_boston).await?;
    ///     let mut records = rdr.into_deserialize::<Row>();
    ///     if let Some(rec) = records.next().await {
    ///         assert_eq!(rec?.city, "Boston");
    ///         Ok(())
    ///     } else {
    ///         Err(From::from("After seek we should be just before the last record, but we were at the end of the stream"))
    ///     }
    /// }
    /// ```
    #[inline]
    pub async fn seek(&mut self, pos: Position) -> Result<()> {
        self.0.seek(pos).await
    }

    /// This is like `seek`, but provides direct control over how the seeking
    /// operation is performed via `io::SeekFrom`.
    ///
    /// The `pos` position given *should* correspond to the position indicated
    /// by `seek_from`, but there is no requirement. If the `pos` position
    /// given is incorrect, then the position information returned by this
    /// reader will be similarly incorrect.
    ///
    /// If the header row has not already been read, then this will attempt
    /// to read the header row before seeking. Therefore, it is possible that
    /// this returns an error associated with reading CSV data.
    ///
    /// Unlike `seek`, this will always cause an actual seek to be performed.
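    ///
    /// # Example
    ///
    /// A minimal sketch (analogous to `seek`): rewind the underlying reader
    /// to byte 0 and tell the deserializer that this offset is the very
    /// beginning again. Because the header-skipping logic is disabled after
    /// seeking, the header row is yielded as an ordinary record:
    ///
    /// ```
    /// use std::error::Error;
    /// use futures::io;
    /// use csv_async::{AsyncDeserializer, Position, StringRecord};
    ///
    /// # fn main() { async_std::task::block_on(async {example().await.unwrap()}); }
    /// async fn example() -> Result<(), Box<dyn Error>> {
    ///     let data = "\
    /// city,country,pop
    /// Boston,United States,4628910
    /// ";
    ///     let mut rdr = AsyncDeserializer::from_reader(io::Cursor::new(data));
    ///     let mut record = StringRecord::new();
    ///     assert!(rdr.read_record(&mut record).await?);
    ///
    ///     // Seek back to the start of the underlying reader and tell the
    ///     // deserializer that this byte offset is the start position again.
    ///     rdr.seek_raw(io::SeekFrom::Start(0), Position::new()).await?;
    ///     assert!(rdr.read_record(&mut record).await?);
    ///     assert_eq!(record, vec!["city", "country", "pop"]);
    ///     Ok(())
    /// }
    /// ```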
    #[inline]
    pub async fn seek_raw(
        &mut self,
        seek_from: io::SeekFrom,
        pos: Position,
    ) -> Result<()> {
        self.0.seek_raw(seek_from, pos).await
    }
}


#[cfg(test)]
mod tests {
    use std::pin::Pin;
    use std::task::{Context, Poll};

    use futures::io;
    use futures::stream::StreamExt;
    use serde::Deserialize;
    use async_std::task;

    use crate::byte_record::ByteRecord;
    use crate::error::ErrorKind;
    use crate::string_record::StringRecord;
    use crate::Trim;

    use super::{Position, AsyncReaderBuilder, AsyncDeserializer};

    fn b(s: &str) -> &[u8] {
        s.as_bytes()
    }
    fn s(b: &[u8]) -> &str {
        ::std::str::from_utf8(b).unwrap()
    }

    fn newpos(byte: u64, line: u64, record: u64) -> Position {
        let mut p = Position::new();
        p.set_byte(byte).set_line(line).set_record(record);
        p
    }

    async fn count(stream: impl StreamExt) -> usize {
        stream.fold(0, |acc, _| async move { acc + 1 }).await
    }

    #[async_std::test]
    async fn read_byte_record() {
        let data = b("foo,\"b,ar\",baz\nabc,mno,xyz");
        let mut rdr =
            AsyncReaderBuilder::new().has_headers(false).create_deserializer(data);
        let mut rec = ByteRecord::new();

        assert!(rdr.read_byte_record(&mut rec).await.unwrap());
        assert_eq!(3, rec.len());
        assert_eq!("foo", s(&rec[0]));
        assert_eq!("b,ar", s(&rec[1]));
        assert_eq!("baz", s(&rec[2]));

        assert!(rdr.read_byte_record(&mut rec).await.unwrap());
        assert_eq!(3, rec.len());
        assert_eq!("abc", s(&rec[0]));
        assert_eq!("mno", s(&rec[1]));
        assert_eq!("xyz", s(&rec[2]));

        assert!(!rdr.read_byte_record(&mut rec).await.unwrap());
    }

    #[async_std::test]
    async fn read_trimmed_records_and_headers() {
        let data = b("foo, bar,\tbaz\n 1, 2, 3\n1\t,\t,3\t\t");
        let mut rdr = AsyncReaderBuilder::new()
            .has_headers(true)
            .trim(Trim::All)
            .create_deserializer(data);
        let mut rec = ByteRecord::new();
        assert!(rdr.read_byte_record(&mut rec).await.unwrap());
        assert_eq!("1", s(&rec[0]));
        assert_eq!("2", s(&rec[1]));
        assert_eq!("3", s(&rec[2]));
        let mut rec = StringRecord::new();
        assert!(rdr.read_record(&mut rec).await.unwrap());
        assert_eq!("1", &rec[0]);
        assert_eq!("", &rec[1]);
        assert_eq!("3", &rec[2]);
        {
            let headers = rdr.headers().await.unwrap();
            assert_eq!(3, headers.len());
            assert_eq!("foo", &headers[0]);
            assert_eq!("bar", &headers[1]);
            assert_eq!("baz", &headers[2]);
        }
    }

    #[async_std::test]
    async fn read_trimmed_header() {
        let data = b("foo, bar,\tbaz\n 1, 2, 3\n1\t,\t,3\t\t");
        let mut rdr = AsyncReaderBuilder::new()
            .has_headers(true)
            .trim(Trim::Headers)
            .create_deserializer(data);
        let mut rec = ByteRecord::new();
        assert!(rdr.read_byte_record(&mut rec).await.unwrap());
        assert_eq!(" 1", s(&rec[0]));
        assert_eq!(" 2", s(&rec[1]));
        assert_eq!(" 3", s(&rec[2]));
        {
            let headers = rdr.headers().await.unwrap();
            assert_eq!(3, headers.len());
            assert_eq!("foo", &headers[0]);
            assert_eq!("bar", &headers[1]);
            assert_eq!("baz", &headers[2]);
        }
    }

    #[async_std::test]
    async fn read_trimed_header_invalid_utf8() {
        let data = &b"foo, b\xFFar,\tbaz\na,b,c\nd,e,f"[..];
        let mut rdr = AsyncReaderBuilder::new()
            .has_headers(true)
            .trim(Trim::Headers)
            .create_deserializer(data);
        let mut rec = StringRecord::new();

        // Force the headers to be read.
        let _ = rdr.read_record(&mut rec).await;
        // Check that the byte headers are trimmed.
        {
            let headers = rdr.byte_headers().await.unwrap();
            assert_eq!(3, headers.len());
            assert_eq!(b"foo", &headers[0]);
            assert_eq!(b"b\xFFar", &headers[1]);
            assert_eq!(b"baz", &headers[2]);
        }
        match *rdr.headers().await.unwrap_err().kind() {
            ErrorKind::Utf8 { pos: Some(ref pos), ref err } => {
                assert_eq!(pos, &newpos(0, 1, 0));
                assert_eq!(err.field(), 1);
                assert_eq!(err.valid_up_to(), 3);
            }
            ref err => panic!("match failed, got {:?}", err),
        }
    }

    #[async_std::test]
    async fn read_trimmed_records() {
        let data = b("foo, bar,\tbaz\n 1, 2, 3\n1\t,\t,3\t\t");
        let mut rdr = AsyncReaderBuilder::new()
            .has_headers(true)
            .trim(Trim::Fields)
            .create_deserializer(data);
        let mut rec = ByteRecord::new();
        assert!(rdr.read_byte_record(&mut rec).await.unwrap());
        assert_eq!("1", s(&rec[0]));
        assert_eq!("2", s(&rec[1]));
        assert_eq!("3", s(&rec[2]));
        {
            let headers = rdr.headers().await.unwrap();
            assert_eq!(3, headers.len());
            assert_eq!("foo", &headers[0]);
            assert_eq!(" bar", &headers[1]);
            assert_eq!("\tbaz", &headers[2]);
        }
    }

    #[async_std::test]
    async fn read_record_unequal_fails() {
        let data = b("foo\nbar,baz");
        let mut rdr =
            AsyncReaderBuilder::new().has_headers(false).create_deserializer(data);
        let mut rec = ByteRecord::new();

        assert!(rdr.read_byte_record(&mut rec).await.unwrap());
        assert_eq!(1, rec.len());
        assert_eq!("foo", s(&rec[0]));

        match rdr.read_byte_record(&mut rec).await {
            Err(err) => match *err.kind() {
                ErrorKind::UnequalLengths {
                    expected_len: 1,
                    ref pos,
                    len: 2,
                } => {
                    assert_eq!(pos, &Some(newpos(4, 2, 1)));
                }
                ref wrong => panic!("match failed, got {:?}", wrong),
            },
            wrong => panic!("match failed, got {:?}", wrong),
        }
    }

    #[async_std::test]
    async fn read_record_unequal_ok() {
        let data = b("foo\nbar,baz");
        let mut rdr = AsyncReaderBuilder::new()
            .has_headers(false)
            .flexible(true)
            .create_deserializer(data);
        let mut rec = ByteRecord::new();

        assert!(rdr.read_byte_record(&mut rec).await.unwrap());
        assert_eq!(1, rec.len());
        assert_eq!("foo", s(&rec[0]));

        assert!(rdr.read_byte_record(&mut rec).await.unwrap());
        assert_eq!(2, rec.len());
        assert_eq!("bar", s(&rec[0]));
        assert_eq!("baz", s(&rec[1]));

        assert!(!rdr.read_byte_record(&mut rec).await.unwrap());
    }

    // This tests that even if we get a CSV error, we can continue reading
    // if we want.
    #[async_std::test]
    async fn read_record_unequal_continue() {
        let data = b("foo\nbar,baz\nquux");
        let mut rdr =
            AsyncReaderBuilder::new().has_headers(false).create_deserializer(data);
        let mut rec = ByteRecord::new();

        assert!(rdr.read_byte_record(&mut rec).await.unwrap());
        assert_eq!(1, rec.len());
        assert_eq!("foo", s(&rec[0]));

        match rdr.read_byte_record(&mut rec).await {
            Err(err) => match err.kind() {
                &ErrorKind::UnequalLengths {
                    expected_len: 1,
                    ref pos,
                    len: 2,
                } => {
                    assert_eq!(pos, &Some(newpos(4, 2, 1)));
                }
                wrong => panic!("match failed, got {:?}", wrong),
            },
            wrong => panic!("match failed, got {:?}", wrong),
        }

        assert!(rdr.read_byte_record(&mut rec).await.unwrap());
        assert_eq!(1, rec.len());
        assert_eq!("quux", s(&rec[0]));

        assert!(!rdr.read_byte_record(&mut rec).await.unwrap());
    }

    #[async_std::test]
    async fn read_record_headers() {
        let data = b("foo,bar,baz\na,b,c\nd,e,f");
        let mut rdr = AsyncReaderBuilder::new().has_headers(true).create_deserializer(data);
        let mut rec = StringRecord::new();

        assert!(rdr.read_record(&mut rec).await.unwrap());
        assert_eq!(3, rec.len());
        assert_eq!("a", &rec[0]);

        assert!(rdr.read_record(&mut rec).await.unwrap());
        assert_eq!(3, rec.len());
        assert_eq!("d", &rec[0]);

        assert!(!rdr.read_record(&mut rec).await.unwrap());

        {
            let headers = rdr.byte_headers().await.unwrap();
            assert_eq!(3, headers.len());
            assert_eq!(b"foo", &headers[0]);
            assert_eq!(b"bar", &headers[1]);
            assert_eq!(b"baz", &headers[2]);
        }
        {
            let headers = rdr.headers().await.unwrap();
            assert_eq!(3, headers.len());
            assert_eq!("foo", &headers[0]);
            assert_eq!("bar", &headers[1]);
            assert_eq!("baz", &headers[2]);
        }
    }

    #[async_std::test]
    async fn read_record_headers_invalid_utf8() {
        let data = &b"foo,b\xFFar,baz\na,b,c\nd,e,f"[..];
        let mut rdr = AsyncReaderBuilder::new().has_headers(true).create_deserializer(data);
        let mut rec = StringRecord::new();

        assert!(rdr.read_record(&mut rec).await.unwrap());
        assert_eq!(3, rec.len());
        assert_eq!("a", &rec[0]);

        assert!(rdr.read_record(&mut rec).await.unwrap());
        assert_eq!(3, rec.len());
        assert_eq!("d", &rec[0]);

        assert!(!rdr.read_record(&mut rec).await.unwrap());

        // Check that we can read the headers as raw bytes, but that
        // if we read them as strings, we get an appropriate UTF-8 error.
        {
            let headers = rdr.byte_headers().await.unwrap();
            assert_eq!(3, headers.len());
            assert_eq!(b"foo", &headers[0]);
            assert_eq!(b"b\xFFar", &headers[1]);
            assert_eq!(b"baz", &headers[2]);
        }
        match *rdr.headers().await.unwrap_err().kind() {
            ErrorKind::Utf8 { pos: Some(ref pos), ref err } => {
                assert_eq!(pos, &newpos(0, 1, 0));
                assert_eq!(err.field(), 1);
                assert_eq!(err.valid_up_to(), 1);
            }
            ref err => panic!("match failed, got {:?}", err),
        }
    }

    #[async_std::test]
    async fn read_record_no_headers_before() {
        let data = b("foo,bar,baz\na,b,c\nd,e,f");
        let mut rdr =
            AsyncReaderBuilder::new().has_headers(false).create_deserializer(data);
        let mut rec = StringRecord::new();

        {
            let headers = rdr.headers().await.unwrap();
            assert_eq!(3, headers.len());
            assert_eq!("foo", &headers[0]);
            assert_eq!("bar", &headers[1]);
            assert_eq!("baz", &headers[2]);
        }

        assert!(rdr.read_record(&mut rec).await.unwrap());
        assert_eq!(3, rec.len());
        assert_eq!("foo", &rec[0]);

        assert!(rdr.read_record(&mut rec).await.unwrap());
        assert_eq!(3, rec.len());
        assert_eq!("a", &rec[0]);

        assert!(rdr.read_record(&mut rec).await.unwrap());
        assert_eq!(3, rec.len());
        assert_eq!("d", &rec[0]);

        assert!(!rdr.read_record(&mut rec).await.unwrap());
    }

    #[async_std::test]
    async fn read_record_no_headers_after() {
        let data = b("foo,bar,baz\na,b,c\nd,e,f");
        let mut rdr =
            AsyncReaderBuilder::new().has_headers(false).create_deserializer(data);
        let mut rec = StringRecord::new();

        assert!(rdr.read_record(&mut rec).await.unwrap());
        assert_eq!(3, rec.len());
        assert_eq!("foo", &rec[0]);

        assert!(rdr.read_record(&mut rec).await.unwrap());
        assert_eq!(3, rec.len());
        assert_eq!("a", &rec[0]);

        assert!(rdr.read_record(&mut rec).await.unwrap());
        assert_eq!(3, rec.len());
        assert_eq!("d", &rec[0]);

        assert!(!rdr.read_record(&mut rec).await.unwrap());

        let headers = rdr.headers().await.unwrap();
        assert_eq!(3, headers.len());
        assert_eq!("foo", &headers[0]);
        assert_eq!("bar", &headers[1]);
        assert_eq!("baz", &headers[2]);
    }

    #[async_std::test]
    async fn seek() {
        let data = b("foo,bar,baz\na,b,c\nd,e,f\ng,h,i");
        let mut rdr = AsyncReaderBuilder::new().create_deserializer(io::Cursor::new(data));
        rdr.seek(newpos(18, 3, 2)).await.unwrap();

        let mut rec = StringRecord::new();

        assert_eq!(18, rdr.position().byte());
        assert!(rdr.read_record(&mut rec).await.unwrap());
        assert_eq!(3, rec.len());
        assert_eq!("d", &rec[0]);

        assert_eq!(24, rdr.position().byte());
        assert_eq!(4, rdr.position().line());
        assert_eq!(3, rdr.position().record());
        assert!(rdr.read_record(&mut rec).await.unwrap());
        assert_eq!(3, rec.len());
        assert_eq!("g", &rec[0]);

        assert!(!rdr.read_record(&mut rec).await.unwrap());
    }

    // Test that we can read headers after seeking even if the headers weren't
    // explicitly read before seeking.
    #[async_std::test]
    async fn seek_headers_after() {
        let data = b("foo,bar,baz\na,b,c\nd,e,f\ng,h,i");
        let mut rdr = AsyncReaderBuilder::new().create_deserializer(io::Cursor::new(data));
        rdr.seek(newpos(18, 3, 2)).await.unwrap();
        assert_eq!(rdr.headers().await.unwrap(), vec!["foo", "bar", "baz"]);
    }

    // Test that we can read headers after seeking if the headers were read
    // before seeking.
    #[async_std::test]
    async fn seek_headers_before_after() {
        let data = b("foo,bar,baz\na,b,c\nd,e,f\ng,h,i");
        let mut rdr = AsyncReaderBuilder::new().create_deserializer(io::Cursor::new(data));
        let headers = rdr.headers().await.unwrap().clone();
        rdr.seek(newpos(18, 3, 2)).await.unwrap();
        assert_eq!(&headers, rdr.headers().await.unwrap());
    }

    // Test that even if we didn't read headers before seeking, if we seek to
    // the current byte offset, then no seeking is done and therefore we can
    // still read headers after seeking.
    #[async_std::test]
    async fn seek_headers_no_actual_seek() {
        let data = b("foo,bar,baz\na,b,c\nd,e,f\ng,h,i");
        let mut rdr = AsyncReaderBuilder::new().create_deserializer(io::Cursor::new(data));
        rdr.seek(Position::new()).await.unwrap();
        assert_eq!("foo", &rdr.headers().await.unwrap()[0]);
    }

    #[derive(Debug, Deserialize, Eq, PartialEq)]
    struct Row1([String; 3]);

    // Test that position info is reported correctly in the absence of headers.
    #[async_std::test]
    async fn positions_no_headers() {
        let mut rdr = AsyncReaderBuilder::new()
            .has_headers(false)
            .create_deserializer("a,b,c\nx,y,z".as_bytes())
            .into_deserialize_with_pos::<Row1>();

        let (_, pos) = rdr.next().await.unwrap();
        assert_eq!(pos.byte(), 0);
        assert_eq!(pos.line(), 1);
        assert_eq!(pos.record(), 0);

        let (_, pos) = rdr.next().await.unwrap();
        assert_eq!(pos.byte(), 6);
        assert_eq!(pos.line(), 2);
        assert_eq!(pos.record(), 1);

        // Test that we are at the end of the stream and properly signal this.
        assert!(rdr.next().await.is_none());
        // Test that we do not panic when trying to read past the end of the stream (Issue #22).
        assert!(rdr.next().await.is_none());
    }

    // Test that position info is reported correctly with headers.
    #[async_std::test]
    async fn positions_headers() {
        let mut rdr = AsyncReaderBuilder::new()
            .has_headers(true)
            .create_deserializer("a,b,c\nx,y,z".as_bytes())
            .into_deserialize_with_pos::<Row1>();

        let (_, pos) = rdr.next().await.unwrap();
        assert_eq!(pos.byte(), 6);
        assert_eq!(pos.line(), 2);
        // We could avoid counting the header row as a record, but we keep compatibility with the 'csv' crate.
        assert_eq!(pos.record(), 1);
    }

    // Test that reading headers on empty data yields an empty record.
    #[async_std::test]
    async fn headers_on_empty_data() {
        let mut rdr = AsyncReaderBuilder::new().create_deserializer("".as_bytes());
        let r = rdr.byte_headers().await.unwrap();
        assert_eq!(r.len(), 0);
    }

    // Test that reading the first record on empty data works.
    #[async_std::test]
    async fn no_headers_on_empty_data() {
        let mut rdr =
            AsyncReaderBuilder::new().has_headers(false).create_deserializer("".as_bytes());
        assert_eq!(count(rdr.deserialize::<Row1>()).await, 0);
    }

    // Test that reading the first record on empty data works, even if
    // we've tried to read headers beforehand.
    #[async_std::test]
    async fn no_headers_on_empty_data_after_headers() {
        let mut rdr =
            AsyncReaderBuilder::new().has_headers(false).create_deserializer("".as_bytes());
        assert_eq!(rdr.headers().await.unwrap().len(), 0);
        assert_eq!(count(rdr.deserialize::<Row1>()).await, 0);
    }

    #[test]
    fn behavior_on_io_errors() {
        struct FailingRead;
        impl io::AsyncRead for FailingRead {
            fn poll_read(
                self: Pin<&mut Self>,
                _cx: &mut Context,
                _buf: &mut [u8]
            ) -> Poll<Result<usize, io::Error>> {
                Poll::Ready(Err(io::Error::new(io::ErrorKind::Other, "Broken reader")))
            }
        }
        impl std::marker::Unpin for FailingRead {}

        #[derive(Deserialize)]
        struct Fake;

        task::block_on(async {
            let mut records = AsyncDeserializer::from_reader(FailingRead).into_deserialize::<Fake>();
            let first_record = records.next().await;
            assert!(
                matches!(&first_record, Some(Err(e)) if matches!(e.kind(), crate::ErrorKind::Io(_)))
            );
            assert!(records.next().await.is_none());
        });

        task::block_on(async {
            let mut records = AsyncReaderBuilder::new()
                .end_on_io_error(false)
                .create_deserializer(FailingRead)
                .into_deserialize::<Fake>();
            let first_record = records.next().await;
            assert!(
                matches!(&first_record, Some(Err(e)) if matches!(e.kind(), crate::ErrorKind::Io(_)))
            );
            let second_record = records.next().await;
            assert!(
                matches!(&second_record, Some(Err(e)) if matches!(e.kind(), crate::ErrorKind::Io(_)))
            );
        });
    }
}