Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
765 changes: 723 additions & 42 deletions Cargo.lock

Large diffs are not rendered by default.

6 changes: 6 additions & 0 deletions crates/common/types/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,9 @@ version.workspace = true
thiserror.workspace = true
serde.workspace = true
ethereum-types.workspace = true

ethereum_ssz_derive = "0.10.0"
ethereum_ssz = "0.10.0"
ssz_types = "0.14.0"
tree_hash = "0.12.0"
tree_hash_derive = "0.12.0"
2 changes: 1 addition & 1 deletion crates/common/types/src/block.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use ethereum_types::H256;
use crate::primitives::H256;

use crate::state::Slot;

Expand Down
1 change: 1 addition & 0 deletions crates/common/types/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
pub mod block;
pub mod genesis;
pub mod primitives;
pub mod state;
3 changes: 3 additions & 0 deletions crates/common/types/src/primitives.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
use tree_hash::Hash256;

pub type H256 = Hash256;
30 changes: 22 additions & 8 deletions crates/common/types/src/state.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use ethereum_types::{H256, U256};
use serde::{Deserialize, Serialize};
use ssz_derive::{Decode, Encode};
use tree_hash_derive::TreeHash;

use crate::{block::BlockHeader, genesis::Genesis};
use crate::{block::BlockHeader, genesis::Genesis, primitives::H256};

#[derive(Debug)]
pub struct Slot(u64);
Expand Down Expand Up @@ -39,23 +40,36 @@ impl State {
latest_block_header: BlockHeader {
slot: Slot(0),
proposer_index: 0,
parent_root: H256::zero(),
state_root: H256::zero(),
parent_root: H256::ZERO,
state_root: H256::ZERO,
// TODO: this should be the hash_tree_root of an empty block body
body_root: H256::zero(),
body_root: H256::ZERO,
},
latest_justified: genesis.latest_justified.clone(),
latest_finalized: genesis.latest_finalized.clone(),
}
}
}

#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
#[derive(Debug, Clone, Copy, Serialize, Deserialize, Encode, Decode, TreeHash)]
pub struct Checkpoint {
pub root: H256,
// Used U256 due to it being serialized as string
// TODO: use u64 and implement custom serialization
pub slot: U256,
#[serde(deserialize_with = "deser_dec_str")]
pub slot: u64,
}

// Taken from ethrex-common
pub fn deser_dec_str<'de, D>(d: D) -> Result<u64, D::Error>
where
D: serde::Deserializer<'de>,
{
use serde::de::Error;

let value = String::deserialize(d)?;
value
.parse()
.map_err(|_| D::Error::custom("Failed to deserialize u64 value"))
}

