Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
216 changes: 164 additions & 52 deletions SocketRocket/Internal/Proxy/SRProxyConnect.m
Original file line number Diff line number Diff line change
Expand Up @@ -58,21 +58,39 @@ -(instancetype)initWithURL:(NSURL *)url
_writeQueue = dispatch_queue_create("com.facebook.socketrocket.proxyconnect.write", DISPATCH_QUEUE_SERIAL);
_inputQueue = [NSMutableArray arrayWithCapacity:2];

_receivedHTTPHeaders = NULL;

return self;
}

- (void)dealloc
{
// If we get deallocated before the socket open finishes - we need to cleanup everything.

[self.inputStream removeFromRunLoop:[NSRunLoop SR_networkRunLoop] forMode:NSDefaultRunLoopMode];
self.inputStream.delegate = nil;
[self.inputStream close];
self.inputStream = nil;
// Remove streams from runloop and close them
@synchronized(self) {
if (self.inputStream) {
self.inputStream.delegate = nil;
[self.inputStream removeFromRunLoop:[NSRunLoop SR_networkRunLoop] forMode:NSDefaultRunLoopMode];
[self.inputStream close];
self.inputStream = nil;
}
if (self.outputStream) {
self.outputStream.delegate = nil;
[self.outputStream removeFromRunLoop:[NSRunLoop SR_networkRunLoop] forMode:NSDefaultRunLoopMode];
[self.outputStream close];
self.outputStream = nil;
}

self.outputStream.delegate = nil;
[self.outputStream close];
self.outputStream = nil;
// Clear input queue
[_inputQueue removeAllObjects];
}

// Release any pending CF object
if (_receivedHTTPHeaders) {
CFRelease(_receivedHTTPHeaders);
_receivedHTTPHeaders = NULL;
}
}

///--------------------------------------
Expand All @@ -92,13 +110,16 @@ - (void)openNetworkStreamWithCompletion:(SRProxyConnectCompletion)completion
- (void)_didConnect
{
SRDebugLog(@"_didConnect, return streams");

if (_connectionRequiresSSL) {
if (_httpProxyHost) {
// Must set the real peer name before turning on SSL
SRDebugLog(@"proxy set peer name to real host %@", self.url.host);
[self.outputStream setProperty:self.url.host forKey:@"_kCFStreamPropertySocketPeerName"];
// Use CF property key for peer name if desired. The original code used a string.
[self.outputStream setProperty:self.url.host forKey:(id)kCFStreamPropertySocketPeerName];
}
}

if (_receivedHTTPHeaders) {
CFRelease(_receivedHTTPHeaders);
_receivedHTTPHeaders = NULL;
Expand All @@ -107,14 +128,22 @@ - (void)_didConnect
NSInputStream *inputStream = self.inputStream;
NSOutputStream *outputStream = self.outputStream;

// Clear properties before returning streams
self.inputStream = nil;
self.outputStream = nil;

[inputStream removeFromRunLoop:[NSRunLoop SR_networkRunLoop] forMode:NSDefaultRunLoopMode];
inputStream.delegate = nil;
outputStream.delegate = nil;
if (inputStream) {
[inputStream removeFromRunLoop:[NSRunLoop SR_networkRunLoop] forMode:NSDefaultRunLoopMode];
inputStream.delegate = nil;
}
if (outputStream) {
[outputStream removeFromRunLoop:[NSRunLoop SR_networkRunLoop] forMode:NSDefaultRunLoopMode];
outputStream.delegate = nil;
}

_completion(nil, inputStream, outputStream);
if (_completion) {
_completion(nil, inputStream, outputStream);
}
}

- (void)_failWithError:(NSError *)error
Expand All @@ -129,16 +158,28 @@ - (void)_failWithError:(NSError *)error
_receivedHTTPHeaders = NULL;
}

self.inputStream.delegate = nil;
self.outputStream.delegate = nil;
// Remove delegates and close streams safely
@synchronized(self) {
if (self.inputStream) {
self.inputStream.delegate = nil;
[self.inputStream removeFromRunLoop:[NSRunLoop SR_networkRunLoop] forMode:NSDefaultRunLoopMode];
[self.inputStream close];
self.inputStream = nil;
}
if (self.outputStream) {
self.outputStream.delegate = nil;
[self.outputStream removeFromRunLoop:[NSRunLoop SR_networkRunLoop] forMode:NSDefaultRunLoopMode];
[self.outputStream close];
self.outputStream = nil;
}

[self.inputStream removeFromRunLoop:[NSRunLoop SR_networkRunLoop]
forMode:NSDefaultRunLoopMode];
[self.inputStream close];
[self.outputStream close];
self.inputStream = nil;
self.outputStream = nil;
_completion(error, nil, nil);
// Clear queued input
[_inputQueue removeAllObjects];
}

