Skip to content

Commit bab3590

Browse files
committed
Include key in memory estimation
1 parent 25bbf27 commit bab3590

3 files changed

Lines changed: 15 additions & 5 deletions

File tree

datafusion/common/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ indexmap = { workspace = true }
7575
itertools = { workspace = true }
7676
libc = "0.2.180"
7777
log = { workspace = true }
78-
object_store = { workspace = true, optional = true }
78+
object_store = { workspace = true, optional = true, default-features = true }
7979
parquet = { workspace = true, optional = true, default-features = true }
8080
paste = { workspace = true }
8181
recursive = { workspace = true, optional = true }

datafusion/common/src/heap_size.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ use half::f16;
2929
use std::collections::HashMap;
3030
use std::fmt::Debug;
3131
use std::sync::Arc;
32+
use object_store::path::Path;
3233

3334
/// This is a temporary solution until <https://github.com/apache/datafusion/pull/19599> and
3435
/// <https://github.com/apache/arrow-rs/pull/9138> are resolved.
@@ -179,6 +180,12 @@ impl DFHeapSize for DataType {
179180
}
180181
}
181182

183+
impl DFHeapSize for Path {
184+
fn heap_size(&self) -> usize {
185+
self.as_ref().heap_size()
186+
}
187+
}
188+
182189
impl<T: DFHeapSize> DFHeapSize for Vec<T> {
183190
fn heap_size(&self) -> usize {
184191
let item_size = size_of::<T>();

datafusion/execution/src/cache/cache_unit.rs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -97,16 +97,17 @@ impl DefaultFileStatisticsCacheState {
9797
key: &Path,
9898
value: CachedFileMetadata,
9999
) -> Option<CachedFileMetadata> {
100+
let key_size = key.heap_size();
100101
let entry_size = value.heap_size();
101102

102-
if entry_size > self.memory_limit {
103+
if entry_size + key_size > self.memory_limit {
103104
// Remove stale entry if exists
104105
self.remove(key);
105106
return None;
106107
}
107108

108109
let old_value = self.lru_queue.put(key.clone(), value);
109-
self.memory_used += entry_size;
110+
self.memory_used += entry_size + key_size;
110111

111112
if let Some(old_entry) = &old_value {
112113
self.memory_used -= old_entry.heap_size();
@@ -119,6 +120,7 @@ impl DefaultFileStatisticsCacheState {
119120

120121
fn remove(&mut self, k: &Path) -> Option<CachedFileMetadata> {
121122
if let Some(old_entry) = self.lru_queue.remove(k) {
123+
self.memory_used -= k.heap_size();
122124
self.memory_used -= old_entry.heap_size();
123125
Some(old_entry)
124126
} else {
@@ -142,6 +144,7 @@ impl DefaultFileStatisticsCacheState {
142144
fn evict_entries(&mut self) {
143145
while self.memory_used > self.memory_limit {
144146
if let Some(removed) = self.lru_queue.pop() {
147+
self.memory_used -= removed.0.heap_size();
145148
self.memory_used -= removed.1.heap_size();
146149
} else {
147150
// cache is empty while memory_used > memory_limit, cannot happen
@@ -547,7 +550,7 @@ mod tests {
547550
let (meta_2, value_2) = create_cached_file_metadata_with_stats("test2.parquet");
548551
let (meta_3, value_3) = create_cached_file_metadata_with_stats("test3.parquet");
549552

550-
let limit_for_2_entries = value_1.heap_size() + value_2.heap_size();
553+
let limit_for_2_entries = &meta_1.location.heap_size() + value_1.heap_size() + &meta_2.location.heap_size() + value_2.heap_size();
551554

552555
// create a cache with a limit which fits exactly 2 entries
553556
let cache = DefaultFileStatisticsCache::new(limit_for_2_entries);
@@ -579,7 +582,7 @@ mod tests {
579582

580583
cache.remove(&meta_2.location);
581584
assert_eq!(cache.len(), 1);
582-
assert_eq!(cache.memory_used(), value_3.heap_size());
585+
assert_eq!(cache.memory_used(), &meta_3.location.heap_size() + value_3.heap_size());
583586

584587
cache.clear();
585588
assert_eq!(cache.len(), 0);

0 commit comments

Comments (0)