Skip to content

Commit 7c1d446

Browse files
committed
Add tablescoping for file stats cache
1 parent 27e35d0 commit 7c1d446

6 files changed

Lines changed: 120 additions & 52 deletions

File tree

datafusion/catalog-listing/src/helpers.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -359,7 +359,7 @@ fn try_into_partitioned_file(
359359

360360
let mut pf: PartitionedFile = object_meta.into();
361361
pf.partition_values = partition_values;
362-
362+
pf.table_reference = table_path.get_table_ref().clone();
363363
Ok(pf)
364364
}
365365

datafusion/catalog-listing/src/table.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -804,12 +804,12 @@ impl ListingTable {
804804
) -> datafusion_common::Result<(Arc<Statistics>, Option<LexOrdering>)> {
805805
use datafusion_execution::cache::cache_manager::CachedFileMetadata;
806806

807-
let path = &part_file.object_meta.location;
807+
let path = TableScopedPath { table: part_file.table_reference.clone(), path : part_file.object_meta.location.clone()};
808808
let meta = &part_file.object_meta;
809809

810810
// Check cache first - if we have valid cached statistics and ordering
811811
if let Some(cache) = &self.collected_statistics
812-
&& let Some(cached) = cache.get(path)
812+
&& let Some(cached) = cache.get(&path)
813813
&& cached.is_valid_for(meta)
814814
{
815815
// Return cached statistics and ordering
@@ -828,7 +828,7 @@ impl ListingTable {
828828
// Store in cache
829829
if let Some(cache) = &self.collected_statistics {
830830
cache.put(
831-
path,
831+
&path,
832832
CachedFileMetadata::new(
833833
meta.clone(),
834834
Arc::clone(&statistics),

datafusion/core/src/execution/context/mod.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1427,8 +1427,10 @@ impl SessionContext {
14271427
schema.deregister_table(&table)?;
14281428
if table_type == TableType::Base
14291429
&& let Some(lfc) = self.runtime_env().cache_manager.get_list_files_cache()
1430+
&& let Some(fsc) = self.runtime_env().cache_manager.get_file_statistic_cache()
14301431
{
1431-
lfc.drop_table_entries(&Some(table_ref))?;
1432+
lfc.drop_table_entries(&Some(table_ref.clone()))?;
1433+
fsc.drop_table_entries(&Some(table_ref.clone()))?;
14321434
}
14331435
return Ok(true);
14341436
}

datafusion/datasource/src/mod.rs

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ pub use self::url::ListingTableUrl;
5555
use crate::file_groups::FileGroup;
5656
use chrono::TimeZone;
5757
use datafusion_common::stats::Precision;
58-
use datafusion_common::{ColumnStatistics, Result, exec_datafusion_err};
58+
use datafusion_common::{ColumnStatistics, Result, exec_datafusion_err, TableReference};
5959
use datafusion_common::{ScalarValue, Statistics};
6060
use datafusion_physical_expr::LexOrdering;
6161
use futures::{Stream, StreamExt};
@@ -151,6 +151,7 @@ pub struct PartitionedFile {
151151
pub extensions: Option<Arc<dyn std::any::Any + Send + Sync>>,
152152
/// The estimated size of the parquet metadata, in bytes
153153
pub metadata_size_hint: Option<usize>,
154+
pub table_reference: Option<TableReference>,
154155
}
155156

156157
impl PartitionedFile {
@@ -170,6 +171,7 @@ impl PartitionedFile {
170171
ordering: None,
171172
extensions: None,
172173
metadata_size_hint: None,
174+
table_reference: None
173175
}
174176
}
175177

@@ -183,6 +185,7 @@ impl PartitionedFile {
183185
ordering: None,
184186
extensions: None,
185187
metadata_size_hint: None,
188+
table_reference: None
186189
}
187190
}
188191

@@ -202,6 +205,7 @@ impl PartitionedFile {
202205
ordering: None,
203206
extensions: None,
204207
metadata_size_hint: None,
208+
table_reference: None
205209
}
206210
.with_range(start, end)
207211
}
@@ -213,6 +217,12 @@ impl PartitionedFile {
213217
self
214218
}
215219

220+
pub fn with_table_reference(mut self, table_reference: Option<TableReference>) -> Self {
221+
self.table_reference = table_reference;
222+
self
223+
}
224+
225+
216226
/// Size of the file to be scanned (taking into account the range, if present).
217227
pub fn effective_size(&self) -> u64 {
218228
if let Some(range) = &self.range {
@@ -339,6 +349,7 @@ impl From<ObjectMeta> for PartitionedFile {
339349
ordering: None,
340350
extensions: None,
341351
metadata_size_hint: None,
352+
table_reference: None,
342353
}
343354
}
344355
}
@@ -536,6 +547,7 @@ pub fn generate_test_files(num_files: usize, overlap_factor: f64) -> Vec<FileGro
536547
ordering: None,
537548
extensions: None,
538549
metadata_size_hint: None,
550+
table_reference: None,
539551
};
540552
files.push(file);
541553
}

datafusion/execution/src/cache/cache_manager.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ impl CachedFileMetadata {
9595
/// 3. If invalid or missing, compute new value and call `put(path, new_value)`
9696
///
9797
/// See [`crate::runtime_env::RuntimeEnv`] for more details
98-
pub trait FileStatisticsCache: CacheAccessor<Path, CachedFileMetadata> {
98+
pub trait FileStatisticsCache: CacheAccessor<TableScopedPath, CachedFileMetadata> {
9999
/// Cache memory limit in bytes.
100100
fn cache_limit(&self) -> usize;
101101

@@ -104,6 +104,9 @@ pub trait FileStatisticsCache: CacheAccessor<Path, CachedFileMetadata> {
104104

105105
/// Retrieves the information about the entries currently cached.
106106
fn list_entries(&self) -> HashMap<Path, FileStatisticsCacheEntry>;
107+
108+
fn drop_table_entries(&self, table_ref: &Option<TableReference>) -> Result<()>;
109+
107110
}
108111

109112
impl DFHeapSize for CachedFileMetadata {

0 commit comments

Comments
 (0)