/plushcap/analysis/datastax/datastax-datastax-bulk-loader-introduction-and-loading

DataStax Bulk Loader Pt. 1 — Introduction and Loading

What's this blog post about?

In this tutorial, we will continue to explore the DataStax Bulk Loader (DSBulk) by discussing how to load and unload data from a CSV file into Cassandra using DSBulk. We will cover various aspects of loading and unloading data, such as handling errors, dealing with missing or extra fields, ignoring leading whitespaces, specifying the insert statement, deleting data, and more. First, let's create a new table in our keyspace called "iris_with_id" that includes an additional column for the id: ``` CREATE TABLE dsbulkblog.iris_with_id ( id INT PRIMARY KEY, sepal_length FLOAT, sepal_width FLOAT, petal_length FLOAT, petal_width FLOAT, species TEXT ); ``` Now we can load data from a CSV file into this table using DSBulk. We will use the same iris.csv file as before: ``` $ dsbulk load -url /tmp/dsbulkblog/iris.csv -k dsbulkblog -t iris_with_id -header true -m "0=id,1=sepal_length,2=sepal_width,3=petal_length,4=petal_width,5=species" ``` We can see that DSBulk is trying to convert Iris-setosa into a number. This is because it believes that this column is the id column, which is an INT. If we look through the file we see the same error over and over again. We’ve clearly messed up the mapping. After 100 errors (that is the default, but it can be overridden by setting -maxErrors) DSBulk quits trying to load. Whenever DSBulk encounters an input line that it cannot parse, it will add that line to the mapping.bad file. This is the input line as it was seen by DSBulk on ingest. If a line or two got garbled or had different format than other lines, DSBulk will populate the mapping.bad file, log the error in mapping-errors.log, but keep going, until it reaches the maximum number of errors. This way, a few bad lines don’t mess up the whole load, and the user can address the few bad lines, either manually inserting them or even running DSBulk on the mapping.bad file with different arguments. For example, to load these bad lines we could load with: ``` $ dsbulk load -url /tmp/logs/LOAD_20190314-162539-255317/mapping.bad -k dsbulkblog -t iris_with_id -header false -m "0=sepal_length,1=sepal_width,2=petal_length,3=petal_width,4=species, 5=id" ``` Sometimes we are only loading some of the columns. When the table has more columns than the input, DSBulk will by default throw an error. Now, while it is necessary that all primary key columns be specified, it is allowable to leave other columns undefined or unset. For example, let’s say we didn’t have the sepal_length column defined. We could mimic this with awk, such as: ``` $ cat /tmp/dsbulkblog/iris.csv | awk -F, '{printf("%s,%s,%s,%s,%s\n", $2, $3, $4, $5, $6)}' | dsbulk load -k dsbulkblog -t iris_with_id ``` This will result in an error because we have not provided the sepal_length column: ``` Operation directory: /tmp/logs/LOAD_20190314-163121-694645 At least 1 record does not match the provided schema.mapping or schema.query. Please check that the connector configuration and the schema configuration are correct. Operation LOAD_20190314-163121-694645 aborted: Too many errors, the maximum allowed is 100. total | failed | rows/s | mb/s | kb/row | p50ms | p99ms | p999ms | batches 101 |    101 |  0 | 0.00 |  0.00 | 0.00 |  0.00 | 0.00 |   0.00 Rejected records can be found in the following file(s): mapping.bad Errors are detailed in the following file(s): mapping-errors.log Last processed positions can be found in positions.txt ``` To address this, we can add the... --schema.allowMissingFields: ``` $ cat /tmp/dsbulkblog/iris.csv | awk -F, '{printf("%s,%s,%s,%s,%s\n", $2, $3, $4, $5, $6)}' | dsbulk load -k dsbulkblog -t iris_with_id --schema.allowMissingFields true ``` Similarly, we could have a situation where the input file has extra columns. ``` $ cat /tmp/dsbulkblog/iris.csv | awk -F, '{printf("%s,%s,%s,%s,%s,%s,extra\n", $1, $2, $3, $4, $5, $6)}' | dsbulk load -k dsbulkblog -t iris_with_id ``` This will load just fine, and the extra column will be ignored. However, if we wish to be strict about the inputs, we could cause DSBulk to error if there are extra fields using: ``` $ cat /tmp/dsbulkblog/iris.csv | awk -F, '{printf("%s,%s,%s,%s,%s,%s,extra\n", $1, $2, $3, $4, $5, $6)}' | dsbulk load -k dsbulkblog -t iris_with_id --schema.allowExtraFields false ``` This will result in an error because we have not said how to map the extra column: ``` Operation directory: /tmp/logs/LOAD_20190314-163305-346514 At least 1 record does not match the provided schema.mapping or schema.query. Please check that the connector configuration and the schema configuration are correct. Operation LOAD_20190314-163305-346514 aborted: Too many errors, the maximum allowed is 100. total | failed | rows/s | mb/s | kb/row | p50ms | p99ms | p999ms | batches 102 |    101 |  0 | 0.00 |  0.00 | 0.00 |  0.00 | 0.00 |   0.00 Rejected records can be found in the following file(s): mapping.bad Errors are detailed in the following file(s): mapping-errors.log Last processed positions can be found in positions.txt ``` To address this, we can add the... --schema.allowExtraFields: ``` $ cat /tmp/dsbulkblog/iris.csv | awk -F, '{printf("%s,%s,%s,%s,%s,%s,extra\n", $1, $2, $3, $4, $5, $6)}' | dsbulk load -k dsbulkblog -t iris_with_id --schema.allowExtraFields true ``` Sometimes there is whitespace at the beginning of a text string. We sometimes want this whitespace and sometimes we do not. This is controlled by the --connector.csv.ignoreLeadingWhitespaces parameter, which defaults to false (whitespaces are retained). For example: ``` $ cat /tmp/dsbulkblog/iris.csv | sed "s/Iris/  Iris/g" | dsbulk load -k dsbulkblog -t iris_with_id ``` We can check that the leading whitespaces are retained via DSBulk’s unload command (which we will discuss in a later blog post): ``` $ dsbulk unload -query "SELECT species FROM dsbulkblog.iris_with_id" | head Operation directory: /tmp/logs/UNLOAD_20190321-144112-777452 total | failed | rows/s | mb/s | kb/row | p50ms | p99ms | p999ms 150 | 0 | 392 | 0.01 | 0.02 | 9.49 | 41.68 | 41.68 Operation UNLOAD_20190321-144112-777452 completed successfully in 0 seconds. species Iris-setosa Iris-setosa Iris-virginica Iris-versicolor Iris-virginica Iris-versicolor Iris-virginica Iris-virginica Iris-virginica ``` To strip the leading whitespaces, we can run the load command again with the --connector.csv.ignoreLeadingWhitespaces set to true: ``` $ cat /tmp/dsbulkblog/iris.csv | sed "s/Iris/ Iris/g" | dsbulk load -k dsbulkblog -t iris_with_id --connector.csv.ignoreLeadingWhitespaces true ``` Again, we can check this with the DSBulk unload command: ``` $ dsbulk unload -query "SELECT species FROM dsbulkblog.iris_with_id" | head Operation directory: /tmp/logs/UNLOAD_20190321-144510-244786 total | failed | rows/s | mb/s | kb/row | p50ms | p99ms | p999ms 150 |      0 | 416 | 0.01 |   0.01 | 8.16 | 36.96 |  36.96 Operation UNLOAD_20190321-144510-244786 completed successfully in 0 seconds. species Iris-setosa Iris-setosa Iris-virginica Iris-versicolor Iris-virginica Iris-versicolor Iris-virginica Iris-virginica Iris-virginica ``` We can also specify a mapping by providing the actual INSERT statement itself: ``` $ dsbulk load -url /tmp/dsbulkblog/iris.csv -query "INSERT INTO dsbulkblog.iris_with_id(id,petal_width,petal_length,sepal_width,sepal_length,species) VALUES (:id, :petal_width, :petal_length, :sepal_width, :sepal_length, 'some kind of iris')" ``` Notice that we do not need to specify the -k or -t parameters, since it is included in the query itself. We can also specify a mapping by providing the actual INSERT statement itself: ``` $ dsbulk load -url /tmp/dsbulkblog/iris_no_header.csv -query "INSERT INTO dsbulkblog.iris_with_id(petal_width,petal_length,sepal_width,sepal_length,species,id) VALUES (?,?,?,?,?,?)" -header false ``` Notice that I needed to move the id column to the end of the list. This is because the order here matters and must match the order in the input data. In our data, the id column is last. We can also specify a mapping to place constant values into a column. For example, let’s set the species to the same value for all entries: ``` $ dsbulk load -url /tmp/dsbulkblog/iris.csv -query "INSERT INTO dsbulkblog.iris_with_id(id,petal_width,petal_length,sepal_width,sepal_length,species) VALUES (:id, :petal_width, :petal_length, :sepal_width, :sepal_length, 'some kind of iris')" ``` It may seem counterintuitive, but we can delete data using the DSBulk load command. Instead of an INSERT statement, we can issue a DELETE statement. For example, let’s delete the rows in our table that correspond to the first 10 lines (11 if you include the header) of the iris.csv file: ``` $ head -11 /tmp/dsbulkblog/iris.csv | dsbulk load -query "DELETE FROM dsbulkblog.iris_with_id WHERE id=:id" Operation directory: /tmp/logs/LOAD_20190320-180959-025572 total | failed | rows/s | mb/s | kb/row | p50ms | p99ms | p999ms | batches 10 |      0 | 32 | 0.00 |   0.00 | 4.59 | 5.96 |   5.96 | 1.00 Operation LOAD_20190320-180959-025572 completed successfully in 0 seconds. ``` We can check that those rows have been deleted: ``` $ cqlsh -e "SELECT * FROM dsbulkblog.iris_with_id WHERE id IN (0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15)" id | petal_length | petal_width | sepal_length | sepal_width | species ----+--------------+-------------+--------------+-------------+------------- 10 |          1.5 | 0.2 |          5.4 | 3.7 | Iris-setosa 11 |          1.6 | 0.2 |          4.8 | 3.4 | Iris-setosa 12 |          1.4 | 0.1 |          4.8 | 3 | Iris-setosa 13 |          1.1 | 0.1 |          4.3 | 3 | Iris-setosa 14 |          1.2 | 0.2 |          5.8 | 4 | Iris-setosa 15 |          1.5 | 0.4 |          5.7 | 4.4 | Iris-setosa (6 rows) Clearly, we could do all manner of other types of queries here. We could do counter updates, collection updates, and so on. To download the DataStax Bulk Loader click here. To learn additional elements for data loading, read Part 2 of the Bulk Loader series here.

Company
DataStax

Date published
March 26, 2019

Author(s)
Brian Hess

Word count
3815

Language
English

Hacker News points
None found.


By Matt Makai. 2021-2024.