28
28
import jtorrent .domain .model .dht .message .response .FindNodeResponse ;
29
29
import jtorrent .domain .model .dht .message .response .GetPeersResponse ;
30
30
import jtorrent .domain .model .dht .message .response .PingResponse ;
31
+ import jtorrent .domain .model .dht .node .Node ;
32
+ import jtorrent .domain .model .dht .node .NodeContactInfo ;
31
33
import jtorrent .domain .util .BackgroundTask ;
32
34
33
35
public class DhtSocket {
@@ -127,13 +129,13 @@ private Method getTransactionId(TransactionId transactionId) {
127
129
128
130
public interface QueryHandler {
129
131
130
- void handle (Ping ping );
132
+ void handle (Ping ping , Node node );
131
133
132
- void handle (FindNode findNode );
134
+ void handle (FindNode findNode , Node node );
133
135
134
- void handle (AnnouncePeer announcePeer );
136
+ void handle (GetPeers getPeers , Node node );
135
137
136
- void handle (GetPeers getPeers );
138
+ void handle (AnnouncePeer announcePeer , Node node );
137
139
}
138
140
139
141
private class HandleIncomingMessagesTask extends BackgroundTask {
@@ -150,8 +152,8 @@ private HandleIncomingMessagesTask(QueryHandler queryHandler) {
150
152
protected void execute () {
151
153
try {
152
154
LOGGER .log (Level .DEBUG , "[DHT] Waiting for message" );
153
- DhtMessage message = receiveMessage ();
154
- handleMessage (message );
155
+ IncomingMessage incomingMessage = receiveMessage ();
156
+ handleMessage (incomingMessage . getMessage (), incomingMessage . getAddress () );
155
157
} catch (IOException e ) {
156
158
LOGGER .log (Level .ERROR , "[DHT] Error while receiving message: {0}" , e .getMessage ());
157
159
HandleIncomingMessagesTask .this .stop ();
@@ -167,19 +169,19 @@ protected void execute() {
167
169
* @return the received message
168
170
* @throws IOException if an I/O error occurs
169
171
*/
170
- private DhtMessage receiveMessage () throws IOException {
172
+ private IncomingMessage receiveMessage () throws IOException {
171
173
byte [] buffer = new byte [1024 ];
172
174
DatagramPacket packet = new DatagramPacket (buffer , buffer .length );
173
175
socket .receive (packet );
176
+ InetSocketAddress address = new InetSocketAddress (packet .getAddress (), packet .getPort ());
174
177
DhtMessage message = dhtMessageDecoder .decode (packet .getData ());
175
- LOGGER .log (Level .DEBUG , "[DHT] Received {0}: {1}" , message .getMessageType (), message );
176
- return message ;
178
+ return new IncomingMessage (address , message );
177
179
}
178
180
179
- private void handleMessage (DhtMessage message ) {
181
+ private void handleMessage (DhtMessage message , InetSocketAddress address ) {
180
182
switch (message .getMessageType ()) {
181
183
case QUERY :
182
- handleQuery ((Query ) message );
184
+ handleQuery ((Query ) message , address );
183
185
break ;
184
186
case RESPONSE :
185
187
handleResponse ((DefinedResponse ) message );
@@ -192,21 +194,22 @@ private void handleMessage(DhtMessage message) {
192
194
}
193
195
}
194
196
195
- private void handleQuery (Query query ) {
197
+ private void handleQuery (Query query , InetSocketAddress address ) {
196
198
LOGGER .log (Level .DEBUG , "[DHT] Received {0} query: {1}" , query .getMethod (), query );
197
-
199
+ NodeContactInfo nodeContactInfo = new NodeContactInfo (query .getId (), address );
200
+ Node node = Node .seenNowWithContactInfo (nodeContactInfo );
198
201
switch (query .getMethod ()) {
199
202
case PING :
200
- queryHandler .handle ((Ping ) query );
203
+ queryHandler .handle ((Ping ) query , node );
201
204
break ;
202
205
case FIND_NODE :
203
- queryHandler .handle ((FindNode ) query );
206
+ queryHandler .handle ((FindNode ) query , node );
204
207
break ;
205
208
case ANNOUNCE_PEER :
206
- queryHandler .handle ((AnnouncePeer ) query );
209
+ queryHandler .handle ((AnnouncePeer ) query , node );
207
210
break ;
208
211
case GET_PEERS :
209
- queryHandler .handle ((GetPeers ) query );
212
+ queryHandler .handle ((GetPeers ) query , node );
210
213
break ;
211
214
default :
212
215
throw new AssertionError (String .format (FORMAT_UNKNOWN_METHOD , query .getMethod ()));
@@ -248,4 +251,23 @@ private void logNoOutstandingQueryFound(TransactionId transactionId) {
248
251
LOGGER .log (Level .ERROR , "[DHT] No outstanding query found for transaction id: {0}" , transactionId );
249
252
}
250
253
}
254
+
255
+ private static class IncomingMessage {
256
+
257
+ private final InetSocketAddress address ;
258
+ private final DhtMessage message ;
259
+
260
+ public IncomingMessage (InetSocketAddress address , DhtMessage message ) {
261
+ this .address = requireNonNull (address );
262
+ this .message = requireNonNull (message );
263
+ }
264
+
265
+ public InetSocketAddress getAddress () {
266
+ return address ;
267
+ }
268
+
269
+ public DhtMessage getMessage () {
270
+ return message ;
271
+ }
272
+ }
251
273
}
0 commit comments