#[derive(Debug, Clone, Serialize, Deserialize)]
Expand Down
7 changes: 7 additions & 0 deletions crates/net/p2p/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,5 +25,12 @@ ethrex-p2p = { git = "https://github.com/lambdaclass/ethrex", rev = "1af63a4de7c
ethrex-rlp = { git = "https://github.com/lambdaclass/ethrex", rev = "1af63a4de7c93eb7413b9b003df1be82e1484c69" }
ethrex-common = { git = "https://github.com/lambdaclass/ethrex", rev = "1af63a4de7c93eb7413b9b003df1be82e1484c69" }

# SSZ
ethereum_ssz_derive = "0.10.0"
ethereum_ssz = "0.10.0"
ssz_types = "0.14.0"
tree_hash = "0.12.0"
tree_hash_derive = "0.12.0"

[dev-dependencies]
hex = "0.4"
62 changes: 50 additions & 12 deletions crates/net/p2p/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use libp2p::{
};
use tracing::{info, trace};

use crate::messages::status::STATUS_PROTOCOL_V1;
use crate::messages::status::{STATUS_PROTOCOL_V1, Status};

mod messages;

Expand Down Expand Up @@ -110,25 +110,63 @@ struct Behaviour {
async fn event_loop(mut swarm: libp2p::Swarm<Behaviour>) {
while let Some(event) = swarm.next().await {
match event {
SwarmEvent::Behaviour(BehaviourEvent::ReqResp(Event::Message {
peer,
connection_id,
message:
Message::Request {
request_id,
request,
channel,
},
})) => {
info!(finalized_slot=%request.finalized.slot, head_slot=%request.head.slot, "Received status request from peer {peer}");
SwarmEvent::Behaviour(BehaviourEvent::ReqResp(message @ Event::Message { .. })) => {
handle_req_resp_message(&mut swarm, message).await;
}
// SwarmEvent::Behaviour(BehaviourEvent::ReqResp(Event::Message {
// peer,
// connection_id,
// message:
// Message::Request {
// request_id,
// request,
// channel,
// },
// })) => {
// info!(finalized_slot=%request.finalized.slot, head_slot=%request.head.slot, "Received status request from peer {peer}");
// }
_ => {
trace!(?event, "Ignored swarm event");
}
}
}
}

async fn handle_req_resp_message(
swarm: &mut libp2p::Swarm<Behaviour>,
event: Event<Status, Status>,
) {
let Event::Message {
peer,
connection_id: _,
message,
} = event
else {
unreachable!("we already matched on event_loop");
};
match message {
Message::Request {
request_id: _,
request,
channel,
} => {
info!(finalized_slot=%request.finalized.slot, head_slot=%request.head.slot, "Received status request from peer {peer}");
swarm
.behaviour_mut()
.req_resp
.send_response(channel, request.clone())
.unwrap();
swarm.behaviour_mut().req_resp.send_request(&peer, request);
}
Message::Response {
request_id: _,
response,
} => {
info!(finalized_slot=%response.finalized.slot, head_slot=%response.head.slot, "Received status response from peer {peer}");
}
}
}

pub struct Bootnode {
ip: IpAddr,
quic_port: u16,
Expand Down
5 changes: 5 additions & 0 deletions crates/net/p2p/src/messages/mod.rs
Original file line number Diff line number Diff line change
@@ -1 +1,6 @@
pub mod status;

const MAX_PAYLOAD_SIZE: usize = 10 * 1024 * 1024; // 10 MB

// https://github.com/ethereum/consensus-specs/blob/master/specs/phase0/p2p-interface.md#max_message_size
const MAX_COMPRESSED_PAYLOAD_SIZE: usize = 32 + MAX_PAYLOAD_SIZE + MAX_PAYLOAD_SIZE / 6 + 1024; // ~12 MB
135 changes: 85 additions & 50 deletions crates/net/p2p/src/messages/status.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
use std::io;

use ethlambda_types::state::Checkpoint;
use ethrex_common::{H256, U256};
use libp2p::futures::{AsyncRead, AsyncReadExt, AsyncWrite};
use libp2p::futures::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
use snap::read::FrameEncoder;
use ssz::{Decode, Encode};
use ssz_derive::{Decode, Encode};
use tracing::trace;

use crate::messages::MAX_COMPRESSED_PAYLOAD_SIZE;

pub const STATUS_PROTOCOL_V1: &str = "/leanconsensus/req/status/1/ssz_snappy";

Expand Down Expand Up @@ -32,6 +37,17 @@ impl libp2p::request_response::Codec for StatusCodec {
where
T: AsyncRead + Unpin + Send,
{
let mut result = 0_u8;
io.read_exact(std::slice::from_mut(&mut result)).await?;

// TODO: send errors to event loop?
if result != 0 {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
"non-zero result in response",
));
}

let payload = decode_payload(io).await?;
let status = deserialize_payload(payload)?;
Ok(status)
Expand All @@ -46,7 +62,18 @@ impl libp2p::request_response::Codec for StatusCodec {
where
T: AsyncWrite + Unpin + Send,
{
todo!();
trace!(?req, "Writing status request");

let encoded = req.as_ssz_bytes();
let mut compressor = FrameEncoder::new(&encoded[..]);

let mut buf = Vec::new();
io::Read::read_to_end(&mut compressor, &mut buf)?;

let mut size_buf = [0; 5];
let varint_buf = encode_varint(buf.len() as u32, &mut size_buf);
io.write(varint_buf).await?;
io.write(&buf).await?;

Ok(())
}
Expand All @@ -60,7 +87,19 @@ impl libp2p::request_response::Codec for StatusCodec {
where
T: AsyncWrite + Unpin + Send,
{
todo!();
// Send result byte
io.write(&[0]).await?;

let encoded = resp.as_ssz_bytes();
let mut compressor = FrameEncoder::new(&encoded[..]);

let mut buf = Vec::new();
io::Read::read_to_end(&mut compressor, &mut buf)?;

let mut size_buf = [0; 5];
let varint_buf = encode_varint(buf.len() as u32, &mut size_buf);
io.write(varint_buf).await?;
io.write(&buf).await?;

Ok(())
}
Expand All @@ -70,12 +109,21 @@ async fn decode_payload<T>(io: &mut T) -> io::Result<Vec<u8>>
where
T: AsyncRead + Unpin + Send,
{
let mut varint_buf = [0; std::mem::size_of::<usize>()];
// TODO: limit bytes received
let mut varint_buf = [0; 5];

io.take(varint_buf.len() as u64)
let read = io
.take(varint_buf.len() as u64)
.read(&mut varint_buf)
.await?;
let (size, rest) = decode_varint(&varint_buf);
let (size, rest) = decode_varint(&varint_buf[..read])?;

if (size as usize) < rest.len() || size as usize > MAX_COMPRESSED_PAYLOAD_SIZE {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
"invalid message size",
));
}

let mut message = vec![0; size as usize];
if rest.is_empty() {
Expand All @@ -93,49 +141,30 @@ where
}

fn deserialize_payload(payload: Vec<u8>) -> io::Result<Status> {
if payload.len() != 80 {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
"Invalid status message length",
));
}

let finalized_root = H256(
payload[..32]
.try_into()
.expect("slice with incorrect length"),
);
let finalized_slot = u64::from_be_bytes(
payload[32..40]
.try_into()
.expect("slice with incorrect length"),
);

let head_root = H256(
payload[40..72]
.try_into()
.expect("slice with incorrect length"),
);
let head_slot = u64::from_be_bytes(
payload[72..]
.try_into()
.expect("slice with incorrect length"),
);

let status = Status {
finalized: Checkpoint {
root: finalized_root,
slot: U256::from(finalized_slot),
},
head: Checkpoint {
root: head_root,
slot: U256::from(head_slot),
},
};
let status = Status::from_ssz_bytes(&payload)
// We turn to string since DecodeError does not implement std::error::Error
.map_err(|err| io::Error::new(io::ErrorKind::InvalidData, format!("{err:?}")))?;
Ok(status)
}

fn decode_varint(buf: &[u8]) -> (u32, &[u8]) {
/// Encodes a u32 as a varint into the provided buffer, returning a slice of the buffer
/// containing the encoded bytes.
fn encode_varint(mut value: u32, dst: &mut [u8; 5]) -> &[u8] {
for i in 0..5 {
let mut byte = (value & 0x7F) as u8;
value >>= 7;
if value != 0 {
byte |= 0x80;
}
dst[i] = byte;
if value == 0 {
return &dst[..=i];
}
}
&dst[..]
}

fn decode_varint(buf: &[u8]) -> io::Result<(u32, &[u8])> {
let mut result = 0_u32;
let mut read_size = 0;

Expand All @@ -147,10 +176,16 @@ fn decode_varint(buf: &[u8]) -> (u32, &[u8]) {
break;
}
}
(result, &buf[read_size..])
if read_size == 0 {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
"message size is bigger than 28 bits",
));
}
Ok((result, &buf[read_size..]))
}

#[derive(Debug, Clone)]
#[derive(Debug, Clone, Encode, Decode)]
pub struct Status {
pub finalized: Checkpoint,
pub head: Checkpoint,
Expand All @@ -164,7 +199,7 @@ mod tests {
fn test_decode_varint() {
// Example from https://protobuf.dev/programming-guides/encoding/
let buf = [0b10010110, 0b00000001];
let (value, rest) = decode_varint(&buf);
let (value, rest) = decode_varint(&buf).unwrap();
assert_eq!(value, 150);

let expected: &[u8] = &[];
Expand Down