From a5a016556a387c913750f06c6a95b8cb0565e95d Mon Sep 17 00:00:00 2001 From: jatking <53228426+Jatkingmodern@users.noreply.github.com> Date: Mon, 3 Nov 2025 19:01:10 +0530 Subject: [PATCH] Enhance stream management and cleanup in SRProxyConnect Refactor stream handling and improve cleanup process. --- SocketRocket/Internal/Proxy/SRProxyConnect.m | 216 ++++++++++++++----- 1 file changed, 164 insertions(+), 52 deletions(-) diff --git a/SocketRocket/Internal/Proxy/SRProxyConnect.m b/SocketRocket/Internal/Proxy/SRProxyConnect.m index fcb4c866..1b2ad8c3 100644 --- a/SocketRocket/Internal/Proxy/SRProxyConnect.m +++ b/SocketRocket/Internal/Proxy/SRProxyConnect.m @@ -58,6 +58,8 @@ -(instancetype)initWithURL:(NSURL *)url _writeQueue = dispatch_queue_create("com.facebook.socketrocket.proxyconnect.write", DISPATCH_QUEUE_SERIAL); _inputQueue = [NSMutableArray arrayWithCapacity:2]; + _receivedHTTPHeaders = NULL; + return self; } @@ -65,14 +67,30 @@ - (void)dealloc { // If we get deallocated before the socket open finishes - we need to cleanup everything. - [self.inputStream removeFromRunLoop:[NSRunLoop SR_networkRunLoop] forMode:NSDefaultRunLoopMode]; - self.inputStream.delegate = nil; - [self.inputStream close]; - self.inputStream = nil; + // Remove streams from runloop and close them + @synchronized(self) { + if (self.inputStream) { + self.inputStream.delegate = nil; + [self.inputStream removeFromRunLoop:[NSRunLoop SR_networkRunLoop] forMode:NSDefaultRunLoopMode]; + [self.inputStream close]; + self.inputStream = nil; + } + if (self.outputStream) { + self.outputStream.delegate = nil; + [self.outputStream removeFromRunLoop:[NSRunLoop SR_networkRunLoop] forMode:NSDefaultRunLoopMode]; + [self.outputStream close]; + self.outputStream = nil; + } - self.outputStream.delegate = nil; - [self.outputStream close]; - self.outputStream = nil; + // Clear input queue + [_inputQueue removeAllObjects]; + } + + // Release any pending CF object + if (_receivedHTTPHeaders) { + CFRelease(_receivedHTTPHeaders); + _receivedHTTPHeaders = NULL; + } } ///-------------------------------------- @@ -92,13 +110,16 @@ - (void)openNetworkStreamWithCompletion:(SRProxyConnectCompletion)completion - (void)_didConnect { SRDebugLog(@"_didConnect, return streams"); + if (_connectionRequiresSSL) { if (_httpProxyHost) { // Must set the real peer name before turning on SSL SRDebugLog(@"proxy set peer name to real host %@", self.url.host); - [self.outputStream setProperty:self.url.host forKey:@"_kCFStreamPropertySocketPeerName"]; + // Use CF property key for peer name if desired. The original code used a string. + [self.outputStream setProperty:self.url.host forKey:(id)kCFStreamPropertySocketPeerName]; } } + if (_receivedHTTPHeaders) { CFRelease(_receivedHTTPHeaders); _receivedHTTPHeaders = NULL; @@ -107,14 +128,22 @@ - (void)_didConnect NSInputStream *inputStream = self.inputStream; NSOutputStream *outputStream = self.outputStream; + // Clear properties before returning streams self.inputStream = nil; self.outputStream = nil; - [inputStream removeFromRunLoop:[NSRunLoop SR_networkRunLoop] forMode:NSDefaultRunLoopMode]; - inputStream.delegate = nil; - outputStream.delegate = nil; + if (inputStream) { + [inputStream removeFromRunLoop:[NSRunLoop SR_networkRunLoop] forMode:NSDefaultRunLoopMode]; + inputStream.delegate = nil; + } + if (outputStream) { + [outputStream removeFromRunLoop:[NSRunLoop SR_networkRunLoop] forMode:NSDefaultRunLoopMode]; + outputStream.delegate = nil; + } - _completion(nil, inputStream, outputStream); + if (_completion) { + _completion(nil, inputStream, outputStream); + } } - (void)_failWithError:(NSError *)error @@ -129,16 +158,28 @@ - (void)_failWithError:(NSError *)error _receivedHTTPHeaders = NULL; } - self.inputStream.delegate = nil; - self.outputStream.delegate = nil; + // Remove delegates and close streams safely + @synchronized(self) { + if (self.inputStream) { + self.inputStream.delegate = nil; + [self.inputStream removeFromRunLoop:[NSRunLoop SR_networkRunLoop] forMode:NSDefaultRunLoopMode]; + [self.inputStream close]; + self.inputStream = nil; + } + if (self.outputStream) { + self.outputStream.delegate = nil; + [self.outputStream removeFromRunLoop:[NSRunLoop SR_networkRunLoop] forMode:NSDefaultRunLoopMode]; + [self.outputStream close]; + self.outputStream = nil; + } - [self.inputStream removeFromRunLoop:[NSRunLoop SR_networkRunLoop] - forMode:NSDefaultRunLoopMode]; - [self.inputStream close]; - [self.outputStream close]; - self.inputStream = nil; - self.outputStream = nil; - _completion(error, nil, nil); + // Clear queued input + [_inputQueue removeAllObjects]; + } + + if (_completion) { + _completion(error, nil, nil); + } } // get proxy setting from device setting @@ -189,14 +230,14 @@ - (void)_readProxySettingWithType:(NSString *)proxyType settings:(NSDictionary * _httpProxyHost = settings[(NSString *)kCFProxyHostNameKey]; NSNumber *portValue = settings[(NSString *)kCFProxyPortNumberKey]; if (portValue) { - _httpProxyPort = [portValue intValue]; + _httpProxyPort = (uint32_t)[portValue unsignedIntValue]; } } if ([proxyType isEqualToString:(NSString *)kCFProxyTypeSOCKS]) { _socksProxyHost = settings[(NSString *)kCFProxyHostNameKey]; NSNumber *portValue = settings[(NSString *)kCFProxyPortNumberKey]; if (portValue) - _socksProxyPort = [portValue intValue]; + _socksProxyPort = (uint32_t)[portValue unsignedIntValue]; _socksProxyUsername = settings[(NSString *)kCFProxyUsernameKey]; _socksProxyPassword = settings[(NSString *)kCFProxyPasswordKey]; } @@ -239,6 +280,7 @@ - (void)_fetchPAC:(NSURL *)PACurl withProxySettings:(NSDictionary *)proxySetting NSURLSession *session = [NSURLSession sharedSession]; [[session dataTaskWithRequest:request completionHandler:^(NSData *data, NSURLResponse *response, NSError *error) { __strong typeof(wself) sself = wself; + if (!sself) return; if (!error) { NSString *script = [[NSString alloc] initWithData:data encoding:NSUTF8StringEncoding]; [sself _runPACScript:script withProxySettings:proxySettings]; @@ -284,10 +326,14 @@ - (void)_openConnection { [self _initializeStreams]; - [self.inputStream scheduleInRunLoop:[NSRunLoop SR_networkRunLoop] - forMode:NSDefaultRunLoopMode]; - //[self.outputStream scheduleInRunLoop:[NSRunLoop SR_networkRunLoop] - // forMode:NSDefaultRunLoopMode]; + // Schedule both streams on the SR network run loop + if (self.inputStream) { + [self.inputStream scheduleInRunLoop:[NSRunLoop SR_networkRunLoop] forMode:NSDefaultRunLoopMode]; + } + if (self.outputStream) { + [self.outputStream scheduleInRunLoop:[NSRunLoop SR_networkRunLoop] forMode:NSDefaultRunLoopMode]; + } + [self.outputStream open]; [self.inputStream open]; } @@ -389,32 +435,50 @@ - (void)_proxyDidConnect ///handles the incoming bytes and sending them to the proper processing method - (void)_processInputStream { - NSMutableData *buf = [NSMutableData dataWithCapacity:SRDefaultBufferSize()]; - uint8_t *buffer = buf.mutableBytes; - NSInteger length = [_inputStream read:buffer maxLength:SRDefaultBufferSize()]; + // Read up to default buffer size into a stack buffer to avoid mutableBytes issues + const NSInteger bufSize = SRDefaultBufferSize(); + uint8_t buffer[bufSize]; + NSInteger length = [self.inputStream read:buffer maxLength:bufSize]; if (length <= 0) { return; } - BOOL process = (_inputQueue.count == 0); - [_inputQueue addObject:[NSData dataWithBytes:buffer length:length]]; + NSData *readData = [NSData dataWithBytes:buffer length:(NSUInteger)length]; + + BOOL shouldProcess = NO; + @synchronized(self) { + shouldProcess = (_inputQueue.count == 0); + [_inputQueue addObject:readData]; + } - if (process) { + if (shouldProcess) { [self _dequeueInput]; } } // dequeue the incoming input so it is processed in order - - (void)_dequeueInput { - while (_inputQueue.count > 0) { - NSData *data = _inputQueue.firstObject; - [_inputQueue removeObjectAtIndex:0]; + while (true) { + NSData *data = nil; + @synchronized(self) { + if (_inputQueue.count == 0) { + data = nil; + } else { + data = _inputQueue.firstObject; + [_inputQueue removeObjectAtIndex:0]; + } + } + + if (!data) { + break; + } // No need to process any data further, we got the full header data. - if ([self _proxyProcessHTTPResponseWithData:data]) { + BOOL headerComplete = [self _proxyProcessHTTPResponseWithData:data]; + if (headerComplete) { + // Stop processing further queued data now; remaining data (if any) will be handled by upper layer after connect break; } } @@ -426,9 +490,10 @@ - (BOOL)_proxyProcessHTTPResponseWithData:(NSData *)data _receivedHTTPHeaders = CFHTTPMessageCreateEmpty(NULL, NO); } - CFHTTPMessageAppendBytes(_receivedHTTPHeaders, (const UInt8 *)data.bytes, data.length); + CFHTTPMessageAppendBytes(_receivedHTTPHeaders, (const UInt8 *)data.bytes, (CFIndex)data.length); if (CFHTTPMessageIsHeaderComplete(_receivedHTTPHeaders)) { - SRDebugLog(@"Finished reading headers %@", CFBridgingRelease(CFHTTPMessageCopyAllHeaderFields(_receivedHTTPHeaders))); + CFDictionaryRef headers = CFHTTPMessageCopyAllHeaderFields(_receivedHTTPHeaders); + SRDebugLog(@"Finished reading headers %@", CFBridgingRelease(headers)); [self _proxyHTTPHeadersDidFinish]; return YES; } @@ -441,14 +506,14 @@ - (void)_proxyHTTPHeadersDidFinish NSInteger responseCode = CFHTTPMessageGetResponseStatusCode(_receivedHTTPHeaders); if (responseCode >= 299) { - SRDebugLog(@"Connect to Proxy Request failed with response code %d", responseCode); - NSError *error = SRHTTPErrorWithCodeDescription(responseCode, 2132, - [NSString stringWithFormat:@"Received bad response code from proxy server: %d.", - (int)responseCode]); + SRDebugLog(@"Connect to Proxy Request failed with response code %ld", (long)responseCode); + NSError *error = SRHTTPErrorWithCodeDescription((NSInteger)responseCode, 2132, + [NSString stringWithFormat:@"Received bad response code from proxy server: %ld.", + (long)responseCode]); [self _failWithError:error]; return; } - SRDebugLog(@"proxy connect return %d, call socket connect", responseCode); + SRDebugLog(@"proxy connect return %ld, call socket connect", (long)responseCode); [self _didConnect]; } @@ -456,11 +521,12 @@ - (void)_proxyHTTPHeadersDidFinish - (void)_writeData:(NSData *)data { - const uint8_t * bytes = data.bytes; - __block NSInteger timeout = (NSInteger)(SRProxyConnectWriteTimeout * 1000000); // wait timeout before giving up + const uint8_t *bytes = data.bytes; + // Use microseconds for usleep; convert timeout to microseconds + __block NSInteger timeoutMicros = (NSInteger)(SRProxyConnectWriteTimeout * 1000000.0); // microseconds __weak typeof(self) wself = self; dispatch_async(_writeQueue, ^{ - __strong typeof(wself) sself = self; + __strong typeof(wself) sself = wself; if (!sself) { return; } @@ -468,17 +534,63 @@ - (void)_writeData:(NSData *)data if (!outStream) { return; } + while (![outStream hasSpaceAvailable]) { - usleep(100); //wait until the socket is ready - timeout -= 100; - if (timeout < 0) { + // sleep for 1000 microseconds (1ms) to reduce busy-waiting + usleep(1000); + timeoutMicros -= 1000; + if (timeoutMicros <= 0) { NSError *error = SRHTTPErrorWithCodeDescription(408, 2132, @"Proxy timeout"); [sself _failWithError:error]; + return; // ensure we break out after failure } else if (outStream.streamError != nil) { [sself _failWithError:outStream.streamError]; + return; // ensure we break out after failure + } + } + + NSInteger written = [outStream write:bytes maxLength:(NSInteger)data.length]; + if (written < 0) { + if (outStream.streamError) { + [sself _failWithError:outStream.streamError]; + } else { + NSError *error = SRHTTPErrorWithCodeDescription(500, 2132, @"Write failed"); + [sself _failWithError:error]; + } + } else if (written < (NSInteger)data.length) { + SRDebugLog(@"Partial write %ld of %lu bytes", (long)written, (unsigned long)data.length); + // For a partial write, attempt to write remaining bytes (simple loop, guarded by timeout) + const uint8_t *remainingPtr = bytes + written; + NSInteger remainingLen = (NSInteger)data.length - written; + while (remainingLen > 0) { + if (![outStream hasSpaceAvailable]) { + usleep(1000); + timeoutMicros -= 1000; + if (timeoutMicros <= 0) { + NSError *error = SRHTTPErrorWithCodeDescription(408, 2132, @"Proxy timeout (partial write)"); + [sself _failWithError:error]; + return; + } + if (outStream.streamError) { + [sself _failWithError:outStream.streamError]; + return; + } + continue; + } + NSInteger w = [outStream write:remainingPtr maxLength:remainingLen]; + if (w < 0) { + if (outStream.streamError) { + [sself _failWithError:outStream.streamError]; + } else { + NSError *error = SRHTTPErrorWithCodeDescription(500, 2132, @"Write failed (partial)"); + [sself _failWithError:error]; + } + return; + } + remainingPtr += w; + remainingLen -= w; } } - [outStream write:bytes maxLength:data.length]; }); }