Now that we have a basic framework in place to receive messages, let’s look at what it’ll take to request a piece.

We’re building up from the go-bt repository. Clone it if you want to follow along!

Pieces vs blocks

As a quick recap, a .torrent’s info dict defines one or more files, their size (in bytes) along with a special piece length attribute. If we sum up the size of all files, divide by piece length and round up, that will tell us how many pieces are required to make the torrent whole:

/V/r/g/src ❯❯❯ go run ./main.go bencode -decode=/tmp/files.torrent | head -c 1000
{
  "announce": "http://localhost:8080/announce",
  "created by": "qBittorrent v4.6.2",
  "creation date": 1728558152,
  "info": {
    "files": [
      {
        "length": 7000000,
        "path": [
          "file1"
        ]
      },
      {
        "length": 2000000,
        "path": [
          "file2"
        ]
      },
      {
        "length": 3000000,
        "path": [
          "file3"
        ]
      }
    ],
    "name": "files",
    "piece length": 65536,

Which gives us 184 pieces:

>>> (7000000+2000000+3000000)/65536
183.10546875

Say we wanted the piece at index 0 - each piece is 65536 bytes - and the maximum size of a TCP packet is 65535 bytes (yes, one less - I didn’t choose that number at random ^_^). There’s no way the whole piece can be sent as a single unit, it’ll need to be broken down. A single transmission “unit” is called a block - and depending on the implementation this could have an upper bound of 16K (16384 bytes). The client would expect 4 blocks to be transmitted to complete the piece.

Note that during that time, the piece is tagged as pending. It’s very much an all or nothing kind of thing - if the client shuts down before receiving all 4 blocks the (partial) piece is discarded. Practically it means the piece is only stored to disk once completed, which incidentally also means running SHA1 to ensure it matches with the relevant offset in info.pieces.

Let’s have a closer look as to what that looks like in practice.

Relevant messages

A brief note on choking

When two peers establish a connection, they both “choke” one another - which is a way to tell the other party that no download requests will be entertained. There are a couiple of reasons for this but this stems from the fact that upload bandwidth is a limited resource - and each peer is free to decide whom to allocate a “slot” to (e.g. perhaps one who has a piece that’s not available elsewhere).

Choking is tracked bilaterally by each peer. We may unchoke a peer (be ready to process its piece requests) but they don’t need to unchoke us (process our requests).

choke

A choke message doesn’t have any payload:

<len=0001>
<id=0>

This is our default state after connecting to a peer, so you may not see an incoming choke message until after some time.

unchoke

When a peer decides to unchoke us (or vice-versa), the below will be sent out:

<len=0001>
<id=1>

If we receive this from a peer it means we can start sending in request messages!

interested

After a handshake with a peer, a bitfield message is usually sent - this gives us a good indication as to whether the peer has pieces that are of interest. If it has, we send the below to express interest in having them upload data to us:

<len=0001>
<id=2>

We can send this message regardless of whether the peer has choked us - and we should. If we’re not interested in what this peer has to offer then the opposite party doesn’t have any incentive to prioritise our requests.

Note we may send this at any time. As we’ll see later when a new piece has been downloaded by a peer it can broadcast a have message - and this turns out to be a piece we need we can express our interest. It’s therefore important to have an accurate represenation of which peers carry which pieces.

requests

After expressing our interest in a piece from a peer, if we get allocated a slot (unchoked), we can start requesting data. As described above we can’t simply request a whole piece - instead we divvy up our requests into blocks which will get concatenated to form the necessary piece.

A request must therefore include not only the piece index, but also a starting offset into the piece along with a length (which by convention is 2^14 - anything larger may be ignored by the peer):

<len=0013>
<id=6>
<index>
<begin>
<length>

The 3 important fields are index, begin and length. Taking the example above for the piece at index 0 we’ll need to make 4 separate requests:

index=0
begin=0
length=16384
-
index=0
begin=16385
length=16384
-
index=0
begin=32768
length=16384
-
index=0
begin=49152
length=16384

Those requests can be consecutive but don’t need to be - though you’d probably wait for a response for each before requesting another.

What about the last piece though, the one at index 183 (184th, 0-index)?

>>> (7000000+2000000+3000000)%65536
6912

We only need to request 6912 bytes - which should fit right in a single request message:

index=183
begin=0
length=6912

piece

After submitting a request, we wait for peer to send us the required data. This will be in the following format:

<len=0009+X>
<id=7>
<index>
<begin>
<block>

The peer should send us what we requested, but they’re free to send us a different amount (usually smaller?). We’ll need to update our internal state based on what was received.

Putting it all together

So now we have a way to:

  • tell the peer we’re interested in their pieces
  • wait for us to get allocated an upload slot on their end
  • request for blocks to make up a whole piece

Let’s code it up!

Loop

We need to beef up our processing loop. In particular is the idea of dispatching the receiving/sending to goroutines. This is necessary because we can’t block on the same channel we’re trying to send to!

func (p *PeerHandler) Loop(ctx context.Context) {
	...
	for {
		select {
		case <-ctx.Done():
			log.Printf("Context is done, closing connection to %s", hex.EncodeToString([]byte(p.Peer.Id)))
			p.Connection.Close()
			return
		case msg := <-p.Incoming:
			log.Printf("msg received: %x", msg.MessageId)
			go p.processIncoming(msg)
		case msg := <-p.Outgoing:
			log.Printf("msg to send: %x", msg.MessageId)
			p.send(msg.ToBytes())
		}
	}
}

(we should ideally check our internal state periodically but let’s leave that for another time)

Coding up send is simple:

func (p *PeerHandler) send(data []byte) {
	bytesWritten, err := p.Connection.Write(data)
	if err != nil {
		log.Printf("error writing to peer! %s", err)
	}
	if bytesWritten != len(data) {
		log.Printf("only wrote %d bytes for a message %d bytes long", bytesWritten, len(data))
		p.State = ERROR
	}
	log.Printf("send %d bytes to peer", bytesWritten)
}

Dealing with incoming messages is essentially a switch on the message ID, updating some internal state:

func (p *PeerHandler) processIncoming(msg *data.Message) {

	switch msg.MessageId {
	case data.MsgBitfield:
		p.BitField = data.BitField{
			Field: msg.Payload,
		}
	case data.MsgPiece:
		p.receiveBlock(msg.Payload)
	case data.MsgUnchoke:
		log.Printf("unchocked!")
		p.State = UNCHOKED
	default:
		log.Printf("don't know what to do with this message!")
	}
}

We saw Listen in the previous post so we’ll skip it.

Interested

Letting the peer know we’re interested is as simple as:

func (p *PeerHandler) Interested() {
	length := make([]byte, 4)
	binary.BigEndian.PutUint32(length, 1)
	msg := &data.Message{
		Length:    [4]byte(length),
		MessageId: data.MsgInterested,
	}
	p.Outgoing <- msg
}

RequestPiece

When requesting a piece we need to know its size - for all pieces except (perhaps) the last one this will be the number of bytes defined in piece_length in the info dict. To help us track an incomplete piece:

type PendingPiece struct {
	TotalSize  uint32
	Data       []byte
	NextOffset uint32
	Index      uint32
}

We’ll also update our state to ensure we only request a single piece at a time (there’s no reason we technically can’t, but then we’d need to ensure we build the right piece when receiving a response from the peer - this complication is unnecessary at this point).

func (p *PeerHandler) RequestPiece(idx uint32, pieceLength uint32) {
	log.Printf("requesting piece %d from peer", idx)

	// so we don't request a new piece until we're back to a READY state
	p.State = REQUESTING_PIECE
	p.PendingPiece = PendingPiece{
		TotalSize: pieceLength,
		Index:     idx,
	}
	// can't set it above otherwise when it gets copied, the capacity is zero!
	p.Data = make([]byte, p.TotalSize)

	amountOfDataToRequest := min(p.TotalSize, uint32(math.Pow(2, 14)-1))
	p.Outgoing <- data.Request(idx, 0, amountOfDataToRequest)
}

Note how we request up to min(p.TotalSize, uint32(math.Pow(2, 14)-1)) data in our first request - this is to cater for when the last piece is itself less than 2^14 bytes.

receiveBlock

When receiving a block from a peer we need to copy the incoming data into our PendingPiece and get the next request ready if we’re still missing data to form the complete piece.

func (p *PeerHandler) receiveBlock(payload []byte) {
	// extract the relevant information
	index := binary.BigEndian.Uint32(payload[:4])
	begin := binary.BigEndian.Uint32(payload[4:8])
	// 4 bytes for the index, 4 bytes for the offset
	blockLength := len(payload) - 8
	log.Printf("received block for index %d from %d with length %d", index, begin, blockLength)

	// copy the data into our piece buffer
	copy(p.PendingPiece.Data[begin:begin+uint32(blockLength)], payload[8:])
	p.PendingPiece.NextOffset = begin + uint32(blockLength)

	if p.PendingPiece.IsComplete() {
		log.Printf("block %d is complete", p.PendingPiece.Index)
		// sha1 validation!
		h := sha1.New()
		h.Write(p.PendingPiece.Data)
		log.Printf("hash: %s", hex.EncodeToString(h.Sum(nil)))
	} else if p.PendingPiece.NextOffset < p.PendingPiece.TotalSize {
		// we need to request another piece
		// at most we'll get 16KB
		pieceLength := min(uint32(math.Pow(2, 14)), p.PendingPiece.TotalSize-p.PendingPiece.NextOffset)
		msg := data.Request(p.PendingPiece.Index, p.PendingPiece.NextOffset, pieceLength)
		p.Outgoing <- msg
	} else {
		log.Printf("downloaded more than we should have! next:%d vs total:%d resetting...", p.PendingPiece.NextOffset, p.PendingPiece.TotalSize)
		// up to us as to what we do
		// e.g. clean up the pending block and set the state accordingly
	}
}

When a piece is complete we should perform SHA1 validation to ensure it matches the corresponding digest in the pieces array from the info dict. If it isn’t we should discard it and possibly request the same piece from another peer (or request it again).

A peer can send us less than what we requested - so we should take that into account too when updating our offset.

Seeing it in action

Our downloader is now going through all the necessary steps to download (and validate - somewhat) a full piece! Namely:

  • parsing the torrent
  • getting a list of peers from the tracker
  • connect to a peer
  • request a piece
  • validate it
/V/r/g/src ❯❯❯ go run ./main.go download -torrent=/tmp/files.torrent
2024/11/16 15:56:24 hash of idx 183: c86b8537f69e8f48ce338ee264ba56927f4b9f79
2024/11/16 15:56:24 peerManager ID (ours): 38de8dd9f9a8f57898474646596105290bc45a79
2024/11/16 15:56:24 querying tracker: http://localhost:8080/announce?info_hash=%3C%5E%11%8ES%28%D8ezT%16%40%EB%F3%24%94%09%D0%C3%D6&peer_id=8%DE%8D%D9%F9%A8%F5x%98GFFYa%05%29%0B%C4Zy&port=6688&uploaded=0&downloaded=0&left=0&numwant=0
2024/11/16 15:56:24 tracker responded
2024/11/16 15:56:24 enquing peer 2d5452343036302d6e396c346879783434396f79 - 127.0.0.1:51413
2024/11/16 15:56:24 peerHandler: remote peer 2d5452343036302d6e396c346879783434396f79, state=0
2024/11/16 15:56:29 connected to 127.0.0.1:51413
2024/11/16 15:56:29 lock 'n load!
2024/11/16 15:56:30 msg received: 5 # bitfield msg received from the peer
2024/11/16 15:56:30 msg to send: 2 # sending interested to the peer
2024/11/16 15:56:30 send 5 bytes to peer
2024/11/16 15:56:37 msg received: 1 # received unchocked from the peer
2024/11/16 15:56:37 unchocked!
2024/11/16 15:56:38 requesting piece 0 from peer
2024/11/16 15:56:38 msg to send: 6 # initial request
2024/11/16 15:56:38 send 17 bytes to peer
2024/11/16 15:56:38 msg received: 7
2024/11/16 15:56:38 received block for index 0 from 0 with length 16383
2024/11/16 15:56:38 msg to send: 6
2024/11/16 15:56:38 send 17 bytes to peer
2024/11/16 15:56:39 msg received: 7
2024/11/16 15:56:39 received block for index 0 from 16383 with length 16384
2024/11/16 15:56:39 msg to send: 6
2024/11/16 15:56:39 send 17 bytes to peer
2024/11/16 15:56:39 msg received: 7
2024/11/16 15:56:39 received block for index 0 from 32767 with length 16384
2024/11/16 15:56:39 msg to send: 6
2024/11/16 15:56:39 send 17 bytes to peer
2024/11/16 15:56:40 msg received: 7
2024/11/16 15:56:40 received block for index 0 from 49151 with length 16384
2024/11/16 15:56:40 msg to send: 6
2024/11/16 15:56:40 send 17 bytes to peer
2024/11/16 15:56:40 msg received: 7
2024/11/16 15:56:40 received block for index 0 from 65535 with length 1
2024/11/16 15:56:40 piece 0 is complete
2024/11/16 15:56:40 hash: 294faa783957ea41ff3641f1b52fa85cf4bec89a
^Csignal: interrupt

Which matches the digest of the first piece in the info dict.

And voila. We’re far from done (we need to store this piece somewhere, request a new one, build the file on disk, respond to requests etc…) but we now have the building blocks to download all the pieces in a torrent.

Taking it further

We’ve mainly catered for the happy path here - there’s a whole bunch of state-checking we should be doing periodically in case we end up in an error state.

There’s also more work down the line to have a kind of “peer manager” to handle various peers, piece requests and aggregation. We’ll also want to tie down our state transition so e.g. we don’t end up requesting a piece when one is already in-flight.