Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 66 additions & 0 deletions core/src/duckdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,33 @@ impl DuckDBTableProviderFactory {

Ok(pool)
}

/// Drop the cached pool entry for `key` if any, returning the previously
/// cached pool. Subsequent calls to `get_or_init_*` for the same key will
/// build a fresh pool.
///
/// This is intended for callers that replace the underlying database file
/// out-of-band (for example, after restoring it from a snapshot in object
/// storage). Existing connections held by other clones of the previously
/// returned pool keep operating against the file descriptor they opened;
/// once they are dropped the OS releases the prior inode. New providers
/// built after invalidation will open the file fresh and observe the
/// replacement contents.
pub async fn invalidate_instance(&self, key: &DbInstanceKey) -> Option<DuckDbConnectionPool> {
self.instances.lock().await.remove(key)
}

/// Drop the cached pool entry for the file at `path` if any.
///
/// Convenience wrapper over [`Self::invalidate_instance`] for the common
/// file-mode case.
pub async fn invalidate_file_instance(
&self,
path: impl Into<Arc<str>>,
) -> Option<DuckDbConnectionPool> {
self.invalidate_instance(&DbInstanceKey::file(path.into()))
.await
}
}

type DynDuckDbConnectionPool = dyn DbConnectionPool<r2d2::PooledConnection<DuckdbConnectionManager>, DuckDBParameter>
Expand Down Expand Up @@ -779,6 +806,45 @@ pub(crate) mod tests {
use std::collections::HashMap;
use std::sync::Arc;

#[tokio::test]
async fn invalidate_instance_drops_cached_pool() {
let factory = DuckDBTableProviderFactory::new(duckdb::AccessMode::ReadWrite);
let options = HashMap::new();

// First call populates the cache.
let _pool1 = factory
.get_or_init_memory_instance(&options)
.await
.expect("first init");
assert_eq!(factory.instances.lock().await.len(), 1);

// Second call (without invalidation) returns the cached pool clone
// without growing the registry.
let _pool2 = factory
.get_or_init_memory_instance(&options)
.await
.expect("cached init");
assert_eq!(factory.instances.lock().await.len(), 1);

// Invalidate; entry is evicted and returned.
let evicted = factory.invalidate_instance(&DbInstanceKey::memory()).await;
assert!(evicted.is_some(), "invalidate returns evicted pool");
assert_eq!(factory.instances.lock().await.len(), 0);

// Re-invalidating a missing key is a no-op.
assert!(factory
.invalidate_instance(&DbInstanceKey::file("never-cached".into()))
.await
.is_none());

// Next get_or_init repopulates the cache.
let _pool3 = factory
.get_or_init_memory_instance(&options)
.await
.expect("reinit after invalidate");
assert_eq!(factory.instances.lock().await.len(), 1);
}

#[tokio::test]
async fn test_create_with_memory_limit() {
let table_name = TableReference::bare("test_table");
Expand Down
70 changes: 70 additions & 0 deletions core/src/sqlite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,30 @@ impl SqliteTableProviderFactory {

Ok(pool)
}

/// Drop the cached pool entry for `key` if any, returning the previously
/// cached pool. Subsequent calls to [`Self::get_or_init_instance`] for
/// the same key will build a fresh pool.
///
/// This is intended for callers that replace the underlying database file
/// out-of-band (for example, after restoring it from a snapshot in object
/// storage). See [`crate::duckdb::DuckDBTableProviderFactory::invalidate_instance`]
/// for the semantics around in-flight connections.
pub async fn invalidate_instance(&self, key: &DbInstanceKey) -> Option<SqliteConnectionPool> {
self.instances.lock().await.remove(key)
}

/// Drop the cached pool entry for the file at `db_path` if any.
///
/// Convenience wrapper over [`Self::invalidate_instance`] for the common
/// file-mode case.
pub async fn invalidate_file_instance(
&self,
db_path: impl Into<Arc<str>>,
) -> Option<SqliteConnectionPool> {
self.invalidate_instance(&DbInstanceKey::file(db_path.into()))
.await
}
}

impl Default for SqliteTableProviderFactory {
Expand Down Expand Up @@ -1511,6 +1535,52 @@ pub(crate) mod tests {

use super::*;

#[tokio::test]
async fn invalidate_instance_drops_cached_pool() {
let factory = SqliteTableProviderFactory::new();
let path: Arc<str> = Arc::from("/tmp/spice-tp-invalidate-test.db");
// Best-effort cleanup of leftover files from prior runs.
let _ = std::fs::remove_file(path.as_ref());

let _pool = factory
.get_or_init_instance(
Arc::clone(&path),
Mode::File,
std::time::Duration::from_millis(5_000),
)
.await
.expect("init file pool");
assert_eq!(factory.instances.lock().await.len(), 1);

let evicted = factory
.invalidate_instance(&DbInstanceKey::file(Arc::clone(&path)))
.await;
assert!(evicted.is_some(), "invalidate returns evicted pool");
assert_eq!(factory.instances.lock().await.len(), 0);

// Convenience wrapper: re-init then invalidate via path.
let _pool2 = factory
.get_or_init_instance(
Arc::clone(&path),
Mode::File,
std::time::Duration::from_millis(5_000),
)
.await
.expect("reinit file pool");
assert_eq!(factory.instances.lock().await.len(), 1);
let evicted2 = factory.invalidate_file_instance(Arc::clone(&path)).await;
assert!(evicted2.is_some());
assert_eq!(factory.instances.lock().await.len(), 0);

// Missing key is a no-op.
assert!(factory
.invalidate_instance(&DbInstanceKey::file("never-cached".into()))
.await
.is_none());

let _ = std::fs::remove_file(path.as_ref());
}

#[tokio::test]
async fn test_sqlite_table_creation_with_indexes() {
let schema = Arc::new(Schema::new(vec![
Expand Down
Loading