perf: add hash compact persist-delta bridge mode

This commit is contained in:
Youzini-afk
2026-04-13 16:37:19 +08:00
parent 8f9c0d0b98
commit 67cf5fe7fa
10 changed files with 489 additions and 70 deletions

View File

@@ -149,6 +149,15 @@ struct PersistCompactRecordSet {
serialized: Vec<String>,
}
/// Parallel-array record set for the hash-compact persist bridge:
/// `ids[i]` pairs with `hashes[i]` (the record's content hash).
/// Consumers pair strictly by index and ignore the tail of the longer array.
#[derive(Debug, Clone, Deserialize, Default)]
#[serde(rename_all = "camelCase")]
struct PersistCompactHashRecordSet {
    // Record identifiers; whitespace-only ids are skipped downstream.
    #[serde(default)]
    ids: Vec<String>,
    // 32-bit content hashes aligned with `ids` by index.
    #[serde(default)]
    hashes: Vec<u32>,
}
#[derive(Debug, Clone, Deserialize, Default)]
#[serde(rename_all = "camelCase")]
struct PersistCompactTombstoneSet {
@@ -160,6 +169,17 @@ struct PersistCompactTombstoneSet {
target_keys: Vec<String>,
}
/// Tombstone variant of the hash-compact record set: three parallel arrays
/// where `ids[i]`, `hashes[i]`, and `target_keys[i]` describe one tombstone.
/// Tombstones whose target key is empty are ignored by the delta solver.
#[derive(Debug, Clone, Deserialize, Default)]
#[serde(rename_all = "camelCase")]
struct PersistCompactHashTombstoneSet {
    // Tombstone identifiers, paired with `hashes`/`target_keys` by index.
    #[serde(default)]
    ids: Vec<String>,
    // 32-bit content hashes aligned with `ids` by index.
    #[serde(default)]
    hashes: Vec<u32>,
    // Key of the record each tombstone deletes, aligned with `ids` by index.
    #[serde(default)]
    target_keys: Vec<String>,
}
#[derive(Debug, Clone, Deserialize, Default)]
#[serde(rename_all = "camelCase")]
struct PersistDeltaCompactPayload {
@@ -177,6 +197,23 @@ struct PersistDeltaCompactPayload {
after_tombstones: PersistCompactTombstoneSet,
}
/// Full request for the hash-compact persist-delta bridge: before/after
/// snapshots of nodes, edges, and tombstones, each as parallel id/hash arrays.
/// Every field defaults to empty so partial JS payloads deserialize cleanly.
#[derive(Debug, Clone, Deserialize, Default)]
#[serde(rename_all = "camelCase")]
struct PersistDeltaCompactHashPayload {
    #[serde(default)]
    before_nodes: PersistCompactHashRecordSet,
    #[serde(default)]
    after_nodes: PersistCompactHashRecordSet,
    #[serde(default)]
    before_edges: PersistCompactHashRecordSet,
    #[serde(default)]
    after_edges: PersistCompactHashRecordSet,
    // NOTE(review): before-side tombstones carry no target keys here — the
    // solver only needs target keys for the after side; confirm against callers.
    #[serde(default)]
    before_tombstones: PersistCompactHashRecordSet,
    #[serde(default)]
    after_tombstones: PersistCompactHashTombstoneSet,
}
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
struct PersistDeltaIdResult {
@@ -285,6 +322,19 @@ fn build_json_serialized_index(records: &[JsonValue]) -> HashMap<String, String>
map
}
/// Builds an id → hash lookup from two parallel arrays, pairing entries by
/// index and stopping at the shorter array.
///
/// Ids are trimmed; entries whose trimmed id is empty are skipped, and a
/// duplicate id overwrites the earlier entry (last occurrence wins).
///
/// The map is preallocated to the paired length and filled via `zip`,
/// avoiding incremental rehash growth and per-index bounds checks.
fn build_compact_hash_lookup<'a>(ids: &'a [String], hashes: &'a [u32]) -> HashMap<&'a str, u32> {
    let mut map = HashMap::with_capacity(ids.len().min(hashes.len()));
    for (id, &hash) in ids.iter().zip(hashes) {
        let id = id.trim();
        if !id.is_empty() {
            map.insert(id, hash);
        }
    }
    map
}
fn build_json_value_index(records: &[JsonValue]) -> HashMap<String, JsonValue> {
let mut map = HashMap::new();
for record in records {
@@ -605,6 +655,115 @@ fn solve_persist_delta_compact_in_rust(payload: PersistDeltaCompactPayload) -> P
}
}
fn solve_persist_delta_compact_hash_in_rust(
payload: PersistDeltaCompactHashPayload,
) -> PersistDeltaIdResult {
let before_node_hash_by_id =
build_compact_hash_lookup(&payload.before_nodes.ids, &payload.before_nodes.hashes);
let after_node_hash_by_id =
build_compact_hash_lookup(&payload.after_nodes.ids, &payload.after_nodes.hashes);
let before_edge_hash_by_id =
build_compact_hash_lookup(&payload.before_edges.ids, &payload.before_edges.hashes);
let after_edge_hash_by_id =
build_compact_hash_lookup(&payload.after_edges.ids, &payload.after_edges.hashes);
let before_tombstone_hash_by_id = build_compact_hash_lookup(
&payload.before_tombstones.ids,
&payload.before_tombstones.hashes,
);
let after_tombstone_target_key_by_id = build_compact_target_key_lookup(
&payload.after_tombstones.ids,
&payload.after_tombstones.target_keys,
);
let mut upsert_node_ids = Vec::new();
let after_node_len = payload
.after_nodes
.ids
.len()
.min(payload.after_nodes.hashes.len());
for index in 0..after_node_len {
let id = payload.after_nodes.ids[index].trim();
if id.is_empty() {
continue;
}
let hash = payload.after_nodes.hashes[index];
if before_node_hash_by_id.get(id) != Some(&hash) {
upsert_node_ids.push(id.to_string());
}
}
let mut upsert_edge_ids = Vec::new();
let after_edge_len = payload
.after_edges
.ids
.len()
.min(payload.after_edges.hashes.len());
for index in 0..after_edge_len {
let id = payload.after_edges.ids[index].trim();
if id.is_empty() {
continue;
}
let hash = payload.after_edges.hashes[index];
if before_edge_hash_by_id.get(id) != Some(&hash) {
upsert_edge_ids.push(id.to_string());
}
}
let mut delete_node_ids = Vec::new();
for id in &payload.before_nodes.ids {
let normalized_id = id.trim();
if normalized_id.is_empty() {
continue;
}
if !after_node_hash_by_id.contains_key(normalized_id) {
delete_node_ids.push(normalized_id.to_string());
}
}
let mut delete_edge_ids = Vec::new();
for id in &payload.before_edges.ids {
let normalized_id = id.trim();
if normalized_id.is_empty() {
continue;
}
if !after_edge_hash_by_id.contains_key(normalized_id) {
delete_edge_ids.push(normalized_id.to_string());
}
}
let mut upsert_tombstone_ids = Vec::new();
let after_tombstone_len = payload
.after_tombstones
.ids
.len()
.min(payload.after_tombstones.hashes.len());
for index in 0..after_tombstone_len {
let id = payload.after_tombstones.ids[index].trim();
if id.is_empty() {
continue;
}
let target_key = after_tombstone_target_key_by_id
.get(id)
.copied()
.unwrap_or_default();
if target_key.is_empty() {
continue;
}
let hash = payload.after_tombstones.hashes[index];
if before_tombstone_hash_by_id.get(id) != Some(&hash) {
upsert_tombstone_ids.push(id.to_string());
}
}
PersistDeltaIdResult {
upsert_node_ids,
upsert_edge_ids,
delete_node_ids,
delete_edge_ids,
upsert_tombstone_ids,
}
}
fn build_region_buckets(nodes: &[LayoutNode]) -> HashMap<String, Vec<usize>> {
let mut region_buckets = HashMap::new();
for (index, node) in nodes.iter().enumerate() {
@@ -849,3 +1008,13 @@ pub fn build_persist_delta_compact(payload: JsValue) -> Result<JsValue, JsValue>
JsValue::from_str(&format!("serialize compact persist result failed: {error}"))
})
}
#[wasm_bindgen]
pub fn build_persist_delta_compact_hash(payload: JsValue) -> Result<JsValue, JsValue> {
let parsed: PersistDeltaCompactHashPayload = serde_wasm_bindgen::from_value(payload)
.map_err(|error| JsValue::from_str(&format!("invalid hash compact persist payload: {error}")))?;
let solved = solve_persist_delta_compact_hash_in_rust(parsed);
serde_wasm_bindgen::to_value(&solved).map_err(|error| {
JsValue::from_str(&format!("serialize hash compact persist result failed: {error}"))
})
}