Add a memory bound FileStatisticsCache for the Listing Table#20047
mkleen wants to merge 40 commits into apache:main
Conversation
mkleen force-pushed from a66420a to 3b33739
mkleen force-pushed from 3b33739 to 8e5560b
mkleen force-pushed from e273afc to b297378
mkleen force-pushed from 59c6bce to 4542db8
@kosiew Thank you for the feedback!

@kosiew Anything else needed to get this merged? Another approval maybe?
mkleen force-pushed from 205f96c to 92899a7
datafusion/common/src/heap_size.rs
Outdated
impl<T: DFHeapSize> DFHeapSize for Arc<T> {
    fn heap_size(&self) -> usize {
        // Arc stores weak and strong counts on the heap alongside an instance of T
        2 * size_of::<usize>() + size_of::<T>() + self.as_ref().heap_size()
    }
}
This won't be accurate.
let a1 = Arc::new(vec![1, 2, 3]);
let a2 = a1.clone();
let a3 = a1.clone();
let a4 = a3.clone();
// this should be true because all `a`s point to the same object in memory
// but the current implementation does not detect this and counts them separately
assert_eq!(a4.heap_size(), a1.heap_size() + a2.heap_size() + a3.heap_size() + a4.heap_size());

The only solution I can imagine is for the caller to keep track of the pointer addresses that have already been "sized" and to ignore any Arc that points to an address which has been "sized" earlier.
Good catch! I took this implementation from https://github.com/apache/arrow-rs/blob/main/parquet/src/file/metadata/memory.rs#L97-L102 . I would suggest a follow-up here as well; we are planning to restructure the whole heap-size estimation anyway.
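The deduplication idea suggested above could be sketched roughly as follows. This is a minimal illustration with hypothetical names (`DedupHeapSize`, `heap_size_dedup`), not DataFusion's actual `DFHeapSize` trait: the caller threads a set of already-counted allocation addresses through the traversal so clones of the same `Arc` are counted once.

```rust
use std::collections::HashSet;
use std::sync::Arc;

// Hypothetical trait for illustration only; DataFusion's real trait is
// `DFHeapSize`, which has no deduplication parameter.
trait DedupHeapSize {
    fn heap_size_dedup(&self, seen: &mut HashSet<usize>) -> usize;
}

impl DedupHeapSize for Vec<i32> {
    fn heap_size_dedup(&self, _seen: &mut HashSet<usize>) -> usize {
        self.capacity() * std::mem::size_of::<i32>()
    }
}

impl<T: DedupHeapSize> DedupHeapSize for Arc<T> {
    fn heap_size_dedup(&self, seen: &mut HashSet<usize>) -> usize {
        // Deduplicate by allocation address: clones of the same Arc
        // contribute their heap usage only once.
        let addr = Arc::as_ptr(self) as usize;
        if !seen.insert(addr) {
            return 0; // already counted via another clone
        }
        // Strong/weak counts live on the heap next to the T instance.
        2 * std::mem::size_of::<usize>()
            + std::mem::size_of::<T>()
            + self.as_ref().heap_size_dedup(seen)
    }
}

fn main() {
    let a1 = Arc::new(vec![1, 2, 3]);
    let a2 = a1.clone();
    let mut seen = HashSet::new();
    let total = a1.heap_size_dedup(&mut seen) + a2.heap_size_dedup(&mut seen);
    // The clone adds nothing: both Arcs point at the same allocation.
    assert_eq!(total, a1.heap_size_dedup(&mut HashSet::new()));
    println!("shared allocation counted once: {total} bytes");
}
```

The trade-off is that every sizing pass must start from a fresh `seen` set and pass it through the whole object graph, which is why doing this in a follow-up (or in the planned arrow-rs `HeapSize` crate) seems reasonable.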
@martin-g Thanks for this great review! I am on it.
mkleen force-pushed from 92899a7 to 2e3aff9
I've incorporated all reviewer feedback and addressed the issues raised. With these changes, all tests are now passing. However, I'm not fully confident that the overall design is ideal. I'd appreciate another round of review.

@martin-g Could you also do another review round please?
STORED AS PARQUET LOCATION 'test_files/scratch/encrypted_parquet/'

- query error DataFusion error: Parquet error: Parquet error: Parquet file has an encrypted footer but decryption properties were not provided
+ query error Parquet error: Parquet error: Parquet file has an encrypted footer but decryption properties were not provided
Error message changed from:
DataFusion error: Parquet error: Parquet error: Parquet file has an encrypted footer but decryption properties were not provided
to:
DataFusion error: Parquet error: Parquet error: Failed to fetch metadata for file Users/mkleen/datafusion/datafusion/sqllogictest/test_files/scratch/encrypted_parquet/YkbUUuKrhTO6FhwX_3.parquet: Parquet error: Parquet error: Parquet file has an encrypted footer but decryption properties were not provided
And sorry to everyone that this took so long. I was very busy and could not find the time.
    return None;
}

let old_value = self.lru_queue.put(key.clone(), value);
Nice change overall, but I think there is a bug in the replacement path for memory_used.
When an existing cache entry is overwritten, this path adds the new key size and value size, but if old_value is present it only subtracts old_entry.heap_size(). It does not subtract the old key's heap usage.
That means repeatedly refreshing the same file will slowly inflate memory_used, which can trigger eviction earlier than expected and make the memory bound inaccurate.
Could you update the overwrite path to subtract the previous key contribution as well? Please also add a regression test that does repeated put calls on the same TableScopedPath and verifies memory_used stays stable.
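The fix described above could look roughly like this. It is a minimal sketch with stand-in types (`SizedCache`, a plain `HashMap`, byte-length "heap sizes"), not the PR's actual `LruQueue`: on replacement, both the old key's and the old value's contributions are subtracted before the new entry is added, so repeatedly refreshing the same path keeps `memory_used` stable.

```rust
use std::collections::HashMap;

// Stand-in cache for illustration; the real implementation uses an
// LruQueue keyed by TableScopedPath with DFHeapSize-based accounting.
struct SizedCache {
    entries: HashMap<String, Vec<u8>>,
    memory_used: usize,
}

fn heap_size_key(k: &str) -> usize {
    k.len()
}

fn heap_size_val(v: &[u8]) -> usize {
    v.len()
}

impl SizedCache {
    fn put(&mut self, key: String, value: Vec<u8>) {
        if let Some((old_key, old_value)) = self.entries.remove_entry(&key) {
            // Subtract BOTH the previous key and the previous value;
            // subtracting only the value is the bug described above.
            self.memory_used -= heap_size_key(&old_key) + heap_size_val(&old_value);
        }
        self.memory_used += heap_size_key(&key) + heap_size_val(&value);
        self.entries.insert(key, value);
    }
}

fn main() {
    let mut cache = SizedCache {
        entries: HashMap::new(),
        memory_used: 0,
    };
    for _ in 0..10 {
        // Refreshing the same file repeatedly must not grow memory_used.
        cache.put("part-0.parquet".to_string(), vec![0u8; 64]);
    }
    assert_eq!(cache.memory_used, "part-0.parquet".len() + 64);
    println!("memory_used stable at {} bytes", cache.memory_used);
}
```

The loop in `main` doubles as the shape of the requested regression test: repeated puts on the same key leave the counter at exactly one key-plus-value contribution.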
@@ -1418,8 +1427,11 @@ impl SessionContext {
    schema.deregister_table(&table)?;
    if table_type == TableType::Base
        && let Some(lfc) = self.runtime_env().cache_manager.get_list_files_cache()
I think this invalidation logic now only runs when both caches are enabled, because get_list_files_cache() and get_file_statistic_cache() are chained in the same if let.
That creates a cleanup gap. For example, if a session disables the list-files cache but keeps the file-statistics cache enabled, deregistering a table will leave the statistics entries behind. That leaves stale session-scoped state around and can leak memory if the table is re-registered.
Can we make these invalidations independent so each cache is cleaned up whenever it is present?
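The independent cleanup could be sketched like this, using stand-in types rather than the real `CacheManager` API: each cache is invalidated in its own `if let` whenever it is enabled, instead of chaining both lookups so that one being `None` skips the other.

```rust
// Stand-in types for illustration; not DataFusion's actual CacheManager API.
struct CacheManager {
    list_files_cache: Option<Vec<String>>,      // listed table paths
    file_statistics_cache: Option<Vec<String>>, // per-file statistics keys
}

// Each cache is invalidated independently whenever it is present, so
// disabling one cache never leaves stale entries behind in the other.
fn deregister_table(manager: &mut CacheManager, table_path: &str) {
    if let Some(lfc) = manager.list_files_cache.as_mut() {
        lfc.retain(|p| p.as_str() != table_path);
    }
    if let Some(fsc) = manager.file_statistics_cache.as_mut() {
        fsc.retain(|p| !p.starts_with(table_path));
    }
}

fn main() {
    // The list-files cache is disabled, but the statistics cache is not.
    let mut m = CacheManager {
        list_files_cache: None,
        file_statistics_cache: Some(vec!["t1/part-0.parquet".to_string()]),
    };
    deregister_table(&mut m, "t1");
    // The statistics entries are still cleaned up.
    assert!(m.file_statistics_cache.as_ref().unwrap().is_empty());
    println!("statistics cache invalidated independently");
}
```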
/// Updates the cache with a new memory limit in bytes.
fn update_cache_limit(&self, limit: usize);

/// Retrieves the information about the entries currently cached.
Non-blocking thought: now that the file statistics cache key is TableScopedPath, list_entries() -> HashMap<Path, FileStatisticsCacheEntry> seems to flatten away the table dimension again.
That can make observability a bit confusing for helpers like statistics_cache(), especially if two tables point at the same object-store path.
It may be worth exposing TableScopedPath here, or otherwise carrying the table reference through the reported entry, so the debug and introspection surface matches the actual cache semantics.
Which issue does this PR close?
This change introduces a default `FileStatisticsCache` implementation for the Listing Table with a size limit, implementing the steps outlined in #19052 (comment):

- Add heap size estimation for file statistics and the relevant data types used in caching (this is temporary until "Add a crate for HeapSize trait" arrow-rs#9138 is resolved)
- Redesign `DefaultFileStatisticsCache` to use a `LruQueue` to make it memory-bound, following "Adds memory-bound DefaultListFilesCache" #18855
- Introduce a size limit and use it together with the heap size to bound the memory usage of the cache
- Move `FileStatisticsCache` creation into `CacheManager`, making it session-scoped and shared across statements and tables
- Disable caching in some of the SQL-logic tests where the change altered the output result, because the cache is now session-scoped rather than query-scoped
- Closes "Add a default `FileStatisticsCache` implementation for the `ListingTable`" #19217
- Closes "Add limit to `DefaultFileStatisticsCache`" #19052

Rationale for this change
See above.
What changes are included in this PR?
See above.
Are these changes tested?
Yes.
Are there any user-facing changes?
A new runtime setting: `datafusion.runtime.file_statistics.cache_limit`
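Assuming the new setting follows the `SET` syntax DataFusion already uses for runtime options, usage might look like the fragment below; the value format shown is an assumption, not taken from this PR:

```sql
-- Hypothetical usage; the accepted value syntax is assumed, not confirmed here.
SET datafusion.runtime.file_statistics.cache_limit = '100M';
```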