diff --git a/README.md b/README.md index 62ac419..e101abf 100644 --- a/README.md +++ b/README.md @@ -30,3 +30,7 @@ meson compile -C _build ### Use libp2panda in python The tests contain a example on how to use libp2padna in python. + +## Credits + +Thanks a lot to Sophie Herold, for writing [libglycin](https://gitlab.gnome.org/GNOME/glycin) that was used as a base for figuring out how to write the bindings and Sergey Bugaev for reviewing the GLib introspection API. diff --git a/libp2panda/include/p2panda.h b/libp2panda/include/p2panda.h index 75448fd..bd3152f 100644 --- a/libp2panda/include/p2panda.h +++ b/libp2panda/include/p2panda.h @@ -6,10 +6,63 @@ G_BEGIN_DECLS +/** + * P2pandaNode:bootstrap: + * + * A node that is used to find other nodes. + */ + +/** + * P2pandaNode:database-url: + * + * The [uri](https://www.sqlite.org/uri.html) to a sqlite database. + */ + +/** + * P2pandaNode:network-id: + * + * The id of the network to this node will join. + */ + +/** + * P2pandaNode:private-key: + * + * The private key representing the node's identity. + */ + +/** + * P2pandaNode:relay-url: + * + * The url to a [iroh relay](https://docs.iroh.computer/concepts/relays). + */ + /** * P2pandaNode::system-event: * - * Emitted on system events + * Emitted on system events. + * + * This signal is emitted on the main context that was thread default when the node was spawned. + */ + +/** + * P2pandaTopic:node: + * + * A spawned [class@Node]. + * + */ + +/** + * P2pandaTopic:flags: + * + * Configuration for the [class@Topic]. + * + */ + +/** + * P2pandaTopic:topic-id: + * + * The id of the topic this [class@Topic] will be subscribed to. + * */ /** @@ -19,7 +72,9 @@ G_BEGIN_DECLS * @datetime: The timestamp * @bytes: The message * - * Emitted on incomming persistent message + * Emitted on incomming persistent message. + * + * This signal is emitted on the main context that was thread default when the topic was spawned. */ /** @@ -29,7 +84,9 @@ G_BEGIN_DECLS * @datetime: The timestamp * @bytes: The content of the message * - * Emitted on incomming ephemeral message + * Emitted on incomming ephemeral message. + * + * This signal is emitted on the main context that was thread default when the topic was spawned. */ /** @@ -42,7 +99,9 @@ G_BEGIN_DECLS * @incoming_bytes: * @outgoing_bytes: * - * Emitted for errors + * Emitted for errors. + * + * This signal is emitted on the main context that was thread default when the topic was spawned. */ /** @@ -51,7 +110,9 @@ G_BEGIN_DECLS * @remote_node_id: The public key for the remote node * @session_id: * - * Emitted when a topic finishes syncing + * Emitted when a topic finishes syncing. + * + * This signal is emitted on the main context that was thread default when the topic was spawned. */ /** @@ -59,7 +120,9 @@ G_BEGIN_DECLS * @topic: * @error: * - * Emitted when a topic finishes syncing + * Emitted when a topic finishes syncing. + * + * This signal is emitted on the main context that was thread default when the topic was spawned. */ /** @@ -160,18 +223,18 @@ const uint8_t* p2panda_node_id_get_data(P2pandaNodeId *node_id); /** * P2pandaTopicFlags: - * @P2PANDA_TOPIC_FLAGS_NONE: - * @P2PANDA_TOPIC_FLAGS_PERSISTENT: - * @P2PANDA_TOPIC_FLAGS_EPHEMERAL: - * @P2PANDA_TOPIC_FLAGS_FROM_START: + * @P2PANDA_TOPIC_NONE: + * @P2PANDA_TOPIC_PERSISTENT: + * @P2PANDA_TOPIC_EPHEMERAL: + * @P2PANDA_TOPIC_FROM_START: * */ typedef enum { - P2PANDA_TOPIC_FLAGS_NONE, - P2PANDA_TOPIC_FLAGS_PERSISTENT, - P2PANDA_TOPIC_FLAGS_EPHEMERAL, - P2PANDA_TOPIC_FLAGS_FROM_START, + P2PANDA_TOPIC_NONE = 0, + P2PANDA_TOPIC_PERSISTENT = 1 << 0, + P2PANDA_TOPIC_EPHEMERAL = 1 << 1, + P2PANDA_TOPIC_FROM_START = 1 << 2, } P2pandaTopicFlags; GType p2panda_topic_flags_get_type(void); @@ -329,9 +392,14 @@ P2pandaNode *p2panda_node_new(P2pandaPrivateKey *private_key, * p2panda_node_spawn_async: * @node: * @cancellable: (nullable): - * @callback: + * @callback: (nullable): * @user_data: user data to pass to @callback * + * Spawn the node. + * + * Before spawning the node no network communcation takes place, and the [class@Node] + * can't be used. + * */ void p2panda_node_spawn_async(P2pandaNode *node, GCancellable *cancellable, @@ -362,7 +430,7 @@ gboolean p2panda_node_spawn_finish(P2pandaNode *node, * * Create a topic handle for the give [class@Node] * - * Returns: (transfer full): a new [class@Node] + * Returns: (transfer full): a new [class@Topic] */ P2pandaTopic *p2panda_topic_new(P2pandaNode *node, P2pandaTopicId *topic_id, @@ -372,9 +440,14 @@ P2pandaTopic *p2panda_topic_new(P2pandaNode *node, * p2panda_topic_spawn_async: * @topic: * @cancellable: (nullable): - * @callback: + * @callback: (nullable): * @user_data: user data to pass to @callback * + * Spawn the topic. + * + * Before spawning the topic no network communcation takes place, and the [class@Topic] + * won't be able to send nor receive messages. + * */ void p2panda_topic_spawn_async(P2pandaTopic *topic, GCancellable *cancellable, @@ -398,12 +471,14 @@ gboolean p2panda_topic_spawn_finish(P2pandaTopic *topic, /** * p2panda_topic_publish_async: * @topic: - * @bytes: (transfer full): + * @bytes: (transfer none): * @ephemeral: Whether this message should be ephemeral or persistent * @cancellable: (nullable): - * @callback: + * @callback (nullable): * @user_data: user data to pass to @callback * + * Publish a message to a topic. + * */ void p2panda_topic_publish_async(P2pandaTopic *topic, GBytes *bytes, @@ -413,7 +488,7 @@ void p2panda_topic_publish_async(P2pandaTopic *topic, gpointer user_data); /** - * p2panda_publish_publish_finish: + * p2panda_topic_publish_finish: * @topic: * @result: A [iface@Gio.AsyncResult] * @error: @@ -430,13 +505,17 @@ gboolean p2panda_topic_publish_finish(P2pandaTopic *topic, /** * P2pandaError: - * @P2PANDA_LOADER_ERROR_FAILED: - * @P2PANDA_LOADER_ERROR_UNKNOWN_IMAGE_FORMAT: - * @P2PANDA_LOADER_ERROR_NO_MORE_FRAMES: - * - * Errors that can appear while loading images. - * - * Since: 2.0 + * @P2PANDA_ERROR_FAILED: + * @P2PANDA_ERROR_SPAWN_NODE: + * @P2PANDA_ERROR_SPAWN_TOPIC: + * @P2PANDA_ERROR_NOT_SPAWNED: + * @P2PANDA_ERROR_DECODING: + * @P2PANDA_ERROR_REPLAY: + * @P2PANDA_ERROR_HAS_NO_PERSISTENT: + * @P2PANDA_ERROR_PUBLISH: + * @P2PANDA_ERROR_SIGNATURE, + * + * Errors that may happen while interacting with a node and topic */ typedef enum { diff --git a/libp2panda/src/identity.rs b/libp2panda/src/identity.rs index 582955c..3c16760 100644 --- a/libp2panda/src/identity.rs +++ b/libp2panda/src/identity.rs @@ -34,7 +34,7 @@ pub unsafe extern "C" fn p2panda_private_key_new() -> *const identity::PrivateKe #[unsafe(no_mangle)] pub unsafe extern "C" fn p2panda_private_key_get_public_key( private_key: *const identity::PrivateKey, -) -> *const identity::PublicKey { +) -> *mut identity::PublicKey { unsafe { let private_key = identity::PrivateKey::from_glib_none(private_key); private_key.public_key().into_glib_ptr() diff --git a/libp2panda/src/node.rs b/libp2panda/src/node.rs index 6148152..f4c8c28 100644 --- a/libp2panda/src/node.rs +++ b/libp2panda/src/node.rs @@ -1,5 +1,6 @@ use std::ffi::c_char; use std::ffi::c_int; +use std::ops::Deref; use futures::future::{AbortHandle, Abortable}; use gio::ffi::{GAsyncReadyCallback, GAsyncResult, GTask}; @@ -126,23 +127,23 @@ pub unsafe extern "C" fn p2panda_node_new( mdns_mode: c_int, ) -> *mut P2pandaNode { unsafe { - let private_key = Option::::from_glib_none(private_key); + let private_key = Option::::from_glib_borrow(private_key); let database_url = if database_url.is_null() { None } else { glib::GStr::from_ptr_checked(database_url) }; - let network_id = Option::::from_glib_none(network_id); - let relay_url = Option::::from_glib_none(relay_url); - let bootstrap_node = Option::::from_glib_none(bootstrap_node); + let network_id = Option::::from_glib_borrow(network_id); + let relay_url = Option::::from_glib_borrow(relay_url); + let bootstrap_node = Option::::from_glib_borrow(bootstrap_node); let mdns_mode = node::MdnsDiscoveryMode::from_glib(mdns_mode); node::Node::new( - private_key.as_ref(), + private_key.deref().as_ref(), database_url, - network_id.as_ref(), - relay_url.as_ref(), - bootstrap_node.as_ref(), + network_id.deref().as_ref(), + relay_url.deref().as_ref(), + bootstrap_node.deref().as_ref(), mdns_mode, ) .into_glib_ptr() @@ -159,7 +160,7 @@ pub unsafe extern "C" fn p2panda_node_spawn_async( unsafe { let obj = node::Node::from_glib_none(node); let cancellable: Option = from_glib_none(cancellable); - let callback = GAsyncReadyCallbackSend::new(callback, user_data); + let callback = callback.map(|callback| GAsyncReadyCallbackSend::new(callback, user_data)); let (abort_handle, abort_registration) = AbortHandle::new_pair(); let cancel_signal = if let Some(cancellable) = &cancellable { @@ -174,8 +175,10 @@ pub unsafe extern "C" fn p2panda_node_spawn_async( cancellable.disconnect_cancelled(cancel_signal); } - let result = task.upcast_ref::().as_ptr(); - callback.call(obj.unwrap(), result); + if let Some(callback) = callback { + let result = task.upcast_ref::().as_ptr(); + callback.call(obj.unwrap(), result); + } }; let task = gio::Task::new(Some(&obj), cancellable_.as_ref(), closure); diff --git a/libp2panda/src/topic.rs b/libp2panda/src/topic.rs index 010e155..e6eb4b0 100644 --- a/libp2panda/src/topic.rs +++ b/libp2panda/src/topic.rs @@ -76,7 +76,7 @@ pub unsafe extern "C" fn p2panda_topic_spawn_async( unsafe { let obj = topic::Topic::from_glib_none(topic); let cancellable: Option = from_glib_none(cancellable); - let callback = GAsyncReadyCallbackSend::new(callback, user_data); + let callback = callback.map(|callback| GAsyncReadyCallbackSend::new(callback, user_data)); let (abort_handle, abort_registration) = AbortHandle::new_pair(); let cancel_signal = if let Some(cancellable) = &cancellable { @@ -91,8 +91,10 @@ pub unsafe extern "C" fn p2panda_topic_spawn_async( cancellable.disconnect_cancelled(cancel_signal); } - let result = task.upcast_ref::().as_ptr(); - callback.call(obj.unwrap(), result); + if let Some(callback) = callback { + let result = task.upcast_ref::().as_ptr(); + callback.call(obj.unwrap(), result); + } }; let task = gio::Task::new(Some(&obj), cancellable_.as_ref(), closure); @@ -142,9 +144,9 @@ pub unsafe extern "C" fn p2panda_topic_publish_async( ) { unsafe { let obj = topic::Topic::from_glib_none(topic); - let bytes = glib::Bytes::from_glib_full(bytes); + let bytes = glib::Bytes::from_glib_none(bytes); let cancellable: Option = from_glib_none(cancellable); - let callback = GAsyncReadyCallbackSend::new(callback, user_data); + let callback = callback.map(|callback| GAsyncReadyCallbackSend::new(callback, user_data)); let (abort_handle, abort_registration) = AbortHandle::new_pair(); let cancel_signal = if let Some(cancellable) = &cancellable { @@ -159,8 +161,10 @@ pub unsafe extern "C" fn p2panda_topic_publish_async( cancellable.disconnect_cancelled(cancel_signal); } - let result = task.upcast_ref::().as_ptr(); - callback.call(obj.unwrap(), result); + if let Some(callback) = callback { + let result = task.upcast_ref::().as_ptr(); + callback.call(obj.unwrap(), result); + } }; let task = gio::Task::new(Some(&obj), cancellable_.as_ref(), closure); diff --git a/libp2panda/src/utils.rs b/libp2panda/src/utils.rs index 1b12681..57af1b5 100644 --- a/libp2panda/src/utils.rs +++ b/libp2panda/src/utils.rs @@ -1,18 +1,16 @@ /* Taken from glycin https://gitlab.gnome.org/GNOME/glycin/-/blob/main/libglycin/src/common.rs */ -use gio::ffi::GAsyncReadyCallback; +use gio::ffi::GAsyncResult; use gio::prelude::*; -use glib::ffi::gpointer; +use glib::{ffi::gpointer, gobject_ffi::GObject}; + +type GAsyncReadyCallback = unsafe extern "C" fn(*mut GObject, *mut GAsyncResult, gpointer); struct GPointerSend(pub gpointer); unsafe impl Send for GPointerSend {} pub struct GAsyncReadyCallbackSend { - callback: unsafe extern "C" fn( - *mut glib::gobject_ffi::GObject, - *mut gio::ffi::GAsyncResult, - gpointer, - ), + callback: GAsyncReadyCallback, user_data: GPointerSend, } @@ -21,7 +19,7 @@ unsafe impl Send for GAsyncReadyCallbackSend {} impl GAsyncReadyCallbackSend { pub fn new(callback: GAsyncReadyCallback, user_data: gpointer) -> Self { Self { - callback: callback.unwrap(), + callback, user_data: GPointerSend(user_data), } } diff --git a/p2panda-gobject/src/node.rs b/p2panda-gobject/src/node.rs index d87a4ca..d5828ac 100644 --- a/p2panda-gobject/src/node.rs +++ b/p2panda-gobject/src/node.rs @@ -10,6 +10,7 @@ use p2panda::node; use rand::prelude::*; use tokio::runtime::Runtime; use tokio::sync::Mutex; +use tokio::task::AbortHandle; use tokio_stream::StreamExt; use crate::{error::Error, identity::PrivateKey}; @@ -47,7 +48,10 @@ pub struct NodeId { impl From for NodeId { fn from(public_key: node::PublicKey) -> Self { - NodeId { id: public_key, relay_url: None } + NodeId { + id: public_key, + relay_url: None, + } } } @@ -55,8 +59,9 @@ impl NodeId { pub fn from_data(data: [u8; 32], relay_url: Option) -> Result { let id = node::NodeId::try_from(data) .map_err(|error| glib::Error::new(Error::Signature, &error.to_string()))?; - let relay_url = relay_url.map(|relay_url| - node::RelayUrl::from_str(relay_url.to_str().as_str()).expect("Malformed URL")); + let relay_url = relay_url.map(|relay_url| { + node::RelayUrl::from_str(relay_url.to_str().as_str()).expect("Malformed URL") + }); Ok(Self { id, relay_url }) } @@ -107,6 +112,7 @@ pub mod imp { runtime: OnceLock, pub(super) node_builder: RefCell>, pub(super) node: OnceCell, + pub(super) system_event_abort_handle: OnceCell, } #[glib::object_subclass] @@ -116,6 +122,12 @@ pub mod imp { } impl ObjectImpl for Node { + fn dispose(&self) { + if let Some(abort_handle) = self.system_event_abort_handle.get() { + abort_handle.abort(); + } + } + fn signals() -> &'static [Signal] { static SIGNALS: OnceLock> = OnceLock::new(); SIGNALS.get_or_init(|| vec![Signal::builder("system-event").build()]) @@ -188,7 +200,9 @@ pub mod imp { .expect("type conformity checked by `Object::set_property`") .map(|node_id| { let id = node_id.id; - let relay_url = node_id.relay_url.expect("A boostrap node needs a known relay url"); + let relay_url = node_id + .relay_url + .expect("A boostrap node needs a known relay url"); builder.bootstrap(id, relay_url) }), @@ -272,15 +286,25 @@ impl Node { let weak_obj = obj.downgrade(); // TODO: We need to abort this spawn when the Node is dropped (but the Runtime is // dropped anyway) - tokio::spawn(async move { + let main_context = glib::MainContext::ref_thread_default(); + let abort_handle = tokio::spawn(async move { tokio::pin!(event_stream); while let Some(_event) = event_stream.next().await { - if let Some(obj) = weak_obj.upgrade() { - // TODO: add the event to the signal - obj.emit_by_name::<()>("system-event", &[]); - } + let weak_obj = weak_obj.clone(); + main_context.invoke(move || { + if let Some(obj) = weak_obj.upgrade() { + // TODO: add the event to the signal + obj.emit_by_name::<()>("system-event", &[]); + } + }) } - }); + }) + .abort_handle(); + + obj.imp() + .system_event_abort_handle + .set(abort_handle) + .expect("Node can be spawned only once"); *spawned = true; diff --git a/p2panda-gobject/src/topic.rs b/p2panda-gobject/src/topic.rs index 0600707..48d7c18 100644 --- a/p2panda-gobject/src/topic.rs +++ b/p2panda-gobject/src/topic.rs @@ -8,6 +8,7 @@ use glib::subclass::prelude::*; use p2panda::{node, streams}; use tokio::sync::Mutex; +use tokio::task::AbortHandle; use tokio_stream::StreamExt; use crate::{ @@ -40,10 +41,10 @@ type Body = Vec; #[glib::flags(name = "P2pandaTopicFlags")] pub enum TopicFlags { - NONE = (1 << 0), - PERSISTENT = (1 << 1), - EPHEMERAL = (1 << 2), - FROM_START = (1 << 3), + NONE = 0, + PERSISTENT = (1 << 0), + EPHEMERAL = (1 << 1), + FROM_START = (1 << 2), } impl Default for TopicFlags { @@ -69,6 +70,8 @@ pub mod imp { flags: Cell, pub(super) stream_publisher: OnceCell>, pub(super) ephemeral_publisher: OnceCell>, + pub(super) stream_abort_handle: OnceCell, + pub(super) ephemeral_abort_handle: OnceCell, } #[glib::object_subclass] @@ -79,6 +82,15 @@ pub mod imp { #[glib::derived_properties] impl ObjectImpl for Topic { + fn dispose(&self) { + if let Some(abort_handle) = self.stream_abort_handle.get() { + abort_handle.abort(); + } + if let Some(abort_handle) = self.ephemeral_abort_handle.get() { + abort_handle.abort(); + } + } + fn signals() -> &'static [Signal] { static SIGNALS: OnceLock> = OnceLock::new(); SIGNALS.get_or_init(|| { @@ -241,17 +253,28 @@ impl Topic { .set(publisher) .expect("Topic can be spawned only once"); + let main_context = glib::MainContext::ref_thread_default(); let obj_weak = obj.downgrade(); // TODO: we need to abort this spawn when the Topic is dropped - tokio::spawn(async move { + let abort_handle = tokio::spawn(async move { while let Some(event) = subscription.next().await { - if let Some(obj) = obj_weak.upgrade() { - obj.emit_signal_for_event(event).await; - } else { - break; - } + let obj_weak = obj_weak.clone(); + main_context + .spawn(async move { + if let Some(obj) = obj_weak.upgrade() { + obj.emit_signal_for_event(event).await; + } + }) + .await + .unwrap(); } - }); + }) + .abort_handle(); + + obj.imp() + .stream_abort_handle + .set(abort_handle) + .expect("Topic can be spawned only once"); } if flags.contains(TopicFlags::EPHEMERAL) { @@ -265,17 +288,25 @@ impl Topic { .set(publisher) .expect("Topic can be spawned only once"); + let main_context = glib::MainContext::ref_thread_default(); let obj_weak = obj.downgrade(); // TODO: we need to abort this spawn when the Topic is dropped - tokio::spawn(async move { + let abort_handle = tokio::spawn(async move { while let Some(message) = subscription.next().await { - if let Some(obj) = obj_weak.upgrade() { - obj.emit_signal_for_ephemeral_message(message); - } else { - break; - } + let obj_weak = obj_weak.clone(); + main_context.invoke(move || { + if let Some(obj) = obj_weak.upgrade() { + obj.emit_signal_for_ephemeral_message(message); + } + }); } - }); + }) + .abort_handle(); + + obj.imp() + .ephemeral_abort_handle + .set(abort_handle) + .expect("Topic can be spawned only once"); } *spawned = true; @@ -298,7 +329,6 @@ impl Topic { let datetime = glib::DateTime::from_unix_utc_usec(operation.timestamp() as i64).unwrap(); let bytes: glib::Bytes = operation.message().into(); - //TODO: invoke on the thread owning the main context let ack = self.emit_by_name::( "message", &[ @@ -347,7 +377,10 @@ impl Topic { received_bytes_topic_total: _, error: _, } => { - self.emit_by_name::<()>("sync-ended", &[&NodeId::from(remote_node_id), &session_id]); + self.emit_by_name::<()>( + "sync-ended", + &[&NodeId::from(remote_node_id), &session_id], + ); } streams::StreamEvent::DecodingFailed { error, .. } => { // TODO: figure out whether we need to expose more about the event