Skip to content

Reduce page metadata loading to only what is necessary for query execution in ParquetOpen #16200

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
adrian-thurston opened this issue May 27, 2025 · 16 comments
Assignees
Labels
enhancement New feature or request

Comments

@adrian-thurston
Copy link

Is your feature request related to a problem or challenge?

The ParquetOpen will load all page metadata for a file, on an all tasks concurrently accessing that file. This can be costly for parquet files with a large number of rows, or a large number of columns, or both.

In testing at Influx we have noticed page metadata load time taking in the order of tens of milliseconds for some customer scenarios. We have directly timed this on customer parquet files. We estimate the contribution to query time being about 83% of those times.

Some individual page metadata load times:

Write Load File Size Row Groups Columns Rows Row Group Compression Rows of Page Metadata Page Metdata Load Time Estimated Query Savings
Telegraf 110 MB 11 65 10,523,008 10.3 / 17.4 MB 67,862 9ms 6ms / 36ms
Random Datagen 283 MB 5 11 4,620,000 61.1 / 66.7 MB 5,016 0.7ms nil
Cust A 144 MB 50 26 51,521,481 2.9 / 4.5 MB 132,864 16.9ms 14.1ms / ?
Cust B 104 MB 70 19 73,158,554 1.2 / 2.7 MB 137,864 23.3ms 19.4ms / ?
Cust C 122 MB 11 199 10,530,204 10.8 / 40.3 MB 208,156 25.4ms 21.1ms / ?

Note: for the Telegraf and Random Datagen datasets we were able to measure query time savings with our prototype. For customer scenarios we can only estimate.

Describe the solution you'd like

Rather than always loading all page metadata, instead load just file metadata, prune as much as we can, then load only the page metadata needed to execute the query.

  1. Read file metadata
  2. Prune row groups by range the task is targeting (file group breakdown of the file)
  3. Prune row groups by testing predicate against row-group stats
  4. Read page metadata only for needed row-groups and columns
  5. Prune access plan using minimally loaded page metadata.

Psuedo-code looks something like this:

let metadata = ArrowReaderMetadata::load_async_no_page_metadata(&mut reader,)?;
let access_plan = create_initial_plan()?;
let mut row_groups = RowGroupAccessPlanFilter::new(access_plan);
row_groups.prune_by_range(rg_metadata, range);
row_groups.prune_by_statistics();
let rg_accessed = row_groups.rg_needed();
let cols_accessed = predicate.columns_needed();
metadata.load_async_reduced_page_metadata(&mut reader, rg_accessed, cols_accessed,)?;
access_plan = p.prune_plan_with_page_index();

In our prototype we created a sparse page-metadata array. Row-group/column indexes that we don't need were left as Index::None. Psuedo-code:

let index = metadata.row_groups().iter()
        .map(|x| {
            if self.rg_accessed.as_ref().unwrap()[x.ordinal().unwrap() as usize] {
                x.columns().iter().enumerate()
                    .map(|(index, c)| {
                        if self.col_accessed.as_ref().unwrap()[index] {
                            match c.column_index_range() {
                                Some(r) => decode_column_index()
                                None => Ok(Index::NONE),
                            }
                        } else {
                            Ok(Index::NONE)
                        }
                    })
                    .collect::<Result<Vec<_>>>()
            } else {
                x.columns().iter()
                  .map(|_| Ok(Index::NONE) )
                  .collect::<Result<Vec<_>>>()
            }
        })
        .collect::<Result<Vec<_>>>()?;

Describe alternatives you've considered

No response

Additional context

No response

@adrian-thurston adrian-thurston added the enhancement New feature or request label May 27, 2025
@alamb
Copy link
Contributor

alamb commented May 27, 2025

FYI @adriangb -- I think this is something you were interested in too

@alamb
Copy link
Contributor

alamb commented May 27, 2025

Thank you @adrian-thurston -- this is a very neat idea

For others not following along, most of benchmarks don't have page metadata at the moment (e.g. the ClickBench partitioned set doesn't have any page index metadata) so this wouldn't show up in our existing benchmarks

@adriangb
Copy link
Contributor

