// Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. // (Doc section: Dataset Example) // (Doc section: Includes) #include #include #include // We use Parquet headers for setting up examples; they are not required for using // datasets. #include #include #include #include // (Doc section: Includes) // (Doc section: Helper Functions) // Generate some data for the rest of this example. arrow::Result> CreateTable() { // This code should look familiar from the basic Arrow example, and is not the // focus of this example. However, we need data to work on it, and this makes that! auto schema = arrow::schema({arrow::field("a", arrow::int64()), arrow::field("b", arrow::int64()), arrow::field("c", arrow::int64())}); std::shared_ptr array_a; std::shared_ptr array_b; std::shared_ptr array_c; arrow::NumericBuilder builder; ARROW_RETURN_NOT_OK(builder.AppendValues({0, 1, 2, 3, 4, 5, 6, 7, 8, 9})); ARROW_RETURN_NOT_OK(builder.Finish(&array_a)); builder.Reset(); ARROW_RETURN_NOT_OK(builder.AppendValues({9, 8, 7, 6, 5, 4, 3, 2, 1, 0})); ARROW_RETURN_NOT_OK(builder.Finish(&array_b)); builder.Reset(); ARROW_RETURN_NOT_OK(builder.AppendValues({1, 2, 1, 2, 1, 2, 1, 2, 1, 2})); ARROW_RETURN_NOT_OK(builder.Finish(&array_c)); return arrow::Table::Make(schema, {array_a, array_b, array_c}); } // Set up a dataset by writing two Parquet files. arrow::Result CreateExampleParquetDataset( const std::shared_ptr& filesystem, const std::string& root_path) { // Much like CreateTable(), this is utility that gets us the dataset we'll be reading // from. Don't worry, we also write a dataset in the example proper. auto base_path = root_path + "parquet_dataset"; ARROW_RETURN_NOT_OK(filesystem->CreateDir(base_path)); // Create an Arrow Table ARROW_ASSIGN_OR_RAISE(auto table, CreateTable()); // Write it into two Parquet files ARROW_ASSIGN_OR_RAISE(auto output, filesystem->OpenOutputStream(base_path + "/data1.parquet")); ARROW_RETURN_NOT_OK(parquet::arrow::WriteTable( *table->Slice(0, 5), arrow::default_memory_pool(), output, 2048)); ARROW_ASSIGN_OR_RAISE(output, filesystem->OpenOutputStream(base_path + "/data2.parquet")); ARROW_RETURN_NOT_OK(parquet::arrow::WriteTable( *table->Slice(5), arrow::default_memory_pool(), output, 2048)); return base_path; } arrow::Status PrepareEnv() { // Initilize the compute module to register the required kernels for Dataset ARROW_RETURN_NOT_OK(arrow::compute::Initialize()); // Get our environment prepared for reading, by setting up some quick writing. ARROW_ASSIGN_OR_RAISE(auto src_table, CreateTable()) std::shared_ptr setup_fs; // Note this operates in the directory the executable is built in. char setup_path[256]; char* result = getcwd(setup_path, 256); if (result == NULL) { return arrow::Status::IOError("Fetching PWD failed."); } ARROW_ASSIGN_OR_RAISE(setup_fs, arrow::fs::FileSystemFromUriOrPath(setup_path)); ARROW_ASSIGN_OR_RAISE(auto dset_path, CreateExampleParquetDataset(setup_fs, "")); return arrow::Status::OK(); } // (Doc section: Helper Functions) // (Doc section: RunMain) arrow::Status RunMain() { // (Doc section: RunMain) // (Doc section: PrepareEnv) ARROW_RETURN_NOT_OK(PrepareEnv()); // (Doc section: PrepareEnv) // (Doc section: FileSystem Declare) // First, we need a filesystem object, which lets us interact with our local // filesystem starting at a given path. For the sake of simplicity, that'll be // the current directory. std::shared_ptr fs; // (Doc section: FileSystem Declare) // (Doc section: FileSystem Init) // Get the CWD, use it to make the FileSystem object. char init_path[256]; char* result = getcwd(init_path, 256); if (result == NULL) { return arrow::Status::IOError("Fetching PWD failed."); } ARROW_ASSIGN_OR_RAISE(fs, arrow::fs::FileSystemFromUriOrPath(init_path)); // (Doc section: FileSystem Init) // (Doc section: FileSelector Declare) // A file selector lets us actually traverse a multi-file dataset. arrow::fs::FileSelector selector; // (Doc section: FileSelector Declare) // (Doc section: FileSelector Config) selector.base_dir = "parquet_dataset"; // Recursive is a safe bet if you don't know the nesting of your dataset. selector.recursive = true; // (Doc section: FileSelector Config) // (Doc section: FileSystemFactoryOptions) // Making an options object lets us configure our dataset reading. arrow::dataset::FileSystemFactoryOptions options; // We'll use Hive-style partitioning. We'll let Arrow Datasets infer the partition // schema. We won't set any other options, defaults are fine. options.partitioning = arrow::dataset::HivePartitioning::MakeFactory(); // (Doc section: FileSystemFactoryOptions) // (Doc section: File Format Setup) auto read_format = std::make_shared(); // (Doc section: File Format Setup) // (Doc section: FileSystemDatasetFactory Make) // Now, we get a factory that will let us get our dataset -- we don't have the // dataset yet! ARROW_ASSIGN_OR_RAISE(auto factory, arrow::dataset::FileSystemDatasetFactory::Make( fs, selector, read_format, options)); // (Doc section: FileSystemDatasetFactory Make) // (Doc section: FileSystemDatasetFactory Finish) // Now we build our dataset from the factory. ARROW_ASSIGN_OR_RAISE(auto read_dataset, factory->Finish()); // (Doc section: FileSystemDatasetFactory Finish) // (Doc section: Dataset Fragments) // Print out the fragments ARROW_ASSIGN_OR_RAISE(auto fragments, read_dataset->GetFragments()); for (const auto& fragment : fragments) { std::cout << "Found fragment: " << (*fragment)->ToString() << std::endl; std::cout << "Partition expression: " << (*fragment)->partition_expression().ToString() << std::endl; } // (Doc section: Dataset Fragments) // (Doc section: Read Scan Builder) // Scan dataset into a Table -- once this is done, you can do // normal table things with it, like computation and printing. However, now you're // also dedicated to being in memory. ARROW_ASSIGN_OR_RAISE(auto read_scan_builder, read_dataset->NewScan()); // (Doc section: Read Scan Builder) // (Doc section: Read Scanner) ARROW_ASSIGN_OR_RAISE(auto read_scanner, read_scan_builder->Finish()); // (Doc section: Read Scanner) // (Doc section: To Table) ARROW_ASSIGN_OR_RAISE(std::shared_ptr table, read_scanner->ToTable()); std::cout << table->ToString(); // (Doc section: To Table) // (Doc section: TableBatchReader) // Now, let's get a table out to disk as a dataset! // We make a RecordBatchReader from our Table, then set up a scanner, which lets us // go to a file. std::shared_ptr write_dataset = std::make_shared(table); // (Doc section: TableBatchReader) // (Doc section: WriteScanner) auto write_scanner_builder = arrow::dataset::ScannerBuilder::FromRecordBatchReader(write_dataset); ARROW_ASSIGN_OR_RAISE(auto write_scanner, write_scanner_builder->Finish()) // (Doc section: WriteScanner) // (Doc section: Partition Schema) // The partition schema determines which fields are used as keys for partitioning. auto partition_schema = arrow::schema({arrow::field("a", arrow::utf8())}); // (Doc section: Partition Schema) // (Doc section: Partition Create) // We'll use Hive-style partitioning, which creates directories with "key=value" // pairs. auto partitioning = std::make_shared(partition_schema); // (Doc section: Partition Create) // (Doc section: Write Format) // Now, we declare we'll be writing Parquet files. auto write_format = std::make_shared(); // (Doc section: Write Format) // (Doc section: Write Options) // This time, we make Options for writing, but do much more configuration. arrow::dataset::FileSystemDatasetWriteOptions write_options; // Defaults to start. write_options.file_write_options = write_format->DefaultWriteOptions(); // (Doc section: Write Options) // (Doc section: Options FS) // Use the filesystem we already have. write_options.filesystem = fs; // (Doc section: Options FS) // (Doc section: Options Target) // Write to the folder "write_dataset" in current directory. write_options.base_dir = "write_dataset"; // (Doc section: Options Target) // (Doc section: Options Partitioning) // Use the partitioning declared above. write_options.partitioning = partitioning; // (Doc section: Options Partitioning) // (Doc section: Options Name Template) // Define what the name for the files making up the dataset will be. write_options.basename_template = "part{i}.parquet"; // (Doc section: Options Name Template) // (Doc section: Options File Behavior) // Set behavior to overwrite existing data -- specifically, this lets this example // be run more than once, and allows whatever code you have to overwrite what's there. write_options.existing_data_behavior = arrow::dataset::ExistingDataBehavior::kOverwriteOrIgnore; // (Doc section: Options File Behavior) // (Doc section: Write Dataset) // Write to disk! ARROW_RETURN_NOT_OK( arrow::dataset::FileSystemDataset::Write(write_options, write_scanner)); // (Doc section: Write Dataset) // (Doc section: Ret) return arrow::Status::OK(); } // (Doc section: Ret) // (Doc section: Main) int main() { arrow::Status st = RunMain(); if (!st.ok()) { std::cerr << st << std::endl; return 1; } return 0; } // (Doc section: Main) // (Doc section: Dataset Example)