Message Encoding and Peer to Peer

Project: Let's build Bitcoin

In the previous post, we explained what the proof of work and the mining process are.

In this post, we are going to talk about the following:

  • What is a peer-to-peer network (P2P)
  • How does the Bitcoin network work
  • How does a node discover other peers
  • How to handle the connection to the peer nodes
  • Define the basic protocol for LearnCoin

What is a peer-to-peer network?

A peer-to-peer (P2P) network is a network architecture in which all the participants are equals, i.e., peers. There is no server or client, nor any central place of authority.

The largest and most successful application of the P2P architecture is file sharing; BitTorrent is the most recent evolution of such systems.

Message encoding

Before we discuss other concepts, let’s talk about the encoding of the messages used in the protocol. The nodes talk to each other over TCP connections by exchanging payloads.

A payload is a variable-size message containing information that a node wants to communicate to its peer. The node also sends a header message that includes metadata about the payload, for example, the size of the payload.

The decoder is more straightforward to implement if it knows the size of the payload upfront, because it then knows exactly how many bytes are required to decode the payload. If the node hasn’t received enough bytes from the peer, it shouldn’t attempt to deserialize the message.

We will be using serde and bincode libraries to encode/decode messages. Let’s first add these dependencies.

# learncoin/Cargo.toml

[dependencies]
# ...
bincode = "1.3.3"
serde = {version = "1.0.130", features = ["derive"]}

Most languages have libraries to encode/decode data as JSON. If you follow the implementation in a different programming language, search for a library that can help you with the encoding.

Let’s define the API of the header and payload messages. The payload message is an enum, also known as a tagged union or sum type in some other languages.

// learncoin/src/peer_message.rs

/// Metadata about the PeerMessagePayload.
#[derive(Copy, Clone, Serialize, Deserialize)]
pub struct PeerMessageHeader {
    payload_size: u32,
}

impl PeerMessageHeader {
    pub const SIZE: usize = std::mem::size_of::<PeerMessageHeader>();

    pub fn new(payload_size: u32) -> Self {
        Self { payload_size }
    }

    pub fn payload_size(&self) -> u32 {
        self.payload_size
    }
}

/// Payload sent to and received from the peers.
#[derive(Serialize, Deserialize, Clone)]
pub enum PeerMessagePayload {
    // Intentionally left empty until we implement the protocol.
}

Note that we’ve added the Serialize and Deserialize derive annotations to our messages, which generate implementations of these traits for us. As a result, we get encoding and decoding for free.

Let’s define a new trait to use for encoding and decoding.

// learncoin/src/peer_message.rs

/// An API to encode and decode peer messages.
pub trait PeerMessageEncoding<T> {
    /// Encodes the message into the buffer.
    /// Returns a successful result, or a string describing the error.
    fn encode(&self, buffer: &mut [u8]) -> Result<(), String>;

    /// Returns the size of the encoded message.
    fn encoded_size(&self) -> Result<u64, String>;

    /// Decodes the message from the buffer.
    /// Returns the decoded message or a string describing the error.
    fn decode(buffer: &[u8]) -> Result<T, String>;
}

impl PeerMessageEncoding<PeerMessageHeader> for PeerMessageHeader {
    fn encode(&self, buffer: &mut [u8]) -> Result<(), String> {
        bincode::serialize_into(buffer, self).map_err(|e| e.to_string())
    }

    fn encoded_size(&self) -> Result<u64, String> {
        bincode::serialized_size(self).map_err(|e| e.to_string())
    }

    fn decode(buffer: &[u8]) -> Result<Self, String> {
        bincode::deserialize::<Self>(buffer).map_err(|e| e.to_string())
    }
}

impl PeerMessageEncoding<PeerMessagePayload> for PeerMessagePayload {
    fn encode(&self, buffer: &mut [u8]) -> Result<(), String> {
        bincode::serialize_into(buffer, self).map_err(|e| e.to_string())
    }

    fn encoded_size(&self) -> Result<u64, String> {
        bincode::serialized_size(self).map_err(|e| e.to_string())
    }

    fn decode(buffer: &[u8]) -> Result<Self, String> {
        bincode::deserialize::<Self>(buffer).map_err(|e| e.to_string())
    }
}

We are hiding the bincode details behind the PeerMessageEncoding API, which will make it easier to implement our own encoding in the future. For the time being, we simply delegate to bincode to encode and decode the messages.
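
To see the API in action, here’s a quick round-trip check (the function and the values are illustrative and not part of the LearnCoin code):

fn header_round_trip_example() -> Result<(), String> {
    // Encode a header announcing a 42-byte payload into a fixed-size buffer.
    let header = PeerMessageHeader::new(42);
    let mut buffer = vec![0u8; PeerMessageHeader::SIZE];
    header.encode(&mut buffer)?;

    // Decode it back and check that the metadata survived the round trip.
    let decoded = PeerMessageHeader::decode(&buffer)?;
    assert_eq!(decoded.payload_size(), 42);
    Ok(())
}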

Implement Peer Connection

A node must establish a connection with its peers. Therefore, let’s develop an API that allows nodes to communicate with each other via TCP.

The API should provide methods to send and receive messages. To avoid starting a new thread for each peer connection, we will use the non-blocking API.

The connection can be established in two ways:

  • A node wants to connect to a peer given the peer address.
  • A node listens for new peer connections at a predefined port.

// learncoin/src/peer_connection.rs

/// A TCP connection to the peer in the LearnCoin network.
pub struct PeerConnection {
    peer_address: String,
    tcp_stream: TcpStream,
    // An implementation detail of the receive method.
    buffer: FlipBuffer,
}

impl PeerConnection {
    /// Establishes a TCP connection with a peer at the given address.
    pub fn connect(peer_address: String, recv_buffer_size: usize) -> Result<Self, String> {
        let mut tcp_stream = TcpStream::connect(&peer_address).map_err(|e| e.to_string())?;
        tcp_stream
            .set_nonblocking(true)
            .map_err(|e| e.to_string())?;
        Ok(Self {
            peer_address,
            tcp_stream,
            buffer: FlipBuffer::new(recv_buffer_size),
        })
    }

    /// Creates a PeerConnection from the already established TCP connection.
    /// A common use-case is a TCP connection with a peer established by listening for
    /// new TCP connections.
    pub fn from_established_tcp(
        address: SocketAddr,
        tcp_stream: TcpStream,
        recv_buffer_size: usize,
    ) -> Self {
        Self {
            peer_address: address.to_string(),
            tcp_stream,
            buffer: FlipBuffer::new(recv_buffer_size),
        }
    }

    /// The address of the peer this connection talks to.
    pub fn peer_address(&self) -> &str {
        &self.peer_address
    }
}

Now that we can establish a connection to a peer, let’s add logic to receive data. Each peer sends a header message followed by a payload message. If this is not the case, the peer is misbehaving.

We are going to implement the following algorithm:

  1. Read data from the TCP socket into the local buffer. New data is appended to the buffer rather than overwriting it.
  2. Try to parse the header message, starting at the beginning of the buffer. If there is not enough data to parse the header, return None.
  3. Try to parse the payload message, starting at the index right after the header message. If there is not enough data to parse the payload, return None.
  4. At this point, we have parsed both the header and the payload. The function returns the parsed payload and clears the consumed data. Note that we didn’t clear the buffer in step 2 after successfully parsing the header, because we need the header to know how much data is required to parse the payload the next time the method is called.
  5. The TCP socket may read an arbitrary number of bytes that doesn’t match the message boundaries, so there may be remaining bytes in the buffer after the consumed data is cleared. We move the remaining bytes to the beginning of the buffer to make sure the next read doesn’t overflow.

[Diagram: the algorithm to read data from the network]

Let’s have a closer look at how the buffer stores the data, what happens when data is consumed, and finally what a flip does.

[Diagram: reading data from the network into the flip buffer, and how the buffer changes]

There are other possible implementations, but I chose this one because it is simple. In the worst case the header message is parsed twice, which we could avoid, but performance is not the goal, so we lean towards simplicity. Even if we cared about performance, I would not optimize this before seeing data that shows it is a problem.

With that said, let’s move forward and implement the receive method. We are going to flip the buffer at the beginning of the function since it’s more convenient and it’s equivalent to the algorithm described above.

// learncoin/src/peer_connection.rs

impl PeerConnection {
    // ...

    /// Attempts to read a new payload from the peer.
    /// Returns Ok(Some(payload)) if the payload exists, Ok(None) if there are no new payloads,
    /// or a string describing the error.
    /// For example, an error may happen if the connection to the peer has been lost.
    pub fn receive(&mut self) -> Result<Option<PeerMessagePayload>, String> {
        // Ensure that the buffer data is at the beginning so it can never overflow.
        self.buffer.flip();

        self.read()?;
        match self.decode_header()? {
            None => Ok(None),
            Some(header) => match self.decode_payload(header.payload_size())? {
                None => Ok(None),
                Some(payload) => {
                    // Now that we have decoded the payload, we can drop the used data from
                    // the buffer.
                    self.buffer
                        .consume_data(PeerMessageHeader::SIZE + header.payload_size() as usize);
                    Ok(Some(payload))
                }
            },
        }
    }
}
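
The receive method relies on three private helpers, read, decode_header, and decode_payload, which are not shown in the post. Here is a minimal sketch of what they might look like, built on the flip buffer API described next (the exact signatures and error messages are my assumptions, not the original code):

// Note: these sketches assume `use std::io::Read;` and the PeerMessageEncoding
// trait are in scope.

impl PeerConnection {
    /// Reads any available bytes from the non-blocking socket into the buffer's free space.
    fn read(&mut self) -> Result<(), String> {
        match self.tcp_stream.read(self.buffer.free_space_slice_mut()) {
            Ok(0) => Err(format!("Peer {} has closed the connection", self.peer_address)),
            Ok(num_bytes) => {
                self.buffer.consume_free_space(num_bytes);
                Ok(())
            }
            // The socket is non-blocking, so WouldBlock simply means there is no new data.
            Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => Ok(()),
            Err(e) => Err(e.to_string()),
        }
    }

    /// Decodes the header if enough bytes have arrived, without consuming them.
    fn decode_header(&self) -> Result<Option<PeerMessageHeader>, String> {
        let data = self.buffer.data();
        if data.len() < PeerMessageHeader::SIZE {
            return Ok(None);
        }
        PeerMessageHeader::decode(&data[..PeerMessageHeader::SIZE]).map(Some)
    }

    /// Decodes the payload that follows the header, if all of its bytes have arrived.
    fn decode_payload(&self, payload_size: u32) -> Result<Option<PeerMessagePayload>, String> {
        let data = self.buffer.data();
        let message_size = PeerMessageHeader::SIZE + payload_size as usize;
        if data.len() < message_size {
            return Ok(None);
        }
        PeerMessagePayload::decode(&data[PeerMessageHeader::SIZE..message_size]).map(Some)
    }
}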

What is the flip buffer? It’s a data structure that stores the unconsumed data and provides an API to move it to the beginning of the underlying buffer, freeing up space for new reads. Let’s see how it’s implemented.

// learncoin/src/flip_buffer.rs

pub struct FlipBuffer {
    buffer: Vec<u8>,
    start_index: usize,
    end_index: usize,
}

impl FlipBuffer {
    pub fn new(capacity: usize) -> Self {
        Self {
            // Zero-initialize the buffer so its full capacity is available for writes.
            buffer: vec![0; capacity],
            start_index: 0,
            end_index: 0,
        }
    }

    /// Moves the unconsumed data to the beginning of the buffer, which frees the consumed space.
    pub fn flip(&mut self) {
        let data_len = self.end_index - self.start_index;
        self.buffer.copy_within(self.start_index..self.end_index, 0);
        self.start_index = 0;
        self.end_index = data_len;
    }

    /// Returns the mutable slice from the underlying buffer that comes immediately after
    /// all unconsumed data.
    /// When the data is written to the slice, the buffer must be notified via `consume_free_space`
    /// method.
    pub fn free_space_slice_mut(&mut self) -> &mut [u8] {
        &mut self.buffer[self.end_index..]
    }

    /// Returns the number of bytes that can be written to the buffer.
    pub fn free_space_size(&self) -> usize {
        self.buffer.len() - self.end_index
    }

    /// Returns the slice of the unconsumed data.
    pub fn data(&self) -> &[u8] {
        &self.buffer[self.start_index..self.end_index]
    }

    /// Consumes the given number of bytes from the data.
    /// Note that the consumed bytes are not freed and can't be used unless the flip() method is called.
    ///
    /// Preconditions:
    ///   - Number of consumed bytes must be <= size of the data().
    pub fn consume_data(&mut self, num_bytes: usize) {
        self.start_index += num_bytes;
        assert!(self.start_index <= self.end_index);
    }

    /// Moves part of the free space into the unconsumed data.
    pub fn consume_free_space(&mut self, num_bytes: usize) {
        self.end_index += num_bytes
    }
}
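
To make the buffer’s behaviour concrete, here is a small illustrative test (the values are made up):

#[test]
fn flip_buffer_example() {
    let mut buffer = FlipBuffer::new(8);
    // Pretend that 5 bytes arrived from the socket.
    buffer.free_space_slice_mut()[..5].copy_from_slice(&[1, 2, 3, 4, 5]);
    buffer.consume_free_space(5);
    assert_eq!(buffer.data(), &[1, 2, 3, 4, 5][..]);
    // A 3-byte message was parsed, so mark those bytes as consumed.
    buffer.consume_data(3);
    assert_eq!(buffer.data(), &[4, 5][..]);
    // Flipping moves the remaining bytes to the front, reclaiming the consumed space.
    buffer.flip();
    assert_eq!(buffer.data(), &[4, 5][..]);
    assert_eq!(buffer.free_space_size(), 6);
}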

Please let me know in the comments if there’s anything that’s not clear.

In the future, we may implement the Read and Write traits for the flip buffer so that we can read directly into it and not worry about moving data at a higher level.

The next step is to implement a function that receives all the messages from the peer. This is easy given that we know how to read a single message, so I’ll leave it to you as an exercise.
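
If you would like to compare notes afterwards, here is one possible solution sketch (spoiler alert; the details are mine, not the original code):

impl PeerConnection {
    /// Receives all payloads that are currently available from the peer.
    pub fn receive_all(&mut self) -> Result<Vec<PeerMessagePayload>, String> {
        let mut messages = Vec::new();
        // Keep reading until there is no complete message left in the buffer.
        while let Some(payload) = self.receive()? {
            messages.push(payload);
        }
        Ok(messages)
    }
}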

How does the Bitcoin network work?

The term Bitcoin network refers to the collection of nodes running the Bitcoin P2P protocol. There are other, more specialized protocols that Bitcoin uses for mining and for lightweight mobile wallets, but we will focus only on the P2P protocol. Furthermore, Bitcoin has a so-called Bitcoin Relay Network that miners used to propagate new blocks as quickly as possible, since block propagation is latency-sensitive. The original Bitcoin Relay Network was replaced with the introduction of FIBRE in 2016.

Bitcoin nodes both provide and consume services, which we will discuss in the next blog post when we implement the full LearnCoin node.

How does a node discover other peers?

When a node starts, it must discover other nodes on the network to participate. It must find at least one existing node on the network and connect to it. The node can be chosen at random.

One option is to query DNS seeds: DNS servers that provide a list of IP addresses of nodes running the Bitcoin protocol.

A second option is to provide the node with the IP address of at least one peer to which it connects. The node can then find out about other peers through further introductions.

In our implementation, we will provide the node with a list of peers to connect to. Initially, we will not change the peers, which may lead to problems if all peers disconnect. However, we will fix this issue in future blog posts.

Implement LearnCoin network

We have implemented the API that allows a node to communicate with a single peer. However, LearnCoin is a distributed system, so we need an API to send and receive messages to and from the whole network.

Network configuration

Let’s start with introducing the basic network configuration that each node would take at startup. A node needs to know its own address (hostname and port) and the list of peers to connect to.

// learncoin/src/learncoin_network.rs

pub struct NetworkParams {
    // Address at which TCP server (which listens for peer connections) runs.
    server_address: String,
    // List of peer addresses to connect to.
    peers: Vec<String>,
    // Size of the receive buffer for each peer connection.
    recv_buffer_size: usize,
}

Connect to peers

The next step is to connect to the network. Let’s implement a function that connects to all the peers known at startup and starts a TCP server that listens for incoming peer connections. As with the peer connection, the TCP server will use the non-blocking API.

// learncoin/src/learncoin_network.rs

pub struct LearnCoinNetwork {
    params: NetworkParams,
    // A list of all peer connections known to this node.
    peer_connections: Vec<PeerConnection>,
    // TCP server that listens for incoming peer connections.
    tcp_listener: TcpListener,
    // Peers that caused an error and are scheduled to be dropped.
    inactive_peers: HashSet<String>,
}

impl LearnCoinNetwork {
    /// Connects to the LearnCoin network.
    /// This function starts a TCP server that listens to new incoming peer connections,
    /// and it connects to the peers specified in the network params.
    pub fn connect(params: NetworkParams) -> Result<Self, String> {
        let tcp_listener = TcpListener::bind(&params.server_address).map_err(|e| e.to_string())?;
        tcp_listener
            .set_nonblocking(true)
            .map_err(|e| e.to_string())?;

        let mut peer_connections = Vec::new();
        for address in &params.peers {
            let peer_connection = PeerConnection::connect(address.clone(), params.recv_buffer_size)?;
            peer_connections.push(peer_connection);
        }
        Ok(Self {
            params,
            peer_connections,
            tcp_listener,
            inactive_peers: HashSet::new(),
        })
    }
}

Accept new peers

The LearnCoin network exposes a non-blocking API, so it has to provide a method to accept new peers. The method checks whether any new peer connections are waiting to be accepted and, if so, adds them to the network.

// learncoin/src/learncoin_network.rs

impl LearnCoinNetwork {
    // ...

    /// Accepts new incoming peer connections and adds them to the network.
    pub fn accept_new_peers(&mut self) -> Result<Vec<String>, String> {
        let mut new_peers = vec![];
        loop {
            match self.tcp_listener.accept() {
                Ok((tcp_stream, socket_address)) => {
                    new_peers.push(socket_address.to_string());
                    self.on_new_peer_connected(socket_address, tcp_stream);
                }
                Err(e) => match e.kind() {
                    ErrorKind::WouldBlock => {
                        // No new peers are awaiting.
                        break;
                    }
                    _ => {
                        return Err(e.to_string());
                    }
                },
            }
        }
        Ok(new_peers)
    }
}

We are not done yet. A new peer connection must be added to the network when it connects.

// learncoin/src/learncoin_network.rs

impl LearnCoinNetwork {
    // ...

    fn on_new_peer_connected(&mut self, socket_address: SocketAddr, tcp_stream: TcpStream) {
        let peer_connection = PeerConnection::from_established_tcp(
            socket_address,
            tcp_stream,
            self.params.recv_buffer_size,
        );
        self.peer_connections.push(peer_connection);
    }
}

Hopefully, the above makes sense. Let me know in the comments if you have any questions.

Drop inactive peer connections

Let’s implement a counterpart that drops inactive peer connections. Inactive peer connections are the ones that have caused an error while reading from or writing to them.

// learncoin/src/learncoin_network.rs

impl LearnCoinNetwork {
    // ...

    /// Forgets about all the peers that caused an error while reading or writing data.
    pub fn drop_inactive_peers(&mut self) {
        let Self {
            peer_connections,
            inactive_peers,
            ..
        } = self;
        for peer_address in inactive_peers.iter() {
            Self::drop_connection(peer_connections, peer_address);
        }
        self.inactive_peers.clear();
    }


    fn drop_connection(peer_connections: &mut Vec<PeerConnection>, dropped_peer_address: &str) {
        // It is sufficient to remove the connection because Rust automatically closes
        // the TCP connection when the object is destroyed.
        if let Some(index) = peer_connections
            .iter()
            .position(|connection| connection.peer_address() == dropped_peer_address)
        {
            peer_connections.remove(index);
        }
    }
}

Close connection

In some cases, the node may want to close the connection to a peer, for example, if the handshake fails due to incompatible versions.

// learncoin/src/learncoin_network.rs

impl LearnCoinNetwork {
    // ...

    pub fn close_peer_connection(&mut self, peer_address: &str) {
        Self::drop_connection(&mut self.peer_connections, peer_address)
    }
}

The reason for extracting the logic to drop a connection into an associated function is to avoid borrowing “self” mutably while parts of it are already borrowed, which the Rust compiler doesn’t allow.

Receive messages

We are now going to implement a function that receives all messages from a peer. If reading from the peer fails, we mark the peer as inactive because any errors are non-recoverable.

// learncoin/src/learncoin_network.rs

impl LearnCoinNetwork {
    // ...

    /// Receives all the messages from the peer connection.
    /// If the read fails, the peer connection is scheduled to be dropped next time
    /// `drop_inactive_peers` is called.
    fn receive_all_from_peer(
        inactive_peers: &mut HashSet<String>,
        peer_connection: &mut PeerConnection,
    ) -> Vec<PeerMessagePayload> {
        match peer_connection.receive_all() {
            Ok(messages) => messages,
            Err(e) => {
                eprintln!("{}", e);
                inactive_peers.insert(peer_connection.peer_address().to_string());
                vec![]
            }
        }
    }
}

Great, we are making some good progress!

It would be very convenient if the network provided an API to get all the messages across all the peers in the network.

// learncoin/src/learncoin_network.rs

impl LearnCoinNetwork {
    // ...

    /// Receives all payloads from the network.
    pub fn receive_all(&mut self) -> Vec<(String, Vec<PeerMessagePayload>)> {
        let Self {
            peer_connections,
            inactive_peers,
            ..
        } = self;

        let mut all_messages = vec![];
        for peer_connection in peer_connections {
            all_messages.push((
                peer_connection.peer_address().to_string(),
                Self::receive_all_from_peer(inactive_peers, peer_connection),
            ));
        }
        all_messages
    }
}

We’re almost there. We have completed the logic to receive messages from the whole network.

Send messages

The next step is to provide functionality for sending data. Unlike receive all, sending data doesn’t need a batch-like API.

Let’s start with logic to send data to a single peer. Then we can implement the other functions on top of that one. Like the receive method, the network drops the peer connection if there is an error while sending the message.

// learncoin/src/learncoin_network.rs

impl LearnCoinNetwork {
    // ...

    /// Sends the payload to the given peer connection.
    ///
    /// The payload may not be sent due to the flow-control.
    /// If there is an error while writing to the peer or the peer's receive buffer is full,
    /// i.e. the flow control pushes back, the peer connection is marked as inactive.
    /// It is dropped next time `drop_inactive_peers` is called.
    fn send_to_peer_connection(
        peer_connection: &mut PeerConnection,
        payload: &PeerMessagePayload,
        inactive_peers: &mut HashSet<String>,
    ) {
        match peer_connection.send(payload) {
            Ok(true) => (),
            Ok(false) => {
                inactive_peers.insert(peer_connection.peer_address().to_string());
                eprintln!(
                    "Flow-control backoff while sending a message to: {}",
                    peer_connection.peer_address()
                );
            }
            Err(error) => {
                inactive_peers.insert(peer_connection.peer_address().to_string());
                eprintln!(
                    "Error while trying to send a payload to {}: {}",
                    peer_connection.peer_address(),
                    error
                );
            }
        }
    }
}

send_to_peer_connection doesn’t take a reference to self because we call it while self.peer_connections and self.inactive_peers are already borrowed; borrowing self again at that point would make the compiler unhappy.
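
For completeness, send_to_peer_connection relies on PeerConnection::send, which the post doesn’t show. Here is a minimal sketch that matches the Ok(true)/Ok(false) flow-control contract described above (the framing and error handling are my assumptions):

// Note: this sketch assumes `use std::io::Write;` is in scope.

impl PeerConnection {
    /// Sends the payload to the peer, prefixed by its header.
    /// Returns Ok(true) if the message was written, Ok(false) if the peer's receive
    /// buffer is full (i.e. the flow control pushes back), or an error otherwise.
    pub fn send(&mut self, payload: &PeerMessagePayload) -> Result<bool, String> {
        let payload_size = payload.encoded_size()? as usize;
        let mut buffer = vec![0u8; PeerMessageHeader::SIZE + payload_size];
        PeerMessageHeader::new(payload_size as u32)
            .encode(&mut buffer[..PeerMessageHeader::SIZE])?;
        payload.encode(&mut buffer[PeerMessageHeader::SIZE..])?;

        match self.tcp_stream.write(&buffer) {
            // The whole message was handed to the OS.
            Ok(num_written) if num_written == buffer.len() => Ok(true),
            // A partial write or WouldBlock means the OS send buffer is full.
            Ok(_) => Ok(false),
            Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => Ok(false),
            Err(e) => Err(e.to_string()),
        }
    }
}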

Let’s provide the API that uses the network to send a message to the given peer and the convenience API to send the payload to all peers.

// learncoin/src/learncoin_network.rs

impl LearnCoinNetwork {
    // ...

    /// Sends the payload to the peer.
    /// If send fails or the flow-control pushes back, mark the peer as inactive.
    pub fn send(&mut self, peer_address: &str, payload: &PeerMessagePayload) {
        let Self {
            peer_connections,
            inactive_peers,
            ..
        } = self;
        for connection in peer_connections {
            if connection.peer_address() == peer_address {
                Self::send_to_peer_connection(connection, payload, inactive_peers);
                return;
            }
        }
        panic!("Called send for unknown peer: {}", peer_address);
    }

    /// Sends the payload to all peers.
    /// See docs for `Self::send`.
    pub fn send_to_all(&mut self, payload: &PeerMessagePayload) {
        let Self {
            peer_connections,
            inactive_peers,
            ..
        } = self;
        for connection in peer_connections {
            Self::send_to_peer_connection(connection, payload, inactive_peers);
        }
    }
}
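
To tie everything together, here is an illustrative sketch of how a node might drive these APIs in a simple poll loop. The real node comes in the next post; the function name and timing below are placeholders:

fn run_node_sketch(params: NetworkParams) -> Result<(), String> {
    let mut network = LearnCoinNetwork::connect(params)?;
    loop {
        // Accept any peers that connected to our TCP server.
        let new_peers = network.accept_new_peers()?;
        if !new_peers.is_empty() {
            println!("New peers connected: {:?}", new_peers);
        }

        // Drain all messages that have arrived since the last iteration.
        for (peer_address, messages) in network.receive_all() {
            println!("Received {} message(s) from {}", messages.len(), peer_address);
            // A real node would handle each message here and reply via
            // network.send(&peer_address, &response).
        }

        // Forget the peers that caused errors while reading or writing.
        network.drop_inactive_peers();

        // All the APIs are non-blocking, so sleep briefly to avoid a busy loop.
        std::thread::sleep(std::time::Duration::from_millis(10));
    }
}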

Summary

In this post, we talked about P2P networks and protocol encoding. We’ve implemented abstractions for peers to connect to and communicate with each other.

Here’s the full source code.

What’s next?

In the next post, we will talk about node roles and the handshake protocol.