Yes very neat. I was actually thinking this would be along the other axis: loading metadata only for the columns that are needed. My gut feeling is that a lot of compute is spent loading metadata for columns that aren't being filtered on. But I don't know if that's possible given the structure of the row group / page metadata.

@alamb
Copy link
Contributor

alamb commented May 27, 2025

Yes very neat. I was actually thinking this would be along the other axis: loading metadata only for the columns that are needed. My gut feeling is that a lot of compute is spent loading metadata for columns that aren't being filtered on. But I don't know if that's possible given the structure of the row group / page metadata.

I think we could certainly avoid loading page metadata for columns

We would probably have to add some sort of new API to ParquetMetadataLoader

One challenge / tradeoff that would be interesting/required is that doing another async load to read more of the metdata will be very bad if that has to actually go to object store again. Influx has it all cached in memory so it doesn't matter, but in general we need to be careful of adding additional requests

@alamb
Copy link
Contributor

alamb commented May 27, 2025

Though since DataFusion knows what columns are needed for predicates (and thus would be used in pruning) we could easily disable loading the page index for the other columns 🤔

@adriangb
Copy link
Contributor

Though since DataFusion knows what columns are needed for predicates (and thus would be used in pruning) we could easily disable loading the page index for the other columns 🤔

That's exactly my thought. You know which columns you need to filter on and after the ParquetOpener the metadata is discarded anyway, so there's no reason to read any (row group or page) metadata / stats for columns you're not going to filter on.

@zhuqi-lucas
Copy link
Contributor

I'd like to take this issue and try. And feel free to reassign if i don't submit a PR for a long time.

@zhuqi-lucas
Copy link
Contributor

take

@zhuqi-lucas
Copy link
Contributor

  1. The first step which i have done the experiment to rewrite the clickbench partition to support page_index, details:

#16149 (comment)

  1. The second step is to implement the code.

  2. The third step is to show performance improvement for our new implementation.

@alamb
Copy link
Contributor

alamb commented May 29, 2025

Thank you @zhuqi-lucas

@zhuqi-lucas
Copy link
Contributor

Created a arrow-rs issue, we can implement the interface first.

@etseidl
Copy link
Contributor

etseidl commented Jun 1, 2025

One challenge / tradeoff that would be interesting/required is that doing another async load to read more of the metdata will be very bad if that has to actually go to object store again.

Yes, this has me very worried. The layout of the column index is by row group, then column. So to read just a single column requires jumping around quite a bit if there are many row groups. Also, if there is no projection involved, the entire offset index will be read as well. This will need some careful testing to see if multiple fetches are worthwhile, or if doing a single fetch with a range large enough to include all column and offset indexes needed (and then only parsing the needed indexes) would be better.

@adriangb
Copy link
Contributor

adriangb commented Jun 2, 2025

Sadly I doubt there's a correct answer. It might be the opposite for a local SSD vs object storage.

@alamb
Copy link
Contributor

alamb commented Jun 2, 2025

I feel like we may need to add some sort of policy as this same tradeoff is coming up when implementing filter_pushdown optimizations. Namely, is it important to minimize IO operations or are more IO operations ok if it reduces CPU/Memory requirements.

As @adriangb and @etseidl say, this tradeoff is quite different depending on local vs object store.

Maybe we could make some sort of ObjectStore based interface that allows the parquet reader to hint what data might be necessary (e.g. the entire range of metadata / pages before pruning) and then allow the lower level system to decide if it wanted to prefetch, buffer or just pass through the request 🤔

@alamb
Copy link
Contributor

alamb commented Jun 2, 2025

FWIW we should still be able to have a significant win by not copying the page index values into the rust structures for columns we don't need in the query (even if we had to fetch the bytes from object store and decode them in thrift)

@adriangb
Copy link
Contributor

adriangb commented Jun 2, 2025

Yeah I see two ways to go about that:

  1. We make individual wrappers for each operation that decide how to fetch things. A MetadataLoader -> ObjectStoreMetadataLoader, FileReaderFactory, etc. I think this is the direction things are headed in now.
  2. Configs/settings like Postgres' random_page_cost or some sort of target_fetch_size, etc and we have DataFusion determine what to do with that information. We could even add the ability to dynamically measure and set that so that eg DataFusion CLI can adapt itself.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request
Projects
None yet
Development

No branches or pull requests

5 participants