Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 66 additions & 52 deletions src/async_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use bdk_chain::{
use bitcoin::{Amount, BlockHash, OutPoint, ScriptBuf, Transaction, TxOut, Txid, consensus};
use sqlx::{
Row,
sqlite::{SqliteConnectOptions, SqlitePool as Pool},
sqlite::{SqliteConnectOptions, SqliteConnection, SqlitePool as Pool},
};

use crate::Error;
Expand Down Expand Up @@ -62,48 +62,41 @@ impl Store {
impl Store {
/// Write tx_graph.
pub async fn write_tx_graph(
&self,
conn: &mut SqliteConnection,
tx_graph: &tx_graph::ChangeSet<ConfirmationBlockTime>,
) -> Result<(), Error> {
let txs = &tx_graph.txs;
let txouts = &tx_graph.txouts;
let anchors = &tx_graph.anchors;
let first_seen = &tx_graph.first_seen;
let last_seen = &tx_graph.last_seen;
let last_evicted = &tx_graph.last_evicted;

for tx in txs {
for tx in &tx_graph.txs {
let txid = tx.compute_txid();
sqlx::query(
"INSERT INTO tx(txid, tx) VALUES($1, $2) ON CONFLICT DO UPDATE SET tx = $2",
)
.bind(txid.to_string())
.bind(consensus::encode::serialize(tx))
.execute(&self.pool)
.execute(&mut *conn)
.await?;
}
for (txid, t) in first_seen {
for (txid, t) in &tx_graph.first_seen {
sqlx::query("INSERT INTO tx(txid, first_seen) VALUES($1, $2) ON CONFLICT DO UPDATE SET first_seen = $2")
.bind(txid.to_string())
.bind(i64::try_from(*t)?)
.execute(&self.pool)
.execute(&mut *conn)
.await?;
}
for (txid, t) in last_seen {
for (txid, t) in &tx_graph.last_seen {
sqlx::query("INSERT INTO tx(txid, last_seen) VALUES($1, $2) ON CONFLICT DO UPDATE SET last_seen = $2")
.bind(txid.to_string())
.bind(i64::try_from(*t)?)
.execute(&self.pool)
.execute(&mut *conn)
.await?;
}
for (txid, t) in last_evicted {
for (txid, t) in &tx_graph.last_evicted {
sqlx::query("INSERT INTO tx(txid, last_evicted) VALUES($1, $2) ON CONFLICT DO UPDATE SET last_evicted = $2")
.bind(txid.to_string())
.bind(i64::try_from(*t)?)
.execute(&self.pool)
.execute(&mut *conn)
.await?;
}
for (op, txout) in txouts {
for (op, txout) in &tx_graph.txouts {
let OutPoint { txid, vout } = op;
let TxOut {
value,
Expand All @@ -114,18 +107,18 @@ impl Store {
.bind(vout)
.bind(i64::try_from(value.to_sat())?)
.bind(script_pubkey.to_bytes())
.execute(&self.pool)
.execute(&mut *conn)
.await?;
}
for (anchor, txid) in anchors {
for (anchor, txid) in &tx_graph.anchors {
let BlockId { height, hash } = anchor.block_id;
let confirmation_time = anchor.confirmation_time;
sqlx::query("INSERT OR IGNORE INTO anchor(block_height, block_hash, txid, confirmation_time) VALUES($1, $2, $3, $4)")
.bind(height)
.bind(hash.to_string())
.bind(txid.to_string())
.bind(i64::try_from(confirmation_time)?)
.execute(&self.pool)
.execute(&mut *conn)
.await?;
}

Expand All @@ -134,7 +127,7 @@ impl Store {

/// Write local_chain.
pub async fn write_local_chain(
&self,
conn: &mut SqliteConnection,
local_chain: &local_chain::ChangeSet,
) -> Result<(), Error> {
for (&height, hash) in &local_chain.blocks {
Expand All @@ -143,13 +136,13 @@ impl Store {
sqlx::query("INSERT OR IGNORE INTO block(height, hash) VALUES($1, $2)")
.bind(height)
.bind(hash.to_string())
.execute(&self.pool)
.execute(&mut *conn)
.await?;
}
None => {
sqlx::query("DELETE FROM block WHERE height = $1")
.bind(height)
.execute(&self.pool)
.execute(&mut *conn)
.await?;
}
}
Expand All @@ -160,7 +153,7 @@ impl Store {

/// Write keychain_txout.
pub async fn write_keychain_txout(
&self,
conn: &mut SqliteConnection,
keychain_txout: &keychain_txout::ChangeSet,
) -> Result<(), Error> {
for (descriptor_id, last_revealed) in &keychain_txout.last_revealed {
Expand All @@ -169,7 +162,7 @@ impl Store {
)
.bind(descriptor_id.to_string())
.bind(last_revealed)
.execute(&self.pool)
.execute(&mut *conn)
.await?;
}
for (descriptor_id, spk_cache) in &keychain_txout.spk_cache {
Expand All @@ -180,7 +173,7 @@ impl Store {
.bind(descriptor_id.to_string())
.bind(*derivation_index)
.bind(script.to_bytes())
.execute(&self.pool)
.execute(&mut *conn)
.await?;
}
}
Expand All @@ -189,12 +182,14 @@ impl Store {
}

/// Read tx_graph.
pub async fn read_tx_graph(&self) -> Result<tx_graph::ChangeSet<ConfirmationBlockTime>, Error> {
pub async fn read_tx_graph(
conn: &mut SqliteConnection,
) -> Result<tx_graph::ChangeSet<ConfirmationBlockTime>, Error> {
let mut changeset = tx_graph::ChangeSet::default();

let rows: Vec<TxRow> =
sqlx::query_as("SELECT txid, tx, first_seen, last_seen, last_evicted FROM tx")
.fetch_all(&self.pool)
.fetch_all(&mut *conn)
.await?;
for row in rows {
let txid: Txid = row.txid.parse()?;
Expand All @@ -216,7 +211,7 @@ impl Store {
}

let rows = sqlx::query("SELECT txid, vout, value, script FROM txout")
.fetch_all(&self.pool)
.fetch_all(&mut *conn)
.await?;
for row in rows {
let txid: String = row.get("txid");
Expand All @@ -236,7 +231,7 @@ impl Store {

let rows =
sqlx::query("SELECT block_height, block_hash, txid, confirmation_time FROM anchor")
.fetch_all(&self.pool)
.fetch_all(&mut *conn)
.await?;
for row in rows {
let height: u32 = row.get("block_height");
Expand All @@ -256,11 +251,13 @@ impl Store {
}

/// Read local_chain.
pub async fn read_local_chain(&self) -> Result<local_chain::ChangeSet, Error> {
pub async fn read_local_chain(
conn: &mut SqliteConnection,
) -> Result<local_chain::ChangeSet, Error> {
let mut changeset = local_chain::ChangeSet::default();

let rows = sqlx::query("SELECT height, hash FROM block")
.fetch_all(&self.pool)
.fetch_all(&mut *conn)
.await?;
for row in rows {
let height: u32 = row.get("height");
Expand All @@ -273,11 +270,13 @@ impl Store {
}

/// Read keychain_txout.
pub async fn read_keychain_txout(&self) -> Result<keychain_txout::ChangeSet, Error> {
pub async fn read_keychain_txout(
conn: &mut SqliteConnection,
) -> Result<keychain_txout::ChangeSet, Error> {
let mut changeset = keychain_txout::ChangeSet::default();

let rows = sqlx::query("SELECT descriptor_id, last_revealed FROM keychain_last_revealed")
.fetch_all(&self.pool)
.fetch_all(&mut *conn)
.await?;
for row in rows {
let descriptor_id: String = row.get("descriptor_id");
Expand All @@ -289,7 +288,7 @@ impl Store {
let rows = sqlx::query(
"SELECT descriptor_id, derivation_index, script FROM keychain_script_pubkey",
)
.fetch_all(&self.pool)
.fetch_all(&mut *conn)
.await?;

for row in rows {
Expand Down Expand Up @@ -338,18 +337,25 @@ mod test {

let store = Store::new_memory().await?;
store.migrate().await?;
store
.write_local_chain(&cs)
.await
.expect("failed to write `local_chain`");

{
let mut txn = store.pool.begin().await?;
Store::write_local_chain(&mut txn, &cs)
.await
.expect("failed to write `local_chain`");
txn.commit().await?;
}

// Trying to replace the value of existing height should be ignored.
cs.blocks.insert(1, Some(Hash::hash(b"1a")));

store
.write_local_chain(&cs)
.await
.expect("failed to write `local_chain`");
{
let mut txn = store.pool.begin().await?;
Store::write_local_chain(&mut txn, &cs)
.await
.expect("failed to write `local_chain`");
txn.commit().await?;
}

let rows = sqlx::query("SELECT height, hash FROM block WHERE height = 1")
.fetch_all(&store.pool)
Expand All @@ -365,16 +371,24 @@ mod test {
// Delete row 1 and insert hash "1a" again.
let mut cs = local_chain::ChangeSet::default();
cs.blocks.insert(1, None);
store
.write_local_chain(&cs)
.await
.expect("failed to write `local_chain`");

{
let mut txn = store.pool.begin().await?;
Store::write_local_chain(&mut txn, &cs)
.await
.expect("failed to write `local_chain`");
txn.commit().await?;
}

cs.blocks.insert(1, Some(Hash::hash(b"1a")));
store
.write_local_chain(&cs)
.await
.expect("failed to write `local_chain`");

{
let mut txn = store.pool.begin().await?;
Store::write_local_chain(&mut txn, &cs)
.await
.expect("failed to write `local_chain`");
txn.commit().await?;
}

let rows = sqlx::query("SELECT height, hash FROM block WHERE height = 1")
.fetch_all(&store.pool)
Expand Down
Loading