if (_completion) {
_completion(error, nil, nil);
}
}

// get proxy setting from device setting
Expand Down Expand Up @@ -189,14 +230,14 @@ - (void)_readProxySettingWithType:(NSString *)proxyType settings:(NSDictionary *
_httpProxyHost = settings[(NSString *)kCFProxyHostNameKey];
NSNumber *portValue = settings[(NSString *)kCFProxyPortNumberKey];
if (portValue) {
_httpProxyPort = [portValue intValue];
_httpProxyPort = (uint32_t)[portValue unsignedIntValue];
}
}
if ([proxyType isEqualToString:(NSString *)kCFProxyTypeSOCKS]) {
_socksProxyHost = settings[(NSString *)kCFProxyHostNameKey];
NSNumber *portValue = settings[(NSString *)kCFProxyPortNumberKey];
if (portValue)
_socksProxyPort = [portValue intValue];
_socksProxyPort = (uint32_t)[portValue unsignedIntValue];
_socksProxyUsername = settings[(NSString *)kCFProxyUsernameKey];
_socksProxyPassword = settings[(NSString *)kCFProxyPasswordKey];
}
Expand Down Expand Up @@ -239,6 +280,7 @@ - (void)_fetchPAC:(NSURL *)PACurl withProxySettings:(NSDictionary *)proxySetting
NSURLSession *session = [NSURLSession sharedSession];
[[session dataTaskWithRequest:request completionHandler:^(NSData *data, NSURLResponse *response, NSError *error) {
__strong typeof(wself) sself = wself;
if (!sself) return;
if (!error) {
NSString *script = [[NSString alloc] initWithData:data encoding:NSUTF8StringEncoding];
[sself _runPACScript:script withProxySettings:proxySettings];
Expand Down Expand Up @@ -284,10 +326,14 @@ - (void)_openConnection
{
[self _initializeStreams];

[self.inputStream scheduleInRunLoop:[NSRunLoop SR_networkRunLoop]
forMode:NSDefaultRunLoopMode];
//[self.outputStream scheduleInRunLoop:[NSRunLoop SR_networkRunLoop]
// forMode:NSDefaultRunLoopMode];
// Schedule both streams on the SR network run loop
if (self.inputStream) {
[self.inputStream scheduleInRunLoop:[NSRunLoop SR_networkRunLoop] forMode:NSDefaultRunLoopMode];
}
if (self.outputStream) {
[self.outputStream scheduleInRunLoop:[NSRunLoop SR_networkRunLoop] forMode:NSDefaultRunLoopMode];
}

[self.outputStream open];
[self.inputStream open];
}
Expand Down Expand Up @@ -389,32 +435,50 @@ - (void)_proxyDidConnect
///handles the incoming bytes and sending them to the proper processing method
- (void)_processInputStream
{
NSMutableData *buf = [NSMutableData dataWithCapacity:SRDefaultBufferSize()];
uint8_t *buffer = buf.mutableBytes;
NSInteger length = [_inputStream read:buffer maxLength:SRDefaultBufferSize()];
// Read up to default buffer size into a stack buffer to avoid mutableBytes issues
const NSInteger bufSize = SRDefaultBufferSize();
uint8_t buffer[bufSize];
NSInteger length = [self.inputStream read:buffer maxLength:bufSize];

if (length <= 0) {
return;
}

BOOL process = (_inputQueue.count == 0);
[_inputQueue addObject:[NSData dataWithBytes:buffer length:length]];
NSData *readData = [NSData dataWithBytes:buffer length:(NSUInteger)length];

BOOL shouldProcess = NO;
@synchronized(self) {
shouldProcess = (_inputQueue.count == 0);
[_inputQueue addObject:readData];
}

if (process) {
if (shouldProcess) {
[self _dequeueInput];
}
}

// dequeue the incoming input so it is processed in order

- (void)_dequeueInput
{
while (_inputQueue.count > 0) {
NSData *data = _inputQueue.firstObject;
[_inputQueue removeObjectAtIndex:0];
while (true) {
NSData *data = nil;
@synchronized(self) {
if (_inputQueue.count == 0) {
data = nil;
} else {
data = _inputQueue.firstObject;
[_inputQueue removeObjectAtIndex:0];
}
}

if (!data) {
break;
}

// No need to process any data further, we got the full header data.
if ([self _proxyProcessHTTPResponseWithData:data]) {
BOOL headerComplete = [self _proxyProcessHTTPResponseWithData:data];
if (headerComplete) {
// Stop processing further queued data now; remaining data (if any) will be handled by upper layer after connect
break;
}
}
Expand All @@ -426,9 +490,10 @@ - (BOOL)_proxyProcessHTTPResponseWithData:(NSData *)data
_receivedHTTPHeaders = CFHTTPMessageCreateEmpty(NULL, NO);
}

CFHTTPMessageAppendBytes(_receivedHTTPHeaders, (const UInt8 *)data.bytes, data.length);
CFHTTPMessageAppendBytes(_receivedHTTPHeaders, (const UInt8 *)data.bytes, (CFIndex)data.length);
if (CFHTTPMessageIsHeaderComplete(_receivedHTTPHeaders)) {
SRDebugLog(@"Finished reading headers %@", CFBridgingRelease(CFHTTPMessageCopyAllHeaderFields(_receivedHTTPHeaders)));
CFDictionaryRef headers = CFHTTPMessageCopyAllHeaderFields(_receivedHTTPHeaders);
SRDebugLog(@"Finished reading headers %@", CFBridgingRelease(headers));
[self _proxyHTTPHeadersDidFinish];
return YES;
}
Expand All @@ -441,44 +506,91 @@ - (void)_proxyHTTPHeadersDidFinish
NSInteger responseCode = CFHTTPMessageGetResponseStatusCode(_receivedHTTPHeaders);

if (responseCode >= 299) {
SRDebugLog(@"Connect to Proxy Request failed with response code %d", responseCode);
NSError *error = SRHTTPErrorWithCodeDescription(responseCode, 2132,
[NSString stringWithFormat:@"Received bad response code from proxy server: %d.",
(int)responseCode]);
SRDebugLog(@"Connect to Proxy Request failed with response code %ld", (long)responseCode);
NSError *error = SRHTTPErrorWithCodeDescription((NSInteger)responseCode, 2132,
[NSString stringWithFormat:@"Received bad response code from proxy server: %ld.",
(long)responseCode]);
[self _failWithError:error];
return;
}
SRDebugLog(@"proxy connect return %d, call socket connect", responseCode);
SRDebugLog(@"proxy connect return %ld, call socket connect", (long)responseCode);
[self _didConnect];
}

static NSTimeInterval const SRProxyConnectWriteTimeout = 5.0;

- (void)_writeData:(NSData *)data
{
const uint8_t * bytes = data.bytes;
__block NSInteger timeout = (NSInteger)(SRProxyConnectWriteTimeout * 1000000); // wait timeout before giving up
const uint8_t *bytes = data.bytes;
// Use microseconds for usleep; convert timeout to microseconds
__block NSInteger timeoutMicros = (NSInteger)(SRProxyConnectWriteTimeout * 1000000.0); // microseconds
__weak typeof(self) wself = self;
dispatch_async(_writeQueue, ^{
__strong typeof(wself) sself = self;
__strong typeof(wself) sself = wself;
if (!sself) {
return;
}
NSOutputStream *outStream = sself.outputStream;
if (!outStream) {
return;
}

while (![outStream hasSpaceAvailable]) {
usleep(100); //wait until the socket is ready
timeout -= 100;
if (timeout < 0) {
// sleep for 1000 microseconds (1ms) to reduce busy-waiting
usleep(1000);
timeoutMicros -= 1000;
if (timeoutMicros <= 0) {
NSError *error = SRHTTPErrorWithCodeDescription(408, 2132, @"Proxy timeout");
[sself _failWithError:error];
return; // ensure we break out after failure
} else if (outStream.streamError != nil) {
[sself _failWithError:outStream.streamError];
return; // ensure we break out after failure
}
}

NSInteger written = [outStream write:bytes maxLength:(NSInteger)data.length];
if (written < 0) {
if (outStream.streamError) {
[sself _failWithError:outStream.streamError];
} else {
NSError *error = SRHTTPErrorWithCodeDescription(500, 2132, @"Write failed");
[sself _failWithError:error];
}
} else if (written < (NSInteger)data.length) {
SRDebugLog(@"Partial write %ld of %lu bytes", (long)written, (unsigned long)data.length);
// For a partial write, attempt to write remaining bytes (simple loop, guarded by timeout)
const uint8_t *remainingPtr = bytes + written;
NSInteger remainingLen = (NSInteger)data.length - written;
while (remainingLen > 0) {
if (![outStream hasSpaceAvailable]) {
usleep(1000);
timeoutMicros -= 1000;
if (timeoutMicros <= 0) {
NSError *error = SRHTTPErrorWithCodeDescription(408, 2132, @"Proxy timeout (partial write)");
[sself _failWithError:error];
return;
}
if (outStream.streamError) {
[sself _failWithError:outStream.streamError];
return;
}
continue;
}
NSInteger w = [outStream write:remainingPtr maxLength:remainingLen];
if (w < 0) {
if (outStream.streamError) {
[sself _failWithError:outStream.streamError];
} else {
NSError *error = SRHTTPErrorWithCodeDescription(500, 2132, @"Write failed (partial)");
[sself _failWithError:error];
}
return;
}
remainingPtr += w;
remainingLen -= w;
}
}
[outStream write:bytes maxLength:data.length];
});
}

Expand Down