diff --git a/core/src/duckdb.rs b/core/src/duckdb.rs index c21cadaf..35e87971 100644 --- a/core/src/duckdb.rs +++ b/core/src/duckdb.rs @@ -356,6 +356,33 @@ impl DuckDBTableProviderFactory { Ok(pool) } + + /// Drop the cached pool entry for `key` if any, returning the previously + /// cached pool. Subsequent calls to `get_or_init_*` for the same key will + /// build a fresh pool. + /// + /// This is intended for callers that replace the underlying database file + /// out-of-band (for example, after restoring it from a snapshot in object + /// storage). Existing connections held by other clones of the previously + /// returned pool keep operating against the file descriptor they opened; + /// once they are dropped the OS releases the prior inode. New providers + /// built after invalidation will open the file fresh and observe the + /// replacement contents. + pub async fn invalidate_instance(&self, key: &DbInstanceKey) -> Option { + self.instances.lock().await.remove(key) + } + + /// Drop the cached pool entry for the file at `path` if any. + /// + /// Convenience wrapper over [`Self::invalidate_instance`] for the common + /// file-mode case. + pub async fn invalidate_file_instance( + &self, + path: impl Into>, + ) -> Option { + self.invalidate_instance(&DbInstanceKey::file(path.into())) + .await + } } type DynDuckDbConnectionPool = dyn DbConnectionPool, DuckDBParameter> @@ -779,6 +806,45 @@ pub(crate) mod tests { use std::collections::HashMap; use std::sync::Arc; + #[tokio::test] + async fn invalidate_instance_drops_cached_pool() { + let factory = DuckDBTableProviderFactory::new(duckdb::AccessMode::ReadWrite); + let options = HashMap::new(); + + // First call populates the cache. + let _pool1 = factory + .get_or_init_memory_instance(&options) + .await + .expect("first init"); + assert_eq!(factory.instances.lock().await.len(), 1); + + // Second call (without invalidation) returns the cached pool clone + // without growing the registry. + let _pool2 = factory + .get_or_init_memory_instance(&options) + .await + .expect("cached init"); + assert_eq!(factory.instances.lock().await.len(), 1); + + // Invalidate; entry is evicted and returned. + let evicted = factory.invalidate_instance(&DbInstanceKey::memory()).await; + assert!(evicted.is_some(), "invalidate returns evicted pool"); + assert_eq!(factory.instances.lock().await.len(), 0); + + // Re-invalidating a missing key is a no-op. + assert!(factory + .invalidate_instance(&DbInstanceKey::file("never-cached".into())) + .await + .is_none()); + + // Next get_or_init repopulates the cache. + let _pool3 = factory + .get_or_init_memory_instance(&options) + .await + .expect("reinit after invalidate"); + assert_eq!(factory.instances.lock().await.len(), 1); + } + #[tokio::test] async fn test_create_with_memory_limit() { let table_name = TableReference::bare("test_table"); diff --git a/core/src/sqlite.rs b/core/src/sqlite.rs index 2ad5588d..c76ce6ca 100644 --- a/core/src/sqlite.rs +++ b/core/src/sqlite.rs @@ -246,6 +246,30 @@ impl SqliteTableProviderFactory { Ok(pool) } + + /// Drop the cached pool entry for `key` if any, returning the previously + /// cached pool. Subsequent calls to [`Self::get_or_init_instance`] for + /// the same key will build a fresh pool. + /// + /// This is intended for callers that replace the underlying database file + /// out-of-band (for example, after restoring it from a snapshot in object + /// storage). See [`crate::duckdb::DuckDBTableProviderFactory::invalidate_instance`] + /// for the semantics around in-flight connections. + pub async fn invalidate_instance(&self, key: &DbInstanceKey) -> Option { + self.instances.lock().await.remove(key) + } + + /// Drop the cached pool entry for the file at `db_path` if any. + /// + /// Convenience wrapper over [`Self::invalidate_instance`] for the common + /// file-mode case. + pub async fn invalidate_file_instance( + &self, + db_path: impl Into>, + ) -> Option { + self.invalidate_instance(&DbInstanceKey::file(db_path.into())) + .await + } } impl Default for SqliteTableProviderFactory { @@ -1511,6 +1535,52 @@ pub(crate) mod tests { use super::*; + #[tokio::test] + async fn invalidate_instance_drops_cached_pool() { + let factory = SqliteTableProviderFactory::new(); + let path: Arc = Arc::from("/tmp/spice-tp-invalidate-test.db"); + // Best-effort cleanup of leftover files from prior runs. + let _ = std::fs::remove_file(path.as_ref()); + + let _pool = factory + .get_or_init_instance( + Arc::clone(&path), + Mode::File, + std::time::Duration::from_millis(5_000), + ) + .await + .expect("init file pool"); + assert_eq!(factory.instances.lock().await.len(), 1); + + let evicted = factory + .invalidate_instance(&DbInstanceKey::file(Arc::clone(&path))) + .await; + assert!(evicted.is_some(), "invalidate returns evicted pool"); + assert_eq!(factory.instances.lock().await.len(), 0); + + // Convenience wrapper: re-init then invalidate via path. + let _pool2 = factory + .get_or_init_instance( + Arc::clone(&path), + Mode::File, + std::time::Duration::from_millis(5_000), + ) + .await + .expect("reinit file pool"); + assert_eq!(factory.instances.lock().await.len(), 1); + let evicted2 = factory.invalidate_file_instance(Arc::clone(&path)).await; + assert!(evicted2.is_some()); + assert_eq!(factory.instances.lock().await.len(), 0); + + // Missing key is a no-op. + assert!(factory + .invalidate_instance(&DbInstanceKey::file("never-cached".into())) + .await + .is_none()); + + let _ = std::fs::remove_file(path.as_ref()); + } + #[tokio::test] async fn test_sqlite_table_creation_with_indexes() { let schema = Arc::new(Schema::new(vec![