We can now handle pieces - that is, request and receive them. What we need next is a way to decide which piece to download next, and a place to store what we get - the end goal being to download all pieces and make the original file(s) available.

We’re building up from the go-bt repository. Clone it if you want to follow along!

Piece selection

To fully parallelise downloads we want to request a different piece from each peer, and as soon as a piece is retrieved, request the next one. But it’s not necessarily (though it can be!) as simple as requesting a missing piece from a peer who has it. Take the following example:

piece: 0 1 2 3 4 5 6
peerA: X . X X X . X
peerB: . X . X X X X

Which pieces do you request first? Which peer do you request piece 3 from?

There are a few algorithms available - for instance we should probably prioritise pieces 0, 1, 2 and 5, as their availability is limited: if either peer goes offline, we won’t be able to complete our download. But once those have been downloaded, what about pieces 3, 4 and 6? Piece 6 is likely the smallest (the last piece doesn’t necessarily fill up the whole piece length), so we can probably keep it for last. Also, some types of media files can be processed sequentially, even if later data is still missing.
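
(For what it’s worth, a rarest-first ordering isn’t much code once we have a per-piece availability count - the GetPiecesAvailability helper further down produces exactly that. The sketch below, using sort from the standard library, only illustrates the idea; it isn’t what we implement next.)

// rarestFirst orders the piece indices we still need by how few peers
// currently advertise them - a sketch of the idea only
func rarestFirst(availability map[uint32]uint32) []uint32 {
	pieces := make([]uint32, 0, len(availability))
	for pieceIdx := range availability {
		pieces = append(pieces, pieceIdx)
	}
	// fewest peers first: these are the pieces we risk losing access to
	sort.Slice(pieces, func(i, j int) bool {
		return availability[pieces[i]] < availability[pieces[j]]
	})
	return pieces
}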

For now we’ll keep it simple: iterating through all the pieces we still need, we’ll simply punt each one to the first available peer (i.e. a peer that both has the piece and is in a good state).

func (p *PeerManager) DownloadNextPiece() bool {
	didAnything := false
	for pieceNum := range p.BitField.NumPieces() {
		if !p.BitField.HasPiece(pieceNum) {
			for peerId, handler := range p.PeerHandlers {
				if handler.State == UNCHOKED && handler.BitField.HasPiece(pieceNum) {
					log.Printf("peer %x is UNCHOKED and has piece %d", peerId, pieceNum)
					// usually we'd request PIECE_LENGTH, but if this is e.g. the last
					// piece, the size of the piece may be less than the piece size
					// specified in the info dict
					handler.RequestPiece(pieceNum, min(p.Torrent.Info.GetPieceSize(pieceNum), PIECE_LENGTH))
					didAnything = true
					// first available peer wins - move on to the next piece
					break
				}
			}
		}
	}
	return didAnything
}

Piece storage

Once a piece has been downloaded we need to store it somewhere (keeping it in memory ain’t it). Given that we’re restricting ourselves to the standard library, we don’t exactly have tons of options. With this in mind, the approach we’ll take is to create empty files of the required sizes - and write each piece at the corresponding offset.
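
One way to create those empty files up front is to open each one and truncate it to its final length - a sketch using only the standard library (preallocateFiles is a hypothetical helper, not something from the repo):

// preallocateFiles creates every file listed in the info dict at its final
// size, so pieces can later be written at arbitrary offsets within it
func preallocateFiles(i *data.BEInfo, baseDir string) error {
	for _, file := range i.Files {
		fullPath := path.Join(baseDir, file.Path[0])
		f, err := os.OpenFile(fullPath, os.O_RDWR|os.O_CREATE, 0666)
		if err != nil {
			return err
		}
		// extend the file to its expected length - contents are zero-filled
		// until real pieces land
		if err := f.Truncate(int64(file.Length)); err != nil {
			f.Close()
			return err
		}
		f.Close()
	}
	return nil
}

Strictly speaking this is optional: writing at an offset past the current end of a file extends it anyway, which is why the resume section below has to deal with files that are shorter than their final size.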

It sounds easy enough in theory, but pieces at file boundaries need to be handled carefully. Consider the following (contrived) example:

piece: |    10    |    10    |    10    |
files: |     12     |  4 |   7    |

Total size across all 3 files is 12 + 4 + 7 = 23, and for a piece size of 10 that will be split across 3 pieces.

Let’s first write a test that replicates the above:

func TestGetSegmentsForPiece(t *testing.T) {
	// total size is 23 bytes for a total of 3 pieces
	binfo := &data.BEInfo{
		Files: []data.BEFile{
			{
				Path:   []string{"file1"},
				Length: 12,
			},
			{
				Path:   []string{"file2"},
				Length: 4,
			},
			{
				Path:   []string{"file3"},
				Length: 7,
			},
		},
		PieceLength: 10,
	}

	expected := map[int][]torrent.Segment{
		0: {
			{
				Filename: "file1",
				Offset:   uint64(0),
				Length:   10,
			}},
		// this is the most interesting piece - it spans 3 files!
		1: {
			{
				Filename: "file1",
				Offset:   uint64(10),
				Length:   2,
			},
			{
				Filename: "file2",
				Offset:   uint64(0),
				Length:   4,
			},
			{
				Filename: "file3",
				Offset:   uint64(0),
				Length:   4,
			}},
		2: {
			{
				Filename: "file3",
				Offset:   uint64(4),
				Length:   3,
			}},
	}

	for pieceIdx, expectedSegments := range expected {
		segments := torrent.GetSegmentsForPiece(binfo, uint64(pieceIdx))
		if !reflect.DeepEqual(segments, expectedSegments) {
			t.Errorf("expected %+v, got %+v ", expectedSegments, segments)
		}
	}
}

And this is the implementation of GetSegmentsForPiece. It’s a linear search through the files - we could keep an intermediate data structure to track the offset at which each file starts, but it’s simple enough to recompute on the fly:

func GetSegmentsForPiece(i *data.BEInfo, index uint64) []Segment {
	segments := make([]Segment, 0)

	pieceStart := index * i.PieceLength
	bytesRemainingInPiece := i.PieceLength
	runningOffset := uint64(0)
	for _, file := range i.Files {
		if bytesRemainingInPiece == 0 || runningOffset > pieceStart+bytesRemainingInPiece {
			// we're done
			break
		} else if (runningOffset + uint64(file.Length)) <= pieceStart {
			// this file ends before the piece starts - skip it
			runningOffset += uint64(file.Length)
		} else {
			// part of this piece belongs to this file
			fileBytesInPiece := min(runningOffset+uint64(file.Length)-pieceStart, bytesRemainingInPiece)
			segments = append(segments, Segment{
				Filename: file.Path[0],
				Offset:   pieceStart - runningOffset,
				Length:   fileBytesInPiece,
			})
			// this may well be 0 now
			bytesRemainingInPiece -= fileBytesInPiece
			pieceStart += fileBytesInPiece
			// if we're at a file boundary we should move on to the next one
			if pieceStart == (runningOffset + uint64(file.Length)) {
				runningOffset += uint64(file.Length)
			}
		}
	}
	return segments
}
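
For the record, the intermediate data structure alluded to above could be as simple as a slice of cumulative offsets - a sketch (fileStartOffsets is a hypothetical helper, nothing the repo actually needs):

// fileStartOffsets precomputes the global offset at which each file starts,
// so finding the file containing a given byte becomes a lookup rather than a rescan
func fileStartOffsets(i *data.BEInfo) []uint64 {
	starts := make([]uint64, len(i.Files))
	offset := uint64(0)
	for idx, file := range i.Files {
		starts[idx] = offset
		offset += uint64(file.Length)
	}
	return starts
}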

Now that we can map a piece to different file segments, writing the contents out is relatively trivial:

func WriteSegments(segments []Segment, data []byte, baseDir string) {
	dataOffset := 0
	for _, segment := range segments {
		filePath := path.Join(baseDir, segment.Filename)
		file, err := os.OpenFile(filePath, os.O_RDWR|os.O_CREATE, 0666)
		common.Check(err)
		writer := io.NewOffsetWriter(file, int64(segment.Offset))
		_, err = writer.Write(data[dataOffset : dataOffset+int(segment.Length)])
		common.Check(err)
		// close eagerly - a defer here would only run once the whole function returns
		common.Check(file.Close())
		dataOffset += int(segment.Length)
	}
}
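
For piece 1 of the test above, the segments come back in file order, so WriteSegments writes data[0:2] into file1 at offset 10, data[2:6] into file2 at offset 0, and data[6:10] into file3 at offset 0.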

Resuming a torrent

Writing pieces to disk is all well and good, but what happens if our client dies unexpectedly? We shouldn’t have to start from scratch. However, because we write files incrementally, we can’t simply infer which pieces we have from what’s on disk - we need to go back to the way we compute offsets for each piece.

For instance if file1 consists of pieces 4, 5 and 6 - and we only obtained piece 5, we’ll have written file1 as follows:

| empty piece | piece 5   |

Meaning the file will be 2 pieces long, not 3 - so we can’t quite work out which pieces we have directly from what’s on disk. Instead we should rely on what we expect to be there. With this in mind, let’s leverage our file segments:

func (p *PeerManager) HashCheck() {
	numExistingPieces := 0
	info := p.Torrent.Info
	for pieceIdx := range info.GetNumPieces() {
		h := sha1.New()
		segments := torrent.GetSegmentsForPiece(&info, uint64(pieceIdx))
		for _, segment := range segments {
			fullPath := path.Join(p.BaseDirectory, segment.Filename)
			if _, err := os.Stat(fullPath); err == nil {
				file, err := os.Open(fullPath)
				common.Check(err)
				reader := io.NewSectionReader(file, int64(segment.Offset), int64(segment.Length))
				// hash whatever is on disk for this segment - a short or
				// partially-written file simply won't match the digest below
				_, err = io.Copy(h, reader)
				common.Check(err)
				common.Check(file.Close())
			}
		}
		if string(h.Sum(nil)) == info.Pieces[pieceIdx*20:(pieceIdx+1)*20] {
			p.BitField.SetPiece(pieceIdx)
			numExistingPieces += 1
		}
	}
	log.Printf("found %d existing pieces!", numExistingPieces)
}

It’s a little inefficient - for instance we open/close files multiple times - but it’s easy to reason about. And more importantly it works! So for something that’s only ever done at start-up, it’s a fair compromise methinks.

Torrent management

We have a number of pieces (haha) in place to build up to something that can download the full torrent.

Managing a peer

The PeerHandler handles all connectivity to a given peer, from the handshake down to message-passing. We can ask it to request a given piece and it will dutifully handle the back-and-forth with the peer. It also takes care of maintaining an internal state of which pieces the peer has available (further to the bitfield message, a peer can disseminate a have message when it comes in possession of a piece it didn’t have before).

Once a piece has been retrieved from a peer, it sets its state to PIECE_COMPLETE. The manager will periodically check each peer for this status - and write out the piece before setting the state back to READY (at which point it may be assigned another piece to download).
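
The main loop further down calls a processCompletedPieces helper that does exactly this. A sketch of what it could look like - the PieceData and PieceIndex fields on the handler are assumptions, the actual repo may surface the downloaded bytes differently:

func (p *PeerManager) processCompletedPieces() {
	info := p.Torrent.Info
	for peerId, handler := range p.PeerHandlers {
		if handler.State != PIECE_COMPLETE {
			continue
		}
		// map the completed piece onto its file segments and flush it to disk
		segments := torrent.GetSegmentsForPiece(&info, uint64(handler.PieceIndex))
		torrent.WriteSegments(segments, handler.PieceData, p.BaseDirectory)
		p.BitField.SetPiece(handler.PieceIndex)
		log.Printf("wrote piece %d from peer %x", handler.PieceIndex, peerId)
		// free the handler up to be assigned another piece
		handler.State = READY
	}
}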

Orchestrating

The end goal is to download every piece. The manager knows which pieces are required and can iterate through its pool of peers to find the ones that have them available. It is then a matter of dispatching a request to the relevant peer. Once a piece becomes available (the PeerHandler will be in a PIECE_COMPLETE state) we can write the piece to file, update our bitfield, and reset the state accordingly.

It is therefore important for us to manage the pool of peers effectively. For instance if all of the peers in our pool are choked, or none carry the pieces we require, we should eject them from the pool. We periodically refresh the list of peers via the tracker (which usually returns a subset of all available peers) - and can use newly found ones to replenish the pool.

Let’s list down the steps. The order isn’t terribly important as long as each task gets done.

  1. Get a list of peers from the tracker
  2. Refresh our pool of peers
  3. Request pieces from peers
  4. Write downloaded pieces

Peer scoring

Some steps are a little more involved - e.g. refreshing our pool of peers can be as simple as ejecting choked peers, or rely on some sort of priority-based system:

  1. Ejecting choked peers that don’t have any pieces we’re interested in
  2. Ejecting peers that don’t have any pieces we’re interested in
  3. Ejecting choked peers that have the fewest pieces we’re interested in - while keeping choked peers that have rare pieces

This allows us to rank peers, which makes it easier to decide which to eject. Let’s say we have the following availability:

| pieceIdx | numPeersWithPiece |
| -------- | ----------------- |
| 1        | 3                 |
| 2        | 1                 |
| 3        | 5                 |

Now say we have a peer with pieces 1 and 3, versus one with just piece 3. Clearly the former is more valuable. But what about pieces 1 and 3 versus piece 2 alone? As only one peer has piece 2, while pieces 1 and 3 can be obtained from multiple peers, the peer holding piece 2 is deemed the most important.

This little snippet helps us score a peer’s bitfield against others (note that availability only concerns itself with pieces we do not yet own):

// this is used to assess availability of pieces we're interested in
func (p *PeerManager) GetPiecesAvailability() map[uint32]uint32 {
	availability := map[uint32]uint32{}
	for idx := range p.BitField.NumPieces() {
		if !p.BitField.HasPiece(idx) {
			availability[idx] = 0
			for _, peerHandler := range p.PeerHandlers {
				if peerHandler.BitField.HasPiece(idx) {
					availability[idx] += 1
				}
			}
		}
	}
	return availability
}

func GetPiecesScore(b data.BitField, availability map[uint32]uint32, numPeers uint32) uint32 {
	score := uint32(0)
	for pieceIdx, numPeersWithPiece := range availability {
		if b.HasPiece(pieceIdx) {
			score += 1 + numPeers - numPeersWithPiece
		}
	}
	return score
}
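
To make this concrete, take the availability table above and assume a pool of 5 peers: a peer offering pieces 1 and 3 scores (1 + 5 - 3) + (1 + 5 - 5) = 4, a peer with just piece 3 scores 1 + 5 - 5 = 1, and a peer with just piece 2 scores 1 + 5 - 1 = 5 - the rare piece wins, as intended.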

Calculating the overall peer score uses the above with some heuristics to take into account peers that are choked:

func (p *PeerManager) GetPeerScore(availability map[uint32]uint32, h *PeerHandler) uint32 {
	if p.PeerHasPieceOfInterest(h) {
		score := GetPiecesScore(h.BitField, availability, uint32(len(p.PeerHandlers)))
		if h.State == READY {
			// not yet unchoked - halve the score
			return score / 2
		} else {
			return score
		}
	} else if h.State == UNCHOKED {
		// peer is unchoked but it doesn't currently have
		// any piece we're interested in
		return 1
	} else {
		// no interest here!
		return 0
	}
}
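
With scores in hand, refreshing the pool can be as simple as ranking the handlers and ejecting the lowest scorers. A sketch of the idea (refreshPeerPool and disconnectPeer are hypothetical names, and it leans on sort from the standard library):

func (p *PeerManager) refreshPeerPool(maxPeers int) {
	availability := p.GetPiecesAvailability()
	// score every peer currently in the pool
	scores := make([]uint32, 0, len(p.PeerHandlers))
	for _, handler := range p.PeerHandlers {
		scores = append(scores, p.GetPeerScore(availability, handler))
	}
	if len(scores) <= maxPeers {
		return
	}
	// highest score first; the score of the last peer we want to keep
	// becomes the cutoff (ties at the cutoff are kept - good enough for a sketch)
	sort.Slice(scores, func(i, j int) bool { return scores[i] > scores[j] })
	cutoff := scores[maxPeers-1]
	for peerId, handler := range p.PeerHandlers {
		if p.GetPeerScore(availability, handler) < cutoff {
			// disconnectPeer is a hypothetical helper - however the repo
			// actually tears down a connection
			p.disconnectPeer(peerId)
			delete(p.PeerHandlers, peerId)
		}
	}
}

The freed-up slots can then be filled from the tracker’s latest peer list.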

Main loop

Our processing loop is kept relatively simple as most of the logic is encapsulated in the helper functions:

func (p *PeerManager) Run() {
	log.Printf("peerManager ID (ours): %s", hex.EncodeToString(p.PeerId[:]))
	ctx, cancelFunc := context.WithCancel(context.Background())
	defer cancelFunc()
	p.Context = ctx

	// periodic ping to the tracker to ensure we still show up
	// as a valid peer
	// TODO: this should ideally have the right event sent out
	// to the tracker depending on which state we're in
	go func(ctx context.Context) {
		for {
			p.queryTrackerAndUpdatePeersList(ctx)
			select {
			case <-ctx.Done():
				return
			case <-time.After(30 * time.Second):
			}
		}
	}(ctx)

	for {
		select {
		case <-ctx.Done():
			return
		default:

			p.processCompletedPieces()
			if p.DownloadNextPiece() {
				log.Print("found new piece(s) to download!")
			} else {
				log.Print("nothing to download - but are we complete?")
			}
		}
		time.Sleep(1 * time.Second)
	}
}

We don’t technically need the time.Sleep after each iteration - but it makes it easier to see it in action.

And here it is in action, including resume ‘n all! You can see how:

  • we recovered 67 existing pieces (it was a partial download)
  • we were first choked by the peer, but once we became unchoked we started downloading pieces
  • downloaded piece 67, then validated its SHA1 digest
/V/r/g/src ❯❯❯ go run ./main.go download -torrent=/Users/axiomiety/tmp/files.torrent
2024/12/28 15:49:51 found 67 existing pieces!
2024/12/28 15:49:51 hash of idx 0: 294faa783957ea41ff3641f1b52fa85cf4bec89a
2024/12/28 15:49:51 peerManager ID (ours): d318d4188fc4a9f0df503c376e55b18d1e2a5734
2024/12/28 15:49:51 nothing to download - but are we complete?
2024/12/28 15:49:51 querying tracker: http://localhost:8080/announce?info_hash=%3C%5E%11%8ES%28%D8ezT%16%40%EB%F3%24%94%09%D0%C3%D6&peer_id=%D3%18%D4%18%8F%C4%A9%F0%DFP%3C7nU%B1%8D%1E%2AW4&port=6688&uploaded=0&downloaded=0&left=0&numwant=0
2024/12/28 15:49:51 tracker responded
2024/12/28 15:49:51 enquing peer 2d5452343036302d6e396c346879783434396f79 - [::1]:51413
2024/12/28 15:49:51 peerHandler: remote peer 2d5452343036302d6e396c346879783434396f79, state=0
2024/12/28 15:49:51 connected to [::1]:51413
2024/12/28 15:49:51 lock 'n load!
2024/12/28 15:49:52 msg received: 5
2024/12/28 15:49:52 bitfield: [255 255 255 255 255 255 255 255 255 255 255 255 255 255 255 255 255 255 255 255 255 255 255] 23
2024/12/28 15:49:52 nothing to download - but are we complete?
2024/12/28 15:49:53 nothing to download - but are we complete?
2024/12/28 15:49:54 nothing to download - but are we complete?
2024/12/28 15:49:55 nothing to download - but are we complete?
2024/12/28 15:49:56 nothing to download - but are we complete?
2024/12/28 15:49:57 nothing to download - but are we complete?
2024/12/28 15:49:58 nothing to download - but are we complete?
2024/12/28 15:49:59 nothing to download - but are we complete?
2024/12/28 15:50:00 nothing to download - but are we complete?
2024/12/28 15:50:00 msg received: 1
2024/12/28 15:50:00 unchocked!
2024/12/28 15:50:01 peer 2d5452343036302d6e396c346879783434396f79 is UNCHOKED and has piece 67
2024/12/28 15:50:01 requesting piece 67 from peer
2024/12/28 15:50:01 found new piece(s) to download!
2024/12/28 15:50:01 msg to send: 6
2024/12/28 15:50:01 send 17 bytes to peer
2024/12/28 15:50:01 msg received: 7
2024/12/28 15:50:01 received block for index 67 from 0 with length 16383
2024/12/28 15:50:01 msg to send: 6
2024/12/28 15:50:01 send 17 bytes to peer
2024/12/28 15:50:02 msg received: 7
2024/12/28 15:50:02 received block for index 67 from 16383 with length 16384
2024/12/28 15:50:02 msg to send: 6
2024/12/28 15:50:02 send 17 bytes to peer
2024/12/28 15:50:02 nothing to download - but are we complete?
2024/12/28 15:50:02 msg received: 7
2024/12/28 15:50:02 received block for index 67 from 32767 with length 16384
2024/12/28 15:50:02 msg to send: 6
2024/12/28 15:50:02 send 17 bytes to peer
2024/12/28 15:50:03 msg received: 7
2024/12/28 15:50:03 received block for index 67 from 49151 with length 16384
2024/12/28 15:50:03 msg to send: 6
2024/12/28 15:50:03 send 17 bytes to peer
2024/12/28 15:50:03 nothing to download - but are we complete?
2024/12/28 15:50:03 msg received: 7
2024/12/28 15:50:03 received block for index 67 from 65535 with length 1
2024/12/28 15:50:03 piece 67 is complete
2024/12/28 15:50:03 hash: 21b794386367745ea431b50762a14054d9196d5d
2024/12/28 15:50:04 downloaded piece 67 from peer 2d5452343036302d6e396c346879783434396f79 with sha1: 21b794386367745ea431b50762a14054d9196d5d
2024/12/28 15:50:04 peer 2d5452343036302d6e396c346879783434396f79 is UNCHOKED and has piece 68
2024/12/28 15:50:04 requesting piece 68 from peer
2024/12/28 15:50:04 found new piece(s) to download!
2024/12/28 15:50:04 msg to send: 6
2024/12/28 15:50:04 send 17 bytes to peer
2024/12/28 15:50:04 msg received: 7
2024/12/28 15:50:04 received block for index 68 from 0 with length 16383
...

This is safe to Ctrl-C, and we’ll pick up from where we left off.
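
As an aside, if we wanted the shutdown to be explicit rather than relying on the process simply dying, the standard library’s signal.NotifyContext (from os/signal) slots straight into Run - a sketch of swapping the context creation:

// inside Run: derive the context from OS signals so that Ctrl-C cancels
// the tracker goroutine and the main loop cleanly before we exit
ctx, cancelFunc := signal.NotifyContext(context.Background(), os.Interrupt)
defer cancelFunc()
p.Context = ctx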

What happens after?

Once all pieces have been downloaded and written out, our torrent is complete. We’ll keep heartbeating to the tracker and possibly refreshing peers, but for all intents and purposes we’re done. At this point we’re classified as a seed and can start serving leeches in earnest.

Taking it further

PeerHandler state transitions

Currently the manager round-robins across peers and performs actions based on their state. The polling approach is relatively simple and works well when we need to find a peer for a specific piece, but we could definitely have a channel back to the manager to handle this more effectively (though if that channel is shared, the message would need to include the handler’s ID or something similar so the manager knows which peer it came from).
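
A sketch of what such a message could look like - the type and field names here are made up for illustration:

// pushed by a PeerHandler onto a channel shared with the manager,
// instead of the manager polling each handler for PIECE_COMPLETE
type PieceCompleted struct {
	PeerId     [20]byte // identifies which handler to reset to READY
	PieceIndex uint32
	Data       []byte
}

The select in Run could then block on that channel alongside ctx.Done(), writing pieces out as soon as they arrive rather than once a second.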

Download order

Starting 2 leeches afresh with one seed, both would start asking for pieces in the same order - meaning they can’t exactly help one another. If the order were random instead, they’d be more likely to be able to download from one another.
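
A sketch of what that could look like, shuffling the missing piece indices with math/rand before handing them out (this isn’t wired into DownloadNextPiece above, it just illustrates the idea):

// collect the pieces we still need, then visit them in a random order so
// two fresh leeches are unlikely to ask the seed for the same pieces
missing := []uint32{}
for pieceNum := range p.BitField.NumPieces() {
	if !p.BitField.HasPiece(pieceNum) {
		missing = append(missing, pieceNum)
	}
}
rand.Shuffle(len(missing), func(i, j int) {
	missing[i], missing[j] = missing[j], missing[i]
})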