In part 3 we looked at leveraging trackers to obtain a list of peers serving a given torrent. In this post we’ll look at connecting to peers and figuring out which ones have the blocks we require.

We’re building up from the go-bt repository. Clone it if you want to follow along!

I am using the Transmission client as my “validation” peer - I had moderate success with qBittorrent but it all came crashing down when it refused connections from localhost, which made testing more complicated than it should have been. It’s a shame because qBittorrent is very (too?) customisable and exposes many internals that are not always available elsewhere.

Peers representation

A tracker returns a list of peers as per the below:

/V/r/g/src ❯❯❯ curl -s -X GET 'https://torrent.ubuntu.com/announce?info_hash=A%E6%CDP%CC%ECU%CDW%04%C5%E3%D1v%E7%B5%93%17%A3%FB&peer_id=%FC%93%15%9A%3A%B0as%F2%91%A4-%7F%BE%3A%60%D2l74&port=6688&uploaded=0&downloaded=0&left=0' | go run ./main.go bencode -decode=- | head -20
{
  "complete": 713,
  "incomplete": 26,
  "interval": 1800,
  "peers": [
    {
      "ip": "2607:5300:60:623::1",
      "peer id": "-TR2940-nvogl7ewmfwf",
      "port": 51413
    },
    {
      "ip": "2001:41d0:2:94d1::1",
      "peer id": "-lt0D80-\u0016\ufffdO \u0019ڷ\ufffd\ufffd\ufffd o",
      "port": 6882
    },
    {
      "ip": "2a03:6880:10e7:2a00:c0ab:7cff:febd:274a",
      "peer id": "-TR3000-a0xk5a66l1xz",
      "port": 51413
    },

An IP (which could be IPv4 or IPv6) and a port is all we need.

Message formats

The message format used by the BitTorrent protocol is length-prefixed and generally defined as such:

length of the message # 4 bytes
message id # 1 byte
payload # could be empty!

The only exception to this is the keep-alive message which doesn’t have a message ID - it’s just the 4-byte length set to zero (0x00 0x00 0x00 0x00) with nothing after it (it’s a special case when we serialise it out - sigh).

We model this as such:

type Message struct {
	Length    [4]byte
	MessageId byte
	Payload   []byte
}

func (m *Message) ToBytes() []byte {
	buffer := new(bytes.Buffer)
	buffer.Write(m.Length[:])
	// handle the special keep-alive case
	if m.Length == [4]byte{0, 0, 0, 0} {
		return buffer.Bytes()
	}
	buffer.WriteByte(m.MessageId)
	buffer.Write(m.Payload)
	return buffer.Bytes()
}
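
The Length field has to agree with what follows it on the wire: 1 byte for the message ID plus the size of the payload, stored big-endian. Here’s a minimal sketch of a constructor that computes it. NewMessage is a hypothetical helper (it may not match what go-bt does), and the Message type is repeated only so the snippet compiles on its own:

```go
package main

import (
	"bytes"
	"encoding/binary"
)

// Message mirrors the struct shown above.
type Message struct {
	Length    [4]byte
	MessageId byte
	Payload   []byte
}

// ToBytes is the serialiser shown above, keep-alive special case included.
func (m *Message) ToBytes() []byte {
	buffer := new(bytes.Buffer)
	buffer.Write(m.Length[:])
	if m.Length == [4]byte{0, 0, 0, 0} {
		return buffer.Bytes()
	}
	buffer.WriteByte(m.MessageId)
	buffer.Write(m.Payload)
	return buffer.Bytes()
}

// NewMessage (hypothetical helper) fills in Length as a big-endian
// uint32 covering the message ID byte plus the payload.
func NewMessage(id byte, payload []byte) *Message {
	m := &Message{MessageId: id, Payload: payload}
	binary.BigEndian.PutUint32(m.Length[:], uint32(1+len(payload)))
	return m
}
```

With this, NewMessage(5, []byte{0xff, 0xf0}).ToBytes() gives the 7 bytes 00 00 00 03 05 ff f0, while a zero-valued Message still serialises to 4 zero bytes.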

We’ll only look at 2 messages in this post and leave the rest for the next one.

Keep-alive

This message has length set to zero and must be sent periodically to a peer if we want to maintain a connection despite not communicating with said peer (perhaps it doesn’t have any of the blocks we’re interested in).

Creating one is dead easy - because all fields are 0-initialised:

keepaliveMsg := &Message{}

Bitfield

This message enables us to find out which blocks are available from the given peer (and for the peer to find out which blocks are available from us). The format of the bitfield message is as follows:

<len=0001+len(bitfield)> # in bytes
<id=5>
<bitfield>

Let’s work through an example. Suppose our torrent has 12 blocks - that would require a bitfield of 12 bits, which is 2 bytes. If all blocks were available we would send a total of 7 bytes: 4 for the length (that’s predefined in the spec), 1 for the message ID, and 2 for the bitfield (we can’t send 12 bits so we round this up to the next whole byte, i.e. the next multiple of 8 bits):

3 # 1 + 2 bytes, stored as 4 bytes
5 # the message id
0xff 0xf0 # unused bits are set to 0

If instead we were missing the first block, the bitfield would be 0x7f 0xf0 (0x7f is 0b01111111).
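
The rounding is plain integer arithmetic: n blocks need (n + 7) / 8 bytes. Here’s a small sketch that reproduces the example above. bitfieldLen and fullBitfield are names I made up for illustration:

```go
package main

// bitfieldLen returns how many bytes are needed to hold one bit per block.
func bitfieldLen(numBlocks int) int {
	return (numBlocks + 7) / 8
}

// fullBitfield builds the payload a seed would send: the first numBlocks
// bits set (most significant bit first), spare trailing bits left at 0.
func fullBitfield(numBlocks int) []byte {
	field := make([]byte, bitfieldLen(numBlocks))
	for i := 0; i < numBlocks; i++ {
		field[i/8] |= 1 << (7 - i%8)
	}
	return field
}
```

fullBitfield(12) returns 0xff 0xf0, matching the bytes worked out by hand.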

Here’s a live example - here the peer is telling us that all blocks are available:

2024/10/26 16:16:05 msg received: 5
2024/10/26 16:16:05 payload: [255 255 255 255 255 255 255 255 255 255 255 255 255 255 255 255 255 255 255 255 255 255 255]

That’s 23 x 255 - or 23 x 0xff for a total of 184 blocks, which makes the peer a seed.

For simplicity our bitfield will store the same byte array we receive from the peer (that is, BitField.Field = msg.Payload) - but let’s add two helper functions to make dealing with the raw bytes easier:

type BitField struct {
	NumBlocks uint64
	Field     []byte
}

func (b *BitField) HasBlock(idx uint64) bool {
	if idx >= b.NumBlocks { // valid indices are 0..NumBlocks-1
		panic(fmt.Sprintf("We only have %d blocks but requested block number %d", b.NumBlocks, idx))
	}

	// find the relevant byte
	byteIdx := idx / 8
	// blocks are 0-indexed
	offset := byte(1 << (8 - (idx % 8) - 1))
	return b.Field[byteIdx]&offset > 0
}

func (b *BitField) SetBlock(idx uint64) {
	if idx >= b.NumBlocks { // valid indices are 0..NumBlocks-1
		panic(fmt.Sprintf("We only have %d blocks but tried to set block number %d", b.NumBlocks, idx))
	}

	// find the relevant byte
	byteIdx := idx / 8
	// blocks are 0-indexed
	offset := byte(1 << (8 - (idx % 8) - 1))
	b.Field[byteIdx] |= offset
}
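
With those helpers, working out what a peer can offer us is a comparison of two bitfields. Here’s a hedged sketch: Missing is a hypothetical function (not part of go-bt) that lists the blocks the peer has and we lack. BitField and HasBlock are repeated so the snippet compiles on its own, with the bounds check written as >= since valid indices run from 0 to NumBlocks-1:

```go
package main

import "fmt"

type BitField struct {
	NumBlocks uint64
	Field     []byte
}

func (b *BitField) HasBlock(idx uint64) bool {
	if idx >= b.NumBlocks {
		panic(fmt.Sprintf("We only have %d blocks but requested block number %d", b.NumBlocks, idx))
	}
	// most significant bit of each byte is the lowest block index
	offset := byte(1 << (8 - (idx % 8) - 1))
	return b.Field[idx/8]&offset > 0
}

// Missing (hypothetical) returns the indices of blocks that the peer
// advertises and that we do not have yet. Both bitfields are assumed
// to describe the same torrent, hence the same NumBlocks.
func Missing(ours, theirs *BitField) []uint64 {
	var wanted []uint64
	for idx := uint64(0); idx < ours.NumBlocks; idx++ {
		if theirs.HasBlock(idx) && !ours.HasBlock(idx) {
			wanted = append(wanted, idx)
		}
	}
	return wanted
}
```

If we hold 0x7f 0x00 for a 12-block torrent and the peer holds 0xff 0xf0, Missing returns blocks 0, 8, 9, 10 and 11. An empty result means the peer has nothing we need.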

Establishing a connection to a peer and reading messages

The connection to a peer is both bidirectional and somewhat asynchronous. It’s not simply a case of sending a message and listening for a response - communication from a peer can happen at any time, which is e.g. the case with the choke message. Note that all messages are binary in essence - this is not a text-based protocol.

We’ll deal with connecting to a peer separately from accepting connections, but the mechanism is similar overall.

Here’s the struct we’ll use to represent a peer. It’s a bit verbose but mostly encapsulates what we need:

type PeerHandler struct {
	Peer       *data.BEPeer
	PeerId     [20]byte
	InfoHash   [20]byte
	Connection net.Conn
	State      StateType
	Incoming   chan *data.Message
	Outgoing   chan *data.Message
	BitField   data.BitField
}

The steps required before our connection to the peer is “ready” are:

  • establish a TCP connection
  • send a handshake
  • receive a handshake
  • (optionally) send/receive a bitfield
  • ready!

Loop

Let’s start with the top-level, which is essentially the event loop (this isn’t the final version, just a starting point):

func (p *PeerHandler) Loop(ctx context.Context) {
	p.Connect()
	if p.State == ERROR {
		return
	}
	defer p.Connection.Close()
	p.Handshake()
	if p.State == ERROR {
		return
	}
	log.Printf("lock 'n load!")
	go p.Listen(ctx)
	for {
		select {
		case <-ctx.Done():
			log.Printf("Context is done, closing connection to %s", hex.EncodeToString([]byte(p.Peer.Id)))
			p.Connection.Close()
			return
		case msg := <-p.Incoming:
			log.Printf("msg received: %x", msg.MessageId)
			log.Printf("payload: %v", msg.Payload)
		case msg := <-p.Outgoing:
			log.Printf("msg to send: %x", msg.MessageId)
		}
	}
}

In due time we’ll improve this to add keep-alive messages along with better error handling.

Listen

Working backwards, the Listen function is used to read from the socket and essentially go from raw data to something in the Message format:

func (p *PeerHandler) Listen(ctx context.Context) {
	for {
		select {
		case <-ctx.Done():
			log.Printf("shutting down listener")
			return
		default:
			msg, err := getMessage(p.Connection)
			if err != nil {
				log.Printf("error: %s", err)
				// a bare break would only leave the select, not the for loop
				return
			}
			p.Incoming <- msg
		}
	}
}

The meat of this function is the private getMessage - reading a message essentially consists of:

  • reading 4 bytes # length of the message
  • if non-zero, a further read for that number of bytes
  • fill in the Message struct, separating the message ID and the payload

We also add a timeout to ensure we don’t block forever (the keep-alive message is meant to address that):

func getMessage(conn net.Conn) (*data.Message, error) {

	timeoutWaitDuration := 2 * time.Minute
	conn.SetReadDeadline(time.Now().Add(timeoutWaitDuration))
	header := make([]byte, 4)
	numBytesRead, err := io.ReadFull(conn, header)

	processBadResponse := func(err error, numBytesRead int) (*data.Message, error) {
		if numBytesRead == 0 {
			log.Printf("no data!")
			return &data.Message{}, errors.New("no data")
		} else if os.IsTimeout(err) {
			log.Println("timed out reading length header from client")
			return &data.Message{}, err
		} else {
			return &data.Message{}, err
		}
	}

	if (err != nil && err != io.EOF) || numBytesRead == 0 {
		return processBadResponse(err, numBytesRead)
	}

	length := binary.BigEndian.Uint32(header[:])

	// keep-alive
	if length == 0 {
		return &data.Message{}, nil
	}

	buffer := make([]byte, length)
	numBytesRead, err = io.ReadFull(conn, buffer)
	if (err != nil && err != io.EOF) || numBytesRead == 0 {
		return processBadResponse(err, numBytesRead)
	}

	msg := &data.Message{
		Length:    [4]byte(header),
		MessageId: buffer[0],
	}

	// some messages don't have a payload
	if len(buffer) > 1 {
		msg.Payload = buffer[1:]
	}

	return msg, nil
}

Handshake

The handshake must be the first message sent after establishing a connection with a peer, before any other kind of message, and has the following format:

type Handshake struct {
	PstrLen  byte
	Pstr     []byte
	Reserved [8]byte
	InfoHash [20]byte
	PeerId   [20]byte
}

Note that the handshake is of a fixed size for a given Pstr (49 + PstrLen bytes, so 68 bytes with the standard “BitTorrent protocol” string) - unlike all other messages, it isn’t length-prefixed.

Here PstrLen defines the length (in bytes) of Pstr. The 8 Reserved bytes are currently all 0, and the rest are essentially details about both ourselves (the PeerId) and the torrent we wish to download (InfoHash).

Converting a handshake to bytes over the wire is relatively simple:

func (h *Handshake) ToBytes() []byte {
	buffer := new(bytes.Buffer)
	buffer.WriteByte(h.PstrLen)
	buffer.Write(h.Pstr)
	buffer.Write(h.Reserved[:])
	buffer.Write(h.InfoHash[:])
	buffer.Write(h.PeerId[:])
	return buffer.Bytes()
}
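
For reference, the protocol string in BitTorrent v1 is the 19-byte ASCII text “BitTorrent protocol”, which makes the whole handshake 1 + 19 + 8 + 20 + 20 = 68 bytes. Here’s a sketch of a constructor. NewHandshake is a hypothetical name and may differ from the helper the repository uses. The struct and ToBytes are repeated so the snippet stands alone:

```go
package main

import "bytes"

type Handshake struct {
	PstrLen  byte
	Pstr     []byte
	Reserved [8]byte
	InfoHash [20]byte
	PeerId   [20]byte
}

func (h *Handshake) ToBytes() []byte {
	buffer := new(bytes.Buffer)
	buffer.WriteByte(h.PstrLen)
	buffer.Write(h.Pstr)
	buffer.Write(h.Reserved[:])
	buffer.Write(h.InfoHash[:])
	buffer.Write(h.PeerId[:])
	return buffer.Bytes()
}

const protocolString = "BitTorrent protocol"

// NewHandshake (hypothetical) builds a v1 handshake; Reserved is left
// zero-initialised.
func NewHandshake(peerId, infoHash [20]byte) *Handshake {
	return &Handshake{
		PstrLen:  byte(len(protocolString)),
		Pstr:     []byte(protocolString),
		InfoHash: infoHash,
		PeerId:   peerId,
	}
}
```

On the wire the info hash sits at byte offsets 28 to 47 and the peer ID at 48 to 67, which is handy to know when staring at a packet capture.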

A handshake is both sent and received, but sent first by the client establishing the connection. For one, the InfoHash needs to match. And if the PeerId received is different from the one that was sent out, we’re free to drop the connection (we would have received the PeerId and its connection details from the tracker, so it’s odd that somehow the PeerId would have changed). The reverse is also true - if the InfoHash we send out doesn’t match a torrent served by the peer we’re trying to connect to, they’re free to drop it.

Once all details match we can say we are properly “connected” to the peer and we can start exchanging messages!

func (p *PeerHandler) Handshake() {
	// a handshake consists of both sending and receiving one!
	var wg sync.WaitGroup
	wg.Add(2)
	go func() {
		defer wg.Done()
		handshakeMsg := data.GetHanshake(p.PeerId, p.InfoHash)
		// fmt.Printf("%+v", handshakeMsg)
		numBytesWritten, err := p.Connection.Write(handshakeMsg.ToBytes())
		if err != nil || numBytesWritten == 0 {
			p.State = ERROR
		}
	}()

	go func() {
		defer wg.Done()
		buf := make([]byte, 1)
		// it really shouldn't take the peer that long to get back with
		// a handshake - if it does, we're probably not getting anything from them
		p.Connection.SetReadDeadline(time.Now().Add(5 * time.Second))
		numBytesRead, err := io.ReadFull(p.Connection, buf)
		if err != nil && err != io.EOF || numBytesRead == 0 {
			log.Printf("handshake error (pstrlen): %s", err)
			p.State = ERROR
			return
		}
		pstrLength := buf[0]
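		// after pstrlen come pstr, 8 reserved bytes, 20 for info_hash and 20 for peer_id
		buf = make([]byte, 48+int(pstrLength))
		// ReadFull, since a plain Read may return fewer bytes than requested
		numBytesRead, err = io.ReadFull(p.Connection, buf)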
		if err != nil && err != io.EOF || numBytesRead == 0 {
			log.Printf("handshake error: %s", err)
			p.State = ERROR
			return
		}
		peerHandShake := data.Handshake{
			PstrLen:  pstrLength,
			Pstr:     buf[:pstrLength], // buf starts at pstr, the length byte was read separately
			Reserved: [8]byte(buf[pstrLength : pstrLength+8]),
			InfoHash: [20]byte(buf[pstrLength+8 : pstrLength+8+20]),
			PeerId:   [20]byte(buf[pstrLength+8+20:]),
		}
		// validate it all matches
		if peerHandShake.InfoHash != p.InfoHash {
			log.Printf("info_hash doesn't match!")
			p.State = ERROR
		}
		// peer spoofing?
		// if string(peerHandShake.PeerId[:]) != p.Peer.Id {
		// 	log.Printf("peer_id doesn't match!")
		// 	p.State = ERROR
		// }
	}()
	wg.Wait()
	// if we reach here, we're ready!
	if p.State != ERROR {
		p.State = READY
	}
}

The implementation uses sync.WaitGroup but it’s really up to us to send our handshake first and have the peer respond if all details line up. Once we start accepting connections we’ll need to ensure the handshake we receive is valid before we send ours back.

Connect

Connecting is simplest - note that net.JoinHostPort is IP-version agnostic. It works the same regardless of whether this is IPv4 or IPv6 (and nowadays more peers tend to be IPv6):

func (p *PeerHandler) Connect() {
	address := net.JoinHostPort(p.Peer.IP, fmt.Sprintf("%d", p.Peer.Port))
	conn, err := net.DialTimeout("tcp", address, time.Second*5)
	if err != nil {
		log.Printf("error connecting to peer %s: %s", hex.EncodeToString([]byte(p.Peer.Id)), err)
		p.State = ERROR
		return
	}
	p.Connection = conn
	log.Printf("connected to %s", address)
}

This function doesn’t return anything, it just sets the connection that is used by all the invocations above.
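
To see why net.JoinHostPort is worth using, here is what it produces for each address family: IPv6 literals get wrapped in square brackets so the port separator stays unambiguous. peerAddress is a hypothetical wrapper, and strconv.Itoa is used in place of fmt.Sprintf purely as a matter of taste:

```go
package main

import (
	"net"
	"strconv"
)

// peerAddress (hypothetical) turns a tracker-supplied IP and port into
// something net.Dial accepts, for both IPv4 and IPv6.
func peerAddress(ip string, port int) string {
	return net.JoinHostPort(ip, strconv.Itoa(port))
}
```

peerAddress("127.0.0.1", 51413) gives 127.0.0.1:51413, while peerAddress("2607:5300:60:623::1", 51413) gives [2607:5300:60:623::1]:51413. Naive string concatenation would get the second one wrong.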

Putting it all together

To truly unleash the power of the BitTorrent protocol, we won’t content ourselves with connecting to just one peer. We’ll want to maintain connections to peers that have blocks we require, along with periodically polling the tracker to find new peers (and drop existing ones that are either in a bad state or don’t have any of the blocks we require). That means having some sort of “peer manager” that will manage that lifecycle.

That said, the scaffolding to start sending and receiving messages is in place! You can kick it off and see the peer sending a bitfield after a successful handshake:

/V/r/g/src ❯❯❯ go run ./main.go download -torrent=tmp/files.torrent
2024/10/29 17:39:56 peerManager ID: fe55a6c5e40651c3537b242f4115c20c3eb1aa08
2024/10/29 17:39:56 querying tracker: http://localhost:8080/announce?info_hash=%3C%5E%11%8ES%28%D8ezT%16%40%EB%F3%24%94%09%D0%C3%D6&peer_id=%FEU%A6%C5%E4%06Q%C3S%7B%24%2FA%15%C2%0C%3E%B1%AA%08&port=6688&uploaded=0&downloaded=0&left=0&numwant=0
2024/10/29 17:39:56 tracker responded
2024/10/29 17:39:56 enquing peer 2d5452343036302d377267343076317977696874 - 127.0.0.1:51413
2024/10/29 17:39:56 peerHandler: remote peer 2d5452343036302d377267343076317977696874, state=0
2024/10/29 17:39:56 connected to 127.0.0.1:51413
2024/10/29 17:39:57 lock 'n load!
2024/10/29 17:39:57 msg received: 5
2024/10/29 17:39:57 payload: [255 255 255 255 255 255 255 255 255 255 255 255 255 255 255 255 255 255 255 255 255 255 255]