Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement Remaining Peer Protocol Features #31

Merged
merged 92 commits into from
Dec 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
92 commits
Select commit Hold shift + click to select a range
9cb9eed
Add validation method
Ashuh Nov 12, 2023
9c5c72a
Rename constant
Ashuh Nov 12, 2023
ed14608
Add import
Ashuh Nov 12, 2023
a9c6fc5
Add method to get PeerMessage size in bytes
Ashuh Nov 12, 2023
efa1047
Include all downloaded bytes in peer download rate
Ashuh Nov 12, 2023
2c8fd32
Implement getPayloadSize in BlockMessage
Ashuh Nov 13, 2023
a4aa809
Finalize method
Ashuh Nov 13, 2023
20bba37
Update peer related log messages
Ashuh Nov 13, 2023
8dd4720
Add Bitfield factory method
Ashuh Nov 13, 2023
0f16593
Fix Bitfield packing
Ashuh Nov 17, 2023
f4ec795
Refactor Bitfield unpacking
Ashuh Nov 17, 2023
787482d
Add Bitfield javadoc
Ashuh Nov 17, 2023
3959f2c
Add javadoc for Bitfield & KeepAlive message structure
Ashuh Nov 17, 2023
258ae5b
Add getters for BlockMessage
Ashuh Nov 17, 2023
87a6729
Replace PeerHandler.Listener with PeerHandler.EventHandler
Ashuh Nov 17, 2023
da5d366
Change executor to non-static
Ashuh Nov 17, 2023
48ae484
Fix wrong method called
Ashuh Nov 17, 2023
5d25be6
Fix Bitfield handling
Ashuh Nov 18, 2023
dfba7c3
Move peer connection logic from TorrentHandler to PeerHandler
Ashuh Nov 18, 2023
25695bc
Make PeerMessageUnpacker a utility class
Ashuh Nov 18, 2023
2b67a91
Refactor incoming connection handling
Ashuh Nov 21, 2023
48b7fe1
Refactor incoming connection handling
Ashuh Nov 21, 2023
f6d3f64
Rearrange code
Ashuh Nov 24, 2023
8f59029
Fix log msg
Ashuh Nov 24, 2023
0e5eb75
Fix condition
Ashuh Nov 24, 2023
4d8d0f5
Add log msgs
Ashuh Nov 24, 2023
b0f4f4b
Simplify code
Ashuh Nov 24, 2023
713f98a
Rename IncomingConnectionListener to InboundConnectionListener
Ashuh Nov 24, 2023
032914b
Rename HandleIncomingPeerConnectionsTask to HandleInboundConnectionsTask
Ashuh Nov 24, 2023
01426c0
Stop HandleInboundConnectionsTask on shutdown
Ashuh Nov 24, 2023
90158c6
Move PeriodicFixedDelayTask to utils
Ashuh Nov 25, 2023
d5a547e
Rename PeriodicFixedDelayTask to PeriodicTask
Ashuh Nov 25, 2023
afbeb19
Add check to prevent PeriodicTask from being started twice
Ashuh Nov 25, 2023
026654c
Refactor PeriodicTask
Ashuh Nov 25, 2023
667eeee
Periodically send KeepAlive
Ashuh Nov 25, 2023
9a4b994
Update toString
Ashuh Nov 25, 2023
e78e66a
Kill peers that have not been seen for more than 2 minutes
Ashuh Nov 25, 2023
746e78e
Move sendInterested() into PeerSocket
Ashuh Nov 25, 2023
2f926b2
Move sendRequest() into PeerSocket
Ashuh Nov 25, 2023
9e95800
Move sendChoke() & sendUnchoke() into PeerSocket
Ashuh Nov 25, 2023
bb691f7
Refactor KeepAlive handling
Ashuh Nov 25, 2023
c6ed4a0
Add send methods in PeerSocket
Ashuh Nov 25, 2023
0a6ecfb
Handle Interested & NotInterested
Ashuh Nov 26, 2023
fba3699
Set Peer last seen when constructing PeerHandler
Ashuh Nov 26, 2023
b37be4a
Handle incoming Piece using CompletableFuture
Ashuh Nov 26, 2023
84b6bcb
Return instead of throwing exception
Ashuh Nov 26, 2023
8f16acb
Implement multiple concurrent requests per Peer
Ashuh Nov 30, 2023
52d8669
Calculate torrent progress using number of verified pieces
Ashuh Nov 30, 2023
4cd8d7a
Optimize PieceTracker
Ashuh Nov 30, 2023
648f418
Handle peer disconnect
Ashuh Nov 30, 2023
5b38f7f
Add timeout for requested blocks
Ashuh Nov 30, 2023
d835372
Stop PeerHandler if assigning block throws exception
Ashuh Nov 30, 2023
ea8158b
Update log msgs
Ashuh Nov 30, 2023
94c0706
Remove future from map upon timeout
Ashuh Nov 30, 2023
04df7c0
Use verified bytes instead of pieces to compute progress
Ashuh Nov 30, 2023
315b617
Use verified bytes to calculate ETA
Ashuh Nov 30, 2023
64b2d9b
Fix CombinedObservable not firing when all sources are removed
Ashuh Dec 1, 2023
740cc76
Handle case where idle PeerHandler gets choked
Ashuh Dec 1, 2023
22a54f2
Refactor & optimize PieceTracker & WorkDispatcher
Ashuh Dec 1, 2023
ef689e5
Add getPieceHash method
Ashuh Dec 2, 2023
e001d78
Use class name as default thread name for BackgroundTasks
Ashuh Dec 2, 2023
865dcc0
Move connection handling logic back into PeerHandler
Ashuh Dec 5, 2023
021c5c3
Refactor logic for sending 'interested' on connection success
Ashuh Dec 7, 2023
3a8365f
Move task scheduling to start() in PeerHandler
Ashuh Dec 7, 2023
43be1aa
Fix Bitfield packing
Ashuh Dec 9, 2023
ce956cd
Modify connection success handling
Ashuh Dec 9, 2023
9dd44a4
Send Bitfield on connection success
Ashuh Dec 9, 2023
6af6641
Return copies of Bitsets
Ashuh Dec 9, 2023
91d3553
Implement Request & Cancel handling
Ashuh Dec 9, 2023
2f09655
Remove empty line
Ashuh Dec 9, 2023
c4ff567
Rename method
Ashuh Dec 9, 2023
8bd8fc6
Rename method
Ashuh Dec 9, 2023
f2eaf7f
Notify peers of new pieces upon verification
Ashuh Dec 9, 2023
87210fb
Replace synchronized(this) with synchronized(pieceStateLock)
Ashuh Dec 9, 2023
51bff55
Remove unused methods
Ashuh Dec 9, 2023
ce5c589
Replace ExecutorService pool with custom ConnectionThreadPool
Ashuh Dec 9, 2023
e819a04
Rearrange code
Ashuh Dec 9, 2023
447ed99
Refactor & fix unchoking logic
Ashuh Dec 10, 2023
4084e86
Add cancel request method to PeerHandler
Ashuh Dec 10, 2023
5b346f1
Refactor methods for sending peer messages
Ashuh Dec 10, 2023
6a52aeb
Refactor DurationWindow to RateTracker
Ashuh Dec 10, 2023
97b9670
Implement upload rate tracking
Ashuh Dec 12, 2023
6ab8981
Send DHT port upon connection if DHT flag in received Handshake is set
Ashuh Dec 12, 2023
050ac19
Remove unused import
Ashuh Dec 12, 2023
291a6d6
Send Interested only if torrent is not complete
Ashuh Dec 12, 2023
09128d5
Send Not Interested when torrent is complete
Ashuh Dec 12, 2023
e8466df
Select peers to unchoke based on upload rate if torrent is complete
Ashuh Dec 12, 2023
e3fc557
Verify files before starting torrents
Ashuh Dec 15, 2023
6acefd6
Fix Event to String mapping for HTTP announcements
Ashuh Dec 15, 2023
6e47508
Announce completed when torrent download is complete
Ashuh Dec 16, 2023
d3ca5ec
Remove log msgs
Ashuh Dec 16, 2023
ce3c869
Fix infinite recursion
Ashuh Dec 16, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 37 additions & 20 deletions src/main/java/jtorrent/application/domain/Client.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package jtorrent.application.domain;

