Use tokio async/await instead of callbacks

This commit is contained in:
junderw
2023-06-24 19:28:19 -07:00
committed by Mononaut
parent 99101ddc4f
commit 46fb6a68eb
6 changed files with 148 additions and 67 deletions

View File

@@ -15,7 +15,7 @@ crate-type = ["cdylib"]
priority-queue = "1.3.2"
bytes = "1.4.0"
once_cell = "1.18.0"
napi = { version = "2.13.2", features = ["napi8"] }
napi = { version = "2.13.2", features = ["napi8", "tokio_rt"] }
napi-derive = "2.13.0"
[build-dependencies]

View File

@@ -3,8 +3,8 @@
/* auto-generated by NAPI-RS */
export function make(mempoolBuffer: Uint8Array, callback: (result: GbtResult) => void): void
export function update(newTxs: Uint8Array, removeTxs: Uint8Array, callback: (result: GbtResult) => void): void
export function make(mempoolBuffer: Uint8Array): Promise<GbtResult>
export function update(newTxs: Uint8Array, removeTxs: Uint8Array): Promise<GbtResult>
/**
* The result from calling the gbt function.
*

View File

@@ -7,8 +7,8 @@
"scripts": {
"artifacts": "napi artifacts",
"build": "napi build --platform",
"build-debug": "npm run build --",
"build-release": "npm run build -- --release",
"build-debug": "npm run build",
"build-release": "npm run build -- --release --strip",
"install": "npm run build-release",
"prepublishOnly": "napi prepublish -t npm",
"test": "cargo test"

View File

@@ -1,7 +1,4 @@
use napi::{
bindgen_prelude::*,
threadsafe_function::{ErrorStrategy, ThreadsafeFunction, ThreadsafeFunctionCallMode},
};
use napi::bindgen_prelude::*;
use napi_derive::napi;
use once_cell::sync::Lazy;
@@ -14,43 +11,41 @@ mod thread_transaction;
mod utils;
use thread_transaction::ThreadTransaction;
/// Used for ThreadsafeFunction's queue size parameter
const UNBOUNDED_QUEUE: usize = 0;
static THREAD_TRANSACTIONS: Lazy<Mutex<HashMap<u32, ThreadTransaction>>> =
Lazy::new(|| Mutex::new(HashMap::new()));
#[napi(ts_args_type = "mempoolBuffer: Uint8Array, callback: (result: GbtResult) => void")]
pub fn make(mempool_buffer: Uint8Array, callback: JsFunction) -> Result<()> {
#[napi(ts_args_type = "mempoolBuffer: Uint8Array")]
pub async fn make(mempool_buffer: Uint8Array) -> Result<GbtResult> {
let mut map = HashMap::new();
for tx in ThreadTransaction::batch_from_buffer(&mempool_buffer) {
map.insert(tx.uid, tx);
}
let mut global_map = THREAD_TRANSACTIONS
.lock()
.map_err(|_| napi::Error::from_reason("THREAD_TRANSACTIONS Mutex poisoned"))?;
*global_map = map;
{
let mut global_map = THREAD_TRANSACTIONS
.lock()
.map_err(|_| napi::Error::from_reason("THREAD_TRANSACTIONS Mutex poisoned"))?;
*global_map = map;
}
run_in_thread(callback)
run_in_thread().await
}
#[napi(
ts_args_type = "newTxs: Uint8Array, removeTxs: Uint8Array, callback: (result: GbtResult) => void"
)]
pub fn update(new_txs: Uint8Array, remove_txs: Uint8Array, callback: JsFunction) -> Result<()> {
let mut map = THREAD_TRANSACTIONS
.lock()
.map_err(|_| napi::Error::from_reason("THREAD_TRANSACTIONS Mutex poisoned"))?;
for tx in ThreadTransaction::batch_from_buffer(&new_txs) {
map.insert(tx.uid, tx);
#[napi(ts_args_type = "newTxs: Uint8Array, removeTxs: Uint8Array")]
pub async fn update(new_txs: Uint8Array, remove_txs: Uint8Array) -> Result<GbtResult> {
{
let mut map = THREAD_TRANSACTIONS
.lock()
.map_err(|_| napi::Error::from_reason("THREAD_TRANSACTIONS Mutex poisoned"))?;
for tx in ThreadTransaction::batch_from_buffer(&new_txs) {
map.insert(tx.uid, tx);
}
for txid in &utils::txids_from_buffer(&remove_txs) {
map.remove(txid);
}
}
for txid in &utils::txids_from_buffer(&remove_txs) {
map.remove(txid);
}
drop(map);
run_in_thread(callback)
run_in_thread().await
}
/// The result from calling the gbt function.
@@ -66,31 +61,15 @@ pub struct GbtResult {
pub rates: Vec<Vec<f64>>, // Tuples not supported. u32 fits inside f64
}
fn run_in_thread(callback: JsFunction) -> Result<()> {
let thread_safe_callback: ThreadsafeFunction<GbtResult, ErrorStrategy::Fatal> =
callback.create_threadsafe_function(UNBOUNDED_QUEUE, |ctx| Ok(vec![ctx.value]))?;
let handle = std::thread::spawn(move || {
let result = {
let mut map = THREAD_TRANSACTIONS
.lock()
.map_err(|_| napi::Error::from_reason("THREAD_TRANSACTIONS Mutex poisoned"))?;
gbt::gbt(&mut map).ok_or_else(|| napi::Error::from_reason("gbt failed"))?
};
// Note: A call mode of Blocking does not mean it will block, but rather it tells
// the N-API what to do in the event of a full queue.
// The queue will never be full, so Blocking is fine.
match thread_safe_callback.call(result, ThreadsafeFunctionCallMode::Blocking) {
Status::Ok => Ok(()),
error => Err(napi::Error::from_reason(format!(
"Callback failure: {}",
error
))),
}
async fn run_in_thread() -> Result<GbtResult> {
let handle = napi::tokio::task::spawn_blocking(move || {
let mut map = THREAD_TRANSACTIONS
.lock()
.map_err(|_| napi::Error::from_reason("THREAD_TRANSACTIONS Mutex poisoned"))?;
gbt::gbt(&mut map).ok_or_else(|| napi::Error::from_reason("gbt failed"))
});
handle
.join()
.await
.map_err(|_| napi::Error::from_reason("thread panicked"))?
}