import java.io.IOException;
import java.lang.System.Logger;
import java.lang.System.Logger.Level;
import java.net.InetAddress;
Expand All @@ -8,11 +9,12 @@
import java.util.HashMap;
import java.util.Map;

import jtorrent.common.domain.util.BackgroundTask;
import jtorrent.common.domain.util.Sha1Hash;
import jtorrent.common.domain.util.rx.RxObservableList;
import jtorrent.dht.domain.handler.DhtClient;
import jtorrent.dht.domain.handler.DhtManager;
import jtorrent.incoming.domain.IncomingConnectionManager;
import jtorrent.incoming.domain.InboundConnectionListener;
import jtorrent.lsd.domain.handler.LocalServiceDiscoveryManager;
import jtorrent.lsd.domain.model.Announce;
import jtorrent.peer.domain.communication.PeerSocket;
Expand All @@ -22,27 +24,28 @@
import jtorrent.torrent.domain.repository.PieceRepository;
import jtorrent.torrent.domain.repository.TorrentRepository;

public class Client implements IncomingConnectionManager.Listener, LocalServiceDiscoveryManager.Listener,
TorrentHandler.Listener, DhtManager.PeerDiscoveryListener {
public class Client implements LocalServiceDiscoveryManager.Listener, TorrentHandler.Listener,
DhtManager.PeerDiscoveryListener {

private static final Logger LOGGER = System.getLogger(Client.class.getName());

private final IncomingConnectionManager incomingConnectionManager;
private final InboundConnectionListener inboundConnectionListener;
private final LocalServiceDiscoveryManager localServiceDiscoveryManager;
private final DhtClient dhtManager;
private final Map<Sha1Hash, TorrentHandler> infoHashToTorrentHandler = new HashMap<>();
private final TorrentRepository torrentRepository;
private final PieceRepository pieceRepository;
private final HandleInboundConnectionsTask handleInboundConnectionsTask = new HandleInboundConnectionsTask();

public Client(TorrentRepository torrentRepository, PieceRepository pieceRepository,
IncomingConnectionManager incomingConnectionManager,
InboundConnectionListener inboundConnectionListener,
LocalServiceDiscoveryManager localServiceDiscoveryManager, DhtClient dhtClient) {
this.torrentRepository = torrentRepository;
this.pieceRepository = pieceRepository;

this.incomingConnectionManager = incomingConnectionManager;
this.incomingConnectionManager.addListener(this);
this.incomingConnectionManager.start();
this.inboundConnectionListener = inboundConnectionListener;
this.inboundConnectionListener.start();
handleInboundConnectionsTask.start();

this.localServiceDiscoveryManager = localServiceDiscoveryManager;
this.localServiceDiscoveryManager.addListener(this);
Expand Down Expand Up @@ -71,7 +74,8 @@ public Client(TorrentRepository torrentRepository, PieceRepository pieceReposito
}

public void shutdown() {
incomingConnectionManager.stop();
inboundConnectionListener.stop();
handleInboundConnectionsTask.stop();
localServiceDiscoveryManager.stop();
dhtManager.stop();
infoHashToTorrentHandler.values().forEach(TorrentHandler::stop);
Expand Down Expand Up @@ -99,17 +103,6 @@ public void stopTorrent(Torrent torrent) {
// TODO: remove from local service discovery
}

@Override
public void onIncomingPeerConnection(PeerSocket peerSocket, Sha1Hash infoHash) {
if (!infoHashToTorrentHandler.containsKey(infoHash)) {
LOGGER.log(Level.ERROR, "No torrent found for infohash " + infoHash);
return;
}

TorrentHandler torrentHandler = infoHashToTorrentHandler.get(infoHash);
torrentHandler.handleIncomingPeerConnection(peerSocket);
}

public RxObservableList<Torrent> getTorrents() {
return torrentRepository.getTorrents();
}
Expand Down Expand Up @@ -142,4 +135,28 @@ public void onPeersDiscovered(Sha1Hash infoHash, Collection<PeerContactInfo> pee
TorrentHandler torrentHandler = infoHashToTorrentHandler.get(infoHash);
peers.forEach(torrentHandler::handleDiscoveredPeerContact);
}

private class HandleInboundConnectionsTask extends BackgroundTask {

@Override
protected void execute() throws InterruptedException {
InboundConnectionListener.InboundConnection inboundConnection =
inboundConnectionListener.waitForIncomingConnection();

Sha1Hash infoHash = inboundConnection.getInfoHash();
if (!infoHashToTorrentHandler.containsKey(infoHash)) {
LOGGER.log(Level.ERROR, "No torrent found for infohash " + infoHash);
try {
inboundConnection.reject();
} catch (IOException e) {
LOGGER.log(Level.ERROR, "Error rejecting incoming connection", e);
}
return;
}

PeerSocket peerSocket = inboundConnection.accept();
TorrentHandler torrentHandler = infoHashToTorrentHandler.get(infoHash);
torrentHandler.handleInboundPeerConnection(peerSocket);
}
}
}
6 changes: 3 additions & 3 deletions src/main/java/jtorrent/application/presentation/JTorrent.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import jtorrent.application.presentation.viewmodel.ViewModel;
import jtorrent.common.domain.Constants;
import jtorrent.dht.domain.handler.DhtClient;
import jtorrent.incoming.domain.IncomingConnectionManager;
import jtorrent.incoming.domain.InboundConnectionListener;
import jtorrent.lsd.domain.handler.LocalServiceDiscoveryManager;
import jtorrent.torrent.data.repository.FilePieceRepository;
import jtorrent.torrent.data.repository.FileTorrentRepository;
Expand All @@ -26,13 +26,13 @@ public class JTorrent extends Application {
@Override
public void init() throws Exception {
ServerSocket serverSocket = new ServerSocket(Constants.PORT);
IncomingConnectionManager incomingConnectionManager = new IncomingConnectionManager(serverSocket);
InboundConnectionListener inboundConnectionListener = new InboundConnectionListener(serverSocket);

DhtClient dhtClient = new DhtClient(Constants.PORT);

TorrentRepository repository = new FileTorrentRepository();
PieceRepository pieceRepository = new FilePieceRepository();
client = new Client(repository, pieceRepository, incomingConnectionManager, new LocalServiceDiscoveryManager(),
client = new Client(repository, pieceRepository, inboundConnectionListener, new LocalServiceDiscoveryManager(),
dhtClient);
}

Expand Down
43 changes: 13 additions & 30 deletions src/main/java/jtorrent/common/domain/model/Block.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,31 +4,25 @@

public class Block {

private final int index;
private final int offset;
private final int length;

public Block(int index, int offset, int length) {
this.index = index;
this.offset = offset;
this.length = length;
}
private final int pieceIndex;
private final int blockIndex;

public int getIndex() {
return index;
public Block(int pieceIndex, int blockIndex) {
this.pieceIndex = pieceIndex;
this.blockIndex = blockIndex;
}

public int getOffset() {
return offset;
public int getPieceIndex() {
return pieceIndex;
}

public int getLength() {
return length;
public int getBlockIndex() {
return blockIndex;
}

@Override
public int hashCode() {
return Objects.hash(index, offset, length);
return Objects.hash(pieceIndex, blockIndex);
}

@Override
Expand All @@ -40,23 +34,12 @@ public boolean equals(Object o) {
return false;
}
Block block = (Block) o;
return index == block.index
&& offset == block.offset
&& length == block.length;
return pieceIndex == block.pieceIndex
&& blockIndex == block.blockIndex;
}

@Override
public String toString() {
return "Block{"
+ "index=" + index
+ ", offset=" + offset
+ ", length=" + length
+ '}';
}

public enum Status {
MISSING,
REQUESTED,
RECEIVED
return String.format("[Piece %d, Block %d]", pieceIndex, blockIndex);
}
}
7 changes: 3 additions & 4 deletions src/main/java/jtorrent/common/domain/util/BackgroundTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import java.lang.System.Logger;
import java.lang.System.Logger.Level;
import java.util.Optional;

public abstract class BackgroundTask implements Runnable {

Expand All @@ -13,11 +12,11 @@ public abstract class BackgroundTask implements Runnable {

protected BackgroundTask() {
thread = new Thread(this);
getThreadName().ifPresent(thread::setName);
thread.setName(getThreadName());
}

protected Optional<String> getThreadName() {
return Optional.empty();
protected String getThreadName() {
return getClass().getName();
}

@Override
Expand Down
56 changes: 0 additions & 56 deletions src/main/java/jtorrent/common/domain/util/DurationWindow.java

This file was deleted.

53 changes: 53 additions & 0 deletions src/main/java/jtorrent/common/domain/util/PeriodicTask.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package jtorrent.common.domain.util;

import static jtorrent.common.domain.util.ValidationUtil.requireNonNull;

import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

public abstract class PeriodicTask implements Runnable {

private final ScheduledExecutorService scheduledExecutorService;
private ScheduledFuture<?> scheduledFuture;

protected PeriodicTask(ScheduledExecutorService scheduledExecutorService) {
this.scheduledExecutorService = requireNonNull(scheduledExecutorService);
}

public void scheduleWithFixedDelay(long delay, TimeUnit timeUnit) {
scheduleWithFixedDelay(0, delay, timeUnit);
}

public void scheduleWithFixedDelay(long initialDelay, long delay, TimeUnit timeUnit) {
checkNotRunning();
scheduledFuture = scheduledExecutorService.scheduleWithFixedDelay(this, initialDelay, delay, timeUnit);
}

public void scheduleAtFixedRate(long period, TimeUnit timeUnit) {
scheduleAtFixedRate(0, period, timeUnit);
}

public void scheduleAtFixedRate(long initialDelay, long period, TimeUnit timeUnit) {
checkNotRunning();
scheduledFuture = scheduledExecutorService.scheduleAtFixedRate(this, initialDelay, period, timeUnit);
}

private void checkNotRunning() {
if (isRunning()) {
throw new IllegalStateException("Task already running");
}
}

public void stop() {
if (!isRunning()) {
return;
}

scheduledFuture.cancel(true);
}

private boolean isRunning() {
return scheduledFuture != null && !scheduledFuture.isCancelled();
}
}
Loading
Loading